# -*- coding:utf-8 -*-
"""
DataX entry facade (the layer beneath bin/datax-{hive-import,hdfs-export}-starter.{sh,py}).
- DataxImport: target = Hive/HDFS, pre-creates Hive partitions automatically; typically jobs/raw/
- DataxExport: source = HDFS, exports to external systems; typically jobs/ads/; no partition pre-creation
Shared flow: expand ini → for each (serial/parallel) → runner.run_job.
Import additionally calls partition.execute_ddls before the loop to pre-create Hive partitions.
Distributed worker selection is kept for now (legacy parameters migrated as-is); if kb/93
ADR-02 is formally adopted, worker.select_worker degrades to "always return current_host"
and nothing here needs to change.
"""

import getpass
import os
import socket
from datetime import datetime, timedelta
from typing import List, Optional

from dw_base.datax import batch, partition, path_utils, runner, worker


def _is_release_user(release_user: str) -> bool:
    return getpass.getuser() == release_user


def _is_in_release_dir(base_dir: str, release_root_dir: str, project_name: str) -> bool:
    expected = os.path.abspath(os.path.join(release_root_dir, project_name))
    base = os.path.abspath(base_dir)
    # A bare startswith would also match sibling dirs that share the prefix
    # (e.g. <project>-backup), so require an exact match or a path inside it.
    return base == expected or base.startswith(expected + os.sep)
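# For example (hypothetical values): with release_root_dir='/data/release' and
# project_name='project_a', '/data/release/project_a/jobs' is in the release
# dir, while '/data/release/project_a_old' is not.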


def _resolve_relative_to_base(path: str, base_dir: str) -> str:
    """Resolve relative ini / inis paths against base_dir (independent of the
    Python process cwd). Absolute paths are returned unchanged."""
    if os.path.isabs(path):
        return path
    return os.path.join(base_dir, path)
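# For example (hypothetical paths): _resolve_relative_to_base('jobs/raw/a.ini',
# '/data/dw/project_a') yields '/data/dw/project_a/jobs/raw/a.ini', while an
# already-absolute '/etc/jobs/a.ini' comes back untouched.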


class _BaseDatax:
    """Shared base class: loads the workers pool, resolves the environment, and
    builds the run_one closure."""

    def __init__(self,
                 base_dir: str,
                 workers_ini_path: str,
                 release_user: str,
                 release_root_dir: str,
                 python3_path: str,
                 datax_home: str,
                 log_root_dir: str,
                 log_module: str):
        self.base_dir = os.path.abspath(base_dir)
        self.pool = worker.load_workers_ini(workers_ini_path)
        self.release_user = release_user
        self.release_root_dir = release_root_dir
        self.python3_path = python3_path
        self.datax_home = datax_home
        self.log_root_dir = log_root_dir
        self.log_module = log_module
        self.project_name = os.path.basename(self.base_dir)
        # Short hostname, e.g. "dw01" from "dw01.example.com".
        self.current_host = socket.gethostname().split('.')[0]

    def _make_run_one(self,
                      start_date: str,
                      stop_date: str,
                      host: Optional[str],
                      use_random: bool,
                      parallel: bool,
                      skip_datax: bool,
                      skip_check: bool,
                      speed_overrides: Optional[dict] = None):
        is_rel_user = _is_release_user(self.release_user)
        is_in_rel_dir = _is_in_release_dir(self.base_dir, self.release_root_dir, self.project_name)

        def _run_one(ini_path: str) -> int:
            w = worker.select_worker(
                self.pool,
                is_release_user=is_rel_user,
                is_in_release_dir=is_in_rel_dir,
                current_host=self.current_host,
                host=host,
                use_random=use_random,
            )
            job_name = path_utils.job_name_from_ini(ini_path)
            log_file = path_utils.log_path(self.log_root_dir, self.log_module, start_date, job_name)
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
            print('[datax] ini={j} worker={w} log={lf}'.format(j=job_name, w=w, lf=log_file))
            # Parallel: each task writes to its own log file and nothing goes back
            # to the parent stdout (matches the legacy `> LOG 2>&1 &`).
            # Serial: tee to the task's own log file plus the parent stdout
            # (matches the legacy `| tee LOG_FILE`).
            with open(log_file, 'a', encoding='utf-8') as fh:
                io_kwargs = {'stdout': fh, 'stderr': fh} if parallel else {'tee_to': fh}
                rc = runner.run_job(
                    ini_path=ini_path, start_date=start_date, stop_date=stop_date,
                    worker_host=w, current_host=self.current_host,
                    base_dir=self.base_dir, python3_path=self.python3_path,
                    datax_home=self.datax_home,
                    skip_datax=skip_datax,
                    skip_check=skip_check,
                    speed_overrides=speed_overrides,
                    **io_kwargs,
                )
            print('[datax] ini={j} done rc={rc}'.format(j=job_name, rc=rc))
            return rc

        return _run_one


class DataxImport(_BaseDatax):
    """Target = Hive import (pre-creates partitions automatically)."""

    def __init__(self, **kwargs):
        super().__init__(log_module='datax', **kwargs)

    def run(self,
            inis: List[str],
            inis_dirs: List[str],
            start_date: str,
            stop_date: str,
            host: Optional[str] = None,
            use_random: bool = False,
            parallel: bool = False,
            skip_partition: bool = False,
            skip_datax: bool = False,
            speed_overrides: Optional[dict] = None) -> int:
        """
        Returns: number of failed jobs (0 = all succeeded)
        """
        resolved_inis = [_resolve_relative_to_base(p, self.base_dir) for p in inis]
        resolved_dirs = [_resolve_relative_to_base(p, self.base_dir) for p in inis_dirs]
        ini_list = batch.expand_ini_inputs(resolved_inis, resolved_dirs)
        if not ini_list:
            return 0
        if not skip_partition:
            ddls = []
            for ini in ini_list:
                ddl = partition.parse_ini_partition(ini, stop_date)
                if ddl:
                    ddls.append(ddl)
            partition.execute_ddls(ddls)
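        # Illustrative only (the exact shape comes from partition.parse_ini_partition,
        # not from this module): a DDL collected above is a pre-create statement
        # along the lines of
        #     ALTER TABLE ods.user ADD IF NOT EXISTS PARTITION (dt='20240101')
        # so the Hive partition exists before DataX starts writing into it.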
        # An import job's reader is never hdfs, so runner._hdfs_src_check returns
        # "n/a" early; forwarding skip_check would be meaningless, so it is pinned
        # to False.
        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax,
                                     skip_check=False, speed_overrides=speed_overrides)
        _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
        return failed

    def backfill(self,
                 inis: List[str],
                 inis_dirs: List[str],
                 start_date: str,
                 stop_date: str,
                 host: Optional[str] = None,
                 use_random: bool = False,
                 parallel: bool = False,
                 skip_partition: bool = False,
                 skip_datax: bool = False,
                 speed_overrides: Optional[dict] = None) -> int:
        """
        Historical backfill: start_date/stop_date form the outer range [inclusive,
        exclusive); loops day by day, calling self.run() with its single-day semantics.
        A failed day does not abort the loop; the next day still runs. Returns the
        total number of failed jobs across all days (0 = all succeeded).
        """
        day = datetime.strptime(start_date, '%Y%m%d').date()
        stop = datetime.strptime(stop_date, '%Y%m%d').date()
        if stop <= day:
            raise ValueError('backfill: stop_date must be later than start_date')
        total_days = (stop - day).days
        print('[backfill] backfilling {n} day(s): {s} → {e}'.format(
            n=total_days, s=day.strftime('%Y%m%d'),
            e=(stop - timedelta(days=1)).strftime('%Y%m%d'),
        ))
        total_failed = 0
        failed_days = []
        while day < stop:
            day_plus_1 = day + timedelta(days=1)
            sd = day.strftime('%Y%m%d')
            ed = day_plus_1.strftime('%Y%m%d')
            print('[backfill] {d} started'.format(d=sd))
            failed = self.run(
                inis=inis, inis_dirs=inis_dirs,
                start_date=sd, stop_date=ed,
                host=host, use_random=use_random,
                parallel=parallel,
                skip_partition=skip_partition,
                skip_datax=skip_datax,
                speed_overrides=speed_overrides,
            )
            if failed > 0:
                total_failed += failed
                failed_days.append(sd)
                print('[backfill] {d}: {f} failed job(s)'.format(d=sd, f=failed))
            day = day_plus_1
        print('[backfill] done: {s} day(s) ok / {f} day(s) with failures / {t} failed job(s) total'.format(
            s=total_days - len(failed_days), f=len(failed_days), t=total_failed))
        if failed_days:
            print('[backfill] failed days: ' + ','.join(failed_days))
        return total_failed


class DataxExport(_BaseDatax):
    """Source = HDFS export (no partition pre-creation). The HDFS source existence
    check is on by default: missing/empty → failure; -skip-check disables it and
    restores the legacy silent skip."""

    def __init__(self, **kwargs):
        super().__init__(log_module='datax', **kwargs)

    def run(self,
            inis: List[str],
            inis_dirs: List[str],
            start_date: str,
            stop_date: str,
            host: Optional[str] = None,
            use_random: bool = False,
            parallel: bool = False,
            skip_datax: bool = False,
            skip_check: bool = False,
            speed_overrides: Optional[dict] = None) -> int:
        """
        Returns: number of failed jobs (0 = all succeeded)
        """
        resolved_inis = [_resolve_relative_to_base(p, self.base_dir) for p in inis]
        resolved_dirs = [_resolve_relative_to_base(p, self.base_dir) for p in inis_dirs]
        ini_list = batch.expand_ini_inputs(resolved_inis, resolved_dirs)
        if not ini_list:
            return 0
        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax,
                                     skip_check, speed_overrides)
        _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
        return failed
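
# A companion sketch for the export side, roughly what a
# bin/datax-hdfs-export-starter.py driver would do; constructor values are the
# same hypothetical placeholders as in the import sketch near the top:
#
#     exp = DataxExport(
#         base_dir='/data/dw/project_a',             # hypothetical
#         workers_ini_path='/data/dw/workers.ini',   # hypothetical
#         release_user='dw',
#         release_root_dir='/data/release',
#         python3_path='/usr/bin/python3',
#         datax_home='/opt/datax',
#         log_root_dir='/data/logs',
#     )
#     # The HDFS source check stays on by default; pass skip_check=True to
#     # restore the legacy silent skip for missing/empty source paths.
#     failed = exp.run(inis=[], inis_dirs=['jobs/ads/'],
#                      start_date='20240101', stop_date='20240102')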