Ver Fonte

feat(tracking): -dt 支持 4 种形式(复用 get_date_range)

复用 dw_base.utils.datetime_utils.get_date_range 解析 -dt:
单日 / 起始- / 区间 / 离散逗号;CLI 与 spark-sql-starter 对齐
失败立即中断后续 dt(沿用默认 fail-stop 语义)
tianyu.chu há 1 semana atrás
pai
commit
829af904e4
1 ficheiros alterados com 35 adições e 24 exclusões
  1. 35 24
      tests/integration/tracking/raw_usr_traces_apd_d.py

+ 35 - 24
tests/integration/tracking/raw_usr_traces_apd_d.py

@@ -1,12 +1,18 @@
 #!/usr/bin/env /usr/bin/python3
 # -*- coding:utf-8 -*-
 """
-埋点 NDJSON.gz → test.raw_usr_traces_apd_d 单日入仓(冒烟测试期)。
+埋点 NDJSON.gz → test.raw_usr_traces_apd_d 入仓(冒烟测试期)。
 
 CLI:
-  python3 tests/integration/tracking/raw_usr_traces_apd_d.py -dt YYYYMMDD
+  python3 tests/integration/tracking/raw_usr_traces_apd_d.py -dt DT
 
-行为:
+-dt 4 种形式(与 bin/spark-sql-starter.py 一致,由 dw_base get_date_range 解析):
+  20260407                          单日
+  20260407-                          20260407 至昨天
+  20260407-20260409                  区间(含两端)
+  20260407,20260408,20260409         离散
+
+行为(按解析得到的 dt 列表串行处理,单 dt 失败立即中断后续):
   - 在 SOURCE_DIR 下找 traces-{YYYY-MM-DD}.json.gz(dt 转中划线格式拼文件名)
   - hdfs dfs -mkdir -p HDFS_TBL_DIR/dt={YYYYMMDD}
   - hdfs dfs -put -f 源 gz 到该分区目录(-f 覆盖,幂等可重跑)
@@ -18,15 +24,17 @@ HIVE_DB / HDFS_TBL_DIR 改回 raw / raw.db。SOURCE_DIR 仍是 m2 临时目录
 """
 import argparse
 import os
-import re
 import subprocess
 import sys
 
+project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
+sys.path.append(project_root)
+from dw_base.utils.datetime_utils import get_date_range
+
 SOURCE_DIR = '/data/upload/tracking/temp'
 HDFS_TBL_DIR = '/user/hive/warehouse/test.db/raw_usr_traces_apd_d'
 HIVE_DB = 'test'
 HIVE_TBL = 'raw_usr_traces_apd_d'
-DT_PATTERN = re.compile(r'^\d{8}$')
 
 
 def run(cmd):
@@ -46,30 +54,15 @@ def run(cmd):
         raise RuntimeError('cmd failed (exit {}): {}'.format(result.returncode, cmd))
 
 
-def main():
-    parser = argparse.ArgumentParser(
-        prog='raw_usr_traces_apd_d',
-        description='埋点 NDJSON.gz → test.raw_usr_traces_apd_d 单日入仓',
-    )
-    parser.add_argument('-dt', required=True, metavar='YYYYMMDD',
-                        help='分区日期,yyyymmdd 格式(如 20260409)')
-    args = parser.parse_args()
-
-    dt = args.dt
-    if not DT_PATTERN.match(dt):
-        print('-dt 格式必须是 yyyymmdd(8 位数字),收到: {}'.format(dt), file=sys.stderr)
-        sys.exit(2)
-
+def process_one(dt):
     file_dt = '{}-{}-{}'.format(dt[0:4], dt[4:6], dt[6:8])
     file_name = 'traces-{}.json.gz'.format(file_dt)
     local_path = os.path.join(SOURCE_DIR, file_name)
 
-    print('{script} dt={dt} src={src}'.format(
-        script=os.path.basename(__file__), dt=dt, src=local_path))
+    print('--- dt={dt} src={src} ---'.format(dt=dt, src=local_path))
 
     if not os.path.isfile(local_path):
-        print('源文件不存在: {}'.format(local_path), file=sys.stderr)
-        sys.exit(3)
+        raise RuntimeError('源文件不存在: {}'.format(local_path))
 
     part_dir = '{}/dt={}'.format(HDFS_TBL_DIR, dt)
     run('hdfs dfs -mkdir -p {}'.format(part_dir))
@@ -81,7 +74,25 @@ def main():
     ).format(db=HIVE_DB, tbl=HIVE_TBL, dt=dt, loc=part_dir)
     run('hive -e "{}"'.format(alter))
 
-    print('done. dt={}'.format(dt))
+
+def main():
+    parser = argparse.ArgumentParser(
+        prog='raw_usr_traces_apd_d',
+        description='埋点 NDJSON.gz → test.raw_usr_traces_apd_d 入仓(支持批量 dt)',
+    )
+    parser.add_argument('-dt', required=True, metavar='DT',
+                        help='分区日期,4 种形式:20260407 / 20260407- / '
+                             '20260407-20260409 / 20260407,20260408,20260409')
+    args = parser.parse_args()
+
+    dts = get_date_range(args.dt)
+    print('{script} dts={dts} ({n} 个)'.format(
+        script=os.path.basename(__file__), dts=dts, n=len(dts)))
+
+    for dt in dts:
+        process_one(dt)
+
+    print('all done. processed {} dts.'.format(len(dts)))
 
 
 if __name__ == '__main__':