Kaynağa Gözat

feat(bin): 加 hive-ddl-gen.py(raw 层 DDL 生成器)

按 ADR-06 raw 范围实施。读 sync ini reader.dataSource / table /
column + writer.path,PG pg_description 拿字段中文注释(复用
sync-template-gen 的 datasource 解析与查询函数),按 reader.column
顺序渲染全字段 STRING + dt 分区 + ORC + EXTERNAL TABLE。

CLI: -l {raw|ods}(必填,ods 报 NotImplementedError)+ -ini PATH
+ -o 三态(同 sync gen,stdout 始终)。

表名由 writer.path 末段反推(要求 dt=... 结尾,否则报错)。

单测 17 条覆盖渲染 / sync ini 解析 / writer.path 反推 / -l ods 报错
/ -o 三态 stdout(mock conn 不连真 PG)。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tianyu.chu 1 hafta önce
ebeveyn
işleme
b6b3776af9
2 değiştirilmiş dosya ile 426 ekleme ve 0 silme
  1. 202 0
      bin/hive-ddl-gen.py
  2. 224 0
      tests/unit/datax/test_hive_ddl_gen.py

+ 202 - 0
bin/hive-ddl-gen.py

@@ -0,0 +1,202 @@
+#!/usr/bin/env /usr/bin/python3
+# -*- coding:utf-8 -*-
+"""
+Hive DDL 生成器(raw 层;ods 层占位待实施)。
+
+输入 sync ini,从 PG 抽字段中文注释,按 reader.column 顺序渲染
+全字段 STRING + dt STRING 分区 + ORC + EXTERNAL TABLE,写到 stdout
+(传 -o 时额外落盘 {table_name}_create.sql)。
+
+CLI:
+  python3 bin/hive-ddl-gen.py -l raw -ini jobs/raw/{域}/{table}.ini [-o [DIR]]
+
+参数:
+  -l    层级(raw 必填;ods 暂未实施,传入直接 NotImplementedError)
+  -ini  sync ini 路径(按项目根解析相对路径,与项目其他 bin 入口一致)
+  -o    输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘
+        workspace/{yyyymmdd}/;带值额外落盘 <DIR>/)
+
+表名由 writer.path 末两段反推(path 末段必须是 dt=... 占位)。
+"""
+import argparse
+import importlib.util
+import os
+import sys
+from configparser import ConfigParser
+from datetime import datetime
+
+project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+sys.path.append(project_root)
+
+
+def _load_sync_gen():
+    """复用 datax-sync-template-gen 的 resolve_datasource / parse_jdbc_url /
+    query_columns_full / DS_POSTGRE_SQL_JDBC_URL(脚本名含连字符,importlib 加载)。
+    """
+    spec = importlib.util.spec_from_file_location(
+        'datax_sync_template_gen',
+        os.path.join(project_root, 'bin', 'datax-sync-template-gen.py'),
+    )
+    mod = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(mod)
+    return mod
+
+
+SYNC_GEN = _load_sync_gen()
+WORKSPACE_DEFAULT = os.path.join(
+    project_root, 'workspace', datetime.now().strftime('%Y%m%d'),
+)
+
+
+def _resolve_to_project_root(path):
+    if os.path.isabs(path):
+        return path
+    return os.path.join(project_root, path)
+
+
+def parse_sync_ini(path):
+    """解析 sync ini,提取 raw DDL 渲染所需 5 项。"""
+    if not os.path.isfile(path):
+        raise FileNotFoundError('sync ini 不存在: ' + path)
+    cp = ConfigParser()
+    cp.read(path, encoding='utf-8')
+    if not cp.has_section('reader'):
+        raise KeyError('sync ini 缺 [reader] 段: ' + path)
+    if not cp.has_section('writer'):
+        raise KeyError('sync ini 缺 [writer] 段: ' + path)
+
+    ds_ref = cp.get('reader', 'dataSource').strip()
+    schema_table = cp.get('reader', 'table').strip()
+    columns_str = cp.get('reader', 'column').strip()
+    writer_path = cp.get('writer', 'path').strip()
+
+    if '.' not in schema_table:
+        raise ValueError('reader.table 必须 schema.table 格式: ' + schema_table)
+    schema, table = schema_table.split('.', 1)
+    columns = [c.strip() for c in columns_str.split(',') if c.strip()]
+    if not columns:
+        raise ValueError('reader.column 为空')
+
+    return {
+        'ds_ref': ds_ref,
+        'schema': schema,
+        'table': table,
+        'columns': columns,
+        'writer_path': writer_path,
+    }
+
+
+def reverse_table_name(writer_path):
+    """从 writer.path 反推 Hive 表名。
+
+    path 形如 /user/hive/warehouse/raw.db/{table_name}/dt=${dt}/
+    末段必须是 dt=... 占位,倒数第二段即表名。
+    """
+    p = writer_path.rstrip('/')
+    parts = p.rsplit('/', 1)
+    if len(parts) != 2 or not parts[1].startswith('dt='):
+        raise ValueError(
+            'writer.path 末段必须是 dt=...,无法反推表名: ' + writer_path)
+    return parts[0].rsplit('/', 1)[-1]
+
+
+def fetch_column_comments(ds_ref, schema, table):
+    """连 PG 拿 schema.table 的 {字段名: 中文注释} dict。
+
+    复用 sync-template-gen 的 datasource 解析与 pg_catalog 查询,不另起一套。
+    """
+    ds = SYNC_GEN.resolve_datasource(ds_ref)
+    ds_dict = ds.parse()
+    jdbc_url = ds_dict[SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL]
+    user = ds_dict['username']
+    password = ds_dict['password']
+    host, port, database = SYNC_GEN.parse_jdbc_url(jdbc_url)
+
+    import pg8000.dbapi
+    conn = pg8000.dbapi.connect(
+        host=host, port=port, database=database,
+        user=user, password=password,
+    )
+    try:
+        rows = SYNC_GEN.query_columns_full(conn, schema, table)
+    finally:
+        conn.close()
+    return {name: (comment or '') for _, name, comment, _, _ in rows}
+
+
+def render_raw_ddl(table_name, columns, comment_dict):
+    """渲染 raw 层 DDL:全字段 STRING + dt STRING 分区 + ORC + EXTERNAL。
+
+    字段顺序严格按 columns(已是 sync ini reader.column 裁剪后顺序);
+    字段注释从 comment_dict 按字段名查,缺失留空字符串。
+    """
+    today = datetime.now().strftime('%Y-%m-%d')
+    width = max(len(c) for c in columns) + 4
+
+    lines = [
+        '-- 作者:<TODO>',
+        '-- 日期:' + today,
+        '-- 工单:<TODO>',
+        '-- 目的:<TODO>',
+        '-- 状态:[待执行]',
+        '-- 备注:<TODO>',
+        '',
+        'DROP TABLE IF EXISTS raw.' + table_name + ';',
+        '',
+        'CREATE EXTERNAL TABLE IF NOT EXISTS raw.' + table_name + ' (',
+    ]
+    last_idx = len(columns) - 1
+    for i, col in enumerate(columns):
+        comma = ',' if i < last_idx else ''
+        comment = comment_dict.get(col, '').replace("'", "''")
+        lines.append("    {col:<{w}}STRING{comma} COMMENT '{comment}'".format(
+            col=col, w=width, comma=comma, comment=comment))
+    lines.extend([
+        ')',
+        "COMMENT '<TODO>'",
+        'PARTITIONED BY (dt STRING)',
+        'STORED AS ORC',
+        "LOCATION '/user/hive/warehouse/raw.db/" + table_name + "';",
+        '',
+    ])
+    return '\n'.join(lines)
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        prog='hive-ddl-gen',
+        description='Hive DDL 生成器(raw 层;ods 层占位待实施)',
+    )
+    parser.add_argument('-l', required=True, choices=['raw', 'ods'],
+                        metavar='LAYER',
+                        help='层级(raw 必填;ods 暂未实施直接报错)')
+    parser.add_argument('-ini', required=True, metavar='PATH',
+                        help='sync ini 路径(按项目根解析相对路径)')
+    parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None,
+                        metavar='DIR',
+                        help='输出目录(任意三态 stdout 始终打印;不传仅 stdout;不带值额外落盘 workspace/{yyyymmdd}/;带值额外落盘 <DIR>/)')
+    args = parser.parse_args()
+
+    if args.l == 'ods':
+        raise NotImplementedError('ods 层 DDL 生成暂未实施(ADR-06)')
+
+    ini_path = _resolve_to_project_root(args.ini)
+    spec = parse_sync_ini(ini_path)
+    table_name = reverse_table_name(spec['writer_path'])
+    comment_dict = fetch_column_comments(
+        spec['ds_ref'], spec['schema'], spec['table'])
+
+    ddl = render_raw_ddl(table_name, spec['columns'], comment_dict)
+
+    sys.stdout.write(ddl)
+
+    if args.o is not None:
+        os.makedirs(args.o, exist_ok=True)
+        out_path = os.path.join(args.o, table_name + '_create.sql')
+        with open(out_path, 'w', encoding='utf-8') as f:
+            f.write(ddl)
+        print('已写入: ' + out_path, file=sys.stderr)
+
+
+if __name__ == '__main__':
+    main()

+ 224 - 0
tests/unit/datax/test_hive_ddl_gen.py

@@ -0,0 +1,224 @@
+# -*- coding:utf-8 -*-
+"""
+hive-ddl-gen 渲染 / sync ini 解析 / writer.path 反推单测。
+
+不连真 PG(fetch_column_comments 走 mock conn)。
+脚本路径含连字符,用 importlib.util 动态加载为模块。
+"""
+import importlib.util
+import os
+import sys
+from unittest.mock import MagicMock
+
+import pytest
+
+PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+SCRIPT_PATH = os.path.join(PROJECT_ROOT, 'bin', 'hive-ddl-gen.py')
+
+
+def _load_script():
+    spec = importlib.util.spec_from_file_location('hive_ddl_gen', SCRIPT_PATH)
+    mod = importlib.util.module_from_spec(spec)
+    sys.modules['hive_ddl_gen'] = mod
+    spec.loader.exec_module(mod)
+    return mod
+
+
+GEN = _load_script()
+
+
+def test_reverse_table_name_basic():
+    assert GEN.reverse_table_name(
+        '/user/hive/warehouse/raw.db/raw_trd_card_group_order_info_inc_d/dt=${dt}/'
+    ) == 'raw_trd_card_group_order_info_inc_d'
+
+
+def test_reverse_table_name_no_trailing_slash():
+    assert GEN.reverse_table_name(
+        '/user/hive/warehouse/raw.db/foo/dt=20260429'
+    ) == 'foo'
+
+
+def test_reverse_table_name_missing_dt_segment_raises():
+    with pytest.raises(ValueError, match='dt='):
+        GEN.reverse_table_name('/user/hive/warehouse/raw.db/foo/')
+
+
+def test_parse_sync_ini_basic(tmp_path):
+    p = tmp_path / 'sync.ini'
+    p.write_text(
+        '[reader]\n'
+        'dataSource = postgresql/prd-poyee\n'
+        'table = public.users\n'
+        'column = id, name, create_time\n'
+        '\n'
+        '[writer]\n'
+        'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n',
+        encoding='utf-8',
+    )
+    spec = GEN.parse_sync_ini(str(p))
+    assert spec == {
+        'ds_ref': 'postgresql/prd-poyee',
+        'schema': 'public',
+        'table': 'users',
+        'columns': ['id', 'name', 'create_time'],
+        'writer_path': '/user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/',
+    }
+
+
+def test_parse_sync_ini_missing_file_raises():
+    with pytest.raises(FileNotFoundError, match='sync ini 不存在'):
+        GEN.parse_sync_ini('/nonexistent/x.ini')
+
+
+def test_parse_sync_ini_missing_writer_section_raises(tmp_path):
+    p = tmp_path / 'bad.ini'
+    p.write_text(
+        '[reader]\ndataSource = a/b\ntable = s.t\ncolumn = id\n',
+        encoding='utf-8',
+    )
+    with pytest.raises(KeyError, match='\\[writer\\]'):
+        GEN.parse_sync_ini(str(p))
+
+
+def test_parse_sync_ini_table_without_dot_raises(tmp_path):
+    p = tmp_path / 'bad.ini'
+    p.write_text(
+        '[reader]\ndataSource = a/b\ntable = users\ncolumn = id\n'
+        '[writer]\npath = /x/dt=${dt}/\n',
+        encoding='utf-8',
+    )
+    with pytest.raises(ValueError, match='schema.table'):
+        GEN.parse_sync_ini(str(p))
+
+
+def test_parse_sync_ini_empty_column_raises(tmp_path):
+    p = tmp_path / 'bad.ini'
+    p.write_text(
+        '[reader]\ndataSource = a/b\ntable = s.t\ncolumn =\n'
+        '[writer]\npath = /x/dt=${dt}/\n',
+        encoding='utf-8',
+    )
+    with pytest.raises(ValueError, match='column'):
+        GEN.parse_sync_ini(str(p))
+
+
+def test_render_raw_ddl_field_order_follows_columns():
+    columns = ['id', 'name', 'create_time']
+    comments = {'id': 'id', 'name': '姓名', 'create_time': '创建时间'}
+    out = GEN.render_raw_ddl('raw_usr_users_inc_d', columns, comments)
+    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in out
+    id_idx = out.index("'id'")
+    name_idx = out.index("'姓名'")
+    ct_idx = out.index("'创建时间'")
+    assert id_idx < name_idx < ct_idx
+    assert 'PARTITIONED BY (dt STRING)' in out
+    assert 'STORED AS ORC' in out
+    assert "LOCATION '/user/hive/warehouse/raw.db/raw_usr_users_inc_d';" in out
+
+
+def test_render_raw_ddl_missing_comment_blank():
+    out = GEN.render_raw_ddl('t', ['col_no_cmt'], {})
+    assert "col_no_cmt" in out
+    assert "COMMENT ''" in out
+
+
+def test_render_raw_ddl_single_quote_in_comment_escaped():
+    out = GEN.render_raw_ddl('t', ['col'], {'col': "don't"})
+    assert "COMMENT 'don''t'" in out
+
+
+def test_render_raw_ddl_last_column_no_trailing_comma():
+    out = GEN.render_raw_ddl('t', ['a', 'b'], {})
+    field_lines = [l for l in out.split('\n') if 'STRING' in l]
+    assert len(field_lines) == 2
+    assert 'STRING,' in field_lines[0]
+    assert 'STRING,' not in field_lines[1]
+
+
+def test_render_raw_ddl_external_and_drop():
+    out = GEN.render_raw_ddl('t', ['a'], {})
+    assert 'DROP TABLE IF EXISTS raw.t;' in out
+    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.t (' in out
+
+
+def _patch_main_dependencies(monkeypatch, tmp_path):
+    """共享 mock:让 main() 不连真 PG / 真 datasource。"""
+    sync_ini = tmp_path / 'sync.ini'
+    sync_ini.write_text(
+        '[reader]\n'
+        'dataSource = postgresql/prd-poyee\n'
+        'table = public.users\n'
+        'column = id, name\n'
+        '\n'
+        '[writer]\n'
+        'path = /user/hive/warehouse/raw.db/raw_usr_users_inc_d/dt=${dt}/\n',
+        encoding='utf-8',
+    )
+
+    fake_ds = MagicMock()
+    fake_ds.parse.return_value = {
+        GEN.SYNC_GEN.DS_POSTGRE_SQL_JDBC_URL: 'jdbc:postgresql://10.0.0.1:5432/mydb',
+        'username': 'u',
+        'password': 'p',
+    }
+    monkeypatch.setattr(GEN.SYNC_GEN, 'resolve_datasource', lambda ref: fake_ds)
+
+    fake_conn = MagicMock()
+    fake_cur = fake_conn.cursor.return_value
+    fake_cur.fetchall.return_value = [
+        (1, 'id', 'id', 'bigint', 'PK'),
+        (2, 'name', '姓名', 'character varying', ''),
+    ]
+    fake_pg8000 = MagicMock()
+    fake_pg8000.dbapi.connect.return_value = fake_conn
+    monkeypatch.setitem(sys.modules, 'pg8000', fake_pg8000)
+    monkeypatch.setitem(sys.modules, 'pg8000.dbapi', fake_pg8000.dbapi)
+
+    return str(sync_ini)
+
+
+def test_main_l_ods_raises_not_implemented(monkeypatch, tmp_path):
+    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
+    monkeypatch.setattr(sys, 'argv', [
+        'hive-ddl-gen.py', '-l', 'ods', '-ini', sync_ini,
+    ])
+    with pytest.raises(NotImplementedError, match='ods'):
+        GEN.main()
+
+
+def test_main_stdout_only_when_no_o(monkeypatch, capsys, tmp_path):
+    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
+    monkeypatch.setattr(sys, 'argv', [
+        'hive-ddl-gen.py', '-l', 'raw', '-ini', sync_ini,
+    ])
+    GEN.main()
+    captured = capsys.readouterr()
+    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in captured.out
+    assert "'姓名'" in captured.out
+    assert '已写入' not in captured.err
+
+
+def test_main_stdout_and_disk_when_o_with_dir(monkeypatch, capsys, tmp_path):
+    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
+    out_dir = tmp_path / 'out'
+    monkeypatch.setattr(sys, 'argv', [
+        'hive-ddl-gen.py', '-l', 'raw', '-ini', sync_ini, '-o', str(out_dir),
+    ])
+    GEN.main()
+    captured = capsys.readouterr()
+    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in captured.out
+    assert '已写入' in captured.err
+    assert (out_dir / 'raw_usr_users_inc_d_create.sql').exists()
+
+
+def test_main_stdout_and_disk_when_o_no_value(monkeypatch, capsys, tmp_path):
+    sync_ini = _patch_main_dependencies(monkeypatch, tmp_path)
+    monkeypatch.setattr(GEN, 'WORKSPACE_DEFAULT', str(tmp_path / 'workspace'))
+    monkeypatch.setattr(sys, 'argv', [
+        'hive-ddl-gen.py', '-l', 'raw', '-ini', sync_ini, '-o',
+    ])
+    GEN.main()
+    captured = capsys.readouterr()
+    assert 'CREATE EXTERNAL TABLE IF NOT EXISTS raw.raw_usr_users_inc_d (' in captured.out
+    assert '已写入' in captured.err