| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- """
- 埋点 NDJSON.gz → raw.raw_usr_traces_apd_d 单日入仓。
- CLI:
- python3 jobs/raw/usr/raw_usr_traces_apd_d.py -dt YYYYMMDD
- 行为:
- - 在 SOURCE_DIR 下找 traces-{YYYY-MM-DD}.json.gz(dt 转中划线格式拼文件名)
- - hdfs dfs -mkdir -p HDFS_TBL_DIR/dt={YYYYMMDD}
- - hdfs dfs -put -f 源 gz 到该分区目录(-f 覆盖,幂等可重跑)
- - hive -e ALTER TABLE ... ADD IF NOT EXISTS PARTITION ... LOCATION ...
- 当前 SOURCE_DIR 写死成 m2 临时目录;上 DS 调度时把 SOURCE_DIR 改成正式产线路径,
- 其余逻辑无需改动(CLI 已是单 dt 单文件语义,与调度天然契合)。
- """
import argparse
import datetime
import os
import re
import shlex
import subprocess
import sys
# Local directory that holds the daily source files
# (traces-YYYY-MM-DD.json.gz). Per the module docstring this is currently a
# temporary location to be switched to the production path when scheduled.
SOURCE_DIR = '/data/upload/tracking/temp'
# HDFS directory under which the per-day dt=YYYYMMDD directories are created.
HDFS_TBL_DIR = '/user/hive/warehouse/raw.db/raw_usr_traces_apd_d'
# Hive database and table that receive the partition.
HIVE_DB = 'raw'
HIVE_TBL = 'raw_usr_traces_apd_d'
# Exactly eight ASCII digits and nothing else. \A/\Z and [0-9] are used
# instead of ^/$ and \d because `$` also matches just before a trailing
# newline and `\d` matches any Unicode decimal digit on str patterns; either
# would let a malformed value reach the file name and shell commands.
DT_PATTERN = re.compile(r'\A[0-9]{8}\Z')
def run(cmd):
    """Execute a shell command line, echoing it and its captured stdout.

    Args:
        cmd: The command line as a single string; it is handed to the shell.

    Raises:
        RuntimeError: If the command exits with a non-zero status. Captured
            stderr, when non-empty, is written to sys.stderr first; on
            success stderr is not shown.
    """
    print('[exec] ' + cmd)
    # Python 3.6.8 has no capture_output=, so both pipes are requested
    # explicitly.
    completed = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout_text, stderr_text = (
        stream.decode('utf-8', errors='replace').rstrip()
        for stream in (completed.stdout, completed.stderr)
    )
    if stdout_text:
        print(stdout_text)
    if completed.returncode == 0:
        return
    if stderr_text:
        print(stderr_text, file=sys.stderr)
    raise RuntimeError(
        'cmd failed (exit {}): {}'.format(completed.returncode, cmd))
def main():
    """Load one day's trace file into its Hive partition.

    Reads ``-dt YYYYMMDD`` from the command line, looks for
    ``SOURCE_DIR/traces-YYYY-MM-DD.json.gz``, uploads it to
    ``HDFS_TBL_DIR/dt=YYYYMMDD`` (``-put -f`` overwrites an existing copy)
    and registers the partition with ``ADD IF NOT EXISTS``.

    Exit codes:
        2: ``-dt`` is not eight digits or is not a real calendar date.
        3: the source file does not exist.

    Raises:
        RuntimeError: Propagated from ``run`` when an hdfs/hive command
            exits non-zero.
    """
    parser = argparse.ArgumentParser(
        prog='raw_usr_traces_apd_d',
        description='埋点 NDJSON.gz → raw.raw_usr_traces_apd_d 单日入仓',
    )
    parser.add_argument('-dt', required=True, metavar='YYYYMMDD',
                        help='分区日期,yyyymmdd 格式(如 20260409)')
    args = parser.parse_args()
    dt = args.dt

    # fullmatch: the whole argument must be the eight digits, with nothing
    # trailing (a bare match() would tolerate a final newline).
    if not DT_PATTERN.fullmatch(dt):
        print('-dt 格式必须是 yyyymmdd(8 位数字),收到: {}'.format(dt), file=sys.stderr)
        sys.exit(2)
    # Eight digits can still be an impossible date (e.g. 20261345). Reject it
    # here as a bad argument instead of reporting a missing source file later.
    try:
        datetime.datetime.strptime(dt, '%Y%m%d')
    except ValueError:
        print('-dt 不是合法的日历日期,收到: {}'.format(dt), file=sys.stderr)
        sys.exit(2)

    # The source file name uses the dashed form of the same date.
    file_dt = '{}-{}-{}'.format(dt[0:4], dt[4:6], dt[6:8])
    file_name = 'traces-{}.json.gz'.format(file_dt)
    local_path = os.path.join(SOURCE_DIR, file_name)
    print('{script} dt={dt} src={src}'.format(
        script=os.path.basename(__file__), dt=dt, src=local_path))

    if not os.path.isfile(local_path):
        print('源文件不存在: {}'.format(local_path), file=sys.stderr)
        sys.exit(3)

    part_dir = '{}/dt={}'.format(HDFS_TBL_DIR, dt)
    # The commands are interpreted by a shell, so quote the paths; this keeps
    # them intact if SOURCE_DIR is ever pointed at a path with spaces or
    # other shell metacharacters. Plain paths pass through unchanged.
    quoted_src = shlex.quote(local_path)
    quoted_part = shlex.quote(part_dir)
    run('hdfs dfs -mkdir -p {}'.format(quoted_part))
    run('hdfs dfs -put -f {} {}/'.format(quoted_src, quoted_part))

    alter = (
        "ALTER TABLE {db}.{tbl} "
        "ADD IF NOT EXISTS PARTITION (dt='{dt}') "
        "LOCATION '{loc}';"
    ).format(db=HIVE_DB, tbl=HIVE_TBL, dt=dt, loc=part_dir)
    run('hive -e "{}"'.format(alter))
    print('done. dt={}'.format(dt))
# Run the job only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|