  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. """
  4. PG → HDFS DataX sync ini 模板生成器。
  5. 生成全字段 sync ini 模板(参考起点)。开发者按需裁剪字段 / 改 where /
  6. 加 [mask] / 调 splitPk / 改 writer.path 表名后缀等,再提交到 jobs/raw/{域}/。
  7. CLI:
  8. python3 bin/datax-sync-template-gen.py \\
  9. -ds postgresql/prod-hobby \\
  10. -t public.card_group_order_info \\
  11. [-o [DIR]]
  12. 参数:
  13. -ds 数据源 ref,形如 {db_type}/{env}-{实例简称}(同 sync ini 里
  14. dataSource 字段格式)。暂只支持 postgresql。
  15. -t schema 限定的表名,形如 schema.table(如 public.card_group_order_info)。
  16. -o 输出目录(可选):
  17. - 不传:stdout
  18. - 传 -o 不带值:workspace/{yyyymmdd}/
  19. - 传 -o <DIR>:自定义目录
  20. 输出文件名固定 {table}.ini(去掉 schema 前缀)。
  21. """
  22. import argparse
  23. import os
  24. import re
  25. import sys
  26. from datetime import datetime
  27. project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  28. sys.path.append(project_root)
  29. from dw_base.datax.datasources.data_source_factory import DataSourceFactory
  30. from dw_base.datax.datax_constants import DS_POSTGRE_SQL_JDBC_URL
  31. WORKSPACE_DEFAULT = os.path.join(
  32. project_root, 'workspace', datetime.now().strftime('%Y%m%d'),
  33. )
  34. def resolve_datasource(ds_ref):
  35. """复用 plugin.py:34-42 的 ref → DataSource 解析逻辑。
  36. ds_ref 形如 'postgresql/prod-hobby',首段为 db_type(同父目录名)。
  37. datasource ini 落点:项目同级 ../datasource/{ds_ref}.ini。
  38. """
  39. ds_type = ds_ref.split('/')[0]
  40. if ds_type != 'postgresql':
  41. raise NotImplementedError('暂只支持 postgresql 数据源,收到: ' + ds_type)
  42. ds_file_path = os.path.normpath(
  43. os.path.join(project_root, '..', 'datasource', ds_ref + '.ini'))
  44. if not os.path.isfile(ds_file_path):
  45. raise FileNotFoundError('数据源 ini 不存在: ' + ds_file_path)
  46. return DataSourceFactory.get_data_source(ds_type, ds_file_path)
  47. def parse_jdbc_url(jdbc_url):
  48. """从 jdbc:postgresql://host:port/database 抽 (host, port, database)。"""
  49. m = re.match(r'jdbc:postgresql://([^:/]+)(?::(\d+))?/(.+)', jdbc_url)
  50. if not m:
  51. raise ValueError('无法解析 PG jdbcUrl: ' + jdbc_url)
  52. return m.group(1), int(m.group(2) or 5432), m.group(3)
  53. def query_columns(conn, schema, table):
  54. """查全字段名 + 注释,按 attnum 排序。"""
  55. cur = conn.cursor()
  56. cur.execute("""
  57. SELECT a.attname, pg_catalog.col_description(a.attrelid, a.attnum)
  58. FROM pg_catalog.pg_attribute a
  59. JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
  60. JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
  61. WHERE n.nspname = %s AND c.relname = %s
  62. AND a.attnum > 0 AND NOT a.attisdropped
  63. ORDER BY a.attnum
  64. """, (schema, table))
  65. rows = cur.fetchall()
  66. if not rows:
  67. raise ValueError('表不存在或无字段: {}.{}'.format(schema, table))
  68. return [(r[0], r[1] or '') for r in rows]
  69. def query_primary_key(conn, schema, table):
  70. """查单字段主键名;无主键 / 复合主键 → 返回空字符串。"""
  71. cur = conn.cursor()
  72. cur.execute("""
  73. SELECT a.attname
  74. FROM pg_index i
  75. JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
  76. JOIN pg_class c ON c.oid = i.indrelid
  77. JOIN pg_namespace n ON n.oid = c.relnamespace
  78. WHERE n.nspname = %s AND c.relname = %s AND i.indisprimary
  79. """, (schema, table))
  80. rows = cur.fetchall()
  81. if len(rows) == 1:
  82. return rows[0][0]
  83. return ''
  84. def query_columns_full(conn, schema, table):
  85. """带序号 / 类型 / 主键标识的全字段 metadata 查询,按 attnum 排序。
  86. 返回 [(attnum, attname, comment, pg_type, pk_flag), ...]
  87. """
  88. cur = conn.cursor()
  89. cur.execute("""
  90. SELECT
  91. a.attnum,
  92. a.attname,
  93. pg_catalog.col_description(a.attrelid, a.attnum),
  94. pg_catalog.format_type(a.atttypid, a.atttypmod),
  95. CASE WHEN EXISTS (
  96. SELECT 1 FROM pg_index i
  97. WHERE i.indrelid = a.attrelid AND i.indisprimary
  98. AND a.attnum = ANY(i.indkey)
  99. ) THEN 'PK' ELSE '' END
  100. FROM pg_catalog.pg_attribute a
  101. JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
  102. JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
  103. WHERE n.nspname = %s AND c.relname = %s
  104. AND a.attnum > 0 AND NOT a.attisdropped
  105. ORDER BY a.attnum
  106. """, (schema, table))
  107. return cur.fetchall()
  108. def render_schema_md(rows):
  109. """输出 markdown 表格:序号 / 字段名 / 中文名 / 数据类型 / 主键标识 / 裁剪类型(空,开发者填)"""
  110. lines = [
  111. '| 序号 | 字段名 | 中文名 | 数据类型 | 主键标识 | 裁剪类型 |',
  112. '| --- | --- | --- | --- | --- | --- |',
  113. ]
  114. for num, name, comment, typ, pk in rows:
  115. lines.append('| {} | `{}` | {} | {} | {} | |'.format(
  116. num, name, comment or '', typ, pk))
  117. return '\n'.join(lines) + '\n'
  118. def render_template(ds_ref, database, schema, table, columns, pk):
  119. column_str = ','.join(c for c, _ in columns)
  120. today = datetime.now().strftime('%Y-%m-%d')
  121. return (
  122. '; 作者:<TODO>\n'
  123. '; 日期:{today}\n'
  124. '; 工单:<TODO>\n'
  125. '; 目的:PG {database}.{schema}.{table} → Hive raw.<TODO> 同步模板\n'
  126. '; 状态:[待执行]\n'
  127. '; 备注:自动生成的全字段参考模板。开发者按需裁剪字段 / 改 where / 加 [mask] /\n'
  128. '; 调 splitPk / 改 writer.path 表名后缀(_inc_d / _his_o 等)\n'
  129. ';\n'
  130. '; 配套 DDL:manual/ddl/raw/<TODO_domain>/raw_<TODO>_create.sql\n'
  131. '\n'
  132. '[reader]\n'
  133. 'dataSource = {ds_ref}\n'
  134. 'database = {database}\n'
  135. 'table = {schema}.{table}\n'
  136. 'column = {column_str}\n'
  137. 'columnType =\n'
  138. "where = update_time >= '${{start_date}}' AND update_time < '${{stop_date}}'\n"
  139. 'querySql =\n'
  140. 'splitPk = {pk}\n'
  141. 'fetchSize = 1000\n'
  142. '\n'
  143. '[writer]\n'
  144. 'dataSource = hdfs/<TODO>\n'
  145. 'path = /user/hive/warehouse/raw.db/{table}_TODO_d/dt=${{dt}}/\n'
  146. 'column = {column_str}\n'
  147. 'columnType =\n'
  148. 'fileType = orc\n'
  149. 'fileName = {table}_TODO_d\n'
  150. 'encoding = UTF-8\n'
  151. 'writeMode = truncate\n'
  152. 'fieldDelimiter = \\t\n'
  153. ).format(
  154. today=today, ds_ref=ds_ref, database=database, schema=schema,
  155. table=table, column_str=column_str, pk=pk,
  156. )
  157. def main():
  158. parser = argparse.ArgumentParser(
  159. prog='datax-sync-template-gen',
  160. description='PG → HDFS DataX sync ini 模板生成器(全字段参考模板)',
  161. )
  162. parser.add_argument('-ds', required=True, metavar='DS_REF',
  163. help='数据源 ref,形如 postgresql/prod-hobby(同 sync ini dataSource 字段)')
  164. parser.add_argument('-t', required=True, metavar='SCHEMA.TABLE',
  165. help='schema 限定的表名(如 public.card_group_order_info)')
  166. parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None, metavar='DIR',
  167. help='输出目录(不传 stdout;不带值 workspace/{yyyymmdd}/;带值自定义)')
  168. parser.add_argument('--schema-md', action='store_true',
  169. help='改为输出 PG 全字段 metadata markdown 表(序号/字段名/中文名/数据类型/主键标识/裁剪类型空列),用于 kb/24 raw 建模文档')
  170. args = parser.parse_args()
  171. if '.' not in args.t:
  172. print('-t 必须 schema.table 格式,收到: ' + args.t, file=sys.stderr)
  173. sys.exit(2)
  174. schema, table = args.t.split('.', 1)
  175. ds = resolve_datasource(args.ds)
  176. ds_dict = ds.parse()
  177. jdbc_url = ds_dict[DS_POSTGRE_SQL_JDBC_URL]
  178. user = ds_dict['username']
  179. password = ds_dict['password']
  180. host, port, database = parse_jdbc_url(jdbc_url)
  181. import pg8000.dbapi
  182. conn = pg8000.dbapi.connect(
  183. host=host, port=port, database=database,
  184. user=user, password=password,
  185. )
  186. try:
  187. if args.schema_md:
  188. rows = query_columns_full(conn, schema, table)
  189. if not rows:
  190. raise ValueError('表不存在或无字段: {}.{}'.format(schema, table))
  191. content = render_schema_md(rows)
  192. out_suffix = '.md'
  193. else:
  194. columns = query_columns(conn, schema, table)
  195. pk = query_primary_key(conn, schema, table)
  196. content = render_template(args.ds, database, schema, table, columns, pk)
  197. out_suffix = '.ini'
  198. finally:
  199. conn.close()
  200. if args.o is None:
  201. sys.stdout.write(content)
  202. else:
  203. os.makedirs(args.o, exist_ok=True)
  204. out_path = os.path.join(args.o, table + out_suffix)
  205. with open(out_path, 'w', encoding='utf-8') as f:
  206. f.write(content)
  207. print('已写入: ' + out_path, file=sys.stderr)
  208. if __name__ == '__main__':
  209. main()