| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- """
- 生成DataX作业配置文件生成器的配置文件
- """
- # -*- coding=utf-8 -*-
- import os
- import re
- import sys
- import time
- base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(base_dir)
- from dw_base import DO_RESET, NORM_CYN, NORM_GRN, NORM_MGT, NORM_YEL, NORM_RED
- from dw_base.database.mysql_utils import MySQLHandler
- from dw_base.datax.datasources.hdfs_data_source import HDFSDataSource
- from dw_base.datax.datasources.mysql_data_source import MySQLDataSource
- from dw_base.datax.datax_utils import convert_mysql_column_types
- from dw_base.datax.plugins.reader.hdfs_reader import HDFSReader
- from dw_base.datax.plugins.reader.mysql_reader import MySQLReader
- from dw_base.datax.plugins.writer.hbase_writer import HBaseWriter
- from dw_base.datax.plugins.writer.hdfs_writer import HDFSWriter
- from dw_base.datax.plugins.writer.kafka_writer import KafkaWriter
- from dw_base.datax.plugins.writer.mongo_writer import MongoWriter, MONGO_SPECIAL_WORDS_DICT
- from dw_base.hive.hive_utils import get_hive_database_name, get_hive_table_prefix
- from dw_base.spark.spark_sql import SparkSQL
- from dw_base.utils.common_utils import exist
- from dw_base.utils.config_utils import parse_args
- from dw_base.utils.file_utils import write_file, append_file, list_files, load_json_file
- from dw_base.utils.log_utils import pretty_print
- from dw_base.utils.string_utils import snake_case_to_pascal_case, snake_case_to_camel_case
- def usage(code: int):
- print(
- f'{NORM_MGT}Usage: {sys.argv[0]}\n'
- f'{NORM_CYN}\t[-H/--H/--help] 打印脚本使用方法{DO_RESET}'
- )
- if not from_system or not to_system:
- print(
- f'{NORM_MGT}Usage: {sys.argv[0]}\n'
- f'{NORM_GRN}\t<[-]-from< /=>from> 源系统类型(默认MySQL),目前支持hdfs、mysql\n'
- f'{NORM_GRN}\t<[-]-to< /=>to> 目标系统类型(默认HDFS),目前支持elasticsearch、hbase、hdfs、kafka、mongo、mysql\n'
- f'{NORM_CYN}\t[[-]-output< /=>output directory] 生成的ini文件存储位置,可以是绝对路径或相对路径'
- f'{DO_RESET}'
- )
- if from_system == "hdfs":
- print(
- f'{NORM_MGT}Parameters when from is HDFS: \n'
- f'{NORM_GRN}\t<[-]-d< /=>database> Hive数据库(默认crl_mg)\n'
- f'{NORM_GRN}\t<[-]-t< /=>table> Hive数据表(可多次传入,不传扫描-D传入的库中所有表)\n'
- f'{NORM_GRN}\t<[-]-e< /=>exclude> 忽略的Hive数据表(可多次传入)\n'
- f'{NORM_CYN}\t[[-]-partitioned] 是否是分区表(默认为是),目前只支持日分区\n'
- f'{DO_RESET}'
- )
- elif from_system == "mysql":
- print(
- f'{NORM_MGT}Parameters when from is MySQL: \n'
- f'{NORM_GRN}\t<[-]-h< /=>host> MySQL主机\n'
- f'{NORM_CYN}\t[[-]-P< /=>port] MySQL端口\n'
- f'{NORM_GRN}\t<[-]-u< /=>username> MySQL用户\n'
- f'{NORM_GRN}\t<[-]-p< /=>password> MySQL密码\n'
- f'{NORM_GRN}\t<[-]-D< /=>database> MySQL数据库\n'
- f'{NORM_GRN}\t<[-]-tr< /=>table> MySQL数据表正则(可多次传入,优先级高于t)\n'
- f'{NORM_GRN}\t<[-]-t< /=>table> MySQL数据表(可多次传入)\n'
- f'{NORM_GRN}\t<[-]-er< /=>exclude> 忽略的MySQL数据表正则(可多次传入,优先级高于e)\n'
- f'{NORM_GRN}\t<[-]-e< /=>exclude> 忽略的MySQL数据表(可多次传入)\n'
- f'{NORM_CYN}\t[[-]-inc-col] 增量抽取字段名称(默认update_time)'
- f'{DO_RESET}'
- )
- if to_system == "elasticsearch":
- print(
- f'{NORM_MGT}Parameters when to is Elasticsearch: \n'
- f'{NORM_CYN}\t[[-]-inc-col] 增量抽取字段名称(默认update_time)'
- f'{DO_RESET}'
- )
- elif to_system == "hbase":
- print(
- f'{NORM_MGT}Parameters when to is HBase: \n'
- f'{NORM_CYN}\t[[-]-inc-col] 增量抽取字段名称(默认update_time)'
- f'{DO_RESET}'
- )
- elif to_system == "hdfs":
- print(
- f'{NORM_MGT}Parameters when to is HDFS: \n'
- f'{NORM_CYN}\t[[-]-project< /=>project] 项目名称,如skb、bms\n'
- f'{NORM_CYN}\t[[-]-layer< /=>dw-layer] 数据仓库分层,如ods、dwd\n'
- f'{NORM_CYN}\t[[-]-env< /=>dw-env] 数据仓库分层,如test\n'
- f'{NORM_CYN}\t[[-]-edition< /=>dw-edition] 数据仓库版本,如dl,dd,\n'
- f'{NORM_CYN}\t[[-]-partitioned] 是否是分区表(默认为是),目前只支持日分区'
- f'{DO_RESET}'
- )
- elif to_system == "kafka":
- print(
- f'{NORM_MGT}Parameters when to is Kafka: \n'
- f'{NORM_GRN}\t<[-]-T< /=>kafka topic> Kafka Topic\n'
- f'{NORM_GRN}\t<[-]-K< /=>kafka key> Kafka Key of data\n'
- f'{DO_RESET}'
- )
- elif to_system == "mongo":
- print(
- f'{NORM_MGT}Parameters when to is Mongo: \n'
- f'{NORM_CYN}\t[[-]-inc-col] 增量抽取字段名称(默认update_time)'
- f'{DO_RESET}'
- )
- elif to_system == "mysql":
- print(
- f'{NORM_MGT}Parameters when to is MySQL: \n'
- f'{NORM_GRN}\t<[-]-h< /=>host> MySQL主机\n'
- f'{NORM_CYN}\t[[-]-P< /=>port] MySQL端口\n'
- f'{NORM_GRN}\t<[-]-u< /=>username> MySQL用户\n'
- f'{NORM_GRN}\t<[-]-p< /=>password> MySQL密码\n'
- f'{NORM_GRN}\t<[-]-D< /=>database> MySQL数据库\n'
- f'{NORM_GRN}\t<[-]-t< /=>table> MySQL数据表'
- f'{DO_RESET}'
- )
- exit(code)
- def hdfs_elasticsearch_generator():
- raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')
- def hdfs_hbase_generator():
- with SparkSQL('datax-gc-generator') as spark_sql:
- hdfs_ds_name = 'hdfs-aliyun-cloud'
- hbase_ds_name = 'hbase-default'
- hdfs_path = '/user/hive/warehouse'
- output = CONFIG.get('output', f'{base_dir}/ignored')
- hive_database = CONFIG.get('d')
- if not hive_database:
- pretty_print(f'{NORM_RED}参数 {NORM_GRN}-d{DO_RESET}{NORM_RED} 未提供')
- usage(1)
- config_ini_path = '{0}/config/hdfs-hbase/{1}'.format(output, hive_database)
- os.system(f'mkdir -p {config_ini_path}')
- included_tables = CONFIG.get('t', [])
- if isinstance(included_tables, str):
- included_tables = [included_tables]
- excluded_tables = CONFIG.get('e', [])
- if isinstance(excluded_tables, str):
- excluded_tables = [excluded_tables]
- if len(included_tables) == 0 and len(excluded_tables) == 0:
- pretty_print(f'{NORM_YEL}注意:'
- f'{NORM_MGT}参数 {NORM_GRN}-t{NORM_MGT} 或 {NORM_GRN}-e '
- f'{NORM_MGT}未提供,将扫描数据库 '
- f'{NORM_GRN}{hive_database}{DO_RESET} '
- f'{NORM_MGT}下所有表')
- hbase_namespace = CONFIG.get('n')
- if not hbase_namespace:
- pretty_print(f'{NORM_YEL}注意:'
- f'{NORM_MGT}参数 {NORM_GRN}-n{DO_RESET} '
- f'{NORM_MGT}未提供,将使用 HBase 默认命名空间 '
- f'{NORM_GRN}default')
- hbase_namespace = 'default'
- final_tables = []
- if len(included_tables) == 0:
- all_hive_tables = []
- # 如果没有传入仅需的表则获取数据库下所有表
- tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True)
- desc_tables = tables_df.collect()
- for row in desc_tables:
- all_hive_tables.append(row[1])
- # 去除忽略的数据表
- for hive_table in all_hive_tables:
- if hive_table not in excluded_tables:
- final_tables.append(f'{hive_database}.{hive_table}')
- else:
- final_tables = included_tables
- number_width = len(str(len(final_tables)))
- index = 1
- for hive_table_full_name in final_tables:
- if hive_table_full_name.__contains__("."):
- hive_db, hive_table_name = hive_table_full_name.split('.')
- else:
- hive_db = hive_database
- hive_table_name = hive_table_full_name
- # 获取表备注
- create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True)
- hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1]
- if 'COMMENT' in hive_table_comment:
- hive_table_comment = hive_table_comment.split("'")[1]
- else:
- hive_table_comment = ''
- # 表是否有分区
- partitioned = False
- if "`dt`" in create_table_str:
- partitioned = True
- # 获取表列名
- hive_columns = spark_sql.get_columns(f'{hive_table_full_name}')
- hive_column_names = []
- hive_column_types = {}
- for column_name, column_value in hive_columns.items():
- hive_column_names.append(column_name)
- column_type = column_value[0].upper()
- if column_type == 'BIGINT':
- column_type = 'LONG'
- hive_column_types[column_name] = column_type
- hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_db, hive_table_name,
- hive_table_comment,
- partitioned,
- hive_column_names, hive_column_types)
- # hbase列名
- hbase_column_names = []
- for column_name in hive_column_names:
- hbase_column_names.append(f'cf:{column_name}')
- row_key_columns = []
- row_key_columns.append('reverse(主键如果是自增ID,建议reverse)')
- row_key_columns.append('separator(@@)')
- row_key_columns.append(f'separator({hive_db})')
- row_key_columns.append(f'separator(.)')
- row_key_columns.append(f'separator({hive_table_name})')
- hbase_writer_str = HBaseWriter.generate_definition(hbase_ds_name,
- hbase_namespace,
- hive_table_name,
- hive_table_name,
- hive_table_comment,
- "cf",
- hbase_column_names,
- {},
- row_key_columns)
- # 写文件
- generator_config_file = '{0}/hdfs-hbase-{1}-{2}.ini'.format(config_ini_path, hbase_namespace, hive_table_name)
- write_file(f'{hdfs_reader_str}\n\n{hbase_writer_str}\n', generator_config_file)
- pretty_print(
- f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 Hive 数据库表 '
- f'{NORM_GRN}{hive_database}.{hive_table_name} '
- f'{NORM_MGT}\n 写 HBase DataX作业配置文件生成器配置已写入文件 '
- f'{NORM_GRN}{generator_config_file}'
- )
- index += 1
- def hdfs_kafka_generator():
- kafka_topic = CONFIG.get('T')
- kafka_key = CONFIG.get('K')
- if not kafka_topic:
- pretty_print(f'{NORM_RED}参数 {NORM_GRN}-T{DO_RESET}{NORM_RED} 未提供')
- usage(1)
- included_tables = CONFIG.get('t', [])
- if isinstance(included_tables, str):
- included_tables = [included_tables]
- excluded_tables = CONFIG.get('e', [])
- if isinstance(excluded_tables, str):
- excluded_tables = [excluded_tables]
- hive_database = CONFIG.get('d')
- if len(included_tables) == 0 and len(excluded_tables) == 0:
- if not hive_database:
- pretty_print(f'{NORM_RED}参数 {NORM_GRN}-d{DO_RESET}{NORM_RED} 未提供')
- usage(1)
- else:
- pretty_print(f'{NORM_YEL}注意:'
- f'{NORM_MGT}参数 {NORM_GRN}-t{NORM_MGT} 或 {NORM_GRN}-e '
- f'{NORM_MGT}未提供,将扫描数据库 '
- f'{NORM_GRN}{hive_database}{DO_RESET} '
- f'{NORM_MGT}下所有表')
- with SparkSQL('datax-gc-generator') as spark_sql:
- hdfs_ds_name = 'hdfs-aliyun-cloud'
- hdfs_path = '/user/hive/warehouse'
- kafka_ds_name = 'kafka-aliyun'
- output = CONFIG.get('output', f'{base_dir}/ignored')
- final_tables = []
- if len(included_tables) == 0:
- all_hive_tables = []
- # 如果没有传入仅需的表则获取数据库下所有表
- tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True)
- desc_tables = tables_df.collect()
- for row in desc_tables:
- all_hive_tables.append(row[1])
- # 去除忽略的数据表
- for hive_table in all_hive_tables:
- if hive_table not in excluded_tables:
- final_tables.append(f'{hive_database}.{hive_table}')
- else:
- final_tables = included_tables
- number_width = len(str(len(final_tables)))
- index = 1
- # 判断是否为外部表,若为外部表,则获取表信息内容改变
- for table_name in final_tables:
- # if hive_table_full_name.endswith("es_mapping"):
- # continue
- # hive_database, hive_table_name = hive_table_full_name.split('.') # type: str,str
- config_ini_path = '{0}/config/hdfs-kafka/{1}'.format(output, hive_database)
- os.system(f'mkdir -p {config_ini_path}')
- hive_table_name = ''
- # 如果传入的配置中(-t=*)是完整的库名+表名(crl_es.xxx),则通过完整的库名表名获取信息
- if table_name.__contains__('.'):
- hive_database, hive_table_name = table_name.split('.') # type: str,str
- hive_table_full_name = table_name
- # 如果传入的配置中是表名,则通过传入的配置获取库信息
- elif hive_database is not None:
- hive_table_name = table_name # type: str
- hive_table_full_name = f'{hive_database}.{hive_table_name}'
- else:
- raise ValueError("hive_database undefined")
- hive_column_names = []
- hive_column_types = {}
- hive_table_names_mapping = {}
- # 获取表备注
- if hive_table_full_name.__contains__('mapping'):
- mysql_handler = MySQLHandler(
- host='rm-m5er2i6wz605su9bi.mysql.rds.aliyuncs.com',
- port=3306,
- username='meta_ro',
- password='Ts#r5rO1'
- )
- partitioned = True # 如果是mapping表则默认有分区,因为只是通过mapping表看映射
- hive_table_tbl = dict(
- mysql_handler.query_tbl_hive_metadata(hive_table_name)) # 获取hive的mapping表全部的tblproperties
- hive_table_comment = hive_table_tbl.get('comment') # 获取hive表comment字段
- hive_table_names_mapping = hive_table_tbl.get('es.mapping.names') # 获取hive与es映射的字段信息
- hive_table_es_index = hive_table_tbl.get('es.resource').split('/')[0] # 获取写入es的索引
- hive_table_column = mysql_handler.query_column_hive_metadata(hive_table_name) # 获取hive表的列名、字段类型、注释等数据
- hive_columns = {column[1]: (column[2], column[3]) for column in
- hive_table_column} # 将读取到的列名、类型、注释转换成与spark读取出来的格式一致
- kafka_column_types = {column[1]: column[2] for column in hive_table_column} # 单独列出一个{列名:类型}的字典
- hdfs_reader_column = [mappingName.split(':')[0] for mappingName in
- str(hive_table_names_mapping).split(',')] # 在hive与es映射字段的表内取出要读hdfs的字段
- kafka_writer_column = [mappingName.split(':')[1] for mappingName in
- str(hive_table_names_mapping).split(',')] # 在hive与es映射字段的表内取出要写kafka的字段
- column_mapping = dict(item.split(":") for item in hive_table_names_mapping.split(",")) # 将hive与es字段的映射转换为字典
- for k, v in dict(
- item.split(":") for item in hive_table_names_mapping.split(",")).items(): # 保存一个不包含struct结构体映射的字典
- if k not in hive_columns.keys():
- column_mapping.pop(k)
- for column in hdfs_reader_column.copy(): # HDFS reader中去掉结构体的列名
- if column not in hive_columns.keys():
- hdfs_reader_column.remove(column)
- for column in kafka_writer_column.copy(): # kafka reader中去掉结构体的列名
- if column not in column_mapping.values():
- kafka_writer_column.remove(column)
- hive_es_column_mapping = {hdfs_reader_column[i]: kafka_writer_column[i] for i in
- range(len(hdfs_reader_column))}
- for key, value in kafka_column_types.copy().items(): # 替换kafka writer字典中的键
- if key in hive_es_column_mapping:
- kafka_column_types[hive_es_column_mapping[key]] = kafka_column_types.pop(key)
- if key not in hive_es_column_mapping:
- kafka_column_types.pop(key)
- hive_column_types = {}
- column_type_flag = ['LONG', 'BIGINT', 'BOOLEAN', 'STRING', 'DOULBE']
- for column_name, column_value in hive_columns.items():
- hive_column_names.append(column_name)
- column_type = column_value[0].upper()
- if column_type in column_type_flag:
- hive_column_types[column_name] = column_type
- else:
- if column_type == 'INT':
- hive_column_types[column_name] = 'LONG'
- hive_column_names = hdfs_reader_column
- hive_table_name = re.sub(r"({}).*$".format(re.escape("sum")), "sum", hive_table_name)
- if kafka_key is None:
- kafka_key = hive_table_es_index
- else:
- create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True)
- hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1]
- if 'COMMENT' in hive_table_comment:
- hive_table_comment = hive_table_comment.split("'")[1]
- else:
- hive_table_comment = ''
- # 表是否有分区
- partitioned = False
- if "`dt`" in create_table_str:
- partitioned = True
- # 获取表列名
- hive_columns = spark_sql.get_columns(f'{hive_table_full_name}')
- column_type_flag = ['LONG', 'BIGINT', 'BOOLEAN', 'STRING', 'DOULBE']
- for column_name, column_value in hive_columns.items():
- hive_column_names.append(column_name)
- column_type = column_value[0].upper()
- if column_type in column_type_flag:
- hive_column_types[column_name] = column_type
- else:
- if column_type == 'INT':
- hive_column_types[column_name] = 'LONG'
- kafka_writer_column = hive_column_names
- kafka_column_types = hive_column_types
- hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_database, hive_table_name,
- hive_table_comment,
- partitioned,
- hive_column_names, hive_column_types)
- source_name = hive_table_name.replace('es_crl_', '').replace('_sum', '')
- kafka_writer_str = KafkaWriter.generate_definition(kafka_ds_name, kafka_topic, kafka_key, source_name,
- kafka_writer_column, kafka_column_types,
- hive_table_names_mapping)
- # 写文件
- generator_config_file = '{0}/hdfs-kafka-{1}-{2}.ini'.format(config_ini_path, hive_database, hive_table_name)
- write_file(f'{hdfs_reader_str}\n\n{kafka_writer_str}\n', generator_config_file)
- pretty_print(
- f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 Hive 数据库表 '
- f'{NORM_GRN}{hive_database}.{hive_table_name}'
- f'{NORM_MGT}\n 写 Kafka DataX作业配置文件生成器配置已写入文件 '
- f'{NORM_GRN}{generator_config_file}'
- )
- index += 1
- def hdfs_mongo_generator():
- hive_database = CONFIG.get('d')
- if not hive_database:
- pretty_print(f'{NORM_RED}参数 {NORM_GRN}-d{DO_RESET}{NORM_RED} 未提供')
- usage(1)
- included_tables = CONFIG.get('t', [])
- if isinstance(included_tables, str):
- included_tables = [included_tables]
- excluded_tables = CONFIG.get('e', [])
- if isinstance(excluded_tables, str):
- excluded_tables = [excluded_tables]
- if len(included_tables) == 0 and len(excluded_tables) == 0:
- pretty_print(f'{NORM_YEL}注意:'
- f'{NORM_MGT}参数 {NORM_GRN}-t{NORM_MGT} 或 {NORM_GRN}-e '
- f'{NORM_MGT}未提供,将扫描数据库 '
- f'{NORM_GRN}{hive_database}{DO_RESET} '
- f'{NORM_MGT}下所有表')
- with SparkSQL('datax-gc-generator') as spark_sql:
- # 获取json文件中表的主键
- table_pk_fields = []
- for validation_config_file in list_files('conf/validation', True):
- if validation_config_file.endswith('.json'):
- validation_config = load_json_file(validation_config_file)
- dwd_table_config = validation_config.get('dwd_table') # type: str
- if not dwd_table_config:
- pretty_print(f'{NORM_YEL}文件 {NORM_GRN}{validation_config_file}'
- f'{NORM_YEL} 中没有发现 {NORM_GRN}dwd_table{NORM_YEL} 的定义')
- continue
- # 表名后缀,如:court_announcement
- table_name = dwd_table_config.split("_crl_")[1]
- ods_dwd_config = validation_config.get('ods_dwd_config')
- if not ods_dwd_config:
- continue
- # 获取主键名
- new_pk_fields = ods_dwd_config.get('pk_fields') # type:list
- # 放入到字典中
- for pk_fields in new_pk_fields:
- table_pk_fields.append(table_name + "-" + pk_fields)
- hdfs_ds_name = 'hdfs-aliyun-cloud'
- hdfs_path = '/user/hive/warehouse'
- output = CONFIG.get('output', f'{base_dir}/ignored')
- config_ini_path = '{0}/config/hdfs-mongo/{1}'.format(output, hive_database)
- os.system(f'mkdir -p {config_ini_path}')
- final_tables = []
- if len(included_tables) == 0:
- all_hive_tables = []
- # 如果没有传入仅需的表则获取数据库下所有表
- tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True)
- desc_tables = tables_df.collect()
- for row in desc_tables:
- all_hive_tables.append(row[1])
- # 去除忽略的数据表
- for hive_table in all_hive_tables:
- if hive_table not in excluded_tables:
- final_tables.append(f'{hive_database}.{hive_table}')
- else:
- final_tables = included_tables
- number_width = len(str(len(final_tables)))
- index = 1
- for hive_table_full_name in final_tables:
- hive_database, hive_table_name = hive_table_full_name.split('.')
- # 获取表备注
- create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True)
- hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1]
- if 'COMMENT' in hive_table_comment:
- hive_table_comment = hive_table_comment.split("'")[1]
- else:
- hive_table_comment = ''
- # 表是否有分区
- partitioned = False
- if "`dt`" in create_table_str:
- partitioned = True
- # 获取表列名
- hive_columns = spark_sql.get_columns(f'{hive_table_full_name}')
- hive_column_names = []
- hive_column_types = {}
- for column_name, column_value in hive_columns.items():
- hive_column_names.append(column_name)
- column_type = column_value[0].upper()
- if column_type == 'BIGINT':
- column_type = 'LONG'
- hive_column_types[column_name] = column_type
- hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_database, hive_table_name,
- hive_table_comment,
- partitioned,
- hive_column_names, hive_column_types)
- mongo_ds_name = 'mongo-dev-rw'
- if hive_table_full_name.__contains__('_crl_'):
- split_table_name = hive_table_full_name.split("_crl_")[1]
- else:
- split_table_name = 'unknown'
- mongo_database = 'enterprise'
- mongo_collection = snake_case_to_pascal_case(split_table_name)
- # mongo列名
- mongo_column_names = []
- for column_name in hive_column_names:
- if MONGO_SPECIAL_WORDS_DICT.__contains__(column_name):
- mongo_column_names.append(MONGO_SPECIAL_WORDS_DICT[column_name])
- else:
- column_name = snake_case_to_camel_case(column_name)
- mongo_column_names.append(column_name)
- # mongo列类型和主键
- mongo_column_types = {}
- pk_fields = []
- for column_name, column_type in hive_column_types.items():
- is_pk = False
- if split_table_name + "-" + column_name in table_pk_fields:
- is_pk = True
- if MONGO_SPECIAL_WORDS_DICT.__contains__(column_name):
- column_name = MONGO_SPECIAL_WORDS_DICT[column_name]
- else:
- column_name = snake_case_to_camel_case(column_name)
- if column_name == 'createDate' and column_type == 'LONG':
- mongo_column_types[column_name] = 'DATE'
- elif column_name == 'updateDate' and column_type == 'LONG':
- mongo_column_types[column_name] = 'DATE'
- else:
- mongo_column_types[column_name] = column_type
- if is_pk:
- pk_fields.append(column_name)
- mongo_writer_str = MongoWriter.generate_definition(mongo_ds_name, mongo_database, mongo_collection,
- mongo_column_names,
- mongo_column_types, pk_fields)
- # 写文件
- generator_config_file = '{0}/hdfs-mongo-{1}-{2}.ini'.format(config_ini_path, mongo_database, split_table_name)
- write_file(f'{hdfs_reader_str}\n\n{mongo_writer_str}\n', generator_config_file)
- pretty_print(
- f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 Hive 数据库表 '
- f'{NORM_GRN}{hive_database}.{hive_table_name}'
- f'{NORM_MGT}\n 写 MongoDB DataX作业配置文件生成器配置已写入文件 '
- f'{NORM_GRN}{generator_config_file}'
- )
- index += 1
- def hdfs_mysql_generator():
- raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')
- def mysql_hdfs_generator():
- host = CONFIG.get('h')
- port = int(CONFIG.get('P', '3306'))
- username = CONFIG.get('u')
- password = CONFIG.get('p')
- mysql_database = CONFIG.get('D')
- tables = CONFIG.get('t', [])
- if isinstance(tables, str):
- tables = [tables]
- excluded_tables = CONFIG.get('e', [])
- if isinstance(excluded_tables, str):
- excluded_tables = [excluded_tables]
- table_regex = CONFIG.get('tr', [])
- if isinstance(table_regex, str):
- table_regex = [table_regex]
- exclude_regex = CONFIG.get('er', [])
- if isinstance(exclude_regex, str):
- exclude_regex = [exclude_regex]
- project = CONFIG.get('project')
- layer = CONFIG.get('layer', 'ods')
- edition = CONFIG.get('edition')
- env = CONFIG.get('env')
- partitioned = CONFIG.get('partitioned', False)
- inc_col = CONFIG.get('inc-col', 'update_time')
- if not (host and username and password and mysql_database):
- usage(1)
- hdfs_ds_name = 'hdfs-aliyun-cloud'
- hdfs_default_fs = 'hdfs://cluster'
- hdfs_path = '/user/hive/warehouse'
- output = CONFIG.get('output', f'{base_dir}/ignored')
- hive_database = get_hive_database_name(project, layer, env)
- hive_table_prefix = get_hive_table_prefix(project, layer, edition)
- mysql_datasource_path = '{0}/datasource/mysql/{1}'.format(output, hive_database)
- hdfs_datasource_path = '{0}/datasource/hdfs'.format(output)
- config_ini_path = '{0}/config/mysql-hdfs/{1}'.format(output, hive_database)
- hive_ddl_path = '{0}/ddl'.format(output)
- hive_ddl_file = '{0}/{1}.sql'.format(hive_ddl_path, mysql_database)
- os.system(f'mkdir -p {mysql_datasource_path}')
- os.system(f'mkdir -p {hdfs_datasource_path}')
- os.system(f'mkdir -p {config_ini_path}')
- os.system(f'mkdir -p {hive_ddl_path}')
- mysql_ds_def = MySQLDataSource.generate_definition(host, port, username, password, mysql_database)
- hdfs_ds_def = HDFSDataSource.generate_definition(hdfs_default_fs)
- mysql_handler = MySQLHandler(host, port, username, password)
- mysql_tables = mysql_handler.list_tables(mysql_database, exclude_regex, table_regex)
- number_width = len(str(len(mysql_tables)))
- index = 1
- if os.path.exists(hive_ddl_file):
- hive_ddl_file = '{0}/{1}-{2}.sql'.format(hive_ddl_path, mysql_database, str(int(time.time())))
- write_file('CREATE DATABASE IF NOT EXISTS %s;\n' % hive_database, hive_ddl_file)
- for mysql_table_name, mysql_table_comment in mysql_tables.items():
- if tables and not tables.__contains__(mysql_table_name):
- continue
- if excluded_tables and excluded_tables.__contains__(mysql_table_name):
- continue
- mysql_column_list = mysql_handler.list_columns(mysql_database, mysql_table_name)
- mysql_column_names = [c.COLUMN_NAME for c in mysql_column_list]
- column_types = convert_mysql_column_types(mysql_column_list)
- mysql_reader_def = MySQLReader.generate_definition(
- mysql_database,
- mysql_table_name, mysql_table_comment,
- mysql_column_names, column_types,
- hive_database, partitioned, inc_col
- )
- hive_table_name = f'{hive_table_prefix}_{mysql_table_name}'
- hive_ddl_def = MySQLReader.generate_hive_ddl(
- hive_database, hive_table_name, mysql_table_comment,
- partitioned,
- mysql_column_list, column_types
- )
- hdfs_writer_def = HDFSWriter.generate_definition(
- hdfs_ds_name, hdfs_path,
- hive_database, hive_table_name, partitioned,
- mysql_column_names, column_types
- )
- generator_config_file = '{0}/mysql-hdfs-{1}-{2}.ini'.format(config_ini_path, mysql_database, mysql_table_name)
- write_file(f'{mysql_reader_def}\n{hdfs_writer_def}\n', generator_config_file)
- append_file(f'{hive_ddl_def}\n', hive_ddl_file)
- pretty_print(
- f'{NORM_YEL}{str(index).rjust(number_width, " ")}{NORM_MGT} 读 MySQL 数据库表 '
- f'{NORM_GRN}{mysql_database}.{mysql_table_name}'
- f'{NORM_MGT}\n 写 HDFS DataX作业配置文件生成器配置已写入文件 '
- f'{NORM_GRN}{generator_config_file}'
- f'{NORM_MGT}\n 对应的Hive建表DDL已写入文件 '
- f'{NORM_GRN}{hive_ddl_file}'
- )
- index += 1
- # break
- mysql_ds_def_file = '{0}/mysql-{1}.ini'.format(mysql_datasource_path, mysql_database)
- if os.path.exists(mysql_ds_def_file):
- mysql_ds_def_file = '{0}/mysql-{1}-{2}.ini'.format(mysql_datasource_path, mysql_database, str(int(time.time())))
- write_file(f'{mysql_ds_def}\n', mysql_ds_def_file)
- pretty_print(f'{NORM_MGT}MySQL数据库 {NORM_GRN}{mysql_database}{NORM_MGT} 数据源配置已写入文件 {NORM_GRN}{mysql_ds_def_file}')
- hdfs_ds_def_file = '{0}/{1}.ini'.format(hdfs_datasource_path, hdfs_ds_name)
- if os.path.exists(hdfs_ds_def_file):
- hdfs_ds_def_file = '{0}/{1}-{2}.ini'.format(hdfs_datasource_path, hdfs_ds_name, str(int(time.time())))
- write_file(f'{hdfs_ds_def}\n', hdfs_ds_def_file)
- pretty_print(f'{NORM_MGT}HDFS 数据源配置已写入文件 {NORM_GRN}{hdfs_ds_def_file}')
- def mysql_hbase_generator():
- host = CONFIG.get('h')
- port = int(CONFIG.get('P', '3306'))
- username = CONFIG.get('u')
- password = CONFIG.get('p')
- mysql_database = CONFIG.get('D')
- if not (host and username and password and mysql_database):
- usage(1)
- included_tables = CONFIG.get('t', [])
- if isinstance(included_tables, str):
- included_tables = [included_tables]
- excluded_tables = CONFIG.get('e', [])
- if isinstance(excluded_tables, str):
- excluded_tables = [excluded_tables]
- table_regex = CONFIG.get('tr', [])
- if isinstance(table_regex, str):
- table_regex = [table_regex]
- exclude_regex = CONFIG.get('er', [])
- if isinstance(exclude_regex, str):
- exclude_regex = [exclude_regex]
- if len(included_tables) == 0 and len(excluded_tables) == 0 and len(table_regex) == 0 and len(exclude_regex) == 0:
- pretty_print(f'{NORM_YEL}注意:'
- f'{NORM_MGT}参数 {NORM_GRN}-t -e -tr -er '
- f'{NORM_MGT}都未提供,将扫描数据库 '
- f'{NORM_GRN}{mysql_database}{DO_RESET} '
- f'{NORM_MGT}下所有表')
- hbase_namespace = CONFIG.get('n')
- if not hbase_namespace:
- pretty_print(f'{NORM_YEL}注意:'
- f'{NORM_MGT}参数 {NORM_GRN}-n{DO_RESET} '
- f'{NORM_MGT}未提供,将使用 HBase 默认命名空间 '
- f'{NORM_GRN}default')
- hbase_namespace = 'default'
- project = CONFIG.get('project')
- layer = CONFIG.get('layer', 'ods')
- edition = CONFIG.get('edition')
- env = CONFIG.get('env')
- partitioned = CONFIG.get('partitioned', False)
- inc_col = CONFIG.get('inc-col', 'update_time')
- hbase_ds_name = 'hbase-default'
- hdfs_default_fs = 'hdfs://cluster'
- output = CONFIG.get('output', f'{base_dir}/ignored')
- hive_database = get_hive_database_name(project, layer, env)
- hive_table_prefix = get_hive_table_prefix(project, layer, edition)
- mysql_datasource_path = '{0}/datasource/mysql/{1}'.format(output, hive_database)
- config_ini_path = '{0}/config/mysql-hbase/{1}'.format(output, hive_database)
- hive_ddl_path = '{0}/ddl'.format(output)
- hive_ddl_file = '{0}/{1}.sql'.format(hive_ddl_path, mysql_database)
- os.system(f'mkdir -p {mysql_datasource_path}')
- os.system(f'mkdir -p {config_ini_path}')
- os.system(f'mkdir -p {hive_ddl_path}')
- mysql_ds_def = MySQLDataSource.generate_definition(host, port, username, password, mysql_database)
- mysql_handler = MySQLHandler(host, port, username, password)
- mysql_tables = mysql_handler.list_tables(mysql_database, exclude_regex, table_regex)
- number_width = len(str(len(mysql_tables)))
- index = 1
- if os.path.exists(hive_ddl_file):
- hive_ddl_file = '{0}/{1}-{2}.sql'.format(hive_ddl_path, mysql_database, str(int(time.time())))
- write_file('CREATE DATABASE IF NOT EXISTS %s;\n' % hive_database, hive_ddl_file)
- for mysql_table_name, mysql_table_comment in mysql_tables.items():
- if included_tables and not included_tables.__contains__(mysql_table_name):
- continue
- if excluded_tables and excluded_tables.__contains__(mysql_table_name):
- continue
- mysql_column_list = mysql_handler.list_columns(mysql_database, mysql_table_name)
- mysql_column_names = [c.COLUMN_NAME for c in mysql_column_list]
- column_types = convert_mysql_column_types(mysql_column_list)
- mysql_reader_def = MySQLReader.generate_definition(
- mysql_database,
- mysql_table_name, mysql_table_comment,
- mysql_column_names, column_types,
- hive_database, partitioned, inc_col
- )
- hive_table_name = f'{hive_table_prefix}_{mysql_table_name.lower()}_hbase_mapping'
- hbase_table_name = mysql_table_name.lower()
- hive_over_hbase_ddl_def = MySQLReader.generate_hive_over_hbase_ddl(
- hive_database, hive_table_name, mysql_table_comment, hbase_namespace, hbase_table_name,
- mysql_column_list, column_types
- )
- row_key_columns = []
- row_key_columns.append('reverse(主键如果是自增ID,建议reverse)')
- row_key_columns.append('separator(@@)')
- row_key_columns.append(f'separator({mysql_database})')
- row_key_columns.append(f'separator(.)')
- row_key_columns.append(f'separator({mysql_table_name})')
- hdfs_writer_def = HBaseWriter.generate_definition(
- hbase_ds_name, hbase_namespace, hbase_table_name,
- mysql_table_name, mysql_table_comment, 'cf',
- mysql_column_names, column_types, row_key_columns
- )
- generator_config_file = '{0}/mysql-hbase-{1}-{2}.ini'.format(config_ini_path, mysql_database, mysql_table_name)
- write_file(f'{mysql_reader_def}\n{hdfs_writer_def}\n', generator_config_file)
- append_file(f'{hive_over_hbase_ddl_def}\n', hive_ddl_file)
- pretty_print(
- f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} 读 MySQL 数据库表 '
- f'{NORM_GRN}{mysql_database}.{mysql_table_name}'
- f'{NORM_MGT}\n 写 HBase DataX作业配置文件生成器配置已写入文件 '
- f'{NORM_GRN}{generator_config_file}'
- f'{NORM_MGT}\n 对应的 Hive Over HBase 建表DDL已写入文件 '
- f'{NORM_GRN}{hive_ddl_file}'
- )
- index += 1
- # break
- mysql_ds_def_file = '{0}/mysql-{1}.ini'.format(mysql_datasource_path, mysql_database)
- if os.path.exists(mysql_ds_def_file):
- mysql_ds_def_file = '{0}/mysql-{1}-{2}.ini'.format(mysql_datasource_path, mysql_database, str(int(time.time())))
- write_file(f'{mysql_ds_def}\n', mysql_ds_def_file)
- pretty_print(f'{NORM_MGT}MySQL数据库 {NORM_GRN}{mysql_database}{NORM_MGT} 数据源配置已写入文件 {NORM_GRN}{mysql_ds_def_file}')
- if __name__ == '__main__':
- pretty_print(f'{NORM_MGT}{sys.argv[0]} 收到参数:{NORM_GRN}{" ".join(sys.argv[1:])}')
- CONFIG, _ = parse_args(sys.argv[1:])
- # 未提供任何参数或查看帮助
- if exist(CONFIG, ['H', 'help']):
- usage(0)
- from_system = CONFIG.get('from')
- if from_system:
- from_system = from_system.lower()
- to_system = CONFIG.get('to')
- if to_system:
- to_system = to_system.lower()
- if not from_system or not to_system:
- usage(1)
- if from_system == "hdfs":
- if to_system == "elasticsearch":
- hdfs_elasticsearch_generator()
- elif to_system == "hbase":
- hdfs_hbase_generator()
- elif to_system == "kafka":
- hdfs_kafka_generator()
- elif to_system == "mongo":
- hdfs_mongo_generator()
- elif to_system == "mysql":
- hdfs_mysql_generator()
- else:
- raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')
- elif from_system == "mysql":
- if to_system == "hbase":
- mysql_hbase_generator()
- elif to_system == "hdfs":
- mysql_hdfs_generator()
- else:
- raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')
- else:
- raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')
|