浏览代码

feat(bin): hive-import 原地加 -backfill 存量回填模式

- bin/datax-hive-import-starter.py 加 -backfill flag,-help 示"高级用法"+ 多行注释举例
  (7 天范围 / 2 ini / -parallel 组合)
- dw_base/datax/entry.py 加 DataxImport.backfill() 方法承载日循环逻辑:
  * -start-date/-stop-date 作外层范围 [含, 不含),逐日调 self.run() 单日语义
  * 失败不中断,继续下一天;返回跨天失败任务总数
  * stop_date <= start_date 抛 ValueError
- bin 入口 -backfill 分支纯分发到 DataxImport.backfill()
- tests/unit/datax/test_entry.py 加 3 条 backfill 测试(日循环范围/失败累加/非法范围)
- kb/93 ADR-01 更新:默认单日 + 显式 -backfill opt-in;否决独立工具文件方案
- 57 条单测全绿

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 周之前
父节点
当前提交
b0c23d2975
共有 4 个文件被更改,包括 165 次插入11 次删除
  1. 42 4
      bin/datax-hive-import-starter.py
  2. 55 0
      dw_base/datax/entry.py
  3. 9 7
      kb/93-架构决策.md
  4. 59 0
      tests/unit/datax/test_entry.py

+ 42 - 4
bin/datax-hive-import-starter.py

@@ -13,6 +13,7 @@ DataX hive-import 入口:目标=Hive(自动预建分区),对应 jobs/raw
   -skip-datax          只生成 json 不执行
   -skip-partition      跳过 Hive 分区预建
   -t <db.table>        显式追加需建分区的表,可多次
+  -backfill            【高级用法】存量回填(见下方注释)
 """
 import argparse
 import os
@@ -35,15 +36,15 @@ def main():
     parser.add_argument('-inis', action='append', default=[], metavar='DIR',
                         help='DataX ini 目录,非递归扫 *.ini(可多次)')
     parser.add_argument('-start-date', default=None, metavar='YYYYMMDD',
-                        help='默认昨天(对齐老 sh 行为)')
+                        help='默认昨天(对齐老 sh 行为);-backfill 模式下作为外层回填范围起(含)')
     parser.add_argument('-stop-date', default=None, metavar='YYYYMMDD',
-                        help='默认今天(对齐老 sh 行为)')
+                        help='默认今天(对齐老 sh 行为);-backfill 模式下作为外层回填范围止(不含)')
     parser.add_argument('-host', default=None, metavar='HOSTNAME',
                         help='显式指定 worker(优先于 -random)')
     parser.add_argument('-random', action='store_true', dest='use_random',
                         help='从 conf/workers.ini 加权随机选 worker')
     parser.add_argument('-parallel', action='store_true',
-                        help='并行执行(默认串行)')
+                        help='并行执行(默认串行);-backfill 模式下表示单日内多 ini 并行(日间仍串行)')
     parser.add_argument('-skip-datax', action='store_true',
                         help='只生成 json 不执行 datax.py')
     parser.add_argument('-skip-check', action='store_true',
@@ -53,6 +54,25 @@ def main():
     parser.add_argument('-t', action='append', default=[], dest='extra_partition_tables',
                         metavar='DB.TABLE',
                         help='显式追加需建分区的 Hive 表(可多次)')
+    # -----------------------------------------------------------------------------
+    # 【高级用法】-backfill 存量回填模式
+    #
+    # 不传:默认单日语义,-start-date/-stop-date 对应一个 dt 分区(T+1 增量场景,DS 调度)
+    # 传  :-start-date/-stop-date 作外层回填范围(左闭右开),按日循环调用单日逻辑;
+    #       每天独立建分区 + 跑 datax,失败不中断、继续下一天;exit = 失败任务数
+    #
+    # 典型场景:接入一张新源表,一次性回填历史存量数据(1 个月、半年、N 年)
+    #
+    # 例:7 天范围,2 个 ini(a/b),单日内并行跑两表
+    #   python3 bin/datax-hive-import-starter.py \
+    #     -ini jobs/raw/a.ini -ini jobs/raw/b.ini \
+    #     -start-date 20260420 -stop-date 20260427 \
+    #     -parallel -backfill
+    #   时序:外层 7 天串行(dt=20260420 → dt=20260426),每天内 a/b 并发跑 →
+    #         产物 14 个 Hive 分区(7 天 × 2 表)
+    # -----------------------------------------------------------------------------
+    parser.add_argument('-backfill', action='store_true',
+                        help='【高级用法】存量回填:-start-date/-stop-date 作外层范围按日循环(DS 任务不加此 flag)')
     args = parser.parse_args()
 
     # 默认日期:昨天 → 今天(对齐老 datax-single-job-starter.sh:207-219 行为)
@@ -64,7 +84,8 @@ def main():
     print('{script} 收到参数: {argv}'.format(
         script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]),
     ))
-    print('  start_date={s} stop_date={e}'.format(s=args.start_date, e=args.stop_date))
+    print('  start_date={s} stop_date={e} backfill={b}'.format(
+        s=args.start_date, e=args.stop_date, b=args.backfill))
 
     importer = DataxImport(
         base_dir=project_root_dir,
@@ -75,6 +96,23 @@ def main():
         datax_home=os.environ['DATAX_HOME'],
         log_root_dir=os.environ['LOG_ROOT_DIR'],
     )
+
+    if args.backfill:
+        total_failed = importer.backfill(
+            inis=args.ini,
+            inis_dirs=args.inis,
+            start_date=args.start_date,
+            stop_date=args.stop_date,
+            host=args.host,
+            use_random=args.use_random,
+            parallel=args.parallel,
+            skip_partition=args.skip_partition,
+            skip_datax=args.skip_datax,
+            skip_check=args.skip_check,
+            extra_partition_tables=args.extra_partition_tables,
+        )
+        sys.exit(total_failed)
+
     failed = importer.run(
         inis=args.ini,
         inis_dirs=args.inis,

+ 55 - 0
dw_base/datax/entry.py

@@ -14,6 +14,7 @@ worker.select_worker 会降级为"永远返回 current_host",此处无需改
 import getpass
 import os
 import socket
+from datetime import datetime, timedelta
 from typing import List, Optional
 
 from dw_base.datax import batch, partition, path_utils, runner, worker
@@ -156,6 +157,60 @@ class DataxImport(_BaseDatax):
         _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
         return failed
 
+    def backfill(self,
+                 inis: List[str],
+                 inis_dirs: List[str],
+                 start_date: str,
+                 stop_date: str,
+                 host: Optional[str] = None,
+                 use_random: bool = False,
+                 parallel: bool = False,
+                 skip_partition: bool = False,
+                 skip_datax: bool = False,
+                 skip_check: bool = False,
+                 extra_partition_tables: Optional[List[str]] = None) -> int:
+        """
+        存量回填:start_date/stop_date 作外层范围 [含, 不含),按日循环调 self.run() 单日语义。
+
+        失败不中断,继续下一天;返回跨天失败任务总数(0 = 全部成功)。
+        """
+        day = datetime.strptime(start_date, '%Y%m%d').date()
+        stop = datetime.strptime(stop_date, '%Y%m%d').date()
+        if stop <= day:
+            raise ValueError('backfill: stop_date 必须大于 start_date')
+        total_days = (stop - day).days
+        print('[backfill] 开始回填 {n} 天:{s} → {e}'.format(
+            n=total_days, s=day.strftime('%Y%m%d'),
+            e=(stop - timedelta(days=1)).strftime('%Y%m%d'),
+        ))
+        total_failed = 0
+        failed_days = []
+        while day < stop:
+            day_plus_1 = day + timedelta(days=1)
+            sd = day.strftime('%Y%m%d')
+            ed = day_plus_1.strftime('%Y%m%d')
+            print('[backfill] {d} 开始'.format(d=sd))
+            failed = self.run(
+                inis=inis, inis_dirs=inis_dirs,
+                start_date=sd, stop_date=ed,
+                host=host, use_random=use_random,
+                parallel=parallel,
+                skip_partition=skip_partition,
+                skip_datax=skip_datax,
+                skip_check=skip_check,
+                extra_partition_tables=extra_partition_tables,
+            )
+            if failed > 0:
+                total_failed += failed
+                failed_days.append(sd)
+                print('[backfill] {d} 失败任务 {f} 个'.format(d=sd, f=failed))
+            day = day_plus_1
+        print('[backfill] 完成:成功 {s} 天 / 失败 {f} 天 / 失败任务总 {t} 个'.format(
+            s=total_days - len(failed_days), f=len(failed_days), t=total_failed))
+        if failed_days:
+            print('[backfill] 失败天列表:' + ','.join(failed_days))
+        return total_failed
+
 
 class DataxExport(_BaseDatax):
     """源=HDFS 导出(无分区预建)。HDFS 源存在性 check 默认开启,missing/empty → 失败;-skip-check 关闭后走老 silent skip。"""

+ 9 - 7
kb/93-架构决策.md

@@ -25,17 +25,19 @@
 
 ## 决策清单
 
-### ADR-01 DataX 入口不做日期展开,按天补数归 DolphinScheduler
+### ADR-01 DataX 入口默认单日,显式 `-backfill` 开日循环用于存量回填
 
 - **状态**:草案
-  老 `spark-sql-starter` 的 `get_date_range` 支持 `20260401-20260410` 范围格式自动展开;DataX 入口从未用过。本项目调度用 DolphinScheduler,DS 原生支持**业务日期补数**(时间区间选定后,按调度周期逐日实例化 task)。用户老 DS 配置即 `-start-date=${dt} -stop-date=${cdt}` 单日传参。
+  老 `spark-sql-starter` 的 `get_date_range` 支持 `20260401-20260410` 范围格式自动展开;DataX 入口从未用过。本项目调度用 DolphinScheduler,DS 原生支持**业务日期补数**(时间区间选定后,按调度周期逐日实例化 task)。用户老 DS 配置即 `-start-date=${dt} -stop-date=${cdt}` 单日传参。2026-04-24 出现场景:接入新源表需一次性回填 N 年历史存量数据——这类一次性手工批量 DS 的日历补数组件实例化太慢,走外部工具更合适。
 
-- **决策**:DataX 入口只接受单日语义(`start_date` / `stop_date` 对应一个 dt 分区);按天展开 / 批量补数 / 回溯全部交由 DS 工作流承担
+- **决策**:DataX 入口**默认单日语义**(`start_date` / `stop_date` 对应一个 dt 分区),T+1 增量补数仍归 DS;**hive-import 入口 2026-04-24 补 `-backfill` flag**,显式开启后 `-start-date` / `-stop-date` 作外层回填范围,按日循环调用单日逻辑,用于存量回填
 - **后果**:
-  - 正面:DataX 层职责单一;补数、回溯、失败重跑在 DS 层统一可视化 / 可审计;DataX 不维护日期展开状态(哪天已做、哪天失败、重试)
-  - 负面:不走 DS 的一次性手工补 N 天需要外部 bash 循环或 `workspace/` 下临时 dispatcher
-- **候选方案**:DataX 入口层实现"日期范围自动展开 + 多 json 分发多 worker"——否决,理由是**重复 DS 职责** + 引入状态管理复杂度
-- **反悔条件**:项目从 DS 迁走到无补数功能的调度系统;或出现"必须在 DataX 层展开"的硬场景
+  - 正面:DataX 层默认职责单一(单日),T+1 增量 / 补数 / 回溯在 DS 层统一可视化;一次性存量回填通过 `-backfill` 内置支持,不再需要外部 bash 循环 / 临时 dispatcher
+  - 负面:参数语义轻度重载(`-start-date` / `-stop-date` 在 `-backfill` 模式下含义不同);DS 任务模板规范上不得加 `-backfill`(团队约定)
+- **候选方案**:
+  - DataX 入口层实现"日期范围自动展开 + 多 json 分发多 worker"作为默认行为——否决(重复 DS 职责 + 增入口常态复杂度)
+  - 独立 `bin/datax-backfill.py` 工具文件——否决(和 hive-import 共享 100% 参数,独立文件冗余;单 flag 切换更简洁)
+- **反悔条件**:出现必须在 DataX 层做日期展开的默认场景(如 DS 彻底换掉);或 `-backfill` 被误用频繁到需物理隔离
 
 ### ADR-02 分布式分发归 DolphinScheduler worker group,DataX 不重复随机
 

+ 59 - 0
tests/unit/datax/test_entry.py

@@ -145,3 +145,62 @@ def test_import_expands_ini_dir(mock_exec_ddls, mock_run, fake_env):
           start_date='20260422', stop_date='20260423')
     # 目录扫到 sample.ini,runner.run_job 被调
     assert mock_run.called
+
+
+def _make_importer(fake_env):
+    return DataxImport(
+        base_dir=fake_env['base_dir'],
+        workers_ini_path=fake_env['workers_ini_path'],
+        release_user=fake_env['release_user'],
+        release_root_dir=fake_env['release_root_dir'],
+        python3_path=fake_env['python3_path'],
+        datax_home=fake_env['datax_home'],
+        log_root_dir=fake_env['log_root_dir'],
+    )
+
+
+@patch('dw_base.datax.entry.runner.run_job', return_value=0)
+@patch('dw_base.datax.entry.partition.execute_ddls')
+def test_backfill_loops_days_inclusive_start_exclusive_stop(mock_exec_ddls, mock_run, fake_env):
+    """backfill 按 [start, stop) 循环,3 天 → run_job 调 3 次,每次单日语义。"""
+    e = _make_importer(fake_env)
+    rc = e.backfill(
+        inis=[fake_env['sample_ini']], inis_dirs=[],
+        start_date='20260420', stop_date='20260423',
+    )
+    assert rc == 0
+    # 3 天 × 1 ini = run_job 调 3 次
+    assert mock_run.call_count == 3
+    # 取每次调用的 start_date / stop_date 参数验证单日切片
+    calls = [c.kwargs for c in mock_run.call_args_list]
+    dates = [(c['start_date'], c['stop_date']) for c in calls]
+    assert dates == [
+        ('20260420', '20260421'),
+        ('20260421', '20260422'),
+        ('20260422', '20260423'),
+    ]
+
+
+@patch('dw_base.datax.entry.runner.run_job', return_value=1)  # 每次 run_job 都失败
+@patch('dw_base.datax.entry.partition.execute_ddls')
+def test_backfill_accumulates_failure_count(mock_exec_ddls, mock_run, fake_env):
+    """3 天全失败 → 返回 3。"""
+    e = _make_importer(fake_env)
+    rc = e.backfill(
+        inis=[fake_env['sample_ini']], inis_dirs=[],
+        start_date='20260420', stop_date='20260423',
+    )
+    assert rc == 3
+
+
+@patch('dw_base.datax.entry.runner.run_job', return_value=0)
+@patch('dw_base.datax.entry.partition.execute_ddls')
+def test_backfill_invalid_range_raises(mock_exec_ddls, mock_run, fake_env):
+    """stop_date <= start_date 抛 ValueError,不跑任何一天。"""
+    e = _make_importer(fake_env)
+    with pytest.raises(ValueError, match='stop_date 必须大于 start_date'):
+        e.backfill(
+            inis=[fake_env['sample_ini']], inis_dirs=[],
+            start_date='20260423', stop_date='20260423',
+        )
+    assert mock_run.call_count == 0