datax-job-config-generator.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. 读取定义在`conf/datax/config/${源类型}-${目标类型}/${项目}-${分层}-${Hive环境}[/数据库环境[/数据分组]]/${源类型}-${目标类型}-${源库名称}-${源表名称}.ini`中的配置,
  5. 以及在上述配置中定义、存储于`conf/datax/datasource/${ds-type}/${project}-${layer}-${env}`的`${ds-type}-${ds-name}.ini`中的
  6. `reader`和`writer`,生成DataX作业配置文件。
  7. 若未提供`-output`来指定路径存储生成的DataX作业配置文件,则会默认将生成的DataX作业配置文件存储于`conf/datax/generated`中。
  8. """
  9. import os
  10. import sys
  11. project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  12. sys.path.append(project_root_dir)
  13. from dw_base import DO_RESET, NORM_CYN, NORM_GRN, NORM_MGT, NORM_RED, NORM_YEL
  14. from dw_base.common.config_constants import K_CONFIG_FILE, K_DIRECTORY, KL_HELP
  15. from dw_base.datax.job_config_generator import JobConfigGenerator
  16. from dw_base.utils.common_utils import exist
  17. from dw_base.utils.config_utils import parse_args
  18. from dw_base.utils.datetime_utils import get_yesterday, get_today
  19. from dw_base.utils.file_utils import list_files, get_abs_path
  20. from dw_base.utils.log_utils import pretty_print
  21. def usage(code: int):
  22. print(
  23. f'{NORM_MGT}Usage: {sys.argv[0]}\n'
  24. f'{NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法{DO_RESET}'
  25. )
  26. print(
  27. f'{NORM_MGT}Usage: {sys.argv[0]}\n'
  28. f'{NORM_GRN}\t<[-]-c< /=>job config> DataX作业配置生成器配置文件路径,可以多次传入-c/--c或以逗号分隔的形式\n'
  29. f'{NORM_GRN}\t 传入多个,传多个时参数start-date和stop-date共同使用\n'
  30. f'{NORM_GRN}\t<[-]-d< /=>job config directory> 扫描指定路径下所有DataX作业配置生成器配置\n'
  31. f'{NORM_GRN}\t 可以多次传入-d/--d或以逗号分隔的形式传入多个,优先级低于-c/--c\n'
  32. f'{NORM_GRN}\t<[-]-r> 递归扫描指定路径下所有DataX作业配置生成器配置,与-d/--d配合使用\n'
  33. f'{NORM_CYN}\t[[-]-start-date< /=>start date] yyyyMMdd[-/-yyyyMMdd]格式表达的日期(或日期范围)\n'
  34. f'{NORM_CYN}\t[[-]-stop-date< /=>stop date] yyyyMMdd[-/-yyyyMMdd]格式表达的日期(或日期范围)\n'
  35. f'{NORM_CYN}\t[[-]-o< /=>output path] DataX作业配置输出文件夹(绝对路径)'
  36. f'{DO_RESET}'
  37. )
  38. exit(code)
  39. def collect_generate_config_files():
  40. generator_config_files = set()
  41. if CONFIG.__contains__(K_CONFIG_FILE):
  42. # 传递了`DataX作业配置生成器配置.ini文件`
  43. generator_config = CONFIG.get(K_CONFIG_FILE)
  44. pretty_print(f'{NORM_MGT}使用“DataX作业配置文件生成器”配置文件')
  45. if isinstance(generator_config, list):
  46. # 以列表形式提供的
  47. for c in generator_config:
  48. generator_config_files.add(get_abs_path(c, check_exist=False))
  49. else:
  50. # 以逗号分隔形式提供的
  51. for c in generator_config.split(','):
  52. generator_config_files.add(get_abs_path(c, check_exist=False))
  53. elif CONFIG.__contains__(K_DIRECTORY):
  54. # 传递了含有`DataX作业配置生成器配置.ini文件`的目录
  55. pretty_print(f'{NORM_MGT}使用“DataX作业配置文件生成器”配置文件目录')
  56. recursive = CONFIG.get('r', False)
  57. generator_config_dirs = []
  58. if isinstance(CONFIG.get(K_DIRECTORY), str):
  59. dir_conf = CONFIG.get(K_DIRECTORY).split(',') # type:list
  60. # 以逗号分隔的多个目录
  61. else:
  62. # 目录列表
  63. dir_conf = CONFIG.get(K_DIRECTORY) # type:list
  64. for d in dir_conf:
  65. d = get_abs_path(d, check_exist=False)
  66. if not os.path.exists(d):
  67. pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置文件目录 {NORM_GRN}{d}{NORM_YEL} 不存在')
  68. raise FileNotFoundError(d)
  69. if not os.path.isdir(d):
  70. pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置文件目录 {NORM_GRN}{d}{NORM_YEL} 存在,但不是目录')
  71. raise NotADirectoryError(d)
  72. generator_config_dirs.append(d)
  73. for each_dir in generator_config_dirs:
  74. # 递归处理目录
  75. files = list_files(each_dir, recursive)
  76. for file in files:
  77. generator_config_files.add(file)
  78. return generator_config_files
  79. if __name__ == '__main__':
  80. pretty_print(f'{NORM_MGT}{sys.argv[0]} 收到参数:{NORM_GRN}{" ".join(sys.argv[1:])}')
  81. CONFIG, _ = parse_args(sys.argv[1:])
  82. # 未提供任何参数或查看帮助
  83. if len(sys.argv) == 1 or exist(CONFIG, KL_HELP):
  84. usage(0)
  85. # DataX作业配置生成器配置文件列表
  86. generator_config_files = collect_generate_config_files()
  87. if len(generator_config_files) == 0:
  88. pretty_print(f'{NORM_RED}未找到任何有效的“DataX作业配置文件生成器”配置文件')
  89. exit(1)
  90. # 检查所有文件是否都存在
  91. for gcf in generator_config_files:
  92. short_name = gcf.replace(f'{project_root_dir}/', '')
  93. try:
  94. if not os.path.exists(gcf):
  95. pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置 {NORM_GRN}{gcf}{NORM_YEL} 不存在')
  96. raise FileNotFoundError(gcf)
  97. elif not os.path.isfile(gcf):
  98. pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置 {NORM_GRN}{gcf}{NORM_YEL} 存在,但不是文件')
  99. raise IsADirectoryError(gcf)
  100. except Exception as e:
  101. pretty_print(f'使用配置文件 {short_name} 生成DataX作业配置文件(.json)失败')
  102. raise e
  103. # 开始生成`DataX作业配置文件`
  104. start_date = CONFIG.get('start-date', get_yesterday())
  105. stop_date = CONFIG.get('stop-date', get_today())
  106. for gcf in generator_config_files:
  107. # gcf应形如:${project_base_dir}/conf/datax/config/${src-type}-${dst-type}/${project}-${layer}-${env}/${src-type}-${dst-type}-${src-name}.ini
  108. # ${project}-${layer}-${env}
  109. temp = os.path.dirname(gcf).replace(f'{project_root_dir}/', '').replace(f'conf/datax/config/', '').split('/')
  110. src_dst = temp[0]
  111. if len(temp) > 1:
  112. project_layer_env = temp[1]
  113. else:
  114. project_layer_env = 'default'
  115. # project_layer_env = os.path.basename(path.dirname(gcf))
  116. # src_dst = os.path.basename(path.dirname(path.dirname(gcf)))
  117. # 默认输出(绝对)路径
  118. default_output_dir = f'{project_root_dir}/conf/datax/generated'
  119. job_config_name = os.path.basename(gcf).replace('.ini', '.json')
  120. # 指定的输出路径,可以是绝对路径,也可以是相对(项目根目录)的路径
  121. output_path_arr = ([CONFIG.get("o", default_output_dir), src_dst, project_layer_env] + temp[2:]
  122. + [job_config_name])
  123. output = '/'.join(output_path_arr)
  124. os.system(f'mkdir -p {os.path.dirname(output)}')
  125. try:
  126. pretty_print(f'{NORM_MGT}开始使用 {NORM_GRN}{gcf}{NORM_MGT} 生成DataX作业配置文件')
  127. job_config_generator = JobConfigGenerator(project_root_dir, gcf, start_date, stop_date, output)
  128. job_config_generator.run()
  129. pretty_print(f'{NORM_MGT}DataX作业配置文件 {NORM_GRN}{output}{NORM_MGT} 生成成功')
  130. except Exception as e:
  131. pretty_print(f'使用配置文件 {gcf} 生成DataX作业配置文件(.json)失败')
  132. pretty_print(f'{NORM_MGT}使用配置文件 ${NORM_GRN}{gcf} {NORM_MGT}生成DataX作业配置文件失败')
  133. raise e