Bladeren bron

feat(datax): HDFS 源 check 默认反转 fail loud,加 -skip-check 显式关闭

- runner.run_job 加 skip_check 参数(默认 False)
- 默认模式 missing/empty → 打错误日志 + return 1(以前 return 0 silent skip)
- skip_check=True 跳过整段 check,行为=老 silent skip,手动跑批已知空分区时用
- entry._make_run_one / DataxImport.run / DataxExport.run / 两个 bin 入口透传 -skip-check
- 单测反转 missing → rc=1,新增 empty fail + skip_check 旁路两条测试
- kb/94 §1.2 更新默认语义反转说明

默认反转理由:T+1 ads 导出"源没数据=上游异常",显式失败由 DS 告警触发通知;
老 silent skip 适用手动跑批已知空分区场景,留 -skip-check 保留这条路径。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 week geleden
bovenliggende
commit
985b09e87f

+ 6 - 3
bin/datax-hdfs-export-starter.py

@@ -5,10 +5,10 @@ DataX hdfs-export 入口:源=HDFS(Hive 表数据),目标=外部系统(
 对应 jobs/ads/ 场景。
 
 老参数平迁集(见 kb/90 §2.6;无 -skip-partition / -t):
-  -ini / -inis / -start-date / -stop-date / -host / -random / -parallel / -skip-datax
+  -ini / -inis / -start-date / -stop-date / -host / -random / -parallel / -skip-datax / -skip-check
 
-注:源 HDFS 路径存在性 check(老 check_data_exists 行为)本版本暂未搬迁
-后续按需补;当前源路径不存在时 datax.py 直接报错
+HDFS 源存在性 check 默认开启,missing/empty → 任务失败(与老 silent skip 语义反转
+配合 DS 告警"没数据=异常"场景);手动跑批知道分区空可加 -skip-check 跳过
 """
 import argparse
 import os
@@ -42,6 +42,8 @@ def main():
                         help='并行执行(默认串行)')
     parser.add_argument('-skip-datax', action='store_true',
                         help='只生成 json 不执行 datax.py')
+    parser.add_argument('-skip-check', action='store_true',
+                        help='跳过 HDFS 源路径 check(默认开启 check,missing/empty 算失败;显式关闭后不检查直接交 datax)')
     args = parser.parse_args()
 
     # 默认日期:昨天 → 今天(对齐老 datax-single-job-starter.sh:207-219 行为)
@@ -73,6 +75,7 @@ def main():
         use_random=args.use_random,
         parallel=args.parallel,
         skip_datax=args.skip_datax,
+        skip_check=args.skip_check,
     )
     sys.exit(failed)
 

+ 3 - 0
bin/datax-hive-import-starter.py

@@ -46,6 +46,8 @@ def main():
                         help='并行执行(默认串行)')
     parser.add_argument('-skip-datax', action='store_true',
                         help='只生成 json 不执行 datax.py')
+    parser.add_argument('-skip-check', action='store_true',
+                        help='跳过 HDFS 源路径 check(默认开启 check,missing/empty 算失败;显式关闭后不检查直接交 datax)')
     parser.add_argument('-skip-partition', action='store_true',
                         help='跳过 Hive 分区预建')
     parser.add_argument('-t', action='append', default=[], dest='extra_partition_tables',
@@ -83,6 +85,7 @@ def main():
         parallel=args.parallel,
         skip_partition=args.skip_partition,
         skip_datax=args.skip_datax,
+        skip_check=args.skip_check,
         extra_partition_tables=args.extra_partition_tables,
     )
     sys.exit(failed)

+ 10 - 5
dw_base/datax/entry.py

@@ -64,7 +64,8 @@ class _BaseDatax:
                       host: Optional[str],
                       use_random: bool,
                       parallel: bool,
-                      skip_datax: bool):
+                      skip_datax: bool,
+                      skip_check: bool):
         is_rel_user = _is_release_user(self.release_user)
         is_in_rel_dir = _is_in_release_dir(self.base_dir, self.release_root_dir, self.project_name)
 
@@ -91,6 +92,7 @@ class _BaseDatax:
                         base_dir=self.base_dir, python3_path=self.python3_path,
                         datax_home=self.datax_home,
                         skip_datax=skip_datax,
+                        skip_check=skip_check,
                         stdout=fh, stderr=fh,
                     )
                 print('[datax] ini={j} done rc={rc}'.format(j=job_name, rc=rc))
@@ -102,6 +104,7 @@ class _BaseDatax:
                     base_dir=self.base_dir, python3_path=self.python3_path,
                     datax_home=self.datax_home,
                     skip_datax=skip_datax,
+                    skip_check=skip_check,
                     tee_to=fh,
                 )
             print('[datax] ini={j} done rc={rc}'.format(j=job_name, rc=rc))
@@ -125,6 +128,7 @@ class DataxImport(_BaseDatax):
             parallel: bool = False,
             skip_partition: bool = False,
             skip_datax: bool = False,
+            skip_check: bool = False,
             extra_partition_tables: Optional[List[str]] = None) -> int:
         """
         Returns: 失败任务数(0 = 全部成功)
@@ -148,13 +152,13 @@ class DataxImport(_BaseDatax):
                         tbl=tbl, dt=dt))
             partition.execute_ddls(ddls)
 
-        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax)
+        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax, skip_check)
         _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
         return failed
 
 
 class DataxExport(_BaseDatax):
-    """源=HDFS 导出(无分区预建;源路径存在性 check 沿用老脚本 check_data_exists 行为,暂未搬迁)。"""
+    """源=HDFS 导出(无分区预建)。HDFS 源存在性 check 默认开启,missing/empty → 失败;-skip-check 关闭后走老 silent skip。"""
 
     def __init__(self, **kwargs):
         super().__init__(log_module='datax', **kwargs)
@@ -167,7 +171,8 @@ class DataxExport(_BaseDatax):
             host: Optional[str] = None,
             use_random: bool = False,
             parallel: bool = False,
-            skip_datax: bool = False) -> int:
+            skip_datax: bool = False,
+            skip_check: bool = False) -> int:
         """
         Returns: 失败任务数(0 = 全部成功)
         """
@@ -176,6 +181,6 @@ class DataxExport(_BaseDatax):
         ini_list = batch.expand_ini_inputs(resolved_inis, resolved_dirs)
         if not ini_list:
             return 0
-        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax)
+        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax, skip_check)
         _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
         return failed

+ 13 - 8
dw_base/datax/runner.py

@@ -74,20 +74,25 @@ def run_job(ini_path: str,
             python3_path: str,
             datax_home: str,
             skip_datax: bool = False,
+            skip_check: bool = False,
             stdout=None,
             stderr=None,
             tee_to=None) -> int:
     """
     单任务执行。
+
+    skip_check=False(默认):hdfs reader 跑源路径存在性 check,missing/empty → return 1 失败
+    skip_check=True:跳过整段 check,直接生成 json + 跑 datax(对齐老 silent skip 行为)
     """
-    # 1. HDFS 源路径 check(对齐老 check_data_exists;仅 hdfs reader 触发)
-    src_state = _hdfs_src_check(ini_path, start_date)
-    if src_state == 'missing':
-        print('[datax] {ini} HDFS 源路径不存在,跳过 datax'.format(ini=ini_path))
-        return 0
-    if src_state == 'empty':
-        print('[datax] {ini} HDFS 源路径空,跳过 datax'.format(ini=ini_path))
-        return 0
+    # 1. HDFS 源路径 check(默认开启;--skip-check 显式关闭后走老 silent skip 行为)
+    if not skip_check:
+        src_state = _hdfs_src_check(ini_path, start_date)
+        if src_state == 'missing':
+            print('[datax] {ini} HDFS 源路径不存在,任务失败(如确需跳过请加 -skip-check)'.format(ini=ini_path))
+            return 1
+        if src_state == 'empty':
+            print('[datax] {ini} HDFS 源路径空,任务失败(如确需跳过请加 -skip-check)'.format(ini=ini_path))
+            return 1
 
     json_path = path_utils.json_output_path(base_dir, ini_path)
     os.makedirs(os.path.dirname(json_path), exist_ok=True)

+ 1 - 1
kb/94-重构对比.md

@@ -38,7 +38,7 @@
 
 - ~~**#14 日志颜色退化**~~ / ~~**关键阶段 print 缺失**~~ —— 2026-04-24 **阶段 print 已补齐**(worker 选中 / gen 起止 / exec 起止 / 任务起止);颜色仍退化(仅单色文本),如要恢复用 `colorama` 或自写 ANSI escape helper,当前不做
 - ~~**#15 串行模式日志文件缺失**~~ —— 2026-04-24 **已补齐**,`runner.run_job(tee_to=fh)` Popen 行循环 tee 到文件 + 父 stdout,行为对齐老 `| tee LOG_FILE`
-- **HDFS 源路径存在性 check 未搬迁** —— 2026-04-24 **已补齐**,`runner._hdfs_src_check` 对 `reader.dataSource` 是 `hdfs/...` 的 ini 在 gen json 前跑 `hadoop fs -test -e` + `hadoop fs -du -s`,路径不存在 / 空则跳过 datax(对齐老 `datax-single-job-starter.sh:128-146` check_data_exists)
+- **HDFS 源路径存在性 check 未搬迁** —— 2026-04-24 **已补齐,且默认语义反转**:`runner._hdfs_src_check` 对 `reader.dataSource` 是 `hdfs/...` 的 ini 在 gen json 前跑 `hadoop fs -test -e` + `hadoop fs -du -s`;**missing / empty → `return 1` 任务失败**(老 sh 是 silent skip `return 0`)。反转理由:T+1 ads 导出"源没数据=上游异常",应显式失败由 DS 告警触发通知。手动跑批已知有空分区场景可加 `-skip-check` 恢复老 silent skip 语义
 - **默认日期未做** —— 2026-04-24 **已补齐**,入口层 `-start-date` / `-stop-date` 不传时默认昨天/今天(对齐老 sh 的 YESTERDAY/TODAY 默认)
 
 **仍未做**:hdfs-kafka 特殊 writer 说明(老 sh `datax-single-job-starter.sh:232-245` 10 行 columnType/columnMapping 业务说明),本项目用 kafka 概率低,用到再补

+ 38 - 3
tests/unit/datax/test_runner.py

@@ -151,7 +151,8 @@ def test_hdfs_src_check_empty_when_du_size_zero(mock_run, tmp_path):
 
 @patch('dw_base.datax.runner._hdfs_src_check', return_value='missing')
 @patch('dw_base.datax.runner.subprocess.run')
-def test_run_job_skips_when_hdfs_src_missing(mock_run, _mock_check, tmp_path):
+def test_run_job_fails_when_hdfs_src_missing(mock_run, _mock_check, tmp_path):
+    """默认 HDFS missing → rc=1(失败,不 silent skip)。"""
     rc = run_job(
         ini_path=str(tmp_path / 'x.ini'),
         start_date='20260422', stop_date='20260423',
@@ -159,6 +160,40 @@ def test_run_job_skips_when_hdfs_src_missing(mock_run, _mock_check, tmp_path):
         base_dir=str(tmp_path), python3_path='/usr/bin/python3',
         datax_home='/opt/datax',
     )
-    assert rc == 0
-    # HDFS missing → 不调 gen / exec
+    assert rc == 1
+    # 直接失败 → 不调 gen / exec
+    assert mock_run.call_count == 0
+
+
+@patch('dw_base.datax.runner._hdfs_src_check', return_value='empty')
+@patch('dw_base.datax.runner.subprocess.run')
+def test_run_job_fails_when_hdfs_src_empty(mock_run, _mock_check, tmp_path):
+    """默认 HDFS empty → rc=1(失败)。"""
+    mock_run.return_value = _RC(0)
+    rc = run_job(
+        ini_path=str(tmp_path / 'x.ini'),
+        start_date='20260422', stop_date='20260423',
+        worker_host='cdhmaster02', current_host='cdhmaster02',
+        base_dir=str(tmp_path), python3_path='/usr/bin/python3',
+        datax_home='/opt/datax',
+    )
+    assert rc == 1
     assert mock_run.call_count == 0
+
+
+@patch('dw_base.datax.runner._hdfs_src_check')
+@patch('dw_base.datax.runner.subprocess.run')
+def test_run_job_skip_check_bypasses_hdfs_check(mock_run, mock_check, tmp_path):
+    """skip_check=True → 不调 _hdfs_src_check,直接 gen-json + 跑 datax。"""
+    mock_run.return_value = _RC(0)
+    rc = run_job(
+        ini_path=str(tmp_path / 'x.ini'),
+        start_date='20260422', stop_date='20260423',
+        worker_host='cdhmaster02', current_host='cdhmaster02',
+        base_dir=str(tmp_path), python3_path='/usr/bin/python3',
+        datax_home='/opt/datax',
+        skip_check=True,
+    )
+    assert rc == 0
+    mock_check.assert_not_called()
+    assert mock_run.call_count == 2  # gen + exec 都照跑