基于老项目
tendata-warehouse-release的代码分析,为新项目poyee-data-warehouse规划的重构路线。 本文档说"为什么改、怎么改";配套的92-重构进度.md说"改到哪一步了"。
tendata → dw_base影响范围:
| 类型 | 涉及内容 | 处理方式 |
|---|---|---|
| Python import | from tendata import *、from tendata.xxx import yyy |
全局替换 |
| SQL 中的 ADD FILE | ADD FILE tendata/spark/udf/xxx.py |
全局替换 |
| zip 打包命令 | zip -qr tendata.zip tendata |
改为 dw_base.zip dw_base |
| addPyFile | sparkContext.addPyFile('tendata.zip') |
改为 dw_base.zip |
| 路径匹配正则 | re.sub(r"tendata-warehouse.*", ...) |
更新为新项目名 |
| 目录名引用 | PROJECT_ROOT_PATH 相关逻辑 |
自动适配 |
建议做法: 先用脚本批量替换,再逐文件审查。注意 tendata 字样可能出现在老项目的 Hive 数据库名、表名或 DataX 数据源 ini 名中,这些属于老业务数据、不应替换——新项目业务 SQL 从零开发,不涉及老库老表。
launch-pad/ → jobs/(不做业务迁移,仅建立新结构)重要澄清:老项目 launch-pad/ 中的业务代码与新项目业务完全无关(属于上个项目的历史业务),不存在内容迁移。launch-pad/ 在重构期间作为样板 SQL 代码的参考(看写法、看 ${dt} 用法、看 DataX ini 结构),新项目所有业务 SQL 从零开发完成后,launch-pad/ 整体删除。
新 jobs/ 目录按数仓分层 × 业务域二维组织,结构见 00-项目架构.md §5 的样板。业务域代码统一使用命名规范定义的 trd/usr/prd/shp/pub/dim(见 21-命名规范.md §3.2)。
需要处理的代码引用:
list_files('launch-pad/...') 的硬编码路径 → 改为 jobs/...zip -qr launch-pad.zip launch-pad 类命令(如有)→ 改为 jobs| 硬编码内容 | 所在位置 | 建议方案 |
|---|---|---|
DATAX_HOME=/opt/module/datax |
bin/common/init.sh |
移入 conf/env.sh 或环境变量 |
PYTHON3_PATH="/usr/bin/python3" |
bin/common/init.sh |
移入 conf/env.sh |
RELEASE_USER="alvis" |
bin/common/init.sh |
改为 RELEASE_USER="bigdata" 并移入 conf/env.sh |
RELEASE_ROOT_DIR="/home/alvis/release" |
init.sh、__init__.py |
改为 /home/bigdata/release 并移入 conf/env.sh |
项目部署目录 poyee-data-warehouse/ |
publish.sh |
新项目发布目录为 /home/bigdata/release/poyee-data-warehouse/ |
DATAX_WORKERS=(m3 d1 d2 d3 d4) + 权重 |
init.sh |
移入 conf/workers.conf |
HADOOP_CONF_DIR='/etc/hadoop/conf' |
__init__.py |
使用系统环境变量 |
LOG_ROOT_DIR="/opt/data/log" |
init.sh、__init__.py |
移入 conf/env.sh |
| 钉钉 access_token | dingtalk_notifier.py |
移入 conf/alerter.conf(敏感项) |
| 企微 Webhook Key | alerter_constants.py |
移入 conf/alerter.conf(敏感项) |
| DS API 地址 | ds/config/base_config.yaml |
已在 yaml,保持即可 |
| Spark 默认参数(executor/driver/shuffle/sql.*) | dw_base/spark/spark_sql.py 构造函数 + .config(...) 链 |
移入 conf/spark-defaults.yaml,SQL 文件可用 SET 覆盖,见 §2.3 |
conf/
├── env.sh # Shell 环境变量(路径、用户、日志目录等)
├── env.py # Python 环境变量(或直接读 env.sh)
├── workers.conf # DataX Worker 列表与权重
├── alerter.conf # 告警 Webhook 配置(敏感项,可 .gitignore)
├── spark-defaults.yaml # Spark 默认参数
└── ds/
├── base_config.yaml # DolphinScheduler 配置
└── process_code.yaml # 工作流编码映射
现状:dw_base/spark/spark_sql.py 构造函数里硬编码了约 15 个 .config(...) 调用(executor/driver/memory/parallelism/shuffle/adaptive/arrow/codegen 等),默认值写死在构造参数里,覆盖只能通过 SparkSQL 构造函数传参或 SQL 文件内 SET。
问题:
SET目标态:三级覆盖
conf/spark-defaults.yaml (L1) 全局默认,运维可改,发版同步到集群
↓ 被覆盖
SQL 文件内 SET spark.xxx=yyy (L2) 单作业级别的覆盖,业务开发写
↓ 被覆盖
命令行 -sc key=value / Python 构造函数传参 (L3) 临时/调试 override
conf/spark-defaults.yaml 草案:
# 全局 Spark 默认参数,dw_base/spark/spark_sql.py 启动时加载
# 单作业需要覆盖时,在对应 jobs/*.sql 文件开头写 SET;不要改本文件
executor:
instances: 4
cores: 4
memory: 8g
memoryOverhead: 2g
driver:
cores: 2
memory: 4g
maxResultSize: 2g
sql:
shuffle.partitions: 200
adaptive.enabled: true
broadcastTimeout: -1
codegen.wholeStage: false
execution.arrow.enabled: true
execution.arrow.fallback.enabled: true
files.ignoreCorruptFiles: true
statistics.fallBackToHdfs: true
default:
parallelism: 400
代码改动要点:
dw_base/spark/spark_sql.py
_load_default_config() -> dict:读 conf/spark-defaults.yaml,扁平化为 {"spark.executor.instances": 4, ...} 形式(dot-notation 按 yaml 嵌套路径拼)spark_executor_cores 等)改为 None 默认,若未传则 fall back 到 yamlSparkSession.builder 的 .config(...) 链改成 for k, v in resolved_config.items(): builder.config(k, v)SET spark.xxx=yyy 本来就由 spark.sql(...) 原生支持,无需改动-sc 参数保持现有语义,覆盖 L1tests/unit/)兼容性:老代码里已在写 SparkSQL(spark_executor_cores=8, ...) 的调用站点不破坏,因为显式传参仍是最高级(L3)。
落地时的两个坑:
spark.sql.* 系参数生效。Spark 的参数分两类:
spark.sql.*、spark.default.parallelism 等运行时参数 —— spark.conf.set(...) 或 SQL 内 SET 可动态改写spark.executor.*、spark.driver.*、spark.executor.memoryOverhead 等资源类参数 —— session 启动时锁定,SQL 里写 SET spark.executor.memory=16g 不会真的扩容已启动的 executor因此开发写 SQL 内 SET 时只能调 spark.sql.* 和并行度;需要改资源的场景只能走 L3(命令行 -sc 或调用方在构造 SparkSQL(...) 时显式传参)。文档里和 spark-defaults.yaml 注释里都要讲清楚这条,避免开发以为 SET spark.executor.memory 有效。
conf/spark-defaults.yaml 的路径解析依赖 PROJECT_ROOT_PATH,这和 §三 __init__.py 瘦身存在先后依赖:
PROJECT_ROOT_PATH 在 dw_base/__init__.py 顶部定义,import dw_base 就会拿到__init__.py 只保留最基本路径定义,PROJECT_ROOT_PATH 仍可用,但拆分过程中要保证 spark_sql.py 加载 yaml 的那行代码拿到的根路径与瘦身前一致__init__.py 瘦身,把 PROJECT_ROOT_PATH 的定义稳定下来;再做 §2.3 的 spark-defaults.yaml 接入。反过来做会踩到"瘦身后路径变了"的返工.gitignore现状:老项目根目录没有 .gitignore,.idea/workspace.xml、.claude/settings.local.json 等个人状态文件随时可能被误提交,conf/alerter.conf(规划中的告警 Webhook,见 §2.2)也需要挡在版本控制外。
目标:在项目根新建 .gitignore,在阶段 2 建立 conf/ 目录的同一节奏下一起落地(顺序上先有 .gitignore 再把 alerter.conf 放进 conf/,避免敏感文件误入第一次提交)。
内容清单:
# ---- Claude Code 本地设置 ----
.claude/settings.local.json
# ---- JetBrains 个人工作区 ----
# 注意:.idea/ 不整体 ignore —— modules.xml / *.iml / inspectionProfiles/ 是
# 团队可共享的项目结构配置,保留入库对新成员友好(开箱即用),这也是
# JetBrains 官方推荐做法
.idea/workspace.xml
.idea/tasks.xml
.idea/shelf/
.idea/usage.statistics.xml
.idea/dictionaries/
.idea/httpRequests/
# ---- Python / 构建产物 ----
__pycache__/
*.py[cod]
*.egg-info/
.pytest_cache/
.venv/
venv/
# ---- 运行期产物 ----
*.log
dw_base.zip
# ---- 敏感配置(运行时自动从 datasource/ 注入或在 conf/ 本地覆盖) ----
conf/alerter.conf
注意事项:
.idea/ 不整体 ignore:
modules.xml、*.iml、inspectionProfiles/(项目结构 + 代码检查规则,团队共享)workspace.xml、tasks.xml、shelf/、usage.statistics.xml 等个人/统计文件.claude/ 也不整体 ignore:settings.json、commands/、agents/ 是团队共享配置;只忽略 settings.local.jsondw_base.zip 是 spark_sql.py 运行时生成的 PySpark 打包产物,属于构建产物不入库conf/alerter.conf 一开始就放进 .gitignore:阶段 2 迁移钉钉/企微 Webhook 时,新建文件前 .gitignore 必须先就位与仓库改名的联动:
仓库改名 tendata-warehouse-release → poyee-data-warehouse 时(阶段 1 尾声),.idea/tendata-warehouse-release.iml 也要改名为 .idea/poyee-data-warehouse.iml,并同步更新 .idea/modules.xml 里的引用。这一步不属于 .gitignore 的范畴,但和它是同一天会碰到的事,在阶段 1 的仓库改名 checklist 里一起记一笔。
__init__.py 瘦身(高优先级)现状: tendata/__init__.py 约 120 行,import 即执行以下操作:
问题:
from dw_base import xxx 都会触发全部初始化findspark.init()建议拆分为:
# dw_base/__init__.py —— 仅做最基本的路径定义
PROJECT_ROOT_PATH = ...
PROJECT_NAME = ...
# dw_base/core/env.py —— 环境检测(延迟调用)
# dw_base/core/colors.py —— 颜色常量
# dw_base/core/spark_env.py —— findspark 初始化(按需 import)
__contains__ 反模式全项目大量使用:
if config.__contains__(key): # 反模式
if self.REGISTERED_UDF.__contains__(name):
应改为:
if key in config: # Pythonic
if name in self.REGISTERED_UDF:
bin/common/init.sh 和 dw_base/__init__.py 有大量重复的环境检测逻辑(用户判断、路径判断、日志目录、颜色常量)。
建议: 统一由 Python 入口处理,Shell 脚本仅做最小化的环境设置后调用 Python。或提取为一份共享的配置文件。
mysql_utils.py 中使用 f-string 拼接 SQL:
sql = "... WHERE TABLE_SCHEMA='%s' ..." % (database, table_name)
建议: 改用参数化查询。
| 模块/文件 | 状态 | 建议 |
|---|---|---|
dw_base/validation/__init__.py |
空文件 | 删除或实现数据质量校验 |
dw_base/ml/a.py |
空文件 | 删除 |
dw_base/flink/__init__.py |
空文件 | 删除(除非计划使用 Flink) |
dw_base/elasticsearch/__init__.py |
空文件 | 删除 |
dw_base/oss/oss2_util.py |
使用场景不明 | 确认后决定保留或删除 |
dw_base/database/mongodb_utils.py |
约 80% 是注释掉的旧代码 | 清理注释 |
conf/datax/ 下全部内容 |
已废弃的旧配置 | 保留少量样例,其余删除 |
sql_style.xml |
IntelliJ SQL 格式化规则 | 移入 .idea/ 或删除 |
tests/
├── conftest.py # pytest 公共 fixtures
├── unit/
│ ├── test_udf_trd.py # UDF 单测(按业务域组织,纯函数,不依赖 Spark)
│ ├── test_udf_usr.py
│ ├── test_udf_pub.py
│ ├── test_config_utils.py # 工具函数单测
│ ├── test_datetime_utils.py
│ ├── test_sql_utils.py
│ └── test_datax_generator.py # DataX ini→json 生成测试
├── integration/
│ ├── test_spark_sql.py # SparkSession local[*] 模式集成测试
│ └── test_hive_utils.py
└── quality/
└── test_data_quality.py # 数据质量校验(行数、空值率、主键唯一性)
local[*] + 内存 Hive(enableHiveSupport() 需要 Hive MetaStore,可用嵌入式 Derby)状态:2026-04-15 已完成首轮审计与精简。老清单 48 行 → 新清单 10 个强依赖,详见根目录 requirements.txt;原始快照备份在 requirements.txt.bak,并在注释里给每一行打了 [KEEP/DROP/LAZY/STDLIB] 结论。
精简策略:
| 分类 | 处理方式 | 代表包 |
|---|---|---|
| 强依赖(KEEP) | 留在 requirements.txt |
pyspark / pandas / pymongo / PyMySQL / requests / PyYAML / findspark / python-dateutil / wheel / pytest |
| 无引用(DROP) | 直接移除 | openvino / transformers / scikit-learn / scipy / numpy / Flask / matplotlib / lxml / SQLAlchemy / jieba / cpca / openpyxl / xlrd / 等 20+ 个 |
| stdlib 冗余(STDLIB) | 移除 backport | configparser —— Python 3 标准库自带,backport 安装反而会覆盖 stdlib |
| 弱依赖(LAZY) | 不写进 requirements,用到时手动 pip install | elasticsearch / pyhive / redis / cryptography / oss2 / fuzzywuzzy / pygeohash / pypinyin —— 都只被即将清理的老业务代码引用 |
后续事项:
tendata/scheduler/get_oldmongo_*、mg2es/、ent_interface_dingtalk*、customs/similarity.py、tendata/oss/oss2_util.py、tendata/utils/excel_to_hive_utils.py)在阶段 4 / 阶段 5 清理废弃代码时一并删除,删完后即可彻底告别这些弱依赖requirements-base.txt / requirements-dev.txt 分文件——当前依赖规模下单文件已经足够pretty_print() 混合了控制台输出和文件写入,职责不清Logging 类定义了但很少使用logging 模块,配置 handler 实现控制台+文件双输出现状: bin/common/init.sh 和 dw_base/__init__.py 硬编码 RELEASE_USER="alvis",并按 whoami 是否等于该用户分流日志目录:
if [ "$(whoami)" = "${RELEASE_USER}" ]; then
LOG_ROOT_DIR="/opt/data/log" # release 用户走系统日志目录
else
LOG_ROOT_DIR="${HOME}/data/log" # 其他用户走自己家目录
fi
问题:
alvis 是老环境硬编码,新环境部署用户是 bigdata,迁移时必须一起改bigdata)和个人调试的日志散落到不同目录,排查问题时需要来回切换建议:
whoami == RELEASE_USER 分支逻辑conf/env.sh 的 LOG_ROOT_DIR 决定(默认 /opt/data/log),个人调试可在自己的 shell 里 export LOG_ROOT_DIR=~/data/log 覆盖RELEASE_USER 若仍需保留(如 publish.sh 发布身份校验),只作为白名单,不参与日志路径决策publish.sh 使用 re-all 命令(自定义的 SSH 分发脚本)全量同步job_config_generator.py 中根据时间段(7:50-19:00 白天限速,其余时间放开)动态设置 DataX 传输速率:
if 750 < local_time < 1900:
speed = self.get_speed(10, byte=10485760, record=40000) # 白天低速
else:
speed = self.get_speed() # 夜间高速
建议:将时间段和速率配置化,避免硬编码。
| 阶段 | 任务 | 优先级 |
|---|---|---|
| P0 | 模块重命名 tendata→dw_base、launch-pad→jobs | 高 |
| P0 | 清理所有业务代码(launch-pad 中保留的样本) | 高 |
| P1 | 硬编码提取到 conf/ | 高 |
| P1 | __init__.py 瘦身,拆分初始化逻辑 |
高 |
| P1 | 敏感信息(Webhook token 等)移出代码 | 高 |
| P2 | __contains__ → in 全局替换 |
中 |
| P2 | 删除废弃空模块和注释代码 | 中 |
| P2 | 搭建 tests/ 基础框架 + UDF 单测 | 中 |
| P2 | 精简 requirements.txt | 中 |
| P3 | 日志模块统一 | 低 |
| P3 | SQL 注入修复 | 低 |
| P3 | 部署脚本改进 | 低 |