|
|
@@ -0,0 +1,87 @@
|
|
|
+#!/usr/bin/env /usr/bin/python3
|
|
|
+# -*- coding:utf-8 -*-
|
|
|
+"""
|
|
|
+埋点 NDJSON.gz → raw.raw_usr_traces_apd_d 单日入仓。
|
|
|
+
|
|
|
+CLI:
|
|
|
+ python3 jobs/raw/usr/raw_usr_traces_apd_d.py -dt YYYYMMDD
|
|
|
+
|
|
|
+行为:
|
|
|
+ - 在 SOURCE_DIR 下找 traces-{YYYY-MM-DD}.json.gz(dt 转中划线格式拼文件名)
|
|
|
+ - hdfs dfs -mkdir -p HDFS_TBL_DIR/dt={YYYYMMDD}
|
|
|
+ - hdfs dfs -put -f 源 gz 到该分区目录(-f 覆盖,幂等可重跑)
|
|
|
+ - hive -e ALTER TABLE ... ADD IF NOT EXISTS PARTITION ... LOCATION ...
|
|
|
+
|
|
|
+当前 SOURCE_DIR 写死成 m2 临时目录;上 DS 调度时把 SOURCE_DIR 改成正式产线路径,
|
|
|
+其余逻辑无需改动(CLI 已是单 dt 单文件语义,与调度天然契合)。
|
|
|
+"""
|
|
|
import argparse
import os
import re
import shlex
import subprocess
import sys
|
|
|
+
|
|
|
# Local directory holding the daily tracking gz files.
# NOTE(review): per the module docstring this is a temporary m2 path;
# repoint to the production path when the job moves onto DS scheduling.
SOURCE_DIR = '/data/upload/tracking/temp'
# HDFS warehouse directory of the target table; day partitions
# (dt=YYYYMMDD) are created directly beneath it.
HDFS_TBL_DIR = '/user/hive/warehouse/raw.db/raw_usr_traces_apd_d'
# Hive database / table the partition is registered into.
HIVE_DB = 'raw'
HIVE_TBL = 'raw_usr_traces_apd_d'
# -dt must be exactly 8 digits (yyyymmdd); anchored on both ends.
DT_PATTERN = re.compile(r'^\d{8}$')
|
|
|
+
|
|
|
+
|
|
|
def run(cmd):
    """Execute *cmd* through the shell, echoing the command and its stdout.

    stderr is shown only when the command fails, in which case a
    RuntimeError carrying the exit status and the command is raised.
    """
    print('[exec] ' + cmd)
    # Python 3.6.8 has no capture_output= keyword, so hand subprocess.run
    # the PIPE handles explicitly.
    proc = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout_text = proc.stdout.decode('utf-8', errors='replace').rstrip()
    stderr_text = proc.stderr.decode('utf-8', errors='replace').rstrip()
    if stdout_text:
        print(stdout_text)
    if proc.returncode == 0:
        return
    # Failure path: surface stderr, then abort the job with the exit code.
    if stderr_text:
        print(stderr_text, file=sys.stderr)
    raise RuntimeError('cmd failed (exit {}): {}'.format(proc.returncode, cmd))
|
|
|
+
|
|
|
+
|
|
|
def main():
    """Ingest one day of tracking NDJSON.gz into raw.raw_usr_traces_apd_d.

    Steps: validate -dt, locate traces-{YYYY-MM-DD}.json.gz under
    SOURCE_DIR, create the HDFS partition directory, upload the file
    (-f overwrite, so a rerun for the same dt is idempotent), then
    register the partition in Hive with ADD IF NOT EXISTS.

    Exits 2 on a malformed -dt, 3 when the source file is missing;
    run() raises RuntimeError when an hdfs/hive command fails.
    """
    parser = argparse.ArgumentParser(
        prog='raw_usr_traces_apd_d',
        description='埋点 NDJSON.gz → raw.raw_usr_traces_apd_d 单日入仓',
    )
    parser.add_argument('-dt', required=True, metavar='YYYYMMDD',
                        help='分区日期,yyyymmdd 格式(如 20260409)')
    args = parser.parse_args()

    dt = args.dt
    if not DT_PATTERN.match(dt):
        print('-dt 格式必须是 yyyymmdd(8 位数字),收到: {}'.format(dt), file=sys.stderr)
        sys.exit(2)

    # Source files carry a dashed date: traces-YYYY-MM-DD.json.gz.
    file_dt = '{}-{}-{}'.format(dt[0:4], dt[4:6], dt[6:8])
    file_name = 'traces-{}.json.gz'.format(file_dt)
    local_path = os.path.join(SOURCE_DIR, file_name)

    print('{script} dt={dt} src={src}'.format(
        script=os.path.basename(__file__), dt=dt, src=local_path))

    if not os.path.isfile(local_path):
        print('源文件不存在: {}'.format(local_path), file=sys.stderr)
        sys.exit(3)

    part_dir = '{}/dt={}'.format(HDFS_TBL_DIR, dt)
    # shlex.quote the interpolated paths: SOURCE_DIR is slated to change
    # when the job moves to DS scheduling, and unquoted paths would break
    # the shell=True command strings on spaces/metacharacters.
    run('hdfs dfs -mkdir -p {}'.format(shlex.quote(part_dir)))
    run('hdfs dfs -put -f {} {}/'.format(shlex.quote(local_path),
                                         shlex.quote(part_dir)))
    alter = (
        "ALTER TABLE {db}.{tbl} "
        "ADD IF NOT EXISTS PARTITION (dt='{dt}') "
        "LOCATION '{loc}';"
    ).format(db=HIVE_DB, tbl=HIVE_TBL, dt=dt, loc=part_dir)
    # Quote the whole statement rather than wrapping it in double quotes:
    # prevents the shell from expanding $ / backticks inside the SQL text.
    run('hive -e {}'.format(shlex.quote(alter)))

    print('done. dt={}'.format(dt))
|
|
|
+
|
|
|
+
|
|
|
# Script entry point: run the single-day ingestion when executed directly.
if __name__ == '__main__':
    main()
|