#!/usr/bin/env /usr/bin/python3 # -*- coding:utf-8 -*- """ 埋点 NDJSON.gz → test.raw_usr_traces_apd_d 入仓(冒烟测试期)。 CLI: python3 tests/integration/tracking/raw_usr_traces_apd_d.py -dt DT -dt 4 种形式(与 bin/spark-sql-starter.py 一致,由 dw_base get_date_range 解析): 20260407 单日 20260407- 20260407 至昨天 20260407-20260409 区间(含两端) 20260407,20260408,20260409 离散 行为(按解析得到的 dt 列表串行处理,单 dt 失败立即中断后续): - 在 SOURCE_DIR 下找 traces-{YYYY-MM-DD}.json.gz(dt 转中划线格式拼文件名) - hdfs dfs -mkdir -p HDFS_TBL_DIR/dt={YYYYMMDD} - hdfs dfs -put -f 源 gz 到该分区目录(-f 覆盖,幂等可重跑) - hive -e ALTER TABLE ... ADD IF NOT EXISTS PARTITION ... LOCATION ... 当前 HIVE_DB='test'、HDFS_TBL_DIR 指 test.db;冒烟跑通后迁到 jobs/raw/usr/ 并把 HIVE_DB / HDFS_TBL_DIR 改回 raw / raw.db。SOURCE_DIR 仍是 m2 临时目录, 正式上调度时改成产线路径,CLI 不变。 """ import argparse import os import subprocess import sys project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) sys.path.append(project_root) from dw_base.utils.datetime_utils import get_date_range SOURCE_DIR = '/data/upload/tracking/temp' HDFS_TBL_DIR = '/user/hive/warehouse/test.db/raw_usr_traces_apd_d' HIVE_DB = 'test' HIVE_TBL = 'raw_usr_traces_apd_d' def run(cmd): print('[exec] ' + cmd) # Python 3.6.8 无 capture_output,用 stdout=PIPE, stderr=PIPE result = subprocess.run( cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) out = result.stdout.decode('utf-8', errors='replace').rstrip() err = result.stderr.decode('utf-8', errors='replace').rstrip() if out: print(out) if result.returncode != 0: if err: print(err, file=sys.stderr) raise RuntimeError('cmd failed (exit {}): {}'.format(result.returncode, cmd)) def process_one(dt): file_dt = '{}-{}-{}'.format(dt[0:4], dt[4:6], dt[6:8]) file_name = 'traces-{}.json.gz'.format(file_dt) local_path = os.path.join(SOURCE_DIR, file_name) print('--- dt={dt} src={src} ---'.format(dt=dt, src=local_path)) if not os.path.isfile(local_path): raise RuntimeError('源文件不存在: {}'.format(local_path)) part_dir = '{}/dt={}'.format(HDFS_TBL_DIR, dt) run('hdfs dfs -mkdir -p {}'.format(part_dir)) run('hdfs dfs -put -f {} {}/'.format(local_path, part_dir)) alter = ( "ALTER TABLE {db}.{tbl} " "ADD IF NOT EXISTS PARTITION (dt='{dt}') " "LOCATION '{loc}';" ).format(db=HIVE_DB, tbl=HIVE_TBL, dt=dt, loc=part_dir) run('hive -e "{}"'.format(alter)) def main(): parser = argparse.ArgumentParser( prog='raw_usr_traces_apd_d', description='埋点 NDJSON.gz → test.raw_usr_traces_apd_d 入仓(支持批量 dt)', ) parser.add_argument('-dt', required=True, metavar='DT', help='分区日期,4 种形式:20260407 / 20260407- / ' '20260407-20260409 / 20260407,20260408,20260409') args = parser.parse_args() dts = get_date_range(args.dt) print('{script} dts={dts} ({n} 个)'.format( script=os.path.basename(__file__), dts=dts, n=len(dts))) for dt in dts: process_one(dt) print('all done. processed {} dts.'.format(len(dts))) if __name__ == '__main__': main()