| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- """
- PG → HDFS DataX sync ini 模板生成器 + raw 建模 metadata 表。
- 一次跑同时产出两件:
- 1. PG 全字段 metadata markdown 表(序号/字段名/中文名/数据类型/主键标识/
- 裁剪类型空列)—— 用于 kb/24 raw 建模文档讨论字段裁剪
- 2. 全字段 sync ini 模板 —— 开发者按 md 讨论结果手动裁剪字段 / 改 where /
- 加 [mask] / 调 splitPk / 改 writer.path 表名后缀等,再提交到 jobs/raw/{域}/
- CLI:
- python3 bin/datax-sync-template-gen.py \\
- -ds postgresql/prod-hobby \\
- -t public.card_group_order_info \\
- [-o [DIR]]
- 参数:
- -ds 数据源 ref,形如 {db_type}/{env}-{实例简称}(同 sync ini 里
- dataSource 字段格式)。暂只支持 postgresql。
- -t schema 限定的表名,形如 schema.table(如 public.card_group_order_info)。
- -o 输出目录(可选):
- - 不传:stdout 同时打印 md 表 + ini 模板
- - 传 -o 不带值:workspace/{yyyymmdd}/,写两个文件 {table}.md + {table}.ini
- - 传 -o <DIR>:自定义目录,写两个文件
- """
- import argparse
- import os
- import re
- import sys
- from datetime import datetime
- project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(project_root)
- from dw_base.datax.datasources.data_source_factory import DataSourceFactory
- from dw_base.datax.datax_constants import DS_POSTGRE_SQL_JDBC_URL
- WORKSPACE_DEFAULT = os.path.join(
- project_root, 'workspace', datetime.now().strftime('%Y%m%d'),
- )
- def resolve_datasource(ds_ref):
- """复用 plugin.py:34-42 的 ref → DataSource 解析逻辑。
- ds_ref 形如 'postgresql/prod-hobby',首段为 db_type(同父目录名)。
- datasource ini 落点:项目同级 ../datasource/{ds_ref}.ini。
- """
- ds_type = ds_ref.split('/')[0]
- if ds_type != 'postgresql':
- raise NotImplementedError('暂只支持 postgresql 数据源,收到: ' + ds_type)
- ds_file_path = os.path.normpath(
- os.path.join(project_root, '..', 'datasource', ds_ref + '.ini'))
- if not os.path.isfile(ds_file_path):
- raise FileNotFoundError('数据源 ini 不存在: ' + ds_file_path)
- return DataSourceFactory.get_data_source(ds_type, ds_file_path)
- def parse_jdbc_url(jdbc_url):
- """从 jdbc:postgresql://host:port/database 抽 (host, port, database)。"""
- m = re.match(r'jdbc:postgresql://([^:/]+)(?::(\d+))?/(.+)', jdbc_url)
- if not m:
- raise ValueError('无法解析 PG jdbcUrl: ' + jdbc_url)
- return m.group(1), int(m.group(2) or 5432), m.group(3)
- def query_columns_full(conn, schema, table):
- """带序号 / 类型 / 主键标识的全字段 metadata 查询,按 attnum 排序。
- 返回 [(attnum, attname, comment, pg_type, pk_flag), ...]
- """
- cur = conn.cursor()
- cur.execute("""
- SELECT
- a.attnum,
- a.attname,
- pg_catalog.col_description(a.attrelid, a.attnum),
- pg_catalog.format_type(a.atttypid, a.atttypmod),
- CASE WHEN EXISTS (
- SELECT 1 FROM pg_index i
- WHERE i.indrelid = a.attrelid AND i.indisprimary
- AND a.attnum = ANY(i.indkey)
- ) THEN 'PK' ELSE '' END
- FROM pg_catalog.pg_attribute a
- JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
- JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
- WHERE n.nspname = %s AND c.relname = %s
- AND a.attnum > 0 AND NOT a.attisdropped
- ORDER BY a.attnum
- """, (schema, table))
- return cur.fetchall()
- def render_schema_md(rows):
- """输出 markdown 表格:序号 / 字段名 / 中文名 / 数据类型 / 主键标识 / 裁剪类型(空,开发者填)"""
- lines = [
- '| 序号 | 字段名 | 中文名 | 数据类型 | 主键标识 | 裁剪类型 |',
- '| --- | --- | --- | --- | --- | --- |',
- ]
- for num, name, comment, typ, pk in rows:
- lines.append('| {} | `{}` | {} | {} | {} | |'.format(
- num, name, comment or '', typ, pk))
- return '\n'.join(lines) + '\n'
- def render_template(ds_ref, database, schema, table, columns, pk):
- column_str = ','.join(c for c, _ in columns)
- today = datetime.now().strftime('%Y-%m-%d')
- return (
- '; 作者:<TODO>\n'
- '; 日期:{today}\n'
- '; 工单:<TODO>\n'
- '; 目的:PG {database}.{schema}.{table} → Hive raw.<TODO> 同步模板\n'
- '; 状态:[待执行]\n'
- '; 备注:自动生成的全字段参考模板。开发者按需裁剪字段 / 改 where / 加 [mask] /\n'
- '; 调 splitPk / 改 writer.path 表名后缀(_inc_d / _his_o 等)\n'
- ';\n'
- '; 配套 DDL:manual/ddl/raw/<TODO_domain>/raw_<TODO>_create.sql\n'
- '\n'
- '[reader]\n'
- 'dataSource = {ds_ref}\n'
- 'database = {database}\n'
- 'table = {schema}.{table}\n'
- 'column = {column_str}\n'
- 'columnType =\n'
- "where = update_time >= '${{start_date}}' AND update_time < '${{stop_date}}'\n"
- 'querySql =\n'
- 'splitPk = {pk}\n'
- 'fetchSize = 1000\n'
- '\n'
- '[writer]\n'
- 'dataSource = hdfs/<TODO>\n'
- 'path = /user/hive/warehouse/raw.db/{table}_TODO_d/dt=${{dt}}/\n'
- 'column = {column_str}\n'
- 'columnType =\n'
- 'fileType = orc\n'
- 'fileName = {table}_TODO_d\n'
- 'encoding = UTF-8\n'
- 'writeMode = truncate\n'
- 'fieldDelimiter = \\t\n'
- ).format(
- today=today, ds_ref=ds_ref, database=database, schema=schema,
- table=table, column_str=column_str, pk=pk,
- )
- def main():
- parser = argparse.ArgumentParser(
- prog='datax-sync-template-gen',
- description='PG → HDFS DataX sync ini 模板生成器(全字段参考模板)',
- )
- parser.add_argument('-ds', required=True, metavar='DS_REF',
- help='数据源 ref,形如 postgresql/prod-hobby(同 sync ini dataSource 字段)')
- parser.add_argument('-t', required=True, metavar='SCHEMA.TABLE',
- help='schema 限定的表名(如 public.card_group_order_info)')
- parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None, metavar='DIR',
- help='输出目录(不传 stdout 同时打印 md 表 + ini 模板;不带值 workspace/{yyyymmdd}/ 写两文件;带值自定义目录写两文件)')
- args = parser.parse_args()
- if '.' not in args.t:
- print('-t 必须 schema.table 格式,收到: ' + args.t, file=sys.stderr)
- sys.exit(2)
- schema, table = args.t.split('.', 1)
- ds = resolve_datasource(args.ds)
- ds_dict = ds.parse()
- jdbc_url = ds_dict[DS_POSTGRE_SQL_JDBC_URL]
- user = ds_dict['username']
- password = ds_dict['password']
- host, port, database = parse_jdbc_url(jdbc_url)
- import pg8000.dbapi
- conn = pg8000.dbapi.connect(
- host=host, port=port, database=database,
- user=user, password=password,
- )
- try:
- full_rows = query_columns_full(conn, schema, table)
- if not full_rows:
- raise ValueError('表不存在或无字段: {}.{}'.format(schema, table))
- finally:
- conn.close()
- # full_rows: [(attnum, attname, comment, pg_type, pk_flag), ...]
- columns = [(r[1], r[2] or '') for r in full_rows]
- pk_names = [r[1] for r in full_rows if r[4] == 'PK']
- pk = pk_names[0] if len(pk_names) == 1 else '' # 复合主键 / 无主键 → 空
- md_content = render_schema_md(full_rows)
- ini_content = render_template(args.ds, database, schema, table, columns, pk)
- if args.o is None:
- # stdout:先 md 表后 ini 模板
- sys.stdout.write(md_content)
- sys.stdout.write('\n')
- sys.stdout.write(ini_content)
- else:
- os.makedirs(args.o, exist_ok=True)
- md_path = os.path.join(args.o, table + '.md')
- ini_path = os.path.join(args.o, table + '.ini')
- with open(md_path, 'w', encoding='utf-8') as f:
- f.write(md_content)
- with open(ini_path, 'w', encoding='utf-8') as f:
- f.write(ini_content)
- print('已写入: ' + md_path, file=sys.stderr)
- print('已写入: ' + ini_path, file=sys.stderr)
- if __name__ == '__main__':
- main()
|