#!/usr/bin/env /usr/bin/python3
# -*- coding:utf-8 -*-
"""
Tracking NDJSON.gz -> test.raw_usr_traces_apd_d ingestion (smoke-test phase).

CLI:
    python3 tests/integration/tracking/raw_usr_traces_apd_d.py -dt DT

-dt accepts 4 forms (same as bin/spark-sql-starter.py, parsed by
dw_base get_date_range):
    20260407                        single day
    20260407-                       20260407 through yesterday
    20260407-20260409               range (both endpoints inclusive)
    20260407,20260408,20260409      discrete list

Behavior (the parsed dt list is processed serially; a failure on any single
dt immediately aborts the remaining ones):
  - look for traces-{YYYY-MM-DD}.json.gz under SOURCE_DIR (dt converted to
    dashed form to build the file name)
  - hdfs dfs -mkdir -p HDFS_TBL_DIR/dt={YYYYMMDD}
  - hdfs dfs -put -f the source gz into that partition dir (-f overwrites,
    so re-runs are idempotent)
  - hive -e ALTER TABLE ... ADD IF NOT EXISTS PARTITION ... LOCATION ...

Currently HIVE_DB='test' and HDFS_TBL_DIR points at test.db; once the smoke
test passes, move this under jobs/raw/usr/ and switch HIVE_DB / HDFS_TBL_DIR
back to raw / raw.db. SOURCE_DIR is still the temporary m2 directory; change
it to the production path when this goes onto the scheduler — the CLI stays
the same.
"""
import argparse
import os
import subprocess
import sys

# Make the project root importable so the dw_base package resolves when this
# script is run directly out of tests/integration/tracking/.
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
sys.path.append(project_root)
from dw_base.utils.datetime_utils import get_date_range

# Local directory holding the uploaded traces-*.json.gz files (temporary
# m2 location during the smoke test — see module docstring).
SOURCE_DIR = '/data/upload/tracking/temp'
# HDFS directory backing the Hive table (test.db during the smoke test).
HDFS_TBL_DIR = '/user/hive/warehouse/test.db/raw_usr_traces_apd_d'
HIVE_DB = 'test'
HIVE_TBL = 'raw_usr_traces_apd_d'
  32. def run(cmd):
  33. print('[exec] ' + cmd)
  34. # Python 3.6.8 无 capture_output,用 stdout=PIPE, stderr=PIPE
  35. result = subprocess.run(
  36. cmd, shell=True,
  37. stdout=subprocess.PIPE, stderr=subprocess.PIPE,
  38. )
  39. out = result.stdout.decode('utf-8', errors='replace').rstrip()
  40. err = result.stderr.decode('utf-8', errors='replace').rstrip()
  41. if out:
  42. print(out)
  43. if result.returncode != 0:
  44. if err:
  45. print(err, file=sys.stderr)
  46. raise RuntimeError('cmd failed (exit {}): {}'.format(result.returncode, cmd))
  47. def process_one(dt):
  48. file_dt = '{}-{}-{}'.format(dt[0:4], dt[4:6], dt[6:8])
  49. file_name = 'traces-{}.json.gz'.format(file_dt)
  50. local_path = os.path.join(SOURCE_DIR, file_name)
  51. print('--- dt={dt} src={src} ---'.format(dt=dt, src=local_path))
  52. if not os.path.isfile(local_path):
  53. raise RuntimeError('源文件不存在: {}'.format(local_path))
  54. part_dir = '{}/dt={}'.format(HDFS_TBL_DIR, dt)
  55. run('hdfs dfs -mkdir -p {}'.format(part_dir))
  56. run('hdfs dfs -put -f {} {}/'.format(local_path, part_dir))
  57. alter = (
  58. "ALTER TABLE {db}.{tbl} "
  59. "ADD IF NOT EXISTS PARTITION (dt='{dt}') "
  60. "LOCATION '{loc}';"
  61. ).format(db=HIVE_DB, tbl=HIVE_TBL, dt=dt, loc=part_dir)
  62. run('hive -e "{}"'.format(alter))
  63. def main():
  64. parser = argparse.ArgumentParser(
  65. prog='raw_usr_traces_apd_d',
  66. description='埋点 NDJSON.gz → test.raw_usr_traces_apd_d 入仓(支持批量 dt)',
  67. )
  68. parser.add_argument('-dt', required=True, metavar='DT',
  69. help='分区日期,4 种形式:20260407 / 20260407- / '
  70. '20260407-20260409 / 20260407,20260408,20260409')
  71. args = parser.parse_args()
  72. dts = get_date_range(args.dt)
  73. print('{script} dts={dts} ({n} 个)'.format(
  74. script=os.path.basename(__file__), dts=dts, n=len(dts)))
  75. for dt in dts:
  76. process_one(dt)
  77. print('all done. processed {} dts.'.format(len(dts)))
# Script entry point: only runs when executed directly, not on import.
if __name__ == '__main__':
    main()