# -*- coding:utf-8 -*-
import re
from configparser import ConfigParser
from typing import Dict, List, Optional

from dw_base.datax.datax_constants import *
from dw_base.datax.plugins.reader.reader import Reader
from dw_base.utils.datetime_utils import local_2_utc, parse_datetime

# mysql reader
MYSQL_READER_NAME = 'mysqlreader'
MYSQL_READER_PARAMETER_COLUMN = 'column'
MYSQL_READER_PARAMETER_CONNECTION = 'connection'
MYSQL_READER_PARAMETER_DATABASE = 'database'
MYSQL_READER_PARAMETER_QUERY_SQL = 'querySql'
MYSQL_READER_PARAMETER_TABLE = 'table'
MYSQL_READER_PARAMETER_UTC = 'utc'
MYSQL_READER_PARAMETER_WHERE = 'where'

# Identifiers that must be backquoted when emitted as column names.
# Kept as a list for backward compatibility; it contains duplicates, which are
# harmless because lookups go through _MYSQL_KEYWORD_SET below.
MYSQL_KEYWORDS = [
    'default', 'desc', 'key', 'start', 'views', 'commit', 'time', 'add', 'admin', 'after',
    'all', 'alter', 'analyze', 'and', 'archive', 'array', 'as', 'asc', 'window', 'with',
    'year', 'when', 'where', 'while', 'authorization', 'before', 'between', 'bigint', 'binary', 'boolean',
    'both', 'bucket', 'buckets', 'by', 'cascade', 'case', 'cast', 'change', 'char', 'cluster',
    'clustered', 'clusterstatus', 'collection', 'column', 'columns', 'comment', 'compact', 'compactions',
    'compute', 'concatenate',
    'conf', 'continue', 'create', 'cross', 'cube', 'current', 'current_date', 'current_timestamp', 'cursor',
    'data',
    'database', 'databases', 'date', 'datetime', 'day', 'dbproperties', 'decimal', 'deferred', 'defined',
    'delete',
    'delimited', 'dependency', 'desc', 'describe', 'directories', 'directory', 'disable', 'distinct',
    'distribute', 'double',
    'drop', 'elem_type', 'else', 'enable', 'end', 'escaped', 'exchange', 'exclusive', 'exists', 'explain',
    'export', 'extended', 'external', 'false', 'fetch', 'fields', 'file', 'fileformat', 'first', 'float',
    'following', 'for', 'format', 'formatted', 'from', 'full', 'function', 'functions', 'grant', 'group',
    'grouping', 'having', 'hold_ddltime', 'hour', 'idxproperties', 'if', 'ignore', 'import', 'in', 'index',
    'indexes', 'inner', 'inpath', 'inputdriver', 'inputformat', 'insert', 'int', 'intersect', 'interval',
    'into',
    'is',
    'items', 'jar', 'join', 'keys', 'key_type', 'lateral', 'left', 'less', 'like', 'limit',
    'lines', 'load', 'local', 'location', 'lock', 'locks', 'logical', 'long', 'macro', 'map',
    'mapjoin', 'materialized', 'minus', 'minute', 'month', 'more', 'msck', 'none', 'noscan', 'not',
    'no_drop', 'null', 'of', 'offline', 'on', 'option', 'or', 'order', 'out', 'outer',
    'outputdriver', 'outputformat', 'over', 'overwrite', 'owner', 'partialscan', 'partition', 'partitioned',
    'partitions', 'percent',
    'plus', 'preceding', 'preserve', 'pretty', 'principals', 'procedure', 'protection', 'purge', 'range',
    'read',
    'readonly', 'reads', 'rebuild', 'recordreader', 'recordwriter', 'reduce', 'regexp', 'reload', 'rename',
    'repair',
    'replace', 'restrict', 'revoke', 'rewrite', 'right', 'rlike', 'role', 'roles', 'rollup', 'row',
    'rows', 'schema', 'schemas', 'second', 'select', 'semi', 'serde', 'serdeproperties', 'server', 'set',
    'sets', 'shared', 'show', 'show_database', 'skewed', 'smallint', 'sort', 'sorted', 'ssl', 'statistics',
    'stored', 'streamtable', 'string', 'struct', 'table', 'tables', 'tablesample', 'tblproperties', 'temporary',
    'terminated',
    'then', 'timestamp', 'tinyint', 'to', 'touch', 'transactions', 'transform', 'trigger', 'true', 'truncate',
    'unarchive', 'unbounded', 'undo', 'union', 'uniontype', 'uniquejoin', 'unlock', 'unset', 'unsigned',
    'update',
    'uri', 'use', 'user', 'using', 'utc', 'utctimestamp', 'values', 'value_type', 'varchar', 'view',
    'sql', 'start', 'views', 'time', 'admin', 'alter', 'analyze', 'archive', 'array', 'asc',
    'window', 'with', 'year', 'when', 'where', 'while', 'authorization', 'bucket', 'buckets', 'by',
    'cast', 'cluster', 'clustered', 'clusterstatus', 'collection', 'compactions', 'compute', 'concatenate',
    'conf', 'current_timestamp',
    'cursor', 'dbproperties', 'decimal', 'deferred', 'defined', 'delimited', 'dependency', 'directories',
    'directory', 'distribute',
    'elem_type', 'enable', 'end', 'exclusive', 'external', 'false', 'fileformat', 'following', 'format',
    'formatted', 'functions', 'grouping', 'having', 'hold_ddltime', 'idxproperties', 'inner', 'inpath',
    'inputdriver', 'inputformat',
    'intersect', 'is', 'items', 'jar', 'key_type', 'lateral', 'lines', 'load', 'location', 'logical',
    'macro', 'map', 'mapjoin', 'materialized', 'minus', 'more', 'msck', 'noscan', 'no_drop', 'null',
    'of', 'offline', 'outputdriver', 'outputformat', 'over', 'overwrite', 'partialscan', 'partitioned',
    'partitions', 'percent',
    'plus', 'preceding', 'pretty', 'principals', 'protection', 'purge', 'readonly', 'recordreader',
    'recordwriter', 'reduce',
    'regexp', 'restrict', 'rewrite', 'rlike', 'role', 'roles', 'semi', 'serde', 'serdeproperties', 'server',
    'set', 'sets', 'shared', 'show', 'show_database', 'skewed', 'smallint', 'sort', 'sorted', 'ssl',
    'statistics', 'stored', 'streamtable', 'string', 'struct', 'table', 'tables', 'tablesample',
    'tblproperties', 'temporary',
    'terminated', 'then', 'timestamp', 'tinyint', 'to', 'touch', 'transactions', 'transform', 'trigger',
    'true',
    'truncate', 'unarchive', 'unbounded', 'undo', 'union', 'uniontype', 'uniquejoin', 'unlock', 'unset',
    'unsigned',
    'update', 'uri', 'use', 'user', 'using', 'utc', 'utctimestamp', 'values', 'value_type', 'varchar',
    'view', 'release', 'leave', 'condition', 'type', 'types', 'linear', 'repeat', 'check']

# Lower-cased keyword set: O(1) and case-insensitive membership tests
# (MySQL keywords are case-insensitive, so `Desc` needs quoting as much as `desc`).
_MYSQL_KEYWORD_SET = frozenset(keyword.lower() for keyword in MYSQL_KEYWORDS)

# Splits a single-host MySQL JDBC URL that has a path after the host into
# base (scheme + host[:port]), database path, and optional `?query` suffix.
# e.g. jdbc:mysql://host:3306/db_a?useSSL=false
#      -> base='jdbc:mysql://host:3306', db='db_a', query='?useSSL=false'
_JDBC_URL_PATTERN = re.compile(r'^(?P<base>jdbc:mysql://[^/?]+)/(?P<db>[^?]*)(?P<query>\?.*)?$')

# `utc` option values (after strip + lower-case) that mean "source table is in local time".
_FALSE_FLAG_VALUES = ('', '0', 'false', 'no', 'off')

_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
_DATE_FORMAT = '%Y-%m-%d'


def _split_list(value: str, sep: str = ',') -> List[str]:
    """Split *value* on *sep*, strip every item and drop empty items."""
    return [item.strip() for item in value.split(sep) if item.strip()]


def _quote_identifier(name):
    """Backquote *name* when it is (case-insensitively) in MYSQL_KEYWORDS; else return it unchanged."""
    if isinstance(name, str) and name.lower() in _MYSQL_KEYWORD_SET:
        return f'`{name}`'
    return name


def _is_enabled_flag(value: Optional[str]) -> bool:
    """Interpret a config flag string; None/empty/'0'/'false'/'no'/'off' are False."""
    if value is None:
        return False
    return value.strip().lower() not in _FALSE_FLAG_VALUES


class MySQLReader(Reader):
    """DataX `mysqlreader` plugin: turns a config-file section into reader parameters."""

    def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None,
                 stop_date: str = None):
        super(MySQLReader, self).__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = MYSQL_READER_NAME

    def load_others(self):
        """Fill the `connection` and `where` reader parameters from the config section.

        The JDBC URL previously stored in ``self.parameter`` under
        ``DS_MYSQL_JDBC_URL`` is moved into the `connection` element and then
        removed from the top level of ``self.parameter``.
        """
        self.parameter[MYSQL_READER_PARAMETER_CONNECTION] = [self._build_connection()]
        # The JDBC URL now lives inside `connection`; drop the top-level copy.
        del self.parameter[DS_MYSQL_JDBC_URL]
        self.parameter[MYSQL_READER_PARAMETER_WHERE] = self._build_where()

    def _build_connection(self) -> dict:
        """Return the single DataX `connection` element (jdbcUrl plus querySql and/or table)."""
        section = self.plugin_type
        connection_element = {
            DS_MYSQL_JDBC_URL: self._build_jdbc_urls(self.parameter[DS_MYSQL_JDBC_URL]),
        }
        table_name = self.config_parser.get(section, MYSQL_READER_PARAMETER_TABLE, fallback=None)
        query_sql = self.config_parser.get(section, MYSQL_READER_PARAMETER_QUERY_SQL, fallback='')
        # Empty pieces (e.g. from a trailing ';') are dropped.
        queries = _split_list(query_sql, ';') if query_sql else []
        if queries:
            connection_element[MYSQL_READER_PARAMETER_QUERY_SQL] = queries
        else:
            # Without a query statement a table must be specified.
            # NOTE(review): assumes check_config raises when the value is missing - confirm in Reader.
            self.check_config(MYSQL_READER_PARAMETER_TABLE, table_name)
        # The table is written whether or not querySql is set; per the original
        # note, DataX's priority rules decide which of the two is used.
        if table_name:
            connection_element[MYSQL_READER_PARAMETER_TABLE] = _split_list(table_name)
        return connection_element

    def _build_jdbc_urls(self, jdbc_url: str) -> List[str]:
        """Expand the configured JDBC URL into the list of URLs DataX expects.

        * Comma-separated value: several complete URLs, used as-is (the
          `database` option is ignored).
        * Single URL that already has a database path (jdbc:mysql://host/db_a):
          kept unchanged, or - when the comma-separated `database` option is set -
          rebuilt once per listed database, preserving host and `?query` suffix.
        * Single URL without a database: the `database` option is required and
          each listed database is appended after a '/'.
        """
        if ',' in jdbc_url:
            return _split_list(jdbc_url)

        db_name = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_DATABASE, fallback=None)
        parts = _JDBC_URL_PATTERN.match(jdbc_url)
        # A URL "names a database" when something follows the '/' after the host.
        names_database = bool(parts and (parts.group('db') or parts.group('query')))
        if names_database:
            if not db_name:
                return [jdbc_url]
            # Rebuild from the parsed parts instead of str.replace, which would
            # also rewrite a host containing the db name and drop `?params`.
            base = parts.group('base')
            query = parts.group('query') or ''
            return [f'{base}/{db}{query}' for db in _split_list(db_name)]

        # URL without a database: db_name must be supplied.
        self.check_config(MYSQL_READER_PARAMETER_DATABASE, db_name)
        if not jdbc_url.endswith('/'):
            jdbc_url = f'{jdbc_url}/'
        return [f'{jdbc_url}{db}' for db in _split_list(db_name)]

    def _build_where(self) -> str:
        """Return the `where` condition with date placeholders substituted.

        ``${start_date}`` / ``${stop_date}`` become ``%Y-%m-%d %H:%M:%S`` strings
        and ``${start_specific_date}`` / ``${stop_specific_date}`` become
        ``%Y-%m-%d`` strings. When the `utc` option is enabled the dates go
        through ``local_2_utc``, otherwise through ``parse_datetime``.
        A full extraction (start date beginning with ``ALL_DATA_DATE``) yields ''.
        """
        # Decide full extraction from the raw self.start_date (never a converted
        # value) so UTC and local-time configurations behave the same.
        if self.start_date is not None and self.start_date.startswith(ALL_DATA_DATE):
            return ''

        section = self.plugin_type
        where = self.config_parser.get(section, MYSQL_READER_PARAMETER_WHERE, fallback='')
        if not where:
            return where

        # The raw option is a string: '0' must mean "local time", not be truthy.
        utc = _is_enabled_flag(self.config_parser.get(section, MYSQL_READER_PARAMETER_UTC, fallback=None))
        to_datetime = local_2_utc if utc else parse_datetime
        start_dt = to_datetime(self.start_date)
        stop_dt = to_datetime(self.stop_date)

        replacements = (
            ('${start_date}', start_dt.strftime(_DATETIME_FORMAT)),
            ('${stop_date}', stop_dt.strftime(_DATETIME_FORMAT)),
            ('${start_specific_date}', start_dt.strftime(_DATE_FORMAT)),
            ('${stop_specific_date}', stop_dt.strftime(_DATE_FORMAT)),
        )
        for placeholder, value in replacements:
            where = where.replace(placeholder, value)
        return where

    def load_column(self):
        """Load the comma-separated `column` option into the reader parameters."""
        self.parameter[MYSQL_READER_PARAMETER_COLUMN] = _split_list(
            self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_COLUMN))

    @staticmethod
    def generate_definition(database: str, table_name: str, table_comment: str, column_names: List[str],
                            column_types: Dict[str, str], datasource_group: str, incremental: str,
                            inc_col: str) -> str:
        """Render a `[reader]` config-section template for one MySQL table.

        :param database: database name, used in the `dataSource` entry.
        :param table_name: table to read.
        :param table_comment: free text placed in the header comment.
        :param column_names: columns in output order; keywords are backquoted.
        :param column_types: column -> type; only columns present here get a
            `columnType` entry.
        :param datasource_group: prefix of the `dataSource` entry.
        :param incremental: truthy -> the incremental `where` line is active,
            otherwise it is emitted commented out with ';'.
        :param inc_col: column used in the incremental `where` condition.
        :return: the template text, one entry per line.
        """
        column = [_quote_identifier(col_name) for col_name in column_names]
        column_type = [f'{col_name}:{column_types[col_name]}'
                       for col_name in column_names if col_name in column_types]
        inc_expr = _quote_identifier(inc_col)
        where_line = f"where = {inc_expr} >= '${{start_date}}' and {inc_expr} < '${{stop_date}}'"

        definition = [
            f'# {table_name}: {table_comment}',
            '[reader]',
            f'dataSource = {datasource_group}/mysql-{database}',
            f'table = {table_name}',
            f'column = {",".join(column)}',
            f'columnType = {",".join(column_type)}',
            # Two separate comment lines (previously fused by a missing comma).
            ";全量抽取需传递19700101给参数`start_date`,或直接注释下面的条件;增量抽取请取消注释(有update的表不适用)",
            ";以具体日期传入当参数时,需要将开始日期参数声明为`start_specific_date`,取短日期范围时,需要同时将结束日期声明为`stop_specific_date`",
            where_line if incremental else f';{where_line}',
            ';crm大部分表的时间都是+00(格林威治时间),而实际传入的时间是+08(北京时间),所有通过此字段统一',
            ';utc=1 表示原始表是格林威治时间,utc=0或为空 表示原始表为北京时间',
            'utc =',
        ]
        return '\n'.join(definition)