| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- # -*- coding:utf-8 -*-
- import re
- from configparser import ConfigParser
- from dw_base.datax.datax_constants import *
- from dw_base.datax.plugins.reader.reader import Reader
- # PostgreSQL reader
- from dw_base.datax.plugins.writer.postgresql_writer import POSTGRE_SQL_WRITER_PARAMETER_COLUMN, \
- POSTGRE_SQL_WRITER_PARAMETER_CONNECTION, POSTGRE_SQL_WRITER_PARAMETER_DATABASE, POSTGRE_SQL_WRITER_PARAMETER_TABLE
# Name of the DataX reader plugin produced by PostgreSQLReader.
POSTGRE_SQL_READER_NAME: str = 'postgresqlreader'

# Option / parameter key names used by the PostgreSQL reader.
# Connection-level keys:
POSTGRE_SQL_READER_PARAMETER_CONNECTION: str = 'connection'
POSTGRE_SQL_READER_PARAMETER_DATABASE: str = 'database'
# Fetch tuning:
POSTGRE_SQL_READER_PARAMETER_FETCH_SIZE: str = 'fetchSize'
# What to read (an explicit query, or a table + columns + filter):
POSTGRE_SQL_READER_PARAMETER_QUERY_SQL: str = 'querySql'
POSTGRE_SQL_READER_PARAMETER_TABLE: str = 'table'
POSTGRE_SQL_READER_PARAMETER_COLUMN: str = 'column'
POSTGRE_SQL_READER_PARAMETER_WHERE: str = 'where'
# Column used to split the read into parallel tasks:
POSTGRE_SQL_READER_PARAMETER_SPLIT_PK: str = 'splitPk'
# Splits a PostgreSQL JDBC URL into (scheme + authority) [/database-path] (?query-string).
_PG_JDBC_URL_PATTERN = re.compile(r'(jdbc:postgresql://[^/?]*)(?:/[^?]*)?(\?.*)?')

# Placeholder spellings replaced by the run's start / stop dates.
_START_DATE_PLACEHOLDERS = ('${start_date}', '${start-date}')
_STOP_DATE_PLACEHOLDERS = ('${stop_date}', '${stop-date}')


def _substitute_dates(text: str, start_date: str, stop_date: str) -> str:
    """Return ``text`` with the start/stop date placeholders replaced.

    Both the underscore (``${start_date}``) and hyphen (``${start-date}``) spellings are
    replaced. A ``None`` date is only an error when its placeholder actually occurs in
    ``text`` (``str.replace`` cannot take ``None`` as the replacement).

    :raises ValueError: if ``text`` uses a placeholder whose date is ``None``.
    """
    for value, placeholders in ((start_date, _START_DATE_PLACEHOLDERS),
                                (stop_date, _STOP_DATE_PLACEHOLDERS)):
        for placeholder in placeholders:
            if placeholder not in text:
                continue
            if value is None:
                raise ValueError(f'{placeholder} is used but no date was provided for it')
            text = text.replace(placeholder, value)
    return text


def _split_non_empty(text: str, separator: str) -> list:
    """Split ``text`` on ``separator``, strip every piece and drop the empty ones."""
    return [piece.strip() for piece in text.split(separator) if piece.strip()]


def _point_jdbc_url_at_database(jdbc_url: str, database: str) -> str:
    """Return ``jdbc_url`` rewritten so that its database path segment is ``database``.

    Only the path segment is replaced; the host/port and any ``?query`` string are kept.
    An empty ``database`` leaves the URL unchanged.
    """
    url = jdbc_url.strip()
    if not database:
        return url
    match = _PG_JDBC_URL_PATTERN.fullmatch(url)
    if match:
        return f'{match.group(1)}/{database}{match.group(2) or ""}'
    # Unrecognised shape: append the database as the last path segment.
    if url.endswith('/'):
        return f'{url}{database}'
    return f'{url}/{database}'


class PostgreSQLReader(Reader):
    """DataX ``postgresqlreader`` plugin configured from a ConfigParser section.

    The section named by ``self.plugin_type`` supplies ``database``, ``table``,
    ``column``, ``where``, ``splitPk``, ``fetchSize`` and ``querySql``. When
    ``querySql`` is empty and a ``[mask]`` section exists, ``querySql`` is generated
    by ``dw_base.datax.mask.build_query_sql``.
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
        super().__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = POSTGRE_SQL_READER_NAME

    def load_others(self):
        """Fill ``self.parameter`` with the reader settings other than ``column``.

        Reads ``database``, ``table``, ``fetchSize`` (default ``'1000'``), ``splitPk``,
        ``where`` and ``querySql`` from this plugin's config section. It substitutes the
        date placeholders and points every JDBC URL at ``database``. It then replaces the
        flat ``DS_POSTGRE_SQL_JDBC_URL`` entry with a one-element ``connection`` list.

        :raises ValueError: if ``where``/``querySql`` uses a date placeholder whose date
            is ``None``.
        """
        section = self.plugin_type

        database = self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_DATABASE)
        self.check_config(POSTGRE_SQL_READER_PARAMETER_DATABASE, database)
        table = self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_TABLE)
        self.check_config(POSTGRE_SQL_READER_PARAMETER_TABLE, table)

        fetch_size = self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_FETCH_SIZE) or '1000'
        self.parameter[POSTGRE_SQL_READER_PARAMETER_FETCH_SIZE] = fetch_size
        split_pk = self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_SPLIT_PK)
        self.parameter[POSTGRE_SQL_READER_PARAMETER_SPLIT_PK] = split_pk

        where = _substitute_dates(self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_WHERE),
                                  self.start_date, self.stop_date)
        self.parameter[POSTGRE_SQL_READER_PARAMETER_WHERE] = where

        # A comma separates independent JDBC URLs; each one is rewritten separately so a
        # greedy match cannot swallow the URLs that follow it.
        jdbc_urls = [_point_jdbc_url_at_database(url, database)
                     for url in _split_non_empty(self.parameter[DS_POSTGRE_SQL_JDBC_URL], ',')]

        query_sql = self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_QUERY_SQL)
        # Priority: hand-written querySql > generated from the [mask] section > plain table.
        if not query_sql and self.config_parser.has_section('mask'):
            from dw_base.datax.mask import build_query_sql
            columns = _split_non_empty(self.config_parser.get(section, POSTGRE_SQL_READER_PARAMETER_COLUMN), ',')
            mask_config = dict(self.config_parser.items('mask'))
            query_sql = build_query_sql('postgresql', columns, mask_config, table, where)
        # NOTE(review): splitting on ';' also splits semicolons inside SQL string literals.
        queries = _split_non_empty(_substitute_dates(query_sql or '', self.start_date, self.stop_date), ';')

        connection = {DS_POSTGRE_SQL_JDBC_URL: jdbc_urls}
        if queries:
            connection[POSTGRE_SQL_READER_PARAMETER_QUERY_SQL] = queries
        else:
            connection[POSTGRE_SQL_READER_PARAMETER_TABLE] = _split_non_empty(table, ',')
        self.parameter[POSTGRE_SQL_READER_PARAMETER_CONNECTION] = [connection]
        # The URL now lives inside ``connection``; drop the flat entry.
        del self.parameter[DS_POSTGRE_SQL_JDBC_URL]

    def load_column(self):
        """Set ``column`` from the comma-separated ``column`` option (trimmed, empties dropped)."""
        raw_columns = self.config_parser.get(self.plugin_type, POSTGRE_SQL_READER_PARAMETER_COLUMN)
        self.parameter[POSTGRE_SQL_READER_PARAMETER_COLUMN] = _split_non_empty(raw_columns, ',')
|