job_config_generator.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # -*- coding:utf-8 -*-
  2. import json
  3. import os
  4. from configparser import ConfigParser
  5. from typing import Dict, Optional
  6. from dw_base.datax.datax_constants import *
  7. from dw_base.datax.plugins.plugin_factory import PluginFactory
  8. from dw_base.datax.tuning import load_tuning_conf, merge_speed
  9. from dw_base.utils.file_utils import delete_file, get_abs_path
  10. class JobConfigGenerator(object):
  11. """
  12. 生成 DataX 作业配置文件(json)。
  13. speed 三参走三级合并:L1 conf/datax-tuning.conf < L2 ini [speed] 段 < L3 cli_speed_overrides。
  14. 合并后的 speed 写进 job.setting.speed + core.transport.channel.speed。
  15. """
  16. def __init__(self, base_dir: str, generator_config: str, start_date: str, stop_date: str, output: str,
  17. cli_speed_overrides: Optional[Dict[str, int]] = None):
  18. """
  19. Args:
  20. base_dir: 项目目录
  21. generator_config: DataX 作业配置生成器配置文件路径(.ini)
  22. start_date / stop_date: 内部日期
  23. output: 生成的 DataX json 输出路径
  24. cli_speed_overrides: L3 CLI 覆盖,形如 {'channel': 20, 'byte': None, 'record': None}
  25. """
  26. self.generator_config = get_abs_path(generator_config)
  27. self.base_dir = base_dir
  28. self.start_date = start_date
  29. self.stop_date = stop_date
  30. self.output = output
  31. self.cli_speed_overrides = cli_speed_overrides or {}
  32. self.config_parser = ConfigParser()
  33. self.config_parser.read(self.generator_config)
  34. def get_reader(self):
  35. reader = PluginFactory.get_plugin('reader', self.base_dir, self.config_parser, self.start_date, self.stop_date)
  36. return reader.configure()
  37. def get_writer(self):
  38. writer = PluginFactory.get_plugin('writer', self.base_dir, self.config_parser, self.start_date, self.stop_date)
  39. return writer.configure()
  40. def assemble(self):
  41. # speed 三级合并(L1 conf < L2 ini [speed] < L3 CLI)
  42. tuning_conf_path = os.path.join(self.base_dir, 'conf', 'datax-tuning.conf')
  43. l1 = load_tuning_conf(tuning_conf_path)
  44. merged = merge_speed(l1, self.config_parser, self.cli_speed_overrides)
  45. speed = {
  46. JOB_SETTING_SPEED_CHANNEL: merged['channel'],
  47. JOB_SETTING_SPEED_BYTE: merged['byte'],
  48. JOB_SETTING_SPEED_RECORD: merged['record'],
  49. }
  50. core_speed = {
  51. 'transport': {
  52. 'channel': {
  53. 'speed': {
  54. 'byte': merged['byte'],
  55. 'record': merged['record'],
  56. }
  57. }
  58. }
  59. }
  60. job_config_json = {
  61. 'job': {
  62. 'content': [
  63. {
  64. 'reader': self.get_reader(),
  65. 'writer': self.get_writer()
  66. }
  67. ],
  68. 'setting': {
  69. 'speed': speed
  70. }
  71. },
  72. 'core': core_speed
  73. }
  74. return job_config_json
  75. def run(self):
  76. job_config_json = self.assemble()
  77. # HDFS Mount的覆盖写入貌似有问题
  78. delete_file(self.output)
  79. with open(self.output, 'w') as w:
  80. json.dump(job_config_json, w, ensure_ascii=False)