entry.py

# -*- coding: utf-8 -*-
"""
DataX entry-point facade (the layer beneath bin/datax-{hive-import,hdfs-export}-starter.{sh,py}).

- DataxImport: target = Hive/HDFS; pre-creates Hive partitions automatically; typically jobs/raw/
- DataxExport: source = HDFS, exporting to external stores; typically jobs/ads/; no partition pre-creation

Shared flow: expand ini → for each (serial/parallel) → runner.run_job.
Import additionally calls partition.execute_ddls before the loop to pre-create Hive partitions.

Distributed worker selection is kept for now (legacy parameters migrated as-is).
If kb/93 ADR-02 is formally adopted, worker.select_worker will degrade to
"always return current_host", and nothing here needs to change.
"""
import getpass
import os
import socket
from typing import List, Optional

from dw_base.datax import batch, partition, path_utils, runner, worker


def _is_release_user(release_user: str) -> bool:
    return getpass.getuser() == release_user


def _is_in_release_dir(base_dir: str, release_root_dir: str, project_name: str) -> bool:
    expected = os.path.abspath(os.path.join(release_root_dir, project_name))
    return os.path.abspath(base_dir).startswith(expected)
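# For example (paths illustrative): with release_root_dir='/data/release' and
# project_name='demo', base_dir='/data/release/demo/jobs/raw' is inside the
# release dir, while base_dir='/home/alice/demo' is not.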


class _BaseDatax:
    """Shared base: loads the workers pool, resolves the environment, and builds the run_one closure."""

    def __init__(self,
                 base_dir: str,
                 workers_ini_path: str,
                 release_user: str,
                 release_root_dir: str,
                 python3_path: str,
                 datax_home: str,
                 log_root_dir: str,
                 log_module: str):
        self.base_dir = os.path.abspath(base_dir)
        self.pool = worker.load_workers_ini(workers_ini_path)
        self.release_user = release_user
        self.release_root_dir = release_root_dir
        self.python3_path = python3_path
        self.datax_home = datax_home
        self.log_root_dir = log_root_dir
        self.log_module = log_module
        self.project_name = os.path.basename(self.base_dir)
        self.current_host = socket.gethostname().split('.')[0]

    def _make_run_one(self,
                      start_date: str,
                      stop_date: str,
                      host: Optional[str],
                      use_random: bool,
                      parallel: bool,
                      skip_datax: bool):
        is_rel_user = _is_release_user(self.release_user)
        is_in_rel_dir = _is_in_release_dir(self.base_dir, self.release_root_dir, self.project_name)

        def _run_one(ini_path: str) -> int:
            w = worker.select_worker(
                self.pool,
                is_release_user=is_rel_user,
                is_in_release_dir=is_in_rel_dir,
                current_host=self.current_host,
                host=host,
                use_random=use_random,
            )
            job_name = path_utils.job_name_from_ini(ini_path)
            log_file = path_utils.log_path(self.log_root_dir, self.log_module, start_date, job_name)
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
            # Parallel: each job writes to its own log file (output is not echoed to
            # the parent stdout, matching the legacy `> LOG 2>&1 &` behavior).
            # Serial: inherit the parent stdout (users can tail the log file or rely
            # on an outer bash tee).
            if parallel:
                with open(log_file, 'a', encoding='utf-8') as fh:
                    return runner.run_job(
                        ini_path=ini_path, start_date=start_date, stop_date=stop_date,
                        worker_host=w, current_host=self.current_host,
                        base_dir=self.base_dir, python3_path=self.python3_path,
                        datax_home=self.datax_home,
                        skip_datax=skip_datax,
                        stdout=fh, stderr=fh,
                    )
            return runner.run_job(
                ini_path=ini_path, start_date=start_date, stop_date=stop_date,
                worker_host=w, current_host=self.current_host,
                base_dir=self.base_dir, python3_path=self.python3_path,
                datax_home=self.datax_home,
                skip_datax=skip_datax,
            )

        return _run_one
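# Design note: the release-user/release-dir checks are evaluated once per batch
# in _make_run_one, while worker.select_worker runs again inside the closure for
# every ini, so individual jobs in one batch may land on different workers.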


class DataxImport(_BaseDatax):
    """Target = Hive import (partitions are pre-created automatically)."""

    def __init__(self, **kwargs):
        super().__init__(log_module='datax', **kwargs)

    def run(self,
            inis: List[str],
            inis_dirs: List[str],
            start_date: str,
            stop_date: str,
            host: Optional[str] = None,
            use_random: bool = False,
            parallel: bool = False,
            skip_partition: bool = False,
            skip_datax: bool = False,
            extra_partition_tables: Optional[List[str]] = None) -> int:
        """
        Returns: number of failed jobs (0 = all succeeded)
        """
        ini_list = batch.expand_ini_inputs(inis, inis_dirs)
        if not ini_list:
            return 0
        if not skip_partition:
            ddls = []
            for ini in ini_list:
                ddl = partition.parse_ini_partition(ini, stop_date)
                if ddl:
                    ddls.append(ddl)
            if extra_partition_tables:
                dt = partition.compute_partition_dt(stop_date)
                for tbl in extra_partition_tables:
                    ddls.append('ALTER TABLE {tbl} ADD IF NOT EXISTS PARTITION(dt={dt});'.format(
                        tbl=tbl, dt=dt))
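            # ddls now holds ALTER TABLE statements: whatever parse_ini_partition
            # returned per ini plus the extras built above, e.g. (table name
            # illustrative; the dt literal comes from compute_partition_dt):
            #   ALTER TABLE db.tbl ADD IF NOT EXISTS PARTITION(dt=<dt>);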
            partition.execute_ddls(ddls)
        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax)
        _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
        return failed


class DataxExport(_BaseDatax):
    """Source = HDFS export (no partition pre-creation; the source-path existence
    check keeps the legacy script's check_data_exists behavior and has not been
    migrated yet)."""

    def __init__(self, **kwargs):
        super().__init__(log_module='datax', **kwargs)

    def run(self,
            inis: List[str],
            inis_dirs: List[str],
            start_date: str,
            stop_date: str,
            host: Optional[str] = None,
            use_random: bool = False,
            parallel: bool = False,
            skip_datax: bool = False) -> int:
        """
        Returns: number of failed jobs (0 = all succeeded)
        """
        ini_list = batch.expand_ini_inputs(inis, inis_dirs)
        if not ini_list:
            return 0
        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax)
        _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
        return failed
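

if __name__ == '__main__':
    # Minimal usage sketch. Every path, user, and date below is a hypothetical
    # placeholder rather than a value this module defines; the real entry points
    # are the bin/ starter scripts named in the module docstring.
    importer = DataxImport(
        base_dir='/data/release/demo_project',
        workers_ini_path='/data/release/demo_project/conf/workers.ini',
        release_user='dw',
        release_root_dir='/data/release',
        python3_path='/usr/bin/python3',
        datax_home='/opt/datax',
        log_root_dir='/data/logs',
    )
    failed = importer.run(
        inis=[],
        inis_dirs=['/data/release/demo_project/jobs/raw'],
        start_date='2024-01-01',
        stop_date='2024-01-02',
        parallel=True,
    )
    raise SystemExit(1 if failed else 0)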