poyee-data-warehouse数据仓库工程的模块划分、执行时序与配置管理。
项目状态:重构中,目前采用原地渐进式重构模式。
poyee-data-warehouse/ # 项目根目录(仓库名 = 部署名)
├── bin/ # 启动脚本层:Shell + Python 入口脚本
│ ├── common/ # 公共 Shell 函数与初始化
│ ├── spark-sql-starter.py # Spark SQL 执行入口
│ ├── datax-single-job-starter.sh# DataX 单任务启动
│ ├── datax-multiple-job-starter.sh # DataX 批量任务启动
│ ├── datax-job-config-generator.py # ini→json 配置生成
│ └── ...
├── jobs/ # 业务代码层,定时调度执行
│ ├── raw/ # 原始数据采集(DataX ini 或 CSV 导入任务定义)
│ ├── ods/ # 贴源层计算 SQL(类型转换、脏数据识别)
│ ├── dim/ # 维度层计算 SQL(公共维度,贯穿 dwd/dws/tdm/ads)
│ ├── dwd/ # 明细层计算 SQL
│ ├── dws/ # 汇总层计算 SQL
│ ├── tdm/ # 主题域模型层计算 SQL
│ ├── ads/ # 应用层计算 SQL + 导出 ini
│ └── archive/ # 已弃用的过期脚本归档
├── manual/ # 一次性脚本(禁止接入定时调度)
│ ├── ddl/ # 所有 DDL(初始 CREATE + 后续 ALTER),唯一来源;按 {layer}/{domain}/ 分子目录
│ ├── backfill/ # 历史数据回刷
│ ├── fix/ # 线上脏数据订正(必须带工单号)
│ ├── adhoc/ # 临时取数 / 排查
│ ├── imports/{yyyymmdd}/ # 一次性入仓(硬盘、历史 dump、外部 CSV),按执行日期归档
│ └── exports/{yyyymmdd}/ # 一次性出仓任务,按执行日期归档
├── dw_base/ # 通用库层
│ ├── __init__.py # 全局初始化(环境检测、用户/权限判断、颜色常量)
│ ├── common/ # 常量、容器(alerter / config / template 常量)
│ ├── spark/ # SparkSQL 引擎(Session 管理、UDF 注册、SQL 执行、数据导出)
│ ├── udf/ # UDF 库(common 通用 + business 业务专用)
│ ├── datax/ # DataX 配置生成引擎(ini→json),含 datasources/ + plugins/
│ ├── utils/ # 通用工具(参数解析、日期、文件、日志、SQL 解析、字符串等)
│ ├── io/ # (占位)I/O 边界:db / file / hdfs 跨进程读写
│ ├── ops/ # (占位)仓内数据运维(小文件合并、分区清理)
│ ├── dq/ # (占位)数据质量检查(schema drift、值域、关联、规模)
│ ├── pm/ # (占位)项目管理工具集成(TAPD / Jira API)
│ └── wiki/ # (占位)外部文档同步(Docmost → kb/inbox)
├── kb/ # 知识库:项目文档
├── conf/ # 配置层(非敏感配置入库:alerter.ini / workers.ini / datax-speed.ini / spark-defaults.conf 等)
├── tests/ # 测试:unit/ 纯函数单测 + integration/ Spark datax 集成测试
├── publish.sh # 集群部署脚本
├── requirements.txt # Python 依赖
└── README.md
项目同级目录(运维维护,不入仓库):
/home/bigdata/release/
├── poyee-data-warehouse/ # 本项目部署目录
└── datasource/ # 数据源连接配置(含账密,由运维管理,不入仓库)
├── postgresql/ # 文件名约定 {env}-{实例简称}.ini;env ∈ {dev,test,prod}
│ ├── prod-hobby.ini # 生产 PG hobby 业务库
│ ├── prod-hobby-ro.ini # 同环境多实例加二次后缀(ro = 只读从库)
│ ├── test-hobby.ini
│ └── dev-hobby.ini
├── mysql/
│ └── prod-order.ini
├── mongo/
├── hdfs/
│ ├── prod-ha.ini # HA 集群
│ └── test-single.ini # 单 NN 集群
├── clickhouse/
├── elasticsearch/
├── kafka/
├── redis/
└── hbase/
sync ini 里 [reader] / [writer] 的 dataSource 字段必须带 {db_type}/ 前缀,例如 dataSource = postgresql/prod-hobby、dataSource = hdfs/prod-ha。代码按首段斜杠判 db_type(= 父目录),裸名(hobby)会找不到文件。前期跨环境同步常态(test 业务库 → prod HDFS),不设全局 env 概念,每个 sync ini 显式指向各自 env 的 source ini。
待补充
待补充
DataX 脚本分为执行脚本和辅助工具两类,调用链如下:
datax-multiple-hive-job-starter.sh (MySQL→Hive 专用批量入口,含自动分区管理)
│
▼
datax-multiple-job-starter.sh (通用批量入口)
│
▼
datax-single-job-starter.sh (单任务入口)
│
├─► datax-job-config-generator.py (ini→json 配置生成)
│ │
│ ▼
│ dw_base/datax/job_config_generator.py(生成引擎)
│
▼
python datax.py generated.json (DataX 框架执行数据同步)
辅助工具:datax-gc-generator.py(连接源库元数据,批量生成 ini 配置文件)
.py同名包装器:datax-single-job-starter.py、datax-multiple-job-starter.py、datax-multiple-hive-job-starter.py是对应.sh的薄 Python 包装,仅供本地调试,禁止在 DolphinScheduler 调度中使用。
datax-single-job-starter.sh —— 单任务启动用途:启动单个 DataX 同步任务。接受已生成的 JSON 或待生成的 ini 配置。
参数:
| 参数 | 必填 | 说明 |
|---|---|---|
-c <path> |
二选一 | DataX 作业 JSON 配置文件(绝对路径) |
-gc <path> |
二选一 | DataX ini 配置文件(项目内相对路径或绝对路径),-c 优先 |
-start-date <yyyyMMdd> |
否 | 开始日期,默认昨天 |
-stop-date <yyyyMMdd> |
否 | 结束日期,默认今天 |
-host <hostname> |
否 | 指定执行主机,优先于 -random |
-random |
否 | 随机选择 Worker 节点 |
-skip-datax |
否 | 跳过实际 DataX 执行(仅生成配置) |
Worker 选择逻辑(select_worker()):
bigdata 用户 → 强制本机执行-host → 使用指定主机-random → 从 DATAX_WORKERS_QUEUE 随机选一台HDFS 数据检查(check_data_exists()):当 JSON 配置路径包含 hdfs- 时,会自动检查 HDFS reader 路径是否存在且有数据,无数据则跳过执行。
示例(目标态,用 -env 切环境;命名见 21-命名规范.md §3.9):
# 采集任务(raw 层 ini)
bin/datax-single-job-starter.sh -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini -start-date 20260415 -env prod
# 导出任务(ads 层 ini)
bin/datax-single-job-starter.sh -gc jobs/ads/trd/ads_trd_gmv_d_export.ini -start-date 20260415 -env prod
# 使用已生成的 JSON(跳过生成,env 已嵌入 JSON)
bin/datax-single-job-starter.sh -c /abs/path/to/generated.json
待重构项(见
90-重构路线.md§2.1 DataX 条目):
-env参数目前尚未实现,现阶段切环境靠改datasource/下的实际文件或conf/env.sh(待新建)bin/下几个 DataX 启动脚本 / 生成器里还残留conf/datax/config/前缀剥离逻辑(老项目遗留;该目录已迁至conf/bak/并忽略入库),新项目 ini 放在jobs/raw//jobs/ads//manual/,这段逻辑要清理掉
datax-multiple-job-starter.sh —— 通用批量启动用途:批量启动多个 DataX 任务,支持串行/并行执行。DolphinScheduler 调度的主要入口。
参数:
| 参数 | 优先级 | 说明 |
|---|---|---|
-c <path> |
1(最高) | JSON 配置文件,可多次传入 |
-cd <dir> |
2 | JSON 配置文件目录 |
-gc <path> |
3 | ini 配置文件,可多次传入 |
-gcd <dir> |
4(最低) | ini 配置文件目录 |
-start-date |
— | 开始日期,默认昨天 |
-stop-date |
— | 结束日期,默认今天 |
-host |
— | 指定 Worker 节点 |
-random |
— | 随机选择 Worker |
-parallel |
— | 并行执行(默认串行) |
-skip-datax |
— | 跳过 DataX 执行 |
执行模式:
datax-single-job-starter.sh,日志实时输出(tee),结束后汇报成功/失败计数-parallel):后台启动所有任务,日志写入文件,仅限 bigdata 用户 + 发布主机日志路径:${LOG_ROOT_DIR}/datax/${src-dst}/${project_layer_env}/${START_DATE}/${START_DATE}-${JOB_NAME}.log
示例(目标态):
# 批量执行整个业务域下的 raw 采集 ini
bin/datax-multiple-job-starter.sh -gcd jobs/raw/trd -start-date 20260415 -env prod -parallel
# 指定多个 ini 文件串行执行
bin/datax-multiple-job-starter.sh \
-gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
-gc jobs/raw/usr/raw_usr_user_info_inc_d.ini \
-start-date 20260415 -env prod
datax-multiple-hive-job-starter.sh —— 带 Hive 分区自动管理的批量启动用途:在 datax-multiple-job-starter.sh 之上封装了 Hive 分区自动管理。任何写入 Hive 分区表的 DataX 同步作业(不限于 MySQL→Hive)都可以用它,脚本头注释里"MySQL-Hive 作业"只是历史命名。日常采集作业的主力入口。
与 multiple-job-starter 的区别:
parse_ddl() 函数,grep "path =" <ini>)ALTER TABLE ... ADD IF NOT EXISTS PARTITION(dt=...)partitioned_tables、generator_config_array 等数组),适合固定调度场景--override 参数临时覆盖脚本内硬编码配置自动建分区只对 ini 输入生效:
parse_ddl()读的是 ini 里的path = ...行。如果走-jc/-jcd传已生成的 JSON,脚本没有 ini 可解析,自动建分区不触发,此时要么改用-t db.table显式声明分区、要么把分区记录在脚本内partitioned_tables数组。
额外参数:
| 参数 | 说明 |
|---|---|
--override |
忽略脚本内硬编码的配置列表,只执行命令行传入的配置 |
-t <db.table> |
显式指定需要建分区的 Hive 表,可多次传入 |
-skip-add-partition |
跳过 Hive 分区创建 |
-jc / -jcd / -gc / -gcd |
同 multiple-job-starter |
-start-date / -stop-date |
同上 |
-random / -parallel / -skip-datax |
同上 |
分区解析逻辑(parse_ddl()):
path = 行/dt=${dt}(分区标识){db}.db/{table_name} → 拼接 ALTER TABLE {db}.{table} ADD IF NOT EXISTS PARTITION(dt={START_DATE})示例(目标态):
# 执行某业务域下所有 raw 采集 ini + 自动建 Hive 分区
bin/datax-multiple-hive-job-starter.sh \
-gcd jobs/raw/trd \
-start-date 20260415 -env prod -parallel
# 覆盖脚本内硬编码配置,只跑指定的失败任务
bin/datax-multiple-hive-job-starter.sh --override \
-gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
-start-date 20260415 -env prod
datax-job-config-generator.py —— ini→JSON 配置生成器用途:将人类可读的 .ini 任务配置转换为 DataX 框架所需的 .json 作业配置文件。通常由 datax-single-job-starter.sh 自动调用,也可独立执行。
参数:
| 参数 | 说明 |
|---|---|
-c <path> |
ini 配置文件路径,可多次传入或逗号分隔 |
-d <dir> |
扫描指定目录下的 ini 文件 |
-r |
配合 -d 使用,递归扫描子目录 |
-start-date |
开始日期,默认昨天 |
-stop-date |
结束日期,默认今天 |
-o <dir> |
输出目录(默认 conf/datax/generated/) |
生成路径规则(当前脚本残留老逻辑,待清理):脚本里仍保留 temp = os.path.dirname(gcf).replace(project_root_dir, '').replace('conf/datax/config/', '').split('/') 这段——老项目的 ini 放在 conf/datax/config/{src-dst}/{env}/ 下,前缀剥离后能派生出 src_dst / project_layer_env 拼接输出路径。新项目 ini 已经不走这条路径(conf/datax/config/ 整体挪到 conf/bak/ 并 gitignore),但脚本里的 replace 语句仍在执行一次无效剥离,输出会落到 conf/datax/generated/jobs/raw/trd/xxx.json——能跑但路径形态不符合新约定。
重构目标:去掉路径前缀剥离逻辑,输出统一扁平为 conf/datax/generated/{env}/{目标表名}.json。登记为硬编码待重构项,见 90-重构路线.md §2.1。
示例(目标态):
# 生成单个 ini 对应的 JSON
python3 bin/datax-job-config-generator.py -c jobs/raw/trd/raw_trd_order_pay_inc_d.ini -env prod
# 批量生成某业务域下所有 ini(递归)
python3 bin/datax-job-config-generator.py -d jobs/raw/trd -r -env prod
# 指定日期和输出路径
python3 bin/datax-job-config-generator.py -c jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
-start-date 20260415 -stop-date 20260416 -env prod -o /tmp/datax-out
datax-gc-generator.py —— ini 配置元生成器此部分需要完全重构,此记录仅为重构提供思路。
用途:连接源数据库读取表结构元数据,自动生成 DataX ini 配置文件。是开发阶段的辅助工具,用于批量初始化 ini 配置,生成后通常需要人工检查和调整。
通用参数:
| 参数 | 说明 |
|---|---|
--from <type> |
源系统类型(mysql / hdfs,默认 mysql) |
--to <type> |
目标系统类型(hdfs / hbase / kafka / mongo / elasticsearch / mysql,默认 hdfs) |
--output <dir> |
生成的 ini 文件存储目录 |
MySQL 作为源(--from mysql)额外参数:-h(主机)、-P(端口)、-u(用户)、-p(密码)、-D(数据库)、-t(指定表)、-tr(表名正则)、-e(排除表)、-er(排除表正则)、--inc-col(增量字段,默认 update_time)
HDFS 作为源(--from hdfs)额外参数:-d(Hive 数据库)、-t(Hive 表)、-e(排除表)、--partitioned(是否分区表)
示例:
# 为 MySQL 库中所有表生成 mysql→hdfs 的 ini 配置,输出到 raw/trd 业务域
python3 bin/datax-gc-generator.py --from mysql --to hdfs \
-h 10.0.0.1 -u reader -p xxx -D hobby_prod \
--output jobs/raw/trd
# 只为指定表生成,排除临时表
python3 bin/datax-gc-generator.py --from mysql --to hdfs \
-h 10.0.0.1 -u reader -p xxx -D hobby_prod \
-tr "^order" -er "^tmp_" \
--output jobs/raw/trd
# 为 Hive 表生成 hdfs→elasticsearch 的 ini 配置
python3 bin/datax-gc-generator.py --from hdfs --to elasticsearch \
-d ads --partitioned \
--output conf/datax/config/hdfs-elasticsearch/prod
安全提示:该脚本接受数据库账密作为命令行参数。生产环境中建议通过环境变量或临时文件传递敏感信息,避免密码出现在 shell history 和进程列表中。
| 配置类型 | 存放位置 | 是否入仓库 | 维护角色 |
|---|---|---|---|
| 数据源连接(含账密) | ../datasource/{db_type}/{env}-{实例简称}.ini |
否 | 运维 |
| DataX 同步任务定义 | jobs/raw/ (采集) 和 jobs/ads/ (导出) |
是 | 开发 |
| Spark 默认参数 | conf/spark-defaults.conf(行为/开关)+ conf/spark-tuning.conf(资源/调优) |
是 | 开发 |
| Spark 单作业覆盖 | 对应 jobs/*.sql 文件内 SET spark.x.y=z |
是 | 开发 |
| 环境变量 / 路径 | conf/env.sh(bin/common/init.sh + dw_base/utils/env_loader.py 消费) |
是 | 开发 |
| 告警 Webhook | dw_base/common/alerter_constants.py |
是(待改 conf/alerter.ini,入库) |
开发 |
命令行 -sc key=value / SparkSQL(...) 显式传参 (L3,最高优先级,临时 override)
↓ 覆盖
SQL 文件内 SET spark.x.y=z (L2,单作业级别,开发写)
↓ 覆盖
conf/spark-defaults.conf + conf/spark-tuning.conf (L1,全局默认,大数据负责人维护)
目标态由 dw_base/spark/spark_sql.py 启动时按 Spark 原生 key value 格式加载 L1,再让 Spark 本身处理 L2/L3 的覆盖。详细改造计划见 90-重构路线.md §2.3。
关键约定:
dataSource 字段只写 {db_type}/{instance},不含环境。环境由启动脚本的 -env 参数注入dataSource = pg-hobby-prod 这种把环境拼进字符串的写法是历史遗留,重构中统一改为上述新形式⚠️ 当前代码实现现状(2026-04-20 实测,与目标态有差距,datax-gc-generator 重写时一并对齐):
dataSource 前缀必须用数据源全名,dw_base/datax/plugins/plugin_factory.py:33 按 - 拆取第 0 段作为 ds_type,与 plugin_factory.py 的类型白名单比对。白名单仅 8 个全名:postgresql / mysql / clickhouse / hdfs / hbase / kafka / mongo / elasticsearch。pg-xxx / ch-xxx / mq-xxx 这种简写当前会报 DataSource type pg ... is not supported yet;上面样例里写 dataSource = pg/hobby 是目标态,当前代码未支持(split('-') 逻辑 + / 路径推导都要跟着 §6.4 改)。columnType 当前被完全忽略:PostgreSQLReader.load_column(postgresql_reader.py:74-76)、MySQLReader、ClickHouseReader 都覆盖了基类 Plugin.load_column,只读 column(字段名列表),columnType 不解析;类型靠 JDBC 驱动的 ResultSetMetaData 返回。对应的 writer 同样只读 column。只有 HDFS/HBase/Kafka 这类读写文件/非关系型存储的插件走基类 Plugin.load_column(plugin.py:63-118),此时 columnType 才生效,且字符串字段可省略(基类默认类型是 string,见 plugin.py:77)。这一条与 kb/20 §8.1 raw 层"DataX ini 不写类型映射"的约定方向一致,但底层机制是上游代码覆盖掉了,不是约定的结果。增量/全量区分:
dt=19700101 或 query={} → 全量query 中含 ${start_date}/${stop_date} → 增量原则:业务代码一套,环境差异收敛在 datasource/ 和 conf/env.sh,运行时注入。
环境集合:dev / test / prod(由运维在 datasource/ 下分别维护一套实例配置)。
注入链路:
启动脚本(-env prod)
│
▼
ini 里 dataSource = pg/hobby
│
│ 脚本拼接
▼
实际路径 datasource/pg/prod/hobby.ini
│
▼
DataX Reader/Writer 建立连接
env 判定优先级(两级,不引入环境变量,避免污染 shell 历史和 CI 环境):
| 级别 | 来源 | 用途 |
|---|---|---|
| L1(最高) | 命令行 -env <name> |
调试 / 跨环境临时切换 |
| L2 | conf/env.sh 里的 DW_ENV 默认值 |
入仓库的一份配置,由开发者维护。默认值通常锁死为 dev(服务本地调试方便)。DolphinScheduler / 生产脚本总是命令行显式挂 -env prod 覆盖。不做任何"按用户/目录"的自动派生 |
执行示例:
# 跑生产环境
bin/datax-multiple-hive-job-starter.sh -gcd jobs/raw/trd -start-date 20260415 -env prod -parallel
# 本地调试(通常省略 -env,走 conf/env.sh 默认值 dev)
bin/datax-single-job-starter.sh -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini -start-date 20260415
# 跑测试环境(测试 Hive 集群 + 测试后端库 + 测试服务库都在 datasource/*/test/ 下)
bin/datax-multiple-hive-job-starter.sh -gcd jobs/raw/trd -start-date 20260415 -env test -parallel
集群节点:m3(master), d1, d2, d3, d4(data nodes)
部署目录:/home/bigdata/release/poyee-data-warehouse/
部署用户:bigdata
部署方式:git pull + rsync (publish.sh → re-all 分发)
日志目录:
统一输出到 ${LOG_ROOT_DIR}/{module}/{dt}/{file}.log
(LOG_ROOT_DIR 默认 ${HOME}/log,外配在 conf/env.sh;
release 用户 bigdata 落 /home/bigdata/log/...,个人落各自家目录。
老项目 whoami 分流 /opt/data/log 与 ~/data/log 已废弃,见 90-重构路线.md §7.2.1)
定位:一次性、非幂等的 SQL 脚本;与 jobs/ 语义完全独立,严禁接入 DolphinScheduler 定时调度。
子目录职责:
| 目录 | 用途 |
|---|---|
manual/ddl/ |
所有 DDL(初始 CREATE + 后续 ALTER),唯一来源;内部按 {layer}/{domain}/ 分子目录 |
manual/backfill/ |
历史数据回刷(跨日期重算) |
manual/fix/ |
线上脏数据订正,必须附工单号 |
manual/adhoc/ |
临时取数、问题排查 |
manual/imports/{yyyymmdd}/ |
一次性入仓任务(离线硬盘、历史 dump、外部 CSV 等),按执行日期归档 |
manual/exports/{yyyymmdd}/ |
一次性出仓任务,按执行日期归档 |
manual/archive/ |
执行完毕的历史脚本归档,保留审计痕迹 |
命名规则:{yyyymmdd}_{层}_{域}_{简述}.sql,例如 20260414_dwd_trd_add_refund_col.sql
文件头强制注释:
-- 作者:xxx
-- 日期:2026-04-14
-- 工单:TPAD-1234
-- 目的:补录 2026-Q1 的退款维度
-- 状态:[待执行 | 已执行 2026-04-14]
执行与回收:
bin/spark-sql-starter.py,不新增脚本fix/ 和 backfill/ 类脚本上线前必须经过 1 人以上 Reviewarchive/ 或删除核心原则:DDL 与计算 SQL 物理分离,DDL 全部在 manual/ddl/ 下单一来源。
manual/ddl/ 存放所有 DDL(首次建表 + 后续 ALTER),采用 migration 模式:每次 DDL 操作是一个不可变文件,禁止回头改老文件jobs/ 存放调度执行的采集 / 计算任务,只做 INSERT OVERWRITE 或数据同步,不写 CREATE TABLE一张表的完整生命周期涉及:
manual/ddl/{layer}/{domain}/{表名}_create.sql —— 首次建表,永久保留manual/ddl/{layer}/{domain}/{表名}_{yyyymmdd}_{描述}_change.sql —— 之后每次 ALTER,独立文件jobs/{layer}/{domain}/{表名}.sql 或 jobs/{layer}/{domain}/{表名}/{表名}-{NN}-{描述}.sql —— 调度执行的计算 SQL(不含建表),详见 §9.2manual/ddl/ —— DDL 唯一来源目录组织:按 {layer}/{domain}/ 分子目录。layer 代码取自 21-命名规范.md §3.1(raw/ods/dim/dwd/dws/tdm/ads),domain 代码取自 §3.2(trd/usr/prd/shp/pub)。每张目标表的首次建表 + 所有 ALTER 都落在这个子目录里,便于一眼看清某层某域的表清单。
manual/ddl/
├── raw/
│ └── trd/
│ ├── raw_trd_order_pay_inc_d_create.sql # 首次建表(永久保留)
│ └── 20260612_raw_trd_legacy_order_change_partition.sql
├── ods/
│ └── trd/
│ └── ods_trd_order_pay_inc_d_create.sql
├── dwd/
│ └── trd/
│ ├── dwd_trd_order_pay_inc_d_create.sql
│ └── 20260520_dwd_trd_order_pay_add_refund.sql # ALTER(独立文件,不改原文件)
├── ads/
│ └── trd/
│ └── ads_trd_gmv_d_create.sql
├── tmp/ # 单目标加速中间表 DDL(见 §9.2)
│ └── dwd_trd_order_pay/
│ ├── tmp_dwd_trd_order_pay_01_create.sql
│ └── tmp_dwd_trd_order_pay_02_create.sql
└── archive/
└── 20260301_old_alter.sql # 已归档
按 grep 的友好度:grep -r "CREATE TABLE.*dwd_trd_order_pay_inc_d" manual/ddl/ 仍能直接命中;分子目录带来的额外索引成本小于"一眼看到分层分域"的收益。
存储格式约定:所有分层一律 STORED AS ORC。策略详见 20-数仓分层与建模.md §7。
jobs/ 层 —— 调度执行的计算 SQL文件粒度:一张目标表对应一套 SQL 文件,按复杂度两档:
jobs/{layer}/{domain}/{表名}.sql 一个文件顶到底(单次 INSERT OVERWRITE,可带 WITH CTE)jobs/{layer}/{domain}/{表名}/{表名}-{NN}-{描述}.sql,序号三位,99 固定留给最终 INSERT OVERWRITE 目标表那一步。DS 工作流对应 N 个 task 节点按序号链式依赖所有 .sql 只写 INSERT OVERWRITE / INSERT INTO,不写 CREATE TABLE(表由 manual/ddl/ 保证已存在)。
jobs/dwd/trd/
├── dwd_trd_order_refund_inc_d.sql # 简单表,单文件
├── dwd_trd_shop_gmv_agg_ful_d.sql
└── dwd_trd_order_pay_inc_d/ # 多步表,目标表名同名子目录
├── dwd_trd_order_pay_inc_d-01-build_tmp_pay_base.sql
├── dwd_trd_order_pay_inc_d-02-build_tmp_refund_agg.sql
└── dwd_trd_order_pay_inc_d-99-insert_target.sql
什么时候从简单表升级到多步表:
| 触发条件 | 处理 |
|---|---|
| 单 SQL shuffle 过大(单作业耗时 > 30 min 且 shuffle read > 100GB) | 拆分中间结果物化为 tmp 表 |
| 同一块 CTE 在多个 WITH 节里重复扫描 | 物化后 cache 复用 |
| 复杂业务逻辑,读多源后多轮 join,需要中间落盘便于 debug / 回溯 | 拆分单步 |
| 中间结果需要被多个目标表复用 | 不用 tmp,升层为 dwd/dws 独立表 |
中间表两类,严格区分:
tmp_{目标表名}_{NN},DDL 收到 manual/ddl/tmp/{目标表名}/ 子目录。生命周期跟随本次任务,每次 INSERT OVERWRITE 覆盖或 drop+recreate,不留历史从单文件升级到子目录的操作步骤:删掉原单文件,建子目录、拆 SQL、DS 工作流拆 task 节点;manual/ddl/tmp/{目标表名}/ 同步补齐 tmp 表 DDL。一次性改完,避免半新半旧。
WITH / CTE 还是拆文件:轻量中间结果用 WITH 内联(不物化,本质还是单 SQL);重量中间结果需要物化为 tmp 表时才升级到"多步表子目录"(见本节上方触发条件表)。不要盲目把 CTE 都拆成 tmp —— shuffle 不大、不复用的 CTE 留在 WITH 里反而更清爽。
raw 层的 jobs/ 有两类主要任务,根据源数据形态选择:
| 场景 | 文件类型 | 执行器 |
|---|---|---|
| 从 MongoDB / PG / MySQL 等结构化源库同步 | .ini(DataX 配置) |
bin/datax-single-job-starter.sh |
| 从本地 / 外部 CSV 文件导入 | .sql(含 USING csv 临时视图 + INSERT OVERWRITE) |
bin/csv-to-hdfs-starter.py(阶段 1 实现) |
raw 层数据类型约定:全字段 STRING,类型转换与脏数据识别下推到 ods 层。契约详见 20-数仓分层与建模.md §8.1。
CSV 导入流程:
gzip 压缩bin/csv-to-hdfs-starter.py 把(压缩后的)CSV hdfs dfs -put 到 HDFS 暂存区jobs/raw/{域}/{表}.sql,文件内通过 USING csv OPTIONS(...) 临时视图解析 CSV,再 INSERT OVERWRITE 写入对应 raw 表raw 层写入模式对照:
| 场景 | 写法 | manual/ddl/ |
|---|---|---|
一次性 CSV 导入(历史回刷、单批 vendor 数据),表名 raw_xxx_his_o |
预建 EXTERNAL TABLE(不分区),INSERT OVERWRITE TABLE ... |
需要 |
| 每日重复的 CSV 导入(daily file drop) | 预建分区 EXTERNAL TABLE,每日 INSERT OVERWRITE TABLE ... PARTITION (dt='${dt}') |
需要 |
| 结构化源库同步(PG/MySQL 等) | DataX ini,写入预建 EXTERNAL TABLE(writeMode=truncate 或分区覆盖) |
需要 |
his 表为什么不分区:一次性导入永不追加,分区裁剪没有意义。下游 ods 再按 dt 分区,一次性切片。
为什么用 SQL 而不是 YAML 描述 CSV 任务:
SparkSQL 现有执行链,bin/csv-to-hdfs-starter.py 只需在 bin/spark-sql-starter.py 之外加一层 gzip+put+清理的薄壳,不需要单独的 YAML 渲染器USING csv OPTIONS(...) 本身就是 Spark 的声明式 CSV 读取语法,YAML 再封装一层是多余的.sql),读者不需要切换上下文manual/ddl/
└── ads_trd_gmv_d.sql # 建表 DDL(首次建表,永久保留)
jobs/ads/trd/
├── ads_trd_gmv_d.sql # 每日计算产出 ads 表
└── ads_trd_gmv_d_export.ini # 导出到 Doris/ClickHouse/MySQL 的 DataX ini
命名规则:导出 ini 文件名 = {ads 表名}_export.ini,便于一眼对应。
| 目录 | 文件后缀 | 文件名规则 | 说明 |
|---|---|---|---|
manual/ddl/{layer}/{domain}/ |
.sql |
{表名}_create.sql(首次) 或 {yyyymmdd}_{表名}_{change}.sql(ALTER) |
DDL 唯一来源;首次建表用 CREATE TABLE IF NOT EXISTS,后续 ALTER 带日期前缀 |
manual/ddl/tmp/{目标表名}/ |
.sql |
tmp_{目标表名}_{NN}_create.sql |
多步表的单目标加速中间表 DDL |
jobs/raw/{domain}/ |
.ini(DataX)或 .sql(CSV 导入) |
{目标表名}.ini 或 {目标表名}.sql |
DataX 采集或 CSV 导入任务定义 |
jobs/{ods\|dwd\|dws\|tdm}/{domain}/ |
.sql |
简单表:{目标表名}.sql;多步表:子目录 {目标表名}/{目标表名}-{NN}-{描述}.sql(99 为最终 insert) |
每日 INSERT OVERWRITE 计算,详见 §9.2 |
jobs/ads/{domain}/ |
.sql + .ini |
简单表:{ads 表名}.sql + {ads 表名}__{db_type}_{instance}.ini;多步:{ads 表名}/{ads 表名}-{NN}-{描述}.sql + 同级目录放 ini |
产出 + 导出;同一张 ads 表扇出多下游时各一份 ini(见 21-命名规范.md §3.9) |
manual/backfill/ |
.sql |
{yyyymmdd}_{表名}_history.sql |
一次性历史回刷脚本 |
manual/imports/{yyyymmdd}/ |
.ini / .sql |
{任务描述}.ini 或 .sql |
一次性入仓任务(离线硬盘、历史 dump、外部 CSV 等),按执行日期归档 |
manual/exports/{yyyymmdd}/ |
.ini |
{任务描述}.ini |
一次性出仓任务,按执行日期归档 |
当要给某张表加列 / 改字段时,只写新文件,不改老文件:
在 manual/ddl/{yyyymmdd}_{表名}_{change}.sql 写 ALTER 语句(带工单号、目的、回滚方案)
grep dwd_trd_order_pay manual/ddl/ 即可看到该表的全部 DDL 历史,按文件名时间序回放就是表结构的完整演化manual/ddl/{表名}.sql + 所有相关 ALTER 文件依次执行即可,结果和生产一致。注意:目前没有自动化重放工具,需要人手按文件名时间序执行;未来视需要可以写一个 bin/replay-ddl.sh(当前未实现)