|
|
@@ -319,10 +319,10 @@ python3 bin/datax-gc-generator.py --from hdfs --to elasticsearch \
|
|
|
|----------|---------|-----------|------|
|
|
|
| 数据源连接(含账密) | `../datasource/{db_type}/{env}/{instance}.ini` | 否 | 运维 |
|
|
|
| DataX 同步任务定义 | `jobs/raw/` (采集) 和 `jobs/ads/` (导出) | 是 | 开发 |
|
|
|
-| Spark 默认参数 | `conf/spark-defaults.conf`(目标态,Spark 原生格式) / `spark_sql.py` 构造函数(现状) | 是 | 开发 |
|
|
|
+| Spark 默认参数 | `conf/spark-defaults.conf`(行为/开关)+ `conf/spark-tuning.conf`(资源/调优) | 是 | 开发 |
|
|
|
| Spark 单作业覆盖 | 对应 `jobs/*.sql` 文件内 `SET spark.x.y=z` | 是 | 开发 |
|
|
|
-| 环境变量 / 路径 | `dw_base/__init__.py`、`bin/common/init.sh` | 是(待改为conf) | 开发 |
|
|
|
-| 告警 Webhook | `dw_base/common/alerter_constants.py` | 否(待改 `conf/alerter.ini`,入库) | 开发 |
|
|
|
+| 环境变量 / 路径 | `conf/env.sh`(`bin/common/init.sh` + `dw_base/utils/env_loader.py` 消费) | 是 | 开发 |
|
|
|
+| 告警 Webhook | `dw_base/common/alerter_constants.py` | 是(待改 `conf/alerter.ini`,入库) | 开发 |
|
|
|
|
|
|
### 6.2 Spark 参数优先级(三级覆盖)
|
|
|
|
|
|
@@ -331,50 +331,18 @@ python3 bin/datax-gc-generator.py --from hdfs --to elasticsearch \
|
|
|
↓ 覆盖
|
|
|
SQL 文件内 SET spark.x.y=z (L2,单作业级别,开发写)
|
|
|
↓ 覆盖
|
|
|
-conf/spark-defaults.conf (L1,全局默认,大数据负责人维护)
|
|
|
+conf/spark-defaults.conf + conf/spark-tuning.conf (L1,全局默认,大数据负责人维护)
|
|
|
```
|
|
|
|
|
|
**目标态由 `dw_base/spark/spark_sql.py` 启动时按 Spark 原生 `key value` 格式加载 L1,再让 Spark 本身处理 L2/L3 的覆盖**。详细改造计划见 `90-重构路线.md` §2.3。
|
|
|
|
|
|
### 6.3 DataX ini 配置格式
|
|
|
|
|
|
-**数据源定义(datasource/*.ini):**
|
|
|
-```ini
|
|
|
-[base]
|
|
|
-address = mongodb://user:pass@host:port/db # MongoDB
|
|
|
-# 或
|
|
|
-jdbcUrl = jdbc:mysql://host:port # MySQL
|
|
|
-userName = xxx
|
|
|
-userPassword = xxx
|
|
|
-# 或
|
|
|
-defaultFS = hdfs://nameservice1 # HDFS
|
|
|
-```
|
|
|
-
|
|
|
-**同步任务定义(jobs/raw/{domain}/{目标表名}.ini):**
|
|
|
-```ini
|
|
|
-[reader]
|
|
|
-dataSource = pg/hobby # 引用 datasource/pg/${env}/hobby.ini,${env} 运行时注入
|
|
|
-dbName = xxx
|
|
|
-schemaName = public
|
|
|
-tableName = orders
|
|
|
-column = col1,col2,...
|
|
|
-columnType = col1:bigint,col2:date
|
|
|
-where = update_time >= '${start_date}' AND update_time < '${stop_date}'
|
|
|
-
|
|
|
-[writer]
|
|
|
-dataSource = hdfs/default # 引用 datasource/hdfs/${env}/default.ini
|
|
|
-column = col1,col2,...
|
|
|
-columnType = col1:bigint,col2:date
|
|
|
-path = /user/hive/warehouse/raw.db/raw_trd_order_pay_inc_d/dt=${dt}
|
|
|
-fileType = orc
|
|
|
-writeMode = truncate
|
|
|
-```
|
|
|
-
|
|
|
**关键约定**:
|
|
|
- `dataSource` 字段只写 `{db_type}/{instance}`,**不含环境**。环境由启动脚本的 `-env` 参数注入
|
|
|
- 新项目推荐规范见 §6.4;老项目里 `dataSource = pg-hobby-prod` 这种把环境拼进字符串的写法是历史遗留,重构中统一改为上述新形式
|
|
|
|
|
|
-**⚠️ 当前代码实现现状(2026-04-20 实测,与上述目标态有差距,`datax-gc-generator` 重写时一并对齐)**:
|
|
|
+**⚠️ 当前代码实现现状(2026-04-20 实测,与目标态有差距,`datax-gc-generator` 重写时一并对齐)**:
|
|
|
|
|
|
1. **`dataSource` 前缀必须用数据源全名**,`dw_base/datax/plugins/plugin_factory.py:33` 按 `-` 拆取第 0 段作为 ds_type,与 `plugin_factory.py` 的类型白名单比对。白名单仅 8 个全名:`postgresql` / `mysql` / `clickhouse` / `hdfs` / `hbase` / `kafka` / `mongo` / `elasticsearch`。`pg-xxx` / `ch-xxx` / `mq-xxx` 这种简写**当前会报 `DataSource type pg ... is not supported yet`**;上面样例里写 `dataSource = pg/hobby` 是目标态,当前代码未支持(`split('-')` 逻辑 + `/` 路径推导都要跟着 §6.4 改)。
|
|
|
2. **RDBMS reader 的 `columnType` 当前被完全忽略**:`PostgreSQLReader.load_column`(`postgresql_reader.py:74-76`)、`MySQLReader`、`ClickHouseReader` 都覆盖了基类 `Plugin.load_column`,只读 `column`(字段名列表),`columnType` 不解析;类型靠 JDBC 驱动的 `ResultSetMetaData` 返回。对应的 writer 同样只读 `column`。**只有 HDFS/HBase/Kafka 这类读写文件/非关系型存储的插件**走基类 `Plugin.load_column`(`plugin.py:63-118`),此时 `columnType` 才生效,且字符串字段可省略(基类默认类型是 `string`,见 `plugin.py:77`)。这一条与 kb/20 §8.1 raw 层"DataX ini 不写类型映射"的约定方向一致,但底层机制是上游代码覆盖掉了,不是约定的结果。
|
|
|
@@ -516,74 +484,6 @@ manual/ddl/
|
|
|
|
|
|
**按 `grep` 的友好度**:`grep -r "CREATE TABLE.*dwd_trd_order_pay_inc_d" manual/ddl/` 仍能直接命中;分子目录带来的额外索引成本小于"一眼看到分层分域"的收益。
|
|
|
|
|
|
-**首次建表 DDL 文件模板**(`manual/ddl/dwd_trd_order_pay_inc_d.sql`):
|
|
|
-
|
|
|
-```sql
|
|
|
--- ============================================================
|
|
|
--- 表名:dwd.dwd_trd_order_pay_inc_d
|
|
|
--- 业务含义:交易域-订单支付明细-增量-日
|
|
|
--- 分层:DWD
|
|
|
--- 更新周期:日 / 增量
|
|
|
--- 负责人:tianyu.chu
|
|
|
--- 创建日期:2026-xx-xx
|
|
|
--- 工单:TPAD-1001
|
|
|
--- 状态:已执行 2026-xx-xx
|
|
|
--- ============================================================
|
|
|
-
|
|
|
-CREATE TABLE IF NOT EXISTS dwd.dwd_trd_order_pay_inc_d (
|
|
|
- order_id BIGINT COMMENT '订单ID',
|
|
|
- user_id BIGINT COMMENT '用户ID',
|
|
|
- shop_id BIGINT COMMENT '店铺ID',
|
|
|
- pay_amt_cny DECIMAL(18,2) COMMENT '支付金额(人民币)',
|
|
|
- pay_time TIMESTAMP COMMENT '支付时间',
|
|
|
- etl_time TIMESTAMP COMMENT 'ETL写入时间'
|
|
|
-) COMMENT '交易域-订单支付明细-增量-日'
|
|
|
-PARTITIONED BY (dt STRING COMMENT '日期分区 yyyyMMdd')
|
|
|
-STORED AS ORC;
|
|
|
-```
|
|
|
-
|
|
|
-**raw 层 EXTERNAL 建表模板**(`manual/ddl/raw/trd/raw_trd_legacy_order_his_o_create.sql`):
|
|
|
-
|
|
|
-```sql
|
|
|
--- ============================================================
|
|
|
--- 表名:raw.raw_trd_legacy_order_his_o
|
|
|
--- 业务含义:交易域-历史订单 CSV 导入(一次性 his)
|
|
|
--- 分层:RAW(全字段 STRING;EXTERNAL 兜底,详见 20-数仓分层与建模.md §8.1)
|
|
|
--- 负责人:tianyu.chu
|
|
|
--- 创建日期:2026-xx-xx
|
|
|
--- 状态:[待执行]
|
|
|
--- ============================================================
|
|
|
-
|
|
|
-CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_trd_legacy_order_his_o (
|
|
|
- order_id STRING COMMENT '订单ID(源端字面量)',
|
|
|
- user_id STRING COMMENT '用户ID(源端字面量)',
|
|
|
- amount STRING COMMENT '金额(源端字面量)',
|
|
|
- order_time STRING COMMENT '下单时间(源端字面量)'
|
|
|
-) COMMENT '交易域-历史订单CSV导入(一次性,全STRING)'
|
|
|
-STORED AS ORC
|
|
|
-LOCATION '/user/hive/warehouse/raw.db/raw_trd_legacy_order_his_o/';
|
|
|
-```
|
|
|
-
|
|
|
-**ALTER 文件模板**(`manual/ddl/20260520_dwd_trd_order_pay_add_refund.sql`):
|
|
|
-
|
|
|
-```sql
|
|
|
--- ============================================================
|
|
|
--- 表名:dwd.dwd_trd_order_pay_inc_d
|
|
|
--- 作者:tianyu.chu
|
|
|
--- 日期:2026-05-20
|
|
|
--- 工单:TPAD-1234
|
|
|
--- 目的:加 refund_amt_cny 字段(业务上线退款流程)
|
|
|
--- 回滚:ALTER TABLE dwd.dwd_trd_order_pay_inc_d REPLACE COLUMNS (...)
|
|
|
--- (Hive 2.x 不支持 DROP COLUMN)
|
|
|
--- 状态:[待执行]
|
|
|
--- ============================================================
|
|
|
-
|
|
|
-ALTER TABLE dwd.dwd_trd_order_pay_inc_d
|
|
|
-ADD COLUMNS (
|
|
|
- refund_amt_cny DECIMAL(18,2) COMMENT '退款金额(人民币)'
|
|
|
-);
|
|
|
-```
|
|
|
-
|
|
|
**存储格式约定**:所有分层一律 `STORED AS ORC`。策略详见 `20-数仓分层与建模.md` §7。
|
|
|
|
|
|
### 9.2 `jobs/` 层 —— 调度执行的计算 SQL
|
|
|
@@ -621,39 +521,6 @@ jobs/dwd/trd/
|
|
|
|
|
|
**从单文件升级到子目录的操作步骤**:删掉原单文件,建子目录、拆 SQL、DS 工作流拆 task 节点;`manual/ddl/tmp/{目标表名}/` 同步补齐 tmp 表 DDL。一次性改完,避免半新半旧。
|
|
|
|
|
|
-**计算 SQL 文件模板**:
|
|
|
-```sql
|
|
|
--- ============================================================
|
|
|
--- 目标表:dwd.dwd_trd_order_pay_inc_d
|
|
|
--- DDL:manual/ddl/dwd_trd_order_pay_inc_d.sql(首次建表)
|
|
|
--- manual/ddl/2026*_dwd_trd_order_pay_*.sql(历次 ALTER)
|
|
|
--- 业务含义:交易域-订单支付明细-增量-日
|
|
|
--- 上游依赖:
|
|
|
--- - ods.ods_trd_order_pay_inc_d
|
|
|
--- - dim.dim_prd_product_ful_d
|
|
|
--- 下游使用:dws_trd_order_agg_inc_d、ads_trd_gmv_d
|
|
|
--- 负责人:tianyu.chu
|
|
|
--- 创建日期:2026-xx-xx
|
|
|
--- ============================================================
|
|
|
-
|
|
|
--- Spark 参数
|
|
|
-SET spark.sql.shuffle.partitions=200;
|
|
|
-SET hive.exec.dynamic.partition.mode=nonstrict;
|
|
|
-
|
|
|
--- 主逻辑(纯 INSERT,不含建表)
|
|
|
-INSERT OVERWRITE TABLE dwd.dwd_trd_order_pay_inc_d PARTITION (dt='${dt}')
|
|
|
-SELECT
|
|
|
- o.order_id,
|
|
|
- o.user_id,
|
|
|
- o.shop_id,
|
|
|
- CAST(o.pay_amt AS DECIMAL(18,2)) AS pay_amt_cny,
|
|
|
- CAST(o.pay_time AS TIMESTAMP) AS pay_time,
|
|
|
- CURRENT_TIMESTAMP() AS etl_time
|
|
|
-FROM ods.ods_trd_order_pay_inc_d o
|
|
|
-WHERE o.dt = '${dt}'
|
|
|
-;
|
|
|
-```
|
|
|
-
|
|
|
**WITH / CTE 还是拆文件**:轻量中间结果用 `WITH` 内联(不物化,本质还是单 SQL);重量中间结果需要物化为 tmp 表时才升级到"多步表子目录"(见本节上方触发条件表)。不要盲目把 CTE 都拆成 tmp —— shuffle 不大、不复用的 CTE 留在 `WITH` 里反而更清爽。
|
|
|
|
|
|
### 9.3 raw 层(采集任务)
|
|
|
@@ -674,51 +541,6 @@ raw 层的 `jobs/` 有两类主要任务,根据源数据形态选择:
|
|
|
3. 调用 SparkSQL 执行 `jobs/raw/{域}/{表}.sql`,文件内通过 `USING csv OPTIONS(...)` 临时视图解析 CSV,再 `INSERT OVERWRITE` 写入对应 raw 表
|
|
|
4. 清理 HDFS 暂存文件
|
|
|
|
|
|
-**CSV 导入任务定义模板**(`jobs/raw/trd/raw_trd_legacy_order_his_o.sql`):
|
|
|
-
|
|
|
-DDL 先在 `manual/ddl/raw_trd_legacy_order_his_o.sql` 按 raw 层规范建好 `EXTERNAL TABLE`(全字段 STRING,契约详见 `20-数仓分层与建模.md` §8.1;DDL 模板见本文档 §9.1 raw 变体)。本脚本只负责"读 CSV → 写入 EXTERNAL 表的 LOCATION"。
|
|
|
-
|
|
|
-```sql
|
|
|
--- ============================================================
|
|
|
--- 目标表:raw.raw_trd_legacy_order_his_o(EXTERNAL,DDL 在 manual/ddl/ 预建)
|
|
|
--- 业务含义:交易域 - 历史订单 CSV 导入(一次性历史快照 his)
|
|
|
--- 源数据:本地路径 /data/uploads/legacy_orders/historical.csv
|
|
|
--- 编码 UTF-8 / 带表头 / 逗号分隔 / 双引号 quote / 字段内允许换行
|
|
|
--- 执行器:bin/csv-to-hdfs-starter.py
|
|
|
--- 暂存路径:/tmp/csv_staging/raw_trd_legacy_order_his_o/
|
|
|
--- 负责人:tianyu.chu
|
|
|
--- 创建日期:2026-xx-xx
|
|
|
--- 状态:[待执行]
|
|
|
--- ============================================================
|
|
|
-
|
|
|
--- csv-to-hdfs-starter 通过文件头注释读取这两个变量并完成 gzip+put 操作
|
|
|
--- @local_csv = /data/uploads/legacy_orders/historical.csv
|
|
|
--- @hdfs_staging = /tmp/csv_staging/raw_trd_legacy_order_his_o/
|
|
|
-
|
|
|
-SET spark.sql.shuffle.partitions=50;
|
|
|
-
|
|
|
--- 用 USING csv 临时视图解析 HDFS 暂存区的 CSV,所有字段读为 STRING
|
|
|
-CREATE OR REPLACE TEMPORARY VIEW v_raw_trd_legacy_order_stage
|
|
|
-USING csv
|
|
|
-OPTIONS (
|
|
|
- path '/tmp/csv_staging/raw_trd_legacy_order_his_o/',
|
|
|
- header 'true',
|
|
|
- quote '"',
|
|
|
- escape '"',
|
|
|
- multiLine 'true',
|
|
|
- inferSchema 'false'
|
|
|
-);
|
|
|
-
|
|
|
-INSERT OVERWRITE TABLE raw.raw_trd_legacy_order_his_o
|
|
|
-SELECT
|
|
|
- order_id,
|
|
|
- user_id,
|
|
|
- amount,
|
|
|
- order_time
|
|
|
-FROM v_raw_trd_legacy_order_stage
|
|
|
-;
|
|
|
-```
|
|
|
-
|
|
|
**raw 层写入模式对照**:
|
|
|
|
|
|
| 场景 | 写法 | `manual/ddl/` |
|
|
|
@@ -727,53 +549,13 @@ FROM v_raw_trd_legacy_order_stage
|
|
|
| **每日重复的 CSV 导入**(daily file drop) | 预建分区 `EXTERNAL TABLE`,每日 `INSERT OVERWRITE TABLE ... PARTITION (dt='${dt}')` | 需要 |
|
|
|
| **结构化源库同步**(PG/MySQL 等) | DataX ini,写入预建 `EXTERNAL TABLE`(`writeMode=truncate` 或分区覆盖) | 需要 |
|
|
|
|
|
|
-**`his` 表为什么不分区**:一次性导入永不追加,分区裁剪没有意义。下游 ods 再按 `dt` 分区,用 `INSERT OVERWRITE PARTITION (dt) ... DISTRIBUTE BY dt` 一次性把 raw → ods 切片(见 §9.3.1)。
|
|
|
+**`his` 表为什么不分区**:一次性导入永不追加,分区裁剪没有意义。下游 ods 再按 `dt` 分区,一次性切片。
|
|
|
|
|
|
**为什么用 SQL 而不是 YAML 描述 CSV 任务**:
|
|
|
- 复用 `SparkSQL` 现有执行链,`bin/csv-to-hdfs-starter.py` 只需在 `bin/spark-sql-starter.py` 之外加一层 gzip+put+清理的薄壳,不需要单独的 YAML 渲染器
|
|
|
- `USING csv OPTIONS(...)` 本身就是 Spark 的声明式 CSV 读取语法,YAML 再封装一层是多余的
|
|
|
- 与其他分层文件类型一致(除 raw DataX ini 外,其他都是 `.sql`),读者不需要切换上下文
|
|
|
|
|
|
-### 9.3.1 raw → ods 历史回刷(his → 分区表)
|
|
|
-
|
|
|
-**典型链路**:`raw_xxx_his_o`(不分区,全 STRING)→ `ods_xxx_inc_d`(按 `dt` 分区,类型化)。一次性 SQL 跑完后,下游 dwd/dws 看到的就是和"每日增量 ods"一样形态的分区表,无需感知数据来自历史导入。
|
|
|
-
|
|
|
-**ods 表先在 `manual/ddl/ods_trd_order_pay_inc_d.sql` 建好**(同 §9.1 模板)。然后用一次性 SQL 把 raw 的全量切片到 ods 各分区:
|
|
|
-
|
|
|
-```sql
|
|
|
--- ============================================================
|
|
|
--- 目标表:ods.ods_trd_order_pay_inc_d
|
|
|
--- 上游: raw.raw_trd_legacy_order_his_o(一次性历史 his)
|
|
|
--- 性质: 一次性回刷,跑完后该 SQL 移入 manual/backfill/ 归档
|
|
|
--- 状态: [待执行]
|
|
|
--- ============================================================
|
|
|
-
|
|
|
-SET hive.exec.dynamic.partition=true;
|
|
|
-SET hive.exec.dynamic.partition.mode=nonstrict;
|
|
|
-SET hive.exec.max.dynamic.partitions=5000;
|
|
|
-SET hive.exec.max.dynamic.partitions.pernode=2000;
|
|
|
-SET spark.sql.shuffle.partitions=400;
|
|
|
-
|
|
|
-INSERT OVERWRITE TABLE ods.ods_trd_order_pay_inc_d PARTITION (dt)
|
|
|
-SELECT
|
|
|
- CAST(order_id AS BIGINT) AS order_id,
|
|
|
- CAST(user_id AS BIGINT) AS user_id,
|
|
|
- CAST(amount AS DECIMAL(18,2)) AS pay_amt_cny,
|
|
|
- CAST(order_time AS TIMESTAMP) AS pay_time,
|
|
|
- CURRENT_TIMESTAMP() AS etl_time,
|
|
|
- SUBSTR(order_time, 1, 10) AS dt -- 分区列必须放最后
|
|
|
-FROM raw.raw_trd_legacy_order_his_o
|
|
|
-DISTRIBUTE BY dt;
|
|
|
-```
|
|
|
-
|
|
|
-**为什么必须 `DISTRIBUTE BY dt`**:
|
|
|
-
|
|
|
-- 不加时,每个 reducer 都可能写所有分区 → 每个分区产出 `shuffle.partitions` 个小文件,几百日的历史会瞬间产生数万小文件,NameNode 直接告急
|
|
|
-- 加了之后同一 dt 的数据汇聚到一个 reducer → 每个分区只有 1 个文件,对历史回刷来说写入性能也最高
|
|
|
-- 如果某些 dt 数据量特别大(双十一这种),单 reducer 写不动,可改成 `DISTRIBUTE BY dt, FLOOR(RAND() * 8)` 把每个大 dt 切 8 份并行
|
|
|
-
|
|
|
-**回刷脚本归档**:这类一次性 SQL 不放 `jobs/`(避免被 DS 误调度),而是放在 `manual/backfill/{yyyymmdd}_{表名}_history.sql`,跑完后保留 3-6 个月入档。
|
|
|
-
|
|
|
### 9.4 ads 层(SQL + 导出 ini 并存)
|
|
|
|
|
|
```
|