Browse Source

fix(datax): json_path 加 start_date_stop_date 后缀防并发覆盖(修 raw_his_o 数据错位根因)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu 3 days ago
parent
commit
a597c15a7a
4 changed files with 40 additions and 6 deletions
  1. 1 1
      dw_base/datax/cli.py
  2. 15 4
      dw_base/datax/path_utils.py
  3. 1 1
      dw_base/datax/runner.py
  4. 23 0
      tests/unit/datax/test_path_utils.py

+ 1 - 1
dw_base/datax/cli.py

@@ -21,7 +21,7 @@ from dw_base.datax.path_utils import json_output_path
 
 def _cmd_gen_json(args):
     base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
-    out = json_output_path(base_dir, args.ini)
+    out = json_output_path(base_dir, args.ini, args.start_date, args.stop_date)
     os.makedirs(os.path.dirname(out), exist_ok=True)
     cli_speed_overrides = {
         'channel': args.channel,

+ 15 - 4
dw_base/datax/path_utils.py

@@ -27,12 +27,23 @@ def job_name_from_ini(ini_path: str) -> str:
     return basename
 
 
-def json_output_path(base_dir: str, ini_path: str) -> str:
+def json_output_path(base_dir: str, ini_path: str,
+                     start_date: str = None, stop_date: str = None) -> str:
     """
-    按 ini 推导 DataX json 作业配置的输出绝对路径。
-    扁平化:{base_dir}/conf/datax-json/{job_name}.json。
+    按 ini + 日期区间推导 DataX json 作业配置的输出绝对路径。
+    扁平化:{base_dir}/conf/datax-json/{job_name}_{start_date}_{stop_date}.json。
+
+    日期后缀防同 ini 不同 [start, stop) 段并发跑时 json 互相覆盖(DS 同时触发
+    多个 backfill / 手动 + 调度并发等场景)。同 ini 同 [start, stop) 并发仍冲突,
+    属真正重复任务,按业务约定应让其中一个失败而非容忍。
+    缺省 start_date/stop_date 时退回老格式(仅用于 cli gen-json 单文件查看 / 测试)。
     """
-    return os.path.join(base_dir, 'conf', 'datax-json', job_name_from_ini(ini_path) + '.json')
+    job = job_name_from_ini(ini_path)
+    if start_date and stop_date:
+        suffix = '_{}_{}'.format(start_date, stop_date)
+    else:
+        suffix = ''
+    return os.path.join(base_dir, 'conf', 'datax-json', job + suffix + '.json')
 
 
 def log_path(log_root_dir: str, module: str, dt: str, job_name: str) -> str:

+ 1 - 1
dw_base/datax/runner.py

@@ -96,7 +96,7 @@ def run_job(ini_path: str,
             print('[datax] {ini} HDFS 源路径空,任务失败(如确需跳过请加 -skip-check)'.format(ini=ini_path))
             return 1
 
-    json_path = path_utils.json_output_path(base_dir, ini_path)
+    json_path = path_utils.json_output_path(base_dir, ini_path, start_date, stop_date)
     os.makedirs(os.path.dirname(json_path), exist_ok=True)
 
     gen_argv = [

+ 23 - 0
tests/unit/datax/test_path_utils.py

@@ -28,6 +28,29 @@ def test_json_output_path_flat():
     )
 
 
+def test_json_output_path_with_dates():
+    out = json_output_path(
+        '/opt/release/poyee-data-warehouse',
+        'tests/integration/datax/hive_import/app_user_cert_info.ini',
+        '20260311', '20260312',
+    )
+    assert os.path.normpath(out) == os.path.normpath(
+        '/opt/release/poyee-data-warehouse/conf/datax-json/app_user_cert_info_20260311_20260312.json'
+    )
+
+
+def test_json_output_path_only_start_falls_back():
+    # 只传 start 不传 stop(半填),按文档约定退回无后缀格式(任一为空都视作缺省)
+    out = json_output_path(
+        '/opt/release/poyee-data-warehouse',
+        'tests/integration/datax/hive_import/app_user_cert_info.ini',
+        '20260311', None,
+    )
+    assert os.path.normpath(out) == os.path.normpath(
+        '/opt/release/poyee-data-warehouse/conf/datax-json/app_user_cert_info.json'
+    )
+
+
 def test_log_path_template():
     out = log_path('/home/bigdata/log', 'datax', '20260422', 'app_user_cert_info')
     assert os.path.normpath(out) == os.path.normpath(