#!/usr/bin/env /usr/bin/python3
# -*- coding:utf-8 -*-
"""
Tracking NDJSON.gz → test.raw_usr_traces_apd_d single-day ingestion (smoke-test phase).

CLI:
    python3 tests/integration/tracking/raw_usr_traces_apd_d.py -dt YYYYMMDD

Behaviour:
- Look for traces-{YYYY-MM-DD}.json.gz under SOURCE_DIR (dt converted to the
  dashed date format to build the file name)
- hdfs dfs -mkdir -p HDFS_TBL_DIR/dt={YYYYMMDD}
- hdfs dfs -put -f the source gz into that partition directory (-f overwrites,
  so reruns are idempotent)
- hive -e ALTER TABLE ... ADD IF NOT EXISTS PARTITION ... LOCATION ...

Currently HIVE_DB='test' and HDFS_TBL_DIR points at test.db; once the smoke
test passes, move the script to jobs/raw/usr/ and switch HIVE_DB /
HDFS_TBL_DIR back to raw / raw.db. SOURCE_DIR is still the m2 temp
directory; change it to the production path when the job goes onto the
scheduler — the CLI stays the same.
"""
import argparse
import os
import re
import shlex
import subprocess
import sys
# Local directory holding the uploaded NDJSON.gz files — m2 temp dir for the
# smoke-test phase; switch to the production path when the job is scheduled.
SOURCE_DIR = '/data/upload/tracking/temp'
# HDFS directory backing the Hive table (test.db during smoke test; later raw.db).
HDFS_TBL_DIR = '/user/hive/warehouse/test.db/raw_usr_traces_apd_d'
HIVE_DB = 'test'  # Hive database; switch back to 'raw' once promoted
HIVE_TBL = 'raw_usr_traces_apd_d'  # target Hive table name
# -dt must be exactly eight digits (yyyymmdd), e.g. 20260409.
DT_PATTERN = re.compile(r'^\d{8}$')
  26. def run(cmd):
  27. print('[exec] ' + cmd)
  28. # Python 3.6.8 无 capture_output,用 stdout=PIPE, stderr=PIPE
  29. result = subprocess.run(
  30. cmd, shell=True,
  31. stdout=subprocess.PIPE, stderr=subprocess.PIPE,
  32. )
  33. out = result.stdout.decode('utf-8', errors='replace').rstrip()
  34. err = result.stderr.decode('utf-8', errors='replace').rstrip()
  35. if out:
  36. print(out)
  37. if result.returncode != 0:
  38. if err:
  39. print(err, file=sys.stderr)
  40. raise RuntimeError('cmd failed (exit {}): {}'.format(result.returncode, cmd))
  41. def main():
  42. parser = argparse.ArgumentParser(
  43. prog='raw_usr_traces_apd_d',
  44. description='埋点 NDJSON.gz → test.raw_usr_traces_apd_d 单日入仓',
  45. )
  46. parser.add_argument('-dt', required=True, metavar='YYYYMMDD',
  47. help='分区日期,yyyymmdd 格式(如 20260409)')
  48. args = parser.parse_args()
  49. dt = args.dt
  50. if not DT_PATTERN.match(dt):
  51. print('-dt 格式必须是 yyyymmdd(8 位数字),收到: {}'.format(dt), file=sys.stderr)
  52. sys.exit(2)
  53. file_dt = '{}-{}-{}'.format(dt[0:4], dt[4:6], dt[6:8])
  54. file_name = 'traces-{}.json.gz'.format(file_dt)
  55. local_path = os.path.join(SOURCE_DIR, file_name)
  56. print('{script} dt={dt} src={src}'.format(
  57. script=os.path.basename(__file__), dt=dt, src=local_path))
  58. if not os.path.isfile(local_path):
  59. print('源文件不存在: {}'.format(local_path), file=sys.stderr)
  60. sys.exit(3)
  61. part_dir = '{}/dt={}'.format(HDFS_TBL_DIR, dt)
  62. run('hdfs dfs -mkdir -p {}'.format(part_dir))
  63. run('hdfs dfs -put -f {} {}/'.format(local_path, part_dir))
  64. alter = (
  65. "ALTER TABLE {db}.{tbl} "
  66. "ADD IF NOT EXISTS PARTITION (dt='{dt}') "
  67. "LOCATION '{loc}';"
  68. ).format(db=HIVE_DB, tbl=HIVE_TBL, dt=dt, loc=part_dir)
  69. run('hive -e "{}"'.format(alter))
  70. print('done. dt={}'.format(dt))
  71. if __name__ == '__main__':
  72. main()