clickhouse_reader.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. # -*- coding:utf-8 -*-
  2. import re
  3. from configparser import ConfigParser
  4. from dw_base.datax.datax_constants import *
  5. from dw_base.datax.plugins.reader.reader import Reader
  6. # ClickHouse reader
  7. CLICK_HOUSE_READER_NAME = 'clickhousereader'
  8. CLICK_HOUSE_READER_PARAMETER_CONNECTION = 'connection'
  9. CLICK_HOUSE_READER_PARAMETER_DATABASE = 'database'
  10. CLICK_HOUSE_READER_PARAMETER_FETCH_SIZE = 'fetchSize'
  11. CLICK_HOUSE_READER_PARAMETER_QUERY_SQL = 'querySql'
  12. CLICK_HOUSE_READER_PARAMETER_TABLE = 'table'
  13. CLICK_HOUSE_READER_PARAMETER_COLUMN = 'column'
  14. CLICK_HOUSE_READER_PARAMETER_WHERE = 'where'
  15. CLICK_HOUSE_READER_PARAMETER_SPLIT_PK = 'splitPk'
  16. class ClickHouseReader(Reader):
  17. def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
  18. super(ClickHouseReader, self).__init__(base_dir, config_parser, start_date, stop_date)
  19. self.plugin_name = CLICK_HOUSE_READER_NAME
  20. def load_others(self):
  21. start_date = self.start_date
  22. stop_date = self.stop_date
  23. database = self.config_parser.get(self.plugin_type, CLICK_HOUSE_READER_PARAMETER_DATABASE)
  24. self.check_config(CLICK_HOUSE_READER_PARAMETER_DATABASE, database)
  25. table = self.config_parser.get(self.plugin_type, CLICK_HOUSE_READER_PARAMETER_TABLE)
  26. self.check_config(CLICK_HOUSE_READER_PARAMETER_TABLE, table)
  27. fetch_size = self.config_parser.get(self.plugin_type, CLICK_HOUSE_READER_PARAMETER_FETCH_SIZE) or '1000'
  28. self.parameter[CLICK_HOUSE_READER_PARAMETER_FETCH_SIZE] = fetch_size
  29. split_pk = self.config_parser.get(self.plugin_type, CLICK_HOUSE_READER_PARAMETER_SPLIT_PK)
  30. self.parameter[CLICK_HOUSE_READER_PARAMETER_SPLIT_PK] = split_pk
  31. where = self.config_parser.get(self.plugin_type, CLICK_HOUSE_READER_PARAMETER_WHERE)
  32. where = where.replace('${start_date}', start_date)
  33. where = where.replace('${start-date}', start_date)
  34. where = where.replace('${stop_date}', stop_date)
  35. where = where.replace('${stop-date}', stop_date)
  36. self.parameter[CLICK_HOUSE_READER_PARAMETER_WHERE] = where
  37. jdbc_url: str = self.parameter[DS_CLICK_HOUSE_JDBC_URL]
  38. matcher = re.search('jdbc:postgresql://(.+?)/(.+)', jdbc_url)
  39. if matcher:
  40. if database:
  41. jdbc_url = jdbc_url.replace(matcher.group(2), database)
  42. elif jdbc_url.endswith('/'):
  43. jdbc_url = f'{jdbc_url}{database}'
  44. else:
  45. jdbc_url = f'{jdbc_url}/{database}'
  46. query_sql = self.config_parser.get(self.plugin_type, CLICK_HOUSE_READER_PARAMETER_QUERY_SQL)
  47. query_sql = query_sql.replace('${start_date}', start_date)
  48. query_sql = query_sql.replace('${start-date}', start_date)
  49. query_sql = query_sql.replace('${stop_date}', stop_date)
  50. query_sql = query_sql.replace('${stop-date}', stop_date)
  51. if query_sql:
  52. connection = {
  53. DS_CLICK_HOUSE_JDBC_URL: jdbc_url.split(','),
  54. CLICK_HOUSE_READER_PARAMETER_QUERY_SQL: query_sql
  55. }
  56. else:
  57. connection = {
  58. DS_CLICK_HOUSE_JDBC_URL: jdbc_url.split(','),
  59. CLICK_HOUSE_READER_PARAMETER_TABLE: table.split(',')
  60. }
  61. self.parameter[CLICK_HOUSE_READER_PARAMETER_CONNECTION] = [connection]
  62. del self.parameter[DS_CLICK_HOUSE_JDBC_URL]
  63. def load_column(self):
  64. columns = self.config_parser.get(self.plugin_type, CLICK_HOUSE_READER_PARAMETER_COLUMN).split(',')
  65. self.parameter[CLICK_HOUSE_READER_PARAMETER_COLUMN] = columns