90-重构路线.md 40 KB

重构路线

基于老项目 tendata-warehouse-release 的代码分析,为新项目 poyee-data-warehouse 规划的重构路线。 本文档说"为什么改、怎么改";配套的 92-重构进度.md 说"改到哪一步了"。

一、模块重命名(高优先级)

1.1 tendatadw_base

影响范围:

类型 涉及内容 处理方式
Python import from tendata import *from tendata.xxx import yyy 全局替换
SQL 中的 ADD FILE ADD FILE tendata/spark/udf/xxx.py 全局替换
zip 打包命令 zip -qr tendata.zip tendata 改为 dw_base.zip dw_base
addPyFile sparkContext.addPyFile('tendata.zip') 改为 dw_base.zip
路径匹配正则 re.sub(r"tendata-warehouse.*", ...) 更新为新项目名
目录名引用 PROJECT_ROOT_PATH 相关逻辑 自动适配

建议做法: 先用脚本批量替换,再逐文件审查。注意 tendata 字样可能出现在老项目的 Hive 数据库名、表名或 DataX 数据源 ini 名中,这些属于老业务数据、不应替换——新项目业务 SQL 从零开发,不涉及老库老表。

1.2 launch-pad/jobs/(不做业务迁移,仅建立新结构)

重要澄清:老项目 launch-pad/ 中的业务代码与新项目业务完全无关(属于上个项目的历史业务),不存在内容迁移launch-pad/ 在重构期间作为样板 SQL 代码的参考(看写法、看 ${dt} 用法、看 DataX ini 结构),新项目所有业务 SQL 从零开发完成后,launch-pad/ 整体删除。

jobs/ 目录按数仓分层 × 业务域二维组织,结构见 00-项目架构.md §5 的样板。业务域代码统一使用命名规范定义的 trd/usr/prd/shp/pub/dim(见 21-命名规范.md §3.2)。

需要处理的代码引用

  • 脚本中 list_files('launch-pad/...') 的硬编码路径 → 改为 jobs/...
  • zip -qr launch-pad.zip launch-pad 类命令(如有)→ 改为 jobs
  • DolphinScheduler 工作流里老路径的引用 → 新项目上线时一并替换

二、消除硬编码(高优先级)

2.1 当前硬编码清单

硬编码内容 所在位置 建议方案
DATAX_HOME=/opt/module/datax bin/common/init.sh 移入 conf/env.sh 或环境变量
PYTHON3_PATH="/usr/bin/python3" bin/common/init.sh 移入 conf/env.sh
RELEASE_USER="alvis" bin/common/init.sh 改为 RELEASE_USER="bigdata" 并移入 conf/env.sh
RELEASE_ROOT_DIR="/home/alvis/release" init.sh__init__.py 改为 /home/bigdata/release 并移入 conf/env.sh
项目部署目录 poyee-data-warehouse/ publish.sh 新项目发布目录为 /home/bigdata/release/poyee-data-warehouse/
DATAX_WORKERS=(m3 d1 d2 d3 d4) + DATAX_WORKERS_WEIGHTS 权重 map init.sh:18-31(含展开 DATAX_WORKERS_QUEUE 的循环) workers 列表 + 权重 map 整体移入 conf/workers.conf(ini 或 yaml 格式),init.sh 仅保留读取 + 展开逻辑
HADOOP_CONF_DIR='/etc/hadoop/conf' __init__.py 使用系统环境变量
LOG_ROOT_DIR="/opt/data/log" + whoami 分流 init.sh__init__.py 删除 whoami 分支,单值改为 ${HOME}/log 并迁入 conf/env.sh,见 §7.2.1
告警 Webhook(钉钉 / 企微 Key) dw_base/common/alerter_constants.py(老告警模块已于 2026-04-20 删除,含 dingtalk_notifier.py / ent_interface_dingtalk* / bin/dingtalk-work-alert.sh 新告警模块重写时 Webhook Key 外移到 conf/alerter.ini入库——部署靠 git pull,gitignore 会拉不到;webhook key 不算高敏感,最多被拿去发垃圾消息),Python 侧改 ConfigParser 加载;alerter_constants.py 整个删除;新项目不再使用钉钉
Spark 默认参数(executor/driver/shuffle/sql.*) dw_base/spark/spark_sql.py 构造函数 + .config(...) 移入 conf/spark-defaults.yaml,SQL 文件可用 SET 覆盖,见 §2.3
DataX ini 路径前缀剥离 conf/datax/config/ bin/datax-single-job-starter.sh(TEMP 处理)、bin/datax-job-config-generator.pyreplace('conf/datax/config/', ''))、bin/datax-multiple-job-starter.sh(日志路径派生) 原目录已整体挪到 conf/bak/ 并 gitignore,脚本里 replace 现在是 no-op 死逻辑。去除前缀假设,改为靠 ini 文件名(= 任务唯一标识,见 21-命名规范.md §3.9)识别用途
DataX 生成 JSON 输出目录名 conf/datax/generated bin/datax-job-config-generator.py 末尾 default_output_dirbin/datax-single-job-starter.sh 第 89/118 行、bin/datax-multiple-job-starter.sh 第 187 行、.gitignore 目录改名 conf/datax-json/;子路径扁平化为 conf/datax-json/{env}/{ini_basename}.json(仅按 env 分一级,去掉 src_dst / project_layer_env 等派生层级);.gitignore 同步改
JOB_NAME / JSON 文件名的 ini→json 转换逻辑重复实现 Python 侧 bin/datax-job-config-generator.py:126os.path.basename(gcf).replace('.ini', '.json'))+ Bash 侧 bin/datax-single-job-starter.sh:88basename .ini 合一到 dw_base.datax.path_utils.job_name_from_ini()(或类似工具);Bash 侧通过 python3 -c 调用或在 bin/common/init.sh 定义等价 shell 函数,单一来源
DataX 脚本不支持 -env 参数 bin/datax-*.sh / bin/datax-job-config-generator.py 参数解析 新增 -env 参数,由 ini 内 dataSource = {db_type}/{instance} 拼接成 datasource/{db_type}/{env}/{instance}.ini 的完整路径;详见 §2.5
datasource/ 单层组织(无环境子目录) datasource/{db_type}/{instance}.ini 改为 datasource/{db_type}/{env}/{instance}.ini(运维侧权限隔离 + 支持一套代码跑多环境)
ini 里 dataSource 字段拼接环境后缀 老项目写法 dataSource = pg-hobby-prod 改为 dataSource = {db_type}/{instance}(不含环境),env 由脚本注入
导出类 ini 扇出撞名风险 jobs/ads/{域}/ 下 ini 若都以源 Hive 表名命名,同一张 ads 表扇出到多个目标库时会重名覆盖 命名规则改为 {源 Hive 表名}__{目标 db_type}_{目标 instance}.ini(双下划线分隔源/目标),见 21-命名规范.md §3.9
dw_base/common/template_constants.py 大量死代码 定义了 20+ 个 SQL 模板路径常量,实际只有 2 个(MYSQL_HIVE_CREATE_TABLE_TEMPLATE / MYSQL_HIVE_HBASE_CREATE_TABLE_TEMPLATE)被引用,其余 18 个零 import 整个文件删除;连带废弃下一条
MySQLReader.generate_hive_ddl() / generate_hive_over_hbase_ddl() 自动建表 DDL 路径 dw_base/datax/plugins/reader/mysql_reader.py:195/222,被 bin/datax-gc-generator.py:616/728 调用;且 conf/template/ 目录在新项目根本不存在,真调用会 FileNotFoundError 整段路径废弃——与 CLAUDE.md 约定的 manual/ddl/ 是 DDL 唯一来源相冲突。datax-gc-generator.py 仅生成 ini 配置,不再输出 CREATE TABLE DDL;DDL 由开发者按 21-命名规范.md 手写到 manual/ddl/
缺少集中的开发者参考模板目录 —(新增) 已建 conf/templates/{datasource,datax/{raw,ads,manual},sql,ddl}/,模板用 *.template.{ini,sql} 双扩展名。与上条废弃的运行时模板完全不同:这里的模板不被任何代码读取,只供开发者对照写新文件;kb/README.md 已加入口

2.2 建议的配置结构

conf/
├── env.sh                    # Shell 环境变量(路径、用户、日志目录等)
├── env.py                    # Python 环境变量(或直接读 env.sh)
├── workers.conf              # DataX Worker 列表与权重
├── alerter.conf              # 告警 Webhook 配置(敏感项,可 .gitignore)
├── spark-defaults.yaml       # Spark 默认参数
└── ds/
    ├── base_config.yaml      # DolphinScheduler 配置
    └── process_code.yaml     # 工作流编码映射

2.3 Spark 配置三级覆盖策略

现状dw_base/spark/spark_sql.py 构造函数里硬编码了约 15 个 .config(...) 调用(executor/driver/memory/parallelism/shuffle/adaptive/arrow/codegen 等),默认值写死在构造参数里,覆盖只能通过 SparkSQL 构造函数传参或 SQL 文件内 SET

问题

  • 想批量调整 shuffle partitions 的默认值,就得改代码 + 发版
  • 不同类型的作业(dwd 大宽表 / ads 小聚合)需要不同默认,现状只能每张表的 SQL 开头都重复写一遍 SET
  • 默认参数和业务代码耦合,不便于运维按集群负载动态调整

目标态:三级覆盖

conf/spark-defaults.yaml         (L1) 全局默认,运维可改,发版同步到集群
        ↓ 被覆盖
SQL 文件内 SET spark.xxx=yyy     (L2) 单作业级别的覆盖,业务开发写
        ↓ 被覆盖
命令行 -sc key=value / Python 构造函数传参  (L3) 临时/调试 override

conf/spark-defaults.yaml 草案

# 全局 Spark 默认参数,dw_base/spark/spark_sql.py 启动时加载
# 单作业需要覆盖时,在对应 jobs/*.sql 文件开头写 SET;不要改本文件

executor:
  instances: 4
  cores: 4
  memory: 8g
  memoryOverhead: 2g

driver:
  cores: 2
  memory: 4g
  maxResultSize: 2g

sql:
  shuffle.partitions: 200
  adaptive.enabled: true
  broadcastTimeout: -1
  codegen.wholeStage: false
  execution.arrow.enabled: true
  execution.arrow.fallback.enabled: true
  files.ignoreCorruptFiles: true
  statistics.fallBackToHdfs: true

default:
  parallelism: 400

代码改动要点

  1. dw_base/spark/spark_sql.py
    • 新增 _load_default_config() -> dict:读 conf/spark-defaults.yaml,扁平化为 {"spark.executor.instances": 4, ...} 形式(dot-notation 按 yaml 嵌套路径拼)
    • 构造函数接收的显式参数(spark_executor_cores 等)改为 None 默认,若未传则 fall back 到 yaml
    • SparkSession.builder.config(...) 链改成 for k, v in resolved_config.items(): builder.config(k, v)
  2. SQL 文件内的 SET spark.xxx=yyy 本来就由 spark.sql(...) 原生支持,无需改动
  3. 命令行 -sc 参数保持现有语义,覆盖 L1
  4. Python 单测要能跑:yaml 读取要容错(测试环境下找不到 conf 文件时回退到一套最小内置默认,不阻塞 tests/unit/

兼容性:老代码里已在写 SparkSQL(spark_executor_cores=8, ...) 的调用站点不破坏,因为显式传参仍是最高级(L3)。

落地时的两个坑

  1. L2 覆盖只对 spark.sql.* 系参数生效。Spark 的参数分两类:
    • spark.sql.*spark.default.parallelism 等运行时参数 —— spark.conf.set(...) 或 SQL 内 SET 可动态改写
    • spark.executor.*spark.driver.*spark.executor.memoryOverhead 等资源类参数 —— session 启动时锁定,SQL 里写 SET spark.executor.memory=16g 不会真的扩容已启动的 executor

因此开发写 SQL 内 SET 时只能调 spark.sql.* 和并行度;需要改资源的场景只能走 L3(命令行 -sc 或调用方在构造 SparkSQL(...) 时显式传参)。文档里和 spark-defaults.yaml 注释里都要讲清楚这条,避免开发以为 SET spark.executor.memory 有效。

  1. conf/spark-defaults.yaml 的路径解析依赖 PROJECT_ROOT_PATH,这和 §三 __init__.py 瘦身存在先后依赖:
    • 现状 PROJECT_ROOT_PATHdw_base/__init__.py 顶部定义,import dw_base 就会拿到
    • 瘦身后 __init__.py 只保留最基本路径定义,PROJECT_ROOT_PATH 仍可用,但拆分过程中要保证 spark_sql.py 加载 yaml 的那行代码拿到的根路径与瘦身前一致
    • 执行顺序建议:先做 §三 __init__.py 瘦身,把 PROJECT_ROOT_PATH 的定义稳定下来;再做 §2.3 的 spark-defaults.yaml 接入。反过来做会踩到"瘦身后路径变了"的返工

与仓库改名的联动

仓库改名 tendata-warehouse-releasepoyee-data-warehouse 时(阶段 1 尾声),.idea/tendata-warehouse-release.iml 也要改名为 .idea/poyee-data-warehouse.iml,并同步更新 .idea/modules.xml 里的引用。这一步不属于 .gitignore 的范畴,但和它是同一天会碰到的事,在阶段 1 的仓库改名 checklist 里一起记一笔。

2.5 DataX 脚本多环境支持与路径解耦

现状(脚本残留老逻辑,新项目的业务 ini 未走这条路径):

  1. 脚本里残留路径前缀剥离bin/datax-single-job-starter.shbin/datax-job-config-generator.py 仍通过剥离 conf/datax/config/ 前缀从源 ini 路径里派生 SRC_DST / PROJECT_LAYER_ENV,用于生成 JSON 输出路径和 datax-multiple-job-starter.sh 的日志目录。该目录已整体挪到 conf/bak/ 并 gitignore,新项目 ini 放在 jobs/raw/{domain}/ / jobs/ads/{domain}/ / manual/,前缀不匹配时剥离变成 no-op,输出会落到形如 conf/datax/generated/jobs/raw/trd/xxx.json 的位置——能跑但与新约定不符。代码里这段死逻辑要清理,同时输出根目录也一并改名为 conf/datax-json/
  2. 没有 -env 参数:所有脚本不认 -env
  3. 数据源配置单层组织:老约定 datasource/{db_type}/{instance}.ini 把环境和实例扁平混在一起,要么靠不同的 {instance} 名字(如 hobby-prod / hobby-dev)区分,要么靠部署时替换文件。第一种让 ini 里写 dataSource = pg-hobby-prod 这种绑死环境的字符串;第二种让开发侧不知道当前跑的是哪套。

目标态:一套代码多环境运行

三件事联动(顺序重要):

阶段 1:datasource 改按环境分子目录

  • 改为 datasource/{db_type}/{env}/{instance}.ini
  • 运维在集群侧按 prod / test / dev 建子目录,各自放一套连接配置
  • 权限隔离更方便:prod 子目录只给生产账号可读

阶段 2:ini 里 dataSource 字段去掉环境后缀

  • 老写法:dataSource = pg-hobby-prod
  • 新写法:dataSource = pg/hobby(只写 {db_type}/{instance},不含 env)
  • 变更项目下所有存量 ini:当前 conf/bak/ 下的老配置不处理,等业务新 ini 从零写起时就按新规范

阶段 3:DataX 脚本加 -env 参数 + env 注入逻辑

bin/common/init.sh 扩展:

# 从命令行参数里挑出 -env,或 fall back 到 conf/env.sh
# 不读环境变量(用户明确要求不污染 shell 环境)
# 不做"按用户/部署目录自动派生"——env 必须来自 -env 或 conf/env.sh,二者都没给就退出
resolve_env() {
  for arg in "$@"; do
    case $arg in
      -env) NEXT_IS_ENV=1 ;;
      -env=*) DW_ENV="${arg#*=}" ;;
      *) [ -n "$NEXT_IS_ENV" ] && DW_ENV="$arg" && unset NEXT_IS_ENV ;;
    esac
  done
  if [ -z "$DW_ENV" ]; then
    # fall back 到 conf/env.sh(入仓库的默认配置)
    [ -f "${BASE_DIR}/conf/env.sh" ] && . "${BASE_DIR}/conf/env.sh"
  fi
  if [ -z "$DW_ENV" ]; then
    echo "[FATAL] DW_ENV not set: pass -env <dev|test|prod> or define DW_ENV in conf/env.sh" >&2
    exit 1
  fi
  export DW_ENV
}

conf/env.sh 草案入仓库,开发者维护,服务本地调试):

# 全局环境变量默认值
# env 判定优先级:命令行 -env > 本文件 DW_ENV
# 默认锁定为 dev:本地调试开箱即用;DolphinScheduler / 生产脚本总是命令行显式 -env prod 覆盖
DW_ENV=dev
LOG_ROOT_DIR="${HOME}/log"

dw_base/datax/job_config_generator.py 改造

  • JobConfigGenerator.__init__ 接受新参数 env
  • 各 reader/writer 插件在解析 dataSource = pg/hobby 时,自动拼成 datasource/pg/{env}/hobby.ini 的完整路径再去读连接信息

bin/datax-job-config-generator.py 改造

  • 新增 -env 参数
  • 去掉 temp = os.path.dirname(gcf).replace('conf/datax/config/', '').split('/') 这段路径前缀剥离
  • JSON 输出目录从 conf/datax/generated/ 改名为 conf/datax-json/,子路径简化为 conf/datax-json/{env}/{ini_basename}.json(扁平,按 env 分一级;ini_basename21-命名规范.md §3.9 保证全局唯一——采集类 = 目标 Hive 表名、导出类 = {源}__{db_type}_{instance}、一次性 = 日期前缀)
  • .gitignore 同步把 conf/datax/generated 改成 conf/datax-json

阶段 4:启动脚本层串起来

  • datax-single-job-starter.sh 调用 generator 时把 $DW_ENV 透传过去
  • datax-multiple-job-starter.sh 日志目录改为 ${LOG_ROOT_DIR}/datax/${DW_ENV}/${START_DATE}/${JOB_NAME}.log(把 SRC_DST / PROJECT_LAYER_ENV 彻底移除)

兼容性:阶段 1-3 期间 conf/bak/ 下的老 ini 不参与新流程,保留作为老项目历史资产;新业务 ini 全部从零按新规范写。

参考:kb/00-项目架构.md §4.3(DataX 脚本详细使用)、§6.4(多环境机制)、kb/21-命名规范.md §3.9(DataX ini 文件命名)。

2.6 DataX 入口脚本收口为两条命令(中优先级)

现状:DataX 入口分散,学习成本高:

  • bin/datax-single-job-starter.sh — 单 ini 执行
  • bin/datax-multiple-job-starter.sh — 批量 + 按 start-date/stop-date 范围展开
  • bin/datax-multiple-hive-job-starter.sh — Hive 表批量(语义与上者重合)
  • bin/datax-single-hdfs-job-starter.sh(如有残留) — 单次 HDFS 导出
  • bin/datax-job-config-generator.py — ini → json 翻译器(内部工具)
  • bin/datax-gc-generator.py — ini 元生成器(详见 §2.7)

目标态:顶层收成两个命令,每个命令内部吃 single / batch 两种输入形态;底层的 json 翻译 / worker 选择 / 日志路径由公共模块承担,调用方不感知。

顶层命令 语义 关键参数
bin/datax-import(命名待确认) 导入到 Hive(目标侧带分区管理) -ini <file> 单 ini · -inis <dir> 批量 · -dt <yyyymmdd> 指定分区 · -start-date / -stop-date 日期范围展开 · -skip-exist 默认开,已存在分区跳过 · -force-overwrite 强制覆盖 · -skip-partitions <csv> 手动跳过特定分区 · -env <dev\|test\|prod>
bin/datax-export(命名待确认) 从 Hive/HDFS 导出到外部系统(源侧带路径探测) -ini / -inis 同上 · -src-check(默认 fail-fast)· -skip-missing 源路径缺失时跳过不报错 · -dt / -start-date / -stop-date · -env

实现建议

  1. 把老脚本 worker 选择、日志路径、json 翻译提到 Python 模块 dw_base/datax/entry.py,两个 sh 只做参数解析 + 调用
  2. 分区检查:datax-import 在执行前 SHOW PARTITIONS 目标表 → 命中则按 -skip-exist / -force-overwrite 决策;datax-export 在执行前 hdfs dfs -test -e <源路径> → 不存在按 -src-check / -skip-missing 决策
  3. -inis 的批量展开规则:传目录则递归扫 .ini 文件,传文件列表(jobs.list)则读文件每行一个 ini
  4. 老脚本 datax-single-job-starter.sh / datax-multiple-*-starter.sh 在两个新命令稳定后整体删除,保留一期转发封装作为兼容

第三条命令 datax-gc-generator(ini 元生成器)独立保留:用户已确认。职责是"从 PG 扫 schema 生成 ini 参考模板",和"执行 ini"不是一回事,不收口到上面两条里。详见 §2.7。

2.7 datax-gc-generator 从零重写(中优先级)

现状(凭查证 2026-04-18):bin/datax-gc-generator.py 支持 from ∈ {mysql, hdfs} × to ∈ {elasticsearch, hbase, hdfs, kafka, mongo, mysql},覆盖面大、代码沉重,且:

  • 全项目没有任何其他模块 import 或 shell 调用它(仅 3 处 SparkSQL('datax-gc-generator') 是自己设置 app name)
  • 内部走 MySQLHandler + MySQLDataSource + convert_mysql_column_types + MySQLReader.generate_hive_ddl() 一条链,与 §2.1 已登记废弃的"自动 DDL 生成"方向相冲突
  • 新项目新业务只需要 from=pg to=hdfs 一条路径,其他组合全是老项目遗产

定位参考模板生成器,不是"一键出可用 ini"。产物是开发者人工调整的起点 —— 常见修改包括字段剪裁(只同步用到的列)、WHERE 过滤条件、hivePartitions 配置、大表拆分策略等。开发者 diff 参考模板和自己的需求,改完再把成品 ini 提交到 jobs/raw/{domain}/

方向:整个文件废弃 + 从零重写(凭记忆:未完全定稿,待真正开始写代码时再细化)

重写目标

  • 仅支持 from=pg to=hdfs
  • 读 PG 表结构(information_schema.columns + pg_catalog.pg_description 取字段注释)
  • 输出到开发者本地的 workspace/{yyyymmdd}/{name}.iniworkspace/.gitignore 排除,不入仓),作为参考模板(原始全字段 + 默认分区 + 默认 where=1=1);开发者裁剪后再把最终 ini 提交到 jobs/raw/{domain}/
  • 不再生成 DDL:DDL 统一由开发者按 21-命名规范.md 手写到 manual/ddl/{layer}/{domain}/(CLAUDE.md 单一来源约束,与 §2.1 已登记项一致)

目录约定

  • workspace/ 在仓库根,仅存在于开发者本地,整个目录进 .gitignore
  • workspace/{yyyymmdd}/ 按运行日期分子目录,便于开发者看"我今天生成了哪些候选"
  • manual/imports/{yyyymmdd}/ 的分工:manual/imports/ 放一次性执行的 SQL / ini(会入仓做审计证据,执行完归档),workspace/ 放自动化工具未经人工确认的中间产物(永不入仓)

拆除清单(重写时连带删除):

  • dw_base/database/mysql_utils.pylist_tables / list_columns 方法(只服务老 generator)
  • dw_base/datax/datasources/mysql_data_source.py
  • dw_base/datax/plugins/reader/mysql_reader.pygenerate_hive_ddl / generate_hive_over_hbase_ddl 方法
  • dw_base/datax/datax_utils.pyconvert_mysql_column_types
  • 所有 mongo / kafka / hbase writer 在 generator 里的分支

注意:以上删除范围与 dw_base/datax/plugins/ 里仍在被真实采集任务调用的 reader/writer 不冲突 —— 真实采集任务只用到 HDFSReader / HDFSWriter / MongoWriter(如果还有 mongo 采集任务)。删之前要用 grep -r "from dw_base.datax.plugins.reader.mysql_reader" 再确认一次。

2.8 HDFS 数据源 ini 支持 HA nameservice(Path B 锁定

结论(凭 2026-04-18 新 CDH 环境实测):DataX hdfswriter 在新环境下连 HA 集群必须在 json 里显式提供 hadoopConfig 块,这是 §2.8 改造的动机。

为什么 HADOOP_CONF_DIR 对 DataX 无效(实测三连)

测试 json 内容 HADOOP_CONF_DIR 结果
1 defaultFS + 完整 hadoopConfig 未设置 ✅ 成功
2 只有 defaultFS 未设置 ❌ UnknownHostException: nameservice1
3 只有 defaultFS /etc/hadoop/conf ❌ 仍 UnknownHostException

hadoop fs -ls 命令能解析 nameservice1,是因为 hadoop shell 脚本把 $HADOOP_CONF_DIR 加入了 Java classpathhdfs-site.xml 才能被 Configuration 自动加载)。datax.py 启动 Java 时不做这件事,classpath 里不含 /etc/hadoop/conf,所以 DataX 的 Hadoop Configuration 对象是"干净的",只能靠 json hadoopConfig 块显式注入 HA 参数。

(老项目在上一个公司/服务器只写 defaultFS 也能跑通 HA,最可能的原因是运维把 hdfs-site.xml 塞进了 DataX 的 classpath 目录 —— 比如 datax/conf/ / datax/lib/ / 某 plugin 的 libs/。新环境 /opt/datax 下没有这类预置文件,不走这条路。)

老的 env 设置是死代码dw_base/__init__.py:16os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf' 对 DataX JVM 子进程无影响 —— 一是 classpath 不含 conf 目录(见上);二是 DataX 由 datax-single-job-starter.sh 通过 python3 datax.py 启动,并未 import dw_base。这行已在本次 §2.8 一并清理。


正确的 json 形态

"writer": {
  "name": "hdfswriter",
  "parameter": {
    "defaultFS": "hdfs://nameservice1",
    "hadoopConfig": {
      "dfs.nameservices": "nameservice1",
      "dfs.ha.namenodes.nameservice1": "nn1,nn2",
      "dfs.namenode.rpc-address.nameservice1.nn1": "192.168.33.61:8020",
      "dfs.namenode.rpc-address.nameservice1.nn2": "192.168.33.62:8020",
      "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    },
    ...
  }
}

ini 新格式

非 HA / 单 NN(dev / test 环境)

[base]
defaultFS = hdfs://192.168.33.61:8020

HA / nameservice(prod 环境)

[base]
defaultFS = hdfs://nameservice1

[hadoop_config]
dfs.nameservices = nameservice1
dfs.ha.namenodes.nameservice1 = nn1,nn2
dfs.namenode.rpc-address.nameservice1.nn1 = 192.168.33.61:8020
dfs.namenode.rpc-address.nameservice1.nn2 = 192.168.33.62:8020
dfs.client.failover.proxy.provider.nameservice1 = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

设计要点

  • [hadoop_config] 节的 key 原样照搬 /etc/hadoop/conf/hdfs-site.xml,不做缩写 / 翻译 —— 运维直接复制粘贴,出了问题可以字段级对比,零心智成本
  • [hadoop_config]可选:不写即视为非 HA,代码不注入 hadoopConfig,生成的 json 只有 defaultFS,与单 NN 环境兼容
  • 将来若要支持多 nameservice(federation),[hadoop_config] 节天然兼容(多写几组 key 即可),无需再改 ini schema

代码改造清单

文件 改动
dw_base/datax/datasources/hdfs_data_source.py 覆写 get_datasource_dict():父类逻辑外,检测 [hadoop_config] 节是否存在;存在则把整节作为 dict 塞进 ds_dict['hadoopConfig']
dw_base/datax/plugins/plugin.py 不用改 —— load_data_source()for key, value in ds_dict.items() 里,value 是 dict 时走 Python 原生赋值,json 序列化自然成立
dw_base/datax/plugins/reader/hdfs_reader.py / writer/hdfs_writer.py 不用改defaultFS + hadoopConfigload_data_source() 自动注入到 parameter
dw_base/__init__.py:16 删除 os.environ['HADOOP_CONF_DIR'] 死代码(实测对 DataX JVM 无影响)
bin/datax-gc-generator.py §2.7 从零重写时一并处理,这里不单独列
datasource/hdfs/{env}/*.ini 按新 schema 新建(prod 带 [hadoop_config];dev/test 只写 [base] defaultFS

回归测试

  • prod HA 集群:新 ini 跑一次真实 PG→HDFS 任务,生成 json 含完整 hadoopConfig,任务成功(2026-04-18 已用手写 json 预验证)
  • NameNode 主备切换期间跑一次,DataX 自动切到 standby(HA 的原始动机)
  • dev / test 单 NN 集群:不写 [hadoop_config],生成 json 只含 defaultFS,任务成功

2.9 DataX 速率控制外移到 conf(中优先级)

现状dw_base/datax/job_config_generator.py:60-67 硬编码分时调度:

local_time = int(datetime_utils.formatted_now('%H%M'))
if 750 < local_time < 1900:
    speed = self.get_speed(10, byte=10485760, record=40000)          # 白天 10 channel × 10 MB/s
    core_speed = self.get_core_speed(byte=10485760, record=40000)
else:
    speed = self.get_speed()                                          # 夜间 6 channel × 256 MB/s
    core_speed = self.get_core_speed()

问题

  • 白天 / 夜间的边界(0750、1900)、channel 数、单 channel 速率全写死在代码里,想调一个值要改 py 源码重新发布
  • 不同业务域 / 不同表的速率诉求可能不同,现在一刀切
  • 生产偶发突发(白天要抢刷一次全量)没有临时放大速率的口子
  • 硬编码的动机在代码里一字没留(为什么 0750-1900?避开业务高峰?避开实时链路?后人读代码猜不到)

目标:把速率配置抽到 conf/datax-speed.confJobConfigGenerator 运行时读;默认值 = 现硬编码值,保持向后兼容。

conf 格式(ini,按时间段分段,自上而下匹配,未命中走 [default]):

; conf/datax-speed.conf
; 速率档位定义:从上到下依次匹配,第一条命中的为准;无匹配走 [default]

[daytime]
hours = 07:50-19:00
channel = 10
byte_per_channel = 10485760       ; 10 MB/s/channel,避开业务高峰
record_per_channel = 40000

[default]
channel = 6
byte_per_channel = 268435456      ; 256 MB/s/channel,夜间放满
record_per_channel = 100000

代码改造

  • JobConfigGenerator.__init__()conf/datax-speed.conf
  • 用当前时间在各段 hours 里找匹配,取 channel / byte_per_channel / record_per_channel
  • assemble() 里的分支逻辑改为 speed = self.resolve_speed_profile() 一行
  • conf/datax-speed.conf 时在文件头注释里把"白天是为了避开业务高峰"的动机写清楚(消除默认值的来历盲区)

扩展方向(P2,先不做):

  • 支持按 ini 名(业务表)在 conf 里覆盖特定任务的速率
  • 支持命令行 -speed fast / -speed slow 手动切档(突发高峰 / 限流时用)

三、__init__.py 瘦身(高优先级)

现状: tendata/__init__.py 约 120 行,import 即执行以下操作:

  • 环境变量设置
  • 颜色常量定义(30+ 个)
  • findspark.init()
  • 用户/权限/路径检测 + 打印
  • cow_says() 调用 shell

问题:

  • 任何 from dw_base import xxx 都会触发全部初始化
  • 不在 Spark 节点上运行的脚本也被迫执行 findspark.init()
  • 影响单元测试(测试 UDF 函数也要初始化 Spark 环境)

建议拆分为:

# dw_base/__init__.py —— 仅做最基本的路径定义
PROJECT_ROOT_PATH = ...
PROJECT_NAME = ...

# dw_base/core/env.py —— 环境检测(延迟调用)
# dw_base/core/colors.py —— 颜色常量
# dw_base/core/spark_env.py —— findspark 初始化(按需 import)

四、代码风格修正(中优先级)

4.1 __contains__ 反模式

全项目大量使用:

if config.__contains__(key):       # 反模式
if self.REGISTERED_UDF.__contains__(name):

应改为:

if key in config:                  # Pythonic
if name in self.REGISTERED_UDF:

4.2 Shell / Python 重复逻辑

bin/common/init.shdw_base/__init__.py 有大量重复的环境检测逻辑(用户判断、路径判断、日志目录、颜色常量)。

建议: 统一由 Python 入口处理,Shell 脚本仅做最小化的环境设置后调用 Python。或提取为一份共享的配置文件。

4.3 SQL 注入风险

mysql_utils.py 中使用 f-string 拼接 SQL:

sql = "... WHERE TABLE_SCHEMA='%s' ..." % (database, table_name)

建议: 改用参数化查询。

五、清理废弃代码(中优先级)

模块/文件 状态 建议
dw_base/validation/__init__.py 空文件 删除或实现数据质量校验
dw_base/ml/a.py 空文件 删除
dw_base/flink/__init__.py 空文件 删除(除非计划使用 Flink)
dw_base/elasticsearch/__init__.py 空文件 删除
dw_base/database/mongodb_utils.py 约 80% 是注释掉的旧代码 清理注释
conf/datax/ 下全部内容 已废弃的旧配置 保留少量样例,其余删除

六、测试体系搭建(中优先级)

6.1 现状

  • 仅 UDF 有少量 pytest 测试
  • 核心模块(SparkSQL、DataX 配置生成)无测试
  • 无 CI/CD 集成

6.2 建议的测试结构

tests/
├── conftest.py                    # pytest 公共 fixtures
├── unit/
│   ├── test_udf_common.py         # 通用 UDF 单测(纯函数,不依赖 Spark)
│   ├── test_udf_business.py       # 业务 UDF 单测(如有,按文件组织)
│   ├── test_config_utils.py       # 工具函数单测
│   ├── test_datetime_utils.py
│   ├── test_sql_utils.py
│   └── test_datax_generator.py    # DataX ini→json 生成测试
├── integration/
│   ├── test_spark_sql.py          # SparkSession local[*] 模式集成测试
│   └── test_hive_utils.py
└── quality/
    └── test_data_quality.py       # 数据质量校验(行数、空值率、主键唯一性)

6.3 测试策略

  • UDF 单测:纯 Python 函数,直接 assert,不需要 Spark 环境
  • DataX 配置生成测试:给定 ini 文件,断言生成的 JSON 结构正确
  • Spark 集成测试:使用 local[*] + 内存 Hive(enableHiveSupport() 需要 Hive MetaStore,可用嵌入式 Derby)
  • 数据质量:在 DolphinScheduler 工作流中加入校验节点

七、其他建议

7.1 依赖管理(已精简)

状态:2026-04-15 已完成首轮审计与精简。老清单 48 行 → 新清单 10 个强依赖,详见根目录 requirements.txt;原始快照备份在 requirements.txt.bak,并在注释里给每一行打了 [KEEP/DROP/LAZY/STDLIB] 结论。

精简策略

分类 处理方式 代表包
强依赖(KEEP) 留在 requirements.txt pyspark / pandas / pymongo / PyMySQL / requests / PyYAML / findspark / python-dateutil / wheel / pytest
无引用(DROP) 直接移除 openvino / transformers / scikit-learn / scipy / numpy / Flask / matplotlib / lxml / SQLAlchemy / jieba / cpca / openpyxl / xlrd / 等 20+ 个
stdlib 冗余(STDLIB) 移除 backport configparser —— Python 3 标准库自带,backport 安装反而会覆盖 stdlib
弱依赖(LAZY) 不写进 requirements,用到时手动 pip install elasticsearch / pyhive / redis / cryptography / oss2 / fuzzywuzzy / pygeohash / pypinyin —— 都只被即将清理的老业务代码引用

后续事项

  • LAZY 类依赖关联的老代码:get_oldmongo_* / mg2es/ / ent_interface_dingtalk* 于 2026-04-20 第一批提前清理;同日第二批清理 dw_base/oss/ 整目录、dw_base/scheduler/ 整目录(含 polling_scheduler / drop_partitions / drop_daily_full_snapshot_tbls)、dw_base/hive/ 整目录、dw_base/utils/ 7 文件(data_distinct / diff_utils / excel_to_hive_utils / hive_diff_database / hive_to_excel_utils / pdt_check_table*);剩余 customs/similarity.py 等在阶段 4 / 阶段 5 一并清理
  • 不需要 requirements-base.txt / requirements-dev.txt 分文件——当前依赖规模下单文件已经足够
  • pyspark==2.4.0 固定(对齐 CDH 6.3.2 parcel 自带版本):pip install 把 pyspark 装进解释器 site-packages 解决 PyCharm 远程解释器静态索引红线 + 本地 pytest;运行时 findspark.init() 指向集群 $SPARK_HOME/python/,两条链路同版本不冲突

7.2 日志改进

  • pretty_print() 混合了控制台输出和文件写入,职责不清
  • Logging 类定义了但很少使用
  • 建议:统一使用 Python logging 模块,配置 handler 实现控制台+文件双输出

7.2.1 日志路径按 whoami 分流的硬编码逻辑

现状: bin/common/init.shdw_base/__init__.py 硬编码 RELEASE_USER="alvis",并按 whoami 是否等于该用户分流日志目录:

if [ "$(whoami)" = "${RELEASE_USER}" ]; then
    LOG_ROOT_DIR="/opt/data/log"     # release 用户走系统日志目录
else
    LOG_ROOT_DIR="${HOME}/data/log"  # 其他用户走自己家目录
fi

方向:删除 whoami 分流,统一落 ${HOME}/log/{module}/{dt}/{file}.log

  • release 用户 bigdata$HOME = /home/bigdata,日志落 /home/bigdata/log/{module}/{dt}/{file}.log
  • 个人调试用户:$HOME = 各自家目录,日志落 /home/{user}/log/{module}/{dt}/{file}.log
  • $HOME 本身就按用户隔离,无需代码再判断 whoami

为什么去掉 /opt/data/log 这条路:原来 release 用户走系统级 /opt/data/log 的理由是"生产日志不应混在个人 home",但 bigdata 本身就是专属调度账号,它的 $HOME 就是生产日志的合法归宿,不需要再多开一条系统目录。路径统一后,权限 / 轮转 / 清理策略只需按一套做。

为什么保留 LOG_ROOT_DIRconf/env.sh:虽然默认值只有 ${HOME}/log 一条,但仍作为单一默认值外配到 conf/env.sh,保留后期改路径的口子(比如某天运维要求共享一块专用盘,改一处即可,无需改代码)。

为什么改目的地形态为 {module}/{dt}/{file}.log

  • 当前老结构 /opt/data/log/datax/20260418/xxx.log 已按 {module}/{dt}/ 分,但不是所有入口都遵守(spark、ds 等散落在各自子结构下)
  • 新结构强制三级 {module}/{dt}/{file}.log,便于按天归档 + 按模块清理
  • {module} 取值:datax / spark / ds / csv / export 等顶层入口名

代码改动:

  1. 删除 whoami == RELEASE_USER 分支逻辑,LOG_ROOT_DIR 单值从 conf/env.sh 读,默认 ${HOME}/log
  2. 日志文件路径拼接统一走一个工具函数 log_path(module, dt, file)(Python 和 Shell 各一份),避免入口脚本各自拼
  3. RELEASE_USER 作为单一来源定义在 conf/env.sh,与 publish.sh 共用(日志路径已不依赖它,但 publish.sh 仍要)

7.3 部署改进

  • publish.sh 使用 re-all 命令(自定义的 SSH 分发脚本)全量同步
  • 建议:考虑引入版本化部署(tag + 软链接切换),便于回滚

7.4 DataX 限速逻辑

job_config_generator.py 中根据时间段(7:50-19:00 白天限速,其余时间放开)动态设置 DataX 传输速率:

if 750 < local_time < 1900:
    speed = self.get_speed(10, byte=10485760, record=40000)   # 白天低速
else:
    speed = self.get_speed()                                   # 夜间高速

建议:将时间段和速率配置化,避免硬编码。

7.5 Spark / HMS 侧 Ranger Hive 策略验证(低优先级)

背景02-权限与账号.md §1 链路 B 原先假设 HMS 侧也挂载 Ranger Hive Plugin,从而 PySpark 直连 HMS 的库/表/列操作仍受 Ranger Hive 管辖。但在 CDH 6.3.2 默认部署中,Ranger Hive Plugin 只挂载在 HiveServer2,HMS 服务端未必部署;若 HMS 未挂,PySpark / spark-submit 的 SQL 层授权会绕过 Ranger Hive,仅剩 NameNode 上的 Ranger HDFS Plugin 做数据平面兜底 —— 与同版本 Impala 的"无 Ranger 细粒度授权,只能 HDFS 兜底"规律一致。

代码侧查证已完成:仓库内 dw_base/spark/spark_sql.py:167 等 8 处 .enableHiveSupport(),Spark 确实走 HMS 读元数据,不走 HS2,因此是否受 Ranger Hive 管辖完全看集群侧 HMS 是否挂插件。

集群侧验证动作(待做)

  1. 登 HiveMetastore 节点,查 hive-site.xml:有无 hive.metastore.pre.event.listeners = org.apache.ranger.authorization.hive.authorizer.RangerHiveMetastoreAuthorizer
  2. 查 HMS 进程 classpath 里是否包含 ranger-hive-plugin-*.jar
  3. 从开发环境跑一条故意触发 column 级禁权的 PySpark SQL,看是否被拒 —— 端到端佐证
  4. 结论回填到 02-权限与账号.md §1 链路 B 关键点段 + 关键点段里的 mermaid Note

后续处理:若 HMS 未挂 Ranger Hive,调研补挂成本 + 评估现有 HDFS 兜底是否足够(大部分数仓读写场景下足够,因为 PySpark 任务绝大多数以受控 Unix 账号提交、权限粒度粗即可;若要满足敏感列屏蔽类需求则必须补挂)。

八、重构优先级排序

阶段 任务 优先级
P0 模块重命名 tendata→dw_base、launch-pad→jobs
P0 清理所有业务代码(launch-pad 中保留的样本)
P1 硬编码提取到 conf/
P1 __init__.py 瘦身,拆分初始化逻辑
P1 敏感信息(Webhook token 等)移出代码
P2 __contains__in 全局替换
P2 删除废弃空模块和注释代码
P2 搭建 tests/ 基础框架 + UDF 单测
P2 精简 requirements.txt
P3 日志模块统一
P3 SQL 注入修复
P3 部署脚本改进
P3 Spark/HMS 侧 Ranger Hive 策略验证(见 §7.5)