job_config_generator.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # -*- coding:utf-8 -*-
  2. import json
  3. from configparser import ConfigParser
  4. from dw_base.datax.datax_constants import *
  5. from dw_base.datax.plugins.plugin_factory import PluginFactory
  6. from dw_base.utils import datetime_utils
  7. from dw_base.utils.file_utils import delete_file, get_abs_path
  8. class JobConfigGenerator(object):
  9. """
  10. 生成DataX作业配置文件
  11. """
  12. def __init__(self, base_dir: str, generator_config: str, start_date: str, stop_date: str, output: str):
  13. """
  14. 初始化
  15. Args:
  16. base_dir: 项目目录
  17. generator_config: DataX作业配置生成器配置文件,格式为ini文件,包含两部分reader和writer,两部分都包含dataSource配置
  18. start_date: 内参,开始日期
  19. stop_date: 内参,结束日期
  20. output: 结果(DataX作业配置文件)输出文件路径
  21. """
  22. self.generator_config = get_abs_path(generator_config)
  23. self.base_dir = base_dir
  24. self.start_date = start_date
  25. self.stop_date = stop_date
  26. self.output = output
  27. self.config_parser = ConfigParser()
  28. self.config_parser.read(self.generator_config)
  29. def get_reader(self):
  30. reader = PluginFactory.get_plugin('reader', self.base_dir, self.config_parser, self.start_date, self.stop_date)
  31. return reader.configure()
  32. def get_writer(self):
  33. writer = PluginFactory.get_plugin('writer', self.base_dir, self.config_parser, self.start_date, self.stop_date)
  34. return writer.configure()
  35. @staticmethod
  36. def get_speed(channel=6, byte=268435456, record=100000):
  37. return {JOB_SETTING_SPEED_CHANNEL: channel, JOB_SETTING_SPEED_BYTE: byte, JOB_SETTING_SPEED_RECORD: record}
  38. @staticmethod
  39. def get_core_speed(byte=268435456, record=100000):
  40. return {
  41. 'transport': {
  42. 'channel': {
  43. 'speed': {
  44. 'byte': byte,
  45. 'record': record
  46. }
  47. }
  48. }
  49. }
  50. def assemble(self):
  51. local_time = int(datetime_utils.formatted_now('%H%M'))
  52. if 750 < local_time < 1900:
  53. speed = self.get_speed(10, byte=10485760, record=40000)
  54. core_speed = self.get_core_speed(byte=10485760, record=40000)
  55. else:
  56. speed = self.get_speed()
  57. core_speed = self.get_core_speed()
  58. job_config_json = {
  59. 'job': {
  60. 'content': [
  61. {
  62. 'reader': self.get_reader(),
  63. 'writer': self.get_writer()
  64. }
  65. ],
  66. 'setting': {
  67. 'speed': speed
  68. }
  69. },
  70. 'core': core_speed
  71. }
  72. return job_config_json
  73. def run(self):
  74. job_config_json = self.assemble()
  75. # HDFS Mount的覆盖写入貌似有问题
  76. delete_file(self.output)
  77. with open(self.output, 'w') as w:
  78. json.dump(job_config_json, w, ensure_ascii=False)