| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- """
- 埋点 gz 入仓 raw 包装脚本。
- 逐日:本地 gz(固定服务器 /data/upload/traces/traces-YYYY-MM-DD.json.gz)
- → hdfs put 到临时目录 /tmp/raw_usr_traces/{dt}/
- → 调 bin/spark-sql-starter.py 跑解析脱敏 SQL(写 raw 当日分区)
- → 清临时目录
- CLI:-dt 支持单日 / `20260601-` / 区间 / 离散(复用 get_date_range)。
- 缺文件跳过(缺数据容忍)。返回非 0 = 失败的日期数。
- """
- import os
- import subprocess
- import sys
# Project root: three directory levels above the directory holding this script.
# It is pushed to the front of sys.path so the project-local `dw_base` package
# can be imported when the script is launched directly.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(_SCRIPT_DIR, os.pardir, os.pardir, os.pardir))
sys.path.insert(0, PROJECT_ROOT)
- from dw_base.common.config_constants import K_DT
- from dw_base.utils.config_utils import parse_args
- from dw_base.utils.datetime_utils import get_date_range, get_yesterday
# Local landing directory that holds the daily traces-YYYY-MM-DD.json.gz files.
LOCAL_DIR = '/data/upload/traces'
# HDFS scratch root; each processed day gets its own sub-directory under it.
HDFS_TMP_BASE = '/tmp/raw_usr_traces'
# The paths below are relative: the starter is launched with cwd=PROJECT_ROOT.
STARTER = 'bin/spark-sql-starter.py'
SQL = 'jobs/raw/usr/raw_usr_traces_apd_d.sql'
UDF = 'dw_base/udf/business/spark_traces_udf.py'
def run(cmd, cwd=None):
    """Echo ``cmd`` (prefixed with ``+ ``) and run it without a shell.

    :param cmd: argument list handed to :func:`subprocess.call`.
    :param cwd: working directory for the child; ``None`` keeps the current one.
    :return: the child's exit status (negative -N if it was killed by signal N
             on POSIX), or 127 if the executable could not be started at all.
    """
    import shlex

    # Flush so the echo is ordered before the child's output: the child writes
    # directly to the inherited file descriptor, bypassing Python's buffer.
    print('+ ' + ' '.join(shlex.quote(arg) for arg in cmd), flush=True)
    try:
        return subprocess.call(cmd, cwd=cwd)
    except OSError as err:
        # e.g. the binary is not on PATH or cwd does not exist: report it as a
        # failed step (127, like a shell's "command not found") instead of
        # aborting every remaining date with a traceback.
        print('!! 命令启动失败: %s' % err, flush=True)
        return 127
def _load_one_day(dt):
    """Load one day (``dt`` is yyyymmdd) from the local gz through an HDFS temp dir.

    Steps: put the local gz into ``HDFS_TMP_BASE/<dt>``, run the parse/mask SQL
    via the spark-sql starter with ``-dt <dt>``, then remove the temp dir.  The
    temp dir is removed even when a step fails or raises.

    :return: ``True`` on success or when the local gz is missing (skipped),
             ``False`` when any step fails.
    """
    local_gz = '%s/traces-%s-%s-%s.json.gz' % (LOCAL_DIR, dt[0:4], dt[4:6], dt[6:8])
    if not os.path.isfile(local_gz):
        print('跳过 %s:本地 gz 不存在 %s' % (dt, local_gz), flush=True)
        return True
    hdfs_tmp = '%s/%s' % (HDFS_TMP_BASE, dt)
    # Start from a clean temp dir (e.g. leftovers of an interrupted earlier run).
    run(['hdfs', 'dfs', '-rm', '-r', '-f', hdfs_tmp])
    try:
        # Without the directory, `-put` could write the gz to a plain file
        # named hdfs_tmp instead of placing it inside the directory.
        if run(['hdfs', 'dfs', '-mkdir', '-p', hdfs_tmp]) != 0:
            print('!! %s hdfs mkdir 失败' % dt, flush=True)
            return False
        if run(['hdfs', 'dfs', '-put', '-f', local_gz, hdfs_tmp]) != 0:
            print('!! %s hdfs put 失败' % dt, flush=True)
            return False
        rc = run(['python3', STARTER, '-f', SQL, '-u', UDF, '-dt', dt], cwd=PROJECT_ROOT)
    finally:
        run(['hdfs', 'dfs', '-rm', '-r', '-f', hdfs_tmp])
    if rc != 0:
        print('!! %s spark-sql 失败 (rc=%d)' % (dt, rc), flush=True)
        return False
    return True


def main():
    """Load each requested day's traces gz into the raw table, then exit.

    The days come from the ``K_DT`` option (``-dt``; anything
    ``get_date_range`` accepts), defaulting to yesterday.  Days whose local gz
    is missing are skipped rather than counted as failures.

    Exits with the number of failed days, capped at 255: POSIX keeps only the
    low 8 bits of an exit status, so e.g. 256 failures would otherwise read
    as 0 (success).
    """
    config, _ = parse_args(sys.argv[1:])
    date_range = get_date_range(config.get(K_DT, get_yesterday()))
    failed = []
    for dt in date_range:  # dt = yyyymmdd
        if not _load_one_day(dt):
            failed.append(dt)
    if failed:
        print('失败日期:%s' % ','.join(failed), flush=True)
    sys.exit(min(len(failed), 255))
if __name__ == '__main__':  # only run the load when executed as a script, not on import
    main()
|