# 重构路线 > 基于老项目 `tendata-warehouse-release` 的代码分析,为新项目 `poyee-data-warehouse` 规划的重构路线。 > 本文档说"为什么改、怎么改";配套的 `92-重构进度.md` 说"改到哪一步了"。 ## 一、模块重命名(高优先级) ### 1.1 `tendata` → `dw_base` **影响范围:** | 类型 | 涉及内容 | 处理方式 | |------|---------|---------| | Python import | `from tendata import *`、`from tendata.xxx import yyy` | 全局替换 | | SQL 中的 ADD FILE | `ADD FILE tendata/spark/udf/xxx.py` | 全局替换 | | zip 打包命令 | `zip -qr tendata.zip tendata` | 改为 `dw_base.zip dw_base` | | addPyFile | `sparkContext.addPyFile('tendata.zip')` | 改为 `dw_base.zip` | | 路径匹配正则 | `re.sub(r"tendata-warehouse.*", ...)` | 更新为新项目名 | | 目录名引用 | `PROJECT_ROOT_PATH` 相关逻辑 | 自动适配 | **建议做法:** 先用脚本批量替换,再逐文件审查。注意 `tendata` 字样可能出现在老项目的 Hive 数据库名、表名或 DataX 数据源 ini 名中,这些**属于老业务数据**、不应替换——新项目业务 SQL 从零开发,不涉及老库老表。 ### 1.2 `launch-pad/` → `jobs/`(不做业务迁移,仅建立新结构) **重要澄清**:老项目 `launch-pad/` 中的业务代码**与新项目业务完全无关**(属于上个项目的历史业务),**不存在内容迁移**。`launch-pad/` 在重构期间作为**样板 SQL 代码的参考**(看写法、看 `${dt}` 用法、看 DataX ini 结构),新项目所有业务 SQL 从零开发完成后,`launch-pad/` 整体删除。 **新 `jobs/` 目录**按数仓分层 × 业务域二维组织,结构见 `00-项目架构.md` §5 的样板。业务域代码统一使用命名规范定义的 `trd`/`usr`/`prd`/`shp`/`pub`/`dim`(见 `21-命名规范.md` §3.2)。 **需要处理的代码引用**: - 脚本中 `list_files('launch-pad/...')` 的硬编码路径 → 改为 `jobs/...` - `zip -qr launch-pad.zip launch-pad` 类命令(如有)→ 改为 `jobs` - DolphinScheduler 工作流里老路径的引用 → 新项目上线时一并替换 ## 二、消除硬编码(高优先级) ### 2.1 当前硬编码清单 | 硬编码内容 | 所在位置 | 建议方案 | |-----------|---------|---------| | `DATAX_HOME=/opt/module/datax` | `bin/common/init.sh` | 移入 `conf/env.sh` 或环境变量 | | `PYTHON3_PATH="/usr/bin/python3"` | `bin/common/init.sh` | 移入 `conf/env.sh` | | `RELEASE_USER="alvis"` | `bin/common/init.sh` | 改为 `RELEASE_USER="bigdata"` 并移入 `conf/env.sh` | | `RELEASE_ROOT_DIR="/home/alvis/release"` | `init.sh`、`__init__.py` | 改为 `/home/bigdata/release` 并移入 `conf/env.sh` | | 项目部署目录 `poyee-data-warehouse/` | `publish.sh` | 新项目发布目录为 `/home/bigdata/release/poyee-data-warehouse/` | | `DATAX_WORKERS=(m3 d1 d2 d3 d4)` + 权重 | `init.sh` | 移入 `conf/workers.conf` | | `HADOOP_CONF_DIR='/etc/hadoop/conf'` | `__init__.py` | 使用系统环境变量 | | `LOG_ROOT_DIR="/opt/data/log"` | `init.sh`、`__init__.py` | 移入 `conf/env.sh` | | 钉钉 access_token | `dingtalk_notifier.py` | 移入 `conf/alerter.conf`(敏感项) | | 企微 Webhook Key | `alerter_constants.py` | 移入 `conf/alerter.conf`(敏感项) | | DS API 地址 | `ds/config/base_config.yaml` | 已在 yaml,保持即可 | | Spark 默认参数(executor/driver/shuffle/sql.*) | `dw_base/spark/spark_sql.py` 构造函数 + `.config(...)` 链 | 移入 `conf/spark-defaults.yaml`,SQL 文件可用 `SET` 覆盖,见 §2.3 | ### 2.2 建议的配置结构 ``` conf/ ├── env.sh # Shell 环境变量(路径、用户、日志目录等) ├── env.py # Python 环境变量(或直接读 env.sh) ├── workers.conf # DataX Worker 列表与权重 ├── alerter.conf # 告警 Webhook 配置(敏感项,可 .gitignore) ├── spark-defaults.yaml # Spark 默认参数 └── ds/ ├── base_config.yaml # DolphinScheduler 配置 └── process_code.yaml # 工作流编码映射 ``` ### 2.3 Spark 配置三级覆盖策略 **现状**:`dw_base/spark/spark_sql.py` 构造函数里硬编码了约 15 个 `.config(...)` 调用(executor/driver/memory/parallelism/shuffle/adaptive/arrow/codegen 等),默认值写死在构造参数里,覆盖只能通过 SparkSQL 构造函数传参或 SQL 文件内 `SET`。 **问题**: - 想批量调整 shuffle partitions 的默认值,就得改代码 + 发版 - 不同类型的作业(dwd 大宽表 / ads 小聚合)需要不同默认,现状只能每张表的 SQL 开头都重复写一遍 `SET` - 默认参数和业务代码耦合,不便于运维按集群负载动态调整 **目标态:三级覆盖** ``` conf/spark-defaults.yaml (L1) 全局默认,运维可改,发版同步到集群 ↓ 被覆盖 SQL 文件内 SET spark.xxx=yyy (L2) 单作业级别的覆盖,业务开发写 ↓ 被覆盖 命令行 -sc key=value / Python 构造函数传参 (L3) 临时/调试 override ``` **`conf/spark-defaults.yaml` 草案**: ```yaml # 全局 Spark 默认参数,dw_base/spark/spark_sql.py 启动时加载 # 单作业需要覆盖时,在对应 jobs/*.sql 文件开头写 SET;不要改本文件 executor: instances: 4 cores: 4 memory: 8g memoryOverhead: 2g driver: cores: 2 memory: 4g maxResultSize: 2g sql: shuffle.partitions: 200 adaptive.enabled: true broadcastTimeout: -1 codegen.wholeStage: false execution.arrow.enabled: true execution.arrow.fallback.enabled: true files.ignoreCorruptFiles: true statistics.fallBackToHdfs: true default: parallelism: 400 ``` **代码改动要点**: 1. `dw_base/spark/spark_sql.py` - 新增 `_load_default_config() -> dict`:读 `conf/spark-defaults.yaml`,扁平化为 `{"spark.executor.instances": 4, ...}` 形式(dot-notation 按 yaml 嵌套路径拼) - 构造函数接收的显式参数(`spark_executor_cores` 等)改为 `None` 默认,若未传则 fall back 到 yaml - `SparkSession.builder` 的 `.config(...)` 链改成 `for k, v in resolved_config.items(): builder.config(k, v)` 2. SQL 文件内的 `SET spark.xxx=yyy` 本来就由 `spark.sql(...)` 原生支持,无需改动 3. 命令行 `-sc` 参数保持现有语义,覆盖 L1 4. **Python 单测要能跑**:yaml 读取要容错(测试环境下找不到 conf 文件时回退到一套最小内置默认,不阻塞 `tests/unit/`) **兼容性**:老代码里已在写 `SparkSQL(spark_executor_cores=8, ...)` 的调用站点不破坏,因为显式传参仍是最高级(L3)。 **落地时的两个坑**: 1. **L2 覆盖只对 `spark.sql.*` 系参数生效**。Spark 的参数分两类: - `spark.sql.*`、`spark.default.parallelism` 等运行时参数 —— `spark.conf.set(...)` 或 SQL 内 `SET` 可动态改写 - `spark.executor.*`、`spark.driver.*`、`spark.executor.memoryOverhead` 等资源类参数 —— **session 启动时锁定**,SQL 里写 `SET spark.executor.memory=16g` 不会真的扩容已启动的 executor 因此开发写 SQL 内 `SET` 时只能调 `spark.sql.*` 和并行度;需要改资源的场景只能走 L3(命令行 `-sc` 或调用方在构造 `SparkSQL(...)` 时显式传参)。文档里和 `spark-defaults.yaml` 注释里都要讲清楚这条,避免开发以为 `SET spark.executor.memory` 有效。 2. **`conf/spark-defaults.yaml` 的路径解析依赖 `PROJECT_ROOT_PATH`**,这和 §三 `__init__.py` 瘦身存在先后依赖: - 现状 `PROJECT_ROOT_PATH` 在 `dw_base/__init__.py` 顶部定义,`import dw_base` 就会拿到 - 瘦身后 `__init__.py` 只保留最基本路径定义,`PROJECT_ROOT_PATH` 仍可用,但拆分过程中要保证 `spark_sql.py` 加载 yaml 的那行代码拿到的根路径与瘦身前一致 - **执行顺序建议**:先做 §三 `__init__.py` 瘦身,把 `PROJECT_ROOT_PATH` 的定义稳定下来;再做 §2.3 的 `spark-defaults.yaml` 接入。反过来做会踩到"瘦身后路径变了"的返工 ### 2.4 项目根 `.gitignore` **现状**:老项目根目录**没有** `.gitignore`,`.idea/workspace.xml`、`.claude/settings.local.json` 等个人状态文件随时可能被误提交,`conf/alerter.conf`(规划中的告警 Webhook,见 §2.2)也需要挡在版本控制外。 **目标**:在项目根新建 `.gitignore`,在阶段 2 建立 `conf/` 目录的同一节奏下一起落地(顺序上先有 `.gitignore` 再把 `alerter.conf` 放进 `conf/`,避免敏感文件误入第一次提交)。 **内容清单**: ```gitignore # ---- Claude Code 本地设置 ---- .claude/settings.local.json # ---- JetBrains 个人工作区 ---- # 注意:.idea/ 不整体 ignore —— modules.xml / *.iml / inspectionProfiles/ 是 # 团队可共享的项目结构配置,保留入库对新成员友好(开箱即用),这也是 # JetBrains 官方推荐做法 .idea/workspace.xml .idea/tasks.xml .idea/shelf/ .idea/usage.statistics.xml .idea/dictionaries/ .idea/httpRequests/ # ---- Python / 构建产物 ---- __pycache__/ *.py[cod] *.egg-info/ .pytest_cache/ .venv/ venv/ # ---- 运行期产物 ---- *.log dw_base.zip # ---- 敏感配置(运行时自动从 datasource/ 注入或在 conf/ 本地覆盖) ---- conf/alerter.conf ``` **注意事项**: 1. **`.idea/` 不整体 ignore**: - 入库:`modules.xml`、`*.iml`、`inspectionProfiles/`(项目结构 + 代码检查规则,团队共享) - 忽略:`workspace.xml`、`tasks.xml`、`shelf/`、`usage.statistics.xml` 等个人/统计文件 2. **`.claude/` 也不整体 ignore**:`settings.json`、`commands/`、`agents/` 是团队共享配置;只忽略 `settings.local.json` 3. **`dw_base.zip`** 是 `spark_sql.py` 运行时生成的 PySpark 打包产物,属于构建产物不入库 4. **`conf/alerter.conf`** 一开始就放进 `.gitignore`:阶段 2 迁移钉钉/企微 Webhook 时,新建文件前 `.gitignore` 必须先就位 **与仓库改名的联动**: 仓库改名 `tendata-warehouse-release` → `poyee-data-warehouse` 时(阶段 1 尾声),`.idea/tendata-warehouse-release.iml` 也要改名为 `.idea/poyee-data-warehouse.iml`,并同步更新 `.idea/modules.xml` 里的引用。这一步不属于 `.gitignore` 的范畴,但和它是同一天会碰到的事,在阶段 1 的仓库改名 checklist 里一起记一笔。 ## 三、`__init__.py` 瘦身(高优先级) **现状:** `tendata/__init__.py` 约 120 行,import 即执行以下操作: - 环境变量设置 - 颜色常量定义(30+ 个) - findspark.init() - 用户/权限/路径检测 + 打印 - cow_says() 调用 shell **问题:** - 任何 `from dw_base import xxx` 都会触发全部初始化 - 不在 Spark 节点上运行的脚本也被迫执行 `findspark.init()` - 影响单元测试(测试 UDF 函数也要初始化 Spark 环境) **建议拆分为:** ```python # dw_base/__init__.py —— 仅做最基本的路径定义 PROJECT_ROOT_PATH = ... PROJECT_NAME = ... # dw_base/core/env.py —— 环境检测(延迟调用) # dw_base/core/colors.py —— 颜色常量 # dw_base/core/spark_env.py —— findspark 初始化(按需 import) ``` ## 四、代码风格修正(中优先级) ### 4.1 `__contains__` 反模式 全项目大量使用: ```python if config.__contains__(key): # 反模式 if self.REGISTERED_UDF.__contains__(name): ``` 应改为: ```python if key in config: # Pythonic if name in self.REGISTERED_UDF: ``` ### 4.2 Shell / Python 重复逻辑 `bin/common/init.sh` 和 `dw_base/__init__.py` 有大量重复的环境检测逻辑(用户判断、路径判断、日志目录、颜色常量)。 **建议:** 统一由 Python 入口处理,Shell 脚本仅做最小化的环境设置后调用 Python。或提取为一份共享的配置文件。 ### 4.3 SQL 注入风险 `mysql_utils.py` 中使用 f-string 拼接 SQL: ```python sql = "... WHERE TABLE_SCHEMA='%s' ..." % (database, table_name) ``` **建议:** 改用参数化查询。 ## 五、清理废弃代码(中优先级) | 模块/文件 | 状态 | 建议 | |----------|------|------| | `dw_base/validation/__init__.py` | 空文件 | 删除或实现数据质量校验 | | `dw_base/ml/a.py` | 空文件 | 删除 | | `dw_base/flink/__init__.py` | 空文件 | 删除(除非计划使用 Flink) | | `dw_base/elasticsearch/__init__.py` | 空文件 | 删除 | | `dw_base/oss/oss2_util.py` | 使用场景不明 | 确认后决定保留或删除 | | `dw_base/database/mongodb_utils.py` | 约 80% 是注释掉的旧代码 | 清理注释 | | `conf/datax/` 下全部内容 | 已废弃的旧配置 | 保留少量样例,其余删除 | | `sql_style.xml` | IntelliJ SQL 格式化规则 | 移入 `.idea/` 或删除 | ## 六、测试体系搭建(中优先级) ### 6.1 现状 - 仅 UDF 有少量 pytest 测试 - 核心模块(SparkSQL、DataX 配置生成)无测试 - 无 CI/CD 集成 ### 6.2 建议的测试结构 ``` tests/ ├── conftest.py # pytest 公共 fixtures ├── unit/ │ ├── test_udf_trd.py # UDF 单测(按业务域组织,纯函数,不依赖 Spark) │ ├── test_udf_usr.py │ ├── test_udf_pub.py │ ├── test_config_utils.py # 工具函数单测 │ ├── test_datetime_utils.py │ ├── test_sql_utils.py │ └── test_datax_generator.py # DataX ini→json 生成测试 ├── integration/ │ ├── test_spark_sql.py # SparkSession local[*] 模式集成测试 │ └── test_hive_utils.py └── quality/ └── test_data_quality.py # 数据质量校验(行数、空值率、主键唯一性) ``` ### 6.3 测试策略 - **UDF 单测**:纯 Python 函数,直接 assert,不需要 Spark 环境 - **DataX 配置生成测试**:给定 ini 文件,断言生成的 JSON 结构正确 - **Spark 集成测试**:使用 `local[*]` + 内存 Hive(`enableHiveSupport()` 需要 Hive MetaStore,可用嵌入式 Derby) - **数据质量**:在 DolphinScheduler 工作流中加入校验节点 ## 七、其他建议 ### 7.1 依赖管理(已精简) **状态**:2026-04-15 已完成首轮审计与精简。老清单 48 行 → 新清单 10 个强依赖,详见根目录 `requirements.txt`;原始快照备份在 `requirements.txt.bak`,并在注释里给每一行打了 `[KEEP/DROP/LAZY/STDLIB]` 结论。 **精简策略**: | 分类 | 处理方式 | 代表包 | |---|---|---| | **强依赖(KEEP)** | 留在 `requirements.txt` | pyspark / pandas / pymongo / PyMySQL / requests / PyYAML / findspark / python-dateutil / wheel / pytest | | **无引用(DROP)** | 直接移除 | openvino / transformers / scikit-learn / scipy / numpy / Flask / matplotlib / lxml / SQLAlchemy / jieba / cpca / openpyxl / xlrd / 等 20+ 个 | | **stdlib 冗余(STDLIB)** | 移除 backport | `configparser` —— Python 3 标准库自带,backport 安装反而会覆盖 stdlib | | **弱依赖(LAZY)** | **不写进 requirements**,用到时手动 pip install | elasticsearch / pyhive / redis / cryptography / oss2 / fuzzywuzzy / pygeohash / pypinyin —— 都只被即将清理的老业务代码引用 | **后续事项**: - LAZY 类依赖关联的老代码(`tendata/scheduler/get_oldmongo_*`、`mg2es/`、`ent_interface_dingtalk*`、`customs/similarity.py`、`tendata/oss/oss2_util.py`、`tendata/utils/excel_to_hive_utils.py`)在阶段 4 / 阶段 5 清理废弃代码时一并删除,删完后即可彻底告别这些弱依赖 - 不需要 `requirements-base.txt` / `requirements-dev.txt` 分文件——当前依赖规模下单文件已经足够 - pyspark 2.4.0 暂保留(CDH 集群一致),等集群升级再一并上调 ### 7.2 日志改进 - `pretty_print()` 混合了控制台输出和文件写入,职责不清 - `Logging` 类定义了但很少使用 - **建议**:统一使用 Python `logging` 模块,配置 handler 实现控制台+文件双输出 ### 7.2.1 日志路径按 whoami 分流的硬编码逻辑 **现状:** `bin/common/init.sh` 和 `dw_base/__init__.py` 硬编码 `RELEASE_USER="alvis"`,并按 `whoami` 是否等于该用户分流日志目录: ```bash if [ "$(whoami)" = "${RELEASE_USER}" ]; then LOG_ROOT_DIR="/opt/data/log" # release 用户走系统日志目录 else LOG_ROOT_DIR="${HOME}/data/log" # 其他用户走自己家目录 fi ``` **问题:** - `alvis` 是老环境硬编码,新环境部署用户是 `bigdata`,迁移时必须一起改 - "按执行者身份决定日志路径"把运行身份与路径策略耦合在一起,代码里到处都要判断当前用户 - 调度执行(`bigdata`)和个人调试的日志散落到不同目录,排查问题时需要来回切换 - 本质是把环境差异写进代码,而不是写进配置 **建议:** 1. 删除 `whoami == RELEASE_USER` 分支逻辑 2. 日志根路径统一由 `conf/env.sh` 的 `LOG_ROOT_DIR` 决定(默认 `/opt/data/log`),个人调试可在自己的 shell 里 `export LOG_ROOT_DIR=~/data/log` 覆盖 3. `RELEASE_USER` 若仍需保留(如 publish.sh 发布身份校验),只作为白名单,不参与日志路径决策 ### 7.3 部署改进 - `publish.sh` 使用 `re-all` 命令(自定义的 SSH 分发脚本)全量同步 - **建议**:考虑引入版本化部署(tag + 软链接切换),便于回滚 ### 7.4 DataX 限速逻辑 `job_config_generator.py` 中根据时间段(7:50-19:00 白天限速,其余时间放开)动态设置 DataX 传输速率: ```python if 750 < local_time < 1900: speed = self.get_speed(10, byte=10485760, record=40000) # 白天低速 else: speed = self.get_speed() # 夜间高速 ``` **建议**:将时间段和速率配置化,避免硬编码。 ## 八、重构优先级排序 | 阶段 | 任务 | 优先级 | |------|------|--------| | P0 | 模块重命名 tendata→dw_base、launch-pad→jobs | 高 | | P0 | 清理所有业务代码(launch-pad 中保留的样本) | 高 | | P1 | 硬编码提取到 conf/ | 高 | | P1 | `__init__.py` 瘦身,拆分初始化逻辑 | 高 | | P1 | 敏感信息(Webhook token 等)移出代码 | 高 | | P2 | `__contains__` → `in` 全局替换 | 中 | | P2 | 删除废弃空模块和注释代码 | 中 | | P2 | 搭建 tests/ 基础框架 + UDF 单测 | 中 | | P2 | 精简 requirements.txt | 中 | | P3 | 日志模块统一 | 低 | | P3 | SQL 注入修复 | 低 | | P3 | 部署脚本改进 | 低 |