#!/usr/bin/env /usr/bin/python3
# -*- coding:utf-8 -*-
"""
PG → HDFS DataX sync ini 模板生成器。
生成全字段 sync ini 模板(参考起点)。开发者按需裁剪字段 / 改 where /
加 [mask] / 调 splitPk / 改 writer.path 表名后缀等,再提交到 jobs/raw/{域}/。
CLI:
python3 bin/datax-sync-template-gen.py \\
-ds postgresql/prod-hobby \\
-t public.card_group_order_info \\
[-o [DIR]]
参数:
-ds 数据源 ref,形如 {db_type}/{env}-{实例简称}(同 sync ini 里
dataSource 字段格式)。暂只支持 postgresql。
-t schema 限定的表名,形如 schema.table(如 public.card_group_order_info)。
-o 输出目录(可选):
- 不传:stdout
- 传 -o 不带值:workspace/{yyyymmdd}/
- 传 -o
:自定义目录
输出文件名固定 {table}.ini(去掉 schema 前缀)。
"""
import argparse
import os
import re
import sys
from datetime import datetime
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(project_root)
from dw_base.datax.datasources.data_source_factory import DataSourceFactory
from dw_base.datax.datax_constants import DS_POSTGRE_SQL_JDBC_URL
WORKSPACE_DEFAULT = os.path.join(
project_root, 'workspace', datetime.now().strftime('%Y%m%d'),
)
def resolve_datasource(ds_ref):
"""复用 plugin.py:34-42 的 ref → DataSource 解析逻辑。
ds_ref 形如 'postgresql/prod-hobby',首段为 db_type(同父目录名)。
datasource ini 落点:项目同级 ../datasource/{ds_ref}.ini。
"""
ds_type = ds_ref.split('/')[0]
if ds_type != 'postgresql':
raise NotImplementedError('暂只支持 postgresql 数据源,收到: ' + ds_type)
ds_file_path = os.path.normpath(
os.path.join(project_root, '..', 'datasource', ds_ref + '.ini'))
if not os.path.isfile(ds_file_path):
raise FileNotFoundError('数据源 ini 不存在: ' + ds_file_path)
return DataSourceFactory.get_data_source(ds_type, ds_file_path)
def parse_jdbc_url(jdbc_url):
"""从 jdbc:postgresql://host:port/database 抽 (host, port, database)。"""
m = re.match(r'jdbc:postgresql://([^:/]+)(?::(\d+))?/(.+)', jdbc_url)
if not m:
raise ValueError('无法解析 PG jdbcUrl: ' + jdbc_url)
return m.group(1), int(m.group(2) or 5432), m.group(3)
def query_columns(conn, schema, table):
"""查全字段名 + 注释,按 attnum 排序。"""
cur = conn.cursor()
cur.execute("""
SELECT a.attname, pg_catalog.col_description(a.attrelid, a.attnum)
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = %s AND c.relname = %s
AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum
""", (schema, table))
rows = cur.fetchall()
if not rows:
raise ValueError('表不存在或无字段: {}.{}'.format(schema, table))
return [(r[0], r[1] or '') for r in rows]
def query_primary_key(conn, schema, table):
"""查单字段主键名;无主键 / 复合主键 → 返回空字符串。"""
cur = conn.cursor()
cur.execute("""
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
JOIN pg_class c ON c.oid = i.indrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = %s AND c.relname = %s AND i.indisprimary
""", (schema, table))
rows = cur.fetchall()
if len(rows) == 1:
return rows[0][0]
return ''
def render_template(ds_ref, database, schema, table, columns, pk):
column_str = ','.join(c for c, _ in columns)
today = datetime.now().strftime('%Y-%m-%d')
return (
'; 作者:\n'
'; 日期:{today}\n'
'; 工单:\n'
'; 目的:PG {database}.{schema}.{table} → Hive raw. 同步模板\n'
'; 状态:[待执行]\n'
'; 备注:自动生成的全字段参考模板。开发者按需裁剪字段 / 改 where / 加 [mask] /\n'
'; 调 splitPk / 改 writer.path 表名后缀(_inc_d / _his_o 等)\n'
';\n'
'; 配套 DDL:manual/ddl/raw//raw__create.sql\n'
'\n'
'[reader]\n'
'dataSource = {ds_ref}\n'
'database = {database}\n'
'table = {schema}.{table}\n'
'column = {column_str}\n'
'columnType =\n'
"where = update_time >= '${{start_date}}' AND update_time < '${{stop_date}}'\n"
'querySql =\n'
'splitPk = {pk}\n'
'fetchSize = 1000\n'
'\n'
'[writer]\n'
'dataSource = hdfs/\n'
'path = /user/hive/warehouse/raw.db/{table}_TODO_d/dt=${{dt}}/\n'
'column = {column_str}\n'
'columnType =\n'
'fileType = orc\n'
'fileName = {table}_TODO_d\n'
'encoding = UTF-8\n'
'writeMode = truncate\n'
'fieldDelimiter = \\t\n'
).format(
today=today, ds_ref=ds_ref, database=database, schema=schema,
table=table, column_str=column_str, pk=pk,
)
def main():
parser = argparse.ArgumentParser(
prog='datax-sync-template-gen',
description='PG → HDFS DataX sync ini 模板生成器(全字段参考模板)',
)
parser.add_argument('-ds', required=True, metavar='DS_REF',
help='数据源 ref,形如 postgresql/prod-hobby(同 sync ini dataSource 字段)')
parser.add_argument('-t', required=True, metavar='SCHEMA.TABLE',
help='schema 限定的表名(如 public.card_group_order_info)')
parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None, metavar='DIR',
help='输出目录(不传 stdout;不带值 workspace/{yyyymmdd}/;带值自定义)')
args = parser.parse_args()
if '.' not in args.t:
print('-t 必须 schema.table 格式,收到: ' + args.t, file=sys.stderr)
sys.exit(2)
schema, table = args.t.split('.', 1)
ds = resolve_datasource(args.ds)
ds_dict = ds.parse()
jdbc_url = ds_dict[DS_POSTGRE_SQL_JDBC_URL]
user = ds_dict['username']
password = ds_dict['password']
host, port, database = parse_jdbc_url(jdbc_url)
import pg8000.dbapi
conn = pg8000.dbapi.connect(
host=host, port=port, database=database,
user=user, password=password,
)
try:
columns = query_columns(conn, schema, table)
pk = query_primary_key(conn, schema, table)
finally:
conn.close()
content = render_template(args.ds, database, schema, table, columns, pk)
if args.o is None:
sys.stdout.write(content)
else:
os.makedirs(args.o, exist_ok=True)
out_path = os.path.join(args.o, table + '.ini')
with open(out_path, 'w', encoding='utf-8') as f:
f.write(content)
print('已写入: ' + out_path, file=sys.stderr)
if __name__ == '__main__':
main()