datax-hive-import-starter.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. DataX hive-import 入口:目标=Hive(自动预建分区),对应 jobs/raw/ 场景。
  5. 老参数平迁集(见 kb/90 §2.6):
  6. -ini <file> 单 ini,可多次
  7. -inis <dir> ini 目录(非递归扫 *.ini),可多次
  8. -start-date / -stop-date yyyyMMdd
  9. -host <hostname> 显式指定 worker
  10. -random 加权随机选 worker
  11. -parallel 并行(默认串行)
  12. -skip-datax 只生成 json 不执行
  13. -skip-partition 跳过 Hive 分区预建
  14. -t <db.table> 显式追加需建分区的表,可多次
  15. """
  16. import argparse
  17. import os
  18. import sys
  19. project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  20. sys.path.append(project_root_dir)
  21. from dw_base.datax.entry import DataxImport
  22. def main():
  23. parser = argparse.ArgumentParser(
  24. prog='datax-hive-import-starter',
  25. description='DataX hive-import 入口:PG/其他源 → Hive (raw 层)',
  26. )
  27. parser.add_argument('-ini', action='append', default=[], metavar='FILE',
  28. help='DataX ini 单文件(可多次)')
  29. parser.add_argument('-inis', action='append', default=[], metavar='DIR',
  30. help='DataX ini 目录,非递归扫 *.ini(可多次)')
  31. parser.add_argument('-start-date', required=True, metavar='YYYYMMDD')
  32. parser.add_argument('-stop-date', required=True, metavar='YYYYMMDD')
  33. parser.add_argument('-host', default=None, metavar='HOSTNAME',
  34. help='显式指定 worker(优先于 -random)')
  35. parser.add_argument('-random', action='store_true', dest='use_random',
  36. help='从 conf/workers.ini 加权随机选 worker')
  37. parser.add_argument('-parallel', action='store_true',
  38. help='并行执行(默认串行)')
  39. parser.add_argument('-skip-datax', action='store_true',
  40. help='只生成 json 不执行 datax.py')
  41. parser.add_argument('-skip-partition', action='store_true',
  42. help='跳过 Hive 分区预建')
  43. parser.add_argument('-t', action='append', default=[], dest='extra_partition_tables',
  44. metavar='DB.TABLE',
  45. help='显式追加需建分区的 Hive 表(可多次)')
  46. args = parser.parse_args()
  47. print('{script} 收到参数: {argv}'.format(
  48. script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]),
  49. ))
  50. importer = DataxImport(
  51. base_dir=project_root_dir,
  52. workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'),
  53. release_user=os.environ['RELEASE_USER'],
  54. release_root_dir=os.environ['RELEASE_ROOT_DIR'],
  55. python3_path=os.environ['PYTHON3_PATH'],
  56. datax_home=os.environ['DATAX_HOME'],
  57. log_root_dir=os.environ['LOG_ROOT_DIR'],
  58. )
  59. failed = importer.run(
  60. inis=args.ini,
  61. inis_dirs=args.inis,
  62. start_date=args.start_date,
  63. stop_date=args.stop_date,
  64. host=args.host,
  65. use_random=args.use_random,
  66. parallel=args.parallel,
  67. skip_partition=args.skip_partition,
  68. skip_datax=args.skip_datax,
  69. extra_partition_tables=args.extra_partition_tables,
  70. )
  71. sys.exit(failed)
  72. if __name__ == '__main__':
  73. main()