فهرست منبع

refactor(dw_base): 占位模块梳理并删 database

tianyu.chu 2 هفته پیش
والد
کامیت
5fea351c80

+ 0 - 2
dw_base/database/__init__.py

@@ -1,2 +0,0 @@
-# -*- coding:utf-8 -*-
-

+ 0 - 185
dw_base/database/mysql_utils.py

@@ -1,185 +0,0 @@
-# -*- coding:utf-8 -*-
-
-import json
-import re
-from typing import Dict, List
-
-import pymysql
-
-
-class MySQLColumn(object):
-    def __init__(self,
-                 column_name: str,
-                 column_type: str,
-                 column_comment: str,
-                 ordinal_position: str,
-                 is_nullable: bool):
-        self.COLUMN_NAME = column_name
-        self.COLUMN_TYPE = column_type
-        self.COLUMN_COMMENT = column_comment
-        self.ORDINAL_POSITION = ordinal_position
-        self.IS_NULLABLE = is_nullable
-        self._dict = {
-            'COLUMN_NAME': column_name,
-            'COLUMN_TYPE': column_type,
-            'COLUMN_COMMENT': column_comment,
-            'ORDINAL_POSITION': ordinal_position,
-            'IS_NULLABLE': is_nullable,
-        }
-
-    def __str__(self):
-        return json.dumps(self._dict, ensure_ascii=False)
-
-
-class MySQLHandler:
-    def __init__(self, host: str, port: int, username: str, password: str, database: str = None):
-        """
-        MySQL工具包
-        Args:
-            host: 实例地址
-            port: 端口
-            username: 用户名
-            password: 密码
-        """
-        self.jdbcUrl = "jdbc:mysql://%s:%s" % (host, port)
-        self.username = username
-        self.password = password
-        self.connection = pymysql.connect(
-            host=host,
-            port=port,
-            user=username,
-            password=password,
-            database=database,
-            charset='utf8'
-        )
-        self.connection.autocommit(True)
-
-    def list_tables(self,
-                    database: str = None,
-                    exclude_regex: List[str] = None,
-                    table_regex: List[str] = None) -> Dict[str, str]:
-        """
-        列出指定数据库中的表
-        Args:
-            database: 数据库名称
-            exclude_regex: 不要的数据表正则
-            table_regex: 想要的数据表正则
-        Returns: 表及注释
-        """
-        assert database is not None
-        curs = self.connection.cursor()
-        curs.execute('SET NAMES utf8')
-        curs.execute(f'use {database}')
-        sql = "SELECT TABLE_NAME, TABLE_COMMENT " \
-              "  FROM information_schema.TABLES " \
-              f" WHERE TABLE_SCHEMA='{database}' AND TABLE_TYPE = 'BASE TABLE'"
-        curs.execute(sql)
-        rows = curs.fetchall()
-        tables = {}
-        for each_row in rows:
-            if exclude_regex:
-                exclude = False
-                for regex in exclude_regex:
-                    if re.match(regex, each_row[0]):
-                        exclude = True
-                        break
-                if exclude:
-                    continue
-            if table_regex:
-                match = False
-                for regex in table_regex:
-                    if re.match(regex, each_row[0]):
-                        match = True
-                        break
-                if not match:
-                    continue
-            tables[each_row[0]] = each_row[1]
-        return tables
-
-    def list_columns(self, database: str, table_name: str) -> List[MySQLColumn]:
-        """
-        列出指定数据库、指定表的字段及字段的其他信息
-        Args:
-            database: 数据库
-            table_name: 表
-        Returns: 字段及字段的其他信息
-        """
-        assert database is not None
-        assert table_name is not None
-        curs = self.connection.cursor()
-        curs.execute('SET NAMES utf8')
-        curs.execute(f'use {database}')
-        detail_names = ['COLUMN_TYPE', 'COLUMN_COMMENT', 'ORDINAL_POSITION', 'IS_NULLABLE']
-        sql = "SELECT COLUMN_NAME, %s" \
-              "  FROM information_schema.COLUMNS " \
-              " WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'"
-        sql = sql % (', '.join(detail_names), database, table_name)
-        curs.execute(sql)
-        rows = curs.fetchall()
-        columns = []
-        for each_row in rows:
-            column_name = each_row[0]
-            column_type = each_row[1]
-            column_comment = each_row[2]
-            ordinal_position = each_row[3]
-            is_nullable = each_row[4]
-            mysql_column = MySQLColumn(column_name, column_type, column_comment, ordinal_position, is_nullable)
-            columns.append(mysql_column)
-        return columns
-
-    def query(self, sql: str):
-        curs = self.connection.cursor()
-        curs.execute('SET NAMES utf8')
-        curs.execute(sql)
-        rows = curs.fetchall()
-        return rows
-
-    def query_column_hive_metadata(self, table_name: str):
-        curs = self.connection.cursor()
-        curs.execute('SET NAMES utf8')
-        sql = f'SELECT' \
-              f'	t.TBL_NAME,' \
-              f'	c.COLUMN_NAME,' \
-              f'	c.TYPE_NAME,' \
-              f'	c.`COMMENT` ' \
-              f'FROM ' \
-              f' hive.TBLS t' \
-              f' LEFT JOIN hive.SDS s ON t.SD_ID = s.SD_ID' \
-              f' LEFT JOIN hive.COLUMNS_V2 c ON s.CD_ID = c.CD_ID' \
-              f' LEFT JOIN hive.TBLS tbs ON s.SD_ID = tbs.SD_ID ' \
-              f'WHERE t.TBL_NAME = "{table_name}"'
-        curs.execute(sql)
-        column_info = curs.fetchall()
-        return column_info
-
-    def query_tbl_hive_metadata(self, table_name: str):
-        curs = self.connection.cursor()
-        curs.execute('SET NAMES utf8')
-        sql = f'SELECT' \
-              f'	tp.PARAM_KEY,' \
-              f'	tp.PARAM_VALUE ' \
-              f'FROM' \
-              f'	hive.TABLE_PARAMS tp' \
-              f'	LEFT JOIN hive.TBLS t ON tp.TBL_ID = t.TBL_ID ' \
-              f'WHERE' \
-              f'	t.TBL_NAME = "{table_name}"'
-        curs.execute(sql)
-        column_info = curs.fetchall()
-        return column_info
-
-
-if __name__ == '__main__':
-    mysql_handler = MySQLHandler(
-        'rm-m5e76y41wq677ogz7.mysql.rds.aliyuncs.com',
-        3306,
-        'bigdata_sync',
-        '76iW6SG2K6RGN2X68EQb'
-    )
-    database_ame = 'ik_bms_production'
-    tables = mysql_handler.list_tables(database_ame)
-    for table_name, table_comment in tables.items():
-        print(f'{table_name}\t{table_comment}')
-        columns = mysql_handler.list_columns(database_ame, table_name)
-        for col in columns:
-            print(col)
-        break

+ 0 - 39
dw_base/dq/README.md

@@ -1,39 +0,0 @@
-# dw_base/dq — 数据质量
-
-## 职责
-
-对仓内表做数据质量检查,产出结果报告 + 告警。覆盖维度:
-
-- **结构层**:schema drift(字段/类型变更)、分区存在性
-- **值层**:非空、唯一、值域、枚举、正则
-- **规模层**:行数波动、环比/同比阈值
-- **关联层**:主键冲突、外键孤儿、join 命中率
-
-## 对外接口概要(规划中)
-
-- `checks/` —— 各类规则实现
-  - `NotNullCheck(col)` / `UniqueCheck(cols)` / `ValueRangeCheck(col, min, max)`
-  - `RowCountCheck(min, max)` / `RowCountDriftCheck(window_days, threshold)`
-  - `SchemaDriftCheck(expected_schema)` / `PrimaryKeyCheck(pk_cols)`
-- `runner.run(table, dt)` —— 读 `conf/dq/<table>.ini` → 跑声明的 checks → 汇总结果
-- 结果出口:
-  - 失败 → 告警走 `conf/alerter.ini`
-  - 全量落 Hive 表(便于趋势分析,表名待定)
-- 入口脚本 `bin/dq-check.py`(**本批不建**)
-
-## 依赖
-
-- PySpark(SQL 查表统计)
-- `conf/dq/*.ini`(每张表一份规则声明)
-- `conf/alerter.ini`(告警接入)
-
-## 状态
-
-**骨架(未启动)**。实现顺序建议:
-
-1. 最小可用:`RowCountCheck` + `NotNullCheck` + 告警接入(跑通链路)
-2. 铺 `SchemaDriftCheck` + `UniqueCheck`(结构/唯一层)
-3. `RowCountDriftCheck`(需要历史表)
-4. join 命中率 / 枚举值检查(规则复杂,后做)
-
-见 `kb/90-重构路线.md` 聚簇 D。

+ 1 - 1
dw_base/dq/__init__.py

@@ -1 +1 @@
-"""数据质量校验(schema / 非空 / 唯一 / 值域 / 行数 / 主键 / join 命中率)。详见 README.md。"""
+"""数据质量校验(schema / 非空 / 唯一 / 值域 / 行数 / 主键 / join 命中率)"""

+ 0 - 44
dw_base/io/README.md

@@ -1,44 +0,0 @@
-# dw_base/io — I/O 层
-
-## 职责
-
-与**外部系统**通信的边界。所有跨进程/跨主机的读写在这里封装,避免业务代码直接依赖 driver / 网络协议。
-
-划分准则(与 `common/` / `utils/` / `ops/` 的区别):
-
-| 语义 | 放 |
-|---|---|
-| 连接工厂、读写封装(有 I/O) | `io/` |
-| 纯函数、无状态工具(字符串/日期/哈希) | `utils/` |
-| 跨模块常量、配置、context、DI container | `common/` |
-| 数据湖运维操作(compaction / 分区清理 / 小文件合并) | `ops/` |
-
-## 子包
-
-- `db/` —— 数据库连接工厂与读写封装(MySQL / PostgreSQL / Hive / MongoDB 等)
-- `file/` —— 文件格式读写(csv / txt / json / excel)
-- `hdfs/` —— HDFS 文件读写(纯 I/O;compaction/合并属于 `ops/`)
-
-## 对外接口概要(规划中)
-
-- `db.MySQLHandler(conf) -> Connection-like`
-- `db.PGHandler(conf)` / `db.HiveHandler(conf)` / `db.MongoHandler(conf)`
-- `file.read_csv(path, **opts)` / `file.write_csv(df, path, **opts)`
-- `file.read_excel(path, sheet=None)` / `file.write_excel(df, path)`
-- `file.read_json(path)` / `file.write_json(obj, path)`
-- `hdfs.read(path)` / `hdfs.write(path, content)` / `hdfs.ls(path)`
-
-## 依赖
-
-- `configparser`(读 conf/ 配置)
-- 按子包按需:`pymysql` / `psycopg2` / `openpyxl` / HDFS 客户端等
-- `datasource/*.ini` 读取账密(高敏不入库)
-
-## 状态
-
-**骨架(未启动)**。本批仅建目录。后续 B2 阶段搬入:
-- `dw_base/database/mysql_utils.py` → `io/db/mysql.py`
-- `dw_base/utils/file_utils.py` → `io/file/`
-- `dw_base/utils/hdfs_dir_file_coalesce.py` / `hdfs_merge_small_file.py` → **搬 `ops/`(不是 io/hdfs/)**
-
-见 `kb/90-重构路线.md` 聚簇 B。

+ 1 - 1
dw_base/io/__init__.py

@@ -1 +1 @@
-"""I/O 层:与外部系统通信的边界(数据库 / 文件格式 / HDFS)。详见 README.md。"""
+"""I/O 层:与外部系统通信的边界(数据库 / 文件格式 / HDFS)"""

+ 1 - 1
dw_base/io/db/__init__.py

@@ -1 +1 @@
-"""数据库连接工厂与读写封装。详见 dw_base/io/README.md。"""
+"""数据库连接工厂与读写封装。"""

+ 1 - 1
dw_base/io/file/__init__.py

@@ -1 +1 @@
-"""文件格式读写(csv / txt / json / excel)。详见 dw_base/io/README.md。"""
+"""文件格式读写(csv / txt / json / excel)"""

+ 1 - 1
dw_base/io/hdfs/__init__.py

@@ -1 +1 @@
-"""HDFS 文件读写(纯 I/O;compaction 等运维属于 dw_base/ops/)。详见 dw_base/io/README.md。"""
+"""hdfs读写"""

+ 0 - 36
dw_base/ops/README.md

@@ -1,36 +0,0 @@
-# dw_base/ops — 数据湖运维操作
-
-## 职责
-
-对**已存在数据**做运维类操作。与 `io/`(读写 I/O)的区别:`io/` 是数据进出,`ops/` 是对湖内数据的后置处理。
-
-典型场景:
-- 小文件合并 / compaction
-- 按 N 天保留清分区(`DROP PARTITION`)
-- 表级 vacuum / stats 刷新
-- 跨分区数据校对 / 修复(非一次性 backfill,那是 `manual/`)
-
-## 对外接口概要(规划中)
-
-- `coalesce_hdfs_dir(path, target_size_mb)` —— HDFS 目录小文件合并
-- `merge_small_files(table, partition)` —— Hive 表分区 `concatenate` 压实
-- `drop_partitions(table, keep_days, exceptions=None)` —— 按天保留清分区(元表驱动或参数化,见 `kb/92` 阶段 4 规划)
-- `refresh_table_stats(table)` —— 刷新 Hive 表/分区统计
-
-## 依赖
-
-- PySpark(HiveContext)用于 compaction / stats
-- `io/hdfs/` 用于 HDFS 目录操作
-- `conf/alerter.ini` 用于失败告警
-- 元表(按天保留工具用):schema 在 `kb/11-数据资产.md` 或 `kb/20-数仓分层与建模.md` 定义(待补)
-
-## 状态
-
-**骨架(未启动)**。后续搬入 / 重写的内容:
-
-- `dw_base/utils/hdfs_dir_file_coalesce.py` → `ops/coalesce.py`(B2 搬家 + 剥离老业务耦合)
-- `dw_base/utils/hdfs_merge_small_file.py` → `ops/merge.py`(B2 同上)
-- **重写**:原 `dw_base/scheduler/drop_partitions.py` + `drop_daily_full_snapshot_tbls.py`(2026-04-20 已删),阶段 4 新版——元表驱动 + 参数化天数 + 例外白名单
-- **重写**:原 `dw_base/utils/hive_file_merge.py`(2026-04-20 已删),阶段 4 新版——通用化 HiveServer 连接 / 剥离 `cts_*_ex/_im` 表名假设
-
-见 `kb/90-重构路线.md` 聚簇 B + 阶段 4 重写任务。

+ 1 - 1
dw_base/ops/__init__.py

@@ -1 +1 @@
-"""数据湖运维操作(compaction / 分区清理 / 小文件合并 / vacuum / stats 刷新)。详见 README.md。"""
+"""数运维操作(compaction / 分区清理 / 小文件合并 /  元数据刷新)"""

+ 0 - 34
dw_base/pm/README.md

@@ -1,34 +0,0 @@
-# dw_base/pm — 项目管理工具集成
-
-## 职责
-
-调用项目管理工具 API(TAPD / Jira),根据 git 提交记录**生成任务 / 更新任务状态**,打通"代码提交 → 任务闭环"。
-
-典型触发链:
-1. `git log` / CI 钩子拿到本轮提交
-2. 按 commit message 约定(如 `feat(scope): xxx #TASK-123`)解析出任务号与动作
-3. 调 TAPD / Jira API 创建任务或推进状态(开发中 → 待测 → 完成)
-
-## 对外接口概要(规划中)
-
-- `TapdClient(conf)` / `JiraClient(conf)` —— API wrapper(封装鉴权 + 常用动作)
-- `parse_git_log(since_commit, pattern) -> list[TaskOp]` —— 解析 git log → 任务操作序列
-- `sync_tasks(ops: list[TaskOp])` —— 批量推送到对应 PM 工具
-- 入口脚本 `bin/git-sync-task.py`(**本批不建**,后续实现时再加)
-
-## 依赖
-
-- `requests`(HTTP 调 TAPD / Jira API)
-- `git`(子进程调用或 `gitpython`)
-- `configparser`(读 conf 里的非敏感配置:API base URL、projectId 等)
-- **账密**:TAPD / Jira token 走 `datasource/pm/<tool>/<env>.ini`(高敏不入库,见 `project_templates_and_config` 记忆 + `kb/00-项目架构.md` §6)
-
-## 状态
-
-**骨架(未启动)**。需先与用户对齐:
-
-- 公司用的是 TAPD 还是 Jira?两家 API 不同,先做主要那一家,另一家作二期
-- commit message 里任务号的约定格式(Conventional Commits scope 段?footer 段?)
-- 状态机:哪些 commit 动作推哪个状态(`feat` 推"开发完成"?`fix` 推"修复中"?)
-
-见 `kb/90-重构路线.md` 聚簇 D。

+ 1 - 1
dw_base/pm/__init__.py

@@ -1 +1 @@
-"""项目管理工具(TAPD / Jira)API 集成:从 git 提交生成/更新任务。详见 README.md。"""
+"""项目管理工具(TAPD / Jira)API 集成:从 git 提交生成/更新任务。"""

+ 0 - 36
dw_base/sync/README.md

@@ -1,36 +0,0 @@
-# dw_base/sync — 外部文档同步
-
-## 职责
-
-周期性从外部文档系统(公司内 Docmost 优先;后续可扩 Confluence / Notion / 飞书知识库)拉取 page → 落到 `kb/inbox/` → 供 AI 整理进正式 `kb/` 分类文档。
-
-## 对外接口概要(规划中)
-
-- `docmost.pull_pages(space, since_ts=None) -> list[Path]`
-  - 拉取指定 space 的 page,落到 `kb/inbox/{yyyymmdd}/<page_id>_<slug>.md`
-  - `since_ts` 增量拉取游标(避免重复)
-- `docmost.auth(token)` —— API 鉴权
-- 入口脚本 `bin/docs-sync.py`(**本批不建**,后续实现时再加)
-
-## 约定
-
-- 落盘路径:`kb/inbox/{yyyymmdd}/<page_id>_<slug>.md`
-  - 日期子目录防止 inbox 根目录被塞爆
-  - 文件名带 page_id 方便增量覆盖同一页的后续更新
-- 拉回的是**草稿**,**不直接进 kb/** 正式分类,必须走"AI 整理 → 人工审 → 归入 kb/{NN}-xxx.md"流程
-
-## 依赖
-
-- `requests`(调 Docmost API)
-- `configparser`(读 `conf/sync/docmost.ini` 非敏感配置:base URL、space id 等)
-- **账密**:Docmost API token 走 `datasource/sync/docmost/<env>.ini`(高敏不入库)
-
-## 状态
-
-**骨架(未启动)**。待确认:
-
-- Docmost API 鉴权方式(token / OAuth?)
-- 是否支持 webhook(事件驱动增量同步,避免轮询)
-- 多 space 支持优先级
-
-见 `kb/90-重构路线.md` 聚簇 D。

+ 1 - 1
dw_base/sync/__init__.py → dw_base/wiki/__init__.py

@@ -1 +1 @@
-"""外部文档同步到 kb/inbox/(Docmost / Confluence / Notion 等 → 供 AI 整理)。详见 README.md。"""
+"""外部文档同步到 kb/inbox/(Docmost / Confluence / Notion 等 → 供 AI 整理)"""