#!/usr/bin/env /usr/bin/python3
# -*- coding:utf-8 -*-
"""
Tracking NDJSON.gz -> test.raw_usr_traces_apd_d single-day warehouse load
(smoke-test phase).

CLI:
    python3 tests/integration/tracking/raw_usr_traces_apd_d.py -dt YYYYMMDD

Behavior:
  - Look for traces-{YYYY-MM-DD}.json.gz under SOURCE_DIR (dt is converted
    to the dashed form to build the file name).
  - hdfs dfs -mkdir -p HDFS_TBL_DIR/dt={YYYYMMDD}
  - hdfs dfs -put -f the source gz into that partition directory
    (-f overwrites, so the job is idempotent and can be re-run).
  - hive -e ALTER TABLE ... ADD IF NOT EXISTS PARTITION ... LOCATION ...

Exit codes:
  2  -dt is not a valid yyyymmdd date
  3  the source file does not exist

Currently HIVE_DB='test' and HDFS_TBL_DIR points at test.db; once the smoke
test passes, move this script to jobs/raw/usr/ and change HIVE_DB /
HDFS_TBL_DIR back to raw / raw.db. SOURCE_DIR is still the m2 temporary
directory; switch it to the production path when the job is scheduled for
real. The CLI stays the same.
"""

import argparse
import datetime
import os
import re
import shlex
import subprocess
import sys

SOURCE_DIR = '/data/upload/tracking/temp'
HDFS_TBL_DIR = '/user/hive/warehouse/test.db/raw_usr_traces_apd_d'
HIVE_DB = 'test'
HIVE_TBL = 'raw_usr_traces_apd_d'

# Exactly eight ASCII digits.
# - [0-9] instead of \d: on str patterns \d also matches non-ASCII decimal
#   digits (e.g. fullwidth digits), which must not end up in a partition key.
# - \Z instead of $: $ also matches just before a trailing newline, so
#   "20260409\n" would otherwise be accepted.
DT_PATTERN = re.compile(r'^[0-9]{8}\Z')


def run(cmd):
    """Run an external command, echo its stdout, and raise on failure.

    Args:
        cmd: Either a shell command string (executed with ``shell=True``,
            the original behavior) or a sequence of argv items (executed
            directly, without a shell, so no quoting is needed).

    Raises:
        RuntimeError: If the command exits with a non-zero status. The
            captured stderr is written to ``sys.stderr`` first.
    """
    use_shell = isinstance(cmd, str)
    if use_shell:
        argv = cmd
        shown = cmd
    else:
        argv = [str(part) for part in cmd]
        # Quoted only for display / error messages; argv is passed as-is.
        shown = ' '.join(shlex.quote(part) for part in argv)

    print('[exec] ' + shown)
    # Python 3.6.8 has no capture_output; use stdout=PIPE, stderr=PIPE.
    result = subprocess.run(
        argv,
        shell=use_shell,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out = result.stdout.decode('utf-8', errors='replace').rstrip()
    err = result.stderr.decode('utf-8', errors='replace').rstrip()
    if out:
        print(out)
    if result.returncode != 0:
        # stderr is surfaced only on failure; on success it is dropped.
        if err:
            print(err, file=sys.stderr)
        raise RuntimeError(
            'cmd failed (exit {}): {}'.format(result.returncode, shown))


def main():
    """CLI entry point: load one day's trace file into its Hive partition.

    Parses ``-dt YYYYMMDD`` from ``sys.argv``, validates it, checks that the
    matching local ``traces-YYYY-MM-DD.json.gz`` exists under SOURCE_DIR,
    uploads it to ``HDFS_TBL_DIR/dt=YYYYMMDD`` and registers the partition.

    Exits with status 2 when ``-dt`` is malformed or not a real calendar
    date, and with status 3 when the source file is missing.

    Raises:
        RuntimeError: Propagated from ``run`` when an hdfs / hive command
            fails.
    """
    parser = argparse.ArgumentParser(
        prog='raw_usr_traces_apd_d',
        description='埋点 NDJSON.gz → test.raw_usr_traces_apd_d 单日入仓',
    )
    parser.add_argument('-dt', required=True, metavar='YYYYMMDD',
                        help='分区日期,yyyymmdd 格式(如 20260409)')
    args = parser.parse_args()

    dt = args.dt
    if not DT_PATTERN.match(dt):
        print('-dt 格式必须是 yyyymmdd(8 位数字),收到: {}'.format(dt),
              file=sys.stderr)
        sys.exit(2)
    try:
        # Reject well-formed but impossible dates such as 20261340 up front
        # instead of letting them fall through to the file lookup.
        datetime.datetime.strptime(dt, '%Y%m%d')
    except ValueError:
        print('-dt 不是合法的日历日期: {}'.format(dt), file=sys.stderr)
        sys.exit(2)

    file_dt = '{}-{}-{}'.format(dt[0:4], dt[4:6], dt[6:8])
    file_name = 'traces-{}.json.gz'.format(file_dt)
    local_path = os.path.join(SOURCE_DIR, file_name)

    print('{script} dt={dt} src={src}'.format(
        script=os.path.basename(__file__), dt=dt, src=local_path))

    if not os.path.isfile(local_path):
        print('源文件不存在: {}'.format(local_path), file=sys.stderr)
        sys.exit(3)

    part_dir = '{}/dt={}'.format(HDFS_TBL_DIR, dt)

    # argv lists (no shell) so paths and the HiveQL text need no quoting.
    run(['hdfs', 'dfs', '-mkdir', '-p', part_dir])
    run(['hdfs', 'dfs', '-put', '-f', local_path, part_dir + '/'])

    alter = (
        "ALTER TABLE {db}.{tbl} "
        "ADD IF NOT EXISTS PARTITION (dt='{dt}') "
        "LOCATION '{loc}';"
    ).format(db=HIVE_DB, tbl=HIVE_TBL, dt=dt, loc=part_dir)
    run(['hive', '-e', alter])

    print('done. dt={}'.format(dt))


if __name__ == '__main__':
    main()