Преглед изворни кода

chore(datax): 入口参数瘦身——砍 import 的 -skip-check 和 -t

- bin/datax-hive-import-starter.py 删 argparse -skip-check 和 -t
  (import reader 永非 hdfs,skip-check 透传到 runner._hdfs_src_check 永远早返 n/a 无意义;
   -t 额外建分区表能力实际零使用)
- dw_base/datax/entry.py:
  * DataxImport.run / DataxImport.backfill 签名删 skip_check / extra_partition_tables
  * 删 DataxImport.run 内 extra_partition_tables 的分区 DDL 追加逻辑
  * run 内部调 _make_run_one 固定传 skip_check=False(固定值说明 import 永非 hdfs)
- runner.run_job / _make_run_one / DataxExport.run 保留 skip_check(export 那条仍需)
- 69 条单测全绿

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu пре 1 недеља
родитељ
комит
13ceb2e321
2 измењених фајлова са 7 додато и 26 уклоњено
  1. 2 11
      bin/datax-hive-import-starter.py
  2. 5 15
      dw_base/datax/entry.py

+ 2 - 11
bin/datax-hive-import-starter.py

@@ -3,7 +3,7 @@
 """
 DataX hive-import 入口:目标=Hive(自动预建分区),对应 jobs/raw/ 场景。
 
-参数平迁集(见 kb/90 §2.6):
+参数集(见 kb/90 §2.6):
   -ini <file>          单 ini,可多次
   -inis <dir>          ini 目录(非递归扫 *.ini),可多次
   -start-date / -stop-date  yyyyMMdd
@@ -12,8 +12,8 @@ DataX hive-import 入口:目标=Hive(自动预建分区),对应 jobs/raw
   -parallel            并行(默认串行)
   -skip-datax          只生成 json 不执行
   -skip-partition      跳过 Hive 分区预建
-  -t <db.table>        显式追加需建分区的表,可多次
   -backfill            【高级用法】存量回填(见下方注释)
+  -channel / -byte / -record  L3 speed 覆盖
 """
 import argparse
 import os
@@ -47,13 +47,8 @@ def main():
                         help='并行执行(默认串行);-backfill 模式下表示单日内多 ini 并行(日间仍串行)')
     parser.add_argument('-skip-datax', action='store_true',
                         help='只生成 json 不执行 datax.py')
-    parser.add_argument('-skip-check', action='store_true',
-                        help='跳过 HDFS 源路径 check(默认开启 check,missing/empty 算失败;显式关闭后不检查直接交 datax)')
     parser.add_argument('-skip-partition', action='store_true',
                         help='跳过 Hive 分区预建')
-    parser.add_argument('-t', action='append', default=[], dest='extra_partition_tables',
-                        metavar='DB.TABLE',
-                        help='显式追加需建分区的 Hive 表(可多次)')
     # -----------------------------------------------------------------------------
     # 【高级用法】-backfill 存量回填模式
     #
@@ -117,9 +112,7 @@ def main():
             parallel=args.parallel,
             skip_partition=args.skip_partition,
             skip_datax=args.skip_datax,
-            skip_check=args.skip_check,
             speed_overrides=speed_overrides,
-            extra_partition_tables=args.extra_partition_tables,
         )
         sys.exit(total_failed)
 
@@ -133,9 +126,7 @@ def main():
         parallel=args.parallel,
         skip_partition=args.skip_partition,
         skip_datax=args.skip_datax,
-        skip_check=args.skip_check,
         speed_overrides=speed_overrides,
-        extra_partition_tables=args.extra_partition_tables,
     )
     sys.exit(failed)
 

+ 5 - 15
dw_base/datax/entry.py

@@ -132,9 +132,7 @@ class DataxImport(_BaseDatax):
             parallel: bool = False,
             skip_partition: bool = False,
             skip_datax: bool = False,
-            skip_check: bool = False,
-            speed_overrides: Optional[dict] = None,
-            extra_partition_tables: Optional[List[str]] = None) -> int:
+            speed_overrides: Optional[dict] = None) -> int:
         """
         Returns: 失败任务数(0 = 全部成功)
         """
@@ -150,15 +148,11 @@ class DataxImport(_BaseDatax):
                 ddl = partition.parse_ini_partition(ini, stop_date)
                 if ddl:
                     ddls.append(ddl)
-            if extra_partition_tables:
-                dt = partition.compute_partition_dt(stop_date)
-                for tbl in extra_partition_tables:
-                    ddls.append('ALTER TABLE {tbl} ADD IF NOT EXISTS PARTITION(dt={dt});'.format(
-                        tbl=tbl, dt=dt))
             partition.execute_ddls(ddls)
 
-        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax, skip_check,
-                                     speed_overrides)
+        # import 的 reader 永非 hdfs,runner._hdfs_src_check 早返 n/a,skip_check 透传无意义,固定 False
+        run_one = self._make_run_one(start_date, stop_date, host, use_random, parallel, skip_datax,
+                                     skip_check=False, speed_overrides=speed_overrides)
         _success, failed = batch.run_batch(ini_list, run_one, parallel=parallel)
         return failed
 
@@ -172,9 +166,7 @@ class DataxImport(_BaseDatax):
                  parallel: bool = False,
                  skip_partition: bool = False,
                  skip_datax: bool = False,
-                 skip_check: bool = False,
-                 speed_overrides: Optional[dict] = None,
-                 extra_partition_tables: Optional[List[str]] = None) -> int:
+                 speed_overrides: Optional[dict] = None) -> int:
         """
         存量回填:start_date/stop_date 作外层范围 [含, 不含),按日循环调 self.run() 单日语义。
 
@@ -203,9 +195,7 @@ class DataxImport(_BaseDatax):
                 parallel=parallel,
                 skip_partition=skip_partition,
                 skip_datax=skip_datax,
-                skip_check=skip_check,
                 speed_overrides=speed_overrides,
-                extra_partition_tables=extra_partition_tables,
             )
             if failed > 0:
                 total_failed += failed