Procházet zdrojové kódy

refactor(udf): 提升 UDF 为顶层模块 dw_base/udf/

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu před 2 týdny
rodič
revize
9e2d611a61

+ 1 - 1
bin/excel_to_hive.py

@@ -6,7 +6,7 @@
   -- 设置 Spark 配置
   SET spark.xxx.yyy.zzz=xyz;
   -- 引用 UDF
-  ADD FILE dw_base/spark/udf/spark_xxx_udf.py;
+  ADD FILE dw_base/udf/business/spark_xxx_udf.py;
   -- 声明变量
   SET TOPIC=xxx;
   -- 查看数据行数

+ 1 - 1
bin/spark-sql-starter.py

@@ -6,7 +6,7 @@
   -- 设置 Spark 配置
   SET spark.xxx.yyy.zzz=xyz;
   -- 引用 UDF
-  ADD FILE dw_base/spark/udf/spark_xxx_udf.py;
+  ADD FILE dw_base/udf/business/spark_xxx_udf.py;
   -- 声明变量
   SET DT_START=20210101;
   SET TOPIC=xxx;

+ 1 - 1
dw_base/__init__.py

@@ -24,7 +24,7 @@ PROJECT_ROOT_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
 PROJECT_NAME = os.path.basename(PROJECT_ROOT_PATH)
 sys.path.append(PROJECT_ROOT_PATH)
 # 公用的Spark UDF文件
-COMMON_SPARK_UDF_FILE = 'dw_base/spark/udf/common/spark_common_udf.py'
+COMMON_SPARK_UDF_FILE = 'dw_base/udf/common/spark_common_udf.py'
 BANNED_USER = 'root'
 RELEASE_USER = 'alvis'
 USER = os.environ['USER']

+ 0 - 0
dw_base/spark/udf/__init__.py → dw_base/udf/__init__.py


+ 0 - 0
dw_base/spark/udf/business/__init__.py → dw_base/udf/business/__init__.py


+ 0 - 0
dw_base/spark/udf/common/__init__.py → dw_base/udf/common/__init__.py


+ 1 - 1
dw_base/spark/udf/common/spark_common_udf.py → dw_base/udf/common/spark_common_udf.py

@@ -2,7 +2,7 @@
 # -*- coding:utf-8 -*-
 """
 通用 UDF —— 与业务无关的数据类型 / 格式操作(JSON / Array / String / Numeric / Date / Hash)
-SparkSQL 入口自动 ADD FILE 注册;业务专用 UDF 请放到 dw_base/spark/udf/business/ 下按需加载
+SparkSQL 入口自动 ADD FILE 注册;业务专用 UDF 请放到 dw_base/udf/business/ 下按需加载
 """
 
 import difflib

+ 4 - 3
kb/00-项目架构.md

@@ -33,7 +33,8 @@ poyee-data-warehouse/              # 项目根目录(仓库名 = 部署名)
 ├── dw_base/                       # 通用库层
 │   ├── __init__.py                #   全局初始化
 │   ├── common/                    #   常量、容器
-│   ├── spark/                     #   SparkSQL 核心 + UDF
+│   ├── spark/                     #   SparkSQL 核心
+│   ├── udf/                       #   Spark UDF 库(common + business)
 │   ├── datax/                     #   DataX 配置生成引擎
 │   ├── database/                  #   MongoDB/MySQL 工具
 │   ├── scheduler/                 #   调度辅助脚本
@@ -71,7 +72,7 @@ poyee-data-warehouse/              # 项目根目录(仓库名 = 部署名)
 |------|-----------|------|
 | 全局初始化 | `dw_base/__init__.py` | 环境检测、颜色常量、findspark 初始化、用户/权限判断 |
 | SparkSQL 引擎 | `dw_base/spark/spark_sql.py` | SparkSession 管理、UDF 注册、SQL 执行、数据导出 |
-| UDF 库 | `dw_base/spark/udf/` | `common/` 通用 UDF(入口自动注册)+ `business/` 业务专用 UDF(按需 `ADD FILE`) |
+| UDF 库 | `dw_base/udf/` | `common/` 通用 UDF(入口自动注册)+ `business/` 业务专用 UDF(按需 `ADD FILE`) |
 | DataX 引擎 | `dw_base/datax/` | ini 配置解析 → json 作业文件生成 |
 | DataX 数据源 | `dw_base/datax/datasources/` | 各类数据源的连接参数抽象 |
 | DataX 插件 | `dw_base/datax/plugins/` | Reader/Writer 工厂 + 各数据源实现 |
@@ -111,7 +112,7 @@ graph TB
     subgraph dw_base [dw_base/ 通用库]
         INIT_PY[__init__.py<br/>全局初始化]
         SPARK_SQL[spark/spark_sql.py<br/>SparkSQL 引擎]
-        UDF[spark/udf/<br/>UDF 库]
+        UDF[udf/<br/>UDF 库]
         DATAX_ENGINE[datax/<br/>配置生成引擎]
         DATASOURCES_CODE[datax/datasources/<br/>数据源抽象]
         PLUGINS[datax/plugins/<br/>Reader/Writer]

+ 1 - 1
kb/23-标签体系.md

@@ -215,7 +215,7 @@ graph TB
 
 **改造内容**:
 - 人群包表 `member_ids` 字段从 `ARRAY<BIGINT>` 改为 `BINARY`(RoaringBitmap 序列化)
-- 自研 Spark UDF(Java/Scala,注册到 `dw_base/spark/udf/`):
+- 自研 Spark UDF(Java/Scala,注册到 `dw_base/udf/`):
 
 | UDF | 功能 | 对应 SQL |
 |-----|------|---------|

+ 1 - 0
kb/92-重构进度.md

@@ -158,5 +158,6 @@
 | 2026-04-20 | **§7.2.1 再次反转**:删除 `whoami == RELEASE_USER` 分流,`LOG_ROOT_DIR` 改为单值默认 `${HOME}/log` 并保留在 `conf/env.sh`(外配后期可改)。理由:`$HOME` 天然按用户隔离(bigdata/个人用户家目录不同),代码判断是多余一层;`bigdata` 本身就是专属调度账号,其 `$HOME` 即是生产日志合法归宿,不需要系统级 `/opt/data/log` 那条路。同步更新 `90-重构路线.md §7.2.1`(核心段)+ `§2.1 硬编码表行` + `§2.4 env.sh 草稿` + `00-项目架构.md §6 部署段` + `92 阶段 2 checklist` | — |
 | 2026-04-20 | **老业务耦合代码第二批清理(重构计划外)**:在 UDF/模块独立化讨论中顺带盘点 `dw_base/` 子模块,决定 16 文件批量删除:**整目录删 3 个**——`oss/`(oss2_util.py + __init__,新业务不需要对象存储)、`scheduler/`(polling_scheduler / drop_partitions / drop_daily_full_snapshot_tbls 三业务文件,前者绑死老 Mongo 轮询、后两者按 N 天清分区的能力已在阶段 4 记录重写任务)、`hive/`(hive_utils + hive_constants;hive_utils 中 `get_hive_create_table_ddl*` 零引用 + 依赖 `COLUMN_NAME_COMMENT_DICT` 老业务字段字典、DDL 生成器整体不重建;`get_hive_database_name` / `get_hive_table_prefix` 两个命名约定函数语义已在 `kb/21-命名规范.md` 有规则,不重建代码,后续 `bin/datax-gc-generator.py` 从零重写时按新约定实现);**utils/ 删 7 文件**——data_distinct / diff_utils / excel_to_hive_utils / hive_diff_database / hive_to_excel_utils / pdt_check_table / pdt_check_table_multis,全部零外部引用 + 强业务耦合(硬编码 tendata 路径 / 老集群 IP `192.168.30.3` / 中文表名拼音转换 / 海关 `cts_*` 表名模式)。**连带效应**:`bin/datax-gc-generator.py:26` import hive_utils 成破损 import,由 90-路线 §2.7 "从零重写" 任务覆盖,不单独修复。**阶段 4 新增任务**:重新实现分区保留工具(元表驱动 + 参数化天数,目录可能不叫 scheduler)。**CLAUDE.md 规则追加**:"空模块直接删"原则首次执行延后(elasticsearch/flink/ml/validation/common/ 暂留,后续更细粒度规整) | — |
 | 2026-04-20 | **老业务耦合代码批量清理(重构计划外)**:排查 `tendata` 残留时发现一批与 `tendata_corp` / `ent_tendata_interface` / DolphinScheduler / 钉钉告警强耦合的存量文件,逐项核对后批量删除 40 个文件 + 精简 1 个:**老业务模块 34**(`dw_base/scheduler/` 下 `get_oldmongo_*` ×5、`dingtalk_*` / `ent_interface_dingtalk*` / `country_count_dingtalk` / `mg_company_alias_init` ×8、`mg2es/` 整目录 13 文件;`dw_base/ds/` 整目录 4 文件;`dw_base/spark/udf/spark_read_hive_columns_cnt.py`;`dw_base/utils/tid_utils.py`;`dw_base/spark/td_spark_init.py`(老同事 xunxu 所写未被调用);`bin/hive-exec.sh`),**级联清理 6**(`dw_base/spark/udf/spark_id_generate_udf.py` + `dw_base/spark/udf/enterprise/unique/spark_tid_match_udf.py` 依赖已删 `tid_utils`;`dw_base/utils/hive_file_merge.py` + `dw_base/utils/spark_parse_json_to_hive.py` 依赖已删 `mg2es`/钉钉告警;`bin/hive-exec-job-starter.py` 调用已删 `hive-exec.sh`;`bin/dingtalk-work-alert.sh`),**精简 1**:`dw_base/spark/udf/spark_mmq_udf.py` 从 530 行裁到 4 个数据类型转换函数(phone/domain/website/statname 等场景相关 UDF 与 Mongo 相关逻辑全删)。同步更新:`00-项目架构.md`(移除 `td_spark_init` / DS 相关条目)、`90-重构路线.md`(钉钉 + 企微 Webhook 合并表述、删除 DS API 行、§5.2 依赖清理清单标记提前完成)、`92-进度.md` 阶段 1 第 6 行 `re.sub` checklist 更新残留范围(~15 处)。**阶段 4 新增两项任务**:(1) 重新实现 Hive HDFS 小文件合并工具(通用化连接 / 剥离 `cts_*_ex/_im` 表名假设);(2) 重写告警模块(弃钉钉走 `conf/alerter.ini` Webhook) | — |
+| 2026-04-20 | **UDF 提升为顶层模块(重构计划外)**:`dw_base/spark/udf/` → `dw_base/udf/`。动机:UDF 是独立能力域(后续会高频扩展、需本地单测),不应锁死在 `spark/` 子树里。联动:`dw_base/__init__.py:27` 常量、`bin/spark-sql-starter.py` + `bin/excel_to_hive.py` 文件头 SQL 样例注释、`dw_base/udf/common/spark_common_udf.py` 模块 docstring、`kb/00-项目架构.md`(目录树新增 `udf/` 行 + 模块职责表 + Mermaid 节点)、`kb/23-标签体系.md §5` bitmap UDF 注册路径。`bin/spark-sql-starter.py:172-173` 用的是常量自动生效 | — |
 | 2026-04-20 | **修正 §7.1 pyspark 误记**:前期文档把 pyspark 列进强依赖 KEEP 行 + "pyspark 2.4.0 固定" 一句,均与真实的 `requirements.txt` 不符。真实机制:`findspark==2.0.1` 运行时定位 CDH 集群已装 PySpark,版本随集群走,客户端不固定也不入 `requirements.txt`。kb/90 §7.1 表格 KEEP 列去 pyspark + "后续事项"末行改为 findspark 机制说明 | — |
 | 2026-04-20 | **UDF 模块重组(重构计划外)**:独立 `dw_base/spark/udf/` 目录结构为 `common/`(通用 UDF,SparkSQL 入口自动 `ADD FILE` 注册)+ `business/`(业务专用 UDF,SQL 中按需 `ADD FILE` 加载)两类。(a) 6 份源文件(根 `spark_common_udf.py` 24 函数 + `spark_json_array_udf.py` 23 函数 + `spark_mmq_udf.py` 3 函数 + `customs/cts_common.py` + `product/escape_udf.py` + `enterprise/spark_eng_ent_json_array_append_udf.py`)通读 + 去重 + 业务耦合剥离后,合并为单文件 `common/spark_common_udf.py`(500 行 40 函数,分 JSON / Array / String / Numeric-Date-Hash / Cross-type-converters 5 段)。单文件方案而非按类型拆分,理由:跨类型转换函数(`json2str` / `arr2json` / `str2map` 等约 9 个,占 20%+)没有明确归属,强行分只会制造边界争议。(b) 清理 `dw_base/spark/udf/` 下所有老业务 UDF 子目录与根级业务文件共 60 个:整目录删 `contacts/` / `customs/` / `enterprise/` / `product/` / `productApplication/` / `test/`;根目录删 `spark_eng_ent_name_clean.py` / `spark_india_format_phone_udf.py` / `solr_similar_match_udf.py` / `main_test.py` 以及 3 份源 UDF 文件。(c) `dw_base/__init__.py:27` `COMMON_SPARK_UDF_FILE` 常量路径由 `dw_base/spark/udf/spark_common_udf.py` 改为 `dw_base/spark/udf/common/spark_common_udf.py`(`bin/spark-sql-starter.py:172-173` 两处 usage 靠常量传递自动生效)。(d) 删除老 `dingtalk_*` / `mg2es` 级联清理中没赶上的 UDF 业务耦合文件在此批统一清零。`business/` 目录暂为骨架,后续真正出现新业务 UDF 时按需补 | — |