# -*- coding:utf-8 -*-
import re
from configparser import ConfigParser
from typing import Dict, List

from dw_base.common.template_constants import MYSQL_HIVE_CREATE_TABLE_TEMPLATE, MYSQL_HIVE_HBASE_CREATE_TABLE_TEMPLATE
from dw_base.database.mysql_utils import MySQLColumn
from dw_base.datax.datax_constants import *
from dw_base.datax.plugins.reader.reader import Reader
from dw_base.utils.datetime_utils import local_2_utc, parse_datetime
from dw_base.utils.file_utils import read_file_content

# mysql reader
MYSQL_READER_NAME = 'mysqlreader'
MYSQL_READER_PARAMETER_COLUMN = 'column'
MYSQL_READER_PARAMETER_CONNECTION = 'connection'
MYSQL_READER_PARAMETER_DATABASE = 'database'
MYSQL_READER_PARAMETER_QUERY_SQL = 'querySql'
MYSQL_READER_PARAMETER_TABLE = 'table'
MYSQL_READER_PARAMETER_UTC = 'utc'
MYSQL_READER_PARAMETER_WHERE = 'where'

# Reserved words (MySQL/Hive); column names matching any of these are wrapped in backticks
# in the generated configs and DDL.
MYSQL_KEYWORDS = ['default', 'desc', 'key', 'start', 'views', 'commit', 'time', 'add', 'admin', 'after', 'all', 'alter',
                  'analyze', 'and', 'archive', 'array', 'as', 'asc', 'window', 'with', 'year', 'when', 'where', 'while',
                  'authorization', 'before', 'between', 'bigint', 'binary', 'boolean', 'both', 'bucket', 'buckets',
                  'by', 'cascade', 'case', 'cast', 'change', 'char', 'cluster', 'clustered', 'clusterstatus',
                  'collection', 'column', 'columns', 'comment', 'compact', 'compactions', 'compute', 'concatenate',
                  'conf', 'continue', 'create', 'cross', 'cube', 'current', 'current_date', 'current_timestamp',
                  'cursor', 'data', 'database', 'databases', 'date', 'datetime', 'day', 'dbproperties', 'decimal',
                  'deferred', 'defined', 'delete', 'delimited', 'dependency', 'desc', 'describe', 'directories',
                  'directory', 'disable', 'distinct', 'distribute', 'double', 'drop', 'elem_type', 'else', 'enable',
                  'end', 'escaped', 'exchange', 'exclusive', 'exists', 'explain', 'export', 'extended', 'external',
                  'false', 'fetch', 'fields', 'file', 'fileformat', 'first', 'float', 'following', 'for', 'format',
                  'formatted', 'from', 'full', 'function', 'functions', 'grant', 'group', 'grouping', 'having',
                  'hold_ddltime', 'hour', 'idxproperties', 'if', 'ignore', 'import', 'in', 'index', 'indexes', 'inner',
                  'inpath', 'inputdriver', 'inputformat', 'insert', 'int', 'intersect', 'interval', 'into', 'is',
                  'items', 'jar', 'join', 'keys', 'key_type', 'lateral', 'left', 'less', 'like', 'limit', 'lines',
                  'load', 'local', 'location', 'lock', 'locks', 'logical', 'long', 'macro', 'map', 'mapjoin',
                  'materialized', 'minus', 'minute', 'month', 'more', 'msck', 'none', 'noscan', 'not', 'no_drop',
                  'null', 'of', 'offline', 'on', 'option', 'or', 'order', 'out', 'outer', 'outputdriver',
                  'outputformat', 'over', 'overwrite', 'owner', 'partialscan', 'partition', 'partitioned', 'partitions',
                  'percent', 'plus', 'preceding', 'preserve', 'pretty', 'principals', 'procedure', 'protection',
                  'purge', 'range', 'read', 'readonly', 'reads', 'rebuild', 'recordreader', 'recordwriter', 'reduce',
                  'regexp', 'reload', 'rename', 'repair', 'replace', 'restrict', 'revoke', 'rewrite', 'right', 'rlike',
                  'role', 'roles', 'rollup', 'row', 'rows', 'schema', 'schemas', 'second', 'select', 'semi', 'serde',
                  'serdeproperties', 'server', 'set', 'sets', 'shared', 'show', 'show_database', 'skewed', 'smallint',
                  'sort', 'sorted', 'ssl', 'statistics', 'stored', 'streamtable', 'string', 'struct', 'table', 'tables',
                  'tablesample', 'tblproperties', 'temporary', 'terminated', 'then', 'timestamp', 'tinyint', 'to',
                  'touch', 'transactions', 'transform', 'trigger', 'true', 'truncate', 'unarchive', 'unbounded', 'undo',
                  'union', 'uniontype', 'uniquejoin', 'unlock', 'unset', 'unsigned', 'update', 'uri', 'use', 'user',
                  'using', 'utc', 'utctimestamp', 'values', 'value_type', 'varchar', 'view', 'sql', 'start', 'views',
                  'time', 'admin', 'alter', 'analyze', 'archive', 'array', 'asc', 'window', 'with', 'year', 'when',
                  'where', 'while', 'authorization', 'bucket', 'buckets', 'by', 'cast', 'cluster', 'clustered',
                  'clusterstatus', 'collection', 'compactions', 'compute', 'concatenate', 'conf',
                  'current_timestamp', 'cursor', 'dbproperties', 'decimal', 'deferred', 'defined', 'delimited',
                  'dependency', 'directories', 'directory', 'distribute', 'elem_type', 'enable', 'end', 'exclusive',
                  'external', 'false', 'fileformat', 'following', 'format', 'formatted', 'functions', 'grouping',
                  'having', 'hold_ddltime', 'idxproperties', 'inner', 'inpath', 'inputdriver', 'inputformat',
                  'intersect', 'is', 'items', 'jar', 'key_type', 'lateral', 'lines', 'load', 'location', 'logical',
                  'macro', 'map', 'mapjoin', 'materialized', 'minus', 'more', 'msck', 'noscan', 'no_drop', 'null',
                  'of', 'offline', 'outputdriver', 'outputformat', 'over', 'overwrite', 'partialscan', 'partitioned',
                  'partitions', 'percent', 'plus', 'preceding', 'pretty', 'principals', 'protection', 'purge',
                  'readonly', 'recordreader', 'recordwriter', 'reduce', 'regexp', 'restrict', 'rewrite', 'rlike',
                  'role', 'roles', 'semi', 'serde', 'serdeproperties', 'server', 'set', 'sets', 'shared', 'show',
                  'show_database', 'skewed', 'smallint', 'sort', 'sorted', 'ssl', 'statistics', 'stored',
                  'streamtable', 'string', 'struct', 'table', 'tables', 'tablesample', 'tblproperties', 'temporary',
                  'terminated', 'then', 'timestamp', 'tinyint', 'to', 'touch', 'transactions', 'transform', 'trigger',
                  'true', 'truncate', 'unarchive', 'unbounded', 'undo', 'union', 'uniontype', 'uniquejoin', 'unlock',
                  'unset', 'unsigned', 'update', 'uri', 'use', 'user', 'using', 'utc', 'utctimestamp', 'values',
                  'value_type', 'varchar', 'view', 'release', 'leave', 'condition', 'type', 'types', 'linear',
                  'repeat', 'check']


class MySQLReader(Reader):
    def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
        super(MySQLReader, self).__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = MYSQL_READER_NAME

    def load_others(self):
        start_date = self.start_date
        stop_date = self.stop_date
        jdbc_url: str = self.parameter[DS_MYSQL_JDBC_URL]
        table_name = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_TABLE)
        connection_element = {}
        if ',' in jdbc_url:
            # Multiple jdbc urls: use them as given, regardless of whether they carry a db name.
            connection_element[DS_MYSQL_JDBC_URL] = jdbc_url.split(',')
        else:
            db_name = None
            if self.config_parser.has_option(self.plugin_type, MYSQL_READER_PARAMETER_DATABASE):
                db_name = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_DATABASE)
            # Single jdbc url
            matcher = re.search('jdbc:mysql://(.+?)/(.+)', jdbc_url)
            if matcher:
                # jdbc url that already carries a database name (e.g. jdbc:mysql://host/db_a)
                default_db = matcher.group(2)
                connection_element[DS_MYSQL_JDBC_URL] = [jdbc_url]
                # Suppose the configured databases are db_b, db_c, db_d
                if db_name:
                    # Replace the default database with each configured database
                    connection_element[DS_MYSQL_JDBC_URL] = [
                        jdbc_url.replace(default_db, db) for db in db_name.split(',')
                    ]
            else:
                self.check_config(MYSQL_READER_PARAMETER_DATABASE, db_name)
                # jdbc url without a database name: db_name must be provided
                if not jdbc_url.endswith('/'):
                    jdbc_url = f'{jdbc_url}/'
                connection_element[DS_MYSQL_JDBC_URL] = [f'{jdbc_url}{db}' for db in db_name.split(',')]
        if self.config_parser.has_option(self.plugin_type, MYSQL_READER_PARAMETER_QUERY_SQL):
            query_sql = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_QUERY_SQL)
            if query_sql:
                # A query statement is provided
                connection_element[MYSQL_READER_PARAMETER_QUERY_SQL] = query_sql.split(';')
            else:
                # Without a query statement, a table must be specified
                self.check_config(MYSQL_READER_PARAMETER_TABLE, table_name)
        # table_name can always be written into the json regardless of whether querySql is set;
        # the reader plugin resolves the priority between the two.
        connection_element[MYSQL_READER_PARAMETER_TABLE] = table_name.split(',')
        connection = [connection_element]
        self.parameter[MYSQL_READER_PARAMETER_CONNECTION] = connection
        # Drop the top-level jdbcUrl from parameter; it now lives inside connection.
        del self.parameter[DS_MYSQL_JDBC_URL]
        where = ''
        utc = 0
        if self.config_parser.has_option(self.plugin_type, MYSQL_READER_PARAMETER_WHERE):
            where = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_WHERE)
        if self.config_parser.has_option(self.plugin_type, MYSQL_READER_PARAMETER_UTC):
            # The utc option may be left empty in the config template; treat empty or 0 as Beijing time.
            utc_value = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_UTC)
            utc = int(utc_value) if utc_value else 0
        if where:
            if utc:
                start_dt_str = local_2_utc(start_date).strftime('%Y-%m-%d %H:%M:%S')
                stop_dt_str = local_2_utc(stop_date).strftime('%Y-%m-%d %H:%M:%S')
                specific_start_dt_str = local_2_utc(start_date).strftime('%Y-%m-%d')
                specific_stop_dt_str = local_2_utc(stop_date).strftime('%Y-%m-%d')
            else:
                start_dt_str = parse_datetime(start_date).strftime('%Y-%m-%d %H:%M:%S')
                stop_dt_str = parse_datetime(stop_date).strftime('%Y-%m-%d %H:%M:%S')
                specific_start_dt_str = parse_datetime(start_date).strftime('%Y-%m-%d')
                specific_stop_dt_str = parse_datetime(stop_date).strftime('%Y-%m-%d')
            where = where.replace('${start_date}', start_dt_str)
            where = where.replace('${stop_date}', stop_dt_str)
            where = where.replace('${start_specific_date}', specific_start_dt_str)
            where = where.replace('${stop_specific_date}', specific_stop_dt_str)
        # Check against self.start_date rather than the converted start_date so the behaviour is
        # the same for utc and Beijing-time tables.
        if self.start_date.startswith(ALL_DATA_DATE):
            self.parameter[MYSQL_READER_PARAMETER_WHERE] = ''
        else:
            self.parameter[MYSQL_READER_PARAMETER_WHERE] = where
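
    # A minimal, hypothetical example of the [reader] section that load_others() and
    # load_column() consume (the option names follow the constants above; a real section
    # is produced by generate_definition() below, so values here are made up):
    #
    #   [reader]
    #   dataSource = some_group/mysql-db_a
    #   table = orders
    #   column = id,order,created_at
    #   database = db_b,db_c
    #   where = created_at >= '${start_date}' and created_at < '${stop_date}'
    #   utc = 1
    #
    # ${start_date}/${stop_date} (and the *_specific_date variants) are replaced with the run
    # window before the where clause is written into the DataX job json.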

    def load_column(self):
        columns = self.config_parser.get(self.plugin_type, MYSQL_READER_PARAMETER_COLUMN).split(',')
        self.parameter[MYSQL_READER_PARAMETER_COLUMN] = columns

    @staticmethod
    def generate_definition(database: str, table_name: str, table_comment: str,
                            column_names: List[str], column_types: Dict[str, str],
                            datasource_group: str, incremental: str, inc_col: str) -> str:
        column = []
        for col_name in column_names:
            # Backtick-quote column names that collide with reserved words.
            if col_name in MYSQL_KEYWORDS:
                column.append(f'`{col_name}`')
            else:
                column.append(col_name)
        column_type = []
        for col_name in column_names:
            if col_name in column_types:
                column_type.append(f'{col_name}:{column_types[col_name]}')
        definition = [
            f'# {table_name}: {table_comment}',
            '[reader]',
            f'dataSource = {datasource_group}/mysql-{database}',
            f'table = {table_name}',
            f'column = {",".join(column)}',
            f'columnType = {",".join(column_type)}',
            ';For a full extraction pass 19700101 as `start_date`, or simply comment out the condition below; for incremental extraction uncomment it (not suitable for tables with updates)',
            ';When passing plain dates as parameters, declare the start date as `start_specific_date`; for a short date range, also declare the stop date as `stop_specific_date`',
        ]
        if incremental:
            definition.append(f"where = {inc_col} >= '%s' and {inc_col} < '%s'" % ('${start_date}', '${stop_date}'))
        else:
            definition.append(f";where = {inc_col} >= '%s' and {inc_col} < '%s'" % ('${start_date}', '${stop_date}'))
        definition.append(';Most crm tables store time as +00 (GMT) while the time passed in is +08 (Beijing time), so this field is used to reconcile them')
        definition.append(';utc=1 means the source table uses GMT; utc=0 or empty means the source table uses Beijing time')
        definition.append('utc =')
        return '\n'.join(definition)

    @staticmethod
    def generate_hive_ddl(hive_database_name: str,
                          hive_table_name: str,
                          table_comment: str,
                          partitioned: bool,
                          columns: List[MySQLColumn],
                          column_types: Dict[str, str]) -> str:
        columns_definition = []
        partition_def = ''
        for column in columns:
            raw_name = column.COLUMN_NAME
            column_comment = column.COLUMN_COMMENT
            # Look up the type override by the raw column name, then backtick-quote reserved words.
            column_type = str(column_types[raw_name]).upper() if raw_name in column_types else 'STRING'
            column_name = f'`{raw_name}`' if raw_name in MYSQL_KEYWORDS else raw_name
            columns_definition.append(f"{column_name} {column_type} COMMENT '{column_comment}'")
        if partitioned:
            partition_def = '\nPARTITIONED BY (dt STRING)'
        ddl = read_file_content(MYSQL_HIVE_CREATE_TABLE_TEMPLATE).format(
            hive_database_name, hive_table_name, hive_database_name, hive_table_name,
            ',\n'.join(columns_definition), table_comment, partition_def
        )
        return ddl

    @staticmethod
    def generate_hive_over_hbase_ddl(hive_database_name: str,
                                     hive_table_name: str,
                                     table_comment: str,
                                     hbase_namespace: str,
                                     hbase_table_name: str,
                                     columns: List[MySQLColumn],
                                     column_types: Dict[str, str]) -> str:
        columns_definition = []
        hbase_column_mapping_definition = []
        partition_def = ''
        for column in columns:
            raw_name = column.COLUMN_NAME
            column_comment = column.COLUMN_COMMENT
            # Look up the type override by the raw column name, then backtick-quote reserved words
            # in the Hive column definition; the HBase column mapping keeps the raw name.
            column_type = str(column_types[raw_name]).upper() if raw_name in column_types else 'STRING'
            column_name = f'`{raw_name}`' if raw_name in MYSQL_KEYWORDS else raw_name
            columns_definition.append(f"{column_name} {column_type} COMMENT '{column_comment}'")
            hbase_column_mapping_definition.append(f'cf:{raw_name}')
        ddl_template = read_file_content(MYSQL_HIVE_HBASE_CREATE_TABLE_TEMPLATE)
        ddl = ddl_template.format(
            hive_database_name, hive_table_name, hive_database_name, hive_table_name,
            ',\n'.join(columns_definition), table_comment, partition_def,
            ',\n'.join(hbase_column_mapping_definition), hbase_namespace, hbase_table_name
        )
        return ddl
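

# Illustrative sketch only: a hypothetical call to generate_definition() showing the config
# template it produces. All names below (demo_db, demo_orders, demo_group, created_at, ...)
# are made up for the example and are not part of the original module.
if __name__ == '__main__':
    sample_definition = MySQLReader.generate_definition(
        database='demo_db',
        table_name='demo_orders',
        table_comment='demo order table',
        column_names=['id', 'order', 'created_at'],  # 'order' is a reserved word and gets backticked
        column_types={'id': 'bigint', 'created_at': 'timestamp'},
        datasource_group='demo_group',
        incremental='1',
        inc_col='created_at',
    )
    print(sample_definition)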