| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- """
- 读取定义在`conf/datax/config/${源类型}-${目标类型}/${项目}-${分层}-${Hive环境}[/数据库环境[/数据分组]]/${源类型}-${目标类型}-${源库名称}-${源表名称}.ini`中的配置,
- 以及在上述配置中定义、存储于`conf/datax/datasource/${ds-type}/${project}-${layer}-${env}`的`${ds-type}-${ds-name}.ini`中的
- `reader`和`writer`,生成DataX作业配置文件。
- 若未提供`-output`来指定路径存储生成的DataX作业配置文件,则会默认将生成的DataX作业配置文件存储于`conf/datax/generated`中。
- """
- import os
- import sys
- project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(project_root_dir)
- from dw_base import DO_RESET, NORM_CYN, NORM_GRN, NORM_MGT, NORM_RED, NORM_YEL
- from dw_base.common.config_constants import K_CONFIG_FILE, K_DIRECTORY, KL_HELP
- from dw_base.datax.job_config_generator import JobConfigGenerator
- from dw_base.utils.common_utils import exist
- from dw_base.utils.config_utils import parse_args
- from dw_base.utils.datetime_utils import get_yesterday, get_today
- from dw_base.utils.file_utils import list_files, get_abs_path
- from dw_base.utils.log_utils import pretty_print
- def usage(code: int):
- print(
- f'{NORM_MGT}Usage: {sys.argv[0]}\n'
- f'{NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法{DO_RESET}'
- )
- print(
- f'{NORM_MGT}Usage: {sys.argv[0]}\n'
- f'{NORM_GRN}\t<[-]-c< /=>job config> DataX作业配置生成器配置文件路径,可以多次传入-c/--c或以逗号分隔的形式\n'
- f'{NORM_GRN}\t 传入多个,传多个时参数start-date和stop-date共同使用\n'
- f'{NORM_GRN}\t<[-]-d< /=>job config directory> 扫描指定路径下所有DataX作业配置生成器配置\n'
- f'{NORM_GRN}\t 可以多次传入-d/--d或以逗号分隔的形式传入多个,优先级低于-c/--c\n'
- f'{NORM_GRN}\t<[-]-r> 递归扫描指定路径下所有DataX作业配置生成器配置,与-d/--d配合使用\n'
- f'{NORM_CYN}\t[[-]-start-date< /=>start date] yyyyMMdd[-/-yyyyMMdd]格式表达的日期(或日期范围)\n'
- f'{NORM_CYN}\t[[-]-stop-date< /=>stop date] yyyyMMdd[-/-yyyyMMdd]格式表达的日期(或日期范围)\n'
- f'{NORM_CYN}\t[[-]-o< /=>output path] DataX作业配置输出文件夹(绝对路径)'
- f'{DO_RESET}'
- )
- exit(code)
- def collect_generate_config_files():
- generator_config_files = set()
- if CONFIG.__contains__(K_CONFIG_FILE):
- # 传递了`DataX作业配置生成器配置.ini文件`
- generator_config = CONFIG.get(K_CONFIG_FILE)
- pretty_print(f'{NORM_MGT}使用“DataX作业配置文件生成器”配置文件')
- if isinstance(generator_config, list):
- # 以列表形式提供的
- for c in generator_config:
- generator_config_files.add(get_abs_path(c, check_exist=False))
- else:
- # 以逗号分隔形式提供的
- for c in generator_config.split(','):
- generator_config_files.add(get_abs_path(c, check_exist=False))
- elif CONFIG.__contains__(K_DIRECTORY):
- # 传递了含有`DataX作业配置生成器配置.ini文件`的目录
- pretty_print(f'{NORM_MGT}使用“DataX作业配置文件生成器”配置文件目录')
- recursive = CONFIG.get('r', False)
- generator_config_dirs = []
- if isinstance(CONFIG.get(K_DIRECTORY), str):
- dir_conf = CONFIG.get(K_DIRECTORY).split(',') # type:list
- # 以逗号分隔的多个目录
- else:
- # 目录列表
- dir_conf = CONFIG.get(K_DIRECTORY) # type:list
- for d in dir_conf:
- d = get_abs_path(d, check_exist=False)
- if not os.path.exists(d):
- pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置文件目录 {NORM_GRN}{d}{NORM_YEL} 不存在')
- raise FileNotFoundError(d)
- if not os.path.isdir(d):
- pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置文件目录 {NORM_GRN}{d}{NORM_YEL} 存在,但不是目录')
- raise NotADirectoryError(d)
- generator_config_dirs.append(d)
- for each_dir in generator_config_dirs:
- # 递归处理目录
- files = list_files(each_dir, recursive)
- for file in files:
- generator_config_files.add(file)
- return generator_config_files
- if __name__ == '__main__':
- pretty_print(f'{NORM_MGT}{sys.argv[0]} 收到参数:{NORM_GRN}{" ".join(sys.argv[1:])}')
- CONFIG, _ = parse_args(sys.argv[1:])
- # 未提供任何参数或查看帮助
- if len(sys.argv) == 1 or exist(CONFIG, KL_HELP):
- usage(0)
- # DataX作业配置生成器配置文件列表
- generator_config_files = collect_generate_config_files()
- if len(generator_config_files) == 0:
- pretty_print(f'{NORM_RED}未找到任何有效的“DataX作业配置文件生成器”配置文件')
- exit(1)
- # 检查所有文件是否都存在
- for gcf in generator_config_files:
- short_name = gcf.replace(f'{project_root_dir}/', '')
- try:
- if not os.path.exists(gcf):
- pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置 {NORM_GRN}{gcf}{NORM_YEL} 不存在')
- raise FileNotFoundError(gcf)
- elif not os.path.isfile(gcf):
- pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置 {NORM_GRN}{gcf}{NORM_YEL} 存在,但不是文件')
- raise IsADirectoryError(gcf)
- except Exception as e:
- pretty_print(f'使用配置文件 {short_name} 生成DataX作业配置文件(.json)失败')
- raise e
- # 开始生成`DataX作业配置文件`
- start_date = CONFIG.get('start-date', get_yesterday())
- stop_date = CONFIG.get('stop-date', get_today())
- for gcf in generator_config_files:
- # gcf应形如:${project_base_dir}/conf/datax/config/${src-type}-${dst-type}/${project}-${layer}-${env}/${src-type}-${dst-type}-${src-name}.ini
- # ${project}-${layer}-${env}
- temp = os.path.dirname(gcf).replace(f'{project_root_dir}/', '').replace(f'conf/datax/config/', '').split('/')
- src_dst = temp[0]
- if len(temp) > 1:
- project_layer_env = temp[1]
- else:
- project_layer_env = 'default'
- # project_layer_env = os.path.basename(path.dirname(gcf))
- # src_dst = os.path.basename(path.dirname(path.dirname(gcf)))
- # 默认输出(绝对)路径
- default_output_dir = f'{project_root_dir}/conf/datax/generated'
- job_config_name = os.path.basename(gcf).replace('.ini', '.json')
- # 指定的输出路径,可以是绝对路径,也可以是相对(项目根目录)的路径
- output_path_arr = ([CONFIG.get("o", default_output_dir), src_dst, project_layer_env] + temp[2:]
- + [job_config_name])
- output = '/'.join(output_path_arr)
- os.system(f'mkdir -p {os.path.dirname(output)}')
- try:
- pretty_print(f'{NORM_MGT}开始使用 {NORM_GRN}{gcf}{NORM_MGT} 生成DataX作业配置文件')
- job_config_generator = JobConfigGenerator(project_root_dir, gcf, start_date, stop_date, output)
- job_config_generator.run()
- pretty_print(f'{NORM_MGT}DataX作业配置文件 {NORM_GRN}{output}{NORM_MGT} 生成成功')
- except Exception as e:
- pretty_print(f'使用配置文件 {gcf} 生成DataX作业配置文件(.json)失败')
- pretty_print(f'{NORM_MGT}使用配置文件 ${NORM_GRN}{gcf} {NORM_MGT}生成DataX作业配置文件失败')
- raise e
|