소스 검색

feat(raw): 埋点 gz 入仓 raw — mask UDF + 解析脱敏 SQL + 包装脚本

tianyu.chu 4 일 전
부모
커밋
970188fb49
3개의 변경된 파일130개의 추가작업 그리고 0개의 파일을 삭제
  1. 45 0
      dw_base/udf/business/spark_traces_udf.py
  2. 64 0
      jobs/raw/usr/raw_usr_traces_apd_d.py
  3. 21 0
      jobs/raw/usr/raw_usr_traces_apd_d.sql

+ 45 - 0
dw_base/udf/business/spark_traces_udf.py

@@ -0,0 +1,45 @@
+# -*- coding:utf-8 -*-
+"""
+埋点入仓脱敏 Spark UDF。
+
+注册为 SQL UDF `mask_source(line) -> STRING`:输入单条 ES hit(NDJSON 行),
+解析 `_source`、按事件脱敏 `properties`(含嵌套 `params`),回吐脱敏后的 `_source` JSON 字符串。
+es_id / event_name 不敏感,在 SQL 侧用 get_json_object 原生取,不经本 UDF。
+
+注册机制:spark-sql-starter `-u` 加载本文件,普通函数自动注册为 StringType 返回的 SQL UDF
+(@udf 装饰的 struct UDF 不会被注册,故返回 String 而非 struct)。
+脱敏配置 conf/tracking-mask.ini 经 SQL `ADD FILE` 分发到 executor,首次调用经 SparkFiles 懒加载。
+"""
+import json
+
+from pyspark import SparkFiles
+
+from dw_base.tracking import mask
+
+_CONF = None
+_CONF_NAME = 'tracking-mask.ini'
+
+
+def _conf():
+    """executor 端懒加载脱敏配置(driver 注册时配置尚未分发,不能 module-level 加载)。"""
+    global _CONF
+    if _CONF is None:
+        _CONF = mask.load_mask_conf(SparkFiles.get(_CONF_NAME))
+    return _CONF
+
+
+def mask_source(line):
+    """ES hit NDJSON 行 → 脱敏后的 _source JSON 字符串;解析失败返回 None。"""
+    if not line:
+        return None
+    try:
+        doc = json.loads(line)
+    except (TypeError, ValueError):
+        return None
+    source = doc.get('_source')
+    if not isinstance(source, dict):
+        return None
+    props = source.get('properties')
+    if isinstance(props, dict):
+        source['properties'] = mask.apply_mask(source.get('event'), props, _conf())
+    return json.dumps(source, ensure_ascii=False)

+ 64 - 0
jobs/raw/usr/raw_usr_traces_apd_d.py

@@ -0,0 +1,64 @@
+#!/usr/bin/env /usr/bin/python3
+# -*- coding:utf-8 -*-
+"""
+埋点 gz 入仓 raw 包装脚本。
+
+逐日:本地 gz(固定服务器 /data/upload/traces/traces-YYYY-MM-DD.json.gz)
+  → hdfs put 到临时目录 /tmp/raw_usr_traces/{dt}/
+  → 调 bin/spark-sql-starter.py 跑解析脱敏 SQL(写 raw 当日分区)
+  → 清临时目录
+
+CLI:-dt 支持单日 / `20260601-` / 区间 / 离散(复用 get_date_range)。
+缺文件跳过(缺数据容忍)。返回非 0 = 失败的日期数。
+"""
+import os
+import subprocess
+import sys
+
+PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..', '..'))
+sys.path.insert(0, PROJECT_ROOT)
+
+from dw_base.common.config_constants import K_DT
+from dw_base.utils.config_utils import parse_args
+from dw_base.utils.datetime_utils import get_date_range, get_yesterday
+
+LOCAL_DIR = '/data/upload/traces'
+HDFS_TMP_BASE = '/tmp/raw_usr_traces'
+SQL = 'jobs/raw/usr/raw_usr_traces_apd_d.sql'
+UDF = 'dw_base/udf/business/spark_traces_udf.py'
+STARTER = 'bin/spark-sql-starter.py'
+
+
+def run(cmd, cwd=None):
+    print('+ ' + ' '.join(cmd))
+    return subprocess.call(cmd, cwd=cwd)
+
+
+def main():
+    config, _ = parse_args(sys.argv[1:])
+    date_range = get_date_range(config.get(K_DT, get_yesterday()))
+    failed = []
+    for dt in date_range:  # dt = yyyymmdd
+        local_gz = '%s/traces-%s-%s-%s.json.gz' % (LOCAL_DIR, dt[0:4], dt[4:6], dt[6:8])
+        if not os.path.isfile(local_gz):
+            print('跳过 %s:本地 gz 不存在 %s' % (dt, local_gz))
+            continue
+        hdfs_tmp = '%s/%s' % (HDFS_TMP_BASE, dt)
+        run(['hdfs', 'dfs', '-rm', '-r', '-f', hdfs_tmp])
+        run(['hdfs', 'dfs', '-mkdir', '-p', hdfs_tmp])
+        if run(['hdfs', 'dfs', '-put', '-f', local_gz, hdfs_tmp]) != 0:
+            print('!! %s hdfs put 失败' % dt)
+            failed.append(dt)
+            continue
+        rc = run(['python3', STARTER, '-f', SQL, '-u', UDF, '-dt', dt], cwd=PROJECT_ROOT)
+        run(['hdfs', 'dfs', '-rm', '-r', '-f', hdfs_tmp])
+        if rc != 0:
+            print('!! %s spark-sql 失败 (rc=%d)' % (dt, rc))
+            failed.append(dt)
+    if failed:
+        print('失败日期:%s' % ','.join(failed))
+    sys.exit(len(failed))
+
+
+if __name__ == '__main__':
+    main()

+ 21 - 0
jobs/raw/usr/raw_usr_traces_apd_d.sql

@@ -0,0 +1,21 @@
+-- 作者:tianyu.chu
+-- 日期:2026-06-10
+-- 工单:(无)
+-- 目的:埋点 gz(已 hdfs put 到临时目录)→ 解析 _source、脱敏 properties → 写 raw 薄表(es_id / event_name / 脱敏后 _source JSON)
+-- 状态:[待执行]
+-- 备注:gz 由包装脚本 jobs/raw/usr/raw_usr_traces_apd_d.py 逐日 put 到 /tmp/raw_usr_traces/${dt}/;
+--       mask_source UDF = dw_base/udf/business/spark_traces_udf.py(-u 加载);脱敏配置经 ADD FILE 分发;
+--       es_id/event_name 不敏感、原生 get_json_object 取;dt 静态 = 文件日
+
+ADD FILE conf/tracking-mask.ini;
+
+CREATE OR REPLACE TEMPORARY VIEW traces_gz_text
+USING text
+OPTIONS (path '/tmp/raw_usr_traces/${dt}/');
+
+INSERT OVERWRITE TABLE raw.raw_usr_traces_apd_d PARTITION (dt = '${dt}')
+SELECT
+    get_json_object(value, '$._id')           AS es_id,
+    get_json_object(value, '$._source.event') AS event_name,
+    mask_source(value)                        AS raw_json
+FROM traces_gz_text;