90-重构路线.md 53 KB

重构路线

基于老项目 tendata-warehouse-release 的代码分析,为新项目 poyee-data-warehouse 规划的重构路线。 本文档说"为什么改、怎么改";配套的 92-重构进度.md 说"改到哪一步了"。

〇、全景与依赖 DAG

线性 P0-P3 排序不适合实际推进 —— 配置外移、模块重组、基础设施可以并行,而业务 SQL / 老代码删除强依赖它们。本节把全部任务按聚簇组织,按依赖边而非阶段号推进。

聚簇划分

聚簇 名称 范围 对应旧阶段
A 配置外移 / 硬编码清理 conf/env.sh / workers.ini / alerter.ini / spark-defaults.conf / datax-speed.ini / datasource 多环境 / DataX 路径解耦 阶段 2 主体
B dw_base/ 重组 B1 __init__.py 瘦身 · B2 common/utils/io/ops 四模块边界定稿 · B3 代码风格修正(__contains__ / SQL 注入 / Shell-Python 重复) · B4 新占位模块 registry 阶段 2 / 4 混合
C bin/ 入口收口 datax-import / datax-export 两命令收口 · datax-gc-generator 从零重写 · csv-to-hdfs-starter 实现 阶段 1 尾 + 阶段 2
D 基础设施 tests/ 测试体系 · 告警模块重写 · 日志模块统一 · dq/ 数据质量 · wiki/ Docmost · pm/ 项目管理集成 阶段 2 / 4
F 老代码删除 launch-pad/ 整删 · 其他已确认废弃 阶段 5

新业务 SQL 从零开发(jobs/{raw,ods,dwd,dws,tdm,ads}/ + DS 工作流)不属于重构 scope,不纳入聚簇。新业务 SQL 生产稳定一个完整周期后触发 F(launch-pad/ 整删)。

依赖 DAG

┌──────────────┐
│ B2 四模块    │
│    边界定稿  │
└──────┬───────┘
       ↓
 ┌─────┴──────┐
 ↓            ↓
┌──────────┐ ┌──────────┐
│ B4 新占位 │ │  C bin   │
│  (骨架)   │ │  收口    │
└──────────┘ └──────────┘

A 全部子项 ─────┐
B1 __init__ 瘦身┤  ✅ 2026-04-21
A2 spark conf ─┤  ✅ 2026-04-21
B3 风格修正 ────┤ 可独立推进,与上图并行
D 基础设施 ─────┘

       (A + B + C 基本就绪 → 新业务 SQL 从零开发,不属于重构 scope)
                 ↓
              (新业务 SQL 生产稳定一个完整周期后)
                 ↓
              ┌────┐
              │ F  │  老代码删除
              └────┘

关键依赖边(强依赖,不能翻转):

  • B2 → B4:四模块(common/utils/io/ops)边界不定,新占位模块放哪里都是赌博
  • B2 → Cbin/ 两命令的底层实现要放 dw_base/datax/entry.py,属于四模块定稿后的延伸
  • 新业务 SQL 生产稳定 → Flaunch-pad/ 删除前置条件是新 jobs/ 在生产跑稳一个完整周期(新业务 SQL 开发本身不纳入重构 scope)

可并行的事(无强依赖):

  • A 大部分子项(env.sh / workers.ini / alerter.ini / datax-speed.ini / datasource 多环境 / DataX 脚本路径解耦)彼此独立
  • B3 代码风格修正(__contains__ / SQL 注入)与任何聚簇都不冲突
  • D 基础设施(tests 骨架 / dq / sync / pm)骨架已建,实现可滚动推进

本文各节与聚簇的对应

  • §一 模块重命名 → 跨 B/F(历史项,大部分已完成)
  • §二 消除硬编码 → 聚簇 A + 部分 C
  • §三 __init__.py 瘦身 → 聚簇 B(B1)
  • §四 代码风格修正 → 聚簇 B(B3)
  • §五 清理废弃代码 → 聚簇 F(目前已清零,留档案)
  • §六 测试体系 → 聚簇 D
  • §七 其他建议 → 聚簇 D + A 杂项

一、模块重命名(高优先级) [聚簇 B / F]

1.1 tendatadw_base

影响范围:

类型 涉及内容 处理方式
Python import from tendata import *from tendata.xxx import yyy 全局替换
SQL 中的 ADD FILE ADD FILE tendata/spark/udf/xxx.py 全局替换
zip 打包命令 zip -qr tendata.zip tendata 改为 dw_base.zip dw_base
addPyFile sparkContext.addPyFile('tendata.zip') 改为 dw_base.zip
路径匹配正则 re.sub(r"tendata-warehouse.*", ...) ✅ 2026-04-22 已改为 poyee-data-warehouse(两处字面量:dw_base/utils/file_utils.py:9 + dw_base/utils/hdfs_merge_small_file.py:7
目录名引用 PROJECT_ROOT_PATH 相关逻辑 自动适配

建议做法: 先用脚本批量替换,再逐文件审查。注意 tendata 字样可能出现在老项目的 Hive 数据库名、表名或 DataX 数据源 ini 名中,这些属于老业务数据、不应替换——新项目业务 SQL 从零开发,不涉及老库老表。

1.2 launch-pad/jobs/(不做业务迁移,仅建立新结构)

重要澄清:老项目 launch-pad/ 中的业务代码与新项目业务完全无关(属于上个项目的历史业务),不存在内容迁移launch-pad/ 在重构期间作为样板 SQL 代码的参考(看写法、看 ${dt} 用法、看 DataX ini 结构),新项目所有业务 SQL 从零开发完成后,launch-pad/ 整体删除。

jobs/ 目录按数仓分层 × 业务域二维组织,结构见 00-项目导览.md §1 目录树与 30-开发规范.md §4.2。业务域代码统一使用命名规范定义的 trd/usr/prd/shp/pub/dim(见 21-命名规范.md §3.2)。

需要处理的代码引用

  • 脚本中 list_files('launch-pad/...') 的硬编码路径 → 改为 jobs/...
  • zip -qr launch-pad.zip launch-pad 类命令(如有)→ 改为 jobs
  • DolphinScheduler 工作流里老路径的引用 → 新项目上线时一并替换

二、消除硬编码(高优先级) [聚簇 A + 部分 C]

2.1 当前硬编码清单

硬编码内容 所在位置 建议方案
项目部署目录 poyee-data-warehouse/ bin/publish.sh(2026-04-20 从根目录挪入 bin/ 新项目发布目录为 /home/bigdata/release/poyee-data-warehouse/
DATAX_WORKERS + DATAX_WORKERS_WEIGHTS + RELEASE_HOST 硬编码 bin/common/init.sh:13-28 ✅ 2026-04-23 整体移入 conf/workers.ini[release] host + [weights] 两 section,ini 格式入库),init.sh 改为纯 bash 解析;新集群 hostname cdhmaster02 / cdhnode01-03
HADOOP_CONF_DIR='/etc/hadoop/conf' / SPARK_CONF_DIR='/etc/spark/conf' / PYSPARK_DRIVER_PYTHON='/usr/bin/python3' / PYSPARK_PYTHON='/usr/bin/python3' __init__.py ✅ 2026-04-29 收口:HADOOP_CONF_DIR / SPARK_CONF_DIR 加 ${VAR:-default}conf/env.sh,由 bootstrap_env setdefault 注入;PYSPARK_DRIVER_PYTHON / PYSPARK_PYTHON 改用 os.environ['PYTHON3_PATH'] 复用,消除双份硬编码
告警 Webhook(钉钉 / 企微 Key) dw_base/common/alerter_constants.py(老告警模块已于 2026-04-20 删除,含 dingtalk_notifier.py / ent_interface_dingtalk* / bin/dingtalk-work-alert.sh 新告警模块重写时 Webhook Key 外移到 conf/alerter.ini入库——部署靠 git pull,gitignore 会拉不到;webhook key 不算高敏感,最多被拿去发垃圾消息),Python 侧改 ConfigParser 加载;alerter_constants.py 整个删除;新项目不再使用钉钉
Spark 默认参数(executor/driver/shuffle/sql.*) dw_base/spark/spark_sql.py 构造函数 + .config(...) 移入 conf/spark-defaults.conf,SQL 文件可用 SET 覆盖,见 §2.3
DataX ini 路径前缀剥离 conf/datax/config/ bin/datax-single-job-starter.sh(TEMP 处理)、bin/datax-job-config-generator.pyreplace('conf/datax/config/', ''))、bin/datax-multiple-job-starter.sh(日志路径派生) 原目录已整体挪到 conf/bak/ 并 gitignore,脚本里 replace 现在是 no-op 死逻辑。去除前缀假设,改为靠 ini 文件名(= 任务唯一标识)识别用途
DataX 生成 JSON 输出目录名 conf/datax/generated bin/datax-job-config-generator.py 末尾 default_output_dirbin/datax-single-job-starter.sh 第 89/118 行、bin/datax-multiple-job-starter.sh 第 187 行、.gitignore 目录改名 conf/datax-json/;子路径扁平化为 conf/datax-json/{ini_basename}.json(去掉 src_dst / project_layer_env 等派生层级,依赖 ini 文件名全局唯一);.gitignore 同步改
JOB_NAME / JSON 文件名的 ini→json 转换逻辑重复实现 Python 侧 bin/datax-job-config-generator.py:126os.path.basename(gcf).replace('.ini', '.json'))+ Bash 侧 bin/datax-single-job-starter.sh:88basename .ini 合一到 dw_base.datax.path_utils.job_name_from_ini()(或类似工具);Bash 侧通过 python3 -c 调用或在 bin/common/init.sh 定义等价 shell 函数,单一来源
ini 里 dataSource 字段拼接环境后缀 老项目写法 dataSource = pg-hobby-prod 改为 dataSource = {db_type}/{env}-{实例简称},代码按首段斜杠判 db_type(即父目录名);source ini 落位 datasource/{db_type}/{env}-{实例简称}.ini,无 env 子目录。已于 2026-04-22 落地(dw_base/datax/plugins/plugin.py:37plugin_factory.py:34) ✅
导出类 ini 扇出撞名风险 jobs/ads/{域}/ 下 ini 若都以源 Hive 表名命名,同一张 ads 表扇出到多个目标库时会重名覆盖 命名规则改为 {源 Hive 表名}__{目标 db_type}_{目标 instance}.ini(双下划线分隔源/目标)
dw_base/common/template_constants.py 大量死代码 定义了 20+ 个 SQL 模板路径常量,实际只有 2 个(MYSQL_HIVE_CREATE_TABLE_TEMPLATE / MYSQL_HIVE_HBASE_CREATE_TABLE_TEMPLATE)被引用,其余 18 个零 import 整个文件删除;连带废弃下一条 ✅ 2026-04-29 整文件删
MySQLReader.generate_hive_ddl() / generate_hive_over_hbase_ddl() 自动建表 DDL 路径 dw_base/datax/plugins/reader/mysql_reader.py:195/222,被 bin/datax-gc-generator.py:616/728 调用;且 conf/template/ 目录在新项目根本不存在,真调用会 FileNotFoundError 整段路径废弃——与 CLAUDE.md 约定的 manual/ddl/ 是 DDL 唯一来源相冲突。datax-gc-generator.py 仅生成 ini 配置,不再输出 CREATE TABLE DDL;DDL 由开发者按 21-命名规范.md 手写到 manual/ddl/ ✅ 2026-04-29 两方法删
缺少集中的开发者参考模板目录 —(新增) 已建 conf/templates/{datax/{datasource,sync},spark/{sql,ddl}}/,模板用 *.template.{ini,sql} 双扩展名。与上条废弃的运行时模板完全不同:这里的模板不被任何代码读取,只供开发者对照写新文件;入口见 kb/30-开发规范.md §6

2.2 建议的配置结构

conf/
├── env.sh                    # Shell + Python 环境变量单源(Python 侧由 dw_base/utils/env_loader.py 通过 bash 子进程解析注入 os.environ)
├── workers.ini               # DataX Worker 列表与权重
├── alerter.ini               # 告警 Webhook 配置(入库;见 §2.1)
├── spark-defaults.conf       # Spark 底层行为/开关类(11 条,初始化后少改;Spark 原生格式)
└── spark-tuning.conf         # Spark 资源/并行度/队列类(10 条,业务早期常改;同 tuning 相同 key 覆盖 defaults)

2.3 Spark 配置三级覆盖策略(已完成)

2026-04-21 落地形态:按业务调整频率拆两文件入库 + spark_sql.py 三级覆盖。

两文件拆分

  • conf/spark-defaults.conf(11 条)—— 底层行为/开关类,初始化后少改(spark.sql.adaptive/broadcastTimeout/codegen/arrow*/files/statistics.* + spark.dynamicAllocation.enabled + spark.files.ignoreCorruptFiles + spark.debug.maxToStringFields + spark.port.maxRetries
  • conf/spark-tuning.conf(10 条)—— 资源/并行度/队列类,业务早期常改(spark.{driver,executor}.{memory,cores} + spark.executor.instances + spark.executor.memoryOverhead + spark.driver.maxResultSize + spark.default.parallelism + spark.sql.shuffle.partitions + spark.yarn.queue

两文件都用 Spark 原生 key value 格式(空白分隔、# 注释、无 section),与 spark-submit --properties-file 同语法。

三级覆盖

L1   conf/spark-defaults.conf  +  conf/spark-tuning.conf     (相同 key tuning 覆盖 defaults)
     ↓
L2   SQL 文件内 SET spark.xxx=yyy     (query() 预扫描塞入 _final_spark_config,session 启动前写入 builder)
     ↓
L3   SparkSQL(...) 显式传参  +  extra_spark_config  +  命令行 -sc

与仓库改名的联动 ✅ 2026-04-22:仓库改名 tendata-warehouse-releasepoyee-data-warehouse 已完成(项目根由用户手动改名 + 路径正则两处字面量同步)。.idea/*.iml + modules.xml.idea + *.iml.gitignore,老 iml / modules.xml 引用 / workspace.xml module name 残留是本地 IDE 状态,不入库亦不影响运行,不处理。

2.5 DataX 脚本路径解耦

现状bin/datax-single-job-starter.shbin/datax-job-config-generator.py 仍通过剥离 conf/datax/config/ 前缀从源 ini 路径里派生 SRC_DST / PROJECT_LAYER_ENV,用于生成 JSON 输出路径和 datax-multiple-job-starter.sh 的日志目录。该目录已整体挪到 conf/bak/ 并 gitignore,新项目 ini 放在 jobs/raw/{domain}/ / jobs/ads/{domain}/ / manual/,前缀不匹配时剥离变成 no-op,输出会落到形如 conf/datax/generated/jobs/raw/trd/xxx.json 的位置——能跑但与新约定不符。

目标态:清理死路径派生逻辑,JSON 输出扁平化为 conf/datax-json/{ini_basename}.json(依赖 ini 文件名全局唯一),日志目录简化为 ${LOG_ROOT_DIR}/datax/${START_DATE}/${JOB_NAME}.log

已作废的子项(2026-04-22 方向反转):-env 参数 + datasource 按 env 分子目录 + 一套代码跑多环境。前期跨环境同步是常态(test 业务库 → prod HDFS),不存在"全局 env"概念,sync ini 里 dataSource = {db_type}/{env}-{实例简称} 显式指向具体 source ini;source ini 落位 datasource/{db_type}/{env}-{实例简称}.ini,扁平组织。代码侧 db_type 判定改按首段斜杠(= 父目录)在 plugin.py:37 + plugin_factory.py:34 落地(✅ 2026-04-22)。

2.6 DataX 入口脚本收口为两条命令 ✅ 2026-04-23

✅ 2026-04-23 落地:新入口 bin/datax-{hive-import,hdfs-export}-starter.{sh,py} + dw_base/datax/ 7 模块(path_utils / worker / partition / runner / batch / entry / cli);老 7 个脚本(datax-{single,multiple,multiple-hive}-job-starter.{sh,py} + datax-job-config-generator.py)整体删除。详见 kb/92 对应日 changelog。本节保留为上下文(kb/90 retention 规则)。

现状:DataX 入口分散,学习成本高:

  • bin/datax-single-job-starter.sh — 单 ini 执行
  • bin/datax-multiple-job-starter.sh — 批量 + 按 start-date/stop-date 范围展开
  • bin/datax-multiple-hive-job-starter.sh — Hive 表批量(语义与上者重合)
  • bin/datax-single-hdfs-job-starter.sh(如有残留) — 单次 HDFS 导出
  • bin/datax-job-config-generator.py — ini → json 翻译器(内部工具)
  • bin/datax-gc-generator.py — ini 元生成器(详见 §2.7)

目标态:顶层收成两个命令,命名锁定 bin/datax-hive-import-starter.sh / bin/datax-hdfs-export-starter.sh(对齐现有 spark-sql-starter / datax-*-starter 命名风格);参数按老入口一比一平迁、不引入新能力;底层 worker 选择 / 日志路径 / json 翻译下沉到 dw_base/datax/ 包内,调用方不感知。

顶层命令 语义 参数集(老入口平迁)
bin/datax-hive-import-starter.sh 目标=Hive(reader=外部 / writer=hdfs + 自动分区),对应 jobs/raw/ -ini <file> 单 ini · -inis <dir> 目录批量 · -start-date / -stop-date · -host / -random · -parallel · -skip-datax · -skip-partition(默认开启分区管理)· -t <db.table> 显式追加需建分区的表
bin/datax-hdfs-export-starter.sh 源=HDFS(reader=hdfs / writer=外部如 ES/Mongo/MySQL/Kafka),对应 jobs/ads/ 同上减 -skip-partition / -t;源路径存在性 check 沿用老 check_data_exists 默认 fail-fast 行为,不暴露开关

老参数不平迁(新入口移除):

  • -c / -cd / -jc / -jcd:json 输入形态不再对外暴露,入口自动调 ini→json 翻译
  • --override + 5 个脚本内 xxx_array=()partitioned_tables 等):数组在老 datax-multiple-hive-job-starter.sh 中本就是未激活的空壳(2026-04-23 查证),一并删除

实现建议

  1. 新增 dw_base/datax/{entry,runner,batch,worker,partition,path_utils}.py 6 个模块,把老 shell 脚本里的 worker 选择 / 批量展开 / 日志路径派生 / ini→json 翻译 / Hive 分区管理逻辑全部搬迁,两个 sh 只做 bash 环境初始化 + 参数解析 + 调 python
  2. 模块放 dw_base/datax/ 包内,不横跨 io/ / utils/ —— 放宽聚簇 B2 前置(四模块边界定稿前先落地)。等 B2 定稿后,path_utils.log_path 等纯函数、worker.ssh_run 中跨包部分再挪到 dw_base/utils/ / dw_base/io/,只改 import 路径
  3. -inis <dir> 目录扫描只递归 .ini 文件
  4. 老脚本 datax-{single,multiple,multiple-hive}-job-starter.{sh,py} + datax-job-config-generator.py 在冒烟 2(新入口端到端通过)后整体删,不保留兼容转发封装
  5. partition.py 预建分区的 dt 用 start_date(业务日),和 HDFS writer(见 hdfs_writer.py:23-26)对齐;分区内允许含次日漂移数据(按 ADR-03 raw 不纠正分区漂移,配套 raw 48h 宽窗机制)。早先方案曾用 stop_date - 1 day——单日窗口下与 start_date 数值相等没问题,但与 raw 宽窗(stop = 业务日+2)配合时分区会跑到业务日+1 与业务日错位,反向校正回 start_date

本轮不做、后延 ADR:下列能力属新增需求、非老入口平迁,暂不实现;若将来出现明确场景,单独开 ADR 落 kb/93。按优先级分:

高优先级(正式 raw 库上线前需要):

  • ✅ 2026-04-23 DataX 字段级脱敏(合规硬约束:敏感值必须不出业务库):规范做法 = ini 里 [mask] 声明式段(cert_birthday = month_trunc 等),代码侧 dw_base/datax/mask.py 翻译为 PG querySql 内 SQL 表达式下发源库执行,敏感原值从未离开业务库。PG 方言当前支持 5 种脱敏(静态:month_trunc / md5 / mask_middle;动态:keep_first_{n} / keep_last_{n})。优先级 手写 querySql > [mask] > table,手写用于复杂场景 override。不走 DataX 层 transformer / reader transform = 方案——这些路径是原值先进 DataX JVM 再变换,等同"已出业务库再脱",合规退步,本项目不采纳。MySQL / ClickHouse 等其他 reader 按需扩展 mask.py 顶层字典

低优先级(有明确批量回刷场景再做):

  • 日期范围自动按日展开 + N 个 json 分发多 worker(-start-date 20260401 -stop-date 20260410 → 自动切 10 份 json,每份独立选 worker)——可能不做,按天补数职责归 DolphinScheduler 更合理(见 kb/93 ADR-01 草案)
  • DataX 仅本机执行(砍 ssh 远端分发 + workers.ini + select_worker):分布式分发由 DolphinScheduler worker group 承担,DataX 不重复随机(见 kb/93 ADR-02 草案)
  • plugin_factory 解耦:当前工厂模块顶层 eager import 所有 reader/writer(mysql/pg/mongo/hdfs/es/kafka/hbase/ch/...),任一 plugin 的 top-level import 破损就连累整个 DataX 链路(2026-04-23 已踩:dw_base/database/ 删除后 mysql_readerMySQLColumn import 挂,挂一扯一串)。改造方向二选一:(a) lazy import:importlib.import_module(f"dw_base.datax.plugins.reader.{db_type}_reader") 按需加载;(b) registry pattern:每个 reader 用 @register("postgresql") 装饰器自注册,工厂解耦于具体 plugin 列表
  • ✅ 2026-04-23 ini 配置化脱敏 已落地 dw_base/datax/mask.py(PG 5 种脱敏),详见高优先级"DataX 字段级脱敏"条

第三条命令 datax-gc-generator(ini 元生成器)独立保留:用户已确认。职责是"从 PG 扫 schema 生成 ini 参考模板",和"执行 ini"不是一回事,不收口到上面两条里。详见 §2.7。

2.7 datax-gc-generator 从零重写 ✅ 2026-04-29

现状(凭查证 2026-04-18):bin/datax-gc-generator.py 支持 from ∈ {mysql, hdfs} × to ∈ {elasticsearch, hbase, hdfs, kafka, mongo, mysql},覆盖面大、代码沉重,且:

  • 全项目没有任何其他模块 import 或 shell 调用它(仅 3 处 SparkSQL('datax-gc-generator') 是自己设置 app name)
  • 内部走 MySQLHandler + MySQLDataSource + convert_mysql_column_types + MySQLReader.generate_hive_ddl() 一条链,与 §2.1 已登记废弃的"自动 DDL 生成"方向相冲突
  • 新项目新业务只需要 from=pg to=hdfs 一条路径,其他组合全是老项目遗产

定位参考模板生成器,不是"一键出可用 ini"。产物是开发者人工调整的起点 —— 常见修改包括字段剪裁(只同步用到的列)、WHERE 过滤条件、hivePartitions 配置、大表拆分策略等。开发者 diff 参考模板和自己的需求,改完再把成品 ini 提交到 jobs/raw/{domain}/

方向:整个文件废弃 + 从零重写 ✅ 2026-04-29 重写为 bin/datax-sync-template-gen.py(PG → HDFS sync 模板生成器,全字段参考模板,pg8000 直连查 schema,-o 三态:stdout / workspace 默认 / 自定义目录)

重写目标

  • 仅支持 from=pg to=hdfs
  • 读 PG 表结构(information_schema.columns + pg_catalog.pg_description 取字段注释)
  • 输出到开发者本地的 workspace/{yyyymmdd}/{name}.iniworkspace/.gitignore 排除,不入仓),作为参考模板(原始全字段 + 默认分区 + 默认 where=1=1);开发者裁剪后再把最终 ini 提交到 jobs/raw/{domain}/
  • 不再生成 DDL:DDL 统一由开发者按 21-命名规范.md 手写到 manual/ddl/{layer}/{domain}/(CLAUDE.md 单一来源约束,与 §2.1 已登记项一致)

目录约定

  • workspace/ 在仓库根,仅存在于开发者本地,整个目录进 .gitignore
  • workspace/{yyyymmdd}/ 按运行日期分子目录,便于开发者看"我今天生成了哪些候选"
  • manual/imports/{yyyymmdd}/ 的分工:manual/imports/ 放一次性执行的 SQL / ini(会入仓做审计证据,执行完归档),workspace/ 放自动化工具未经人工确认的中间产物(永不入仓)

拆除清单(实际执行时 scope 收紧——mysql 整体作为运行时同步代码保留,仅删纯 generator 死代码):

  • dw_base/database/mysql_utils.pylist_tables / list_columns 方法 ✅ 整 database/ 目录已于 2026-04-20 老业务批清理删除
  • dw_base/datax/datasources/mysql_data_source.py 保留(mysql 同步运行时 DataSource,与 PostgreSQLDataSource 同等地位;未来 mysql 同步可能用)
  • dw_base/datax/plugins/reader/mysql_reader.pygenerate_hive_ddl / generate_hive_over_hbase_ddl 方法 ✅ 2026-04-29 删
  • dw_base/datax/datax_utils.pyconvert_mysql_column_types ✅ 2026-04-29 整文件删(零外部引用)
  • 所有 mongo / kafka / hbase writer 在 generator 里的分支 ✅ 2026-04-29 随 generator 整文件删

注意:以上删除范围与 dw_base/datax/plugins/ 里仍在被真实采集任务调用的 reader/writer 不冲突 —— 真实采集任务只用到 HDFSReader / HDFSWriter / MongoWriter(如果还有 mongo 采集任务)。删之前要用 grep -r "from dw_base.datax.plugins.reader.mysql_reader" 再确认一次。

2.8 HDFS 数据源 ini 支持 HA nameservice(Path B 锁定

结论(凭 2026-04-18 新 CDH 环境实测):DataX hdfswriter 在新环境下连 HA 集群必须在 json 里显式提供 hadoopConfig 块,这是 §2.8 改造的动机。

为什么 HADOOP_CONF_DIR 对 DataX 无效(实测三连)

测试 json 内容 HADOOP_CONF_DIR 结果
1 defaultFS + 完整 hadoopConfig 未设置 ✅ 成功
2 只有 defaultFS 未设置 ❌ UnknownHostException: nameservice1
3 只有 defaultFS /etc/hadoop/conf ❌ 仍 UnknownHostException

hadoop fs -ls 命令能解析 nameservice1,是因为 hadoop shell 脚本把 $HADOOP_CONF_DIR 加入了 Java classpathhdfs-site.xml 才能被 Configuration 自动加载)。datax.py 启动 Java 时不做这件事,classpath 里不含 /etc/hadoop/conf,所以 DataX 的 Hadoop Configuration 对象是"干净的",只能靠 json hadoopConfig 块显式注入 HA 参数。

(老项目在上一个公司/服务器只写 defaultFS 也能跑通 HA,最可能的原因是运维把 hdfs-site.xml 塞进了 DataX 的 classpath 目录 —— 比如 datax/conf/ / datax/lib/ / 某 plugin 的 libs/。新环境 /opt/datax 下没有这类预置文件,不走这条路。)

dw_base/__init__.pyHADOOP_CONF_DIR + HIVE_CONF_DIR export 仅对 Spark 入口生效:(a) HADOOP_CONF_DIR:Spark on YARN 启动时 SparkSubmitArguments.validateSubmitArguments 会强校验该 env(不设直接抛 When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set,2026-04-20 实测)。(b) HIVE_CONF_DIR:Spark .enableHiveSupport() 找不到 hive-site.xml 时会静默回落 in-memory metastore——session 能起但看不到 HMS 真实库表(症状:describe test.xxx 在 Hive CLI 有,在 Spark 里 Table or view not found,2026-04-20 实测)。两者对 DataX JVM 子进程都无影响——一是 classpath 不含 conf 目录(见上),二是 DataX 由 datax-single-job-starter.sh 通过 python3 datax.py 启动,并未 import dw_base;DataX HA 靠 ini [hadoop_config] 节显式注入。早先 2026-04-18 曾把这两行当"死代码"删除(kb/92:158),仅从 DataX 视角判断忽略了 Spark 路径,2026-04-20 分两批恢复(HADOOP_CONF_DIR 在先,HIVE_CONF_DIR 补齐)。


正确的 json 形态

"writer": {
  "name": "hdfswriter",
  "parameter": {
    "defaultFS": "hdfs://nameservice1",
    "hadoopConfig": {
      "dfs.nameservices": "nameservice1",
      "dfs.ha.namenodes.nameservice1": "nn1,nn2",
      "dfs.namenode.rpc-address.nameservice1.nn1": "192.168.33.61:8020",
      "dfs.namenode.rpc-address.nameservice1.nn2": "192.168.33.62:8020",
      "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    },
    ...
  }
}

ini 新格式

非 HA / 单 NN(dev / test 环境)

[base]
defaultFS = hdfs://192.168.33.61:8020

HA / nameservice(prod 环境)

[base]
defaultFS = hdfs://nameservice1

[hadoop_config]
dfs.nameservices = nameservice1
dfs.ha.namenodes.nameservice1 = nn1,nn2
dfs.namenode.rpc-address.nameservice1.nn1 = 192.168.33.61:8020
dfs.namenode.rpc-address.nameservice1.nn2 = 192.168.33.62:8020
dfs.client.failover.proxy.provider.nameservice1 = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

设计要点

  • [hadoop_config] 节的 key 原样照搬 /etc/hadoop/conf/hdfs-site.xml,不做缩写 / 翻译 —— 运维直接复制粘贴,出了问题可以字段级对比,零心智成本
  • [hadoop_config]可选:不写即视为非 HA,代码不注入 hadoopConfig,生成的 json 只有 defaultFS,与单 NN 环境兼容
  • 将来若要支持多 nameservice(federation),[hadoop_config] 节天然兼容(多写几组 key 即可),无需再改 ini schema

代码改造清单

文件 改动
dw_base/datax/datasources/hdfs_data_source.py 覆写 get_datasource_dict():父类逻辑外,检测 [hadoop_config] 节是否存在;存在则把整节作为 dict 塞进 ds_dict['hadoopConfig']
dw_base/datax/plugins/plugin.py 不用改 —— load_data_source()for key, value in ds_dict.items() 里,value 是 dict 时走 Python 原生赋值,json 序列化自然成立
dw_base/datax/plugins/reader/hdfs_reader.py / writer/hdfs_writer.py 不用改defaultFS + hadoopConfigload_data_source() 自动注入到 parameter
dw_base/__init__.py 保留 os.environ['HADOOP_CONF_DIR'](Spark on YARN 启动强校验)+ os.environ.setdefault('SPARK_CONF_DIR', '/etc/spark/conf')(pip pyspark 默认指向自身空 conf/,需显式指到集群配置才能加载 hive-site.xml);DataX JVM 不读这些,HA 靠 ini [hadoop_config]。详见 01-运行环境.md §4
bin/datax-gc-generator.py §2.7 从零重写时一并处理,这里不单独列
datasource/hdfs/{env}-{实例简称}.ini 按新 schema 新建(HA 集群带 [hadoop_config];单 NN 只写 [base] defaultFS

回归测试

  • prod HA 集群:新 ini 跑一次真实 PG→HDFS 任务,生成 json 含完整 hadoopConfig,任务成功(2026-04-18 已用手写 json 预验证)
  • NameNode 主备切换期间跑一次,DataX 自动切到 standby(HA 的原始动机)
  • dev / test 单 NN 集群:不写 [hadoop_config],生成 json 只含 defaultFS,任务成功

2.9 DataX 速率控制外移到 conf(中优先级)

现状dw_base/datax/job_config_generator.py:60-67 硬编码分时调度:

local_time = int(datetime_utils.formatted_now('%H%M'))
if 750 < local_time < 1900:
    speed = self.get_speed(10, byte=10485760, record=40000)          # 白天 10 channel × 10 MB/s
    core_speed = self.get_core_speed(byte=10485760, record=40000)
else:
    speed = self.get_speed()                                          # 夜间 6 channel × 256 MB/s
    core_speed = self.get_core_speed()

问题

  • 白天 / 夜间的边界(0750、1900)、channel 数、单 channel 速率全写死在代码里,想调一个值要改 py 源码重新发布
  • 不同业务域 / 不同表的速率诉求可能不同,现在一刀切
  • 生产偶发突发(白天要抢刷一次全量)没有临时放大速率的口子
  • 硬编码的动机在代码里一字没留(为什么 0750-1900?避开业务高峰?避开实时链路?后人读代码猜不到)

目标:把速率配置抽到 conf/datax-speed.iniJobConfigGenerator 运行时读;默认值 = 现硬编码值,保持向后兼容。

conf 格式(ini,按时间段分段,自上而下匹配,未命中走 [default]):

; conf/datax-speed.ini
; 速率档位定义:从上到下依次匹配,第一条命中的为准;无匹配走 [default]

[daytime]
hours = 07:50-19:00
channel = 10
byte_per_channel = 10485760       ; 10 MB/s/channel,避开业务高峰
record_per_channel = 40000

[default]
channel = 6
byte_per_channel = 268435456      ; 256 MB/s/channel,夜间放满
record_per_channel = 100000

代码改造

  • JobConfigGenerator.__init__()conf/datax-speed.ini
  • 用当前时间在各段 hours 里找匹配,取 channel / byte_per_channel / record_per_channel
  • assemble() 里的分支逻辑改为 speed = self.resolve_speed_profile() 一行
  • conf/datax-speed.ini 时在文件头注释里把"白天是为了避开业务高峰"的动机写清楚(消除默认值的来历盲区)

扩展方向(P2,先不做):

  • 支持按 ini 名(业务表)在 conf 里覆盖特定任务的速率
  • 支持命令行 -speed fast / -speed slow 手动切档(突发高峰 / 限流时用)

2.10 dw_base/ 四模块边界(common / utils / io / ops) [聚簇 B(B2)]

背景:老项目 dw_base/ 下子目录职责混乱 —— utils/ 里塞过 Excel/Hive 读写、小文件合并;database/ 又和 datax/datasources/ 重叠;缺少 I/O 边界和"数据湖运维"这类长期职责的收纳点。重组把全部非业务代码(非 spark/ / udf/ / datax/ 等明确功能域)按边界属性归入四个顶层模块,规则如下:

模块 边界属性 典型内容 反例(不该放在此处)
common/ 常量 / 全局上下文 / 无行为的元数据 颜色常量、枚举、模板路径常量、项目级单例 context 任何"动词"型函数 —— 那是 utils/
utils/ 纯函数,无外部副作用,可纯 Python 单测 日期格式、字符串切分、配置解析、数据结构转换 读/写 DB / 文件 / HDFS —— 那是 io/
io/ 与外部系统通信的边界 DB connector 薄封装、CSV/JSON/Excel 读写、HDFS 文件读写 数据湖维护动作(合并小文件、清理分区)—— 那是 ops/
ops/ 数据湖维护动作,会改变湖里的物理状态 小文件合并、分区清理、统计信息刷新、存储压缩 一次性的业务查询脚本 —— 那走 jobs/manual/

落地规则

  1. 写新代码前先对号入座:如果一个函数既读文件又做纯计算,按最强副作用归类(读文件 → io/,即便函数里 90% 是计算)
  2. io/ 只做薄封装:不把业务 schema 嵌入 io/(老 mongodb_utils.py 把公司名查询嵌进去就是反例),业务 schema 留在调用方
  3. ops/ 里的函数预期被多处复用:只被一张表调用一次的"清分区"走 manual/adhoc/,不进 ops/
  4. 跨模块依赖方向ops/ 可依赖 io/ + utils/ + common/io/ 可依赖 utils/ + common/utils/ 只依赖 common/common/ 不依赖任何项目内模块。禁止反向依赖

B2 先于 B4 / C:新占位模块(io/ops/pm/dq/sync)和 bin/ 收口都假设四模块边界已定;边界不定就先别新建骨架,否则返工成本高。

2.11 新占位模块 registry(B4) [聚簇 B(B4)]

2026-04-20 本批建立的占位模块(空骨架,实现待后续阶段),登记在此便于跨会话追踪:

模块 路径 状态 何时实现 对应阶段
io/ dw_base/io/{db,file,hdfs}/ 骨架 B2 四模块边界定稿后,搬入 db/file/hdfs 读写封装 阶段 2
ops/ dw_base/ops/ 骨架 阶段 4 随"重新实现小文件合并 / 分区保留工具"落地 阶段 4
pm/ dw_base/pm/ 骨架 待确认 TAPD vs Jira 后实现(commit → 任务 ID 联动) 阶段 4 之后
dq/ dw_base/dq/ 骨架 待确认告警落点 + 首批规则后实现 阶段 4
wiki/ dw_base/wiki/ 骨架 待确认 Docmost API 鉴权 / webhook 支持后实现 阶段 4 之后
tests/ tests/{unit,integration}/ 骨架 阶段 4 首批 UDF 单测启动时同步补 conftest.py + fixtures 阶段 4

2.12 通用 UDF 自查表(B 延伸) [聚簇 B]

现状dw_base/udf/common/spark_common_udf.py 462 行,其中 @udf 装饰的注册函数 13 个(启动日志 注册 Python UDF xxx 条数即真值),另有若干普通 def 辅助函数不进 SparkSQL 注册表。函数注释已由开发者手动补完(编号 UDF-XX 顺序注入、分类分节 JSON / ARRAY / STRING / NUMERIC-DATE-HASH / CROSS-TYPE),本节不再规划"注释补齐"动作。

问题:未来新增通用 UDF 如果没有登记表,规模大了之后"这个函数有没有 / 叫什么 / 谁加的"全靠 grep。

目标态

  • UDF 自查表kb/31-UDF手册.md(与 30-开发规范.md 同级独立文档)。表头 函数编号 | 函数名 | 分类 | 入参 | 返回 | 描述 | 示例,函数编号沿用 spark_common_udf.py 中的 UDF-XX 顺序注释。common 与 business 两类 UDF 都在本表登记,分两段(§1 通用 / §2 业务)。初版 13 个通用 UDF 已登记(2026-04-21)。

回归检验

  • 任意 SQL 文件直接 SELECT my_udf(col) 能跑通(common auto-load 链路未变)
  • 自查表 §1 行数 = spark_common_udf.py@udf 注册函数数(启动日志中 注册 Python UDF 的条数即真值)

与其他条目的关系

  • 2026-04-20 UDF 模块重组(kb/92)已完成重组动作,本节是其延伸(登记表)
  • 不动 auto-load 机制(bin/spark-sql-starter.py + dw_base/__init__.py:29 COMMON_SPARK_UDF_FILE 常量),只补文档

三、__init__.py 瘦身(已完成 · 修剪式) [聚簇 B(B1)]

2026-04-21 完成形态:原地修剪,不拆 core/dw_base/__init__.py 从 127 行 → 83 行。

落地动作

  • import findspark + findspark.init():pip pyspark 已在 venv site-packages 直接覆盖 import pyspark,findspark 在 CDH 节点上即便间接反推出 $SPARK_HOME 也只是把同版本 parcel pyspark 前插进 sys.path,业务表现零差异(冒烟已确认)
  • 删 21 个外部零引用的颜色常量:CHG_BOLD / NORM_BLU / NORM_WHT / 7×BOLD_* / 7×BGRD_*(if/else 两分支同步删)
  • IS_RUN_BY_NORMAL_USER 状态变量(两处赋值外部均无引用)

未拆分为 core/env.py / core/colors.py / core/spark_env.py 的理由:findspark 删除后"findspark 按需 import"的懒加载诉求大半消失;剩余颜色/env 检测拆三文件需改 11 处调用点 import,收益比修剪方案差,暂不做。若后续单元测试对 "import dw_base 零副作用" 有刚性需求再拆。

最终 __init__.py 保留PROJECT_ROOT_PATH / PROJECT_NAME / bootstrap_env() / sys.path.append / HADOOP_CONF_DIR / SPARK_CONF_DIR / PYSPARK_* / COMMON_SPARK_UDF_FILE / BANNED_USER 退出守卫 / IS_RUN_BY_RELEASE_USER / IS_RUN_IN_RELEASE_DIR / 启动 print / 6 个实际被引用的颜色常量(DO_RESET / NORM_RED / NORM_GRN / NORM_YEL / NORM_MGT / NORM_CYN)。

2026-04-24 追加清理cow_says() 函数(os.system source bin/common/functions.sh)已删——bin 层 bash 壳整体清理时确认其为 no-op(子 shell source 后立即退出,无任何可见效果)。

四、代码风格修正(中优先级) [聚簇 B(B3)]

4.1 __contains__ 反模式

全项目大量使用:

if config.__contains__(key):       # 反模式
if self.REGISTERED_UDF.__contains__(name):

应改为:

if key in config:                  # Pythonic
if name in self.REGISTERED_UDF:

4.2 Shell / Python 重复逻辑

bin/common/init.shdw_base/__init__.py 有大量重复的环境检测逻辑(用户判断、路径判断、日志目录、颜色常量)。

建议: 统一由 Python 入口处理,Shell 脚本仅做最小化的环境设置后调用 Python。或提取为一份共享的配置文件。

五、清理废弃代码(中优先级) [聚簇 F]

截至 2026-04-20 本节已无待清理项。后续若发现新的废弃代码,在下方表格追加登记;已完成项保留在"历史档案"表中留档。

5.1 待清理(当前为空)

模块/文件 状态 建议
(无)

5.2 历史档案(已完成)

模块/文件 完成动作 完成时间 / 来源
dw_base/{validation,ml,flink,elasticsearch}/ 四空壳模块 整目录删(仅 56 字节空 __init__.py,零引用) 2026-04-20,见 92-进度.md
dw_base/database/mongodb_utils.py 整文件删(184 行 = 19 行 MongoClient 薄包装 + 165 行注释,零外部引用,需要时 MongoClient(uri) 一行重写) 2026-04-20,见 92-进度.md
dw_base/database/ 整目录 整目录删(__init__.py + 仅剩的 mysql_utils.py,zero 外部引用;mysql 读写后续在 dw_base/io/db/ 下重建) 2026-04-21
5 个占位模块 README(dq/ io/ ops/ pm/ sync/ 批量删,占位模块 docstring 即文档,不再要求单独 README 2026-04-21
kb/README.md 整文件删,内容合入项目根 README.md(项目现状速读 / 文档索引 / 阅读建议三节),CLAUDE.md 引用同步更新 2026-04-21
conf/datax/ 下老项目遗留 ini / datasource 样例 整批挪入 conf/bak/datax/{config,datasource}/,由 .gitignore:6 conf/bak 拦截不入库 项目初始化 8d2ade5(2026-04-17)
bin/flume-control.sh 整文件删(194 行)。职责:管理单个 Flume-Kafka-HDFS 作业生命周期(log/monitor/start/status/stop/restart/start-all/stop-all/restart-all 一组子命令)。实际状态:顶部 shebang 损坏(ho#!/bin/bash)、依赖已删除的 bin/wechat-work-alert.sh、L64 与 L162 conf/flume/*.properties vs conf/flume/config/*.properties 路径自相矛盾,整体已不可跑。按需重建时参考:conf/flume/config/*.properties 为作业配置约定、flume-ng agent --conf-file 为启动命令、启动前需 mkdir -p ${LOG_ROOT_DIR}/flume-agent/${TODAY}、监控循环 `head -n 1000 log grep -E "gz failed|java.io.IOException|org.apache.flume.ChannelException|java.lang.IllegalStateException"命中则发企微告警。Kafka→HDFS 接入通道重建时按新conf/alerter.ini` 告警外配重写,不沿用老 SKB_LITTLE_CUTE / 手机号硬编码

代码里残留的 conf/datax/config/ replace 死逻辑 + conf/datax/generated 默认值,属于 §2.x 路径硬编码清理范畴(改名为 conf/datax-json/ + 删 replace),不在本节。

六、测试体系搭建(中优先级) [聚簇 D]

6.1 现状

  • 仅 UDF 有少量 pytest 测试
  • 核心模块(SparkSQL、DataX 配置生成)无测试
  • 无 CI/CD 集成
  • 2026-04-20 已建 tests/{unit,integration}/ 骨架 + README(.gitkeep 占位,无实测用例),见 §2.11 占位模块 registry

6.2 建议的测试结构

tests/
├── conftest.py                    # pytest 公共 fixtures
├── unit/
│   ├── test_udf_common.py         # 通用 UDF 单测(纯函数,不依赖 Spark)
│   ├── test_udf_business.py       # 业务 UDF 单测(如有,按文件组织)
│   ├── test_config_utils.py       # 工具函数单测
│   ├── test_datetime_utils.py
│   ├── test_sql_utils.py
│   └── test_datax_generator.py    # DataX ini→json 生成测试
├── integration/
│   ├── test_spark_sql.py          # SparkSession local[*] 模式集成测试
│   └── test_hive_utils.py
└── quality/
    └── test_data_quality.py       # 数据质量校验(行数、空值率、主键唯一性)

6.3 测试策略

  • UDF 单测:纯 Python 函数,直接 assert,不需要 Spark 环境
  • DataX 配置生成测试:给定 ini 文件,断言生成的 JSON 结构正确
  • Spark 集成测试:使用 local[*] + 内存 Hive(enableHiveSupport() 需要 Hive MetaStore,可用嵌入式 Derby)
  • 数据质量:在 DolphinScheduler 工作流中加入校验节点

七、其他建议 [聚簇 D + A 杂项]

7.1 依赖管理(已精简)

状态:2026-04-15 已完成首轮审计与精简。老清单 48 行 → 新清单 10 个强依赖,详见根目录 requirements.txt;原始快照备份在 requirements.txt.bak,并在注释里给每一行打了 [KEEP/DROP/LAZY/STDLIB] 结论。

精简策略

分类 处理方式 代表包
强依赖(KEEP) 留在 requirements.txt pyspark / pandas / pymongo / PyMySQL / requests / PyYAML / python-dateutil / wheel / pytest
无引用(DROP) 直接移除 openvino / transformers / scikit-learn / scipy / numpy / Flask / matplotlib / lxml / SQLAlchemy / jieba / cpca / openpyxl / xlrd / 等 20+ 个
stdlib 冗余(STDLIB) 移除 backport configparser —— Python 3 标准库自带,backport 安装反而会覆盖 stdlib
弱依赖(LAZY) 不写进 requirements,用到时手动 pip install elasticsearch / pyhive / redis / cryptography / oss2 / fuzzywuzzy / pypinyin —— 都只被即将清理的老业务代码引用

后续事项

  • LAZY 类依赖关联的老代码:get_oldmongo_* / mg2es/ / ent_interface_dingtalk* 于 2026-04-20 第一批提前清理;同日第二批清理 dw_base/oss/ 整目录、dw_base/scheduler/ 整目录(含 polling_scheduler / drop_partitions / drop_daily_full_snapshot_tbls)、dw_base/hive/ 整目录、dw_base/utils/ 7 文件(data_distinct / diff_utils / excel_to_hive_utils / hive_diff_database / hive_to_excel_utils / pdt_check_table*);剩余 customs/similarity.py 等在阶段 4 / 阶段 5 一并清理
  • 不需要 requirements-base.txt / requirements-dev.txt 分文件——当前依赖规模下单文件已经足够
  • pyspark==2.4.0 固定(对齐 CDH 6.3.2 parcel 自带版本):pip install 把 pyspark 装进解释器 site-packages 解决 PyCharm 远程解释器静态索引红线 + 本地 pytest;集群运行时同一解释器的 site-packages pyspark 直接命中,HMS 走 SPARK_CONF_DIR=/etc/spark/conf/hive-site.xml(见 01-运行环境.md §4
  • findspark 已于 2026-04-21 随 §三 瘦身删除(冒烟确认其在 CDH + pip pyspark 情境下只做同版本同路径覆盖,行为无差异,详见 92-进度.md changelog)

7.2 日志改进

  • pretty_print() 混合了控制台输出和文件写入,职责不清
  • Logging 类定义了但很少使用
  • 建议:统一使用 Python logging 模块,配置 handler 实现控制台+文件双输出

7.2.1 日志路径约定

日志文件统一落 ${LOG_ROOT_DIR}/{module}/{dt}/{file}.log

  • LOG_ROOT_DIR 默认 ${HOME}/log,外配在 conf/env.sh
  • {module} 取值 datax / spark / ds / csv / export 等顶层入口名
  • {dt} 格式 yyyymmdd

待建log_path(module, dt, file) 工具函数 Python / Shell 各一份,避免入口脚本各自拼。

7.3 部署改进

  • publish.sh 使用 re-all 命令(自定义的 SSH 分发脚本)全量同步
  • 建议:考虑引入版本化部署(tag + 软链接切换),便于回滚

7.4 DataX 限速逻辑

job_config_generator.py 中根据时间段(7:50-19:00 白天限速,其余时间放开)动态设置 DataX 传输速率:

if 750 < local_time < 1900:
    speed = self.get_speed(10, byte=10485760, record=40000)   # 白天低速
else:
    speed = self.get_speed()                                   # 夜间高速

建议:将时间段和速率配置化,避免硬编码。

7.5 Spark / HMS 侧 Ranger Hive 策略验证(低优先级)

背景02-权限与账号.md §1 链路 B 原先假设 HMS 侧也挂载 Ranger Hive Plugin,从而 PySpark 直连 HMS 的库/表/列操作仍受 Ranger Hive 管辖。但在 CDH 6.3.2 默认部署中,Ranger Hive Plugin 只挂载在 HiveServer2,HMS 服务端未必部署;若 HMS 未挂,PySpark / spark-submit 的 SQL 层授权会绕过 Ranger Hive,仅剩 NameNode 上的 Ranger HDFS Plugin 做数据平面兜底 —— 与同版本 Impala 的"无 Ranger 细粒度授权,只能 HDFS 兜底"规律一致。

代码侧查证已完成:仓库内 dw_base/spark/spark_sql.py:167 等 8 处 .enableHiveSupport(),Spark 确实走 HMS 读元数据,不走 HS2,因此是否受 Ranger Hive 管辖完全看集群侧 HMS 是否挂插件。

集群侧验证动作(待做)

  1. 登 HiveMetastore 节点,查 hive-site.xml:有无 hive.metastore.pre.event.listeners = org.apache.ranger.authorization.hive.authorizer.RangerHiveMetastoreAuthorizer
  2. 查 HMS 进程 classpath 里是否包含 ranger-hive-plugin-*.jar
  3. 从开发环境跑一条故意触发 column 级禁权的 PySpark SQL,看是否被拒 —— 端到端佐证
  4. 结论回填到 02-权限与账号.md §1 链路 B 关键点段 + 关键点段里的 mermaid Note

后续处理:若 HMS 未挂 Ranger Hive,调研补挂成本 + 评估现有 HDFS 兜底是否足够(大部分数仓读写场景下足够,因为 PySpark 任务绝大多数以受控 Unix 账号提交、权限粒度粗即可;若要满足敏感列屏蔽类需求则必须补挂)。

八、聚簇推进视图

替换原 P0-P3 线性优先级表。按聚簇 A-F 组织(定义见 §〇),同一聚簇内部不强排序,跨聚簇只标强依赖。

A 配置外移 / 硬编码清理

子项 状态 依赖 参见
conf/env.sh(LOG_ROOT_DIR / RELEASE_USER / RELEASE_ROOT_DIR / PYTHON3_PATH / DATAX_HOME) 待启动 §2.1 / §7.2.1
conf/workers.ini(DataX Workers + 权重 map 外移) ✅ 2026-04-23 §2.1
conf/alerter.ini(告警 Webhook,入库) 待启动 旧告警代码删除(已 2026-04-20 完成) §2.1
conf/spark-defaults.conf(底层 11 条)+ conf/spark-tuning.conf(调优 10 条)+ spark_sql.py 三级覆盖 ✅ 2026-04-21 §2.3
conf/datax-speed.ini(DataX 分时速率) 待启动 §2.9
datasource 扁平化 {db_type}/{env}-{实例简称}.ini + db_type 按父目录段判定 ✅ 2026-04-22 §2.1 / §2.5
DataX 脚本去前缀剥离 + JSON 输出目录改名扁平化 conf/datax-json/{ini_basename}.json 待启动 §2.1 / §2.5

B dw_base/ 重组

子项 状态 依赖 参见
B1 __init__.py 瘦身(修剪式) ✅ 2026-04-21 §三
B2 common/utils/io/ops 四模块边界定稿 待启动 §2.10
B3 __contains__in 全局替换 待启动 §4.1
B3 Shell/Python 环境检测去重(bin/common/init.shdw_base/__init__.py 待启动 B1 §4.2
B3 mysql_utils.py SQL 注入修复 ✅ 2026-04-21 随 dw_base/database/ 整目录删自动完成;mysql 读写后续在 dw_base/io/db/ 下重造 见 §5.2
B4 新占位模块 io/ / ops/ / pm/ / dq/ / wiki/ 骨架 ✅ 2026-04-20 已建(2026-04-21 去掉各模块 README,docstring 即文档) B2 边界规则先立 §2.10 §2.11

C bin/ 入口收口

子项 状态 依赖 参见
bin/publish.sh(从项目根挪入) ✅ 2026-04-20 完成 §2.1 行
bin/excel_to_hive.py 删除(有需求重做) ✅ 2026-04-20 完成 changelog
bin/csv-to-hdfs-starter.py 实现 待启动 A 大部分就绪 kb/30 §4.3
bin/datax-import + bin/datax-export 两命令收口 待启动 B2 四模块边界 / A datax 路径解耦 §2.6
bin/datax-gc-generator.py 从零重写 待启动 datasource 多环境就绪 §2.7

D 基础设施

子项 状态 依赖 参见
tests/{unit,integration}/ 骨架 + README ✅ 2026-04-20 完成 §2.11 / §六
tests/unit/udf/test_spark_common_udf.py 首批单测 待启动 tests 骨架 + conftest.py §六
DataX 配置生成单测 待启动 §六
告警模块重写(弃钉钉 → conf/alerter.ini 待启动 conf/alerter.ini §2.1 / 阶段 4
日志模块统一(Python logging 双输出 / log_path 工具) 待启动 conf/env.sh LOG_ROOT_DIR §7.2 / §7.2.1
Hive HDFS 小文件合并工具重新实现 待启动 B2ops/ 阶段 4
分区保留工具重新实现(元表驱动 + 参数化) 待启动 B2ops/ 阶段 4
dq/ 数据质量规则首批 + runner 待启动 conf/alerter.ini 就绪 §2.11
pm/ TAPD / Jira 集成(commit → 任务 ID) 待启动 确认平台选型 §2.11
wiki/ Docmost → kb/inbox/ 待启动 Docmost API 鉴权确认 §2.11
Ranger Hive 策略验证(集群侧) 待启动 §7.5

F 老代码删除(阶段 5)

kb/92-重构进度.md 阶段 5 checklist。前置:新业务 SQL 在生产稳定运行一个完整周期(新业务 SQL 开发不属于重构 scope,详见 kb/92 变更记录)。

当前推进建议

本阶段可并行开工(无前置阻塞):

  1. A 大部分子项(env.sh / workers.ini / alerter.ini / datax-speed.ini)
  2. B2 四模块边界定稿(只需写决策,不改代码)
  3. B3 代码风格修正(__contains__ 全替换)
  4. D 首批 UDF 单测(tests 骨架已建)

等待前置

  • C bin/ 两命令 / datax-gc-generator 重写 ← B2 + A datax
  • D ops/ 下两个工具 ← B2
  • F launch-pad/ 整删 ← 新业务 SQL 生产稳定一个完整周期(新业务开发本身不属于重构 scope)

九、待讨论议题

以下议题已登记为未来专题讨论对象,暂不展开、不纳入聚簇推进:

  • 分布式任务分发:DolphinScheduler worker 分发机制 × 新入口 dw_base/datax/ 内部实现(worker 选择 / 批量展开 / 日志路径 / ssh 并发)的交互关系。