00-项目架构.md 45 KB

项目架构

poyee-data-warehouse 数据仓库工程的模块划分、执行时序与配置管理。

项目状态:重构中,目前采用原地渐进式重构模式。

1. 目录结构(目标态)

poyee-data-warehouse/              # 项目根目录(仓库名 = 部署名)
├── bin/                           # 启动脚本层:Shell + Python 入口脚本
│   ├── common/                    #   公共 Shell 函数与初始化
│   ├── spark-sql-starter.py       #   Spark SQL 执行入口
│   ├── datax-single-job-starter.sh#   DataX 单任务启动
│   ├── datax-multiple-job-starter.sh # DataX 批量任务启动
│   ├── datax-job-config-generator.py # ini→json 配置生成
│   └── ...
├── jobs/                          # 业务代码层(新建,替代老项目 launch-pad/),每日调度执行
│   ├── raw/                       #   原始数据采集(DataX ini 或 CSV 导入任务定义)
│   ├── ods/                       #   贴源层计算 SQL(类型转换、脏数据识别)
│   ├── dim/                       #   维度层计算 SQL(公共维度,贯穿 dwd/dws/tdm/ads)
│   ├── dwd/                       #   明细层计算 SQL
│   ├── dws/                       #   汇总层计算 SQL
│   ├── tdm/                       #   主题域模型层计算 SQL
│   ├── ads/                       #   应用层计算 SQL + 导出 ini
│   └── archive/                   #   已执行完的历史脚本归档
├── manual/                        # 一次性脚本(禁止接入定时调度)
│   ├── ddl/                       #   所有 DDL(初始 CREATE + 后续 ALTER),唯一来源
│   ├── backfill/                  #   历史数据回刷
│   ├── fix/                       #   线上脏数据订正(必须带工单号)
│   └── adhoc/                     #   临时取数 / 排查
├── dw_base/                       # 通用库层
│   ├── __init__.py                #   全局初始化
│   ├── common/                    #   常量、容器
│   ├── spark/                     #   SparkSQL 核心 + UDF
│   ├── datax/                     #   DataX 配置生成引擎
│   ├── database/                  #   MongoDB/MySQL 工具
│   ├── scheduler/                 #   调度辅助脚本
│   ├── hive/                      #   Hive DDL 生成
│   ├── ds/                        #   DolphinScheduler API
│   ├── utils/                     #   通用工具函数
│   └── ...
├── kb/                            # 知识库:项目文档
├── conf/                          # 配置层(当前为样例,后续存放非敏感配置)
├── publish.sh                     # 集群部署脚本
├── requirements.txt               # Python 依赖
└── README.md

项目同级目录(运维维护,不入仓库):

/home/bigdata/release/
├── poyee-data-warehouse/          # 本项目部署目录
└── datasource/                    # 数据源连接配置(含账密,由运维管理)
    ├── mongo/                     #   按数据库类型划分子目录
    ├── mysql/                     #   每个子目录下存放 {实例名}.ini
    ├── hdfs/
    ├── clickhouse/
    ├── elasticsearch/
    ├── kafka/
    ├── redis/
    ├── hbase/
    └── postgresql/

2. 核心模块职责

模块 路径(新) 职责
全局初始化 dw_base/__init__.py 环境检测、颜色常量、findspark 初始化、用户/权限判断
SparkSQL 引擎 dw_base/spark/spark_sql.py SparkSession 管理、UDF 注册、SQL 执行、数据导出
Spark 快捷初始化 dw_base/spark/td_spark_init.py 类 spark-submit 风格的 Session 创建
UDF 库 dw_base/spark/udf/ 按业务线分类的 Spark 自定义函数
DataX 引擎 dw_base/datax/ ini 配置解析 → json 作业文件生成
DataX 数据源 dw_base/datax/datasources/ 各类数据源的连接参数抽象
DataX 插件 dw_base/datax/plugins/ Reader/Writer 工厂 + 各数据源实现
数据库工具 dw_base/database/ MongoDB、MySQL 原生客户端封装
调度辅助 dw_base/scheduler/ 钉钉/企微通知、轮询调度、分区清理等
Hive 工具 dw_base/hive/ DDL 生成、库表命名规则
DS 工作流 dw_base/ds/ DolphinScheduler REST API 触发工作流
通用工具 dw_base/utils/ 参数解析、日期、文件、日志、SQL 解析、字符串等

3. 模块关系图

graph TB
    subgraph 外部系统
        DS[DolphinScheduler<br/>调度系统]
        YARN[Spark on YARN]
        HIVE[Hive MetaStore]
        HDFS[HDFS]
        MONGO[(MongoDB)]
        MYSQL[(MySQL)]
        CK[(ClickHouse)]
        ES[(Elasticsearch)]
        KAFKA[Kafka]
        DORIS[(Doris)]
        REDIS[(Redis)]
        DINGTALK[钉钉/企微<br/>告警]
    end

    subgraph bin [bin/ 启动脚本]
        SPARK_STARTER[spark-sql-starter.py]
        DATAX_MULTI[datax-multiple-job-starter.sh]
        DATAX_SINGLE[datax-single-job-starter.sh]
        DATAX_GEN[datax-job-config-generator.py]
        DORIS_STARTER[doris-*-starter.py]
        INIT_SH[common/init.sh]
    end

    subgraph dw_base [dw_base/ 通用库]
        INIT_PY[__init__.py<br/>全局初始化]
        SPARK_SQL[spark/spark_sql.py<br/>SparkSQL 引擎]
        UDF[spark/udf/<br/>UDF 库]
        DATAX_ENGINE[datax/<br/>配置生成引擎]
        DATASOURCES_CODE[datax/datasources/<br/>数据源抽象]
        PLUGINS[datax/plugins/<br/>Reader/Writer]
        SCHEDULER[scheduler/<br/>调度辅助]
        DB_UTILS[database/<br/>DB 工具]
        UTILS[utils/<br/>通用工具]
        DS_API[ds/<br/>DS API]
    end

    subgraph jobs [jobs/ 按数仓分层组织]
        RAW[raw/<br/>原始数据采集 ini]
        ODS_JOBS[ods/<br/>贴源层 SQL]
        DWD_JOBS[dwd/<br/>明细层 SQL]
        DWS_JOBS[dws/<br/>汇总层 SQL]
        TDM_JOBS[tdm/<br/>主题域模型 SQL]
        ADS_JOBS[ads/<br/>应用层 SQL + 导出 ini]
    end

    subgraph external_conf [项目同级 datasource/]
        DS_INI[数据源 .ini<br/>含账密]
    end

    DS -->|触发| DATAX_MULTI
    DS -->|触发| SPARK_STARTER
    DS -->|触发| DORIS_STARTER

    DATAX_MULTI --> DATAX_SINGLE
    DATAX_SINGLE --> DATAX_GEN
    DATAX_GEN --> DATAX_ENGINE
    DATAX_ENGINE --> PLUGINS
    PLUGINS --> DATASOURCES_CODE
    DATASOURCES_CODE -->|读取| DS_INI

    SPARK_STARTER --> INIT_PY
    SPARK_STARTER --> SPARK_SQL
    SPARK_SQL --> UDF
    SPARK_SQL -->|spark.sql| YARN
    YARN --> HIVE
    YARN --> HDFS

    DATAX_SINGLE -->|python datax.py| HDFS
    DATAX_SINGLE -->|python datax.py| MONGO
    DATAX_SINGLE -->|python datax.py| MYSQL
    DATAX_SINGLE -->|python datax.py| CK
    DATAX_SINGLE -->|python datax.py| ES
    DATAX_SINGLE -->|python datax.py| KAFKA

    DORIS_STARTER --> DORIS

    SCHEDULER --> DINGTALK
    SCHEDULER --> DB_UTILS
    DB_UTILS --> MONGO
    DB_UTILS --> MYSQL
    DS_API --> DS

    DATAX_GEN -->|读取| RAW
    SPARK_STARTER -->|读取| ODS_JOBS
    SPARK_STARTER -->|读取| DWD_JOBS
    SPARK_STARTER -->|读取| DWS_JOBS
    SPARK_STARTER -->|读取| TDM_JOBS
    SPARK_STARTER -->|读取| ADS_JOBS
    DATAX_GEN -->|读取| ADS_JOBS

4. 执行链详解

4.1 Spark SQL 执行链

sequenceDiagram
    participant DS as DolphinScheduler
    participant BIN as spark-sql-starter.py
    participant INIT as dw_base/__init__.py
    participant CFG as utils/config_utils.py
    participant SQL_ENGINE as spark/spark_sql.py
    participant HIVE as Hive/YARN

    DS->>BIN: 调用 -f sql文件 -dt 20250101
    BIN->>INIT: import(环境检测 + findspark)
    BIN->>CFG: parse_args() 解析参数
    BIN->>SQL_ENGINE: SparkSQL(session_name, udf_files, ...)
    SQL_ENGINE->>SQL_ENGINE: 读取 SQL 文件,解析 SET/ADD FILE
    SQL_ENGINE->>SQL_ENGINE: zip -qr dw_base.zip dw_base
    SQL_ENGINE->>HIVE: SparkSession.builder.enableHiveSupport()
    SQL_ENGINE->>SQL_ENGINE: 注册 UDF(反射扫描函数)
    loop 按日期循环
        SQL_ENGINE->>SQL_ENGINE: 替换 ${dt} 等参数
        SQL_ENGINE->>HIVE: spark.sql(sql_statement)
        HIVE-->>SQL_ENGINE: DataFrame
        SQL_ENGINE->>SQL_ENGINE: show / 记录日志
    end

4.2 DataX 数据同步执行链

sequenceDiagram
    participant DS as DolphinScheduler
    participant MULTI as datax-multiple-job-starter.sh
    participant SINGLE as datax-single-job-starter.sh
    participant GEN as datax-job-config-generator.py
    participant ENGINE as datax/job_config_generator.py
    participant PLUGIN as plugins/plugin_factory.py
    participant DS_FILE as datasource/*.ini
    participant DATAX as DataX (python datax.py)

    DS->>MULTI: -gc ini文件 -start-date -stop-date
    MULTI->>SINGLE: 逐个/并行调用
    SINGLE->>SINGLE: select_worker() 选择执行节点
    SINGLE->>GEN: 调用生成器
    GEN->>ENGINE: JobConfigGenerator(ini, start, stop, output)
    ENGINE->>ENGINE: ConfigParser 读取 .ini
    ENGINE->>PLUGIN: PluginFactory.get_plugin(reader/writer)
    PLUGIN->>DS_FILE: 读取数据源 ini(账密等)
    PLUGIN-->>ENGINE: reader/writer JSON 片段
    ENGINE->>ENGINE: 组装完整 JSON + 限速配置
    ENGINE->>ENGINE: 写入 conf/datax/generated/*.json
    SINGLE->>DATAX: python datax.py generated.json
    DATAX->>DATAX: 执行数据同步

4.3 DataX 脚本详细使用说明

DataX 脚本分为执行脚本辅助工具两类,调用链如下:

datax-multiple-hive-job-starter.sh   (MySQL→Hive 专用批量入口,含自动分区管理)
        │
        ▼
datax-multiple-job-starter.sh        (通用批量入口)
        │
        ▼
datax-single-job-starter.sh          (单任务入口)
        │
        ├─► datax-job-config-generator.py   (ini→json 配置生成)
        │           │
        │           ▼
        │   dw_base/datax/job_config_generator.py(生成引擎)
        │
        ▼
python datax.py generated.json       (DataX 框架执行数据同步)

辅助工具:datax-gc-generator.py(连接源库元数据,批量生成 ini 配置文件)

.py 同名包装器datax-single-job-starter.pydatax-multiple-job-starter.pydatax-multiple-hive-job-starter.py 是对应 .sh 的薄 Python 包装,仅供本地调试,禁止在 DolphinScheduler 调度中使用。


4.3.1 datax-single-job-starter.sh —— 单任务启动

用途:启动单个 DataX 同步任务。接受已生成的 JSON 或待生成的 ini 配置。

参数

参数 必填 说明
-c <path> 二选一 DataX 作业 JSON 配置文件(绝对路径)
-gc <path> 二选一 DataX ini 配置文件(项目内相对路径或绝对路径),-c 优先
-start-date <yyyyMMdd> 开始日期,默认昨天
-stop-date <yyyyMMdd> 结束日期,默认今天
-host <hostname> 指定执行主机,优先于 -random
-random 随机选择 Worker 节点
-skip-datax 跳过实际 DataX 执行(仅生成配置)

Worker 选择逻辑select_worker()):

  1. bigdata 用户 → 强制本机执行
  2. 非发布目录下执行 → 强制本机执行
  3. 指定了 -host → 使用指定主机
  4. 指定了 -random → 从 DATAX_WORKERS_QUEUE 随机选一台
  5. 都未指定 → 本机执行

HDFS 数据检查check_data_exists()):当 JSON 配置路径包含 hdfs- 时,会自动检查 HDFS reader 路径是否存在且有数据,无数据则跳过执行。

示例(目标态,用 -env 切环境;命名见 21-命名规范.md §3.9):

# 采集任务(raw 层 ini)
bin/datax-single-job-starter.sh -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini -start-date 20260415 -env prod

# 导出任务(ads 层 ini)
bin/datax-single-job-starter.sh -gc jobs/ads/trd/ads_trd_gmv_d_export.ini -start-date 20260415 -env prod

# 使用已生成的 JSON(跳过生成,env 已嵌入 JSON)
bin/datax-single-job-starter.sh -c /abs/path/to/generated.json

待重构项(见 90-重构路线.md §2.1 DataX 条目):

  • -env 参数目前尚未实现,现阶段切环境靠改 datasource/ 下的实际文件或 conf/env.sh(待新建)
  • bin/ 下几个 DataX 启动脚本 / 生成器里还残留 conf/datax/config/ 前缀剥离逻辑(老项目遗留;该目录已迁至 conf/bak/ 并忽略入库),新项目 ini 放在 jobs/raw/ / jobs/ads/ / manual/,这段逻辑要清理掉

4.3.2 datax-multiple-job-starter.sh —— 通用批量启动

用途:批量启动多个 DataX 任务,支持串行/并行执行。DolphinScheduler 调度的主要入口。

参数

参数 优先级 说明
-c <path> 1(最高) JSON 配置文件,可多次传入
-cd <dir> 2 JSON 配置文件目录
-gc <path> 3 ini 配置文件,可多次传入
-gcd <dir> 4(最低) ini 配置文件目录
-start-date 开始日期,默认昨天
-stop-date 结束日期,默认今天
-host 指定 Worker 节点
-random 随机选择 Worker
-parallel 并行执行(默认串行)
-skip-datax 跳过 DataX 执行

执行模式

  • 串行(默认):逐个调用 datax-single-job-starter.sh,日志实时输出(tee),结束后汇报成功/失败计数
  • 并行-parallel):后台启动所有任务,日志写入文件,仅限 bigdata 用户 + 发布主机

日志路径${LOG_ROOT_DIR}/datax/${src-dst}/${project_layer_env}/${START_DATE}/${START_DATE}-${JOB_NAME}.log

示例(目标态):

# 批量执行整个业务域下的 raw 采集 ini
bin/datax-multiple-job-starter.sh -gcd jobs/raw/trd -start-date 20260415 -env prod -parallel

# 指定多个 ini 文件串行执行
bin/datax-multiple-job-starter.sh \
  -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
  -gc jobs/raw/usr/raw_usr_user_info_inc_d.ini \
  -start-date 20260415 -env prod

4.3.3 datax-multiple-hive-job-starter.sh —— 带 Hive 分区自动管理的批量启动

用途:在 datax-multiple-job-starter.sh 之上封装了 Hive 分区自动管理。任何写入 Hive 分区表的 DataX 同步作业(不限于 MySQL→Hive)都可以用它,脚本头注释里"MySQL-Hive 作业"只是历史命名。日常采集作业的主力入口

与 multiple-job-starter 的区别

  1. 自动从 ini 配置中解析 Hive 表名和分区路径(parse_ddl() 函数,grep "path =" <ini>
  2. 在执行 DataX 前自动执行 ALTER TABLE ... ADD IF NOT EXISTS PARTITION(dt=...)
  3. 支持在脚本内硬编码配置列表(partitioned_tablesgenerator_config_array 等数组),适合固定调度场景
  4. 支持 --override 参数临时覆盖脚本内硬编码配置

自动建分区只对 ini 输入生效parse_ddl() 读的是 ini 里的 path = ... 行。如果走 -jc / -jcd 传已生成的 JSON,脚本没有 ini 可解析,自动建分区不触发,此时要么改用 -t db.table 显式声明分区、要么把分区记录在脚本内 partitioned_tables 数组。

额外参数

参数 说明
--override 忽略脚本内硬编码的配置列表,只执行命令行传入的配置
-t <db.table> 显式指定需要建分区的 Hive 表,可多次传入
-skip-add-partition 跳过 Hive 分区创建
-jc / -jcd / -gc / -gcd 同 multiple-job-starter
-start-date / -stop-date 同上
-random / -parallel / -skip-datax 同上

分区解析逻辑parse_ddl()):

  1. 读取 ini 文件中 path =
  2. 检查路径是否包含 /dt=${dt}(分区标识)
  3. 从 HDFS 路径中提取 {db}.db/{table_name} → 拼接 ALTER TABLE {db}.{table} ADD IF NOT EXISTS PARTITION(dt={START_DATE})

示例(目标态):

# 执行某业务域下所有 raw 采集 ini + 自动建 Hive 分区
bin/datax-multiple-hive-job-starter.sh \
  -gcd jobs/raw/trd \
  -start-date 20260415 -env prod -parallel

# 覆盖脚本内硬编码配置,只跑指定的失败任务
bin/datax-multiple-hive-job-starter.sh --override \
  -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
  -start-date 20260415 -env prod

4.3.4 datax-job-config-generator.py —— ini→JSON 配置生成器

用途:将人类可读的 .ini 任务配置转换为 DataX 框架所需的 .json 作业配置文件。通常由 datax-single-job-starter.sh 自动调用,也可独立执行。

参数

参数 说明
-c <path> ini 配置文件路径,可多次传入或逗号分隔
-d <dir> 扫描指定目录下的 ini 文件
-r 配合 -d 使用,递归扫描子目录
-start-date 开始日期,默认昨天
-stop-date 结束日期,默认今天
-o <dir> 输出目录(默认 conf/datax/generated/

生成路径规则当前脚本残留老逻辑,待清理):脚本里仍保留 temp = os.path.dirname(gcf).replace(project_root_dir, '').replace('conf/datax/config/', '').split('/') 这段——老项目的 ini 放在 conf/datax/config/{src-dst}/{env}/ 下,前缀剥离后能派生出 src_dst / project_layer_env 拼接输出路径。新项目 ini 已经不走这条路径(conf/datax/config/ 整体挪到 conf/bak/ 并 gitignore),但脚本里的 replace 语句仍在执行一次无效剥离,输出会落到 conf/datax/generated/jobs/raw/trd/xxx.json——能跑但路径形态不符合新约定。

重构目标:去掉路径前缀剥离逻辑,输出统一扁平为 conf/datax/generated/{env}/{目标表名}.json。登记为硬编码待重构项,见 90-重构路线.md §2.1。

示例(目标态):

# 生成单个 ini 对应的 JSON
python3 bin/datax-job-config-generator.py -c jobs/raw/trd/raw_trd_order_pay_inc_d.ini -env prod

# 批量生成某业务域下所有 ini(递归)
python3 bin/datax-job-config-generator.py -d jobs/raw/trd -r -env prod

# 指定日期和输出路径
python3 bin/datax-job-config-generator.py -c jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
  -start-date 20260415 -stop-date 20260416 -env prod -o /tmp/datax-out

4.3.5 datax-gc-generator.py —— ini 配置元生成器

用途:连接源数据库读取表结构元数据,自动生成 DataX ini 配置文件。是开发阶段的辅助工具,用于批量初始化 ini 配置,生成后通常需要人工检查和调整。

支持的同步方向

--from --to 说明
mysql hdfs MySQL → HDFS(最常用,MySQL 同步到 Hive raw 层)
mysql hbase MySQL → HBase
hdfs hbase HDFS(Hive) → HBase
hdfs kafka HDFS(Hive) → Kafka
hdfs mongo HDFS(Hive) → MongoDB
hdfs elasticsearch HDFS(Hive) → Elasticsearch
hdfs mysql HDFS(Hive) → MySQL

通用参数

参数 说明
--from <type> 源系统类型(mysql / hdfs,默认 mysql
--to <type> 目标系统类型(hdfs / hbase / kafka / mongo / elasticsearch / mysql,默认 hdfs
--output <dir> 生成的 ini 文件存储目录

MySQL 作为源(--from mysql)额外参数-h(主机)、-P(端口)、-u(用户)、-p(密码)、-D(数据库)、-t(指定表)、-tr(表名正则)、-e(排除表)、-er(排除表正则)、--inc-col(增量字段,默认 update_time

HDFS 作为源(--from hdfs)额外参数-d(Hive 数据库)、-t(Hive 表)、-e(排除表)、--partitioned(是否分区表)

示例

# 为 MySQL 库中所有表生成 mysql→hdfs 的 ini 配置,输出到 raw/trd 业务域
python3 bin/datax-gc-generator.py --from mysql --to hdfs \
  -h 10.0.0.1 -u reader -p xxx -D hobby_prod \
  --output jobs/raw/trd

# 只为指定表生成,排除临时表
python3 bin/datax-gc-generator.py --from mysql --to hdfs \
  -h 10.0.0.1 -u reader -p xxx -D hobby_prod \
  -tr "^order" -er "^tmp_" \
  --output jobs/raw/trd

# 为 Hive 表生成 hdfs→elasticsearch 的 ini 配置
python3 bin/datax-gc-generator.py --from hdfs --to elasticsearch \
  -d ads --partitioned \
  --output conf/datax/config/hdfs-elasticsearch/prod

安全提示:该脚本接受数据库账密作为命令行参数。生产环境中建议通过环境变量或临时文件传递敏感信息,避免密码出现在 shell history 和进程列表中。

5. 数据分层架构

数仓分层与 jobs/ 目录一一对应:

                                                ┌────────────┐
┌───────────────────────────────────────────┐   │            │
│  ADS  应用层:业务指标、服务端导出宽表      │   │            │
├───────────────────────────────────────────┤   │            │
│  TDM  标签层:长表明细 + 宽表 + 人群包      │◄──┤    DIM     │
├───────────────────────────────────────────┤   │            │
│  DWS  汇总层:主题聚合、提供公共指标        │◄──┤  公共维度   │
├───────────────────────────────────────────┤   │            │
│  DWD  明细层:清洗加工 + 维度退化           │◄──┤            │
├───────────────────────────────────────────┤   │            │
│  ODS  贴源层:类型转换、脏数据识别          │   │            │
├───────────────────────────────────────────┤   └────────────┘
│  RAW  采集层:全字段 STRING,原样落盘       │
└───────────────────────────────────────────┘

数据流向:
  PG / ES ──DataX(raw)──▶ RAW ──SparkSQL(ods)──▶ ODS ──SparkSQL(dwd)──▶ DWD
  DWD ──SparkSQL(dws)──▶ DWS ──SparkSQL(tdm)──▶ TDM ──SparkSQL(ads)──▶ ADS
  ADS ──DataX / BrokerLoad──▶ 服务层(Doris / ClickHouse / ES / MongoDB)

jobs/ 目录内部组织:

每个分层目录内部按业务域代码(见 21-命名规范.md §3.2,trd/usr/prd/shp/pub/dim)建子目录,每个业务域下放置具体的 ini 或 SQL 文件。样板见 00-项目架构.md §9。

jobs/
├── raw/                          # DataX ini / CSV 导入 SQL:从源系统采集到 Hive raw 区
│   ├── trd/                      #   交易域:订单、支付、退款等源表采集
│   ├── usr/                      #   用户域:用户注册、登录、行为等源表采集
│   ├── shp/                      #   商家域
│   ├── prd/                      #   商品域
│   └── pub/                      #   公共域:平台、日历等
├── ods/                          # SQL:贴源层(类型转换、脏数据识别)
│   ├── trd/
│   └── ...
├── dim/                          # SQL:维度层(公共维度,贯穿 dwd/dws/tdm/ads)
│   ├── pub/                      #   公共维度:日历、地区、币种、汇率
│   ├── usr/                      #   用户维度
│   ├── prd/                      #   商品维度
│   └── shp/                      #   店铺维度
├── dwd/                          # SQL:明细层(清洗加工 + 维度退化)
├── dws/                          # SQL:汇总层(轻度聚合)
├── tdm/                          # SQL:主题域模型层(跨域整合、画像)
└── ads/                          # SQL + ini:应用层 + 导出到服务层

原则:同一业务(如订单)的数据在不同分层之间纵向流转,trd/ 这一业务域在 raw/ods/dwd/dws/tdm/ads 各层都会出现。

6. 配置管理体系

6.1 配置分类

配置类型 存放位置 是否入仓库 维护角色
数据源连接(含账密) ../datasource/{db_type}/{env}/{instance}.ini 运维
DataX 同步任务定义 jobs/raw/ (采集) 和 jobs/ads/ (导出) 开发
Spark 默认参数 conf/spark-defaults.yaml(目标态) / spark_sql.py 构造函数(现状) 开发
Spark 单作业覆盖 对应 jobs/*.sql 文件内 SET spark.x.y=z 开发
环境变量 / 路径 dw_base/__init__.pybin/common/init.sh 是(待改为conf) 开发
告警 Webhook dw_base/common/alerter_constants.py 是(待改为conf) 开发
DS 工作流配置 dw_base/ds/config/*.yaml 开发

6.2 Spark 参数优先级(三级覆盖)

命令行 -sc key=value / SparkSQL(...) 显式传参   (L3,最高优先级,临时 override)
        ↓ 覆盖
SQL 文件内 SET spark.x.y=z                      (L2,单作业级别,开发写)
        ↓ 覆盖
conf/spark-defaults.yaml                        (L1,全局默认,大数据负责人维护)

目标态由 dw_base/spark/spark_sql.py 启动时加载 L1,再让 Spark 本身处理 L2/L3 的覆盖。详细改造计划见 90-重构路线.md §2.3。

6.3 DataX ini 配置格式

数据源定义(datasource/*.ini):

[base]
address = mongodb://user:pass@host:port/db    # MongoDB
# 或
jdbcUrl = jdbc:mysql://host:port              # MySQL
userName = xxx
userPassword = xxx
# 或
defaultFS = hdfs://nameservice1               # HDFS

同步任务定义(jobs/raw/{domain}/{目标表名}.ini):

[reader]
dataSource = pg/hobby                         # 引用 datasource/pg/${env}/hobby.ini,${env} 运行时注入
dbName = xxx
schemaName = public
tableName = orders
column = col1,col2,...
columnType = col1:bigint,col2:date
where = update_time >= '${start_date}' AND update_time < '${stop_date}'

[writer]
dataSource = hdfs/default                     # 引用 datasource/hdfs/${env}/default.ini
column = col1,col2,...
columnType = col1:bigint,col2:date
path = /user/hive/warehouse/raw.db/raw_trd_order_pay_inc_d/dt=${dt}
fileType = orc
writeMode = truncate

关键约定

  • dataSource 字段只写 {db_type}/{instance}不含环境。环境由启动脚本的 -env 参数注入
  • 新项目推荐规范见 §6.4;老项目里 dataSource = pg-hobby-prod 这种把环境拼进字符串的写法是历史遗留,重构中统一改为上述新形式

增量/全量区分:

  • dt=19700101query={} → 全量
  • query 中含 ${start_date}/${stop_date} → 增量

6.4 多环境机制与 env 注入

原则:业务代码一套,环境差异收敛在 datasource/conf/env.sh,运行时注入。

环境集合dev / test / prod(由运维在 datasource/ 下分别维护一套实例配置)。

注入链路

启动脚本(-env prod)
        │
        ▼
ini 里 dataSource = pg/hobby
        │
        │  脚本拼接
        ▼
实际路径 datasource/pg/prod/hobby.ini
        │
        ▼
DataX Reader/Writer 建立连接

env 判定优先级(两级,不引入环境变量,避免污染 shell 历史和 CI 环境):

级别 来源 用途
L1(最高) 命令行 -env <name> 调试 / 跨环境临时切换
L2 conf/env.sh 里的 DW_ENV 默认值 入仓库的一份配置,由开发者维护。默认值通常锁死为 dev(服务本地调试方便)。DolphinScheduler / 生产脚本总是命令行显式挂 -env prod 覆盖。不做任何"按用户/目录"的自动派生

目录示例

datasource/                           # 运维维护,不入仓库
├── pg/
│   ├── prod/
│   │   ├── hobby.ini                 # 生产 PG 实例
│   │   └── crm.ini
│   ├── test/
│   │   └── hobby.ini
│   └── dev/
│       └── hobby.ini
├── mysql/
│   ├── prod/
│   └── dev/
├── mongo/
│   └── ...
└── hdfs/
    └── ...

执行示例

# 跑生产环境
bin/datax-multiple-hive-job-starter.sh -gcd jobs/raw/trd -start-date 20260415 -env prod -parallel

# 本地调试(通常省略 -env,走 conf/env.sh 默认值 dev)
bin/datax-single-job-starter.sh -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini -start-date 20260415

# 跑测试环境(测试 Hive 集群 + 测试后端库 + 测试服务库都在 datasource/*/test/ 下)
bin/datax-multiple-hive-job-starter.sh -gcd jobs/raw/trd -start-date 20260415 -env test -parallel

当前状态-env 参数、conf/env.shdatasource/ 的环境子目录都尚未落地,是目标态。改造清单见 90-重构路线.md §2.1。

7. 部署架构

集群节点:m3(master), d1, d2, d3, d4(data nodes)
部署目录:/home/bigdata/release/poyee-data-warehouse/
部署用户:bigdata
部署方式:git pull + rsync (publish.sh → re-all 分发)

日志目录:
  统一输出到 /opt/data/log/{module}/{dt}/{file}.log
  (老项目按 whoami 分流到 /opt/data/log 或 ~/data/log,参见 90-重构路线.md)

8. manual/ 目录执行规范

定位:一次性、非幂等的 SQL 脚本;与 jobs/ 语义完全独立,严禁接入 DolphinScheduler 定时调度

子目录职责

目录 用途
manual/ddl/ 一次性 DDL:加列、改分区、补建表
manual/backfill/ 历史数据回刷(跨日期重算)
manual/fix/ 线上脏数据订正,必须附工单号
manual/adhoc/ 临时取数、问题排查
manual/archive/ 执行完毕的历史脚本归档,保留审计痕迹

命名规则{yyyymmdd}_{层}_{域}_{简述}.sql,例如 20260414_dwd_trd_add_refund_col.sql

文件头强制注释

-- 作者:xxx
-- 日期:2026-04-14
-- 工单:TPAD-1234
-- 目的:补录 2026-Q1 的退款维度
-- 状态:[待执行 | 已执行 2026-04-14]

执行与回收

  • 执行入口复用 bin/spark-sql-starter.py,不新增脚本
  • 仅通过 DS 一次性工作流或命令行手动触发
  • fix/backfill/ 类脚本上线前必须经过 1 人以上 Review
  • 执行完成后保留 3-6 个月作为审计证据,过期移入 archive/ 或删除

9. 样板 job 结构

核心原则:DDL 与计算 SQL 物理分离,DDL 全部在 manual/ddl/ 下单一来源。

  • manual/ddl/ 存放所有 DDL(首次建表 + 后续 ALTER),采用 migration 模式:每次 DDL 操作是一个不可变文件,禁止回头改老文件
  • jobs/ 存放每日调度执行的采集 / 计算任务,只做 INSERT OVERWRITE 或数据同步,不写 CREATE TABLE

一张表的完整生命周期涉及:

  • manual/ddl/{表名}_create.sql —— 首次建表,永久保留
  • 若干 manual/ddl/{表名}_{yyyymmdd}_{描述}_change.sql —— 之后每次 ALTER,独立文件
  • jobs/{layer}/{domain}/{表名}.sql —— 每日调度的计算 SQL(不含建表)

9.1 manual/ddl/ —— DDL 唯一来源

目录组织:扁平存放,不再按 {layer}/{domain}/ 分子目录 —— 因为每张表只在 manual/ddl/ 出现一次(外加若干 ALTER 文件),扁平就够用。grep dwd_trd_ manual/ddl/ 一行命令就能列出某表的所有 DDL 历史。

manual/ddl/
├── raw_trd_order_pay_inc_d.sql                          # 首次建表(永久保留)
├── ods_trd_order_pay_inc_d.sql
├── dwd_trd_order_pay_inc_d.sql
├── ads_trd_gmv_d.sql
├── 20260520_dwd_trd_order_pay_add_refund.sql            # ALTER(执行后归档)
├── 20260612_raw_trd_legacy_order_change_partition.sql
└── archive/
    └── 20260301_old_alter.sql                           # 已归档

首次建表 DDL 文件模板manual/ddl/dwd_trd_order_pay_inc_d.sql):

-- ============================================================
-- 表名:dwd.dwd_trd_order_pay_inc_d
-- 业务含义:交易域-订单支付明细-增量-日
-- 分层:DWD
-- 更新周期:日 / 增量
-- 负责人:tianyu.chu
-- 创建日期:2026-xx-xx
-- 工单:TPAD-1001
-- 状态:已执行 2026-xx-xx
-- ============================================================

CREATE TABLE IF NOT EXISTS dwd.dwd_trd_order_pay_inc_d (
    order_id      BIGINT        COMMENT '订单ID',
    user_id       BIGINT        COMMENT '用户ID',
    shop_id       BIGINT        COMMENT '店铺ID',
    pay_amt_cny   DECIMAL(18,2) COMMENT '支付金额(人民币)',
    pay_time      TIMESTAMP     COMMENT '支付时间',
    etl_time      TIMESTAMP     COMMENT 'ETL写入时间'
) COMMENT '交易域-订单支付明细-增量-日'
PARTITIONED BY (dt STRING COMMENT '日期分区 yyyyMMdd')
STORED AS ORC;

ALTER 文件模板manual/ddl/20260520_dwd_trd_order_pay_add_refund.sql):

-- ============================================================
-- 表名:dwd.dwd_trd_order_pay_inc_d
-- 作者:tianyu.chu
-- 日期:2026-05-20
-- 工单:TPAD-1234
-- 目的:加 refund_amt_cny 字段(业务上线退款流程)
-- 回滚:ALTER TABLE dwd.dwd_trd_order_pay_inc_d REPLACE COLUMNS (...)
--       (Hive 2.x 不支持 DROP COLUMN)
-- 状态:[待执行]
-- ============================================================

ALTER TABLE dwd.dwd_trd_order_pay_inc_d
ADD COLUMNS (
    refund_amt_cny DECIMAL(18,2) COMMENT '退款金额(人民币)'
);

存储格式约定:所有分层一律 STORED AS ORC (不压缩,放弃一部分磁盘换 CPU、查询速度与 debug 友好度 ,后期做冷热数据再考虑压缩)。

9.2 jobs/ 层 —— 每日调度的计算 SQL

文件粒度:一个 .sql 对应一张目标表。只写 INSERT OVERWRITE不写 CREATE TABLE(表由 manual/ddl/ 保证已存在)。

jobs/dwd/trd/
├── dwd_trd_order_pay_inc_d.sql
├── dwd_trd_order_refund_inc_d.sql
└── dwd_trd_shop_gmv_agg_ful_d.sql

计算 SQL 文件模板

-- ============================================================
-- 目标表:dwd.dwd_trd_order_pay_inc_d
-- DDL:manual/ddl/dwd_trd_order_pay_inc_d.sql(首次建表)
--      manual/ddl/2026*_dwd_trd_order_pay_*.sql(历次 ALTER)
-- 业务含义:交易域-订单支付明细-增量-日
-- 上游依赖:
--   - ods.ods_trd_order_pay_inc_d
--   - dim.dim_prd_product_ful_d
-- 下游使用:dws_trd_order_agg_inc_d、ads_trd_gmv_d
-- 负责人:tianyu.chu
-- 创建日期:2026-xx-xx
-- ============================================================

-- Spark 参数
SET spark.sql.shuffle.partitions=200;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- 主逻辑(纯 INSERT,不含建表)
INSERT OVERWRITE TABLE dwd.dwd_trd_order_pay_inc_d PARTITION (dt='${dt}')
SELECT
    o.order_id,
    o.user_id,
    o.shop_id,
    CAST(o.pay_amt AS DECIMAL(18,2)) AS pay_amt_cny,
    CAST(o.pay_time AS TIMESTAMP)    AS pay_time,
    CURRENT_TIMESTAMP()              AS etl_time
FROM ods.ods_trd_order_pay_inc_d o
WHERE o.dt = '${dt}'
;

对需要多步 CTE 或中间结果的复杂加工,用 WITH + 最终 INSERT OVERWRITE 写在同一文件内,不拆分多文件。

9.3 raw 层(采集任务)

raw 层的 jobs/ 有两类主要任务,根据源数据形态选择:

场景 文件类型 执行器
从 MongoDB / PG / MySQL 等结构化源库同步 .ini(DataX 配置) bin/datax-single-job-starter.sh
从本地 / 外部 CSV 文件导入 .sql(含 USING csv 临时视图 + INSERT OVERWRITE bin/csv-to-hdfs-starter.py(阶段 1 实现)

raw 层数据类型约定:所有字段一律 STRING,类型转换、空值处理、脏数据识别全部下推到 ods 层。这样 raw 的同步永远不会因为类型不符而失败,保证"链路出入口"简单稳定。

CSV 导入流程

  1. 本地 CSV 文件如果较大,先 gzip 压缩
  2. bin/csv-to-hdfs-starter.py 把(压缩后的)CSV hdfs dfs -put 到 HDFS 暂存区
  3. 调用 SparkSQL 执行 jobs/raw/{域}/{表}.sql,文件内通过 USING csv OPTIONS(...) 临时视图解析 CSV,再 INSERT OVERWRITE 写入对应 raw 表
  4. 清理 HDFS 暂存文件

CSV 导入任务定义模板jobs/raw/trd/raw_trd_legacy_order_his_o.sqlCTAS 一次性导入):

-- ============================================================
-- 目标表:raw.raw_trd_legacy_order_his_o
-- 业务含义:交易域 - 历史订单 CSV 导入(一次性历史快照 his)
-- 源数据:本地路径 /data/uploads/legacy_orders/historical.csv
--        编码 UTF-8 / 带表头 / 逗号分隔 / 双引号 quote / 字段内允许换行
-- 执行器:bin/csv-to-hdfs-starter.py
-- 暂存路径:/tmp/csv_staging/raw_trd_legacy_order_his_o/
-- 负责人:tianyu.chu
-- 创建日期:2026-xx-xx
-- 状态:[待执行]
-- 备注:CTAS 语法一次性建表 + 写入,无需在 manual/ddl/ 单独写 CREATE TABLE
-- ============================================================

-- csv-to-hdfs-starter 通过文件头注释读取这两个变量并完成 gzip+put 操作
-- @local_csv     = /data/uploads/legacy_orders/historical.csv
-- @hdfs_staging  = /tmp/csv_staging/raw_trd_legacy_order_his_o/

SET spark.sql.shuffle.partitions=50;

-- 用 USING csv 临时视图解析 HDFS 暂存区的 CSV,所有字段读为 STRING
CREATE OR REPLACE TEMPORARY VIEW v_raw_trd_legacy_order_stage
USING csv
OPTIONS (
    path        '/tmp/csv_staging/raw_trd_legacy_order_his_o/',
    header      'true',
    quote       '"',
    escape      '"',
    multiLine   'true',
    inferSchema 'false'
);

-- CTAS:一次性建表 + 写入,全字段 STRING(Spark 默认从 v_stage 推断为 STRING)
-- 不需要预先在 manual/ddl/ 写 CREATE TABLE
DROP TABLE IF EXISTS raw.raw_trd_legacy_order_his_o;

CREATE TABLE raw.raw_trd_legacy_order_his_o
USING ORC
OPTIONS ('orc.compress'='NONE')
COMMENT '交易域-历史订单CSV导入(一次性,全STRING)'
AS
SELECT
    order_id,
    user_id,
    amount,
    order_time
FROM v_raw_trd_legacy_order_stage
;

CTAS vs INSERT OVERWRITE 何时用哪个

场景 推荐写法 是否需要 manual/ddl/ 文件
一次性 CSV 导入(历史回刷、单批 vendor 数据),表名 raw_xxx_his_o DROP TABLE + CREATE TABLE ... AS SELECT不分区 否,CTAS 一步搞定
每日重复的 CSV 导入(daily file drop) manual/ddl/ 先建分区表,每日 SQL 用 INSERT OVERWRITE TABLE ... PARTITION (dt='${dt}') 是,需要 manual/ddl/{表名}.sql
结构化源库同步(PG/MySQL 等) DataX ini,writeMode=truncate 或分区覆盖 是,需要 manual/ddl/{表名}.sql

his 表为什么不分区:一次性导入永不追加,分区裁剪没有意义;CTAS 也不适合直接建分区表(每次都会 DROP,无法保留历史分区)。如果业务要求"按日期看一眼",下游 ods 建分区表,用 INSERT OVERWRITE PARTITION (dt) ... DISTRIBUTE BY dt 一次性把 raw → ods 切片即可(见 §9.3.1)。

为什么 CSV 一次性导入推荐 CTAS

  • 省掉单独写 manual/ddl/{表名}.sql 的步骤,减少维护点
  • 字段类型由 SELECT 列自动确定(CSV 全 STRING 场景下 Spark 默认推断为 STRING,符合 raw 层契约)
  • DROP + CREATE 配合 IF EXISTS 是幂等的,重跑安全

为什么用 SQL 而不是 YAML 描述 CSV 任务

  • 复用 SparkSQL 现有执行链,bin/csv-to-hdfs-starter.py 只需在 bin/spark-sql-starter.py 之外加一层 gzip+put+清理的薄壳,不需要单独的 YAML 渲染器
  • USING csv OPTIONS(...) 本身就是 Spark 的声明式 CSV 读取语法,YAML 再封装一层是多余的
  • 与其他分层文件类型一致(除 raw DataX ini 外,其他都是 .sql),AI 与人都不需要切换上下文

9.3.1 raw → ods 历史回刷(his → 分区表)

典型链路raw_xxx_his_o(不分区,全 STRING)→ ods_xxx_inc_d(按 dt 分区,类型化)。一次性 SQL 跑完后,下游 dwd/dws 看到的就是和"每日增量 ods"一样形态的分区表,无需感知数据来自历史导入。

ods 表先在 manual/ddl/ods_trd_order_pay_inc_d.sql 建好(同 §9.1 模板)。然后用一次性 SQL 把 raw 的全量切片到 ods 各分区:

-- ============================================================
-- 目标表:ods.ods_trd_order_pay_inc_d
-- 上游:  raw.raw_trd_legacy_order_his_o(一次性历史 his)
-- 性质:  一次性回刷,跑完后该 SQL 移入 manual/backfill/ 归档
-- 状态:  [待执行]
-- ============================================================

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions=5000;
SET hive.exec.max.dynamic.partitions.pernode=2000;
SET spark.sql.shuffle.partitions=400;

INSERT OVERWRITE TABLE ods.ods_trd_order_pay_inc_d PARTITION (dt)
SELECT
    CAST(order_id AS BIGINT)         AS order_id,
    CAST(user_id  AS BIGINT)         AS user_id,
    CAST(amount   AS DECIMAL(18,2))  AS pay_amt_cny,
    CAST(order_time AS TIMESTAMP)    AS pay_time,
    CURRENT_TIMESTAMP()              AS etl_time,
    SUBSTR(order_time, 1, 10)        AS dt          -- 分区列必须放最后
FROM raw.raw_trd_legacy_order_his_o
DISTRIBUTE BY dt;

为什么必须 DISTRIBUTE BY dt

  • 不加时,每个 reducer 都可能写所有分区 → 每个分区产出 shuffle.partitions 个小文件,几百日的历史会瞬间产生数万小文件,NameNode 直接告急
  • 加了之后同一 dt 的数据汇聚到一个 reducer → 每个分区只有 1 个文件,对历史回刷来说写入性能也最高
  • 如果某些 dt 数据量特别大(双十一这种),单 reducer 写不动,可改成 DISTRIBUTE BY dt, FLOOR(RAND() * 8) 把每个大 dt 切 8 份并行

回刷脚本归档:这类一次性 SQL 不放 jobs/(避免被 DS 误调度),而是放在 manual/backfill/{yyyymmdd}_{表名}_history.sql,跑完后保留 3-6 个月入档。

9.4 ads 层(SQL + 导出 ini 并存)

manual/ddl/
└── ads_trd_gmv_d.sql                  # 建表 DDL(首次建表,永久保留)

jobs/ads/trd/
├── ads_trd_gmv_d.sql                  # 每日计算产出 ads 表
└── ads_trd_gmv_d_export.ini           # 导出到 Doris/ClickHouse/MySQL 的 DataX ini

命名规则:导出 ini 文件名 = {ads 表名}_export.ini,便于一眼对应。

9.5 文件命名速查

目录 文件后缀 文件名规则 说明
manual/ddl/ .sql {目标表名}.sql(首次) 或 {yyyymmdd}_{表名}_{change}.sql(ALTER) DDL 唯一来源;首次建表用 CREATE TABLE IF NOT EXISTS,后续 ALTER 带日期前缀,执行后归档
jobs/raw/{domain}/ .ini(DataX)或 .sql(CSV 导入) {目标表名}.ini{目标表名}.sql DataX 采集或 CSV 导入任务定义
jobs/{ods\|dwd\|dws\|tdm}/{domain}/ .sql {目标表名}.sql 每日 INSERT OVERWRITE 计算
jobs/ads/{domain}/ .sql + .ini {ads 表名}.sql + {ads 表名}__{db_type}_{instance}.ini 产出 + 导出;同一张 ads 表扇出多下游时各一份 ini(见 21-命名规范.md §3.9)
manual/backfill/ .sql {yyyymmdd}_{表名}_history.sql 一次性历史回刷脚本

9.6 表结构变更流程(migration 模式)

当要给某张表加列 / 改字段时,只写新文件,不改老文件

manual/ddl/{yyyymmdd}_{表名}_{change}.sql 写 ALTER 语句(带工单号、目的、回滚方案)

  • ALTER 文件按时间前缀线性堆叠,grep dwd_trd_order_pay manual/ddl/ 即可看到该表的全部 DDL 历史,按文件名时间序回放就是表结构的完整演化
  • 真要在新环境重建这张表,按时间顺序把 manual/ddl/{表名}.sql + 所有相关 ALTER 文件依次执行即可,结果和生产一致。注意:目前没有自动化重放工具,需要人手按文件名时间序执行;未来视需要可以写一个 bin/replay-ddl.sh(当前未实现)
  • 这是数据库 migration 工具(Flyway / Alembic / Liquibase)的标准做法,已被工业界验证