#!/usr/bin/env /usr/bin/python3 # -*- coding:utf-8 -*- """ 生成DataX作业配置文件生成器的配置文件 """ # -*- coding=utf-8 -*- import os import re import sys import time base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(base_dir) from dw_base import DO_RESET, NORM_CYN, NORM_GRN, NORM_MGT, NORM_YEL, NORM_RED from dw_base.database.mysql_utils import MySQLHandler from dw_base.datax.datasources.hdfs_data_source import HDFSDataSource from dw_base.datax.datasources.mysql_data_source import MySQLDataSource from dw_base.datax.datax_utils import convert_mysql_column_types from dw_base.datax.plugins.reader.hdfs_reader import HDFSReader from dw_base.datax.plugins.reader.mysql_reader import MySQLReader from dw_base.datax.plugins.writer.hbase_writer import HBaseWriter from dw_base.datax.plugins.writer.hdfs_writer import HDFSWriter from dw_base.datax.plugins.writer.kafka_writer import KafkaWriter from dw_base.datax.plugins.writer.mongo_writer import MongoWriter, MONGO_SPECIAL_WORDS_DICT from dw_base.hive.hive_utils import get_hive_database_name, get_hive_table_prefix from dw_base.spark.spark_sql import SparkSQL from dw_base.utils.common_utils import exist from dw_base.utils.config_utils import parse_args from dw_base.utils.file_utils import write_file, append_file, list_files, load_json_file from dw_base.utils.log_utils import pretty_print from dw_base.utils.string_utils import snake_case_to_pascal_case, snake_case_to_camel_case def usage(code: int): print( f'{NORM_MGT}Usage: {sys.argv[0]}\n' f'{NORM_CYN}\t[-H/--H/--help] 打印脚本使用方法{DO_RESET}' ) if not from_system or not to_system: print( f'{NORM_MGT}Usage: {sys.argv[0]}\n' f'{NORM_GRN}\t<[-]-from< /=>from> 源系统类型(默认MySQL),目前支持hdfs、mysql\n' f'{NORM_GRN}\t<[-]-to< /=>to> 目标系统类型(默认HDFS),目前支持elasticsearch、hbase、hdfs、kafka、mongo、mysql\n' f'{NORM_CYN}\t[[-]-output< /=>output directory] 生成的ini文件存储位置,可以是绝对路径或相对路径' f'{DO_RESET}' ) if from_system == "hdfs": print( f'{NORM_MGT}Parameters when from is HDFS: \n' f'{NORM_GRN}\t<[-]-d< /=>database> Hive数据库(默认crl_mg)\n' f'{NORM_GRN}\t<[-]-t< /=>table> Hive数据表(可多次传入,不传扫描-D传入的库中所有表)\n' f'{NORM_GRN}\t<[-]-e< /=>exclude> 忽略的Hive数据表(可多次传入)\n' f'{NORM_CYN}\t[[-]-partitioned] 是否是分区表(默认为是),目前只支持日分区\n' f'{DO_RESET}' ) elif from_system == "mysql": print( f'{NORM_MGT}Parameters when from is MySQL: \n' f'{NORM_GRN}\t<[-]-h< /=>host> MySQL主机\n' f'{NORM_CYN}\t[[-]-P< /=>port] MySQL端口\n' f'{NORM_GRN}\t<[-]-u< /=>username> MySQL用户\n' f'{NORM_GRN}\t<[-]-p< /=>password> MySQL密码\n' f'{NORM_GRN}\t<[-]-D< /=>database> MySQL数据库\n' f'{NORM_GRN}\t<[-]-tr< /=>table> MySQL数据表正则(可多次传入,优先级高于t)\n' f'{NORM_GRN}\t<[-]-t< /=>table> MySQL数据表(可多次传入)\n' f'{NORM_GRN}\t<[-]-er< /=>exclude> 忽略的MySQL数据表正则(可多次传入,优先级高于e)\n' f'{NORM_GRN}\t<[-]-e< /=>exclude> 忽略的MySQL数据表(可多次传入)\n' f'{NORM_CYN}\t[[-]-inc-col] 增量抽取字段名称(默认update_time)' f'{DO_RESET}' ) if to_system == "elasticsearch": print( f'{NORM_MGT}Parameters when to is Elasticsearch: \n' f'{NORM_CYN}\t[[-]-inc-col] 增量抽取字段名称(默认update_time)' f'{DO_RESET}' ) elif to_system == "hbase": print( f'{NORM_MGT}Parameters when to is HBase: \n' f'{NORM_CYN}\t[[-]-inc-col] 增量抽取字段名称(默认update_time)' f'{DO_RESET}' ) elif to_system == "hdfs": print( f'{NORM_MGT}Parameters when to is HDFS: \n' f'{NORM_CYN}\t[[-]-project< /=>project] 项目名称,如skb、bms\n' f'{NORM_CYN}\t[[-]-layer< /=>dw-layer] 数据仓库分层,如ods、dwd\n' f'{NORM_CYN}\t[[-]-env< /=>dw-env] 数据仓库分层,如test\n' f'{NORM_CYN}\t[[-]-edition< /=>dw-edition] 数据仓库版本,如dl,dd,\n' f'{NORM_CYN}\t[[-]-partitioned] 是否是分区表(默认为是),目前只支持日分区' f'{DO_RESET}' ) elif to_system == "kafka": print( f'{NORM_MGT}Parameters when to is Kafka: \n' f'{NORM_GRN}\t<[-]-T< /=>kafka topic> Kafka Topic\n' f'{NORM_GRN}\t<[-]-K< /=>kafka key> Kafka Key of data\n' f'{DO_RESET}' ) elif to_system == "mongo": print( f'{NORM_MGT}Parameters when to is Mongo: \n' f'{NORM_CYN}\t[[-]-inc-col] 增量抽取字段名称(默认update_time)' f'{DO_RESET}' ) elif to_system == "mysql": print( f'{NORM_MGT}Parameters when to is MySQL: \n' f'{NORM_GRN}\t<[-]-h< /=>host> MySQL主机\n' f'{NORM_CYN}\t[[-]-P< /=>port] MySQL端口\n' f'{NORM_GRN}\t<[-]-u< /=>username> MySQL用户\n' f'{NORM_GRN}\t<[-]-p< /=>password> MySQL密码\n' f'{NORM_GRN}\t<[-]-D< /=>database> MySQL数据库\n' f'{NORM_GRN}\t<[-]-t< /=>table> MySQL数据表' f'{DO_RESET}' ) exit(code) def hdfs_elasticsearch_generator(): raise Exception(f'Not implemented yet with from {from_system} and to {to_system}') def hdfs_hbase_generator(): with SparkSQL('datax-gc-generator') as spark_sql: hdfs_ds_name = 'hdfs-aliyun-cloud' hbase_ds_name = 'hbase-default' hdfs_path = '/user/hive/warehouse' output = CONFIG.get('output', f'{base_dir}/ignored') hive_database = CONFIG.get('d') if not hive_database: pretty_print(f'{NORM_RED}参数 {NORM_GRN}-d{DO_RESET}{NORM_RED} 未提供') usage(1) config_ini_path = '{0}/config/hdfs-hbase/{1}'.format(output, hive_database) os.system(f'mkdir -p {config_ini_path}') included_tables = CONFIG.get('t', []) if isinstance(included_tables, str): included_tables = [included_tables] excluded_tables = CONFIG.get('e', []) if isinstance(excluded_tables, str): excluded_tables = [excluded_tables] if len(included_tables) == 0 and len(excluded_tables) == 0: pretty_print(f'{NORM_YEL}注意:' f'{NORM_MGT}参数 {NORM_GRN}-t{NORM_MGT} 或 {NORM_GRN}-e ' f'{NORM_MGT}未提供,将扫描数据库 ' f'{NORM_GRN}{hive_database}{DO_RESET} ' f'{NORM_MGT}下所有表') hbase_namespace = CONFIG.get('n') if not hbase_namespace: pretty_print(f'{NORM_YEL}注意:' f'{NORM_MGT}参数 {NORM_GRN}-n{DO_RESET} ' f'{NORM_MGT}未提供,将使用 HBase 默认命名空间 ' f'{NORM_GRN}default') hbase_namespace = 'default' final_tables = [] if len(included_tables) == 0: all_hive_tables = [] # 如果没有传入仅需的表则获取数据库下所有表 tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True) desc_tables = tables_df.collect() for row in desc_tables: all_hive_tables.append(row[1]) # 去除忽略的数据表 for hive_table in all_hive_tables: if hive_table not in excluded_tables: final_tables.append(f'{hive_database}.{hive_table}') else: final_tables = included_tables number_width = len(str(len(final_tables))) index = 1 for hive_table_full_name in final_tables: if hive_table_full_name.__contains__("."): hive_db, hive_table_name = hive_table_full_name.split('.') else: hive_db = hive_database hive_table_name = hive_table_full_name # 获取表备注 create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True) hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1] if 'COMMENT' in hive_table_comment: hive_table_comment = hive_table_comment.split("'")[1] else: hive_table_comment = '' # 表是否有分区 partitioned = False if "`dt`" in create_table_str: partitioned = True # 获取表列名 hive_columns = spark_sql.get_columns(f'{hive_table_full_name}') hive_column_names = [] hive_column_types = {} for column_name, column_value in hive_columns.items(): hive_column_names.append(column_name) column_type = column_value[0].upper() if column_type == 'BIGINT': column_type = 'LONG' hive_column_types[column_name] = column_type hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_db, hive_table_name, hive_table_comment, partitioned, hive_column_names, hive_column_types) # hbase列名 hbase_column_names = [] for column_name in hive_column_names: hbase_column_names.append(f'cf:{column_name}') row_key_columns = [] row_key_columns.append('reverse(主键如果是自增ID,建议reverse)') row_key_columns.append('separator(@@)') row_key_columns.append(f'separator({hive_db})') row_key_columns.append(f'separator(.)') row_key_columns.append(f'separator({hive_table_name})') hbase_writer_str = HBaseWriter.generate_definition(hbase_ds_name, hbase_namespace, hive_table_name, hive_table_name, hive_table_comment, "cf", hbase_column_names, {}, row_key_columns) # 写文件 generator_config_file = '{0}/hdfs-hbase-{1}-{2}.ini'.format(config_ini_path, hbase_namespace, hive_table_name) write_file(f'{hdfs_reader_str}\n\n{hbase_writer_str}\n', generator_config_file) pretty_print( f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 Hive 数据库表 ' f'{NORM_GRN}{hive_database}.{hive_table_name} ' f'{NORM_MGT}\n 写 HBase DataX作业配置文件生成器配置已写入文件 ' f'{NORM_GRN}{generator_config_file}' ) index += 1 def hdfs_kafka_generator(): kafka_topic = CONFIG.get('T') kafka_key = CONFIG.get('K') if not kafka_topic: pretty_print(f'{NORM_RED}参数 {NORM_GRN}-T{DO_RESET}{NORM_RED} 未提供') usage(1) included_tables = CONFIG.get('t', []) if isinstance(included_tables, str): included_tables = [included_tables] excluded_tables = CONFIG.get('e', []) if isinstance(excluded_tables, str): excluded_tables = [excluded_tables] hive_database = CONFIG.get('d') if len(included_tables) == 0 and len(excluded_tables) == 0: if not hive_database: pretty_print(f'{NORM_RED}参数 {NORM_GRN}-d{DO_RESET}{NORM_RED} 未提供') usage(1) else: pretty_print(f'{NORM_YEL}注意:' f'{NORM_MGT}参数 {NORM_GRN}-t{NORM_MGT} 或 {NORM_GRN}-e ' f'{NORM_MGT}未提供,将扫描数据库 ' f'{NORM_GRN}{hive_database}{DO_RESET} ' f'{NORM_MGT}下所有表') with SparkSQL('datax-gc-generator') as spark_sql: hdfs_ds_name = 'hdfs-aliyun-cloud' hdfs_path = '/user/hive/warehouse' kafka_ds_name = 'kafka-aliyun' output = CONFIG.get('output', f'{base_dir}/ignored') final_tables = [] if len(included_tables) == 0: all_hive_tables = [] # 如果没有传入仅需的表则获取数据库下所有表 tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True) desc_tables = tables_df.collect() for row in desc_tables: all_hive_tables.append(row[1]) # 去除忽略的数据表 for hive_table in all_hive_tables: if hive_table not in excluded_tables: final_tables.append(f'{hive_database}.{hive_table}') else: final_tables = included_tables number_width = len(str(len(final_tables))) index = 1 # 判断是否为外部表,若为外部表,则获取表信息内容改变 for table_name in final_tables: # if hive_table_full_name.endswith("es_mapping"): # continue # hive_database, hive_table_name = hive_table_full_name.split('.') # type: str,str config_ini_path = '{0}/config/hdfs-kafka/{1}'.format(output, hive_database) os.system(f'mkdir -p {config_ini_path}') hive_table_name = '' # 如果传入的配置中(-t=*)是完整的库名+表名(crl_es.xxx),则通过完整的库名表名获取信息 if table_name.__contains__('.'): hive_database, hive_table_name = table_name.split('.') # type: str,str hive_table_full_name = table_name # 如果传入的配置中是表名,则通过传入的配置获取库信息 elif hive_database is not None: hive_table_name = table_name # type: str hive_table_full_name = f'{hive_database}.{hive_table_name}' else: raise ValueError("hive_database undefined") hive_column_names = [] hive_column_types = {} hive_table_names_mapping = {} # 获取表备注 if hive_table_full_name.__contains__('mapping'): mysql_handler = MySQLHandler( host='rm-m5er2i6wz605su9bi.mysql.rds.aliyuncs.com', port=3306, username='meta_ro', password='Ts#r5rO1' ) partitioned = True # 如果是mapping表则默认有分区,因为只是通过mapping表看映射 hive_table_tbl = dict( mysql_handler.query_tbl_hive_metadata(hive_table_name)) # 获取hive的mapping表全部的tblproperties hive_table_comment = hive_table_tbl.get('comment') # 获取hive表comment字段 hive_table_names_mapping = hive_table_tbl.get('es.mapping.names') # 获取hive与es映射的字段信息 hive_table_es_index = hive_table_tbl.get('es.resource').split('/')[0] # 获取写入es的索引 hive_table_column = mysql_handler.query_column_hive_metadata(hive_table_name) # 获取hive表的列名、字段类型、注释等数据 hive_columns = {column[1]: (column[2], column[3]) for column in hive_table_column} # 将读取到的列名、类型、注释转换成与spark读取出来的格式一致 kafka_column_types = {column[1]: column[2] for column in hive_table_column} # 单独列出一个{列名:类型}的字典 hdfs_reader_column = [mappingName.split(':')[0] for mappingName in str(hive_table_names_mapping).split(',')] # 在hive与es映射字段的表内取出要读hdfs的字段 kafka_writer_column = [mappingName.split(':')[1] for mappingName in str(hive_table_names_mapping).split(',')] # 在hive与es映射字段的表内取出要写kafka的字段 column_mapping = dict(item.split(":") for item in hive_table_names_mapping.split(",")) # 将hive与es字段的映射转换为字典 for k, v in dict( item.split(":") for item in hive_table_names_mapping.split(",")).items(): # 保存一个不包含struct结构体映射的字典 if k not in hive_columns.keys(): column_mapping.pop(k) for column in hdfs_reader_column.copy(): # HDFS reader中去掉结构体的列名 if column not in hive_columns.keys(): hdfs_reader_column.remove(column) for column in kafka_writer_column.copy(): # kafka reader中去掉结构体的列名 if column not in column_mapping.values(): kafka_writer_column.remove(column) hive_es_column_mapping = {hdfs_reader_column[i]: kafka_writer_column[i] for i in range(len(hdfs_reader_column))} for key, value in kafka_column_types.copy().items(): # 替换kafka writer字典中的键 if key in hive_es_column_mapping: kafka_column_types[hive_es_column_mapping[key]] = kafka_column_types.pop(key) if key not in hive_es_column_mapping: kafka_column_types.pop(key) hive_column_types = {} column_type_flag = ['LONG', 'BIGINT', 'BOOLEAN', 'STRING', 'DOULBE'] for column_name, column_value in hive_columns.items(): hive_column_names.append(column_name) column_type = column_value[0].upper() if column_type in column_type_flag: hive_column_types[column_name] = column_type else: if column_type == 'INT': hive_column_types[column_name] = 'LONG' hive_column_names = hdfs_reader_column hive_table_name = re.sub(r"({}).*$".format(re.escape("sum")), "sum", hive_table_name) if kafka_key is None: kafka_key = hive_table_es_index else: create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True) hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1] if 'COMMENT' in hive_table_comment: hive_table_comment = hive_table_comment.split("'")[1] else: hive_table_comment = '' # 表是否有分区 partitioned = False if "`dt`" in create_table_str: partitioned = True # 获取表列名 hive_columns = spark_sql.get_columns(f'{hive_table_full_name}') column_type_flag = ['LONG', 'BIGINT', 'BOOLEAN', 'STRING', 'DOULBE'] for column_name, column_value in hive_columns.items(): hive_column_names.append(column_name) column_type = column_value[0].upper() if column_type in column_type_flag: hive_column_types[column_name] = column_type else: if column_type == 'INT': hive_column_types[column_name] = 'LONG' kafka_writer_column = hive_column_names kafka_column_types = hive_column_types hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_database, hive_table_name, hive_table_comment, partitioned, hive_column_names, hive_column_types) source_name = hive_table_name.replace('es_crl_', '').replace('_sum', '') kafka_writer_str = KafkaWriter.generate_definition(kafka_ds_name, kafka_topic, kafka_key, source_name, kafka_writer_column, kafka_column_types, hive_table_names_mapping) # 写文件 generator_config_file = '{0}/hdfs-kafka-{1}-{2}.ini'.format(config_ini_path, hive_database, hive_table_name) write_file(f'{hdfs_reader_str}\n\n{kafka_writer_str}\n', generator_config_file) pretty_print( f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 Hive 数据库表 ' f'{NORM_GRN}{hive_database}.{hive_table_name}' f'{NORM_MGT}\n 写 Kafka DataX作业配置文件生成器配置已写入文件 ' f'{NORM_GRN}{generator_config_file}' ) index += 1 def hdfs_mongo_generator(): hive_database = CONFIG.get('d') if not hive_database: pretty_print(f'{NORM_RED}参数 {NORM_GRN}-d{DO_RESET}{NORM_RED} 未提供') usage(1) included_tables = CONFIG.get('t', []) if isinstance(included_tables, str): included_tables = [included_tables] excluded_tables = CONFIG.get('e', []) if isinstance(excluded_tables, str): excluded_tables = [excluded_tables] if len(included_tables) == 0 and len(excluded_tables) == 0: pretty_print(f'{NORM_YEL}注意:' f'{NORM_MGT}参数 {NORM_GRN}-t{NORM_MGT} 或 {NORM_GRN}-e ' f'{NORM_MGT}未提供,将扫描数据库 ' f'{NORM_GRN}{hive_database}{DO_RESET} ' f'{NORM_MGT}下所有表') with SparkSQL('datax-gc-generator') as spark_sql: # 获取json文件中表的主键 table_pk_fields = [] for validation_config_file in list_files('conf/validation', True): if validation_config_file.endswith('.json'): validation_config = load_json_file(validation_config_file) dwd_table_config = validation_config.get('dwd_table') # type: str if not dwd_table_config: pretty_print(f'{NORM_YEL}文件 {NORM_GRN}{validation_config_file}' f'{NORM_YEL} 中没有发现 {NORM_GRN}dwd_table{NORM_YEL} 的定义') continue # 表名后缀,如:court_announcement table_name = dwd_table_config.split("_crl_")[1] ods_dwd_config = validation_config.get('ods_dwd_config') if not ods_dwd_config: continue # 获取主键名 new_pk_fields = ods_dwd_config.get('pk_fields') # type:list # 放入到字典中 for pk_fields in new_pk_fields: table_pk_fields.append(table_name + "-" + pk_fields) hdfs_ds_name = 'hdfs-aliyun-cloud' hdfs_path = '/user/hive/warehouse' output = CONFIG.get('output', f'{base_dir}/ignored') config_ini_path = '{0}/config/hdfs-mongo/{1}'.format(output, hive_database) os.system(f'mkdir -p {config_ini_path}') final_tables = [] if len(included_tables) == 0: all_hive_tables = [] # 如果没有传入仅需的表则获取数据库下所有表 tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True) desc_tables = tables_df.collect() for row in desc_tables: all_hive_tables.append(row[1]) # 去除忽略的数据表 for hive_table in all_hive_tables: if hive_table not in excluded_tables: final_tables.append(f'{hive_database}.{hive_table}') else: final_tables = included_tables number_width = len(str(len(final_tables))) index = 1 for hive_table_full_name in final_tables: hive_database, hive_table_name = hive_table_full_name.split('.') # 获取表备注 create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True) hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1] if 'COMMENT' in hive_table_comment: hive_table_comment = hive_table_comment.split("'")[1] else: hive_table_comment = '' # 表是否有分区 partitioned = False if "`dt`" in create_table_str: partitioned = True # 获取表列名 hive_columns = spark_sql.get_columns(f'{hive_table_full_name}') hive_column_names = [] hive_column_types = {} for column_name, column_value in hive_columns.items(): hive_column_names.append(column_name) column_type = column_value[0].upper() if column_type == 'BIGINT': column_type = 'LONG' hive_column_types[column_name] = column_type hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_database, hive_table_name, hive_table_comment, partitioned, hive_column_names, hive_column_types) mongo_ds_name = 'mongo-dev-rw' if hive_table_full_name.__contains__('_crl_'): split_table_name = hive_table_full_name.split("_crl_")[1] else: split_table_name = 'unknown' mongo_database = 'enterprise' mongo_collection = snake_case_to_pascal_case(split_table_name) # mongo列名 mongo_column_names = [] for column_name in hive_column_names: if MONGO_SPECIAL_WORDS_DICT.__contains__(column_name): mongo_column_names.append(MONGO_SPECIAL_WORDS_DICT[column_name]) else: column_name = snake_case_to_camel_case(column_name) mongo_column_names.append(column_name) # mongo列类型和主键 mongo_column_types = {} pk_fields = [] for column_name, column_type in hive_column_types.items(): is_pk = False if split_table_name + "-" + column_name in table_pk_fields: is_pk = True if MONGO_SPECIAL_WORDS_DICT.__contains__(column_name): column_name = MONGO_SPECIAL_WORDS_DICT[column_name] else: column_name = snake_case_to_camel_case(column_name) if column_name == 'createDate' and column_type == 'LONG': mongo_column_types[column_name] = 'DATE' elif column_name == 'updateDate' and column_type == 'LONG': mongo_column_types[column_name] = 'DATE' else: mongo_column_types[column_name] = column_type if is_pk: pk_fields.append(column_name) mongo_writer_str = MongoWriter.generate_definition(mongo_ds_name, mongo_database, mongo_collection, mongo_column_names, mongo_column_types, pk_fields) # 写文件 generator_config_file = '{0}/hdfs-mongo-{1}-{2}.ini'.format(config_ini_path, mongo_database, split_table_name) write_file(f'{hdfs_reader_str}\n\n{mongo_writer_str}\n', generator_config_file) pretty_print( f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 Hive 数据库表 ' f'{NORM_GRN}{hive_database}.{hive_table_name}' f'{NORM_MGT}\n 写 MongoDB DataX作业配置文件生成器配置已写入文件 ' f'{NORM_GRN}{generator_config_file}' ) index += 1 def hdfs_mysql_generator(): raise Exception(f'Not implemented yet with from {from_system} and to {to_system}') def mysql_hdfs_generator(): host = CONFIG.get('h') port = int(CONFIG.get('P', '3306')) username = CONFIG.get('u') password = CONFIG.get('p') mysql_database = CONFIG.get('D') tables = CONFIG.get('t', []) if isinstance(tables, str): tables = [tables] excluded_tables = CONFIG.get('e', []) if isinstance(excluded_tables, str): excluded_tables = [excluded_tables] table_regex = CONFIG.get('tr', []) if isinstance(table_regex, str): table_regex = [table_regex] exclude_regex = CONFIG.get('er', []) if isinstance(exclude_regex, str): exclude_regex = [exclude_regex] project = CONFIG.get('project') layer = CONFIG.get('layer', 'ods') edition = CONFIG.get('edition') env = CONFIG.get('env') partitioned = CONFIG.get('partitioned', False) inc_col = CONFIG.get('inc-col', 'update_time') if not (host and username and password and mysql_database): usage(1) hdfs_ds_name = 'hdfs-aliyun-cloud' hdfs_default_fs = 'hdfs://cluster' hdfs_path = '/user/hive/warehouse' output = CONFIG.get('output', f'{base_dir}/ignored') hive_database = get_hive_database_name(project, layer, env) hive_table_prefix = get_hive_table_prefix(project, layer, edition) mysql_datasource_path = '{0}/datasource/mysql/{1}'.format(output, hive_database) hdfs_datasource_path = '{0}/datasource/hdfs'.format(output) config_ini_path = '{0}/config/mysql-hdfs/{1}'.format(output, hive_database) hive_ddl_path = '{0}/ddl'.format(output) hive_ddl_file = '{0}/{1}.sql'.format(hive_ddl_path, mysql_database) os.system(f'mkdir -p {mysql_datasource_path}') os.system(f'mkdir -p {hdfs_datasource_path}') os.system(f'mkdir -p {config_ini_path}') os.system(f'mkdir -p {hive_ddl_path}') mysql_ds_def = MySQLDataSource.generate_definition(host, port, username, password, mysql_database) hdfs_ds_def = HDFSDataSource.generate_definition(hdfs_default_fs) mysql_handler = MySQLHandler(host, port, username, password) mysql_tables = mysql_handler.list_tables(mysql_database, exclude_regex, table_regex) number_width = len(str(len(mysql_tables))) index = 1 if os.path.exists(hive_ddl_file): hive_ddl_file = '{0}/{1}-{2}.sql'.format(hive_ddl_path, mysql_database, str(int(time.time()))) write_file('CREATE DATABASE IF NOT EXISTS %s;\n' % hive_database, hive_ddl_file) for mysql_table_name, mysql_table_comment in mysql_tables.items(): if tables and not tables.__contains__(mysql_table_name): continue if excluded_tables and excluded_tables.__contains__(mysql_table_name): continue mysql_column_list = mysql_handler.list_columns(mysql_database, mysql_table_name) mysql_column_names = [c.COLUMN_NAME for c in mysql_column_list] column_types = convert_mysql_column_types(mysql_column_list) mysql_reader_def = MySQLReader.generate_definition( mysql_database, mysql_table_name, mysql_table_comment, mysql_column_names, column_types, hive_database, partitioned, inc_col ) hive_table_name = f'{hive_table_prefix}_{mysql_table_name}' hive_ddl_def = MySQLReader.generate_hive_ddl( hive_database, hive_table_name, mysql_table_comment, partitioned, mysql_column_list, column_types ) hdfs_writer_def = HDFSWriter.generate_definition( hdfs_ds_name, hdfs_path, hive_database, hive_table_name, partitioned, mysql_column_names, column_types ) generator_config_file = '{0}/mysql-hdfs-{1}-{2}.ini'.format(config_ini_path, mysql_database, mysql_table_name) write_file(f'{mysql_reader_def}\n{hdfs_writer_def}\n', generator_config_file) append_file(f'{hive_ddl_def}\n', hive_ddl_file) pretty_print( f'{NORM_YEL}{str(index).rjust(number_width, " ")}{NORM_MGT} 读 MySQL 数据库表 ' f'{NORM_GRN}{mysql_database}.{mysql_table_name}' f'{NORM_MGT}\n 写 HDFS DataX作业配置文件生成器配置已写入文件 ' f'{NORM_GRN}{generator_config_file}' f'{NORM_MGT}\n 对应的Hive建表DDL已写入文件 ' f'{NORM_GRN}{hive_ddl_file}' ) index += 1 # break mysql_ds_def_file = '{0}/mysql-{1}.ini'.format(mysql_datasource_path, mysql_database) if os.path.exists(mysql_ds_def_file): mysql_ds_def_file = '{0}/mysql-{1}-{2}.ini'.format(mysql_datasource_path, mysql_database, str(int(time.time()))) write_file(f'{mysql_ds_def}\n', mysql_ds_def_file) pretty_print(f'{NORM_MGT}MySQL数据库 {NORM_GRN}{mysql_database}{NORM_MGT} 数据源配置已写入文件 {NORM_GRN}{mysql_ds_def_file}') hdfs_ds_def_file = '{0}/{1}.ini'.format(hdfs_datasource_path, hdfs_ds_name) if os.path.exists(hdfs_ds_def_file): hdfs_ds_def_file = '{0}/{1}-{2}.ini'.format(hdfs_datasource_path, hdfs_ds_name, str(int(time.time()))) write_file(f'{hdfs_ds_def}\n', hdfs_ds_def_file) pretty_print(f'{NORM_MGT}HDFS 数据源配置已写入文件 {NORM_GRN}{hdfs_ds_def_file}') def mysql_hbase_generator(): host = CONFIG.get('h') port = int(CONFIG.get('P', '3306')) username = CONFIG.get('u') password = CONFIG.get('p') mysql_database = CONFIG.get('D') if not (host and username and password and mysql_database): usage(1) included_tables = CONFIG.get('t', []) if isinstance(included_tables, str): included_tables = [included_tables] excluded_tables = CONFIG.get('e', []) if isinstance(excluded_tables, str): excluded_tables = [excluded_tables] table_regex = CONFIG.get('tr', []) if isinstance(table_regex, str): table_regex = [table_regex] exclude_regex = CONFIG.get('er', []) if isinstance(exclude_regex, str): exclude_regex = [exclude_regex] if len(included_tables) == 0 and len(excluded_tables) == 0 and len(table_regex) == 0 and len(exclude_regex) == 0: pretty_print(f'{NORM_YEL}注意:' f'{NORM_MGT}参数 {NORM_GRN}-t -e -tr -er ' f'{NORM_MGT}都未提供,将扫描数据库 ' f'{NORM_GRN}{mysql_database}{DO_RESET} ' f'{NORM_MGT}下所有表') hbase_namespace = CONFIG.get('n') if not hbase_namespace: pretty_print(f'{NORM_YEL}注意:' f'{NORM_MGT}参数 {NORM_GRN}-n{DO_RESET} ' f'{NORM_MGT}未提供,将使用 HBase 默认命名空间 ' f'{NORM_GRN}default') hbase_namespace = 'default' project = CONFIG.get('project') layer = CONFIG.get('layer', 'ods') edition = CONFIG.get('edition') env = CONFIG.get('env') partitioned = CONFIG.get('partitioned', False) inc_col = CONFIG.get('inc-col', 'update_time') hbase_ds_name = 'hbase-default' hdfs_default_fs = 'hdfs://cluster' output = CONFIG.get('output', f'{base_dir}/ignored') hive_database = get_hive_database_name(project, layer, env) hive_table_prefix = get_hive_table_prefix(project, layer, edition) mysql_datasource_path = '{0}/datasource/mysql/{1}'.format(output, hive_database) config_ini_path = '{0}/config/mysql-hbase/{1}'.format(output, hive_database) hive_ddl_path = '{0}/ddl'.format(output) hive_ddl_file = '{0}/{1}.sql'.format(hive_ddl_path, mysql_database) os.system(f'mkdir -p {mysql_datasource_path}') os.system(f'mkdir -p {config_ini_path}') os.system(f'mkdir -p {hive_ddl_path}') mysql_ds_def = MySQLDataSource.generate_definition(host, port, username, password, mysql_database) mysql_handler = MySQLHandler(host, port, username, password) mysql_tables = mysql_handler.list_tables(mysql_database, exclude_regex, table_regex) number_width = len(str(len(mysql_tables))) index = 1 if os.path.exists(hive_ddl_file): hive_ddl_file = '{0}/{1}-{2}.sql'.format(hive_ddl_path, mysql_database, str(int(time.time()))) write_file('CREATE DATABASE IF NOT EXISTS %s;\n' % hive_database, hive_ddl_file) for mysql_table_name, mysql_table_comment in mysql_tables.items(): if included_tables and not included_tables.__contains__(mysql_table_name): continue if excluded_tables and excluded_tables.__contains__(mysql_table_name): continue mysql_column_list = mysql_handler.list_columns(mysql_database, mysql_table_name) mysql_column_names = [c.COLUMN_NAME for c in mysql_column_list] column_types = convert_mysql_column_types(mysql_column_list) mysql_reader_def = MySQLReader.generate_definition( mysql_database, mysql_table_name, mysql_table_comment, mysql_column_names, column_types, hive_database, partitioned, inc_col ) hive_table_name = f'{hive_table_prefix}_{mysql_table_name.lower()}_hbase_mapping' hbase_table_name = mysql_table_name.lower() hive_over_hbase_ddl_def = MySQLReader.generate_hive_over_hbase_ddl( hive_database, hive_table_name, mysql_table_comment, hbase_namespace, hbase_table_name, mysql_column_list, column_types ) row_key_columns = [] row_key_columns.append('reverse(主键如果是自增ID,建议reverse)') row_key_columns.append('separator(@@)') row_key_columns.append(f'separator({mysql_database})') row_key_columns.append(f'separator(.)') row_key_columns.append(f'separator({mysql_table_name})') hdfs_writer_def = HBaseWriter.generate_definition( hbase_ds_name, hbase_namespace, hbase_table_name, mysql_table_name, mysql_table_comment, 'cf', mysql_column_names, column_types, row_key_columns ) generator_config_file = '{0}/mysql-hbase-{1}-{2}.ini'.format(config_ini_path, mysql_database, mysql_table_name) write_file(f'{mysql_reader_def}\n{hdfs_writer_def}\n', generator_config_file) append_file(f'{hive_over_hbase_ddl_def}\n', hive_ddl_file) pretty_print( f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 MySQL 数据库表 ' f'{NORM_GRN}{mysql_database}.{mysql_table_name}' f'{NORM_MGT}\n 写 HBase DataX作业配置文件生成器配置已写入文件 ' f'{NORM_GRN}{generator_config_file}' f'{NORM_MGT}\n 对应的 Hive Over HBase 建表DDL已写入文件 ' f'{NORM_GRN}{hive_ddl_file}' ) index += 1 # break mysql_ds_def_file = '{0}/mysql-{1}.ini'.format(mysql_datasource_path, mysql_database) if os.path.exists(mysql_ds_def_file): mysql_ds_def_file = '{0}/mysql-{1}-{2}.ini'.format(mysql_datasource_path, mysql_database, str(int(time.time()))) write_file(f'{mysql_ds_def}\n', mysql_ds_def_file) pretty_print(f'{NORM_MGT}MySQL数据库 {NORM_GRN}{mysql_database}{NORM_MGT} 数据源配置已写入文件 {NORM_GRN}{mysql_ds_def_file}') if __name__ == '__main__': pretty_print(f'{NORM_MGT}{sys.argv[0]} 收到参数:{NORM_GRN}{" ".join(sys.argv[1:])}') CONFIG, _ = parse_args(sys.argv[1:]) # 未提供任何参数或查看帮助 if exist(CONFIG, ['H', 'help']): usage(0) from_system = CONFIG.get('from') if from_system: from_system = from_system.lower() to_system = CONFIG.get('to') if to_system: to_system = to_system.lower() if not from_system or not to_system: usage(1) if from_system == "hdfs": if to_system == "elasticsearch": hdfs_elasticsearch_generator() elif to_system == "hbase": hdfs_hbase_generator() elif to_system == "kafka": hdfs_kafka_generator() elif to_system == "mongo": hdfs_mongo_generator() elif to_system == "mysql": hdfs_mysql_generator() else: raise Exception(f'Not implemented yet with from {from_system} and to {to_system}') elif from_system == "mysql": if to_system == "hbase": mysql_hbase_generator() elif to_system == "hdfs": mysql_hdfs_generator() else: raise Exception(f'Not implemented yet with from {from_system} and to {to_system}') else: raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')