# 重构路线 > 基于老项目 `tendata-warehouse-release` 的代码分析,为新项目 `poyee-data-warehouse` 规划的重构路线。 > 本文档说"为什么改、怎么改";配套的 `92-重构进度.md` 说"改到哪一步了"。 ## 一、模块重命名(高优先级) ### 1.1 `tendata` → `dw_base` **影响范围:** | 类型 | 涉及内容 | 处理方式 | |------|---------|---------| | Python import | `from tendata import *`、`from tendata.xxx import yyy` | 全局替换 | | SQL 中的 ADD FILE | `ADD FILE tendata/spark/udf/xxx.py` | 全局替换 | | zip 打包命令 | `zip -qr tendata.zip tendata` | 改为 `dw_base.zip dw_base` | | addPyFile | `sparkContext.addPyFile('tendata.zip')` | 改为 `dw_base.zip` | | 路径匹配正则 | `re.sub(r"tendata-warehouse.*", ...)` | 更新为新项目名 | | 目录名引用 | `PROJECT_ROOT_PATH` 相关逻辑 | 自动适配 | **建议做法:** 先用脚本批量替换,再逐文件审查。注意 `tendata` 字样可能出现在老项目的 Hive 数据库名、表名或 DataX 数据源 ini 名中,这些**属于老业务数据**、不应替换——新项目业务 SQL 从零开发,不涉及老库老表。 ### 1.2 `launch-pad/` → `jobs/`(不做业务迁移,仅建立新结构) **重要澄清**:老项目 `launch-pad/` 中的业务代码**与新项目业务完全无关**(属于上个项目的历史业务),**不存在内容迁移**。`launch-pad/` 在重构期间作为**样板 SQL 代码的参考**(看写法、看 `${dt}` 用法、看 DataX ini 结构),新项目所有业务 SQL 从零开发完成后,`launch-pad/` 整体删除。 **新 `jobs/` 目录**按数仓分层 × 业务域二维组织,结构见 `00-项目架构.md` §5 的样板。业务域代码统一使用命名规范定义的 `trd`/`usr`/`prd`/`shp`/`pub`/`dim`(见 `21-命名规范.md` §3.2)。 **需要处理的代码引用**: - 脚本中 `list_files('launch-pad/...')` 的硬编码路径 → 改为 `jobs/...` - `zip -qr launch-pad.zip launch-pad` 类命令(如有)→ 改为 `jobs` - DolphinScheduler 工作流里老路径的引用 → 新项目上线时一并替换 ## 二、消除硬编码(高优先级) ### 2.1 当前硬编码清单 | 硬编码内容 | 所在位置 | 建议方案 | |-----------|---------|---------| | `DATAX_HOME=/opt/module/datax` | `bin/common/init.sh` | 移入 `conf/env.sh` 或环境变量 | | `PYTHON3_PATH="/usr/bin/python3"` | `bin/common/init.sh` | 移入 `conf/env.sh` | | `RELEASE_USER="alvis"` | `bin/common/init.sh` | 改为 `RELEASE_USER="bigdata"` 并移入 `conf/env.sh` | | `RELEASE_ROOT_DIR="/home/alvis/release"` | `init.sh`、`__init__.py` | 改为 `/home/bigdata/release` 并移入 `conf/env.sh` | | 项目部署目录 `poyee-data-warehouse/` | `publish.sh` | 新项目发布目录为 `/home/bigdata/release/poyee-data-warehouse/` | | `DATAX_WORKERS=(m3 d1 d2 d3 d4)` + `DATAX_WORKERS_WEIGHTS` 权重 map | `init.sh:18-31`(含展开 `DATAX_WORKERS_QUEUE` 的循环) | workers 列表 + 权重 map **整体**移入 `conf/workers.conf`(ini 或 yaml 格式),`init.sh` 仅保留读取 + 展开逻辑 | | `HADOOP_CONF_DIR='/etc/hadoop/conf'` | `__init__.py` | 使用系统环境变量 | | `LOG_ROOT_DIR="/opt/data/log"` + whoami 分流 | `init.sh`、`__init__.py` | 删除 whoami 分支,单值改为 `${HOME}/log` 并迁入 `conf/env.sh`,见 §7.2.1 | | 告警 Webhook(钉钉 / 企微 Key) | `dw_base/common/alerter_constants.py`(老告警模块已于 2026-04-20 删除,含 `dingtalk_notifier.py` / `ent_interface_dingtalk*` / `bin/dingtalk-work-alert.sh`) | 新告警模块重写时 Webhook Key 外移到 `conf/alerter.ini`(**入库**——部署靠 git pull,gitignore 会拉不到;webhook key 不算高敏感,最多被拿去发垃圾消息),Python 侧改 ConfigParser 加载;`alerter_constants.py` 整个删除;新项目不再使用钉钉 | | Spark 默认参数(executor/driver/shuffle/sql.*) | `dw_base/spark/spark_sql.py` 构造函数 + `.config(...)` 链 | 移入 `conf/spark-defaults.yaml`,SQL 文件可用 `SET` 覆盖,见 §2.3 | | DataX ini 路径前缀剥离 `conf/datax/config/` | `bin/datax-single-job-starter.sh`(TEMP 处理)、`bin/datax-job-config-generator.py`(`replace('conf/datax/config/', '')`)、`bin/datax-multiple-job-starter.sh`(日志路径派生) | 原目录已整体挪到 `conf/bak/` 并 gitignore,脚本里 replace 现在是 no-op 死逻辑。去除前缀假设,改为靠 ini 文件名(= 任务唯一标识,见 `21-命名规范.md` §3.9)识别用途 | | DataX 生成 JSON 输出目录名 `conf/datax/generated` | `bin/datax-job-config-generator.py` 末尾 `default_output_dir`、`bin/datax-single-job-starter.sh` 第 89/118 行、`bin/datax-multiple-job-starter.sh` 第 187 行、`.gitignore` | 目录改名 `conf/datax-json/`;子路径扁平化为 `conf/datax-json/{env}/{ini_basename}.json`(仅按 env 分一级,去掉 src_dst / project_layer_env 等派生层级);`.gitignore` 同步改 | | JOB_NAME / JSON 文件名的 `ini→json` 转换逻辑重复实现 | Python 侧 `bin/datax-job-config-generator.py:126`(`os.path.basename(gcf).replace('.ini', '.json')`)+ Bash 侧 `bin/datax-single-job-starter.sh:88`(`basename .ini`) | 合一到 `dw_base.datax.path_utils.job_name_from_ini()`(或类似工具);Bash 侧通过 `python3 -c` 调用或在 `bin/common/init.sh` 定义等价 shell 函数,单一来源 | | DataX 脚本不支持 `-env` 参数 | `bin/datax-*.sh` / `bin/datax-job-config-generator.py` 参数解析 | 新增 `-env` 参数,由 ini 内 `dataSource = {db_type}/{instance}` 拼接成 `datasource/{db_type}/{env}/{instance}.ini` 的完整路径;详见 §2.5 | | `datasource/` 单层组织(无环境子目录) | `datasource/{db_type}/{instance}.ini` | 改为 `datasource/{db_type}/{env}/{instance}.ini`(运维侧权限隔离 + 支持一套代码跑多环境) | | ini 里 `dataSource` 字段拼接环境后缀 | 老项目写法 `dataSource = pg-hobby-prod` | 改为 `dataSource = {db_type}/{instance}`(不含环境),env 由脚本注入 | | 导出类 ini 扇出撞名风险 | `jobs/ads/{域}/` 下 ini 若都以源 Hive 表名命名,同一张 ads 表扇出到多个目标库时会重名覆盖 | 命名规则改为 `{源 Hive 表名}__{目标 db_type}_{目标 instance}.ini`(双下划线分隔源/目标),见 `21-命名规范.md` §3.9 | | `dw_base/common/template_constants.py` 大量死代码 | 定义了 20+ 个 SQL 模板路径常量,实际只有 2 个(`MYSQL_HIVE_CREATE_TABLE_TEMPLATE` / `MYSQL_HIVE_HBASE_CREATE_TABLE_TEMPLATE`)被引用,其余 18 个零 import | 整个文件删除;连带废弃下一条 | | `MySQLReader.generate_hive_ddl()` / `generate_hive_over_hbase_ddl()` 自动建表 DDL 路径 | `dw_base/datax/plugins/reader/mysql_reader.py:195/222`,被 `bin/datax-gc-generator.py:616/728` 调用;且 `conf/template/` 目录在新项目根本不存在,真调用会 FileNotFoundError | 整段路径废弃——与 CLAUDE.md 约定的 `manual/ddl/` 是 DDL 唯一来源相冲突。`datax-gc-generator.py` 仅生成 ini 配置,不再输出 CREATE TABLE DDL;DDL 由开发者按 `21-命名规范.md` 手写到 `manual/ddl/` | | 缺少集中的开发者参考模板目录 | —(新增) | 已建 `conf/templates/{datasource,datax/{raw,ads,manual},sql,ddl}/`,模板用 `*.template.{ini,sql}` 双扩展名。与上条废弃的运行时模板完全不同:这里的模板不被任何代码读取,只供开发者对照写新文件;`kb/README.md` 已加入口 | ### 2.2 建议的配置结构 ``` conf/ ├── env.sh # Shell 环境变量(路径、用户、日志目录等) ├── env.py # Python 环境变量(或直接读 env.sh) ├── workers.conf # DataX Worker 列表与权重 ├── alerter.conf # 告警 Webhook 配置(敏感项,可 .gitignore) ├── spark-defaults.yaml # Spark 默认参数 └── ds/ ├── base_config.yaml # DolphinScheduler 配置 └── process_code.yaml # 工作流编码映射 ``` ### 2.3 Spark 配置三级覆盖策略 **现状**:`dw_base/spark/spark_sql.py` 构造函数里硬编码了约 15 个 `.config(...)` 调用(executor/driver/memory/parallelism/shuffle/adaptive/arrow/codegen 等),默认值写死在构造参数里,覆盖只能通过 SparkSQL 构造函数传参或 SQL 文件内 `SET`。 **问题**: - 想批量调整 shuffle partitions 的默认值,就得改代码 + 发版 - 不同类型的作业(dwd 大宽表 / ads 小聚合)需要不同默认,现状只能每张表的 SQL 开头都重复写一遍 `SET` - 默认参数和业务代码耦合,不便于运维按集群负载动态调整 **目标态:三级覆盖** ``` conf/spark-defaults.yaml (L1) 全局默认,运维可改,发版同步到集群 ↓ 被覆盖 SQL 文件内 SET spark.xxx=yyy (L2) 单作业级别的覆盖,业务开发写 ↓ 被覆盖 命令行 -sc key=value / Python 构造函数传参 (L3) 临时/调试 override ``` **`conf/spark-defaults.yaml` 草案**: ```yaml # 全局 Spark 默认参数,dw_base/spark/spark_sql.py 启动时加载 # 单作业需要覆盖时,在对应 jobs/*.sql 文件开头写 SET;不要改本文件 executor: instances: 4 cores: 4 memory: 8g memoryOverhead: 2g driver: cores: 2 memory: 4g maxResultSize: 2g sql: shuffle.partitions: 200 adaptive.enabled: true broadcastTimeout: -1 codegen.wholeStage: false execution.arrow.enabled: true execution.arrow.fallback.enabled: true files.ignoreCorruptFiles: true statistics.fallBackToHdfs: true default: parallelism: 400 ``` **代码改动要点**: 1. `dw_base/spark/spark_sql.py` - 新增 `_load_default_config() -> dict`:读 `conf/spark-defaults.yaml`,扁平化为 `{"spark.executor.instances": 4, ...}` 形式(dot-notation 按 yaml 嵌套路径拼) - 构造函数接收的显式参数(`spark_executor_cores` 等)改为 `None` 默认,若未传则 fall back 到 yaml - `SparkSession.builder` 的 `.config(...)` 链改成 `for k, v in resolved_config.items(): builder.config(k, v)` 2. SQL 文件内的 `SET spark.xxx=yyy` 本来就由 `spark.sql(...)` 原生支持,无需改动 3. 命令行 `-sc` 参数保持现有语义,覆盖 L1 4. **Python 单测要能跑**:yaml 读取要容错(测试环境下找不到 conf 文件时回退到一套最小内置默认,不阻塞 `tests/unit/`) **兼容性**:老代码里已在写 `SparkSQL(spark_executor_cores=8, ...)` 的调用站点不破坏,因为显式传参仍是最高级(L3)。 **落地时的两个坑**: 1. **L2 覆盖只对 `spark.sql.*` 系参数生效**。Spark 的参数分两类: - `spark.sql.*`、`spark.default.parallelism` 等运行时参数 —— `spark.conf.set(...)` 或 SQL 内 `SET` 可动态改写 - `spark.executor.*`、`spark.driver.*`、`spark.executor.memoryOverhead` 等资源类参数 —— **session 启动时锁定**,SQL 里写 `SET spark.executor.memory=16g` 不会真的扩容已启动的 executor 因此开发写 SQL 内 `SET` 时只能调 `spark.sql.*` 和并行度;需要改资源的场景只能走 L3(命令行 `-sc` 或调用方在构造 `SparkSQL(...)` 时显式传参)。文档里和 `spark-defaults.yaml` 注释里都要讲清楚这条,避免开发以为 `SET spark.executor.memory` 有效。 2. **`conf/spark-defaults.yaml` 的路径解析依赖 `PROJECT_ROOT_PATH`**,这和 §三 `__init__.py` 瘦身存在先后依赖: - 现状 `PROJECT_ROOT_PATH` 在 `dw_base/__init__.py` 顶部定义,`import dw_base` 就会拿到 - 瘦身后 `__init__.py` 只保留最基本路径定义,`PROJECT_ROOT_PATH` 仍可用,但拆分过程中要保证 `spark_sql.py` 加载 yaml 的那行代码拿到的根路径与瘦身前一致 - **执行顺序建议**:先做 §三 `__init__.py` 瘦身,把 `PROJECT_ROOT_PATH` 的定义稳定下来;再做 §2.3 的 `spark-defaults.yaml` 接入。反过来做会踩到"瘦身后路径变了"的返工 **与仓库改名的联动**: 仓库改名 `tendata-warehouse-release` → `poyee-data-warehouse` 时(阶段 1 尾声),`.idea/tendata-warehouse-release.iml` 也要改名为 `.idea/poyee-data-warehouse.iml`,并同步更新 `.idea/modules.xml` 里的引用。这一步不属于 `.gitignore` 的范畴,但和它是同一天会碰到的事,在阶段 1 的仓库改名 checklist 里一起记一笔。 ### 2.5 DataX 脚本多环境支持与路径解耦 **现状(脚本残留老逻辑,新项目的业务 ini 未走这条路径):** 1. **脚本里残留路径前缀剥离**:`bin/datax-single-job-starter.sh` 和 `bin/datax-job-config-generator.py` 仍通过剥离 `conf/datax/config/` 前缀从源 ini 路径里派生 `SRC_DST` / `PROJECT_LAYER_ENV`,用于生成 JSON 输出路径和 `datax-multiple-job-starter.sh` 的日志目录。该目录已整体挪到 `conf/bak/` 并 gitignore,新项目 ini 放在 `jobs/raw/{domain}/` / `jobs/ads/{domain}/` / `manual/`,前缀不匹配时剥离变成 no-op,输出会落到形如 `conf/datax/generated/jobs/raw/trd/xxx.json` 的位置——能跑但与新约定不符。代码里这段死逻辑要清理,同时输出根目录也一并改名为 `conf/datax-json/`。 2. **没有 `-env` 参数**:所有脚本不认 `-env`。 3. **数据源配置单层组织**:老约定 `datasource/{db_type}/{instance}.ini` 把环境和实例扁平混在一起,要么靠不同的 `{instance}` 名字(如 `hobby-prod` / `hobby-dev`)区分,要么靠部署时替换文件。第一种让 ini 里写 `dataSource = pg-hobby-prod` 这种绑死环境的字符串;第二种让开发侧不知道当前跑的是哪套。 **目标态:一套代码多环境运行** **三件事联动(顺序重要):** #### 阶段 1:datasource 改按环境分子目录 - 改为 `datasource/{db_type}/{env}/{instance}.ini` - 运维在集群侧按 `prod` / `test` / `dev` 建子目录,各自放一套连接配置 - 权限隔离更方便:prod 子目录只给生产账号可读 #### 阶段 2:ini 里 `dataSource` 字段去掉环境后缀 - 老写法:`dataSource = pg-hobby-prod` - 新写法:`dataSource = pg/hobby`(只写 `{db_type}/{instance}`,不含 env) - **变更项目下所有存量 ini**:当前 `conf/bak/` 下的老配置不处理,等业务新 ini 从零写起时就按新规范 #### 阶段 3:DataX 脚本加 `-env` 参数 + env 注入逻辑 **`bin/common/init.sh` 扩展:** ```bash # 从命令行参数里挑出 -env,或 fall back 到 conf/env.sh # 不读环境变量(用户明确要求不污染 shell 环境) # 不做"按用户/部署目录自动派生"——env 必须来自 -env 或 conf/env.sh,二者都没给就退出 resolve_env() { for arg in "$@"; do case $arg in -env) NEXT_IS_ENV=1 ;; -env=*) DW_ENV="${arg#*=}" ;; *) [ -n "$NEXT_IS_ENV" ] && DW_ENV="$arg" && unset NEXT_IS_ENV ;; esac done if [ -z "$DW_ENV" ]; then # fall back 到 conf/env.sh(入仓库的默认配置) [ -f "${BASE_DIR}/conf/env.sh" ] && . "${BASE_DIR}/conf/env.sh" fi if [ -z "$DW_ENV" ]; then echo "[FATAL] DW_ENV not set: pass -env or define DW_ENV in conf/env.sh" >&2 exit 1 fi export DW_ENV } ``` **`conf/env.sh` 草案**(**入仓库**,开发者维护,服务本地调试): ```bash # 全局环境变量默认值 # env 判定优先级:命令行 -env > 本文件 DW_ENV # 默认锁定为 dev:本地调试开箱即用;DolphinScheduler / 生产脚本总是命令行显式 -env prod 覆盖 DW_ENV=dev LOG_ROOT_DIR="${HOME}/log" ``` **`dw_base/datax/job_config_generator.py` 改造**: - `JobConfigGenerator.__init__` 接受新参数 `env` - 各 reader/writer 插件在解析 `dataSource = pg/hobby` 时,自动拼成 `datasource/pg/{env}/hobby.ini` 的完整路径再去读连接信息 **`bin/datax-job-config-generator.py` 改造**: - 新增 `-env` 参数 - **去掉** `temp = os.path.dirname(gcf).replace('conf/datax/config/', '').split('/')` 这段路径前缀剥离 - JSON 输出目录从 `conf/datax/generated/` 改名为 `conf/datax-json/`,子路径简化为 `conf/datax-json/{env}/{ini_basename}.json`(扁平,按 env 分一级;`ini_basename` 由 `21-命名规范.md` §3.9 保证全局唯一——采集类 = 目标 Hive 表名、导出类 = `{源}__{db_type}_{instance}`、一次性 = 日期前缀) - `.gitignore` 同步把 `conf/datax/generated` 改成 `conf/datax-json` #### 阶段 4:启动脚本层串起来 - `datax-single-job-starter.sh` 调用 generator 时把 `$DW_ENV` 透传过去 - `datax-multiple-job-starter.sh` 日志目录改为 `${LOG_ROOT_DIR}/datax/${DW_ENV}/${START_DATE}/${JOB_NAME}.log`(把 `SRC_DST` / `PROJECT_LAYER_ENV` 彻底移除) **兼容性**:阶段 1-3 期间 `conf/bak/` 下的老 ini 不参与新流程,保留作为老项目历史资产;新业务 ini 全部从零按新规范写。 **参考**:kb/00-项目架构.md §4.3(DataX 脚本详细使用)、§6.4(多环境机制)、kb/21-命名规范.md §3.9(DataX ini 文件命名)。 ### 2.6 DataX 入口脚本收口为两条命令(中优先级) **现状**:DataX 入口分散,学习成本高: - `bin/datax-single-job-starter.sh` — 单 ini 执行 - `bin/datax-multiple-job-starter.sh` — 批量 + 按 start-date/stop-date 范围展开 - `bin/datax-multiple-hive-job-starter.sh` — Hive 表批量(语义与上者重合) - `bin/datax-single-hdfs-job-starter.sh`(如有残留) — 单次 HDFS 导出 - `bin/datax-job-config-generator.py` — ini → json 翻译器(内部工具) - `bin/datax-gc-generator.py` — ini 元生成器(详见 §2.7) **目标态**:顶层收成两个命令,每个命令内部吃 single / batch 两种输入形态;底层的 json 翻译 / worker 选择 / 日志路径由公共模块承担,调用方不感知。 | 顶层命令 | 语义 | 关键参数 | |---|---|---| | **`bin/datax-import`**(命名待确认) | 导入到 Hive(目标侧带分区管理) | `-ini ` 单 ini · `-inis ` 批量 · `-dt ` 指定分区 · `-start-date / -stop-date` 日期范围展开 · `-skip-exist` 默认开,已存在分区跳过 · `-force-overwrite` 强制覆盖 · `-skip-partitions ` 手动跳过特定分区 · `-env ` | | **`bin/datax-export`**(命名待确认) | 从 Hive/HDFS 导出到外部系统(源侧带路径探测) | `-ini / -inis` 同上 · `-src-check`(默认 fail-fast)· `-skip-missing` 源路径缺失时跳过不报错 · `-dt / -start-date / -stop-date` · `-env` | **实现建议**: 1. 把老脚本 worker 选择、日志路径、json 翻译提到 Python 模块 `dw_base/datax/entry.py`,两个 sh 只做参数解析 + 调用 2. 分区检查:`datax-import` 在执行前 `SHOW PARTITIONS` 目标表 → 命中则按 `-skip-exist` / `-force-overwrite` 决策;`datax-export` 在执行前 `hdfs dfs -test -e <源路径>` → 不存在按 `-src-check` / `-skip-missing` 决策 3. `-inis` 的批量展开规则:传目录则递归扫 `.ini` 文件,传文件列表(`jobs.list`)则读文件每行一个 ini 4. 老脚本 `datax-single-job-starter.sh` / `datax-multiple-*-starter.sh` 在两个新命令稳定后整体删除,保留一期转发封装作为兼容 **第三条命令 `datax-gc-generator`(ini 元生成器)独立保留**:用户已确认。职责是"从 PG 扫 schema 生成 ini 参考模板",和"执行 ini"不是一回事,不收口到上面两条里。详见 §2.7。 ### 2.7 `datax-gc-generator` 从零重写(中优先级) **现状**(凭查证 2026-04-18):`bin/datax-gc-generator.py` 支持 `from ∈ {mysql, hdfs}` × `to ∈ {elasticsearch, hbase, hdfs, kafka, mongo, mysql}`,覆盖面大、代码沉重,且: - **全项目没有任何其他模块 import 或 shell 调用它**(仅 3 处 `SparkSQL('datax-gc-generator')` 是自己设置 app name) - 内部走 `MySQLHandler` + `MySQLDataSource` + `convert_mysql_column_types` + `MySQLReader.generate_hive_ddl()` 一条链,与 §2.1 已登记废弃的"自动 DDL 生成"方向相冲突 - 新项目新业务只需要 `from=pg to=hdfs` 一条路径,其他组合全是老项目遗产 **定位**:**参考模板生成器**,不是"一键出可用 ini"。产物是开发者人工调整的起点 —— 常见修改包括字段剪裁(只同步用到的列)、WHERE 过滤条件、hivePartitions 配置、大表拆分策略等。开发者 diff 参考模板和自己的需求,改完再把成品 ini 提交到 `jobs/raw/{domain}/`。 **方向**:整个文件废弃 + 从零重写(凭记忆:未完全定稿,待真正开始写代码时再细化) **重写目标**: - 仅支持 `from=pg to=hdfs` - 读 PG 表结构(`information_schema.columns` + `pg_catalog.pg_description` 取字段注释) - 输出到**开发者本地的 `workspace/{yyyymmdd}/{name}.ini`**(`workspace/` 被 `.gitignore` 排除,**不入仓**),作为参考模板(原始全字段 + 默认分区 + 默认 `where=1=1`);开发者裁剪后再把最终 ini 提交到 `jobs/raw/{domain}/` - **不再生成 DDL**:DDL 统一由开发者按 `21-命名规范.md` 手写到 `manual/ddl/{layer}/{domain}/`(CLAUDE.md 单一来源约束,与 §2.1 已登记项一致) **目录约定**: - `workspace/` 在仓库根,**仅存在于开发者本地**,整个目录进 `.gitignore` - `workspace/{yyyymmdd}/` 按运行日期分子目录,便于开发者看"我今天生成了哪些候选" - 与 `manual/imports/{yyyymmdd}/` 的分工:`manual/imports/` 放一次性**执行**的 SQL / ini(会入仓做审计证据,执行完归档),`workspace/` 放自动化工具**未经人工确认**的中间产物(永不入仓) **拆除清单**(重写时连带删除): - `dw_base/database/mysql_utils.py` 的 `list_tables` / `list_columns` 方法(只服务老 generator) - `dw_base/datax/datasources/mysql_data_source.py` - `dw_base/datax/plugins/reader/mysql_reader.py` 的 `generate_hive_ddl` / `generate_hive_over_hbase_ddl` 方法 - `dw_base/datax/datax_utils.py` 的 `convert_mysql_column_types` - 所有 mongo / kafka / hbase writer 在 generator 里的分支 **注意**:以上删除范围与 `dw_base/datax/plugins/` 里仍在被真实采集任务调用的 reader/writer 不冲突 —— 真实采集任务只用到 `HDFSReader` / `HDFSWriter` / `MongoWriter`(如果还有 mongo 采集任务)。删之前要用 `grep -r "from dw_base.datax.plugins.reader.mysql_reader"` 再确认一次。 ### 2.8 HDFS 数据源 ini 支持 HA nameservice(**Path B 锁定**) **结论(凭 2026-04-18 新 CDH 环境实测)**:DataX hdfswriter 在新环境下连 HA 集群**必须**在 json 里显式提供 `hadoopConfig` 块,这是 §2.8 改造的动机。 **为什么 `HADOOP_CONF_DIR` 对 DataX 无效(实测三连)**: | 测试 | json 内容 | `HADOOP_CONF_DIR` | 结果 | |---|---|---|---| | 1 | `defaultFS` + 完整 `hadoopConfig` | 未设置 | ✅ 成功 | | 2 | 只有 `defaultFS` | 未设置 | ❌ UnknownHostException: nameservice1 | | 3 | 只有 `defaultFS` | `/etc/hadoop/conf` | ❌ 仍 UnknownHostException | `hadoop fs -ls` 命令能解析 `nameservice1`,是因为 `hadoop` shell 脚本把 `$HADOOP_CONF_DIR` **加入了 Java classpath**(`hdfs-site.xml` 才能被 `Configuration` 自动加载)。`datax.py` 启动 Java 时**不做这件事**,classpath 里不含 `/etc/hadoop/conf`,所以 DataX 的 Hadoop `Configuration` 对象是"干净的",只能靠 json `hadoopConfig` 块显式注入 HA 参数。 (老项目在上一个公司/服务器只写 `defaultFS` 也能跑通 HA,最可能的原因是运维把 `hdfs-site.xml` 塞进了 DataX 的 classpath 目录 —— 比如 `datax/conf/` / `datax/lib/` / 某 plugin 的 `libs/`。新环境 `/opt/datax` 下没有这类预置文件,不走这条路。) **老的 env 设置是死代码**:`dw_base/__init__.py:16` 的 `os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'` 对 DataX JVM 子进程无影响 —— 一是 classpath 不含 conf 目录(见上);二是 DataX 由 `datax-single-job-starter.sh` 通过 `python3 datax.py` 启动,并未 import `dw_base`。这行已在本次 §2.8 一并清理。 --- **正确的 json 形态**: ```json "writer": { "name": "hdfswriter", "parameter": { "defaultFS": "hdfs://nameservice1", "hadoopConfig": { "dfs.nameservices": "nameservice1", "dfs.ha.namenodes.nameservice1": "nn1,nn2", "dfs.namenode.rpc-address.nameservice1.nn1": "192.168.33.61:8020", "dfs.namenode.rpc-address.nameservice1.nn2": "192.168.33.62:8020", "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }, ... } } ``` **ini 新格式**: **非 HA / 单 NN(dev / test 环境)**: ```ini [base] defaultFS = hdfs://192.168.33.61:8020 ``` **HA / nameservice(prod 环境)**: ```ini [base] defaultFS = hdfs://nameservice1 [hadoop_config] dfs.nameservices = nameservice1 dfs.ha.namenodes.nameservice1 = nn1,nn2 dfs.namenode.rpc-address.nameservice1.nn1 = 192.168.33.61:8020 dfs.namenode.rpc-address.nameservice1.nn2 = 192.168.33.62:8020 dfs.client.failover.proxy.provider.nameservice1 = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider ``` **设计要点**: - `[hadoop_config]` 节的 key **原样照搬 `/etc/hadoop/conf/hdfs-site.xml`**,不做缩写 / 翻译 —— 运维直接复制粘贴,出了问题可以字段级对比,零心智成本 - `[hadoop_config]` 节**可选**:不写即视为非 HA,代码不注入 `hadoopConfig`,生成的 json 只有 `defaultFS`,与单 NN 环境兼容 - 将来若要支持多 nameservice(federation),`[hadoop_config]` 节天然兼容(多写几组 key 即可),无需再改 ini schema **代码改造清单**: | 文件 | 改动 | |------|------| | `dw_base/datax/datasources/hdfs_data_source.py` | 覆写 `get_datasource_dict()`:父类逻辑外,检测 `[hadoop_config]` 节是否存在;存在则把整节作为 dict 塞进 `ds_dict['hadoopConfig']` | | `dw_base/datax/plugins/plugin.py` | **不用改** —— `load_data_source()` 的 `for key, value in ds_dict.items()` 里,value 是 dict 时走 Python 原生赋值,json 序列化自然成立 | | `dw_base/datax/plugins/reader/hdfs_reader.py` / `writer/hdfs_writer.py` | **不用改**,`defaultFS + hadoopConfig` 由 `load_data_source()` 自动注入到 `parameter` | | `dw_base/__init__.py:16` | **删除** `os.environ['HADOOP_CONF_DIR']` 死代码(实测对 DataX JVM 无影响) | | `bin/datax-gc-generator.py` | §2.7 从零重写时一并处理,这里不单独列 | | `datasource/hdfs/{env}/*.ini` | 按新 schema 新建(prod 带 `[hadoop_config]`;dev/test 只写 `[base] defaultFS`) | **回归测试**: - prod HA 集群:新 ini 跑一次真实 PG→HDFS 任务,生成 json 含完整 `hadoopConfig`,任务成功(2026-04-18 已用手写 json 预验证) - NameNode 主备切换期间跑一次,DataX 自动切到 standby(HA 的原始动机) - dev / test 单 NN 集群:不写 `[hadoop_config]`,生成 json 只含 `defaultFS`,任务成功 ### 2.9 DataX 速率控制外移到 conf(中优先级) **现状**:`dw_base/datax/job_config_generator.py:60-67` 硬编码分时调度: ```python local_time = int(datetime_utils.formatted_now('%H%M')) if 750 < local_time < 1900: speed = self.get_speed(10, byte=10485760, record=40000) # 白天 10 channel × 10 MB/s core_speed = self.get_core_speed(byte=10485760, record=40000) else: speed = self.get_speed() # 夜间 6 channel × 256 MB/s core_speed = self.get_core_speed() ``` **问题**: - 白天 / 夜间的边界(0750、1900)、channel 数、单 channel 速率全写死在代码里,想调一个值要改 py 源码重新发布 - 不同业务域 / 不同表的速率诉求可能不同,现在一刀切 - 生产偶发突发(白天要抢刷一次全量)没有临时放大速率的口子 - 硬编码的动机在代码里一字没留(为什么 0750-1900?避开业务高峰?避开实时链路?后人读代码猜不到) **目标**:把速率配置抽到 `conf/datax-speed.conf`,`JobConfigGenerator` 运行时读;默认值 = 现硬编码值,保持向后兼容。 **conf 格式**(ini,按时间段分段,自上而下匹配,未命中走 `[default]`): ```ini ; conf/datax-speed.conf ; 速率档位定义:从上到下依次匹配,第一条命中的为准;无匹配走 [default] [daytime] hours = 07:50-19:00 channel = 10 byte_per_channel = 10485760 ; 10 MB/s/channel,避开业务高峰 record_per_channel = 40000 [default] channel = 6 byte_per_channel = 268435456 ; 256 MB/s/channel,夜间放满 record_per_channel = 100000 ``` **代码改造**: - 在 `JobConfigGenerator.__init__()` 读 `conf/datax-speed.conf` - 用当前时间在各段 `hours` 里找匹配,取 `channel / byte_per_channel / record_per_channel` - `assemble()` 里的分支逻辑改为 `speed = self.resolve_speed_profile()` 一行 - 落 `conf/datax-speed.conf` 时在文件头注释里把**"白天是为了避开业务高峰"**的动机写清楚(消除默认值的来历盲区) **扩展方向**(P2,先不做): - 支持按 ini 名(业务表)在 conf 里覆盖特定任务的速率 - 支持命令行 `-speed fast` / `-speed slow` 手动切档(突发高峰 / 限流时用) ## 三、`__init__.py` 瘦身(高优先级) **现状:** `tendata/__init__.py` 约 120 行,import 即执行以下操作: - 环境变量设置 - 颜色常量定义(30+ 个) - findspark.init() - 用户/权限/路径检测 + 打印 - cow_says() 调用 shell **问题:** - 任何 `from dw_base import xxx` 都会触发全部初始化 - 不在 Spark 节点上运行的脚本也被迫执行 `findspark.init()` - 影响单元测试(测试 UDF 函数也要初始化 Spark 环境) **建议拆分为:** ```python # dw_base/__init__.py —— 仅做最基本的路径定义 PROJECT_ROOT_PATH = ... PROJECT_NAME = ... # dw_base/core/env.py —— 环境检测(延迟调用) # dw_base/core/colors.py —— 颜色常量 # dw_base/core/spark_env.py —— findspark 初始化(按需 import) ``` ## 四、代码风格修正(中优先级) ### 4.1 `__contains__` 反模式 全项目大量使用: ```python if config.__contains__(key): # 反模式 if self.REGISTERED_UDF.__contains__(name): ``` 应改为: ```python if key in config: # Pythonic if name in self.REGISTERED_UDF: ``` ### 4.2 Shell / Python 重复逻辑 `bin/common/init.sh` 和 `dw_base/__init__.py` 有大量重复的环境检测逻辑(用户判断、路径判断、日志目录、颜色常量)。 **建议:** 统一由 Python 入口处理,Shell 脚本仅做最小化的环境设置后调用 Python。或提取为一份共享的配置文件。 ### 4.3 SQL 注入风险 `mysql_utils.py` 中使用 f-string 拼接 SQL: ```python sql = "... WHERE TABLE_SCHEMA='%s' ..." % (database, table_name) ``` **建议:** 改用参数化查询。 ## 五、清理废弃代码(中优先级) | 模块/文件 | 状态 | 建议 | |----------|------|------| | `dw_base/validation/__init__.py` | 空文件 | 删除或实现数据质量校验 | | `dw_base/ml/a.py` | 空文件 | 删除 | | `dw_base/flink/__init__.py` | 空文件 | 删除(除非计划使用 Flink) | | `dw_base/elasticsearch/__init__.py` | 空文件 | 删除 | | `dw_base/database/mongodb_utils.py` | 约 80% 是注释掉的旧代码 | 清理注释 | | `conf/datax/` 下全部内容 | 已废弃的旧配置 | 保留少量样例,其余删除 | ## 六、测试体系搭建(中优先级) ### 6.1 现状 - 仅 UDF 有少量 pytest 测试 - 核心模块(SparkSQL、DataX 配置生成)无测试 - 无 CI/CD 集成 ### 6.2 建议的测试结构 ``` tests/ ├── conftest.py # pytest 公共 fixtures ├── unit/ │ ├── test_udf_common.py # 通用 UDF 单测(纯函数,不依赖 Spark) │ ├── test_udf_business.py # 业务 UDF 单测(如有,按文件组织) │ ├── test_config_utils.py # 工具函数单测 │ ├── test_datetime_utils.py │ ├── test_sql_utils.py │ └── test_datax_generator.py # DataX ini→json 生成测试 ├── integration/ │ ├── test_spark_sql.py # SparkSession local[*] 模式集成测试 │ └── test_hive_utils.py └── quality/ └── test_data_quality.py # 数据质量校验(行数、空值率、主键唯一性) ``` ### 6.3 测试策略 - **UDF 单测**:纯 Python 函数,直接 assert,不需要 Spark 环境 - **DataX 配置生成测试**:给定 ini 文件,断言生成的 JSON 结构正确 - **Spark 集成测试**:使用 `local[*]` + 内存 Hive(`enableHiveSupport()` 需要 Hive MetaStore,可用嵌入式 Derby) - **数据质量**:在 DolphinScheduler 工作流中加入校验节点 ## 七、其他建议 ### 7.1 依赖管理(已精简) **状态**:2026-04-15 已完成首轮审计与精简。老清单 48 行 → 新清单 10 个强依赖,详见根目录 `requirements.txt`;原始快照备份在 `requirements.txt.bak`,并在注释里给每一行打了 `[KEEP/DROP/LAZY/STDLIB]` 结论。 **精简策略**: | 分类 | 处理方式 | 代表包 | |---|---|---| | **强依赖(KEEP)** | 留在 `requirements.txt` | pyspark / pandas / pymongo / PyMySQL / requests / PyYAML / findspark / python-dateutil / wheel / pytest | | **无引用(DROP)** | 直接移除 | openvino / transformers / scikit-learn / scipy / numpy / Flask / matplotlib / lxml / SQLAlchemy / jieba / cpca / openpyxl / xlrd / 等 20+ 个 | | **stdlib 冗余(STDLIB)** | 移除 backport | `configparser` —— Python 3 标准库自带,backport 安装反而会覆盖 stdlib | | **弱依赖(LAZY)** | **不写进 requirements**,用到时手动 pip install | elasticsearch / pyhive / redis / cryptography / oss2 / fuzzywuzzy / pygeohash / pypinyin —— 都只被即将清理的老业务代码引用 | **后续事项**: - LAZY 类依赖关联的老代码:`get_oldmongo_*` / `mg2es/` / `ent_interface_dingtalk*` 于 2026-04-20 第一批提前清理;同日第二批清理 `dw_base/oss/` 整目录、`dw_base/scheduler/` 整目录(含 polling_scheduler / drop_partitions / drop_daily_full_snapshot_tbls)、`dw_base/hive/` 整目录、`dw_base/utils/` 7 文件(data_distinct / diff_utils / excel_to_hive_utils / hive_diff_database / hive_to_excel_utils / pdt_check_table\*);剩余 `customs/similarity.py` 等在阶段 4 / 阶段 5 一并清理 - 不需要 `requirements-base.txt` / `requirements-dev.txt` 分文件——当前依赖规模下单文件已经足够 - pyspark==2.4.0 固定(对齐 CDH 6.3.2 parcel 自带版本):`pip install` 把 pyspark 装进解释器 site-packages 解决 PyCharm 远程解释器静态索引红线 + 本地 pytest;运行时 `findspark.init()` 指向集群 `$SPARK_HOME/python/`,两条链路同版本不冲突 ### 7.2 日志改进 - `pretty_print()` 混合了控制台输出和文件写入,职责不清 - `Logging` 类定义了但很少使用 - **建议**:统一使用 Python `logging` 模块,配置 handler 实现控制台+文件双输出 ### 7.2.1 日志路径按 whoami 分流的硬编码逻辑 **现状:** `bin/common/init.sh` 和 `dw_base/__init__.py` 硬编码 `RELEASE_USER="alvis"`,并按 `whoami` 是否等于该用户分流日志目录: ```bash if [ "$(whoami)" = "${RELEASE_USER}" ]; then LOG_ROOT_DIR="/opt/data/log" # release 用户走系统日志目录 else LOG_ROOT_DIR="${HOME}/data/log" # 其他用户走自己家目录 fi ``` **方向:删除 whoami 分流,统一落 `${HOME}/log/{module}/{dt}/{file}.log`** - release 用户 `bigdata`:`$HOME` = `/home/bigdata`,日志落 `/home/bigdata/log/{module}/{dt}/{file}.log` - 个人调试用户:`$HOME` = 各自家目录,日志落 `/home/{user}/log/{module}/{dt}/{file}.log` - `$HOME` 本身就按用户隔离,无需代码再判断 `whoami` **为什么去掉 `/opt/data/log` 这条路**:原来 release 用户走系统级 `/opt/data/log` 的理由是"生产日志不应混在个人 home",但 `bigdata` 本身就是专属调度账号,它的 `$HOME` 就是生产日志的合法归宿,不需要再多开一条系统目录。路径统一后,权限 / 轮转 / 清理策略只需按一套做。 **为什么保留 `LOG_ROOT_DIR` 在 `conf/env.sh` 里**:虽然默认值只有 `${HOME}/log` 一条,但仍作为**单一默认值**外配到 `conf/env.sh`,保留后期改路径的口子(比如某天运维要求共享一块专用盘,改一处即可,无需改代码)。 **为什么改目的地形态为 `{module}/{dt}/{file}.log`**: - 当前老结构 `/opt/data/log/datax/20260418/xxx.log` 已按 `{module}/{dt}/` 分,但不是所有入口都遵守(spark、ds 等散落在各自子结构下) - 新结构强制三级 `{module}/{dt}/{file}.log`,便于按天归档 + 按模块清理 - `{module}` 取值:`datax` / `spark` / `ds` / `csv` / `export` 等顶层入口名 **代码改动:** 1. 删除 `whoami == RELEASE_USER` 分支逻辑,`LOG_ROOT_DIR` 单值从 `conf/env.sh` 读,默认 `${HOME}/log` 2. 日志文件路径拼接统一走一个工具函数 `log_path(module, dt, file)`(Python 和 Shell 各一份),避免入口脚本各自拼 3. `RELEASE_USER` 作为单一来源定义在 `conf/env.sh`,与 publish.sh 共用(日志路径已不依赖它,但 publish.sh 仍要) ### 7.3 部署改进 - `publish.sh` 使用 `re-all` 命令(自定义的 SSH 分发脚本)全量同步 - **建议**:考虑引入版本化部署(tag + 软链接切换),便于回滚 ### 7.4 DataX 限速逻辑 `job_config_generator.py` 中根据时间段(7:50-19:00 白天限速,其余时间放开)动态设置 DataX 传输速率: ```python if 750 < local_time < 1900: speed = self.get_speed(10, byte=10485760, record=40000) # 白天低速 else: speed = self.get_speed() # 夜间高速 ``` **建议**:将时间段和速率配置化,避免硬编码。 ### 7.5 Spark / HMS 侧 Ranger Hive 策略验证(低优先级) **背景**:`02-权限与账号.md §1` 链路 B 原先假设 HMS 侧也挂载 Ranger Hive Plugin,从而 PySpark 直连 HMS 的库/表/列操作仍受 Ranger Hive 管辖。但在 CDH 6.3.2 默认部署中,Ranger Hive Plugin **只挂载在 HiveServer2**,HMS 服务端未必部署;若 HMS 未挂,PySpark / spark-submit 的 SQL 层授权会绕过 Ranger Hive,**仅剩 NameNode 上的 Ranger HDFS Plugin 做数据平面兜底** —— 与同版本 Impala 的"无 Ranger 细粒度授权,只能 HDFS 兜底"规律一致。 **代码侧查证已完成**:仓库内 `dw_base/spark/spark_sql.py:167` 等 8 处 `.enableHiveSupport()`,Spark 确实走 HMS 读元数据,不走 HS2,因此是否受 Ranger Hive 管辖完全看集群侧 HMS 是否挂插件。 **集群侧验证动作(待做)**: 1. 登 HiveMetastore 节点,查 `hive-site.xml`:有无 `hive.metastore.pre.event.listeners = org.apache.ranger.authorization.hive.authorizer.RangerHiveMetastoreAuthorizer` 2. 查 HMS 进程 classpath 里是否包含 `ranger-hive-plugin-*.jar` 3. 从开发环境跑一条故意触发 column 级禁权的 PySpark SQL,看是否被拒 —— 端到端佐证 4. 结论回填到 `02-权限与账号.md §1` 链路 B 关键点段 + 关键点段里的 mermaid Note **后续处理**:若 HMS 未挂 Ranger Hive,调研补挂成本 + 评估现有 HDFS 兜底是否足够(大部分数仓读写场景下足够,因为 PySpark 任务绝大多数以受控 Unix 账号提交、权限粒度粗即可;若要满足敏感列屏蔽类需求则必须补挂)。 ## 八、重构优先级排序 | 阶段 | 任务 | 优先级 | |------|------|--------| | P0 | 模块重命名 tendata→dw_base、launch-pad→jobs | 高 | | P0 | 清理所有业务代码(launch-pad 中保留的样本) | 高 | | P1 | 硬编码提取到 conf/ | 高 | | P1 | `__init__.py` 瘦身,拆分初始化逻辑 | 高 | | P1 | 敏感信息(Webhook token 等)移出代码 | 高 | | P2 | `__contains__` → `in` 全局替换 | 中 | | P2 | 删除废弃空模块和注释代码 | 中 | | P2 | 搭建 tests/ 基础框架 + UDF 单测 | 中 | | P2 | 精简 requirements.txt | 中 | | P3 | 日志模块统一 | 低 | | P3 | SQL 注入修复 | 低 | | P3 | 部署脚本改进 | 低 | | P3 | Spark/HMS 侧 Ranger Hive 策略验证(见 §7.5) | 低 |