
refactor(bin): rewrite datax-gc-generator as datax-sync-template-gen

PG→HDFS path only. The data source is referenced in the same format as the sync ini
dataSource field (postgresql/prd-xxx): the script reads ../datasource/{ref}.ini for the
JDBC URL, user and password, then connects to PG directly via pg8000. It queries all
columns plus the single-column primary key and renders a sync ini template: full column
list, where clause on update_time, writer.path built from the source table name with a
TODO placeholder. Output defaults to stdout; -o uses nargs='?' for three states: omitted
→ stdout, given without a value → workspace/{yyyymmdd}/, given with a value → custom
directory. requirements.txt gains pg8000~=1.30. Deletes the old 798-line
datax-gc-generator.py (MySQL pipelines plus multiple reader/writer combinations, zero
references anywhere in the project).
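
For reference, a usage sketch of the three -o modes (the data source ref and table name
are the examples from the script's docstring; the custom directory in the last call is
hypothetical):

    # no -o: print the rendered template to stdout
    python3 bin/datax-sync-template-gen.py -ds postgresql/prod-hobby -t public.card_group_order_info

    # bare -o: write workspace/{yyyymmdd}/card_group_order_info.ini
    python3 bin/datax-sync-template-gen.py -ds postgresql/prod-hobby -t public.card_group_order_info -o

    # -o <DIR>: write <DIR>/card_group_order_info.ini (directory here is hypothetical)
    python3 bin/datax-sync-template-gen.py -ds postgresql/prod-hobby -t public.card_group_order_info -o /tmp/sync-templates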
tianyu.chu 1 week ago
parent
commit
d6f1053435

+ 0 - 798
bin/datax-gc-generator.py

@@ -1,798 +0,0 @@
-#!/usr/bin/env /usr/bin/python3
-# -*- coding:utf-8 -*-
-"""
-  生成DataX作业配置文件生成器的配置文件
-"""
-# -*- coding=utf-8 -*-
-
-import os
-import re
-import sys
-import time
-
-base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-sys.path.append(base_dir)
-from dw_base import DO_RESET, NORM_CYN, NORM_GRN, NORM_MGT, NORM_YEL, NORM_RED
-from dw_base.database.mysql_utils import MySQLHandler
-from dw_base.datax.datasources.hdfs_data_source import HDFSDataSource
-from dw_base.datax.datasources.mysql_data_source import MySQLDataSource
-from dw_base.datax.datax_utils import convert_mysql_column_types
-from dw_base.datax.plugins.reader.hdfs_reader import HDFSReader
-from dw_base.datax.plugins.reader.mysql_reader import MySQLReader
-from dw_base.datax.plugins.writer.hbase_writer import HBaseWriter
-from dw_base.datax.plugins.writer.hdfs_writer import HDFSWriter
-from dw_base.datax.plugins.writer.kafka_writer import KafkaWriter
-from dw_base.datax.plugins.writer.mongo_writer import MongoWriter, MONGO_SPECIAL_WORDS_DICT
-from dw_base.hive.hive_utils import get_hive_database_name, get_hive_table_prefix
-from dw_base.spark.spark_sql import SparkSQL
-from dw_base.utils.common_utils import exist
-from dw_base.utils.config_utils import parse_args
-from dw_base.utils.file_utils import write_file, append_file, list_files, load_json_file
-from dw_base.utils.log_utils import pretty_print
-from dw_base.utils.string_utils import snake_case_to_pascal_case, snake_case_to_camel_case
-
-
-def usage(code: int):
-    print(
-        f'{NORM_MGT}Usage: {sys.argv[0]}\n'
-        f'{NORM_CYN}\t[-H/--H/--help]                   打印脚本使用方法{DO_RESET}'
-    )
-    if not from_system or not to_system:
-        print(
-            f'{NORM_MGT}Usage: {sys.argv[0]}\n'
-            f'{NORM_GRN}\t<[-]-from< /=>from>               源系统类型(默认MySQL),目前支持hdfs、mysql\n'
-            f'{NORM_GRN}\t<[-]-to< /=>to>                   目标系统类型(默认HDFS),目前支持elasticsearch、hbase、hdfs、kafka、mongo、mysql\n'
-            f'{NORM_CYN}\t[[-]-output< /=>output directory] 生成的ini文件存储位置,可以是绝对路径或相对路径'
-            f'{DO_RESET}'
-        )
-    if from_system == "hdfs":
-        print(
-            f'{NORM_MGT}Parameters when from is HDFS: \n'
-            f'{NORM_GRN}\t<[-]-d< /=>database>              Hive数据库(默认crl_mg)\n'
-            f'{NORM_GRN}\t<[-]-t< /=>table>                 Hive数据表(可多次传入,不传扫描-D传入的库中所有表)\n'
-            f'{NORM_GRN}\t<[-]-e< /=>exclude>               忽略的Hive数据表(可多次传入)\n'
-            f'{NORM_CYN}\t[[-]-partitioned]                 是否是分区表(默认为是),目前只支持日分区\n'
-            f'{DO_RESET}'
-        )
-    elif from_system == "mysql":
-        print(
-            f'{NORM_MGT}Parameters when from is MySQL: \n'
-            f'{NORM_GRN}\t<[-]-h< /=>host>                  MySQL主机\n'
-            f'{NORM_CYN}\t[[-]-P< /=>port]                  MySQL端口\n'
-            f'{NORM_GRN}\t<[-]-u< /=>username>              MySQL用户\n'
-            f'{NORM_GRN}\t<[-]-p< /=>password>              MySQL密码\n'
-            f'{NORM_GRN}\t<[-]-D< /=>database>              MySQL数据库\n'
-            f'{NORM_GRN}\t<[-]-tr< /=>table>                MySQL数据表正则(可多次传入,优先级高于t)\n'
-            f'{NORM_GRN}\t<[-]-t< /=>table>                 MySQL数据表(可多次传入)\n'
-            f'{NORM_GRN}\t<[-]-er< /=>exclude>              忽略的MySQL数据表正则(可多次传入,优先级高于e)\n'
-            f'{NORM_GRN}\t<[-]-e< /=>exclude>               忽略的MySQL数据表(可多次传入)\n'
-            f'{NORM_CYN}\t[[-]-inc-col]                     增量抽取字段名称(默认update_time)'
-            f'{DO_RESET}'
-        )
-    if to_system == "elasticsearch":
-        print(
-            f'{NORM_MGT}Parameters when to is Elasticsearch: \n'
-            f'{NORM_CYN}\t[[-]-inc-col]                     增量抽取字段名称(默认update_time)'
-            f'{DO_RESET}'
-        )
-    elif to_system == "hbase":
-        print(
-            f'{NORM_MGT}Parameters when to is HBase: \n'
-            f'{NORM_CYN}\t[[-]-inc-col]                     增量抽取字段名称(默认update_time)'
-            f'{DO_RESET}'
-        )
-    elif to_system == "hdfs":
-        print(
-            f'{NORM_MGT}Parameters when to is HDFS: \n'
-            f'{NORM_CYN}\t[[-]-project< /=>project]         项目名称,如skb、bms\n'
-            f'{NORM_CYN}\t[[-]-layer< /=>dw-layer]          数据仓库分层,如ods、dwd\n'
-            f'{NORM_CYN}\t[[-]-env< /=>dw-env]              数据仓库分层,如test\n'
-            f'{NORM_CYN}\t[[-]-edition< /=>dw-edition]      数据仓库版本,如dl,dd,\n'
-            f'{NORM_CYN}\t[[-]-partitioned]                 是否是分区表(默认为是),目前只支持日分区'
-            f'{DO_RESET}'
-        )
-    elif to_system == "kafka":
-        print(
-            f'{NORM_MGT}Parameters when to is Kafka: \n'
-            f'{NORM_GRN}\t<[-]-T< /=>kafka topic>           Kafka Topic\n'
-            f'{NORM_GRN}\t<[-]-K< /=>kafka key>             Kafka Key of data\n'
-            f'{DO_RESET}'
-        )
-    elif to_system == "mongo":
-        print(
-            f'{NORM_MGT}Parameters when to is Mongo: \n'
-            f'{NORM_CYN}\t[[-]-inc-col]                     增量抽取字段名称(默认update_time)'
-            f'{DO_RESET}'
-        )
-    elif to_system == "mysql":
-        print(
-            f'{NORM_MGT}Parameters when to is MySQL: \n'
-            f'{NORM_GRN}\t<[-]-h< /=>host>                  MySQL主机\n'
-            f'{NORM_CYN}\t[[-]-P< /=>port]                  MySQL端口\n'
-            f'{NORM_GRN}\t<[-]-u< /=>username>              MySQL用户\n'
-            f'{NORM_GRN}\t<[-]-p< /=>password>              MySQL密码\n'
-            f'{NORM_GRN}\t<[-]-D< /=>database>              MySQL数据库\n'
-            f'{NORM_GRN}\t<[-]-t< /=>table>                 MySQL数据表'
-            f'{DO_RESET}'
-        )
-    exit(code)
-
-
-def hdfs_elasticsearch_generator():
-    raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')
-
-
-def hdfs_hbase_generator():
-    with SparkSQL('datax-gc-generator') as spark_sql:
-        hdfs_ds_name = 'hdfs-aliyun-cloud'
-        hbase_ds_name = 'hbase-default'
-        hdfs_path = '/user/hive/warehouse'
-        output = CONFIG.get('output', f'{base_dir}/ignored')
-        hive_database = CONFIG.get('d')
-        if not hive_database:
-            pretty_print(f'{NORM_RED}参数 {NORM_GRN}-d{DO_RESET}{NORM_RED} 未提供')
-            usage(1)
-        config_ini_path = '{0}/config/hdfs-hbase/{1}'.format(output, hive_database)
-        os.system(f'mkdir -p {config_ini_path}')
-        included_tables = CONFIG.get('t', [])
-        if isinstance(included_tables, str):
-            included_tables = [included_tables]
-        excluded_tables = CONFIG.get('e', [])
-        if isinstance(excluded_tables, str):
-            excluded_tables = [excluded_tables]
-        if len(included_tables) == 0 and len(excluded_tables) == 0:
-            pretty_print(f'{NORM_YEL}注意:'
-                         f'{NORM_MGT}参数 {NORM_GRN}-t{NORM_MGT} 或 {NORM_GRN}-e '
-                         f'{NORM_MGT}未提供,将扫描数据库 '
-                         f'{NORM_GRN}{hive_database}{DO_RESET} '
-                         f'{NORM_MGT}下所有表')
-        hbase_namespace = CONFIG.get('n')
-        if not hbase_namespace:
-            pretty_print(f'{NORM_YEL}注意:'
-                         f'{NORM_MGT}参数 {NORM_GRN}-n{DO_RESET} '
-                         f'{NORM_MGT}未提供,将使用 HBase 默认命名空间 '
-                         f'{NORM_GRN}default')
-            hbase_namespace = 'default'
-        final_tables = []
-        if len(included_tables) == 0:
-            all_hive_tables = []
-            # 如果没有传入仅需的表则获取数据库下所有表
-            tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True)
-            desc_tables = tables_df.collect()
-            for row in desc_tables:
-                all_hive_tables.append(row[1])
-            # 去除忽略的数据表
-            for hive_table in all_hive_tables:
-                if hive_table not in excluded_tables:
-                    final_tables.append(f'{hive_database}.{hive_table}')
-        else:
-            final_tables = included_tables
-        number_width = len(str(len(final_tables)))
-        index = 1
-        for hive_table_full_name in final_tables:
-            if hive_table_full_name.__contains__("."):
-                hive_db, hive_table_name = hive_table_full_name.split('.')
-            else:
-                hive_db = hive_database
-                hive_table_name = hive_table_full_name
-            # 获取表备注
-            create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True)
-            hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1]
-            if 'COMMENT' in hive_table_comment:
-                hive_table_comment = hive_table_comment.split("'")[1]
-            else:
-                hive_table_comment = ''
-            # 表是否有分区
-            partitioned = False
-            if "`dt`" in create_table_str:
-                partitioned = True
-            # 获取表列名
-            hive_columns = spark_sql.get_columns(f'{hive_table_full_name}')
-            hive_column_names = []
-            hive_column_types = {}
-            for column_name, column_value in hive_columns.items():
-                hive_column_names.append(column_name)
-                column_type = column_value[0].upper()
-                if column_type == 'BIGINT':
-                    column_type = 'LONG'
-                hive_column_types[column_name] = column_type
-            hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_db, hive_table_name,
-                                                             hive_table_comment,
-                                                             partitioned,
-                                                             hive_column_names, hive_column_types)
-            # hbase列名
-            hbase_column_names = []
-            for column_name in hive_column_names:
-                hbase_column_names.append(f'cf:{column_name}')
-            row_key_columns = []
-            row_key_columns.append('reverse(主键如果是自增ID,建议reverse)')
-            row_key_columns.append('separator(@@)')
-            row_key_columns.append(f'separator({hive_db})')
-            row_key_columns.append(f'separator(.)')
-            row_key_columns.append(f'separator({hive_table_name})')
-            hbase_writer_str = HBaseWriter.generate_definition(hbase_ds_name,
-                                                               hbase_namespace,
-                                                               hive_table_name,
-                                                               hive_table_name,
-                                                               hive_table_comment,
-                                                               "cf",
-                                                               hbase_column_names,
-                                                               {},
-                                                               row_key_columns)
-            # 写文件
-            generator_config_file = '{0}/hdfs-hbase-{1}-{2}.ini'.format(config_ini_path, hbase_namespace, hive_table_name)
-            write_file(f'{hdfs_reader_str}\n\n{hbase_writer_str}\n', generator_config_file)
-            pretty_print(
-                f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 Hive 数据库表 '
-                f'{NORM_GRN}{hive_database}.{hive_table_name}  '
-                f'{NORM_MGT}\n                    写 HBase DataX作业配置文件生成器配置已写入文件 '
-                f'{NORM_GRN}{generator_config_file}'
-            )
-            index += 1
-
-
-def hdfs_kafka_generator():
-    kafka_topic = CONFIG.get('T')
-    kafka_key = CONFIG.get('K')
-    if not kafka_topic:
-        pretty_print(f'{NORM_RED}参数 {NORM_GRN}-T{DO_RESET}{NORM_RED} 未提供')
-        usage(1)
-    included_tables = CONFIG.get('t', [])
-    if isinstance(included_tables, str):
-        included_tables = [included_tables]
-    excluded_tables = CONFIG.get('e', [])
-    if isinstance(excluded_tables, str):
-        excluded_tables = [excluded_tables]
-    hive_database = CONFIG.get('d')
-    if len(included_tables) == 0 and len(excluded_tables) == 0:
-        if not hive_database:
-            pretty_print(f'{NORM_RED}参数 {NORM_GRN}-d{DO_RESET}{NORM_RED} 未提供')
-            usage(1)
-        else:
-            pretty_print(f'{NORM_YEL}注意:'
-                         f'{NORM_MGT}参数 {NORM_GRN}-t{NORM_MGT} 或 {NORM_GRN}-e '
-                         f'{NORM_MGT}未提供,将扫描数据库 '
-                         f'{NORM_GRN}{hive_database}{DO_RESET} '
-                         f'{NORM_MGT}下所有表')
-    with SparkSQL('datax-gc-generator') as spark_sql:
-        hdfs_ds_name = 'hdfs-aliyun-cloud'
-        hdfs_path = '/user/hive/warehouse'
-        kafka_ds_name = 'kafka-aliyun'
-        output = CONFIG.get('output', f'{base_dir}/ignored')
-        final_tables = []
-        if len(included_tables) == 0:
-            all_hive_tables = []
-            # 如果没有传入仅需的表则获取数据库下所有表
-            tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True)
-            desc_tables = tables_df.collect()
-            for row in desc_tables:
-                all_hive_tables.append(row[1])
-            # 去除忽略的数据表
-            for hive_table in all_hive_tables:
-                if hive_table not in excluded_tables:
-                    final_tables.append(f'{hive_database}.{hive_table}')
-        else:
-            final_tables = included_tables
-        number_width = len(str(len(final_tables)))
-        index = 1
-        # 判断是否为外部表,若为外部表,则获取表信息内容改变
-        for table_name in final_tables:
-            # if hive_table_full_name.endswith("es_mapping"):
-            #     continue
-            # hive_database, hive_table_name = hive_table_full_name.split('.')  # type: str,str
-            config_ini_path = '{0}/config/hdfs-kafka/{1}'.format(output, hive_database)
-            os.system(f'mkdir -p {config_ini_path}')
-            hive_table_name = ''
-            # 如果传入的配置中(-t=*)是完整的库名+表名(crl_es.xxx),则通过完整的库名表名获取信息
-            if table_name.__contains__('.'):
-                hive_database, hive_table_name = table_name.split('.')  # type: str,str
-                hive_table_full_name = table_name
-            # 如果传入的配置中是表名,则通过传入的配置获取库信息
-            elif hive_database is not None:
-                hive_table_name = table_name  # type: str
-                hive_table_full_name = f'{hive_database}.{hive_table_name}'
-            else:
-                raise ValueError("hive_database undefined")
-            hive_column_names = []
-            hive_column_types = {}
-            hive_table_names_mapping = {}
-            # 获取表备注
-            if hive_table_full_name.__contains__('mapping'):
-                mysql_handler = MySQLHandler(
-                    host='rm-m5er2i6wz605su9bi.mysql.rds.aliyuncs.com',
-                    port=3306,
-                    username='meta_ro',
-                    password='Ts#r5rO1'
-                )
-                partitioned = True  # 如果是mapping表则默认有分区,因为只是通过mapping表看映射
-                hive_table_tbl = dict(
-                    mysql_handler.query_tbl_hive_metadata(hive_table_name))  # 获取hive的mapping表全部的tblproperties
-                hive_table_comment = hive_table_tbl.get('comment')  # 获取hive表comment字段
-                hive_table_names_mapping = hive_table_tbl.get('es.mapping.names')  # 获取hive与es映射的字段信息
-                hive_table_es_index = hive_table_tbl.get('es.resource').split('/')[0]  # 获取写入es的索引
-                hive_table_column = mysql_handler.query_column_hive_metadata(hive_table_name)  # 获取hive表的列名、字段类型、注释等数据
-                hive_columns = {column[1]: (column[2], column[3]) for column in
-                                hive_table_column}  # 将读取到的列名、类型、注释转换成与spark读取出来的格式一致
-                kafka_column_types = {column[1]: column[2] for column in hive_table_column}  # 单独列出一个{列名:类型}的字典
-                hdfs_reader_column = [mappingName.split(':')[0] for mappingName in
-                                      str(hive_table_names_mapping).split(',')]  # 在hive与es映射字段的表内取出要读hdfs的字段
-                kafka_writer_column = [mappingName.split(':')[1] for mappingName in
-                                       str(hive_table_names_mapping).split(',')]  # 在hive与es映射字段的表内取出要写kafka的字段
-                column_mapping = dict(item.split(":") for item in hive_table_names_mapping.split(","))  # 将hive与es字段的映射转换为字典
-
-                for k, v in dict(
-                        item.split(":") for item in hive_table_names_mapping.split(",")).items():  # 保存一个不包含struct结构体映射的字典
-                    if k not in hive_columns.keys():
-                        column_mapping.pop(k)
-
-                for column in hdfs_reader_column.copy():  # HDFS reader中去掉结构体的列名
-                    if column not in hive_columns.keys():
-                        hdfs_reader_column.remove(column)
-
-                for column in kafka_writer_column.copy():  # kafka reader中去掉结构体的列名
-                    if column not in column_mapping.values():
-                        kafka_writer_column.remove(column)
-
-                hive_es_column_mapping = {hdfs_reader_column[i]: kafka_writer_column[i] for i in
-                                          range(len(hdfs_reader_column))}
-
-                for key, value in kafka_column_types.copy().items():  # 替换kafka writer字典中的键
-                    if key in hive_es_column_mapping:
-                        kafka_column_types[hive_es_column_mapping[key]] = kafka_column_types.pop(key)
-                    if key not in hive_es_column_mapping:
-                        kafka_column_types.pop(key)
-
-                hive_column_types = {}
-                column_type_flag = ['LONG', 'BIGINT', 'BOOLEAN', 'STRING', 'DOULBE']
-                for column_name, column_value in hive_columns.items():
-                    hive_column_names.append(column_name)
-                    column_type = column_value[0].upper()
-                    if column_type in column_type_flag:
-                        hive_column_types[column_name] = column_type
-                    else:
-                        if column_type == 'INT':
-                            hive_column_types[column_name] = 'LONG'
-
-                hive_column_names = hdfs_reader_column
-                hive_table_name = re.sub(r"({}).*$".format(re.escape("sum")), "sum", hive_table_name)
-                if kafka_key is None:
-                    kafka_key = hive_table_es_index
-            else:
-                create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True)
-                hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1]
-                if 'COMMENT' in hive_table_comment:
-                    hive_table_comment = hive_table_comment.split("'")[1]
-                else:
-                    hive_table_comment = ''
-                # 表是否有分区
-                partitioned = False
-                if "`dt`" in create_table_str:
-                    partitioned = True
-                # 获取表列名
-                hive_columns = spark_sql.get_columns(f'{hive_table_full_name}')
-
-                column_type_flag = ['LONG', 'BIGINT', 'BOOLEAN', 'STRING', 'DOULBE']
-                for column_name, column_value in hive_columns.items():
-                    hive_column_names.append(column_name)
-                    column_type = column_value[0].upper()
-                    if column_type in column_type_flag:
-                        hive_column_types[column_name] = column_type
-                    else:
-                        if column_type == 'INT':
-                            hive_column_types[column_name] = 'LONG'
-
-                kafka_writer_column = hive_column_names
-                kafka_column_types = hive_column_types
-
-            hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_database, hive_table_name,
-                                                             hive_table_comment,
-                                                             partitioned,
-                                                             hive_column_names, hive_column_types)
-
-            source_name = hive_table_name.replace('es_crl_', '').replace('_sum', '')
-            kafka_writer_str = KafkaWriter.generate_definition(kafka_ds_name, kafka_topic, kafka_key, source_name,
-                                                               kafka_writer_column, kafka_column_types,
-                                                               hive_table_names_mapping)
-
-            # 写文件
-            generator_config_file = '{0}/hdfs-kafka-{1}-{2}.ini'.format(config_ini_path, hive_database, hive_table_name)
-            write_file(f'{hdfs_reader_str}\n\n{kafka_writer_str}\n', generator_config_file)
-            pretty_print(
-                f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 Hive 数据库表 '
-                f'{NORM_GRN}{hive_database}.{hive_table_name}'
-                f'{NORM_MGT}\n                    写 Kafka DataX作业配置文件生成器配置已写入文件 '
-                f'{NORM_GRN}{generator_config_file}'
-            )
-            index += 1
-
-
-def hdfs_mongo_generator():
-    hive_database = CONFIG.get('d')
-    if not hive_database:
-        pretty_print(f'{NORM_RED}参数 {NORM_GRN}-d{DO_RESET}{NORM_RED} 未提供')
-        usage(1)
-    included_tables = CONFIG.get('t', [])
-    if isinstance(included_tables, str):
-        included_tables = [included_tables]
-    excluded_tables = CONFIG.get('e', [])
-    if isinstance(excluded_tables, str):
-        excluded_tables = [excluded_tables]
-    if len(included_tables) == 0 and len(excluded_tables) == 0:
-        pretty_print(f'{NORM_YEL}注意:'
-                     f'{NORM_MGT}参数 {NORM_GRN}-t{NORM_MGT} 或 {NORM_GRN}-e '
-                     f'{NORM_MGT}未提供,将扫描数据库 '
-                     f'{NORM_GRN}{hive_database}{DO_RESET} '
-                     f'{NORM_MGT}下所有表')
-    with SparkSQL('datax-gc-generator') as spark_sql:
-        # 获取json文件中表的主键
-        table_pk_fields = []
-        for validation_config_file in list_files('conf/validation', True):
-            if validation_config_file.endswith('.json'):
-                validation_config = load_json_file(validation_config_file)
-                dwd_table_config = validation_config.get('dwd_table')  # type: str
-                if not dwd_table_config:
-                    pretty_print(f'{NORM_YEL}文件 {NORM_GRN}{validation_config_file}'
-                                 f'{NORM_YEL} 中没有发现 {NORM_GRN}dwd_table{NORM_YEL} 的定义')
-                    continue
-                # 表名后缀,如:court_announcement
-                table_name = dwd_table_config.split("_crl_")[1]
-                ods_dwd_config = validation_config.get('ods_dwd_config')
-                if not ods_dwd_config:
-                    continue
-                # 获取主键名
-                new_pk_fields = ods_dwd_config.get('pk_fields')  # type:list
-                # 放入到字典中
-                for pk_fields in new_pk_fields:
-                    table_pk_fields.append(table_name + "-" + pk_fields)
-        hdfs_ds_name = 'hdfs-aliyun-cloud'
-        hdfs_path = '/user/hive/warehouse'
-        output = CONFIG.get('output', f'{base_dir}/ignored')
-        config_ini_path = '{0}/config/hdfs-mongo/{1}'.format(output, hive_database)
-        os.system(f'mkdir -p {config_ini_path}')
-        final_tables = []
-        if len(included_tables) == 0:
-            all_hive_tables = []
-            # 如果没有传入仅需的表则获取数据库下所有表
-            tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True)
-            desc_tables = tables_df.collect()
-            for row in desc_tables:
-                all_hive_tables.append(row[1])
-            # 去除忽略的数据表
-            for hive_table in all_hive_tables:
-                if hive_table not in excluded_tables:
-                    final_tables.append(f'{hive_database}.{hive_table}')
-        else:
-            final_tables = included_tables
-        number_width = len(str(len(final_tables)))
-        index = 1
-        for hive_table_full_name in final_tables:
-            hive_database, hive_table_name = hive_table_full_name.split('.')
-            # 获取表备注
-            create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True)
-            hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1]
-            if 'COMMENT' in hive_table_comment:
-                hive_table_comment = hive_table_comment.split("'")[1]
-            else:
-                hive_table_comment = ''
-            # 表是否有分区
-            partitioned = False
-            if "`dt`" in create_table_str:
-                partitioned = True
-            # 获取表列名
-            hive_columns = spark_sql.get_columns(f'{hive_table_full_name}')
-            hive_column_names = []
-            hive_column_types = {}
-            for column_name, column_value in hive_columns.items():
-                hive_column_names.append(column_name)
-                column_type = column_value[0].upper()
-                if column_type == 'BIGINT':
-                    column_type = 'LONG'
-                hive_column_types[column_name] = column_type
-            hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_database, hive_table_name,
-                                                             hive_table_comment,
-                                                             partitioned,
-                                                             hive_column_names, hive_column_types)
-            mongo_ds_name = 'mongo-dev-rw'
-            if hive_table_full_name.__contains__('_crl_'):
-                split_table_name = hive_table_full_name.split("_crl_")[1]
-            else:
-                split_table_name = 'unknown'
-            mongo_database = 'enterprise'
-            mongo_collection = snake_case_to_pascal_case(split_table_name)
-            # mongo列名
-            mongo_column_names = []
-            for column_name in hive_column_names:
-                if MONGO_SPECIAL_WORDS_DICT.__contains__(column_name):
-                    mongo_column_names.append(MONGO_SPECIAL_WORDS_DICT[column_name])
-                else:
-                    column_name = snake_case_to_camel_case(column_name)
-                    mongo_column_names.append(column_name)
-            # mongo列类型和主键
-            mongo_column_types = {}
-            pk_fields = []
-            for column_name, column_type in hive_column_types.items():
-                is_pk = False
-                if split_table_name + "-" + column_name in table_pk_fields:
-                    is_pk = True
-                if MONGO_SPECIAL_WORDS_DICT.__contains__(column_name):
-                    column_name = MONGO_SPECIAL_WORDS_DICT[column_name]
-                else:
-                    column_name = snake_case_to_camel_case(column_name)
-                if column_name == 'createDate' and column_type == 'LONG':
-                    mongo_column_types[column_name] = 'DATE'
-                elif column_name == 'updateDate' and column_type == 'LONG':
-                    mongo_column_types[column_name] = 'DATE'
-                else:
-                    mongo_column_types[column_name] = column_type
-                if is_pk:
-                    pk_fields.append(column_name)
-
-            mongo_writer_str = MongoWriter.generate_definition(mongo_ds_name, mongo_database, mongo_collection,
-                                                               mongo_column_names,
-                                                               mongo_column_types, pk_fields)
-            # 写文件
-            generator_config_file = '{0}/hdfs-mongo-{1}-{2}.ini'.format(config_ini_path, mongo_database, split_table_name)
-            write_file(f'{hdfs_reader_str}\n\n{mongo_writer_str}\n', generator_config_file)
-            pretty_print(
-                f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 Hive 数据库表 '
-                f'{NORM_GRN}{hive_database}.{hive_table_name}'
-                f'{NORM_MGT}\n                    写 MongoDB DataX作业配置文件生成器配置已写入文件 '
-                f'{NORM_GRN}{generator_config_file}'
-            )
-            index += 1
-
-
-def hdfs_mysql_generator():
-    raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')
-
-
-def mysql_hdfs_generator():
-    host = CONFIG.get('h')
-    port = int(CONFIG.get('P', '3306'))
-    username = CONFIG.get('u')
-    password = CONFIG.get('p')
-    mysql_database = CONFIG.get('D')
-    tables = CONFIG.get('t', [])
-    if isinstance(tables, str):
-        tables = [tables]
-    excluded_tables = CONFIG.get('e', [])
-    if isinstance(excluded_tables, str):
-        excluded_tables = [excluded_tables]
-    table_regex = CONFIG.get('tr', [])
-    if isinstance(table_regex, str):
-        table_regex = [table_regex]
-    exclude_regex = CONFIG.get('er', [])
-    if isinstance(exclude_regex, str):
-        exclude_regex = [exclude_regex]
-    project = CONFIG.get('project')
-    layer = CONFIG.get('layer', 'ods')
-    edition = CONFIG.get('edition')
-    env = CONFIG.get('env')
-    partitioned = CONFIG.get('partitioned', False)
-    inc_col = CONFIG.get('inc-col', 'update_time')
-    if not (host and username and password and mysql_database):
-        usage(1)
-    hdfs_ds_name = 'hdfs-aliyun-cloud'
-    hdfs_default_fs = 'hdfs://cluster'
-    hdfs_path = '/user/hive/warehouse'
-    output = CONFIG.get('output', f'{base_dir}/ignored')
-    hive_database = get_hive_database_name(project, layer, env)
-    hive_table_prefix = get_hive_table_prefix(project, layer, edition)
-    mysql_datasource_path = '{0}/datasource/mysql/{1}'.format(output, hive_database)
-    hdfs_datasource_path = '{0}/datasource/hdfs'.format(output)
-    config_ini_path = '{0}/config/mysql-hdfs/{1}'.format(output, hive_database)
-    hive_ddl_path = '{0}/ddl'.format(output)
-    hive_ddl_file = '{0}/{1}.sql'.format(hive_ddl_path, mysql_database)
-    os.system(f'mkdir -p {mysql_datasource_path}')
-    os.system(f'mkdir -p {hdfs_datasource_path}')
-    os.system(f'mkdir -p {config_ini_path}')
-    os.system(f'mkdir -p {hive_ddl_path}')
-    mysql_ds_def = MySQLDataSource.generate_definition(host, port, username, password, mysql_database)
-    hdfs_ds_def = HDFSDataSource.generate_definition(hdfs_default_fs)
-    mysql_handler = MySQLHandler(host, port, username, password)
-    mysql_tables = mysql_handler.list_tables(mysql_database, exclude_regex, table_regex)
-    number_width = len(str(len(mysql_tables)))
-    index = 1
-    if os.path.exists(hive_ddl_file):
-        hive_ddl_file = '{0}/{1}-{2}.sql'.format(hive_ddl_path, mysql_database, str(int(time.time())))
-    write_file('CREATE DATABASE IF NOT EXISTS %s;\n' % hive_database, hive_ddl_file)
-    for mysql_table_name, mysql_table_comment in mysql_tables.items():
-        if tables and not tables.__contains__(mysql_table_name):
-            continue
-        if excluded_tables and excluded_tables.__contains__(mysql_table_name):
-            continue
-        mysql_column_list = mysql_handler.list_columns(mysql_database, mysql_table_name)
-        mysql_column_names = [c.COLUMN_NAME for c in mysql_column_list]
-        column_types = convert_mysql_column_types(mysql_column_list)
-        mysql_reader_def = MySQLReader.generate_definition(
-            mysql_database,
-            mysql_table_name, mysql_table_comment,
-            mysql_column_names, column_types,
-            hive_database, partitioned, inc_col
-        )
-
-        hive_table_name = f'{hive_table_prefix}_{mysql_table_name}'
-
-        hive_ddl_def = MySQLReader.generate_hive_ddl(
-            hive_database, hive_table_name, mysql_table_comment,
-            partitioned,
-            mysql_column_list, column_types
-        )
-        hdfs_writer_def = HDFSWriter.generate_definition(
-            hdfs_ds_name, hdfs_path,
-            hive_database, hive_table_name, partitioned,
-            mysql_column_names, column_types
-        )
-        generator_config_file = '{0}/mysql-hdfs-{1}-{2}.ini'.format(config_ini_path, mysql_database, mysql_table_name)
-        write_file(f'{mysql_reader_def}\n{hdfs_writer_def}\n', generator_config_file)
-        append_file(f'{hive_ddl_def}\n', hive_ddl_file)
-        pretty_print(
-            f'{NORM_YEL}{str(index).rjust(number_width, " ")}{NORM_MGT} 读 MySQL 数据库表 '
-            f'{NORM_GRN}{mysql_database}.{mysql_table_name}'
-            f'{NORM_MGT}\n                    写 HDFS DataX作业配置文件生成器配置已写入文件 '
-            f'{NORM_GRN}{generator_config_file}'
-            f'{NORM_MGT}\n                    对应的Hive建表DDL已写入文件 '
-            f'{NORM_GRN}{hive_ddl_file}'
-        )
-        index += 1
-        # break
-    mysql_ds_def_file = '{0}/mysql-{1}.ini'.format(mysql_datasource_path, mysql_database)
-    if os.path.exists(mysql_ds_def_file):
-        mysql_ds_def_file = '{0}/mysql-{1}-{2}.ini'.format(mysql_datasource_path, mysql_database, str(int(time.time())))
-    write_file(f'{mysql_ds_def}\n', mysql_ds_def_file)
-    pretty_print(f'{NORM_MGT}MySQL数据库 {NORM_GRN}{mysql_database}{NORM_MGT} 数据源配置已写入文件 {NORM_GRN}{mysql_ds_def_file}')
-    hdfs_ds_def_file = '{0}/{1}.ini'.format(hdfs_datasource_path, hdfs_ds_name)
-    if os.path.exists(hdfs_ds_def_file):
-        hdfs_ds_def_file = '{0}/{1}-{2}.ini'.format(hdfs_datasource_path, hdfs_ds_name, str(int(time.time())))
-    write_file(f'{hdfs_ds_def}\n', hdfs_ds_def_file)
-    pretty_print(f'{NORM_MGT}HDFS 数据源配置已写入文件 {NORM_GRN}{hdfs_ds_def_file}')
-
-
-def mysql_hbase_generator():
-    host = CONFIG.get('h')
-    port = int(CONFIG.get('P', '3306'))
-    username = CONFIG.get('u')
-    password = CONFIG.get('p')
-    mysql_database = CONFIG.get('D')
-    if not (host and username and password and mysql_database):
-        usage(1)
-    included_tables = CONFIG.get('t', [])
-    if isinstance(included_tables, str):
-        included_tables = [included_tables]
-    excluded_tables = CONFIG.get('e', [])
-    if isinstance(excluded_tables, str):
-        excluded_tables = [excluded_tables]
-    table_regex = CONFIG.get('tr', [])
-    if isinstance(table_regex, str):
-        table_regex = [table_regex]
-    exclude_regex = CONFIG.get('er', [])
-    if isinstance(exclude_regex, str):
-        exclude_regex = [exclude_regex]
-    if len(included_tables) == 0 and len(excluded_tables) == 0 and len(table_regex) == 0 and len(exclude_regex) == 0:
-        pretty_print(f'{NORM_YEL}注意:'
-                     f'{NORM_MGT}参数 {NORM_GRN}-t -e -tr -er '
-                     f'{NORM_MGT}都未提供,将扫描数据库 '
-                     f'{NORM_GRN}{mysql_database}{DO_RESET} '
-                     f'{NORM_MGT}下所有表')
-    hbase_namespace = CONFIG.get('n')
-    if not hbase_namespace:
-        pretty_print(f'{NORM_YEL}注意:'
-                     f'{NORM_MGT}参数 {NORM_GRN}-n{DO_RESET} '
-                     f'{NORM_MGT}未提供,将使用 HBase 默认命名空间 '
-                     f'{NORM_GRN}default')
-        hbase_namespace = 'default'
-    project = CONFIG.get('project')
-    layer = CONFIG.get('layer', 'ods')
-    edition = CONFIG.get('edition')
-    env = CONFIG.get('env')
-    partitioned = CONFIG.get('partitioned', False)
-    inc_col = CONFIG.get('inc-col', 'update_time')
-    hbase_ds_name = 'hbase-default'
-    hdfs_default_fs = 'hdfs://cluster'
-    output = CONFIG.get('output', f'{base_dir}/ignored')
-    hive_database = get_hive_database_name(project, layer, env)
-    hive_table_prefix = get_hive_table_prefix(project, layer, edition)
-    mysql_datasource_path = '{0}/datasource/mysql/{1}'.format(output, hive_database)
-    config_ini_path = '{0}/config/mysql-hbase/{1}'.format(output, hive_database)
-    hive_ddl_path = '{0}/ddl'.format(output)
-    hive_ddl_file = '{0}/{1}.sql'.format(hive_ddl_path, mysql_database)
-    os.system(f'mkdir -p {mysql_datasource_path}')
-    os.system(f'mkdir -p {config_ini_path}')
-    os.system(f'mkdir -p {hive_ddl_path}')
-    mysql_ds_def = MySQLDataSource.generate_definition(host, port, username, password, mysql_database)
-    mysql_handler = MySQLHandler(host, port, username, password)
-    mysql_tables = mysql_handler.list_tables(mysql_database, exclude_regex, table_regex)
-    number_width = len(str(len(mysql_tables)))
-    index = 1
-    if os.path.exists(hive_ddl_file):
-        hive_ddl_file = '{0}/{1}-{2}.sql'.format(hive_ddl_path, mysql_database, str(int(time.time())))
-    write_file('CREATE DATABASE IF NOT EXISTS %s;\n' % hive_database, hive_ddl_file)
-    for mysql_table_name, mysql_table_comment in mysql_tables.items():
-        if included_tables and not included_tables.__contains__(mysql_table_name):
-            continue
-        if excluded_tables and excluded_tables.__contains__(mysql_table_name):
-            continue
-        mysql_column_list = mysql_handler.list_columns(mysql_database, mysql_table_name)
-        mysql_column_names = [c.COLUMN_NAME for c in mysql_column_list]
-        column_types = convert_mysql_column_types(mysql_column_list)
-        mysql_reader_def = MySQLReader.generate_definition(
-            mysql_database,
-            mysql_table_name, mysql_table_comment,
-            mysql_column_names, column_types,
-            hive_database, partitioned, inc_col
-        )
-
-        hive_table_name = f'{hive_table_prefix}_{mysql_table_name.lower()}_hbase_mapping'
-        hbase_table_name = mysql_table_name.lower()
-
-        hive_over_hbase_ddl_def = MySQLReader.generate_hive_over_hbase_ddl(
-            hive_database, hive_table_name, mysql_table_comment, hbase_namespace, hbase_table_name,
-            mysql_column_list, column_types
-        )
-        row_key_columns = []
-        row_key_columns.append('reverse(主键如果是自增ID,建议reverse)')
-        row_key_columns.append('separator(@@)')
-        row_key_columns.append(f'separator({mysql_database})')
-        row_key_columns.append(f'separator(.)')
-        row_key_columns.append(f'separator({mysql_table_name})')
-        hdfs_writer_def = HBaseWriter.generate_definition(
-            hbase_ds_name, hbase_namespace, hbase_table_name,
-            mysql_table_name, mysql_table_comment, 'cf',
-            mysql_column_names, column_types, row_key_columns
-        )
-        generator_config_file = '{0}/mysql-hbase-{1}-{2}.ini'.format(config_ini_path, mysql_database, mysql_table_name)
-        write_file(f'{mysql_reader_def}\n{hdfs_writer_def}\n', generator_config_file)
-        append_file(f'{hive_over_hbase_ddl_def}\n', hive_ddl_file)
-        pretty_print(
-            f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 MySQL 数据库表 '
-            f'{NORM_GRN}{mysql_database}.{mysql_table_name}'
-            f'{NORM_MGT}\n                    写 HBase DataX作业配置文件生成器配置已写入文件 '
-            f'{NORM_GRN}{generator_config_file}'
-            f'{NORM_MGT}\n                    对应的 Hive Over HBase 建表DDL已写入文件 '
-            f'{NORM_GRN}{hive_ddl_file}'
-        )
-        index += 1
-        # break
-    mysql_ds_def_file = '{0}/mysql-{1}.ini'.format(mysql_datasource_path, mysql_database)
-    if os.path.exists(mysql_ds_def_file):
-        mysql_ds_def_file = '{0}/mysql-{1}-{2}.ini'.format(mysql_datasource_path, mysql_database, str(int(time.time())))
-    write_file(f'{mysql_ds_def}\n', mysql_ds_def_file)
-    pretty_print(f'{NORM_MGT}MySQL数据库 {NORM_GRN}{mysql_database}{NORM_MGT} 数据源配置已写入文件 {NORM_GRN}{mysql_ds_def_file}')
-
-
-if __name__ == '__main__':
-    pretty_print(f'{NORM_MGT}{sys.argv[0]} 收到参数:{NORM_GRN}{" ".join(sys.argv[1:])}')
-    CONFIG, _ = parse_args(sys.argv[1:])
-    # 未提供任何参数或查看帮助
-    if exist(CONFIG, ['H', 'help']):
-        usage(0)
-    from_system = CONFIG.get('from')
-    if from_system:
-        from_system = from_system.lower()
-    to_system = CONFIG.get('to')
-    if to_system:
-        to_system = to_system.lower()
-    if not from_system or not to_system:
-        usage(1)
-    if from_system == "hdfs":
-        if to_system == "elasticsearch":
-            hdfs_elasticsearch_generator()
-        elif to_system == "hbase":
-            hdfs_hbase_generator()
-        elif to_system == "kafka":
-            hdfs_kafka_generator()
-        elif to_system == "mongo":
-            hdfs_mongo_generator()
-        elif to_system == "mysql":
-            hdfs_mysql_generator()
-        else:
-            raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')
-    elif from_system == "mysql":
-        if to_system == "hbase":
-            mysql_hbase_generator()
-        elif to_system == "hdfs":
-            mysql_hdfs_generator()
-        else:
-            raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')
-    else:
-        raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')

+ 192 - 0
bin/datax-sync-template-gen.py

@@ -0,0 +1,192 @@
+#!/usr/bin/env /usr/bin/python3
+# -*- coding:utf-8 -*-
+"""
+PG → HDFS DataX sync ini 模板生成器。
+
+生成全字段 sync ini 模板(参考起点)。开发者按需裁剪字段 / 改 where /
+加 [mask] / 调 splitPk / 改 writer.path 表名后缀等,再提交到 jobs/raw/{域}/。
+
+CLI:
+  python3 bin/datax-sync-template-gen.py \\
+    -ds postgresql/prod-hobby \\
+    -t public.card_group_order_info \\
+    [-o [DIR]]
+
+参数:
+  -ds  数据源 ref,形如 {db_type}/{env}-{实例简称}(同 sync ini 里
+       dataSource 字段格式)。暂只支持 postgresql。
+  -t   schema 限定的表名,形如 schema.table(如 public.card_group_order_info)。
+  -o   输出目录(可选):
+       - 不传:stdout
+       - 传 -o 不带值:workspace/{yyyymmdd}/
+       - 传 -o <DIR>:自定义目录
+       输出文件名固定 {table}.ini(去掉 schema 前缀)。
+"""
+import argparse
+import os
+import re
+import sys
+from datetime import datetime
+
+project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+sys.path.append(project_root)
+
+from dw_base.datax.datasources.data_source_factory import DataSourceFactory
+from dw_base.datax.datax_constants import DS_POSTGRE_SQL_JDBC_URL
+
+
+WORKSPACE_DEFAULT = os.path.join(
+    project_root, 'workspace', datetime.now().strftime('%Y%m%d'),
+)
+
+
+def resolve_datasource(ds_ref):
+    """复用 plugin.py:34-42 的 ref → DataSource 解析逻辑。
+
+    ds_ref 形如 'postgresql/prod-hobby',首段为 db_type(同父目录名)。
+    datasource ini 落点:项目同级 ../datasource/{ds_ref}.ini。
+    """
+    ds_type = ds_ref.split('/')[0]
+    if ds_type != 'postgresql':
+        raise NotImplementedError('暂只支持 postgresql 数据源,收到: ' + ds_type)
+    ds_file_path = os.path.normpath(
+        os.path.join(project_root, '..', 'datasource', ds_ref + '.ini'))
+    if not os.path.isfile(ds_file_path):
+        raise FileNotFoundError('数据源 ini 不存在: ' + ds_file_path)
+    return DataSourceFactory.get_data_source(ds_type, ds_file_path)
+
+
+def parse_jdbc_url(jdbc_url):
+    """从 jdbc:postgresql://host:port/database 抽 (host, port, database)。"""
+    m = re.match(r'jdbc:postgresql://([^:/]+)(?::(\d+))?/(.+)', jdbc_url)
+    if not m:
+        raise ValueError('无法解析 PG jdbcUrl: ' + jdbc_url)
+    return m.group(1), int(m.group(2) or 5432), m.group(3)
+
+
+def query_columns(conn, schema, table):
+    """查全字段名 + 注释,按 attnum 排序。"""
+    cur = conn.cursor()
+    cur.execute("""
+        SELECT a.attname, pg_catalog.col_description(a.attrelid, a.attnum)
+        FROM pg_catalog.pg_attribute a
+        JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
+        JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
+        WHERE n.nspname = %s AND c.relname = %s
+          AND a.attnum > 0 AND NOT a.attisdropped
+        ORDER BY a.attnum
+    """, (schema, table))
+    rows = cur.fetchall()
+    if not rows:
+        raise ValueError('表不存在或无字段: {}.{}'.format(schema, table))
+    return [(r[0], r[1] or '') for r in rows]
+
+
+def query_primary_key(conn, schema, table):
+    """查单字段主键名;无主键 / 复合主键 → 返回空字符串。"""
+    cur = conn.cursor()
+    cur.execute("""
+        SELECT a.attname
+        FROM pg_index i
+        JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
+        JOIN pg_class c ON c.oid = i.indrelid
+        JOIN pg_namespace n ON n.oid = c.relnamespace
+        WHERE n.nspname = %s AND c.relname = %s AND i.indisprimary
+    """, (schema, table))
+    rows = cur.fetchall()
+    if len(rows) == 1:
+        return rows[0][0]
+    return ''
+
+
+def render_template(ds_ref, database, schema, table, columns, pk):
+    column_str = ','.join(c for c, _ in columns)
+    today = datetime.now().strftime('%Y-%m-%d')
+    return (
+        '; 作者:<TODO>\n'
+        '; 日期:{today}\n'
+        '; 工单:<TODO>\n'
+        '; 目的:PG {database}.{schema}.{table} → Hive raw.<TODO> 同步模板\n'
+        '; 状态:[待执行]\n'
+        '; 备注:自动生成的全字段参考模板。开发者按需裁剪字段 / 改 where / 加 [mask] /\n'
+        ';       调 splitPk / 改 writer.path 表名后缀(_inc_d / _his_o 等)\n'
+        ';\n'
+        '; 配套 DDL:manual/ddl/raw/<TODO_domain>/raw_<TODO>_create.sql\n'
+        '\n'
+        '[reader]\n'
+        'dataSource = {ds_ref}\n'
+        'database = {database}\n'
+        'table = {schema}.{table}\n'
+        'column = {column_str}\n'
+        'columnType =\n'
+        "where = update_time >= '${{start_date}}' AND update_time < '${{stop_date}}'\n"
+        'querySql =\n'
+        'splitPk = {pk}\n'
+        'fetchSize = 1000\n'
+        '\n'
+        '[writer]\n'
+        'dataSource = hdfs/<TODO>\n'
+        'path = /user/hive/warehouse/raw.db/{table}_TODO_d/dt=${{dt}}/\n'
+        'column = {column_str}\n'
+        'columnType =\n'
+        'fileType = orc\n'
+        'fileName = {table}_TODO_d\n'
+        'encoding = UTF-8\n'
+        'writeMode = truncate\n'
+        'fieldDelimiter = \\t\n'
+    ).format(
+        today=today, ds_ref=ds_ref, database=database, schema=schema,
+        table=table, column_str=column_str, pk=pk,
+    )
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        prog='datax-sync-template-gen',
+        description='PG → HDFS DataX sync ini 模板生成器(全字段参考模板)',
+    )
+    parser.add_argument('-ds', required=True, metavar='DS_REF',
+                        help='数据源 ref,形如 postgresql/prod-hobby(同 sync ini dataSource 字段)')
+    parser.add_argument('-t', required=True, metavar='SCHEMA.TABLE',
+                        help='schema 限定的表名(如 public.card_group_order_info)')
+    parser.add_argument('-o', nargs='?', const=WORKSPACE_DEFAULT, default=None, metavar='DIR',
+                        help='输出目录(不传 stdout;不带值 workspace/{yyyymmdd}/;带值自定义)')
+    args = parser.parse_args()
+
+    if '.' not in args.t:
+        print('-t 必须 schema.table 格式,收到: ' + args.t, file=sys.stderr)
+        sys.exit(2)
+    schema, table = args.t.split('.', 1)
+
+    ds = resolve_datasource(args.ds)
+    ds_dict = ds.parse()
+    jdbc_url = ds_dict[DS_POSTGRE_SQL_JDBC_URL]
+    user = ds_dict['username']
+    password = ds_dict['password']
+    host, port, database = parse_jdbc_url(jdbc_url)
+
+    import pg8000.dbapi
+    conn = pg8000.dbapi.connect(
+        host=host, port=port, database=database,
+        user=user, password=password,
+    )
+    try:
+        columns = query_columns(conn, schema, table)
+        pk = query_primary_key(conn, schema, table)
+    finally:
+        conn.close()
+
+    content = render_template(args.ds, database, schema, table, columns, pk)
+
+    if args.o is None:
+        sys.stdout.write(content)
+    else:
+        os.makedirs(args.o, exist_ok=True)
+        out_path = os.path.join(args.o, table + '.ini')
+        with open(out_path, 'w', encoding='utf-8') as f:
+            f.write(content)
+        print('已写入: ' + out_path, file=sys.stderr)
+
+
+if __name__ == '__main__':
+    main()

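For reference, a sketch of what render_template produces for the public.users example
asserted in the unit test below (columns id, name, create_time; single-column primary
key id; data source postgresql/prod-hobby; database hobby_stocks), with the header
comment block omitted:

    [reader]
    dataSource = postgresql/prod-hobby
    database = hobby_stocks
    table = public.users
    column = id,name,create_time
    columnType =
    where = update_time >= '${start_date}' AND update_time < '${stop_date}'
    querySql =
    splitPk = id
    fetchSize = 1000

    [writer]
    dataSource = hdfs/<TODO>
    path = /user/hive/warehouse/raw.db/users_TODO_d/dt=${dt}/
    column = id,name,create_time
    columnType =
    fileType = orc
    fileName = users_TODO_d
    encoding = UTF-8
    writeMode = truncate
    fieldDelimiter = \t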
+ 1 - 0
requirements.txt

@@ -8,6 +8,7 @@ python-dateutil==2.8.2
 
 PyMySQL==0.10.1
 pymongo==3.11.4
+pg8000~=1.30
 
 
 requests==2.25.1

+ 115 - 0
tests/unit/datax/test_sync_template_gen.py

@@ -0,0 +1,115 @@
+# -*- coding:utf-8 -*-
+"""
+datax-sync-template-gen 模板渲染 + JDBC URL 解析单测。
+
+不连真 PG(query_columns / query_primary_key 走 mock conn)。
+脚本路径含连字符,用 importlib.util 动态加载为模块。
+"""
+import importlib.util
+import os
+import sys
+from unittest.mock import MagicMock
+
+import pytest
+
+PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+SCRIPT_PATH = os.path.join(PROJECT_ROOT, 'bin', 'datax-sync-template-gen.py')
+
+
+def _load_script():
+    spec = importlib.util.spec_from_file_location('datax_sync_template_gen', SCRIPT_PATH)
+    mod = importlib.util.module_from_spec(spec)
+    sys.modules['datax_sync_template_gen'] = mod
+    spec.loader.exec_module(mod)
+    return mod
+
+
+GEN = _load_script()
+
+
+def test_parse_jdbc_url_with_port():
+    host, port, db = GEN.parse_jdbc_url('jdbc:postgresql://10.0.0.1:5433/hobby_stocks')
+    assert host == '10.0.0.1'
+    assert port == 5433
+    assert db == 'hobby_stocks'
+
+
+def test_parse_jdbc_url_default_port():
+    host, port, db = GEN.parse_jdbc_url('jdbc:postgresql://pg.example.com/mydb')
+    assert host == 'pg.example.com'
+    assert port == 5432
+    assert db == 'mydb'
+
+
+def test_parse_jdbc_url_invalid():
+    with pytest.raises(ValueError, match='无法解析'):
+        GEN.parse_jdbc_url('mysql://10.0.0.1:3306/foo')
+
+
+def test_query_columns_returns_name_and_comment():
+    conn = MagicMock()
+    cur = conn.cursor.return_value
+    cur.fetchall.return_value = [
+        ('id', 'id'),
+        ('user_id', '用户id'),
+        ('create_time', None),  # 无注释
+    ]
+    cols = GEN.query_columns(conn, 'public', 'orders')
+    assert cols == [('id', 'id'), ('user_id', '用户id'), ('create_time', '')]
+
+
+def test_query_columns_empty_raises():
+    conn = MagicMock()
+    cur = conn.cursor.return_value
+    cur.fetchall.return_value = []
+    with pytest.raises(ValueError, match='表不存在或无字段'):
+        GEN.query_columns(conn, 'public', 'no_such_table')
+
+
+def test_query_primary_key_single():
+    conn = MagicMock()
+    cur = conn.cursor.return_value
+    cur.fetchall.return_value = [('id',)]
+    assert GEN.query_primary_key(conn, 'public', 'orders') == 'id'
+
+
+def test_query_primary_key_composite_returns_empty():
+    conn = MagicMock()
+    cur = conn.cursor.return_value
+    cur.fetchall.return_value = [('id1',), ('id2',)]
+    assert GEN.query_primary_key(conn, 'public', 'orders') == ''
+
+
+def test_query_primary_key_none_returns_empty():
+    conn = MagicMock()
+    cur = conn.cursor.return_value
+    cur.fetchall.return_value = []
+    assert GEN.query_primary_key(conn, 'public', 'orders') == ''
+
+
+def test_render_template_includes_required_fields():
+    columns = [('id', 'id'), ('name', '姓名'), ('create_time', '创建时间')]
+    out = GEN.render_template(
+        ds_ref='postgresql/prod-hobby',
+        database='hobby_stocks',
+        schema='public',
+        table='users',
+        columns=columns,
+        pk='id',
+    )
+    assert 'dataSource = postgresql/prod-hobby' in out
+    assert 'database = hobby_stocks' in out
+    assert 'table = public.users' in out
+    assert 'column = id,name,create_time' in out
+    assert 'splitPk = id' in out
+    assert "where = update_time >= '${start_date}' AND update_time < '${stop_date}'" in out
+    assert 'path = /user/hive/warehouse/raw.db/users_TODO_d/dt=${dt}/' in out
+    assert 'fileName = users_TODO_d' in out
+
+
+def test_render_template_empty_pk():
+    out = GEN.render_template(
+        ds_ref='postgresql/prod-hobby', database='db', schema='public',
+        table='t', columns=[('a', '')], pk='',
+    )
+    assert 'splitPk = \n' in out