#!/usr/bin/env /usr/bin/python3 # -*- coding:utf-8 -*- """ 读取定义在`conf/datax/config/${源类型}-${目标类型}/${项目}-${分层}-${Hive环境}[/数据库环境[/数据分组]]/${源类型}-${目标类型}-${源库名称}-${源表名称}.ini`中的配置, 以及在上述配置中定义、存储于`conf/datax/datasource/${ds-type}/${project}-${layer}-${env}`的`${ds-type}-${ds-name}.ini`中的 `reader`和`writer`,生成DataX作业配置文件。 若未提供`-output`来指定路径存储生成的DataX作业配置文件,则会默认将生成的DataX作业配置文件存储于`conf/datax/generated`中。 """ import os import sys project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root_dir) from dw_base import DO_RESET, NORM_CYN, NORM_GRN, NORM_MGT, NORM_RED, NORM_YEL from dw_base.common.config_constants import K_CONFIG_FILE, K_DIRECTORY, KL_HELP from dw_base.datax.job_config_generator import JobConfigGenerator from dw_base.utils.common_utils import exist from dw_base.utils.config_utils import parse_args from dw_base.utils.datetime_utils import get_yesterday, get_today from dw_base.utils.file_utils import list_files, get_abs_path from dw_base.utils.log_utils import pretty_print def usage(code: int): print( f'{NORM_MGT}Usage: {sys.argv[0]}\n' f'{NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法{DO_RESET}' ) print( f'{NORM_MGT}Usage: {sys.argv[0]}\n' f'{NORM_GRN}\t<[-]-c< /=>job config> DataX作业配置生成器配置文件路径,可以多次传入-c/--c或以逗号分隔的形式\n' f'{NORM_GRN}\t 传入多个,传多个时参数start-date和stop-date共同使用\n' f'{NORM_GRN}\t<[-]-d< /=>job config directory> 扫描指定路径下所有DataX作业配置生成器配置\n' f'{NORM_GRN}\t 可以多次传入-d/--d或以逗号分隔的形式传入多个,优先级低于-c/--c\n' f'{NORM_GRN}\t<[-]-r> 递归扫描指定路径下所有DataX作业配置生成器配置,与-d/--d配合使用\n' f'{NORM_CYN}\t[[-]-start-date< /=>start date] yyyyMMdd[-/-yyyyMMdd]格式表达的日期(或日期范围)\n' f'{NORM_CYN}\t[[-]-stop-date< /=>stop date] yyyyMMdd[-/-yyyyMMdd]格式表达的日期(或日期范围)\n' f'{NORM_CYN}\t[[-]-o< /=>output path] DataX作业配置输出文件夹(绝对路径)' f'{DO_RESET}' ) exit(code) def collect_generate_config_files(): generator_config_files = set() if CONFIG.__contains__(K_CONFIG_FILE): # 传递了`DataX作业配置生成器配置.ini文件` generator_config = CONFIG.get(K_CONFIG_FILE) pretty_print(f'{NORM_MGT}使用“DataX作业配置文件生成器”配置文件') if isinstance(generator_config, list): # 以列表形式提供的 for c in generator_config: generator_config_files.add(get_abs_path(c, check_exist=False)) else: # 以逗号分隔形式提供的 for c in generator_config.split(','): generator_config_files.add(get_abs_path(c, check_exist=False)) elif CONFIG.__contains__(K_DIRECTORY): # 传递了含有`DataX作业配置生成器配置.ini文件`的目录 pretty_print(f'{NORM_MGT}使用“DataX作业配置文件生成器”配置文件目录') recursive = CONFIG.get('r', False) generator_config_dirs = [] if isinstance(CONFIG.get(K_DIRECTORY), str): dir_conf = CONFIG.get(K_DIRECTORY).split(',') # type:list # 以逗号分隔的多个目录 else: # 目录列表 dir_conf = CONFIG.get(K_DIRECTORY) # type:list for d in dir_conf: d = get_abs_path(d, check_exist=False) if not os.path.exists(d): pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置文件目录 {NORM_GRN}{d}{NORM_YEL} 不存在') raise FileNotFoundError(d) if not os.path.isdir(d): pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置文件目录 {NORM_GRN}{d}{NORM_YEL} 存在,但不是目录') raise NotADirectoryError(d) generator_config_dirs.append(d) for each_dir in generator_config_dirs: # 递归处理目录 files = list_files(each_dir, recursive) for file in files: generator_config_files.add(file) return generator_config_files if __name__ == '__main__': pretty_print(f'{NORM_MGT}{sys.argv[0]} 收到参数:{NORM_GRN}{" ".join(sys.argv[1:])}') CONFIG, _ = parse_args(sys.argv[1:]) # 未提供任何参数或查看帮助 if len(sys.argv) == 1 or exist(CONFIG, KL_HELP): usage(0) # DataX作业配置生成器配置文件列表 generator_config_files = collect_generate_config_files() if len(generator_config_files) == 0: pretty_print(f'{NORM_RED}未找到任何有效的“DataX作业配置文件生成器”配置文件') exit(1) # 检查所有文件是否都存在 for gcf in generator_config_files: short_name = gcf.replace(f'{project_root_dir}/', '') try: if not os.path.exists(gcf): pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置 {NORM_GRN}{gcf}{NORM_YEL} 不存在') raise FileNotFoundError(gcf) elif not os.path.isfile(gcf): pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置 {NORM_GRN}{gcf}{NORM_YEL} 存在,但不是文件') raise IsADirectoryError(gcf) except Exception as e: pretty_print(f'使用配置文件 {short_name} 生成DataX作业配置文件(.json)失败') raise e # 开始生成`DataX作业配置文件` start_date = CONFIG.get('start-date', get_yesterday()) stop_date = CONFIG.get('stop-date', get_today()) for gcf in generator_config_files: # gcf应形如:${project_base_dir}/conf/datax/config/${src-type}-${dst-type}/${project}-${layer}-${env}/${src-type}-${dst-type}-${src-name}.ini # ${project}-${layer}-${env} temp = os.path.dirname(gcf).replace(f'{project_root_dir}/', '').replace(f'conf/datax/config/', '').split('/') src_dst = temp[0] if len(temp) > 1: project_layer_env = temp[1] else: project_layer_env = 'default' # project_layer_env = os.path.basename(path.dirname(gcf)) # src_dst = os.path.basename(path.dirname(path.dirname(gcf))) # 默认输出(绝对)路径 default_output_dir = f'{project_root_dir}/conf/datax/generated' job_config_name = os.path.basename(gcf).replace('.ini', '.json') # 指定的输出路径,可以是绝对路径,也可以是相对(项目根目录)的路径 output_path_arr = ([CONFIG.get("o", default_output_dir), src_dst, project_layer_env] + temp[2:] + [job_config_name]) output = '/'.join(output_path_arr) os.system(f'mkdir -p {os.path.dirname(output)}') try: pretty_print(f'{NORM_MGT}开始使用 {NORM_GRN}{gcf}{NORM_MGT} 生成DataX作业配置文件') job_config_generator = JobConfigGenerator(project_root_dir, gcf, start_date, stop_date, output) job_config_generator.run() pretty_print(f'{NORM_MGT}DataX作业配置文件 {NORM_GRN}{output}{NORM_MGT} 生成成功') except Exception as e: pretty_print(f'使用配置文件 {gcf} 生成DataX作业配置文件(.json)失败') pretty_print(f'{NORM_MGT}使用配置文件 ${NORM_GRN}{gcf} {NORM_MGT}生成DataX作业配置文件失败') raise e