Ver código fonte

refactor(bin): datax-sync-template-gen 一次产出 md 表 + ini 模板

工作流定型:跑一次同时拿到 md 表(用于讨论字段裁剪)+ ini 模板
(按 md 决策手动改)。
- 删 --schema-md flag(不再单输出模式)
- stdout:先 md 表后 ini 模板
- -o:写两文件 {table}.md + {table}.ini
- 删 query_columns / query_primary_key(被 query_columns_full 取代)
- 单测对应 5 条删除
tianyu.chu 1 semana atrás
pai
commit
73675bf1a1

+ 33 - 61
bin/datax-sync-template-gen.py

@@ -1,10 +1,13 @@
 #!/usr/bin/env /usr/bin/python3
 # -*- coding:utf-8 -*-
 """
-PG → HDFS DataX sync ini 模板生成器。
+PG → HDFS DataX sync ini 模板生成器 + raw 建模 metadata 表
 
-生成全字段 sync ini 模板(参考起点)。开发者按需裁剪字段 / 改 where /
-加 [mask] / 调 splitPk / 改 writer.path 表名后缀等,再提交到 jobs/raw/{域}/。
+一次跑同时产出两件:
+  1. PG 全字段 metadata markdown 表(序号/字段名/中文名/数据类型/主键标识/
+     裁剪类型空列)—— 用于 kb/24 raw 建模文档讨论字段裁剪
+  2. 全字段 sync ini 模板 —— 开发者按 md 讨论结果手动裁剪字段 / 改 where /
+     加 [mask] / 调 splitPk / 改 writer.path 表名后缀等,再提交到 jobs/raw/{域}/
 
 CLI:
   python3 bin/datax-sync-template-gen.py \\
@@ -17,10 +20,9 @@ CLI:
        dataSource 字段格式)。暂只支持 postgresql。
   -t   schema 限定的表名,形如 schema.table(如 public.card_group_order_info)。
   -o   输出目录(可选):
-       - 不传:stdout
-       - 传 -o 不带值:workspace/{yyyymmdd}/
-       - 传 -o <DIR>:自定义目录
-       输出文件名固定 {table}.ini(去掉 schema 前缀)。
+       - 不传:stdout 同时打印 md 表 + ini 模板
+       - 传 -o 不带值:workspace/{yyyymmdd}/,写两个文件 {table}.md + {table}.ini
+       - 传 -o <DIR>:自定义目录,写两个文件
 """
 import argparse
 import os
@@ -64,41 +66,6 @@ def parse_jdbc_url(jdbc_url):
     return m.group(1), int(m.group(2) or 5432), m.group(3)
 
 
-def query_columns(conn, schema, table):
-    """查全字段名 + 注释,按 attnum 排序。"""
-    cur = conn.cursor()
-    cur.execute("""
-        SELECT a.attname, pg_catalog.col_description(a.attrelid, a.attnum)
-        FROM pg_catalog.pg_attribute a
-        JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
-        JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
-        WHERE n.nspname = %s AND c.relname = %s
-          AND a.attnum > 0 AND NOT a.attisdropped
-        ORDER BY a.attnum
-    """, (schema, table))
-    rows = cur.fetchall()
-    if not rows:
-        raise ValueError('表不存在或无字段: {}.{}'.format(schema, table))
-    return [(r[0], r[1] or '') for r in rows]
-
-
-def query_primary_key(conn, schema, table):
-    """查单字段主键名;无主键 / 复合主键 → 返回空字符串。"""
-    cur = conn.cursor()
-    cur.execute("""
-        SELECT a.attname
-        FROM pg_index i
-        JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
-        JOIN pg_class c ON c.oid = i.indrelid
-        JOIN pg_namespace n ON n.oid = c.relnamespace
-        WHERE n.nspname = %s AND c.relname = %s AND i.indisprimary
-    """, (schema, table))
-    rows = cur.fetchall()
-    if len(rows) == 1:
-        return rows[0][0]
-    return ''
-
-
 def query_columns_full(conn, schema, table):
     """带序号 / 类型 / 主键标识的全字段 metadata 查询,按 attnum 排序。
 
@@ -189,9 +156,7 @@ def main():
     parser.add_argument('-t', required=True, metavar='SCHEMA.TABLE',
                         help='schema 限定的表名(如 public.card_group_order_info)')
     parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None, metavar='DIR',
-                        help='输出目录(不传 stdout;不带值 workspace/{yyyymmdd}/;带值自定义)')
-    parser.add_argument('--schema-md', action='store_true',
-                        help='改为输出 PG 全字段 metadata markdown 表(序号/字段名/中文名/数据类型/主键标识/裁剪类型空列),用于 kb/24 raw 建模文档')
+                        help='输出目录(不传 stdout 同时打印 md 表 + ini 模板;不带值 workspace/{yyyymmdd}/ 写两文件;带值自定义目录写两文件)')
     args = parser.parse_args()
 
     if '.' not in args.t:
@@ -212,28 +177,35 @@ def main():
         user=user, password=password,
     )
     try:
-        if args.schema_md:
-            rows = query_columns_full(conn, schema, table)
-            if not rows:
-                raise ValueError('表不存在或无字段: {}.{}'.format(schema, table))
-            content = render_schema_md(rows)
-            out_suffix = '.md'
-        else:
-            columns = query_columns(conn, schema, table)
-            pk = query_primary_key(conn, schema, table)
-            content = render_template(args.ds, database, schema, table, columns, pk)
-            out_suffix = '.ini'
+        full_rows = query_columns_full(conn, schema, table)
+        if not full_rows:
+            raise ValueError('表不存在或无字段: {}.{}'.format(schema, table))
     finally:
         conn.close()
 
+    # full_rows: [(attnum, attname, comment, pg_type, pk_flag), ...]
+    columns = [(r[1], r[2] or '') for r in full_rows]
+    pk_names = [r[1] for r in full_rows if r[4] == 'PK']
+    pk = pk_names[0] if len(pk_names) == 1 else ''  # 复合主键 / 无主键 → 空
+
+    md_content = render_schema_md(full_rows)
+    ini_content = render_template(args.ds, database, schema, table, columns, pk)
+
     if args.o is None:
-        sys.stdout.write(content)
+        # stdout:先 md 表后 ini 模板
+        sys.stdout.write(md_content)
+        sys.stdout.write('\n')
+        sys.stdout.write(ini_content)
     else:
         os.makedirs(args.o, exist_ok=True)
-        out_path = os.path.join(args.o, table + out_suffix)
-        with open(out_path, 'w', encoding='utf-8') as f:
-            f.write(content)
-        print('已写入: ' + out_path, file=sys.stderr)
+        md_path = os.path.join(args.o, table + '.md')
+        ini_path = os.path.join(args.o, table + '.ini')
+        with open(md_path, 'w', encoding='utf-8') as f:
+            f.write(md_content)
+        with open(ini_path, 'w', encoding='utf-8') as f:
+            f.write(ini_content)
+        print('已写入: ' + md_path, file=sys.stderr)
+        print('已写入: ' + ini_path, file=sys.stderr)
 
 
 if __name__ == '__main__':

+ 1 - 42
tests/unit/datax/test_sync_template_gen.py

@@ -2,7 +2,7 @@
 """
 datax-sync-template-gen 模板渲染 + JDBC URL 解析单测。
 
-不连真 PG(query_columns / query_primary_key 走 mock conn)。
+不连真 PG(query_columns_full 走 mock conn)。
 脚本路径含连字符,用 importlib.util 动态加载为模块。
 """
 import importlib.util
@@ -46,47 +46,6 @@ def test_parse_jdbc_url_invalid():
         GEN.parse_jdbc_url('mysql://10.0.0.1:3306/foo')
 
 
-def test_query_columns_returns_name_and_comment():
-    conn = MagicMock()
-    cur = conn.cursor.return_value
-    cur.fetchall.return_value = [
-        ('id', 'id'),
-        ('user_id', '用户id'),
-        ('create_time', None),  # 无注释
-    ]
-    cols = GEN.query_columns(conn, 'public', 'orders')
-    assert cols == [('id', 'id'), ('user_id', '用户id'), ('create_time', '')]
-
-
-def test_query_columns_empty_raises():
-    conn = MagicMock()
-    cur = conn.cursor.return_value
-    cur.fetchall.return_value = []
-    with pytest.raises(ValueError, match='表不存在或无字段'):
-        GEN.query_columns(conn, 'public', 'no_such_table')
-
-
-def test_query_primary_key_single():
-    conn = MagicMock()
-    cur = conn.cursor.return_value
-    cur.fetchall.return_value = [('id',)]
-    assert GEN.query_primary_key(conn, 'public', 'orders') == 'id'
-
-
-def test_query_primary_key_composite_returns_empty():
-    conn = MagicMock()
-    cur = conn.cursor.return_value
-    cur.fetchall.return_value = [('id1',), ('id2',)]
-    assert GEN.query_primary_key(conn, 'public', 'orders') == ''
-
-
-def test_query_primary_key_none_returns_empty():
-    conn = MagicMock()
-    cur = conn.cursor.return_value
-    cur.fetchall.return_value = []
-    assert GEN.query_primary_key(conn, 'public', 'orders') == ''
-
-
 def test_render_template_includes_required_fields():
     columns = [('id', 'id'), ('name', '姓名'), ('create_time', '创建时间')]
     out = GEN.render_template(