Эх сурвалжийг харах

chore: 新增 io/ops/pm/dq/sync/tests 占位骨架;删 bin/excel_to_hive.py;publish.sh 挪入 bin

tianyu.chu 2 долоо хоног өмнө
parent
commit
6936460ac3

+ 0 - 122
bin/excel_to_hive.py

@@ -1,122 +0,0 @@
-#!/usr/bin/env /usr/bin/python3
-# -*- coding:utf-8 -*-
-"""
-  -- 设置 SparkSession 名称(血缘分析)
-  SET spark.app.name=excel_to_hive;
-  -- 设置 Spark 配置
-  SET spark.xxx.yyy.zzz=xyz;
-  -- 引用 UDF
-  ADD FILE dw_base/udf/business/spark_xxx_udf.py;
-  -- 声明变量
-  SET TOPIC=xxx;
-  -- 查看数据行数
-  SET LIMIT=1000;
-"""
-import json
-import os
-import sys
-import argparse
-import re
-import pandas as pd
-from pyspark.sql import SparkSession
-
-base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-sys.path.append(base_dir)
-
-
-class Excel2HiveUtil:
-    def __init__(self):
-        self.base_dir = base_dir
-
-    def run(self, excel_position: str, db_table: str, topic: str, dt: str, skip_rows=0):
-        spark = SparkSession.builder \
-            .appName("ExcelToHive") \
-            .master("yarn") \
-            .config('hive.exec.orc.default.block.size', 134217728) \
-            .config('spark.debug.maxToStringFields', 5000) \
-            .config('spark.dynamicAllocation.enabled', False) \
-            .config('spark.files.ignoreCorruptFiles', True) \
-            .config('spark.sql.adaptive.enabled', 'true') \
-            .config('spark.sql.broadcastTimeout', -1) \
-            .config('spark.sql.codegen.wholeStage', 'false') \
-            .config('spark.sql.execution.arrow.enabled', True) \
-            .config('spark.sql.execution.arrow.fallback.enabled', True) \
-            .config('spark.sql.files.ignoreCorruptFiles', True) \
-            .config('spark.sql.statistics.fallBackToHdfs', True) \
-            .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
-            .config('spark.yarn.queue', "default") \
-            .enableHiveSupport().getOrCreate()
-
-        if not excel_position.startswith('/'):
-            full_path = os.path.join(self.base_dir, excel_position)
-        else:
-            full_path = excel_position
-
-        # 读取 Excel 数据
-        excel_data = pd.read_excel(full_path, dtype=str, skiprows=skip_rows)
-        excel_data.fillna(value='', inplace=True)
-
-        json_data = excel_data.apply(
-            lambda row: json.dumps({f'col{i + 1}': str(re.sub(r'[\n\t]', '', val)) for i, val in enumerate(row)}),
-            axis=1)
-
-        pandas_df = pd.DataFrame({'ori_json': json_data})
-        pandas_df['dt'] = dt
-        pandas_df['topic'] = topic
-
-        spark_df = spark.createDataFrame(pandas_df)
-
-        # # Write data to Hive table with partitions
-        # spark_df.write.mode('overwrite').partitionBy("dt", "topic").saveAsTable(db_table)
-
-        spark_df.createOrReplaceTempView("temp_view")
-        spark.sql(
-            f"""
-                    INSERT OVERWRITE TABLE {db_table} PARTITION (dt='{dt}', topic='{topic}')
-                    SELECT ori_json FROM temp_view
-                """)
-
-        spark.stop()
-
-
-def usage():
-    print(
-        f'Usage: {sys.argv[0]}\n'
-        f'  [-h/--help]                     打印脚本使用方法\n'
-        f'  [-p/--path <excel_file_path>]   要转换的 Excel 文件路径(必填)\n'
-        f'  [-t/--topic <topic>]            要插入到 Hive 表中的 topic(必填)\n'
-        f'  [-s/--skip <skip_rows>]         要跳过的 Excel 文件的行数(可选,默认为 0)\n'
-        f'  [-d/--dt <date>]                要插入到 Hive 表中的日期(可选,默认为 19700101)\n'
-    )
-    exit(1)
-
-
-def parse_args():
-    parser = argparse.ArgumentParser(description='Excel to Hive')
-    parser.add_argument('-p', '--path', type=str, required=True, help='Path to the Excel file')
-    parser.add_argument('-t', '--topic', type=str, required=True, help='Topic name to insert into Hive table')
-    parser.add_argument('-s', '--skip', type=int, default=0, help='Number of rows to skip in the Excel file')
-    parser.add_argument('-d', '--dt', type=str, default='19700101',
-                        help='Date to insert into Hive table, default is 19700101')
-    args = parser.parse_args()
-
-    config = {
-        'path': args.path,
-        'topic': args.topic,
-        'skip': args.skip,
-        'dt': args.dt
-    }
-    return config
-
-
-if __name__ == '__main__':
-    config = parse_args()
-
-    excel_position = config['path']
-    db_table = 'ent_raw.manual_import_data'
-    topic = config['topic']
-    skip_rows = config['skip']
-    dt = config['dt']
-
-    Excel2HiveUtil().run(excel_position, db_table, topic, dt, skip_rows)
-    print("================= Transfer completed! =======================")

+ 0 - 0
publish.sh → bin/publish.sh


+ 39 - 0
dw_base/dq/README.md

@@ -0,0 +1,39 @@
+# dw_base/dq — 数据质量
+
+## 职责
+
+对仓内表做数据质量检查,产出结果报告 + 告警。覆盖维度:
+
+- **结构层**:schema drift(字段/类型变更)、分区存在性
+- **值层**:非空、唯一、值域、枚举、正则
+- **规模层**:行数波动、环比/同比阈值
+- **关联层**:主键冲突、外键孤儿、join 命中率
+
+## 对外接口概要(规划中)
+
+- `checks/` —— 各类规则实现
+  - `NotNullCheck(col)` / `UniqueCheck(cols)` / `ValueRangeCheck(col, min, max)`
+  - `RowCountCheck(min, max)` / `RowCountDriftCheck(window_days, threshold)`
+  - `SchemaDriftCheck(expected_schema)` / `PrimaryKeyCheck(pk_cols)`
+- `runner.run(table, dt)` —— 读 `conf/dq/<table>.ini` → 跑声明的 checks → 汇总结果
+- 结果出口:
+  - 失败 → 告警走 `conf/alerter.ini`
+  - 全量落 Hive 表(便于趋势分析,表名待定)
+- 入口脚本 `bin/dq-check.py`(**本批不建**)
+
+## 依赖
+
+- PySpark(SQL 查表统计)
+- `conf/dq/*.ini`(每张表一份规则声明)
+- `conf/alerter.ini`(告警接入)
+
+## 状态
+
+**骨架(未启动)**。实现顺序建议:
+
+1. 最小可用:`RowCountCheck` + `NotNullCheck` + 告警接入(跑通链路)
+2. 铺 `SchemaDriftCheck` + `UniqueCheck`(结构/唯一层)
+3. `RowCountDriftCheck`(需要历史表)
+4. join 命中率 / 枚举值检查(规则复杂,后做)
+
+见 `kb/90-重构路线.md` 聚簇 D。

+ 1 - 0
dw_base/dq/__init__.py

@@ -0,0 +1 @@
+"""数据质量校验(schema / 非空 / 唯一 / 值域 / 行数 / 主键 / join 命中率)。详见 README.md。"""

+ 44 - 0
dw_base/io/README.md

@@ -0,0 +1,44 @@
+# dw_base/io — I/O 层
+
+## 职责
+
+与**外部系统**通信的边界。所有跨进程/跨主机的读写在这里封装,避免业务代码直接依赖 driver / 网络协议。
+
+划分准则(与 `common/` / `utils/` / `ops/` 的区别):
+
+| 语义 | 放 |
+|---|---|
+| 连接工厂、读写封装(有 I/O) | `io/` |
+| 纯函数、无状态工具(字符串/日期/哈希) | `utils/` |
+| 跨模块常量、配置、context、DI container | `common/` |
+| 数据湖运维操作(compaction / 分区清理 / 小文件合并) | `ops/` |
+
+## 子包
+
+- `db/` —— 数据库连接工厂与读写封装(MySQL / PostgreSQL / Hive / MongoDB 等)
+- `file/` —— 文件格式读写(csv / txt / json / excel)
+- `hdfs/` —— HDFS 文件读写(纯 I/O;compaction/合并属于 `ops/`)
+
+## 对外接口概要(规划中)
+
+- `db.MySQLHandler(conf) -> Connection-like`
+- `db.PGHandler(conf)` / `db.HiveHandler(conf)` / `db.MongoHandler(conf)`
+- `file.read_csv(path, **opts)` / `file.write_csv(df, path, **opts)`
+- `file.read_excel(path, sheet=None)` / `file.write_excel(df, path)`
+- `file.read_json(path)` / `file.write_json(obj, path)`
+- `hdfs.read(path)` / `hdfs.write(path, content)` / `hdfs.ls(path)`
+
+## 依赖
+
+- `configparser`(读 conf/ 配置)
+- 按子包按需:`pymysql` / `psycopg2` / `openpyxl` / HDFS 客户端等
+- `datasource/*.ini` 读取账密(高敏不入库)
+
+## 状态
+
+**骨架(未启动)**。本批仅建目录。后续 B2 阶段搬入:
+- `dw_base/database/mysql_utils.py` → `io/db/mysql.py`
+- `dw_base/utils/file_utils.py` → `io/file/`
+- `dw_base/utils/hdfs_dir_file_coalesce.py` / `hdfs_merge_small_file.py` → **搬 `ops/`(不是 io/hdfs/)**
+
+见 `kb/90-重构路线.md` 聚簇 B。

+ 1 - 0
dw_base/io/__init__.py

@@ -0,0 +1 @@
+"""I/O 层:与外部系统通信的边界(数据库 / 文件格式 / HDFS)。详见 README.md。"""

+ 1 - 0
dw_base/io/db/__init__.py

@@ -0,0 +1 @@
+"""数据库连接工厂与读写封装。详见 dw_base/io/README.md。"""

+ 1 - 0
dw_base/io/file/__init__.py

@@ -0,0 +1 @@
+"""文件格式读写(csv / txt / json / excel)。详见 dw_base/io/README.md。"""

+ 1 - 0
dw_base/io/hdfs/__init__.py

@@ -0,0 +1 @@
+"""HDFS 文件读写(纯 I/O;compaction 等运维属于 dw_base/ops/)。详见 dw_base/io/README.md。"""

+ 36 - 0
dw_base/ops/README.md

@@ -0,0 +1,36 @@
+# dw_base/ops — 数据湖运维操作
+
+## 职责
+
+对**已存在数据**做运维类操作。与 `io/`(读写 I/O)的区别:`io/` 是数据进出,`ops/` 是对湖内数据的后置处理。
+
+典型场景:
+- 小文件合并 / compaction
+- 按 N 天保留清分区(`DROP PARTITION`)
+- 表级 vacuum / stats 刷新
+- 跨分区数据校对 / 修复(非一次性 backfill,那是 `manual/`)
+
+## 对外接口概要(规划中)
+
+- `coalesce_hdfs_dir(path, target_size_mb)` —— HDFS 目录小文件合并
+- `merge_small_files(table, partition)` —— Hive 表分区 `concatenate` 压实
+- `drop_partitions(table, keep_days, exceptions=None)` —— 按天保留清分区(元表驱动或参数化,见 `kb/92` 阶段 4 规划)
+- `refresh_table_stats(table)` —— 刷新 Hive 表/分区统计
+
+## 依赖
+
+- PySpark(HiveContext)用于 compaction / stats
+- `io/hdfs/` 用于 HDFS 目录操作
+- `conf/alerter.ini` 用于失败告警
+- 元表(按天保留工具用):schema 在 `kb/11-数据资产.md` 或 `kb/20-数仓分层与建模.md` 定义(待补)
+
+## 状态
+
+**骨架(未启动)**。后续搬入 / 重写的内容:
+
+- `dw_base/utils/hdfs_dir_file_coalesce.py` → `ops/coalesce.py`(B2 搬家 + 剥离老业务耦合)
+- `dw_base/utils/hdfs_merge_small_file.py` → `ops/merge.py`(B2 同上)
+- **重写**:原 `dw_base/scheduler/drop_partitions.py` + `drop_daily_full_snapshot_tbls.py`(2026-04-20 已删),阶段 4 新版——元表驱动 + 参数化天数 + 例外白名单
+- **重写**:原 `dw_base/utils/hive_file_merge.py`(2026-04-20 已删),阶段 4 新版——通用化 HiveServer 连接 / 剥离 `cts_*_ex/_im` 表名假设
+
+见 `kb/90-重构路线.md` 聚簇 B + 阶段 4 重写任务。

+ 1 - 0
dw_base/ops/__init__.py

@@ -0,0 +1 @@
+"""数据湖运维操作(compaction / 分区清理 / 小文件合并 / vacuum / stats 刷新)。详见 README.md。"""

+ 34 - 0
dw_base/pm/README.md

@@ -0,0 +1,34 @@
+# dw_base/pm — 项目管理工具集成
+
+## 职责
+
+调用项目管理工具 API(TAPD / Jira),根据 git 提交记录**生成任务 / 更新任务状态**,打通"代码提交 → 任务闭环"。
+
+典型触发链:
+1. `git log` / CI 钩子拿到本轮提交
+2. 按 commit message 约定(如 `feat(scope): xxx #TASK-123`)解析出任务号与动作
+3. 调 TAPD / Jira API 创建任务或推进状态(开发中 → 待测 → 完成)
+
+## 对外接口概要(规划中)
+
+- `TapdClient(conf)` / `JiraClient(conf)` —— API wrapper(封装鉴权 + 常用动作)
+- `parse_git_log(since_commit, pattern) -> list[TaskOp]` —— 解析 git log → 任务操作序列
+- `sync_tasks(ops: list[TaskOp])` —— 批量推送到对应 PM 工具
+- 入口脚本 `bin/git-sync-task.py`(**本批不建**,后续实现时再加)
+
+## 依赖
+
+- `requests`(HTTP 调 TAPD / Jira API)
+- `git`(子进程调用或 `gitpython`)
+- `configparser`(读 conf 里的非敏感配置:API base URL、projectId 等)
+- **账密**:TAPD / Jira token 走 `datasource/pm/<tool>/<env>.ini`(高敏不入库,见 `project_templates_and_config` 记忆 + `kb/00-项目架构.md` §6)
+
+## 状态
+
+**骨架(未启动)**。需先与用户对齐:
+
+- 公司用的是 TAPD 还是 Jira?两家 API 不同,先做主要那一家,另一家作二期
+- commit message 里任务号的约定格式(Conventional Commits scope 段?footer 段?)
+- 状态机:哪些 commit 动作推哪个状态(`feat` 推"开发完成"?`fix` 推"修复中"?)
+
+见 `kb/90-重构路线.md` 聚簇 D。

+ 1 - 0
dw_base/pm/__init__.py

@@ -0,0 +1 @@
+"""项目管理工具(TAPD / Jira)API 集成:从 git 提交生成/更新任务。详见 README.md。"""

+ 36 - 0
dw_base/sync/README.md

@@ -0,0 +1,36 @@
+# dw_base/sync — 外部文档同步
+
+## 职责
+
+周期性从外部文档系统(公司内 Docmost 优先;后续可扩 Confluence / Notion / 飞书知识库)拉取 page → 落到 `kb/inbox/` → 供 AI 整理进正式 `kb/` 分类文档。
+
+## 对外接口概要(规划中)
+
+- `docmost.pull_pages(space, since_ts=None) -> list[Path]`
+  - 拉取指定 space 的 page,落到 `kb/inbox/{yyyymmdd}/<page_id>_<slug>.md`
+  - `since_ts` 增量拉取游标(避免重复)
+- `docmost.auth(token)` —— API 鉴权
+- 入口脚本 `bin/docs-sync.py`(**本批不建**,后续实现时再加)
+
+## 约定
+
+- 落盘路径:`kb/inbox/{yyyymmdd}/<page_id>_<slug>.md`
+  - 日期子目录防止 inbox 根目录被塞爆
+  - 文件名带 page_id 方便增量覆盖同一页的后续更新
+- 拉回的是**草稿**,**不直接进 kb/** 正式分类,必须走"AI 整理 → 人工审 → 归入 kb/{NN}-xxx.md"流程
+
+## 依赖
+
+- `requests`(调 Docmost API)
+- `configparser`(读 `conf/sync/docmost.ini` 非敏感配置:base URL、space id 等)
+- **账密**:Docmost API token 走 `datasource/sync/docmost/<env>.ini`(高敏不入库)
+
+## 状态
+
+**骨架(未启动)**。待确认:
+
+- Docmost API 鉴权方式(token / OAuth?)
+- 是否支持 webhook(事件驱动增量同步,避免轮询)
+- 多 space 支持优先级
+
+见 `kb/90-重构路线.md` 聚簇 D。

+ 1 - 0
dw_base/sync/__init__.py

@@ -0,0 +1 @@
+"""外部文档同步到 kb/inbox/(Docmost / Confluence / Notion 等 → 供 AI 整理)。详见 README.md。"""

+ 33 - 0
tests/README.md

@@ -0,0 +1,33 @@
+# tests — 测试体系
+
+## 职责
+
+对 `dw_base/` 下的代码(尤其是 UDF、utils、io、dq、pm 等模块)做自动化测试,保证重构与新开发不回退。
+
+## 目录
+
+- `unit/` —— 单元测试。**无外部依赖**(不连真实 Spark 集群 / DB),Spark 相关用本地 `SparkSession` 或 mock
+- `integration/` —— 集成测试。连真实 Spark / MySQL / HDFS,验证端到端链路
+- `conftest.py`(后续加)—— pytest fixtures:`fake_spark`、`tmp_hdfs`、`sample_df` 等
+
+## 入口
+
+`pytest tests/unit` / `pytest tests/integration`。CI 只跑 unit(集成测试手动触发)。
+
+## 首批目标
+
+1. `tests/unit/udf/test_spark_common_udf.py` —— 对 `dw_base/udf/common/spark_common_udf.py` 40 个函数的单测(本地 SparkSession,小样本 DataFrame)
+2. `tests/unit/utils/` —— 对 `dw_base/utils/*` 的纯函数单测
+3. `tests/unit/datax/` —— `dw_base/datax/job_config_generator.py` 配置生成单测
+
+## 依赖
+
+- `pytest`(**待加入 `requirements.txt`**)
+- `pyspark==2.4.0`(已在 requirements.txt,`findspark` 运行时定位 CDH 集群,同版本双轨)
+- 对 UDF 单测:本地 SparkSession 即可跑,不需要集群
+
+## 状态
+
+**骨架(未启动)**。本批仅建目录 + 本 README。
+
+见 `kb/90-重构路线.md` 聚簇 D + §6。

+ 0 - 0
tests/integration/.gitkeep


+ 0 - 0
tests/unit/.gitkeep