| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- # -*- coding:utf-8 -*-
- import json
- from configparser import ConfigParser
- from dw_base.datax.datax_constants import *
- from dw_base.datax.plugins.plugin_factory import PluginFactory
- from dw_base.utils import datetime_utils
- from dw_base.utils.file_utils import delete_file, get_abs_path
class JobConfigGenerator(object):
    """
    Generate a DataX job configuration file.

    The reader/writer parts of the job are produced by plugins built from an
    ini-format generator config; the speed limits depend on the time of day at
    which the configuration is assembled.
    """

    # HHMM window (as an int, both bounds exclusive) inside which the throttled
    # speed settings below are used instead of the defaults of get_speed /
    # get_core_speed.
    _THROTTLE_WINDOW = (750, 1900)
    # Speed settings used inside the throttle window.
    _THROTTLED_CHANNEL = 10
    _THROTTLED_BYTE = 10485760  # 10 MiB
    _THROTTLED_RECORD = 40000

    def __init__(self, base_dir: str, generator_config: str, start_date: str, stop_date: str, output: str):
        """
        Initialize the generator and load the generator config.

        Args:
            base_dir: project directory.
            generator_config: config file for the DataX job config generator, in
                ini format, with two sections, ``reader`` and ``writer``, each of
                which contains a dataSource setting. Resolved with get_abs_path.
            start_date: internal parameter, start date.
            stop_date: internal parameter, end date.
            output: path of the output file (the generated DataX job config).

        Raises:
            FileNotFoundError: if the generator config cannot be read.
        """
        self.generator_config = get_abs_path(generator_config)
        self.base_dir = base_dir
        self.start_date = start_date
        self.stop_date = stop_date
        self.output = output
        self.config_parser = self._load_config(self.generator_config)

    @staticmethod
    def _load_config(path):
        """Read the ini file at ``path`` (UTF-8) into a new ConfigParser.

        Raises:
            FileNotFoundError: if the file could not be opened.
        """
        config_parser = ConfigParser()
        # ConfigParser.read() silently skips files it cannot open and returns the
        # list of files it actually read; fail fast instead of handing an empty
        # config to the reader/writer plugins.
        if not config_parser.read(path, encoding='utf-8'):
            raise FileNotFoundError('generator config not found or unreadable: %s' % path)
        return config_parser

    def _configure_plugin(self, plugin_type):
        """Build the ``plugin_type`` ('reader' or 'writer') plugin and return its configure() result."""
        plugin = PluginFactory.get_plugin(plugin_type, self.base_dir, self.config_parser,
                                          self.start_date, self.stop_date)
        return plugin.configure()

    def get_reader(self):
        """Return the configuration produced by the 'reader' plugin."""
        return self._configure_plugin('reader')

    def get_writer(self):
        """Return the configuration produced by the 'writer' plugin."""
        return self._configure_plugin('writer')

    @staticmethod
    def get_speed(channel=6, byte=268435456, record=100000):
        """Return the ``job.setting.speed`` dict (default byte limit is 256 MiB)."""
        return {JOB_SETTING_SPEED_CHANNEL: channel, JOB_SETTING_SPEED_BYTE: byte, JOB_SETTING_SPEED_RECORD: record}

    @staticmethod
    def get_core_speed(byte=268435456, record=100000):
        """Return the ``core`` section carrying ``transport.channel.speed`` byte/record limits."""
        return {
            'transport': {
                'channel': {
                    'speed': {
                        'byte': byte,
                        'record': record
                    }
                }
            }
        }

    def assemble(self):
        """
        Build the complete DataX job configuration dict.

        Returns:
            dict with a 'job' entry (a single reader/writer pair in 'content'
            plus 'setting.speed') and a 'core' entry with channel speed limits.
            Inside _THROTTLE_WINDOW the throttled settings are used, otherwise
            the defaults of get_speed / get_core_speed.
        """
        # formatted_now('%H%M') gives e.g. '0930' -> 930; presumably local time.
        local_time = int(datetime_utils.formatted_now('%H%M'))
        window_start, window_end = self._THROTTLE_WINDOW
        if window_start < local_time < window_end:
            speed = self.get_speed(self._THROTTLED_CHANNEL, byte=self._THROTTLED_BYTE,
                                   record=self._THROTTLED_RECORD)
            core_speed = self.get_core_speed(byte=self._THROTTLED_BYTE, record=self._THROTTLED_RECORD)
        else:
            speed = self.get_speed()
            core_speed = self.get_core_speed()
        job_config_json = {
            'job': {
                'content': [
                    {
                        'reader': self.get_reader(),
                        'writer': self.get_writer()
                    }
                ],
                'setting': {
                    'speed': speed
                }
            },
            'core': core_speed
        }
        return job_config_json

    @staticmethod
    def _write_json(path, data):
        """Serialize ``data`` to ``path`` as UTF-8 JSON with non-ASCII characters left unescaped."""
        # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
        # must be pinned instead of depending on the platform/locale default.
        with open(path, 'w', encoding='utf-8') as w:
            json.dump(data, w, ensure_ascii=False)

    def run(self):
        """Assemble the job configuration and write it to ``self.output``."""
        job_config_json = self.assemble()
        # Overwriting in place through the HDFS mount seemed unreliable, so the
        # old file is deleted first.
        delete_file(self.output)
        self._write_json(self.output, job_config_json)
|