# -*- coding:utf-8 -*-
import re
from configparser import ConfigParser
from typing import List, Optional

from dw_base.datax.datax_constants import *
from dw_base.datax.plugins.writer.writer import Writer

# mysql writer
MYSQL_WRITER_NAME = 'mysqlwriter'
MYSQL_WRITER_PARAMETER_BATCH_SIZE = 'batchSize'
MYSQL_WRITER_PARAMETER_CONNECTION = 'connection'
MYSQL_WRITER_PARAMETER_COLUMN = 'column'
MYSQL_WRITER_PARAMETER_DATABASE = 'database'
MYSQL_WRITER_PARAMETER_POST_SQL = 'postSql'
MYSQL_WRITER_PARAMETER_PRE_SQL = 'preSql'
MYSQL_WRITER_PARAMETER_TABLE = 'table'
MYSQL_WRITER_PARAMETER_WRITE_MODE = 'writeMode'

# Group 1: everything between "jdbc:mysql://" and the first following '/'
# (host[:port]); group 2: everything after that '/' (database name plus any
# query string).
_MYSQL_JDBC_URL_PATTERN = re.compile(r'jdbc:mysql://(.+?)/(.+)')


def _with_database(jdbc_url: str, database: str) -> str:
    """Return *jdbc_url* rewritten so that its path names *database*.

    If the URL already has a path after the host (``jdbc:mysql://host/xxx``),
    that whole trailing part -- including any query string -- is replaced by
    *database*. Otherwise ``/<database>`` is appended (without doubling a
    trailing slash).
    """
    matcher = _MYSQL_JDBC_URL_PATTERN.search(jdbc_url)
    if matcher:
        # Splice by position rather than str.replace(): replace() would also
        # rewrite any other occurrence of the old database text, e.g. a host
        # name that happens to equal the database name.
        return jdbc_url[:matcher.start(2)] + database + jdbc_url[matcher.end(2):]
    if jdbc_url.endswith('/'):
        return f'{jdbc_url}{database}'
    return f'{jdbc_url}/{database}'


def _split_list(value: str, sep: str) -> List[str]:
    """Split *value* on *sep*, trimming whitespace and dropping blank items.

    An empty *value* yields ``[]`` (plain ``str.split`` would yield ``['']``).
    """
    return [item.strip() for item in value.split(sep) if item.strip()]


def _get_optional(config_parser: ConfigParser, section: str, option: str, default: str = '') -> str:
    """Read *option* from *section*, returning *default* if it is missing or empty."""
    return config_parser.get(section, option, fallback='') or default


class MySQLWriter(Writer):
    """Builds the DataX ``mysqlwriter`` parameter block from an INI-style config.

    Options are read from the config section named by ``self.plugin_type`` and
    written into ``self.parameter``; both attributes, as well as
    ``check_config`` and ``start_date``, come from the ``Writer`` base class.
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser,
                 start_time: Optional[str] = None, stop_time: Optional[str] = None):
        super(MySQLWriter, self).__init__(base_dir, config_parser, start_time, stop_time)
        self.plugin_name = MYSQL_WRITER_NAME

    def load_others(self):
        """Populate the connection, preSql/postSql, writeMode and batchSize parameters.

        Reads ``database`` and ``table`` (required), plus the optional
        ``preSql``, ``postSql`` (``';'``-separated), ``writeMode`` (default
        ``'insert'``) and ``batchSize`` (default ``'1024'``). The JDBC URL
        already stored in ``self.parameter[DS_MYSQL_JDBC_URL]`` is pointed at
        ``database``, suffixed with ``?useSSL=false``, moved into the DataX
        ``connection`` list, and removed from the top level of
        ``self.parameter``. Any ``${dt}`` in ``preSql`` is replaced with
        ``self.start_date``.

        Raises:
            configparser.NoSectionError / configparser.NoOptionError: if
                ``database`` or ``table`` is not configured.
            KeyError: if ``self.parameter`` holds no ``DS_MYSQL_JDBC_URL`` entry.
        """
        section = self.plugin_type
        database = self.config_parser.get(section, MYSQL_WRITER_PARAMETER_DATABASE)
        self.check_config(MYSQL_WRITER_PARAMETER_DATABASE, database)
        table = self.config_parser.get(section, MYSQL_WRITER_PARAMETER_TABLE)
        self.check_config(MYSQL_WRITER_PARAMETER_TABLE, table)

        jdbc_url = _with_database(self.parameter[DS_MYSQL_JDBC_URL], database)

        pre_sql = _get_optional(self.config_parser, section, MYSQL_WRITER_PARAMETER_PRE_SQL)
        if '${dt}' in pre_sql:
            # Guarded so self.start_date is only used when the placeholder exists.
            pre_sql = pre_sql.replace('${dt}', self.start_date)
        post_sql = _get_optional(self.config_parser, section, MYSQL_WRITER_PARAMETER_POST_SQL)
        write_mode = _get_optional(self.config_parser, section, MYSQL_WRITER_PARAMETER_WRITE_MODE, 'insert')
        batch_size = _get_optional(self.config_parser, section, MYSQL_WRITER_PARAMETER_BATCH_SIZE, '1024')

        # All values are computed above, so self.parameter is only mutated
        # once nothing else can fail.
        self.parameter[MYSQL_WRITER_PARAMETER_CONNECTION] = [{
            DS_MYSQL_JDBC_URL: f'{jdbc_url}?useSSL=false',
            MYSQL_WRITER_PARAMETER_TABLE: _split_list(table, ','),
        }]
        del self.parameter[DS_MYSQL_JDBC_URL]
        # NOTE: statements are split naively on ';', so a ';' inside a SQL
        # string literal would break a statement apart.
        self.parameter[MYSQL_WRITER_PARAMETER_POST_SQL] = _split_list(post_sql, ';')
        self.parameter[MYSQL_WRITER_PARAMETER_PRE_SQL] = _split_list(pre_sql, ';')
        self.parameter[MYSQL_WRITER_PARAMETER_WRITE_MODE] = write_mode
        self.parameter[MYSQL_WRITER_PARAMETER_BATCH_SIZE] = batch_size

    def load_column(self):
        """Set the DataX ``column`` list from the comma-separated ``column`` option.

        Raises:
            configparser.NoSectionError / configparser.NoOptionError: if
                ``column`` is not configured.
        """
        columns = self.config_parser.get(self.plugin_type, MYSQL_WRITER_PARAMETER_COLUMN)
        self.parameter[MYSQL_WRITER_PARAMETER_COLUMN] = _split_list(columns, ',')