# -*- coding:utf-8 -*-
import json
import os
from configparser import ConfigParser
from typing import Dict, Optional

from dw_base.datax.datax_constants import *
from dw_base.datax.plugins.plugin_factory import PluginFactory
from dw_base.datax.tuning import load_tuning_conf, merge_speed
from dw_base.utils.file_utils import delete_file, get_abs_path


class JobConfigGenerator(object):
    """
    Generate a DataX job configuration file (JSON).

    The three speed parameters (channel / byte / record) are resolved by a
    three-level merge, lowest to highest priority:
    L1 conf/datax-tuning.conf < L2 ini [speed] section < L3 cli_speed_overrides.
    The merged speed is written into job.setting.speed and
    core.transport.channel.speed.
    """

    def __init__(self,
                 base_dir: str,
                 generator_config: str,
                 start_date: str,
                 stop_date: str,
                 output: str,
                 cli_speed_overrides: Optional[Dict[str, Optional[int]]] = None):
        """
        Args:
            base_dir: Project directory; also used to locate
                ``conf/datax-tuning.conf``.
            generator_config: Path of the generator configuration file (.ini)
                that drives DataX job-config generation.
            start_date: Start date handed to the reader/writer plugins.
            stop_date: Stop date handed to the reader/writer plugins.
            output: Path the generated DataX JSON is written to.
            cli_speed_overrides: L3 CLI overrides, e.g.
                ``{'channel': 20, 'byte': None, 'record': None}``.
                ``None`` is treated as an empty mapping.
        """
        self.generator_config = get_abs_path(generator_config)
        self.base_dir = base_dir
        self.start_date = start_date
        self.stop_date = stop_date
        self.output = output
        self.cli_speed_overrides = cli_speed_overrides or {}
        self.config_parser = ConfigParser()
        # NOTE(review): ConfigParser.read() silently skips files it cannot
        # open and decodes with the locale encoding - confirm the ini files
        # are always present and locale-compatible.
        self.config_parser.read(self.generator_config)

    def _configure_plugin(self, plugin_kind: str):
        """Build the plugin of the given kind and return its ``configure()`` result."""
        plugin = PluginFactory.get_plugin(plugin_kind,
                                          self.base_dir,
                                          self.config_parser,
                                          self.start_date,
                                          self.stop_date)
        return plugin.configure()

    def get_reader(self):
        """Return the configuration produced by the 'reader' plugin."""
        return self._configure_plugin('reader')

    def get_writer(self):
        """Return the configuration produced by the 'writer' plugin."""
        return self._configure_plugin('writer')

    def assemble(self):
        """
        Assemble the complete DataX job configuration as a dict.

        Returns:
            A dict with a ``job`` entry (one reader/writer content pair plus
            the merged speed settings) and a ``core`` entry carrying the
            byte/record transport channel speed.
        """
        # Three-level speed merge (L1 conf < L2 ini [speed] < L3 CLI).
        tuning_conf_path = os.path.join(self.base_dir, 'conf', 'datax-tuning.conf')
        l1 = load_tuning_conf(tuning_conf_path)
        merged = merge_speed(l1, self.config_parser, self.cli_speed_overrides)

        speed = {
            JOB_SETTING_SPEED_CHANNEL: merged['channel'],
            JOB_SETTING_SPEED_BYTE: merged['byte'],
            JOB_SETTING_SPEED_RECORD: merged['record'],
        }
        # Only byte and record are mirrored into the core transport section;
        # channel is set solely under job.setting.speed.
        core_speed = {
            'transport': {
                'channel': {
                    'speed': {
                        'byte': merged['byte'],
                        'record': merged['record'],
                    }
                }
            }
        }
        job_config_json = {
            'job': {
                'content': [
                    {
                        'reader': self.get_reader(),
                        'writer': self.get_writer()
                    }
                ],
                'setting': {
                    'speed': speed
                }
            },
            'core': core_speed
        }
        return job_config_json

    def run(self):
        """Assemble the job configuration and write it to ``self.output`` as JSON."""
        job_config_json = self.assemble()
        # Overwriting in place on an HDFS mount appears to be unreliable,
        # so remove any previous output before writing.
        delete_file(self.output)
        # ensure_ascii=False emits raw non-ASCII characters, so the encoding
        # must be pinned instead of inherited from the platform locale.
        with open(self.output, 'w', encoding='utf-8') as w:
            json.dump(job_config_json, w, ensure_ascii=False)