datax-gc-generator.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Generates the configuration files consumed by the DataX job configuration file generator.
"""
import os
import re
import sys
import time

# Make the dw_base package importable before the dw_base imports below.
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)

from dw_base import DO_RESET, NORM_CYN, NORM_GRN, NORM_MGT, NORM_YEL, NORM_RED
from dw_base.database.mysql_utils import MySQLHandler
from dw_base.datax.datasources.hdfs_data_source import HDFSDataSource
from dw_base.datax.datasources.mysql_data_source import MySQLDataSource
from dw_base.datax.datax_utils import convert_mysql_column_types
from dw_base.datax.plugins.reader.hdfs_reader import HDFSReader
from dw_base.datax.plugins.reader.mysql_reader import MySQLReader
from dw_base.datax.plugins.writer.hbase_writer import HBaseWriter
from dw_base.datax.plugins.writer.hdfs_writer import HDFSWriter
from dw_base.datax.plugins.writer.kafka_writer import KafkaWriter
from dw_base.datax.plugins.writer.mongo_writer import MongoWriter, MONGO_SPECIAL_WORDS_DICT
from dw_base.hive.hive_utils import get_hive_database_name, get_hive_table_prefix
from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.common_utils import exist
from dw_base.utils.config_utils import parse_args
from dw_base.utils.file_utils import write_file, append_file, list_files, load_json_file
from dw_base.utils.log_utils import pretty_print
from dw_base.utils.string_utils import snake_case_to_pascal_case, snake_case_to_camel_case

def usage(code: int):
    print(
        f'{NORM_MGT}Usage: {sys.argv[0]}\n'
        f'{NORM_CYN}\t[-H/--H/--help] Print this usage message{DO_RESET}'
    )
    if not from_system or not to_system:
        print(
            f'{NORM_MGT}Usage: {sys.argv[0]}\n'
            f'{NORM_GRN}\t<[-]-from< /=>from> Source system type (default MySQL); currently supports hdfs, mysql\n'
            f'{NORM_GRN}\t<[-]-to< /=>to> Target system type (default HDFS); currently supports elasticsearch, hbase, hdfs, kafka, mongo, mysql\n'
            f'{NORM_CYN}\t[[-]-output< /=>output directory] Where the generated ini files are stored; absolute or relative path'
            f'{DO_RESET}'
        )
    if from_system == "hdfs":
        print(
            f'{NORM_MGT}Parameters when from is HDFS: \n'
            f'{NORM_GRN}\t<[-]-d< /=>database> Hive database (default crl_mg)\n'
            f'{NORM_GRN}\t<[-]-t< /=>table> Hive table (repeatable; if omitted, all tables in the database given by -d are scanned)\n'
            f'{NORM_GRN}\t<[-]-e< /=>exclude> Hive tables to ignore (repeatable)\n'
            f'{NORM_CYN}\t[[-]-partitioned] Whether the table is partitioned (default yes); only daily partitions are supported\n'
            f'{DO_RESET}'
        )
    elif from_system == "mysql":
        print(
            f'{NORM_MGT}Parameters when from is MySQL: \n'
            f'{NORM_GRN}\t<[-]-h< /=>host> MySQL host\n'
            f'{NORM_CYN}\t[[-]-P< /=>port] MySQL port\n'
            f'{NORM_GRN}\t<[-]-u< /=>username> MySQL user\n'
            f'{NORM_GRN}\t<[-]-p< /=>password> MySQL password\n'
            f'{NORM_GRN}\t<[-]-D< /=>database> MySQL database\n'
            f'{NORM_GRN}\t<[-]-tr< /=>table> MySQL table name regex (repeatable; takes precedence over -t)\n'
            f'{NORM_GRN}\t<[-]-t< /=>table> MySQL table (repeatable)\n'
            f'{NORM_GRN}\t<[-]-er< /=>exclude> Regex of MySQL tables to ignore (repeatable; takes precedence over -e)\n'
            f'{NORM_GRN}\t<[-]-e< /=>exclude> MySQL tables to ignore (repeatable)\n'
            f'{NORM_CYN}\t[[-]-inc-col] Incremental extraction column name (default update_time)'
            f'{DO_RESET}'
        )
    if to_system == "elasticsearch":
        print(
            f'{NORM_MGT}Parameters when to is Elasticsearch: \n'
            f'{NORM_CYN}\t[[-]-inc-col] Incremental extraction column name (default update_time)'
            f'{DO_RESET}'
        )
    elif to_system == "hbase":
        print(
            f'{NORM_MGT}Parameters when to is HBase: \n'
            f'{NORM_CYN}\t[[-]-inc-col] Incremental extraction column name (default update_time)'
            f'{DO_RESET}'
        )
    elif to_system == "hdfs":
        print(
            f'{NORM_MGT}Parameters when to is HDFS: \n'
            f'{NORM_CYN}\t[[-]-project< /=>project] Project name, e.g. skb, bms\n'
            f'{NORM_CYN}\t[[-]-layer< /=>dw-layer] Data warehouse layer, e.g. ods, dwd\n'
            f'{NORM_CYN}\t[[-]-env< /=>dw-env] Data warehouse environment, e.g. test\n'
            f'{NORM_CYN}\t[[-]-edition< /=>dw-edition] Data warehouse edition, e.g. dl, dd\n'
            f'{NORM_CYN}\t[[-]-partitioned] Whether the table is partitioned (default yes); only daily partitions are supported'
            f'{DO_RESET}'
        )
    elif to_system == "kafka":
        print(
            f'{NORM_MGT}Parameters when to is Kafka: \n'
            f'{NORM_GRN}\t<[-]-T< /=>kafka topic> Kafka topic\n'
            f'{NORM_GRN}\t<[-]-K< /=>kafka key> Kafka key of the data\n'
            f'{DO_RESET}'
        )
    elif to_system == "mongo":
        print(
            f'{NORM_MGT}Parameters when to is Mongo: \n'
            f'{NORM_CYN}\t[[-]-inc-col] Incremental extraction column name (default update_time)'
            f'{DO_RESET}'
        )
    elif to_system == "mysql":
        print(
            f'{NORM_MGT}Parameters when to is MySQL: \n'
            f'{NORM_GRN}\t<[-]-h< /=>host> MySQL host\n'
            f'{NORM_CYN}\t[[-]-P< /=>port] MySQL port\n'
            f'{NORM_GRN}\t<[-]-u< /=>username> MySQL user\n'
            f'{NORM_GRN}\t<[-]-p< /=>password> MySQL password\n'
            f'{NORM_GRN}\t<[-]-D< /=>database> MySQL database\n'
            f'{NORM_GRN}\t<[-]-t< /=>table> MySQL table'
            f'{DO_RESET}'
        )
    sys.exit(code)

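# Example invocations (a hedged sketch inferred from the usage text above and the
# CONFIG.get(...) keys used below; the exact flag spelling accepted depends on how
# parse_args interprets arguments, so treat these as illustrative only):
#   ./datax-gc-generator.py --from=mysql --to=hdfs -h=127.0.0.1 -P=3306 -u=root -p=secret -D=mydb --project=skb --layer=ods
#   ./datax-gc-generator.py --from=hdfs --to=hbase -d=crl_mg -t=crl_mg.some_table -n=default
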
def hdfs_elasticsearch_generator():
    raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')

def hdfs_hbase_generator():
    with SparkSQL('datax-gc-generator') as spark_sql:
        hdfs_ds_name = 'hdfs-aliyun-cloud'
        hbase_ds_name = 'hbase-default'
        hdfs_path = '/user/hive/warehouse'
        output = CONFIG.get('output', f'{base_dir}/ignored')
        hive_database = CONFIG.get('d')
        if not hive_database:
            pretty_print(f'{NORM_RED}Parameter {NORM_GRN}-d{DO_RESET}{NORM_RED} not provided')
            usage(1)
        config_ini_path = '{0}/config/hdfs-hbase/{1}'.format(output, hive_database)
        os.system(f'mkdir -p {config_ini_path}')
        included_tables = CONFIG.get('t', [])
        if isinstance(included_tables, str):
            included_tables = [included_tables]
        excluded_tables = CONFIG.get('e', [])
        if isinstance(excluded_tables, str):
            excluded_tables = [excluded_tables]
        if len(included_tables) == 0 and len(excluded_tables) == 0:
            pretty_print(f'{NORM_YEL}Note: '
                         f'{NORM_MGT}parameter {NORM_GRN}-t{NORM_MGT} or {NORM_GRN}-e '
                         f'{NORM_MGT}not provided; all tables in database '
                         f'{NORM_GRN}{hive_database}{DO_RESET} '
                         f'{NORM_MGT}will be scanned')
        hbase_namespace = CONFIG.get('n')
        if not hbase_namespace:
            pretty_print(f'{NORM_YEL}Note: '
                         f'{NORM_MGT}parameter {NORM_GRN}-n{DO_RESET} '
                         f'{NORM_MGT}not provided; the HBase default namespace '
                         f'{NORM_GRN}default{NORM_MGT} will be used')
            hbase_namespace = 'default'
        final_tables = []
        if len(included_tables) == 0:
            all_hive_tables = []
            # If no tables were requested, list every table in the database
            tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True)
            desc_tables = tables_df.collect()
            for row in desc_tables:
                all_hive_tables.append(row[1])
            # Drop the excluded tables
            for hive_table in all_hive_tables:
                if hive_table not in excluded_tables:
                    final_tables.append(f'{hive_database}.{hive_table}')
        else:
            final_tables = included_tables
        number_width = len(str(len(final_tables)))
        index = 1
        for hive_table_full_name in final_tables:
            if '.' in hive_table_full_name:
                hive_db, hive_table_name = hive_table_full_name.split('.')
            else:
                hive_db = hive_database
                hive_table_name = hive_table_full_name
            # Get the table comment
            create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True)
            hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1]
            if 'COMMENT' in hive_table_comment:
                hive_table_comment = hive_table_comment.split("'")[1]
            else:
                hive_table_comment = ''
            # Whether the table is partitioned
            partitioned = False
            if "`dt`" in create_table_str:
                partitioned = True
            # Get the table columns
            hive_columns = spark_sql.get_columns(f'{hive_table_full_name}')
            hive_column_names = []
            hive_column_types = {}
            for column_name, column_value in hive_columns.items():
                hive_column_names.append(column_name)
                column_type = column_value[0].upper()
                if column_type == 'BIGINT':
                    column_type = 'LONG'
                hive_column_types[column_name] = column_type
            hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_db, hive_table_name,
                                                             hive_table_comment,
                                                             partitioned,
                                                             hive_column_names, hive_column_types)
            # HBase column names
            hbase_column_names = []
            for column_name in hive_column_names:
                hbase_column_names.append(f'cf:{column_name}')
            row_key_columns = []
            row_key_columns.append('reverse(primary key; reverse is recommended if it is an auto-increment ID)')
            row_key_columns.append('separator(@@)')
            row_key_columns.append(f'separator({hive_db})')
            row_key_columns.append('separator(.)')
            row_key_columns.append(f'separator({hive_table_name})')
            hbase_writer_str = HBaseWriter.generate_definition(hbase_ds_name,
                                                               hbase_namespace,
                                                               hive_table_name,
                                                               hive_table_name,
                                                               hive_table_comment,
                                                               "cf",
                                                               hbase_column_names,
                                                               {},
                                                               row_key_columns)
            # Write the file
            generator_config_file = '{0}/hdfs-hbase-{1}-{2}.ini'.format(config_ini_path, hbase_namespace, hive_table_name)
            write_file(f'{hdfs_reader_str}\n\n{hbase_writer_str}\n', generator_config_file)
            pretty_print(
                f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} Read Hive table '
                f'{NORM_GRN}{hive_database}.{hive_table_name} '
                f'{NORM_MGT}\n    Write HBase: DataX job generator config written to file '
                f'{NORM_GRN}{generator_config_file}'
            )
            index += 1

def hdfs_kafka_generator():
    kafka_topic = CONFIG.get('T')
    kafka_key = CONFIG.get('K')
    if not kafka_topic:
        pretty_print(f'{NORM_RED}Parameter {NORM_GRN}-T{DO_RESET}{NORM_RED} not provided')
        usage(1)
    included_tables = CONFIG.get('t', [])
    if isinstance(included_tables, str):
        included_tables = [included_tables]
    excluded_tables = CONFIG.get('e', [])
    if isinstance(excluded_tables, str):
        excluded_tables = [excluded_tables]
    hive_database = CONFIG.get('d')
    if len(included_tables) == 0 and len(excluded_tables) == 0:
        if not hive_database:
            pretty_print(f'{NORM_RED}Parameter {NORM_GRN}-d{DO_RESET}{NORM_RED} not provided')
            usage(1)
        else:
            pretty_print(f'{NORM_YEL}Note: '
                         f'{NORM_MGT}parameter {NORM_GRN}-t{NORM_MGT} or {NORM_GRN}-e '
                         f'{NORM_MGT}not provided; all tables in database '
                         f'{NORM_GRN}{hive_database}{DO_RESET} '
                         f'{NORM_MGT}will be scanned')
    with SparkSQL('datax-gc-generator') as spark_sql:
        hdfs_ds_name = 'hdfs-aliyun-cloud'
        hdfs_path = '/user/hive/warehouse'
        kafka_ds_name = 'kafka-aliyun'
        output = CONFIG.get('output', f'{base_dir}/ignored')
        final_tables = []
        if len(included_tables) == 0:
            all_hive_tables = []
            # If no tables were requested, list every table in the database
            tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True)
            desc_tables = tables_df.collect()
            for row in desc_tables:
                all_hive_tables.append(row[1])
            # Drop the excluded tables
            for hive_table in all_hive_tables:
                if hive_table not in excluded_tables:
                    final_tables.append(f'{hive_database}.{hive_table}')
        else:
            final_tables = included_tables
        number_width = len(str(len(final_tables)))
        index = 1
        # If the table is an external mapping table, its metadata is fetched differently
        for table_name in final_tables:
            # if hive_table_full_name.endswith("es_mapping"):
            #     continue
            # hive_database, hive_table_name = hive_table_full_name.split('.')  # type: str, str
            config_ini_path = '{0}/config/hdfs-kafka/{1}'.format(output, hive_database)
            os.system(f'mkdir -p {config_ini_path}')
            hive_table_name = ''
            # If the -t value is a fully qualified name (e.g. crl_es.xxx), take database and table from it
            if '.' in table_name:
                hive_database, hive_table_name = table_name.split('.')  # type: str, str
                hive_table_full_name = table_name
            # If only a table name was passed, take the database from the -d parameter
            elif hive_database is not None:
                hive_table_name = table_name  # type: str
                hive_table_full_name = f'{hive_database}.{hive_table_name}'
            else:
                raise ValueError("hive_database undefined")
            hive_column_names = []
            hive_column_types = {}
            hive_table_names_mapping = {}
            # Get the table metadata
            if 'mapping' in hive_table_full_name:
                mysql_handler = MySQLHandler(
                    host='rm-m5er2i6wz605su9bi.mysql.rds.aliyuncs.com',
                    port=3306,
                    username='meta_ro',
                    password='Ts#r5rO1'
                )
                partitioned = True  # Mapping tables are treated as partitioned by default; they are only used to look up the mapping
                hive_table_tbl = dict(
                    mysql_handler.query_tbl_hive_metadata(hive_table_name))  # All tblproperties of the Hive mapping table
                hive_table_comment = hive_table_tbl.get('comment')  # Comment of the Hive table
                hive_table_names_mapping = hive_table_tbl.get('es.mapping.names')  # Hive-to-ES column name mapping
                hive_table_es_index = hive_table_tbl.get('es.resource').split('/')[0]  # Target ES index
                hive_table_column = mysql_handler.query_column_hive_metadata(hive_table_name)  # Column names, types and comments of the Hive table
                hive_columns = {column[1]: (column[2], column[3]) for column in
                                hive_table_column}  # Convert the columns into the same format Spark returns
                kafka_column_types = {column[1]: column[2] for column in hive_table_column}  # A separate {column: type} dict
                hdfs_reader_column = [mappingName.split(':')[0] for mappingName in
                                      str(hive_table_names_mapping).split(',')]  # Columns to read from HDFS, taken from the Hive-to-ES mapping
                kafka_writer_column = [mappingName.split(':')[1] for mappingName in
                                       str(hive_table_names_mapping).split(',')]  # Columns to write to Kafka, taken from the Hive-to-ES mapping
                column_mapping = dict(item.split(":") for item in hive_table_names_mapping.split(","))  # Hive-to-ES column mapping as a dict
                for k, v in dict(
                        item.split(":") for item in hive_table_names_mapping.split(",")).items():  # Keep a mapping dict without struct-type columns
                    if k not in hive_columns.keys():
                        column_mapping.pop(k)
                for column in hdfs_reader_column.copy():  # Drop struct columns from the HDFS reader columns
                    if column not in hive_columns.keys():
                        hdfs_reader_column.remove(column)
                for column in kafka_writer_column.copy():  # Drop struct columns from the Kafka writer columns
                    if column not in column_mapping.values():
                        kafka_writer_column.remove(column)
                hive_es_column_mapping = {hdfs_reader_column[i]: kafka_writer_column[i] for i in
                                          range(len(hdfs_reader_column))}
                for key, value in kafka_column_types.copy().items():  # Re-key the Kafka writer type dict with the ES column names
                    if key in hive_es_column_mapping:
                        kafka_column_types[hive_es_column_mapping[key]] = kafka_column_types.pop(key)
                    if key not in hive_es_column_mapping:
                        kafka_column_types.pop(key)
                hive_column_types = {}
                column_type_flag = ['LONG', 'BIGINT', 'BOOLEAN', 'STRING', 'DOUBLE']
                for column_name, column_value in hive_columns.items():
                    hive_column_names.append(column_name)
                    column_type = column_value[0].upper()
                    if column_type in column_type_flag:
                        hive_column_types[column_name] = column_type
                    else:
                        if column_type == 'INT':
                            hive_column_types[column_name] = 'LONG'
                hive_column_names = hdfs_reader_column
                hive_table_name = re.sub(r"({}).*$".format(re.escape("sum")), "sum", hive_table_name)
                if kafka_key is None:
                    kafka_key = hive_table_es_index
            else:
                create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True)
                hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1]
                if 'COMMENT' in hive_table_comment:
                    hive_table_comment = hive_table_comment.split("'")[1]
                else:
                    hive_table_comment = ''
                # Whether the table is partitioned
                partitioned = False
                if "`dt`" in create_table_str:
                    partitioned = True
                # Get the table columns
                hive_columns = spark_sql.get_columns(f'{hive_table_full_name}')
                column_type_flag = ['LONG', 'BIGINT', 'BOOLEAN', 'STRING', 'DOUBLE']
                for column_name, column_value in hive_columns.items():
                    hive_column_names.append(column_name)
                    column_type = column_value[0].upper()
                    if column_type in column_type_flag:
                        hive_column_types[column_name] = column_type
                    else:
                        if column_type == 'INT':
                            hive_column_types[column_name] = 'LONG'
                kafka_writer_column = hive_column_names
                kafka_column_types = hive_column_types
            hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_database, hive_table_name,
                                                             hive_table_comment,
                                                             partitioned,
                                                             hive_column_names, hive_column_types)
            source_name = hive_table_name.replace('es_crl_', '').replace('_sum', '')
            kafka_writer_str = KafkaWriter.generate_definition(kafka_ds_name, kafka_topic, kafka_key, source_name,
                                                               kafka_writer_column, kafka_column_types,
                                                               hive_table_names_mapping)
            # Write the file
            generator_config_file = '{0}/hdfs-kafka-{1}-{2}.ini'.format(config_ini_path, hive_database, hive_table_name)
            write_file(f'{hdfs_reader_str}\n\n{kafka_writer_str}\n', generator_config_file)
            pretty_print(
                f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} Read Hive table '
                f'{NORM_GRN}{hive_database}.{hive_table_name}'
                f'{NORM_MGT}\n    Write Kafka: DataX job generator config written to file '
                f'{NORM_GRN}{generator_config_file}'
            )
            index += 1

def hdfs_mongo_generator():
    hive_database = CONFIG.get('d')
    if not hive_database:
        pretty_print(f'{NORM_RED}Parameter {NORM_GRN}-d{DO_RESET}{NORM_RED} not provided')
        usage(1)
    included_tables = CONFIG.get('t', [])
    if isinstance(included_tables, str):
        included_tables = [included_tables]
    excluded_tables = CONFIG.get('e', [])
    if isinstance(excluded_tables, str):
        excluded_tables = [excluded_tables]
    if len(included_tables) == 0 and len(excluded_tables) == 0:
        pretty_print(f'{NORM_YEL}Note: '
                     f'{NORM_MGT}parameter {NORM_GRN}-t{NORM_MGT} or {NORM_GRN}-e '
                     f'{NORM_MGT}not provided; all tables in database '
                     f'{NORM_GRN}{hive_database}{DO_RESET} '
                     f'{NORM_MGT}will be scanned')
    with SparkSQL('datax-gc-generator') as spark_sql:
        # Collect the primary keys of each table from the validation JSON files
        table_pk_fields = []
        for validation_config_file in list_files('conf/validation', True):
            if validation_config_file.endswith('.json'):
                validation_config = load_json_file(validation_config_file)
                dwd_table_config = validation_config.get('dwd_table')  # type: str
                if not dwd_table_config:
                    pretty_print(f'{NORM_YEL}File {NORM_GRN}{validation_config_file}'
                                 f'{NORM_YEL} contains no {NORM_GRN}dwd_table{NORM_YEL} definition')
                    continue
                # Table name suffix, e.g. court_announcement
                table_name = dwd_table_config.split("_crl_")[1]
                ods_dwd_config = validation_config.get('ods_dwd_config')
                if not ods_dwd_config:
                    continue
                # Get the primary key names
                new_pk_fields = ods_dwd_config.get('pk_fields')  # type: list
                # Record them as "<table>-<pk_field>"
                for pk_field in new_pk_fields:
                    table_pk_fields.append(table_name + "-" + pk_field)
        hdfs_ds_name = 'hdfs-aliyun-cloud'
        hdfs_path = '/user/hive/warehouse'
        output = CONFIG.get('output', f'{base_dir}/ignored')
        config_ini_path = '{0}/config/hdfs-mongo/{1}'.format(output, hive_database)
        os.system(f'mkdir -p {config_ini_path}')
        final_tables = []
        if len(included_tables) == 0:
            all_hive_tables = []
            # If no tables were requested, list every table in the database
            tables_df, _ = spark_sql.query(f'show tables in {hive_database}', silent=True)
            desc_tables = tables_df.collect()
            for row in desc_tables:
                all_hive_tables.append(row[1])
            # Drop the excluded tables
            for hive_table in all_hive_tables:
                if hive_table not in excluded_tables:
                    final_tables.append(f'{hive_database}.{hive_table}')
        else:
            final_tables = included_tables
        number_width = len(str(len(final_tables)))
        index = 1
        for hive_table_full_name in final_tables:
            hive_database, hive_table_name = hive_table_full_name.split('.')
            # Get the table comment
            create_table_str = spark_sql.query_scalar(f'show create table {hive_table_full_name}', silent=True)
            hive_table_comment = create_table_str.split('ROW FORMAT SERDE')[0].split(")")[1]
            if 'COMMENT' in hive_table_comment:
                hive_table_comment = hive_table_comment.split("'")[1]
            else:
                hive_table_comment = ''
            # Whether the table is partitioned
            partitioned = False
            if "`dt`" in create_table_str:
                partitioned = True
            # Get the table columns
            hive_columns = spark_sql.get_columns(f'{hive_table_full_name}')
            hive_column_names = []
            hive_column_types = {}
            for column_name, column_value in hive_columns.items():
                hive_column_names.append(column_name)
                column_type = column_value[0].upper()
                if column_type == 'BIGINT':
                    column_type = 'LONG'
                hive_column_types[column_name] = column_type
            hdfs_reader_str = HDFSReader.generate_definition(hdfs_ds_name, hdfs_path, hive_database, hive_table_name,
                                                             hive_table_comment,
                                                             partitioned,
                                                             hive_column_names, hive_column_types)
            mongo_ds_name = 'mongo-dev-rw'
            if '_crl_' in hive_table_full_name:
                split_table_name = hive_table_full_name.split("_crl_")[1]
            else:
                split_table_name = 'unknown'
            mongo_database = 'enterprise'
            mongo_collection = snake_case_to_pascal_case(split_table_name)
            # Mongo field names
            mongo_column_names = []
            for column_name in hive_column_names:
                if column_name in MONGO_SPECIAL_WORDS_DICT:
                    mongo_column_names.append(MONGO_SPECIAL_WORDS_DICT[column_name])
                else:
                    column_name = snake_case_to_camel_case(column_name)
                    mongo_column_names.append(column_name)
            # Mongo field types and primary keys
            mongo_column_types = {}
            pk_fields = []
            for column_name, column_type in hive_column_types.items():
                is_pk = False
                if split_table_name + "-" + column_name in table_pk_fields:
                    is_pk = True
                if column_name in MONGO_SPECIAL_WORDS_DICT:
                    column_name = MONGO_SPECIAL_WORDS_DICT[column_name]
                else:
                    column_name = snake_case_to_camel_case(column_name)
                if column_name == 'createDate' and column_type == 'LONG':
                    mongo_column_types[column_name] = 'DATE'
                elif column_name == 'updateDate' and column_type == 'LONG':
                    mongo_column_types[column_name] = 'DATE'
                else:
                    mongo_column_types[column_name] = column_type
                if is_pk:
                    pk_fields.append(column_name)
            mongo_writer_str = MongoWriter.generate_definition(mongo_ds_name, mongo_database, mongo_collection,
                                                               mongo_column_names,
                                                               mongo_column_types, pk_fields)
            # Write the file
            generator_config_file = '{0}/hdfs-mongo-{1}-{2}.ini'.format(config_ini_path, mongo_database, split_table_name)
            write_file(f'{hdfs_reader_str}\n\n{mongo_writer_str}\n', generator_config_file)
            pretty_print(
                f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} Read Hive table '
                f'{NORM_GRN}{hive_database}.{hive_table_name}'
                f'{NORM_MGT}\n    Write MongoDB: DataX job generator config written to file '
                f'{NORM_GRN}{generator_config_file}'
            )
            index += 1

def hdfs_mysql_generator():
    raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')

def mysql_hdfs_generator():
    host = CONFIG.get('h')
    port = int(CONFIG.get('P', '3306'))
    username = CONFIG.get('u')
    password = CONFIG.get('p')
    mysql_database = CONFIG.get('D')
    tables = CONFIG.get('t', [])
    if isinstance(tables, str):
        tables = [tables]
    excluded_tables = CONFIG.get('e', [])
    if isinstance(excluded_tables, str):
        excluded_tables = [excluded_tables]
    table_regex = CONFIG.get('tr', [])
    if isinstance(table_regex, str):
        table_regex = [table_regex]
    exclude_regex = CONFIG.get('er', [])
    if isinstance(exclude_regex, str):
        exclude_regex = [exclude_regex]
    project = CONFIG.get('project')
    layer = CONFIG.get('layer', 'ods')
    edition = CONFIG.get('edition')
    env = CONFIG.get('env')
    partitioned = CONFIG.get('partitioned', False)
    inc_col = CONFIG.get('inc-col', 'update_time')
    if not (host and username and password and mysql_database):
        usage(1)
    hdfs_ds_name = 'hdfs-aliyun-cloud'
    hdfs_default_fs = 'hdfs://cluster'
    hdfs_path = '/user/hive/warehouse'
    output = CONFIG.get('output', f'{base_dir}/ignored')
    hive_database = get_hive_database_name(project, layer, env)
    hive_table_prefix = get_hive_table_prefix(project, layer, edition)
    mysql_datasource_path = '{0}/datasource/mysql/{1}'.format(output, hive_database)
    hdfs_datasource_path = '{0}/datasource/hdfs'.format(output)
    config_ini_path = '{0}/config/mysql-hdfs/{1}'.format(output, hive_database)
    hive_ddl_path = '{0}/ddl'.format(output)
    hive_ddl_file = '{0}/{1}.sql'.format(hive_ddl_path, mysql_database)
    os.system(f'mkdir -p {mysql_datasource_path}')
    os.system(f'mkdir -p {hdfs_datasource_path}')
    os.system(f'mkdir -p {config_ini_path}')
    os.system(f'mkdir -p {hive_ddl_path}')
    mysql_ds_def = MySQLDataSource.generate_definition(host, port, username, password, mysql_database)
    hdfs_ds_def = HDFSDataSource.generate_definition(hdfs_default_fs)
    mysql_handler = MySQLHandler(host, port, username, password)
    mysql_tables = mysql_handler.list_tables(mysql_database, exclude_regex, table_regex)
    number_width = len(str(len(mysql_tables)))
    index = 1
    if os.path.exists(hive_ddl_file):
        hive_ddl_file = '{0}/{1}-{2}.sql'.format(hive_ddl_path, mysql_database, str(int(time.time())))
    write_file('CREATE DATABASE IF NOT EXISTS %s;\n' % hive_database, hive_ddl_file)
    for mysql_table_name, mysql_table_comment in mysql_tables.items():
        if tables and mysql_table_name not in tables:
            continue
        if excluded_tables and mysql_table_name in excluded_tables:
            continue
        mysql_column_list = mysql_handler.list_columns(mysql_database, mysql_table_name)
        mysql_column_names = [c.COLUMN_NAME for c in mysql_column_list]
        column_types = convert_mysql_column_types(mysql_column_list)
        mysql_reader_def = MySQLReader.generate_definition(
            mysql_database,
            mysql_table_name, mysql_table_comment,
            mysql_column_names, column_types,
            hive_database, partitioned, inc_col
        )
        hive_table_name = f'{hive_table_prefix}_{mysql_table_name}'
        hive_ddl_def = MySQLReader.generate_hive_ddl(
            hive_database, hive_table_name, mysql_table_comment,
            partitioned,
            mysql_column_list, column_types
        )
        hdfs_writer_def = HDFSWriter.generate_definition(
            hdfs_ds_name, hdfs_path,
            hive_database, hive_table_name, partitioned,
            mysql_column_names, column_types
        )
        generator_config_file = '{0}/mysql-hdfs-{1}-{2}.ini'.format(config_ini_path, mysql_database, mysql_table_name)
        write_file(f'{mysql_reader_def}\n{hdfs_writer_def}\n', generator_config_file)
        append_file(f'{hive_ddl_def}\n', hive_ddl_file)
        pretty_print(
            f'{NORM_YEL}{str(index).rjust(number_width, " ")}{NORM_MGT} Read MySQL table '
            f'{NORM_GRN}{mysql_database}.{mysql_table_name}'
            f'{NORM_MGT}\n    Write HDFS: DataX job generator config written to file '
            f'{NORM_GRN}{generator_config_file}'
            f'{NORM_MGT}\n    Corresponding Hive CREATE TABLE DDL written to file '
            f'{NORM_GRN}{hive_ddl_file}'
        )
        index += 1
        # break
    mysql_ds_def_file = '{0}/mysql-{1}.ini'.format(mysql_datasource_path, mysql_database)
    if os.path.exists(mysql_ds_def_file):
        mysql_ds_def_file = '{0}/mysql-{1}-{2}.ini'.format(mysql_datasource_path, mysql_database, str(int(time.time())))
    write_file(f'{mysql_ds_def}\n', mysql_ds_def_file)
    pretty_print(f'{NORM_MGT}MySQL database {NORM_GRN}{mysql_database}{NORM_MGT} data source definition written to file {NORM_GRN}{mysql_ds_def_file}')
    hdfs_ds_def_file = '{0}/{1}.ini'.format(hdfs_datasource_path, hdfs_ds_name)
    if os.path.exists(hdfs_ds_def_file):
        hdfs_ds_def_file = '{0}/{1}-{2}.ini'.format(hdfs_datasource_path, hdfs_ds_name, str(int(time.time())))
    write_file(f'{hdfs_ds_def}\n', hdfs_ds_def_file)
    pretty_print(f'{NORM_MGT}HDFS data source definition written to file {NORM_GRN}{hdfs_ds_def_file}')

def mysql_hbase_generator():
    host = CONFIG.get('h')
    port = int(CONFIG.get('P', '3306'))
    username = CONFIG.get('u')
    password = CONFIG.get('p')
    mysql_database = CONFIG.get('D')
    if not (host and username and password and mysql_database):
        usage(1)
    included_tables = CONFIG.get('t', [])
    if isinstance(included_tables, str):
        included_tables = [included_tables]
    excluded_tables = CONFIG.get('e', [])
    if isinstance(excluded_tables, str):
        excluded_tables = [excluded_tables]
    table_regex = CONFIG.get('tr', [])
    if isinstance(table_regex, str):
        table_regex = [table_regex]
    exclude_regex = CONFIG.get('er', [])
    if isinstance(exclude_regex, str):
        exclude_regex = [exclude_regex]
    if len(included_tables) == 0 and len(excluded_tables) == 0 and len(table_regex) == 0 and len(exclude_regex) == 0:
        pretty_print(f'{NORM_YEL}Note: '
                     f'{NORM_MGT}none of the parameters {NORM_GRN}-t -e -tr -er '
                     f'{NORM_MGT}were provided; all tables in database '
                     f'{NORM_GRN}{mysql_database}{DO_RESET} '
                     f'{NORM_MGT}will be scanned')
    hbase_namespace = CONFIG.get('n')
    if not hbase_namespace:
        pretty_print(f'{NORM_YEL}Note: '
                     f'{NORM_MGT}parameter {NORM_GRN}-n{DO_RESET} '
                     f'{NORM_MGT}not provided; the HBase default namespace '
                     f'{NORM_GRN}default{NORM_MGT} will be used')
        hbase_namespace = 'default'
    project = CONFIG.get('project')
    layer = CONFIG.get('layer', 'ods')
    edition = CONFIG.get('edition')
    env = CONFIG.get('env')
    partitioned = CONFIG.get('partitioned', False)
    inc_col = CONFIG.get('inc-col', 'update_time')
    hbase_ds_name = 'hbase-default'
    hdfs_default_fs = 'hdfs://cluster'
    output = CONFIG.get('output', f'{base_dir}/ignored')
    hive_database = get_hive_database_name(project, layer, env)
    hive_table_prefix = get_hive_table_prefix(project, layer, edition)
    mysql_datasource_path = '{0}/datasource/mysql/{1}'.format(output, hive_database)
    config_ini_path = '{0}/config/mysql-hbase/{1}'.format(output, hive_database)
    hive_ddl_path = '{0}/ddl'.format(output)
    hive_ddl_file = '{0}/{1}.sql'.format(hive_ddl_path, mysql_database)
    os.system(f'mkdir -p {mysql_datasource_path}')
    os.system(f'mkdir -p {config_ini_path}')
    os.system(f'mkdir -p {hive_ddl_path}')
    mysql_ds_def = MySQLDataSource.generate_definition(host, port, username, password, mysql_database)
    mysql_handler = MySQLHandler(host, port, username, password)
    mysql_tables = mysql_handler.list_tables(mysql_database, exclude_regex, table_regex)
    number_width = len(str(len(mysql_tables)))
    index = 1
    if os.path.exists(hive_ddl_file):
        hive_ddl_file = '{0}/{1}-{2}.sql'.format(hive_ddl_path, mysql_database, str(int(time.time())))
    write_file('CREATE DATABASE IF NOT EXISTS %s;\n' % hive_database, hive_ddl_file)
    for mysql_table_name, mysql_table_comment in mysql_tables.items():
        if included_tables and mysql_table_name not in included_tables:
            continue
        if excluded_tables and mysql_table_name in excluded_tables:
            continue
        mysql_column_list = mysql_handler.list_columns(mysql_database, mysql_table_name)
        mysql_column_names = [c.COLUMN_NAME for c in mysql_column_list]
        column_types = convert_mysql_column_types(mysql_column_list)
        mysql_reader_def = MySQLReader.generate_definition(
            mysql_database,
            mysql_table_name, mysql_table_comment,
            mysql_column_names, column_types,
            hive_database, partitioned, inc_col
        )
        hive_table_name = f'{hive_table_prefix}_{mysql_table_name.lower()}_hbase_mapping'
        hbase_table_name = mysql_table_name.lower()
        hive_over_hbase_ddl_def = MySQLReader.generate_hive_over_hbase_ddl(
            hive_database, hive_table_name, mysql_table_comment, hbase_namespace, hbase_table_name,
            mysql_column_list, column_types
        )
        row_key_columns = []
        row_key_columns.append('reverse(primary key; reverse is recommended if it is an auto-increment ID)')
        row_key_columns.append('separator(@@)')
        row_key_columns.append(f'separator({mysql_database})')
        row_key_columns.append('separator(.)')
        row_key_columns.append(f'separator({mysql_table_name})')
        hbase_writer_def = HBaseWriter.generate_definition(
            hbase_ds_name, hbase_namespace, hbase_table_name,
            mysql_table_name, mysql_table_comment, 'cf',
            mysql_column_names, column_types, row_key_columns
        )
        generator_config_file = '{0}/mysql-hbase-{1}-{2}.ini'.format(config_ini_path, mysql_database, mysql_table_name)
        write_file(f'{mysql_reader_def}\n{hbase_writer_def}\n', generator_config_file)
        append_file(f'{hive_over_hbase_ddl_def}\n', hive_ddl_file)
        pretty_print(
            f'{NORM_YEL}{str(index).rjust(number_width, " ")}. {NORM_MGT} Read MySQL table '
            f'{NORM_GRN}{mysql_database}.{mysql_table_name}'
            f'{NORM_MGT}\n    Write HBase: DataX job generator config written to file '
            f'{NORM_GRN}{generator_config_file}'
            f'{NORM_MGT}\n    Corresponding Hive over HBase CREATE TABLE DDL written to file '
            f'{NORM_GRN}{hive_ddl_file}'
        )
        index += 1
        # break
    mysql_ds_def_file = '{0}/mysql-{1}.ini'.format(mysql_datasource_path, mysql_database)
    if os.path.exists(mysql_ds_def_file):
        mysql_ds_def_file = '{0}/mysql-{1}-{2}.ini'.format(mysql_datasource_path, mysql_database, str(int(time.time())))
    write_file(f'{mysql_ds_def}\n', mysql_ds_def_file)
    pretty_print(f'{NORM_MGT}MySQL database {NORM_GRN}{mysql_database}{NORM_MGT} data source definition written to file {NORM_GRN}{mysql_ds_def_file}')

if __name__ == '__main__':
    pretty_print(f'{NORM_MGT}{sys.argv[0]} received arguments: {NORM_GRN}{" ".join(sys.argv[1:])}')
    CONFIG, _ = parse_args(sys.argv[1:])
    # Parse from/to before any usage() call, since usage() reads these globals.
    from_system = CONFIG.get('from')
    if from_system:
        from_system = from_system.lower()
    to_system = CONFIG.get('to')
    if to_system:
        to_system = to_system.lower()
    # No arguments given, or help requested
    if exist(CONFIG, ['H', 'help']):
        usage(0)
    if not from_system or not to_system:
        usage(1)
    if from_system == "hdfs":
        if to_system == "elasticsearch":
            hdfs_elasticsearch_generator()
        elif to_system == "hbase":
            hdfs_hbase_generator()
        elif to_system == "kafka":
            hdfs_kafka_generator()
        elif to_system == "mongo":
            hdfs_mongo_generator()
        elif to_system == "mysql":
            hdfs_mysql_generator()
        else:
            raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')
    elif from_system == "mysql":
        if to_system == "hbase":
            mysql_hbase_generator()
        elif to_system == "hdfs":
            mysql_hdfs_generator()
        else:
            raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')
    else:
        raise Exception(f'Not implemented yet with from {from_system} and to {to_system}')