|
|
@@ -42,7 +42,7 @@
|
|
|
| `RELEASE_USER="alvis"` | `bin/common/init.sh` | 改为 `RELEASE_USER="bigdata"` 并移入 `conf/env.sh` |
|
|
|
| `RELEASE_ROOT_DIR="/home/alvis/release"` | `init.sh`、`__init__.py` | 改为 `/home/bigdata/release` 并移入 `conf/env.sh` |
|
|
|
| 项目部署目录 `poyee-data-warehouse/` | `publish.sh` | 新项目发布目录为 `/home/bigdata/release/poyee-data-warehouse/` |
|
|
|
-| `DATAX_WORKERS=(m3 d1 d2 d3 d4)` + 权重 | `init.sh` | 移入 `conf/workers.conf` |
|
|
|
+| `DATAX_WORKERS=(m3 d1 d2 d3 d4)` + `DATAX_WORKERS_WEIGHTS` 权重 map | `init.sh:18-31`(含展开 `DATAX_WORKERS_QUEUE` 的循环) | workers 列表 + 权重 map **整体**移入 `conf/workers.conf`(ini 或 yaml 格式),`init.sh` 仅保留读取 + 展开逻辑 |
|
|
|
| `HADOOP_CONF_DIR='/etc/hadoop/conf'` | `__init__.py` | 使用系统环境变量 |
|
|
|
| `LOG_ROOT_DIR="/opt/data/log"` | `init.sh`、`__init__.py` | 移入 `conf/env.sh` |
|
|
|
| 钉钉 access_token | `dingtalk_notifier.py` | 移入 `conf/alerter.conf`(敏感项) |
|
|
|
@@ -184,6 +184,9 @@ venv/
|
|
|
*.log
|
|
|
dw_base.zip
|
|
|
|
|
|
+# ---- 开发者本地草稿区(datax-gc-generator 输出的参考模板等) ----
|
|
|
+workspace/
|
|
|
+
|
|
|
# ---- 敏感配置(运行时自动从 datasource/ 注入或在 conf/ 本地覆盖) ----
|
|
|
conf/alerter.conf
|
|
|
```
|
|
|
@@ -195,7 +198,8 @@ conf/alerter.conf
|
|
|
- 忽略:`workspace.xml`、`tasks.xml`、`shelf/`、`usage.statistics.xml` 等个人/统计文件
|
|
|
2. **`.claude/` 也不整体 ignore**:`settings.json`、`commands/`、`agents/` 是团队共享配置;只忽略 `settings.local.json`
|
|
|
3. **`dw_base.zip`** 是 `spark_sql.py` 运行时生成的 PySpark 打包产物,属于构建产物不入库
|
|
|
-4. **`conf/alerter.conf`** 一开始就放进 `.gitignore`:阶段 2 迁移钉钉/企微 Webhook 时,新建文件前 `.gitignore` 必须先就位
|
|
|
+4. **`workspace/`** 是开发者本地草稿区(`datax-gc-generator` 输出的参考模板、临时 SQL 调试等),**永不入仓**;开发者认可的成品再手动复制到 `jobs/` 或 `manual/` 下提交
|
|
|
+5. **`conf/alerter.conf`** 一开始就放进 `.gitignore`:阶段 2 迁移钉钉/企微 Webhook 时,新建文件前 `.gitignore` 必须先就位
|
|
|
|
|
|
**与仓库改名的联动**:
|
|
|
|
|
|
@@ -284,6 +288,198 @@ DW_ENV=dev
|
|
|
|
|
|
**参考**:kb/00-项目架构.md §4.3(DataX 脚本详细使用)、§6.4(多环境机制)、kb/21-命名规范.md §3.9(DataX ini 文件命名)。
|
|
|
|
|
|
+### 2.6 DataX 入口脚本收口为两条命令(中优先级)
|
|
|
+
|
|
|
+**现状**:DataX 入口分散,学习成本高:
|
|
|
+
|
|
|
+- `bin/datax-single-job-starter.sh` — 单 ini 执行
|
|
|
+- `bin/datax-multiple-job-starter.sh` — 批量 + 按 start-date/stop-date 范围展开
|
|
|
+- `bin/datax-multiple-hive-job-starter.sh` — Hive 表批量(语义与上者重合)
|
|
|
+- `bin/datax-single-hdfs-job-starter.sh`(如有残留) — 单次 HDFS 导出
|
|
|
+- `bin/datax-job-config-generator.py` — ini → json 翻译器(内部工具)
|
|
|
+- `bin/datax-gc-generator.py` — ini 元生成器(详见 §2.7)
|
|
|
+
|
|
|
+**目标态**:顶层收成两个命令,每个命令内部吃 single / batch 两种输入形态;底层的 json 翻译 / worker 选择 / 日志路径由公共模块承担,调用方不感知。
|
|
|
+
|
|
|
+| 顶层命令 | 语义 | 关键参数 |
|
|
|
+|---|---|---|
|
|
|
+| **`bin/datax-import`**(命名待确认) | 导入到 Hive(目标侧带分区管理) | `-ini <file>` 单 ini · `-inis <dir>` 批量 · `-dt <yyyymmdd>` 指定分区 · `-start-date / -stop-date` 日期范围展开 · `-skip-exist` 默认开,已存在分区跳过 · `-force-overwrite` 强制覆盖 · `-skip-partitions <csv>` 手动跳过特定分区 · `-env <dev\|test\|prod>` |
|
|
|
+| **`bin/datax-export`**(命名待确认) | 从 Hive/HDFS 导出到外部系统(源侧带路径探测) | `-ini / -inis` 同上 · `-src-check`(默认 fail-fast)· `-skip-missing` 源路径缺失时跳过不报错 · `-dt / -start-date / -stop-date` · `-env` |
|
|
|
+
|
|
|
+**实现建议**:
|
|
|
+
|
|
|
+1. 把老脚本 worker 选择、日志路径、json 翻译提到 Python 模块 `dw_base/datax/entry.py`,两个 sh 只做参数解析 + 调用
|
|
|
+2. 分区检查:`datax-import` 在执行前 `SHOW PARTITIONS` 目标表 → 命中则按 `-skip-exist` / `-force-overwrite` 决策;`datax-export` 在执行前 `hdfs dfs -test -e <源路径>` → 不存在按 `-src-check` / `-skip-missing` 决策
|
|
|
+3. `-inis` 的批量展开规则:传目录则递归扫 `.ini` 文件,传文件列表(`jobs.list`)则读文件每行一个 ini
|
|
|
+4. 老脚本 `datax-single-job-starter.sh` / `datax-multiple-*-starter.sh` 在两个新命令稳定后整体删除,保留一期转发封装作为兼容
|
|
|
+
|
|
|
+**第三条命令 `datax-gc-generator`(ini 元生成器)独立保留**:用户已确认。职责是"从 PG 扫 schema 生成 ini 参考模板",和"执行 ini"不是一回事,不收口到上面两条里。详见 §2.7。
|
|
|
+
|
|
|
+### 2.7 `datax-gc-generator` 从零重写(中优先级)
|
|
|
+
|
|
|
+**现状**(凭查证 2026-04-18):`bin/datax-gc-generator.py` 支持 `from ∈ {mysql, hdfs}` × `to ∈ {elasticsearch, hbase, hdfs, kafka, mongo, mysql}`,覆盖面大、代码沉重,且:
|
|
|
+
|
|
|
+- **全项目没有任何其他模块 import 或 shell 调用它**(仅 3 处 `SparkSQL('datax-gc-generator')` 是自己设置 app name)
|
|
|
+- 内部走 `MySQLHandler` + `MySQLDataSource` + `convert_mysql_column_types` + `MySQLReader.generate_hive_ddl()` 一条链,与 §2.1 已登记废弃的"自动 DDL 生成"方向相冲突
|
|
|
+- 新项目新业务只需要 `from=pg to=hdfs` 一条路径,其他组合全是老项目遗产
|
|
|
+
|
|
|
+**定位**:**参考模板生成器**,不是"一键出可用 ini"。产物是开发者人工调整的起点 —— 常见修改包括字段剪裁(只同步用到的列)、WHERE 过滤条件、hivePartitions 配置、大表拆分策略等。开发者 diff 参考模板和自己的需求,改完再把成品 ini 提交到 `jobs/raw/{domain}/`。
|
|
|
+
|
|
|
+**方向**:整个文件废弃 + 从零重写(凭记忆:未完全定稿,待真正开始写代码时再细化)
|
|
|
+
|
|
|
+**重写目标**:
|
|
|
+- 仅支持 `from=pg to=hdfs`
|
|
|
+- 读 PG 表结构(`information_schema.columns` + `pg_catalog.pg_description` 取字段注释)
|
|
|
+- 输出到**开发者本地的 `workspace/{yyyymmdd}/{name}.ini`**(`workspace/` 被 `.gitignore` 排除,**不入仓**),作为参考模板(原始全字段 + 默认分区 + 默认 `where=1=1`);开发者裁剪后再把最终 ini 提交到 `jobs/raw/{domain}/`
|
|
|
+- **不再生成 DDL**:DDL 统一由开发者按 `21-命名规范.md` 手写到 `manual/ddl/{layer}/{domain}/`(CLAUDE.md 单一来源约束,与 §2.1 已登记项一致)
|
|
|
+
|
|
|
+**目录约定**:
|
|
|
+- `workspace/` 在仓库根,**仅存在于开发者本地**,整个目录进 `.gitignore`
|
|
|
+- `workspace/{yyyymmdd}/` 按运行日期分子目录,便于开发者看"我今天生成了哪些候选"
|
|
|
+- 与 `manual/imports/{yyyymmdd}/` 的分工:`manual/imports/` 放一次性**执行**的 SQL / ini(会入仓做审计证据,执行完归档),`workspace/` 放自动化工具**未经人工确认**的中间产物(永不入仓)
|
|
|
+
|
|
|
+**拆除清单**(重写时连带删除):
|
|
|
+- `dw_base/database/mysql_utils.py` 的 `list_tables` / `list_columns` 方法(只服务老 generator)
|
|
|
+- `dw_base/datax/datasources/mysql_data_source.py`
|
|
|
+- `dw_base/datax/plugins/reader/mysql_reader.py` 的 `generate_hive_ddl` / `generate_hive_over_hbase_ddl` 方法
|
|
|
+- `dw_base/datax/datax_utils.py` 的 `convert_mysql_column_types`
|
|
|
+- 所有 mongo / kafka / hbase writer 在 generator 里的分支
|
|
|
+
|
|
|
+**注意**:以上删除范围与 `dw_base/datax/plugins/` 里仍在被真实采集任务调用的 reader/writer 不冲突 —— 真实采集任务只用到 `HDFSReader` / `HDFSWriter` / `MongoWriter`(如果还有 mongo 采集任务)。删之前要用 `grep -r "from dw_base.datax.plugins.reader.mysql_reader"` 再确认一次。
|
|
|
+
|
|
|
+### 2.8 HDFS 数据源 ini 支持 HA nameservice(**Path B 锁定**)
|
|
|
+
|
|
|
+**结论(凭 2026-04-18 新 CDH 环境实测)**:DataX hdfswriter 在新环境下连 HA 集群**必须**在 json 里显式提供 `hadoopConfig` 块,这是 §2.8 改造的动机。
|
|
|
+
|
|
|
+**为什么 `HADOOP_CONF_DIR` 对 DataX 无效(实测三连)**:
|
|
|
+
|
|
|
+| 测试 | json 内容 | `HADOOP_CONF_DIR` | 结果 |
|
|
|
+|---|---|---|---|
|
|
|
+| 1 | `defaultFS` + 完整 `hadoopConfig` | 未设置 | ✅ 成功 |
|
|
|
+| 2 | 只有 `defaultFS` | 未设置 | ❌ UnknownHostException: nameservice1 |
|
|
|
+| 3 | 只有 `defaultFS` | `/etc/hadoop/conf` | ❌ 仍 UnknownHostException |
|
|
|
+
|
|
|
+`hadoop fs -ls` 命令能解析 `nameservice1`,是因为 `hadoop` shell 脚本把 `$HADOOP_CONF_DIR` **加入了 Java classpath**(`hdfs-site.xml` 才能被 `Configuration` 自动加载)。`datax.py` 启动 Java 时**不做这件事**,classpath 里不含 `/etc/hadoop/conf`,所以 DataX 的 Hadoop `Configuration` 对象是"干净的",只能靠 json `hadoopConfig` 块显式注入 HA 参数。
|
|
|
+
|
|
|
+(老项目在上一个公司/服务器只写 `defaultFS` 也能跑通 HA,最可能的原因是运维把 `hdfs-site.xml` 塞进了 DataX 的 classpath 目录 —— 比如 `datax/conf/` / `datax/lib/` / 某 plugin 的 `libs/`。新环境 `/opt/datax` 下没有这类预置文件,不走这条路。)
|
|
|
+
|
|
|
+**老的 env 设置是死代码**:`dw_base/__init__.py:16` 的 `os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'` 对 DataX JVM 子进程无影响 —— 一是 classpath 不含 conf 目录(见上);二是 DataX 由 `datax-single-job-starter.sh` 通过 `python3 datax.py` 启动,并未 import `dw_base`。这行已在本次 §2.8 一并清理。
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+**正确的 json 形态**:
|
|
|
+
|
|
|
+```json
|
|
|
+"writer": {
|
|
|
+ "name": "hdfswriter",
|
|
|
+ "parameter": {
|
|
|
+ "defaultFS": "hdfs://nameservice1",
|
|
|
+ "hadoopConfig": {
|
|
|
+ "dfs.nameservices": "nameservice1",
|
|
|
+ "dfs.ha.namenodes.nameservice1": "nn1,nn2",
|
|
|
+ "dfs.namenode.rpc-address.nameservice1.nn1": "192.168.33.61:8020",
|
|
|
+ "dfs.namenode.rpc-address.nameservice1.nn2": "192.168.33.62:8020",
|
|
|
+ "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
|
|
|
+ },
|
|
|
+ ...
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+**ini 新格式**:
|
|
|
+
|
|
|
+**非 HA / 单 NN(dev / test 环境)**:
|
|
|
+
|
|
|
+```ini
|
|
|
+[base]
|
|
|
+defaultFS = hdfs://192.168.33.61:8020
|
|
|
+```
|
|
|
+
|
|
|
+**HA / nameservice(prod 环境)**:
|
|
|
+
|
|
|
+```ini
|
|
|
+[base]
|
|
|
+defaultFS = hdfs://nameservice1
|
|
|
+
|
|
|
+[hadoop_config]
|
|
|
+dfs.nameservices = nameservice1
|
|
|
+dfs.ha.namenodes.nameservice1 = nn1,nn2
|
|
|
+dfs.namenode.rpc-address.nameservice1.nn1 = 192.168.33.61:8020
|
|
|
+dfs.namenode.rpc-address.nameservice1.nn2 = 192.168.33.62:8020
|
|
|
+dfs.client.failover.proxy.provider.nameservice1 = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
|
|
|
+```
|
|
|
+
|
|
|
+**设计要点**:
|
|
|
+- `[hadoop_config]` 节的 key **原样照搬 `/etc/hadoop/conf/hdfs-site.xml`**,不做缩写 / 翻译 —— 运维直接复制粘贴,出了问题可以字段级对比,零心智成本
|
|
|
+- `[hadoop_config]` 节**可选**:不写即视为非 HA,代码不注入 `hadoopConfig`,生成的 json 只有 `defaultFS`,与单 NN 环境兼容
|
|
|
+- 将来若要支持多 nameservice(federation),`[hadoop_config]` 节天然兼容(多写几组 key 即可),无需再改 ini schema
|
|
|
+
|
|
|
+**代码改造清单**:
|
|
|
+
|
|
|
+| 文件 | 改动 |
|
|
|
+|------|------|
|
|
|
+| `dw_base/datax/datasources/hdfs_data_source.py` | 覆写 `get_datasource_dict()`:父类逻辑外,检测 `[hadoop_config]` 节是否存在;存在则把整节作为 dict 塞进 `ds_dict['hadoopConfig']` |
|
|
|
+| `dw_base/datax/plugins/plugin.py` | **不用改** —— `load_data_source()` 的 `for key, value in ds_dict.items()` 里,value 是 dict 时走 Python 原生赋值,json 序列化自然成立 |
|
|
|
+| `dw_base/datax/plugins/reader/hdfs_reader.py` / `writer/hdfs_writer.py` | **不用改**,`defaultFS + hadoopConfig` 由 `load_data_source()` 自动注入到 `parameter` |
|
|
|
+| `dw_base/__init__.py:16` | **删除** `os.environ['HADOOP_CONF_DIR']` 死代码(实测对 DataX JVM 无影响) |
|
|
|
+| `bin/datax-gc-generator.py` | §2.7 从零重写时一并处理,这里不单独列 |
|
|
|
+| `datasource/hdfs/{env}/*.ini` | 按新 schema 新建(prod 带 `[hadoop_config]`;dev/test 只写 `[base] defaultFS`) |
|
|
|
+
|
|
|
+**回归测试**:
|
|
|
+- prod HA 集群:新 ini 跑一次真实 PG→HDFS 任务,生成 json 含完整 `hadoopConfig`,任务成功(2026-04-18 已用手写 json 预验证)
|
|
|
+- NameNode 主备切换期间跑一次,DataX 自动切到 standby(HA 的原始动机)
|
|
|
+- dev / test 单 NN 集群:不写 `[hadoop_config]`,生成 json 只含 `defaultFS`,任务成功
|
|
|
+
|
|
|
+### 2.9 DataX 速率控制外移到 conf(中优先级)
|
|
|
+
|
|
|
+**现状**:`dw_base/datax/job_config_generator.py:60-67` 硬编码分时调度:
|
|
|
+
|
|
|
+```python
|
|
|
+local_time = int(datetime_utils.formatted_now('%H%M'))
|
|
|
+if 750 < local_time < 1900:
|
|
|
+ speed = self.get_speed(10, byte=10485760, record=40000) # 白天 10 channel × 10 MB/s
|
|
|
+ core_speed = self.get_core_speed(byte=10485760, record=40000)
|
|
|
+else:
|
|
|
+ speed = self.get_speed() # 夜间 6 channel × 256 MB/s
|
|
|
+ core_speed = self.get_core_speed()
|
|
|
+```
|
|
|
+
|
|
|
+**问题**:
|
|
|
+- 白天 / 夜间的边界(0750、1900)、channel 数、单 channel 速率全写死在代码里,想调一个值要改 py 源码重新发布
|
|
|
+- 不同业务域 / 不同表的速率诉求可能不同,现在一刀切
|
|
|
+- 生产偶发突发(白天要抢刷一次全量)没有临时放大速率的口子
|
|
|
+- 硬编码的动机在代码里一字没留(为什么 0750-1900?避开业务高峰?避开实时链路?后人读代码猜不到)
|
|
|
+
|
|
|
+**目标**:把速率配置抽到 `conf/datax-speed.conf`,`JobConfigGenerator` 运行时读;默认值 = 现硬编码值,保持向后兼容。
|
|
|
+
|
|
|
+**conf 格式**(ini,按时间段分段,自上而下匹配,未命中走 `[default]`):
|
|
|
+
|
|
|
+```ini
|
|
|
+; conf/datax-speed.conf
|
|
|
+; 速率档位定义:从上到下依次匹配,第一条命中的为准;无匹配走 [default]
|
|
|
+
|
|
|
+[daytime]
|
|
|
+hours = 07:50-19:00
|
|
|
+channel = 10
|
|
|
+byte_per_channel = 10485760 ; 10 MB/s/channel,避开业务高峰
|
|
|
+record_per_channel = 40000
|
|
|
+
|
|
|
+[default]
|
|
|
+channel = 6
|
|
|
+byte_per_channel = 268435456 ; 256 MB/s/channel,夜间放满
|
|
|
+record_per_channel = 100000
|
|
|
+```
|
|
|
+
|
|
|
+**代码改造**:
|
|
|
+- 在 `JobConfigGenerator.__init__()` 读 `conf/datax-speed.conf`
|
|
|
+- 用当前时间在各段 `hours` 里找匹配,取 `channel / byte_per_channel / record_per_channel`
|
|
|
+- `assemble()` 里的分支逻辑改为 `speed = self.resolve_speed_profile()` 一行
|
|
|
+- 落 `conf/datax-speed.conf` 时在文件头注释里把**"白天是为了避开业务高峰"**的动机写清楚(消除默认值的来历盲区)
|
|
|
+
|
|
|
+**扩展方向**(P2,先不做):
|
|
|
+- 支持按 ini 名(业务表)在 conf 里覆盖特定任务的速率
|
|
|
+- 支持命令行 `-speed fast` / `-speed slow` 手动切档(突发高峰 / 限流时用)
|
|
|
+
|
|
|
## 三、`__init__.py` 瘦身(高优先级)
|
|
|
|
|
|
**现状:** `tendata/__init__.py` 约 120 行,import 即执行以下操作:
|
|
|
@@ -427,16 +623,23 @@ else
|
|
|
fi
|
|
|
```
|
|
|
|
|
|
-**问题:**
|
|
|
-- `alvis` 是老环境硬编码,新环境部署用户是 `bigdata`,迁移时必须一起改
|
|
|
-- "按执行者身份决定日志路径"把运行身份与路径策略耦合在一起,代码里到处都要判断当前用户
|
|
|
-- 调度执行(`bigdata`)和个人调试的日志散落到不同目录,排查问题时需要来回切换
|
|
|
-- 本质是把环境差异写进代码,而不是写进配置
|
|
|
-
|
|
|
-**建议:**
|
|
|
-1. 删除 `whoami == RELEASE_USER` 分支逻辑
|
|
|
-2. 日志根路径统一由 `conf/env.sh` 的 `LOG_ROOT_DIR` 决定(默认 `/opt/data/log`),个人调试可在自己的 shell 里 `export LOG_ROOT_DIR=~/data/log` 覆盖
|
|
|
-3. `RELEASE_USER` 若仍需保留(如 publish.sh 发布身份校验),只作为白名单,不参与日志路径决策
|
|
|
+**方向(凭记忆:用户 2026-04-18 确认):分流策略保留,但目的地形态变更**
|
|
|
+
|
|
|
+- release 用户(`bigdata` / `dolphinscheduler`)的生产调度作业:日志落到 `/opt/data/log/{module}/{dt}/{file}.log`
|
|
|
+- 个人调试:落到 `~/log/{module}/{dt}/{file}.log`(不是原来的 `~/data/log`,去掉中间 `data/` 一级)
|
|
|
+
|
|
|
+**为什么保留分流**:个人调试的日志本来就不该和生产日志混在同一系统目录(权限、轮转、审计、磁盘空间都不一样);而统一路径又会引入"调度用户没写权限"类新问题。保留分流是务实选择。
|
|
|
+
|
|
|
+**为什么改目的地形态为 `{module}/{dt}/{file}.log`**:
|
|
|
+- 当前老结构 `/opt/data/log/datax/20260418/xxx.log` 已按 `{module}/{dt}/` 分,但不是所有入口都遵守(spark、ds 等散落在各自子结构下)
|
|
|
+- 新结构强制三级 `{module}/{dt}/{file}.log`,便于按天归档 + 按模块清理
|
|
|
+- `{module}` 取值:`datax` / `spark` / `ds` / `csv` / `export` 等顶层入口名
|
|
|
+
|
|
|
+**代码改动:**
|
|
|
+1. 保留 `whoami == RELEASE_USER` 分支逻辑,但分支里走新模板路径
|
|
|
+2. `LOG_ROOT_DIR` 放到 `conf/env.sh`,两个分支里显式分别赋值为 `/opt/data/log` 和 `${HOME}/log`
|
|
|
+3. 日志文件路径拼接统一走一个工具函数 `log_path(module, dt, file)`(Python 和 Shell 各一份),避免入口脚本各自拼
|
|
|
+4. `RELEASE_USER` 作为单一来源定义在 `conf/env.sh`,与 publish.sh 共用
|
|
|
|
|
|
### 7.3 部署改进
|
|
|
|