# -*- coding:utf-8 -*-
import json
from configparser import ConfigParser

from dw_base.datax.datax_constants import *
from dw_base.datax.plugins.plugin_factory import PluginFactory
from dw_base.utils import datetime_utils
from dw_base.utils.file_utils import delete_file, get_abs_path


class JobConfigGenerator(object):
    """
    Generate a DataX job configuration file.
    """

    # Daytime window in which the throttled speed settings are used, expressed as
    # HHMM integers (e.g. 750 == 07:50). Both bounds are exclusive.
    PEAK_START_HHMM = 750
    PEAK_END_HHMM = 1900

    def __init__(self, base_dir: str, generator_config: str, start_date: str, stop_date: str, output: str):
        """
        Initialize the generator.

        Args:
            base_dir: project directory
            generator_config: path of the generator's config file. It is an ini file with two parts,
                reader and writer, and each part contains a dataSource setting.
            start_date: internal parameter, start date
            stop_date: internal parameter, stop date
            output: output path of the result (the DataX job configuration file)

        Raises:
            FileNotFoundError: if the generator config file could not be read.
        """
        self.generator_config = get_abs_path(generator_config)
        self.base_dir = base_dir
        self.start_date = start_date
        self.stop_date = stop_date
        self.output = output
        self.config_parser = ConfigParser()
        # ConfigParser.read silently skips files it cannot open and returns the list of
        # files it did read; fail fast here instead of erroring later on missing sections.
        if not self.config_parser.read(self.generator_config):
            raise FileNotFoundError(
                'Generator config not found or unreadable: {}'.format(self.generator_config))

    def _configure_plugin(self, plugin_type):
        """Build the plugin of the given type ('reader' or 'writer') and return its configuration."""
        plugin = PluginFactory.get_plugin(plugin_type, self.base_dir, self.config_parser,
                                          self.start_date, self.stop_date)
        return plugin.configure()

    def get_reader(self):
        """Return the configuration produced by the 'reader' plugin."""
        return self._configure_plugin('reader')

    def get_writer(self):
        """Return the configuration produced by the 'writer' plugin."""
        return self._configure_plugin('writer')

    @staticmethod
    def get_speed(channel=6, byte=268435456, record=100000):
        """
        Build the job-level `setting.speed` section.

        Args:
            channel: channel count
            byte: byte limit (default 268435456 == 256 MiB)
            record: record limit
        """
        return {JOB_SETTING_SPEED_CHANNEL: channel, JOB_SETTING_SPEED_BYTE: byte, JOB_SETTING_SPEED_RECORD: record}

    @staticmethod
    def get_core_speed(byte=268435456, record=100000):
        """
        Build the `core.transport.channel.speed` section.

        In DataX, this section is the per-channel throttle.

        Args:
            byte: byte limit (default 268435456 == 256 MiB)
            record: record limit
        """
        return {
            'transport': {
                'channel': {
                    'speed': {
                        'byte': byte,
                        'record': record
                    }
                }
            }
        }

    def assemble(self):
        """
        Assemble the complete DataX job configuration as a dict.

        Inside the daytime window (PEAK_START_HHMM < HHMM < PEAK_END_HHMM, local time),
        the settings are 10 channels with 10485760-byte / 40000-record limits. Outside
        that window, the defaults of get_speed / get_core_speed are used.
        NOTE(review): the daytime throttling is presumably meant to reduce load
        during business hours; confirm with the owners.
        """
        local_time = int(datetime_utils.formatted_now('%H%M'))
        if self.PEAK_START_HHMM < local_time < self.PEAK_END_HHMM:
            speed = self.get_speed(10, byte=10485760, record=40000)
            core_speed = self.get_core_speed(byte=10485760, record=40000)
        else:
            speed = self.get_speed()
            core_speed = self.get_core_speed()
        job_config_json = {
            'job': {
                'content': [
                    {
                        'reader': self.get_reader(),
                        'writer': self.get_writer()
                    }
                ],
                'setting': {
                    'speed': speed
                }
            },
            'core': core_speed
        }
        return job_config_json

    def run(self):
        """
        Assemble the job configuration and write it to `self.output` as UTF-8 JSON.
        """
        job_config_json = self.assemble()
        # Overwriting in place on the HDFS mount appears to be unreliable,
        # so delete any existing output file first.
        delete_file(self.output)
        # ensure_ascii=False writes non-ASCII characters verbatim, so the file encoding
        # must be pinned: the platform default may not be UTF-8 (e.g. C/POSIX or GBK locales).
        with open(self.output, 'w', encoding='utf-8') as w:
            json.dump(job_config_json, w, ensure_ascii=False)