raw_usr_traces_apd_d.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. 埋点 gz 入仓 raw 包装脚本。
  5. 逐日:本地 gz(固定服务器 /data/upload/traces/traces-YYYY-MM-DD.json.gz)
  6. → hdfs put 到临时目录 /tmp/raw_usr_traces/{dt}/
  7. → 调 bin/spark-sql-starter.py 跑解析脱敏 SQL(写 raw 当日分区)
  8. → 清临时目录
  9. CLI:-dt 支持单日 / `20260601-` / 区间 / 离散(复用 get_date_range)。
  10. 缺文件跳过(缺数据容忍)。返回非 0 = 失败的日期数。
  11. """
  12. import os
  13. import subprocess
  14. import sys
  15. PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..', '..'))
  16. sys.path.insert(0, PROJECT_ROOT)
  17. from dw_base.common.config_constants import K_DT
  18. from dw_base.utils.config_utils import parse_args
  19. from dw_base.utils.datetime_utils import get_date_range, get_yesterday
  20. LOCAL_DIR = '/data/upload/traces'
  21. HDFS_TMP_BASE = '/tmp/raw_usr_traces'
  22. SQL = 'jobs/raw/usr/raw_usr_traces_apd_d.sql'
  23. UDF = 'dw_base/udf/business/spark_traces_udf.py'
  24. STARTER = 'bin/spark-sql-starter.py'
  25. def run(cmd, cwd=None):
  26. print('+ ' + ' '.join(cmd))
  27. return subprocess.call(cmd, cwd=cwd)
  28. def main():
  29. config, _ = parse_args(sys.argv[1:])
  30. date_range = get_date_range(config.get(K_DT, get_yesterday()))
  31. failed = []
  32. for dt in date_range: # dt = yyyymmdd
  33. local_gz = '%s/traces-%s-%s-%s.json.gz' % (LOCAL_DIR, dt[0:4], dt[4:6], dt[6:8])
  34. if not os.path.isfile(local_gz):
  35. print('跳过 %s:本地 gz 不存在 %s' % (dt, local_gz))
  36. continue
  37. hdfs_tmp = '%s/%s' % (HDFS_TMP_BASE, dt)
  38. run(['hdfs', 'dfs', '-rm', '-r', '-f', hdfs_tmp])
  39. run(['hdfs', 'dfs', '-mkdir', '-p', hdfs_tmp])
  40. if run(['hdfs', 'dfs', '-put', '-f', local_gz, hdfs_tmp]) != 0:
  41. print('!! %s hdfs put 失败' % dt)
  42. failed.append(dt)
  43. continue
  44. rc = run(['python3', STARTER, '-f', SQL, '-u', UDF, '-dt', dt], cwd=PROJECT_ROOT)
  45. run(['hdfs', 'dfs', '-rm', '-r', '-f', hdfs_tmp])
  46. if rc != 0:
  47. print('!! %s spark-sql 失败 (rc=%d)' % (dt, rc))
  48. failed.append(dt)
  49. if failed:
  50. print('失败日期:%s' % ','.join(failed))
  51. sys.exit(len(failed))
  52. if __name__ == '__main__':
  53. main()