mysql_reader.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. # -*- coding:utf-8 -*-
  2. import re
  3. from configparser import ConfigParser
  4. from typing import Dict, List
  5. from dw_base.datax.datax_constants import *
  6. from dw_base.datax.plugins.reader.reader import Reader
  7. from dw_base.utils.datetime_utils import local_2_utc, parse_datetime
  8. # mysql reader
  9. MYSQL_READER_NAME = 'mysqlreader'
  10. MYSQL_READER_PARAMETER_COLUMN = 'column'
  11. MYSQL_READER_PARAMETER_CONNECTION = 'connection'
  12. MYSQL_READER_PARAMETER_DATABASE = 'database'
  13. MYSQL_READER_PARAMETER_QUERY_SQL = 'querySql'
  14. MYSQL_READER_PARAMETER_TABLE = 'table'
  15. MYSQL_READER_PARAMETER_UTC = 'utc'
  16. MYSQL_READER_PARAMETER_WHERE = 'where'
  17. MYSQL_KEYWORDS = ['default', 'desc', 'key', 'start', 'views', 'commit', 'time', 'add', 'admin', 'after', 'all', 'alter',
  18. 'analyze', 'and', 'archive', 'array', 'as', 'asc', 'window', 'with', 'year', 'when', 'where', 'while',
  19. 'authorization', 'before', 'between', 'bigint', 'binary', 'boolean', 'both', 'bucket', 'buckets',
  20. 'by', 'cascade', 'case', 'cast', 'change', 'char', 'cluster', 'clustered', 'clusterstatus',
  21. 'collection', 'column', 'columns', 'comment', 'compact', 'compactions', 'compute', 'concatenate',
  22. 'conf', 'continue', 'create', 'cross', 'cube', 'current', 'current_date', 'current_timestamp',
  23. 'cursor', 'data', 'database', 'databases', 'date', 'datetime', 'day', 'dbproperties', 'decimal',
  24. 'deferred', 'defined', 'delete', 'delimited', 'dependency', 'desc', 'describe', 'directories',
  25. 'directory', 'disable', 'distinct', 'distribute', 'double', 'drop', 'elem_type', 'else', 'enable',
  26. 'end', 'escaped', 'exchange', 'exclusive', 'exists', 'explain', 'export', 'extended', 'external',
  27. 'false', 'fetch', 'fields', 'file', 'fileformat', 'first', 'float', 'following', 'for', 'format',
  28. 'formatted', 'from', 'full', 'function', 'functions', 'grant', 'group', 'grouping', 'having',
  29. 'hold_ddltime', 'hour', 'idxproperties', 'if', 'ignore', 'import', 'in', 'index', 'indexes', 'inner',
  30. 'inpath', 'inputdriver', 'inputformat', 'insert', 'int', 'intersect', 'interval', 'into', 'is',
  31. 'items', 'jar', 'join', 'keys', 'key_type', 'lateral', 'left', 'less', 'like', 'limit', 'lines',
  32. 'load', 'local', 'location', 'lock', 'locks', 'logical', 'long', 'macro', 'map', 'mapjoin',
  33. 'materialized', 'minus', 'minute', 'month', 'more', 'msck', 'none', 'noscan', 'not', 'no_drop',
  34. 'null', 'of', 'offline', 'on', 'option', 'or', 'order', 'out', 'outer', 'outputdriver',
  35. 'outputformat', 'over', 'overwrite', 'owner', 'partialscan', 'partition', 'partitioned', 'partitions',
  36. 'percent', 'plus', 'preceding', 'preserve', 'pretty', 'principals', 'procedure', 'protection',
  37. 'purge', 'range', 'read', 'readonly', 'reads', 'rebuild', 'recordreader', 'recordwriter', 'reduce',
  38. 'regexp', 'reload', 'rename', 'repair', 'replace', 'restrict', 'revoke', 'rewrite', 'right', 'rlike',
  39. 'role', 'roles', 'rollup', 'row', 'rows', 'schema', 'schemas', 'second', 'select', 'semi', 'serde',
  40. 'serdeproperties', 'server', 'set', 'sets', 'shared', 'show', 'show_database', 'skewed', 'smallint',
  41. 'sort', 'sorted', 'ssl', 'statistics', 'stored', 'streamtable', 'string', 'struct', 'table', 'tables',
  42. 'tablesample', 'tblproperties', 'temporary', 'terminated', 'then', 'timestamp', 'tinyint', 'to',
  43. 'touch', 'transactions', 'transform', 'trigger', 'true', 'truncate', 'unarchive', 'unbounded', 'undo',
  44. 'union', 'uniontype', 'uniquejoin', 'unlock', 'unset', 'unsigned', 'update', 'uri', 'use', 'user',
  45. 'using', 'utc', 'utctimestamp', 'values', 'value_type', 'varchar', 'view', 'sql', 'start', 'views',
  46. 'time', 'admin', 'alter', 'analyze', 'archive', 'array', 'asc', 'window', 'with', 'year', 'when',
  47. 'where',
  48. 'while', 'authorization', 'bucket', 'buckets', 'by', 'cast', 'cluster', 'clustered',
  49. 'clusterstatus', 'collection', 'compactions', 'compute', 'concatenate', 'conf',
  50. 'current_timestamp', 'cursor', 'dbproperties', 'decimal', 'deferred', 'defined', 'delimited',
  51. 'dependency', 'directories', 'directory', 'distribute', 'elem_type', 'enable', 'end', 'exclusive',
  52. 'external', 'false', 'fileformat', 'following', 'format', 'formatted', 'functions', 'grouping',
  53. 'having', 'hold_ddltime', 'idxproperties', 'inner', 'inpath', 'inputdriver', 'inputformat',
  54. 'intersect',
  55. 'is', 'items', 'jar', 'key_type', 'lateral', 'lines', 'load', 'location', 'logical', 'macro', 'map',
  56. 'mapjoin', 'materialized', 'minus', 'more', 'msck', 'noscan', 'no_drop', 'null', 'of', 'offline',
  57. 'outputdriver', 'outputformat', 'over', 'overwrite', 'partialscan', 'partitioned',
  58. 'partitions', 'percent', 'plus', 'preceding', 'pretty', 'principals', 'protection', 'purge',
  59. 'readonly',
  60. 'recordreader', 'recordwriter', 'reduce', 'regexp', 'restrict', 'rewrite', 'rlike', 'role', 'roles',
  61. 'semi', 'serde', 'serdeproperties', 'server', 'set', 'sets', 'shared', 'show', 'show_database',
  62. 'skewed', 'smallint', 'sort', 'sorted', 'ssl', 'statistics', 'stored', 'streamtable', 'string',
  63. 'struct', 'table', 'tables', 'tablesample', 'tblproperties', 'temporary', 'terminated', 'then',
  64. 'timestamp', 'tinyint', 'to', 'touch', 'transactions', 'transform', 'trigger', 'true', 'truncate',
  65. 'unarchive', 'unbounded', 'undo', 'union', 'uniontype', 'uniquejoin', 'unlock', 'unset', 'unsigned',
  66. 'update', 'uri', 'use', 'user', 'using', 'utc', 'utctimestamp', 'values', 'value_type', 'varchar',
  67. 'view', 'release', 'leave', 'condition', 'type', 'types', 'linear', 'repeat', 'check']
  68. class MySQLReader(Reader):
  69. def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
  70. super(MySQLReader, self).__init__(base_dir, config_parser, start_date, stop_date)
  71. self.plugin_name = MYSQL_READER_NAME
  72. def load_others(self):
  73. start_date = self.start_date
  74. stop_date = self.stop_date
  75. jdbc_url: str = self.parameter[DS_MYSQL_JDBC_URL]
  76. table_name = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_TABLE)
  77. connection_element = {}
  78. if jdbc_url.__contains__(','):
  79. # 多个jdbc url,不考虑有无db_name
  80. connection_element[DS_MYSQL_JDBC_URL] = jdbc_url.split(',')
  81. else:
  82. db_name = None
  83. if self.config_parser.has_option(self.plugin_type, MYSQL_READER_PARAMETER_DATABASE):
  84. db_name = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_DATABASE)
  85. # 单个jdbc url
  86. matcher = re.search('jdbc:mysql://(.+?)/(.+)', jdbc_url)
  87. if matcher:
  88. # 带数据库名的jdbc url(例如jdbc:mysql://host/db_a)
  89. default_db = matcher.group(2)
  90. connection_element[DS_MYSQL_JDBC_URL] = [jdbc_url]
  91. # 假设db_b, db_c, db_d
  92. if db_name:
  93. # 替换成多个新数据库
  94. connection_element[DS_MYSQL_JDBC_URL] = [
  95. jdbc_url.replace(default_db, db) for db in db_name.split(',')
  96. ]
  97. else:
  98. self.check_config(MYSQL_READER_PARAMETER_DATABASE, db_name)
  99. # 不带数据库的jdbc url,需传入db_name
  100. if not jdbc_url.endswith('/'):
  101. jdbc_url = f'{jdbc_url}/'
  102. connection_element[DS_MYSQL_JDBC_URL] = [f'{jdbc_url}{db}' for db in db_name.split(',')]
  103. if self.config_parser.has_option(self.plugin_type, MYSQL_READER_PARAMETER_QUERY_SQL):
  104. query_sql = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_QUERY_SQL)
  105. if query_sql:
  106. # 有查询语句
  107. connection_element[MYSQL_READER_PARAMETER_QUERY_SQL] = query_sql.split(';')
  108. else:
  109. # 没有查询语句需指定table
  110. self.check_config(MYSQL_READER_PARAMETER_TABLE, table_name)
  111. # 不管有没有提供table_name,都可以写到json里,有优先级做控制
  112. connection_element[MYSQL_READER_PARAMETER_TABLE] = table_name.split(',')
  113. connection = [connection_element]
  114. self.parameter[MYSQL_READER_PARAMETER_CONNECTION] = connection
  115. # 删除parameter中的jdbcUrl
  116. del self.parameter[DS_MYSQL_JDBC_URL]
  117. where = ''
  118. utc = 0
  119. if self.config_parser.has_option(self.plugin_type, MYSQL_READER_PARAMETER_WHERE):
  120. where = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_WHERE)
  121. if self.config_parser.has_option(self.plugin_type, MYSQL_READER_PARAMETER_UTC):
  122. utc = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_UTC)
  123. if where:
  124. if utc:
  125. start_dt_str = local_2_utc(start_date).strftime('%Y-%m-%d %H:%M:%S')
  126. stop_dt_str = local_2_utc(stop_date).strftime('%Y-%m-%d %H:%M:%S')
  127. specific_start_dt_str = local_2_utc(start_date).strftime('%Y-%m-%d')
  128. specific_stop_dt_str = local_2_utc(stop_date).strftime('%Y-%m-%d')
  129. else:
  130. start_dt_str = parse_datetime(start_date).strftime('%Y-%m-%d %H:%M:%S')
  131. stop_dt_str = parse_datetime(stop_date).strftime('%Y-%m-%d %H:%M:%S')
  132. specific_start_dt_str = parse_datetime(start_date).strftime('%Y-%m-%d')
  133. specific_stop_dt_str = parse_datetime(stop_date).strftime('%Y-%m-%d')
  134. where = where.replace('${start_date}', start_dt_str)
  135. where = where.replace('${stop_date}', stop_dt_str)
  136. where = where.replace('${start_specific_date}', specific_start_dt_str)
  137. where = where.replace('${stop_specific_date}', specific_stop_dt_str)
  138. # 此处为了统一(utc和北京时间的区别),使用self.start_date判断,而不是start_date
  139. if self.start_date.startswith(ALL_DATA_DATE):
  140. self.parameter[MYSQL_READER_PARAMETER_WHERE] = ''
  141. else:
  142. self.parameter[MYSQL_READER_PARAMETER_WHERE] = where
  143. def load_column(self):
  144. columns = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_COLUMN).split(',')
  145. self.parameter[MYSQL_READER_PARAMETER_COLUMN] = columns
  146. @staticmethod
  147. def generate_definition(database: str, table_name: str, table_comment: str,
  148. column_names: List[str], column_types: Dict[str, str],
  149. datasource_group: str, incremental: str, inc_col: str) -> str:
  150. column = []
  151. for col_name in column_names:
  152. if MYSQL_KEYWORDS.__contains__(col_name):
  153. column.append(f'`{col_name}`')
  154. else:
  155. column.append(col_name)
  156. column_type = []
  157. for col_name in column_names:
  158. if column_types.__contains__(col_name):
  159. column_type.append(f'{col_name}:{column_types[col_name]}')
  160. definition = [
  161. f'# {table_name}: {table_comment}',
  162. '[reader]',
  163. f'dataSource = {datasource_group}/mysql-{database}',
  164. f'table = {table_name}',
  165. f'column = {",".join(column)}',
  166. f'columnType = {",".join(column_type)}',
  167. ";全量抽取需传递19700101给参数`start_date`,或直接注释下面的条件;增量抽取请取消注释(有update的表不适用)"
  168. ";以具体日期传入当参数时,需要将开始日期参数声明为`start_specific_date`,取短日期范围时,需要同时将结束日期声明为`stop_specific_date`"
  169. ]
  170. if incremental is not None and incremental:
  171. definition.append(f"where = {inc_col} >= '%s' and {inc_col} < '%s'" % ('${start_date}', '${stop_date}'))
  172. else:
  173. definition.append(f";where = {inc_col} >= '%s' and {inc_col} < '%s'" % ('${start_date}', '${stop_date}'))
  174. definition.append(';crm大部分表的时间都是+00(格林威治时间),而实际传入的时间是+08(北京时间),所有通过此字段统一')
  175. definition.append(';utc=1 表示原始表是格林威治时间,utc=0或为空 表示原始表为北京时间')
  176. definition.append('utc =')
  177. return '\n'.join(definition)