# -*- coding:utf-8 -*-
"""Configuration builder for the DataX ``postgresqlreader`` plugin."""
import re
from configparser import ConfigParser
from typing import List, Optional

from dw_base.datax.datax_constants import *
from dw_base.datax.plugins.reader.reader import Reader
# Kept for backward compatibility: other modules may import these names from here.
from dw_base.datax.plugins.writer.postgresql_writer import POSTGRE_SQL_WRITER_PARAMETER_COLUMN, \
    POSTGRE_SQL_WRITER_PARAMETER_CONNECTION, POSTGRE_SQL_WRITER_PARAMETER_DATABASE, POSTGRE_SQL_WRITER_PARAMETER_TABLE

POSTGRE_SQL_READER_NAME = 'postgresqlreader'
POSTGRE_SQL_READER_PARAMETER_CONNECTION = 'connection'
POSTGRE_SQL_READER_PARAMETER_DATABASE = 'database'
POSTGRE_SQL_READER_PARAMETER_FETCH_SIZE = 'fetchSize'
POSTGRE_SQL_READER_PARAMETER_QUERY_SQL = 'querySql'
POSTGRE_SQL_READER_PARAMETER_TABLE = 'table'
POSTGRE_SQL_READER_PARAMETER_COLUMN = 'column'
POSTGRE_SQL_READER_PARAMETER_WHERE = 'where'
POSTGRE_SQL_READER_PARAMETER_SPLIT_PK = 'splitPk'

# Date placeholders recognised in ``where`` / ``querySql`` (underscore and hyphen spellings).
_START_DATE_PLACEHOLDERS = ('${start_date}', '${start-date}')
_STOP_DATE_PLACEHOLDERS = ('${stop_date}', '${stop-date}')

# Split a jdbcUrl list only on commas that begin a new JDBC URL, so a pgJDBC
# multi-host URL such as ``jdbc:postgresql://h1:5432,h2:5432/db`` stays whole.
_JDBC_URL_SEPARATOR = re.compile(r',\s*(?=jdbc:)')

# prefix (scheme + host[:port][,host[:port]...]), optional /path, optional ?query
_PG_JDBC_URL_PATTERN = re.compile(r'^(jdbc:postgresql://[^/?]+)(?:/[^?]*)?(\?.*)?$')


def _render_dates(text: str, start_date: Optional[str], stop_date: Optional[str]) -> str:
    """Substitute the start/stop date placeholders in *text*.

    A ``None`` date leaves its placeholders untouched; ``str.replace`` would
    otherwise raise ``TypeError`` even when no placeholder is present.
    """
    for placeholders, value in ((_START_DATE_PLACEHOLDERS, start_date),
                                (_STOP_DATE_PLACEHOLDERS, stop_date)):
        if value is None:
            continue
        for placeholder in placeholders:
            text = text.replace(placeholder, value)
    return text


def _split_list(value: str) -> List[str]:
    """Split a comma-separated config value, trimming items and dropping empty ones."""
    return [item.strip() for item in value.split(',') if item.strip()]


def _with_database(jdbc_url: str, database: str) -> str:
    """Return *jdbc_url* pointing at *database*, keeping host/port and query string.

    URLs that do not look like ``jdbc:postgresql://...`` get the database
    appended as a path segment, as the original implementation did.
    """
    if not database:
        return jdbc_url
    matcher = _PG_JDBC_URL_PATTERN.match(jdbc_url)
    if matcher:
        return f'{matcher.group(1)}/{database}{matcher.group(2) or ""}'
    if jdbc_url.endswith('/'):
        return f'{jdbc_url}{database}'
    return f'{jdbc_url}/{database}'


def _build_jdbc_urls(raw_jdbc_url: str, database: str) -> List[str]:
    """Split a (possibly comma-separated) jdbcUrl value and point every URL at *database*."""
    return [_with_database(url.strip(), database)
            for url in _JDBC_URL_SEPARATOR.split(raw_jdbc_url)
            if url.strip()]


class PostgreSQLReader(Reader):
    """Builds the ``parameter`` section of a DataX ``postgresqlreader`` job."""

    def __init__(self, base_dir: str, config_parser: ConfigParser,
                 start_date: Optional[str] = None, stop_date: Optional[str] = None):
        super(PostgreSQLReader, self).__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = POSTGRE_SQL_READER_NAME

    def load_others(self):
        """Fill fetchSize, splitPk, where and the ``connection`` list of ``self.parameter``.

        Options are read from the ``self.plugin_type`` section of
        ``self.config_parser``. Date placeholders in ``where`` and ``querySql``
        are substituted with ``self.start_date`` / ``self.stop_date``.

        When ``querySql`` is non-empty, the connection carries the query.
        Otherwise it carries the ``table`` list. The ``DS_POSTGRE_SQL_JDBC_URL``
        entry of ``self.parameter`` is moved into the connection, with
        ``database`` applied to every URL, and then removed.
        """
        section = self.plugin_type
        start_date = self.start_date
        stop_date = self.stop_date

        database = self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_DATABASE)
        self.check_config(POSTGRE_SQL_READER_PARAMETER_DATABASE, database)
        table = self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_TABLE)
        self.check_config(POSTGRE_SQL_READER_PARAMETER_TABLE, table)

        # Optional options: a missing key behaves like an empty value instead of
        # raising NoOptionError.
        fetch_size = self.config_parser.get(
            section, POSTGRE_SQL_READER_PARAMETER_FETCH_SIZE, fallback='') or '1000'
        self.parameter[POSTGRE_SQL_READER_PARAMETER_FETCH_SIZE] = fetch_size

        split_pk = self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_SPLIT_PK, fallback='')
        self.parameter[POSTGRE_SQL_READER_PARAMETER_SPLIT_PK] = split_pk

        where = self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_WHERE, fallback='')
        self.parameter[POSTGRE_SQL_READER_PARAMETER_WHERE] = _render_dates(where, start_date, stop_date)

        jdbc_urls = _build_jdbc_urls(self.parameter[DS_POSTGRE_SQL_JDBC_URL], database)

        query_sql = self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_QUERY_SQL, fallback='')
        query_sql = _render_dates(query_sql, start_date, stop_date)

        if query_sql:
            # NOTE(review): DataX examples show connection[].querySql as a list of
            # statements; confirm the runtime accepts a plain string here.
            connection = {
                DS_POSTGRE_SQL_JDBC_URL: jdbc_urls,
                POSTGRE_SQL_READER_PARAMETER_QUERY_SQL: query_sql,
            }
        else:
            connection = {
                DS_POSTGRE_SQL_JDBC_URL: jdbc_urls,
                POSTGRE_SQL_READER_PARAMETER_TABLE: _split_list(table),
            }
        self.parameter[POSTGRE_SQL_READER_PARAMETER_CONNECTION] = [connection]
        # The URL now lives inside ``connection``; drop the top-level copy.
        del self.parameter[DS_POSTGRE_SQL_JDBC_URL]

    def load_column(self):
        """Set the ``column`` parameter from the comma-separated ``column`` option."""
        columns = self.config_parser.get(self.plugin_type, POSTGRE_SQL_READER_PARAMETER_COLUMN)
        self.parameter[POSTGRE_SQL_READER_PARAMETER_COLUMN] = _split_list(columns)