00-项目架构.md 23 KB

项目架构

poyee-data-warehouse 数据仓库工程的模块划分、执行时序与配置管理。

项目状态:重构中,目前采用原地渐进式重构模式。

1. 目录结构(更新中)

poyee-data-warehouse/              # 项目根目录(仓库名 = 部署名)
├── bin/                           # 启动脚本层:Shell + Python 入口脚本
│   ├── common/                    #   公共 Shell 函数与初始化
│   ├── spark-sql-starter.py       #   Spark SQL 执行入口
│   ├── datax-single-job-starter.sh#   DataX 单任务启动
│   ├── datax-multiple-job-starter.sh # DataX 批量任务启动
│   ├── datax-job-config-generator.py # ini→json 配置生成
│   └── ...
├── jobs/                          # 业务代码层,定时调度执行
│   ├── raw/                       #   原始数据采集(DataX ini 或 CSV 导入任务定义)
│   ├── ods/                       #   贴源层计算 SQL(类型转换、脏数据识别)
│   ├── dim/                       #   维度层计算 SQL(公共维度,贯穿 dwd/dws/tdm/ads)
│   ├── dwd/                       #   明细层计算 SQL
│   ├── dws/                       #   汇总层计算 SQL
│   ├── tdm/                       #   主题域模型层计算 SQL
│   ├── ads/                       #   应用层计算 SQL + 导出 ini
│   └── archive/                   #   已弃用的过期脚本归档
├── manual/                        # 一次性脚本(禁止接入定时调度)
│   ├── ddl/                       #   所有 DDL(初始 CREATE + 后续 ALTER),唯一来源;按 {layer}/{domain}/ 分子目录
│   ├── backfill/                  #   历史数据回刷
│   ├── fix/                       #   线上脏数据订正(必须带工单号)
│   ├── adhoc/                     #   临时取数 / 排查
│   ├── imports/{yyyymmdd}/        #   一次性入仓(硬盘、历史 dump、外部 CSV),按执行日期归档
│   └── exports/{yyyymmdd}/        #   一次性出仓任务,按执行日期归档
├── dw_base/                       # 通用库层
│   ├── __init__.py                #   全局初始化(环境检测、用户/权限判断、颜色常量)
│   ├── common/                    #   常量、容器(alerter / config / template 常量)
│   ├── spark/                     #   SparkSQL 引擎(Session 管理、UDF 注册、SQL 执行、数据导出)
│   ├── udf/                       #   UDF 库(common 通用 + business 业务专用)
│   ├── datax/                     #   DataX 配置生成引擎(ini→json),含 datasources/ + plugins/
│   ├── utils/                     #   通用工具(参数解析、日期、文件、日志、SQL 解析、字符串等)
│   ├── io/                        #   (占位)I/O 边界:db / file / hdfs 跨进程读写
│   ├── ops/                       #   (占位)仓内数据运维(小文件合并、分区清理)
│   ├── dq/                        #   (占位)数据质量检查(schema drift、值域、关联、规模)
│   ├── pm/                        #   (占位)项目管理工具集成(TAPD / Jira API)
│   └── wiki/                      #   (占位)外部文档同步(Docmost → kb/inbox)
├── kb/                            # 知识库:项目文档
├── conf/                          # 配置层(非敏感配置入库:alerter.ini / workers.ini / datax-speed.ini / spark-defaults.conf 等)
├── tests/                         # 测试:unit/ 纯函数单测 + integration/ Spark datax 集成测试
├── publish.sh                     # 集群部署脚本
├── requirements.txt               # Python 依赖
└── README.md

项目同级目录(运维维护,不入仓库):

/home/bigdata/release/
├── poyee-data-warehouse/          # 本项目部署目录
└── datasource/                    # 数据源连接配置(含账密,由运维管理,不入仓库)
    ├── postgresql/                #   文件名约定 {env}-{实例简称}.ini;env ∈ {dev,test,prod}
    │   ├── prod-hobby.ini         #     生产 PG hobby 业务库
    │   ├── prod-hobby-ro.ini      #     同环境多实例加二次后缀(ro = 只读从库)
    │   ├── test-hobby.ini
    │   └── dev-hobby.ini
    ├── mysql/
    │   └── prod-order.ini
    ├── mongo/
    ├── hdfs/
    │   ├── prod-ha.ini            #     HA 集群
    │   └── test-single.ini        #     单 NN 集群
    ├── clickhouse/
    ├── elasticsearch/
    ├── kafka/
    ├── redis/
    └── hbase/

sync ini 里 [reader] / [writer]dataSource 字段必须带 {db_type}/ 前缀,例如 dataSource = postgresql/prod-hobbydataSource = hdfs/prod-ha。代码按首段斜杠判 db_type(= 父目录),裸名(hobby)会找不到文件。前期跨环境同步常态(test 业务库 → prod HDFS),不设全局 env 概念,每个 sync ini 显式指向各自 env 的 source ini。

3. 模块关系图

待补充

4. 执行链详解

待补充

4.3 DataX 脚本详细使用说明 (即将重构)

DataX 脚本分为执行脚本辅助工具两类,调用链如下:

datax-multiple-hive-job-starter.sh   (MySQL→Hive 专用批量入口,含自动分区管理)
        │
        ▼
datax-multiple-job-starter.sh        (通用批量入口)
        │
        ▼
datax-single-job-starter.sh          (单任务入口)
        │
        ├─► datax-job-config-generator.py   (ini→json 配置生成)
        │           │
        │           ▼
        │   dw_base/datax/job_config_generator.py(生成引擎)
        │
        ▼
python datax.py generated.json       (DataX 框架执行数据同步)

辅助工具:datax-gc-generator.py(连接源库元数据,批量生成 ini 配置文件)

.py 同名包装器datax-single-job-starter.pydatax-multiple-job-starter.pydatax-multiple-hive-job-starter.py 是对应 .sh 的薄 Python 包装,仅供本地调试,禁止在 DolphinScheduler 调度中使用。


4.3.1 datax-single-job-starter.sh —— 单任务启动

用途:启动单个 DataX 同步任务。接受已生成的 JSON 或待生成的 ini 配置。

参数

参数 必填 说明
-c <path> 二选一 DataX 作业 JSON 配置文件(绝对路径)
-gc <path> 二选一 DataX ini 配置文件(项目内相对路径或绝对路径),-c 优先
-start-date <yyyyMMdd> 开始日期,默认昨天
-stop-date <yyyyMMdd> 结束日期,默认今天
-host <hostname> 指定执行主机,优先于 -random
-random 随机选择 Worker 节点
-skip-datax 跳过实际 DataX 执行(仅生成配置)

Worker 选择逻辑select_worker()):

  1. bigdata 用户 → 强制本机执行
  2. 非发布目录下执行 → 强制本机执行
  3. 指定了 -host → 使用指定主机
  4. 指定了 -random → 从 DATAX_WORKERS_QUEUE 随机选一台
  5. 都未指定 → 本机执行

HDFS 数据检查check_data_exists()):当 JSON 配置路径包含 hdfs- 时,会自动检查 HDFS reader 路径是否存在且有数据,无数据则跳过执行。

示例

# 采集任务(raw 层 ini)
bin/datax-single-job-starter.sh -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini -start-date 20260415 -env prod

# 导出任务(ads 层 ini)
bin/datax-single-job-starter.sh -gc jobs/ads/trd/ads_trd_gmv_d_export.ini -start-date 20260415 -env prod

# 使用已生成的 JSON(跳过生成,env 已嵌入 JSON)
bin/datax-single-job-starter.sh -c /abs/path/to/generated.json

待重构项(见 90-重构路线.md §2.1 DataX 条目):

  • -env 参数目前尚未实现,现阶段切环境靠改 datasource/ 下的实际文件或 conf/env.sh(待新建)
  • bin/ 下几个 DataX 启动脚本 / 生成器里还残留 conf/datax/config/ 前缀剥离逻辑(老项目遗留;该目录已迁至 conf/bak/ 并忽略入库),新项目 ini 放在 jobs/raw/ / jobs/ads/ / manual/,这段逻辑要清理掉

4.3.2 datax-multiple-job-starter.sh —— 通用批量启动

用途:批量启动多个 DataX 任务,支持串行/并行执行。DolphinScheduler 调度的主要入口。

参数

参数 优先级 说明
-c <path> 1(最高) JSON 配置文件,可多次传入
-cd <dir> 2 JSON 配置文件目录
-gc <path> 3 ini 配置文件,可多次传入
-gcd <dir> 4(最低) ini 配置文件目录
-start-date 开始日期,默认昨天
-stop-date 结束日期,默认今天
-host 指定 Worker 节点
-random 随机选择 Worker
-parallel 并行执行(默认串行)
-skip-datax 跳过 DataX 执行

执行模式

  • 串行(默认):逐个调用 datax-single-job-starter.sh,日志实时输出(tee),结束后汇报成功/失败计数
  • 并行-parallel):后台启动所有任务,日志写入文件,仅限 bigdata 用户 + 发布主机

日志路径${LOG_ROOT_DIR}/datax/${src-dst}/${project_layer_env}/${START_DATE}/${START_DATE}-${JOB_NAME}.log

示例(目标态):

# 批量执行整个业务域下的 raw 采集 ini
bin/datax-multiple-job-starter.sh -gcd jobs/raw/trd -start-date 20260415 -env prod -parallel

# 指定多个 ini 文件串行执行
bin/datax-multiple-job-starter.sh \
  -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
  -gc jobs/raw/usr/raw_usr_user_info_inc_d.ini \
  -start-date 20260415 -env prod

4.3.3 datax-multiple-hive-job-starter.sh —— 带 Hive 分区自动管理的批量启动

用途:在 datax-multiple-job-starter.sh 之上封装了 Hive 分区自动管理。任何写入 Hive 分区表的 DataX 同步作业(不限于 MySQL→Hive)都可以用它,脚本头注释里"MySQL-Hive 作业"只是历史命名。日常采集作业的主力入口

与 multiple-job-starter 的区别

  1. 自动从 ini 配置中解析 Hive 表名和分区路径(parse_ddl() 函数,grep "path =" <ini>
  2. 在执行 DataX 前自动执行 ALTER TABLE ... ADD IF NOT EXISTS PARTITION(dt=...)
  3. 支持在脚本内硬编码配置列表(partitioned_tablesgenerator_config_array 等数组),适合固定调度场景
  4. 支持 --override 参数临时覆盖脚本内硬编码配置

自动建分区只对 ini 输入生效parse_ddl() 读的是 ini 里的 path = ... 行。如果走 -jc / -jcd 传已生成的 JSON,脚本没有 ini 可解析,自动建分区不触发,此时要么改用 -t db.table 显式声明分区、要么把分区记录在脚本内 partitioned_tables 数组。

额外参数

参数 说明
--override 忽略脚本内硬编码的配置列表,只执行命令行传入的配置
-t <db.table> 显式指定需要建分区的 Hive 表,可多次传入
-skip-add-partition 跳过 Hive 分区创建
-jc / -jcd / -gc / -gcd 同 multiple-job-starter
-start-date / -stop-date 同上
-random / -parallel / -skip-datax 同上

分区解析逻辑parse_ddl()):

  1. 读取 ini 文件中 path =
  2. 检查路径是否包含 /dt=${dt}(分区标识)
  3. 从 HDFS 路径中提取 {db}.db/{table_name} → 拼接 ALTER TABLE {db}.{table} ADD IF NOT EXISTS PARTITION(dt={START_DATE})

示例(目标态):

# 执行某业务域下所有 raw 采集 ini + 自动建 Hive 分区
bin/datax-multiple-hive-job-starter.sh \
  -gcd jobs/raw/trd \
  -start-date 20260415 -env prod -parallel

# 覆盖脚本内硬编码配置,只跑指定的失败任务
bin/datax-multiple-hive-job-starter.sh --override \
  -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
  -start-date 20260415 -env prod

4.3.4 datax-job-config-generator.py —— ini→JSON 配置生成器

用途:将人类可读的 .ini 任务配置转换为 DataX 框架所需的 .json 作业配置文件。通常由 datax-single-job-starter.sh 自动调用,也可独立执行。

参数

参数 说明
-c <path> ini 配置文件路径,可多次传入或逗号分隔
-d <dir> 扫描指定目录下的 ini 文件
-r 配合 -d 使用,递归扫描子目录
-start-date 开始日期,默认昨天
-stop-date 结束日期,默认今天
-o <dir> 输出目录(默认 conf/datax/generated/

生成路径规则当前脚本残留老逻辑,待清理):脚本里仍保留 temp = os.path.dirname(gcf).replace(project_root_dir, '').replace('conf/datax/config/', '').split('/') 这段——老项目的 ini 放在 conf/datax/config/{src-dst}/{env}/ 下,前缀剥离后能派生出 src_dst / project_layer_env 拼接输出路径。新项目 ini 已经不走这条路径(conf/datax/config/ 整体挪到 conf/bak/ 并 gitignore),但脚本里的 replace 语句仍在执行一次无效剥离,输出会落到 conf/datax/generated/jobs/raw/trd/xxx.json——能跑但路径形态不符合新约定。

重构目标:去掉路径前缀剥离逻辑,输出统一扁平为 conf/datax/generated/{env}/{目标表名}.json。登记为硬编码待重构项,见 90-重构路线.md §2.1。

示例(目标态):

# 生成单个 ini 对应的 JSON
python3 bin/datax-job-config-generator.py -c jobs/raw/trd/raw_trd_order_pay_inc_d.ini -env prod

# 批量生成某业务域下所有 ini(递归)
python3 bin/datax-job-config-generator.py -d jobs/raw/trd -r -env prod

# 指定日期和输出路径
python3 bin/datax-job-config-generator.py -c jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
  -start-date 20260415 -stop-date 20260416 -env prod -o /tmp/datax-out

4.3.5 datax-gc-generator.py —— ini 配置元生成器

此部分需要完全重构,此记录仅为重构提供思路。

用途:连接源数据库读取表结构元数据,自动生成 DataX ini 配置文件。是开发阶段的辅助工具,用于批量初始化 ini 配置,生成后通常需要人工检查和调整。

通用参数

参数 说明
--from <type> 源系统类型(mysql / hdfs,默认 mysql
--to <type> 目标系统类型(hdfs / hbase / kafka / mongo / elasticsearch / mysql,默认 hdfs
--output <dir> 生成的 ini 文件存储目录

MySQL 作为源(--from mysql)额外参数-h(主机)、-P(端口)、-u(用户)、-p(密码)、-D(数据库)、-t(指定表)、-tr(表名正则)、-e(排除表)、-er(排除表正则)、--inc-col(增量字段,默认 update_time

HDFS 作为源(--from hdfs)额外参数-d(Hive 数据库)、-t(Hive 表)、-e(排除表)、--partitioned(是否分区表)

示例

# 为 MySQL 库中所有表生成 mysql→hdfs 的 ini 配置,输出到 raw/trd 业务域
python3 bin/datax-gc-generator.py --from mysql --to hdfs \
  -h 10.0.0.1 -u reader -p xxx -D hobby_prod \
  --output jobs/raw/trd

# 只为指定表生成,排除临时表
python3 bin/datax-gc-generator.py --from mysql --to hdfs \
  -h 10.0.0.1 -u reader -p xxx -D hobby_prod \
  -tr "^order" -er "^tmp_" \
  --output jobs/raw/trd

# 为 Hive 表生成 hdfs→elasticsearch 的 ini 配置
python3 bin/datax-gc-generator.py --from hdfs --to elasticsearch \
  -d ads --partitioned \
  --output conf/datax/config/hdfs-elasticsearch/prod

安全提示:该脚本接受数据库账密作为命令行参数。生产环境中建议通过环境变量或临时文件传递敏感信息,避免密码出现在 shell history 和进程列表中。

6. 配置管理体系

6.1 配置分类

配置类型 存放位置 是否入仓库 维护角色
数据源连接(含账密) ../datasource/{db_type}/{env}-{实例简称}.ini 运维
DataX 同步任务定义 jobs/raw/ (采集) 和 jobs/ads/ (导出) 开发
Spark 默认参数 conf/spark-defaults.conf(行为/开关)+ conf/spark-tuning.conf(资源/调优) 开发
Spark 单作业覆盖 对应 jobs/*.sql 文件内 SET spark.x.y=z 开发
环境变量 / 路径 conf/env.shbin/common/init.sh + dw_base/utils/env_loader.py 消费) 开发
告警 Webhook dw_base/common/alerter_constants.py 是(待改 conf/alerter.ini,入库) 开发

6.2 Spark 参数优先级(三级覆盖)

命令行 -sc key=value / SparkSQL(...) 显式传参   (L3,最高优先级,临时 override)
        ↓ 覆盖
SQL 文件内 SET spark.x.y=z                      (L2,单作业级别,开发写)
        ↓ 覆盖
conf/spark-defaults.conf + conf/spark-tuning.conf   (L1,全局默认,大数据负责人维护)

目标态由 dw_base/spark/spark_sql.py 启动时按 Spark 原生 key value 格式加载 L1,再让 Spark 本身处理 L2/L3 的覆盖。详细改造计划见 90-重构路线.md §2.3。

6.3 DataX ini 配置格式

关键约定

  • dataSource 字段只写 {db_type}/{instance}不含环境。环境由启动脚本的 -env 参数注入
  • 新项目推荐规范见 §6.4;老项目里 dataSource = pg-hobby-prod 这种把环境拼进字符串的写法是历史遗留,重构中统一改为上述新形式

⚠️ 当前代码实现现状(2026-04-20 实测,与目标态有差距,datax-gc-generator 重写时一并对齐)

  1. dataSource 前缀必须用数据源全名dw_base/datax/plugins/plugin_factory.py:33- 拆取第 0 段作为 ds_type,与 plugin_factory.py 的类型白名单比对。白名单仅 8 个全名:postgresql / mysql / clickhouse / hdfs / hbase / kafka / mongo / elasticsearchpg-xxx / ch-xxx / mq-xxx 这种简写当前会报 DataSource type pg ... is not supported yet;上面样例里写 dataSource = pg/hobby 是目标态,当前代码未支持(split('-') 逻辑 + / 路径推导都要跟着 §6.4 改)。
  2. RDBMS reader 的 columnType 当前被完全忽略PostgreSQLReader.load_columnpostgresql_reader.py:74-76)、MySQLReaderClickHouseReader 都覆盖了基类 Plugin.load_column,只读 column(字段名列表),columnType 不解析;类型靠 JDBC 驱动的 ResultSetMetaData 返回。对应的 writer 同样只读 column只有 HDFS/HBase/Kafka 这类读写文件/非关系型存储的插件走基类 Plugin.load_columnplugin.py:63-118),此时 columnType 才生效,且字符串字段可省略(基类默认类型是 string,见 plugin.py:77)。这一条与 kb/20 §8.1 raw 层"DataX ini 不写类型映射"的约定方向一致,但底层机制是上游代码覆盖掉了,不是约定的结果。

增量/全量区分:

  • dt=19700101query={} → 全量
  • query 中含 ${start_date}/${stop_date} → 增量

6.4 多环境机制与 env 注入

原则:业务代码一套,环境差异收敛在 datasource/conf/env.sh,运行时注入。

环境集合dev / test / prod(由运维在 datasource/ 下分别维护一套实例配置)。

注入链路

启动脚本(-env prod)
        │
        ▼
ini 里 dataSource = pg/hobby
        │
        │  脚本拼接
        ▼
实际路径 datasource/pg/prod/hobby.ini
        │
        ▼
DataX Reader/Writer 建立连接

env 判定优先级(两级,不引入环境变量,避免污染 shell 历史和 CI 环境):

级别 来源 用途
L1(最高) 命令行 -env <name> 调试 / 跨环境临时切换
L2 conf/env.sh 里的 DW_ENV 默认值 入仓库的一份配置,由开发者维护。默认值通常锁死为 dev(服务本地调试方便)。DolphinScheduler / 生产脚本总是命令行显式挂 -env prod 覆盖。不做任何"按用户/目录"的自动派生

执行示例

# 跑生产环境
bin/datax-multiple-hive-job-starter.sh -gcd jobs/raw/trd -start-date 20260415 -env prod -parallel

# 本地调试(通常省略 -env,走 conf/env.sh 默认值 dev)
bin/datax-single-job-starter.sh -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini -start-date 20260415

# 跑测试环境(测试 Hive 集群 + 测试后端库 + 测试服务库都在 datasource/*/test/ 下)
bin/datax-multiple-hive-job-starter.sh -gcd jobs/raw/trd -start-date 20260415 -env test -parallel

7. 部署架构

集群节点:m3(master), d1, d2, d3, d4(data nodes)
部署目录:/home/bigdata/release/poyee-data-warehouse/
部署用户:bigdata
部署方式:git pull + rsync (publish.sh → re-all 分发)

日志目录:
  统一输出到 ${LOG_ROOT_DIR}/{module}/{dt}/{file}.log
  (LOG_ROOT_DIR 默认 ${HOME}/log,外配在 conf/env.sh;
   release 用户 bigdata 落 /home/bigdata/log/...,个人落各自家目录。
   老项目 whoami 分流 /opt/data/log 与 ~/data/log 已废弃,见 90-重构路线.md §7.2.1)

8. manual/ 目录执行规范

定位:一次性、非幂等的 SQL 脚本;与 jobs/ 语义完全独立,严禁接入 DolphinScheduler 定时调度

子目录职责

目录 用途
manual/ddl/ 所有 DDL(初始 CREATE + 后续 ALTER),唯一来源;内部按 {layer}/{domain}/ 分子目录
manual/backfill/ 历史数据回刷(跨日期重算)
manual/fix/ 线上脏数据订正,必须附工单号
manual/adhoc/ 临时取数、问题排查
manual/imports/{yyyymmdd}/ 一次性入仓任务(离线硬盘、历史 dump、外部 CSV 等),按执行日期归档
manual/exports/{yyyymmdd}/ 一次性出仓任务,按执行日期归档
manual/archive/ 执行完毕的历史脚本归档,保留审计痕迹

命名规则{yyyymmdd}_{层}_{域}_{简述}.sql,例如 20260414_dwd_trd_add_refund_col.sql

文件头强制注释

-- 作者:xxx
-- 日期:2026-04-14
-- 工单:TPAD-1234
-- 目的:补录 2026-Q1 的退款维度
-- 状态:[待执行 | 已执行 2026-04-14]

执行与回收

  • 执行入口复用 bin/spark-sql-starter.py,不新增脚本
  • 仅通过 DS 一次性工作流或命令行手动触发
  • fix/backfill/ 类脚本上线前必须经过 1 人以上 Review
  • 执行完成后保留 3-6 个月作为审计证据,过期移入 archive/ 或删除