| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- # -*- coding:utf-8 -*-
- import re
- from configparser import ConfigParser
- from dw_base.datax.datax_constants import *
- from dw_base.datax.plugins.writer.writer import Writer
# ---------------------------------------------------------------------------
# DataX ``clickhousewriter`` plugin: the plugin name plus the parameter keys
# that ClickHouseWriter reads from the job configuration and emits into the
# generated DataX parameter block.
# ---------------------------------------------------------------------------
CLICK_HOUSE_WRITER_NAME = 'clickhousewriter'

# Parameter keys, kept in alphabetical order.
CLICK_HOUSE_WRITER_PARAMETER_BATCH_SIZE = 'batchSize'
CLICK_HOUSE_WRITER_PARAMETER_COLUMN = 'column'
CLICK_HOUSE_WRITER_PARAMETER_CONNECTION = 'connection'
CLICK_HOUSE_WRITER_PARAMETER_DATABASE = 'database'
CLICK_HOUSE_WRITER_PARAMETER_POST_SQL = 'postSql'
CLICK_HOUSE_WRITER_PARAMETER_PRE_SQL = 'preSql'
CLICK_HOUSE_WRITER_PARAMETER_TABLE = 'table'
CLICK_HOUSE_WRITER_PARAMETER_WRITE_MODE = 'writeMode'
class ClickHouseWriter(Writer):
    """Build the parameter block for the DataX ``clickhousewriter`` plugin.

    Options are read from the ConfigParser section named by
    ``self.plugin_type``. ``plugin_type``, ``parameter`` and ``check_config``
    are not defined here -- presumably they come from the ``Writer`` base
    class (TODO confirm).
    """

    # jdbc:<subprotocol>://<hosts>[/<database>][?<query>]
    # Used by _with_database to swap the database path segment structurally
    # instead of doing a substring replace on the whole URL.
    _JDBC_URL_RE = re.compile(
        r'(?P<base>jdbc:[^/]+?://[^/?]*)(?:/(?P<database>[^?]*))?(?P<query>\?.*)?'
    )

    def __init__(self, base_dir: str, config_parser: ConfigParser, start_time: str = None, stop_time: str = None):
        super(ClickHouseWriter, self).__init__(base_dir, config_parser, start_time, stop_time)
        self.plugin_name = CLICK_HOUSE_WRITER_NAME

    def load_others(self):
        """Fill ``self.parameter`` with connection info and writer settings.

        * ``database`` and ``table`` are required; each is passed to
          ``check_config`` (presumably raises when invalid -- see Writer).
        * The flat ``DS_CLICK_HOUSE_JDBC_URL`` entry already present in
          ``self.parameter`` is rewritten to point at ``database`` and moved
          into a single-element ``connection`` list together with the
          comma-separated table names.
        * ``postSql``/``preSql`` become lists of ';'-separated statements
          (blank statements dropped); ``writeMode`` defaults to ``'insert'``
          and ``batchSize`` to ``'1024'``. These four options may be missing
          or empty.
        """
        section = self.plugin_type
        database = self.config_parser.get(section, CLICK_HOUSE_WRITER_PARAMETER_DATABASE)
        self.check_config(CLICK_HOUSE_WRITER_PARAMETER_DATABASE, database)
        table = self.config_parser.get(section, CLICK_HOUSE_WRITER_PARAMETER_TABLE)
        self.check_config(CLICK_HOUSE_WRITER_PARAMETER_TABLE, table)

        jdbc_url: str = self.parameter[DS_CLICK_HOUSE_JDBC_URL]
        connection = {
            DS_CLICK_HOUSE_JDBC_URL: self._with_database(jdbc_url, database),
            CLICK_HOUSE_WRITER_PARAMETER_TABLE: self._split_list(table, ','),
        }
        self.parameter[CLICK_HOUSE_WRITER_PARAMETER_CONNECTION] = [connection]
        # The URL now lives only inside ``connection``; drop the flat copy.
        del self.parameter[DS_CLICK_HOUSE_JDBC_URL]

        # Optional settings. Note: ''.split(';') is [''] (truthy), so blank
        # entries must be filtered explicitly rather than via ``or []``.
        # NOTE(review): splitting on ';' is naive and would break a statement
        # containing ';' inside a string literal.
        self.parameter[CLICK_HOUSE_WRITER_PARAMETER_POST_SQL] = \
            self._split_list(self._get_optional(CLICK_HOUSE_WRITER_PARAMETER_POST_SQL), ';')
        self.parameter[CLICK_HOUSE_WRITER_PARAMETER_PRE_SQL] = \
            self._split_list(self._get_optional(CLICK_HOUSE_WRITER_PARAMETER_PRE_SQL), ';')
        self.parameter[CLICK_HOUSE_WRITER_PARAMETER_WRITE_MODE] = \
            self._get_optional(CLICK_HOUSE_WRITER_PARAMETER_WRITE_MODE) or 'insert'
        self.parameter[CLICK_HOUSE_WRITER_PARAMETER_BATCH_SIZE] = \
            self._get_optional(CLICK_HOUSE_WRITER_PARAMETER_BATCH_SIZE) or '1024'

    def load_column(self):
        """Store the comma-separated ``column`` option as a list of names.

        Whitespace around each name is trimmed and empty entries are dropped.
        """
        raw_columns = self.config_parser.get(self.plugin_type, CLICK_HOUSE_WRITER_PARAMETER_COLUMN)
        self.parameter[CLICK_HOUSE_WRITER_PARAMETER_COLUMN] = self._split_list(raw_columns, ',')

    def _get_optional(self, option: str) -> str:
        """Return *option* from this plugin's section, or '' if it is absent."""
        return self.config_parser.get(self.plugin_type, option, fallback='')

    @staticmethod
    def _split_list(raw: str, sep: str) -> list:
        """Split *raw* on *sep*, trimming each item and dropping blank ones."""
        return [item.strip() for item in (raw or '').split(sep) if item.strip()]

    @classmethod
    def _with_database(cls, jdbc_url: str, database: str) -> str:
        """Return *jdbc_url* rewritten so that its path selects *database*.

        An existing database segment is replaced as a whole path segment, so
        a host name that happens to contain the old database name is left
        intact. Any ``?query`` suffix is preserved. A URL without a database
        segment gets ``/<database>`` inserted before the query. An empty
        *database* leaves the URL unchanged.
        """
        if not database:
            return jdbc_url
        match = cls._JDBC_URL_RE.fullmatch(jdbc_url)
        if match:
            return f"{match.group('base')}/{database}{match.group('query') or ''}"
        # Not a recognisable ``jdbc:<sub>://`` URL: fall back to appending.
        separator = '' if jdbc_url.endswith('/') else '/'
        return f'{jdbc_url}{separator}{database}'
|