| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- # -*- coding:utf-8 -*-
- import json
- import os
- from configparser import ConfigParser
- from typing import Dict, Optional
- from dw_base.datax.datax_constants import *
- from dw_base.datax.plugins.plugin_factory import PluginFactory
- from dw_base.datax.tuning import load_tuning_conf, merge_speed
- from dw_base.utils.file_utils import delete_file, get_abs_path
class JobConfigGenerator(object):
    """
    Generate a DataX job configuration file (JSON).

    The three speed parameters (channel / byte / record) are merged across three
    levels, later levels overriding earlier ones:
        L1 conf/datax-tuning.conf < L2 the ini's [speed] section < L3 cli_speed_overrides.
    The merged channel/byte/record values are written to job.setting.speed, and the
    merged byte/record values are also written to core.transport.channel.speed.
    """

    def __init__(self, base_dir: str, generator_config: str, start_date: str, stop_date: str, output: str,
                 cli_speed_overrides: Optional[Dict[str, int]] = None):
        """
        Args:
            base_dir: project directory; conf/datax-tuning.conf is looked up beneath it.
            generator_config: path to the DataX job-config generator's .ini file
                (resolved through get_abs_path).
            start_date / stop_date: internal dates, forwarded to the reader/writer plugins.
            output: path the generated DataX JSON is written to.
            cli_speed_overrides: L3 CLI overrides, e.g. {'channel': 20, 'byte': None, 'record': None};
                None means "no overrides".

        Raises:
            FileNotFoundError: if the generator config could not be read.
        """
        self.generator_config = get_abs_path(generator_config)
        self.base_dir = base_dir
        self.start_date = start_date
        self.stop_date = stop_date
        self.output = output
        self.cli_speed_overrides = cli_speed_overrides or {}
        self.config_parser = ConfigParser()
        # ConfigParser.read silently skips files it cannot open and returns the list of
        # files it did read; fail fast here instead of letting the plugins trip over an
        # empty parser later.
        if not self.config_parser.read(self.generator_config):
            raise FileNotFoundError(
                'DataX generator config not found or unreadable: {}'.format(self.generator_config))

    def get_reader(self):
        """Build the reader plugin from the ini and return its configure() result."""
        reader = PluginFactory.get_plugin('reader', self.base_dir, self.config_parser, self.start_date, self.stop_date)
        return reader.configure()

    def get_writer(self):
        """Build the writer plugin from the ini and return its configure() result."""
        writer = PluginFactory.get_plugin('writer', self.base_dir, self.config_parser, self.start_date, self.stop_date)
        return writer.configure()

    def assemble(self):
        """
        Build the full DataX job configuration as a dict.

        Returns:
            A dict with a 'job' section (one reader/writer content entry plus
            setting.speed) and a 'core' section (transport.channel.speed).
        """
        # Three-level speed merge (L1 conf < L2 ini [speed] < L3 CLI).
        tuning_conf_path = os.path.join(self.base_dir, 'conf', 'datax-tuning.conf')
        l1 = load_tuning_conf(tuning_conf_path)
        # NOTE(review): merge_speed is assumed to return a mapping with 'channel', 'byte'
        # and 'record' keys; values may presumably be None (see cli_speed_overrides) — confirm.
        merged = merge_speed(l1, self.config_parser, self.cli_speed_overrides)
        speed = {
            JOB_SETTING_SPEED_CHANNEL: merged['channel'],
            JOB_SETTING_SPEED_BYTE: merged['byte'],
            JOB_SETTING_SPEED_RECORD: merged['record'],
        }
        # Only byte/record are mirrored into the per-channel core limits; channel count
        # lives solely under job.setting.speed.
        core_speed = {
            'transport': {
                'channel': {
                    'speed': {
                        'byte': merged['byte'],
                        'record': merged['record'],
                    }
                }
            }
        }
        job_config_json = {
            'job': {
                'content': [
                    {
                        'reader': self.get_reader(),
                        'writer': self.get_writer()
                    }
                ],
                'setting': {
                    'speed': speed
                }
            },
            'core': core_speed
        }
        return job_config_json

    def run(self):
        """Assemble the job configuration and write it to self.output as JSON."""
        job_config_json = self.assemble()
        # Overwriting an existing file in place on an HDFS mount appears to be unreliable,
        # so remove the old output before writing.
        delete_file(self.output)
        # ensure_ascii=False writes non-ASCII characters verbatim, so pin the encoding
        # rather than depending on the platform's locale default.
        with open(self.output, 'w', encoding='utf-8') as w:
            json.dump(job_config_json, w, ensure_ascii=False)
|