# postgresql_writer.py
  1. # -*- coding:utf-8 -*-
  2. import re
  3. from configparser import ConfigParser
  4. from dw_base.datax.datax_constants import *
  5. from dw_base.datax.plugins.writer.writer import Writer
  6. # postgresql writer
  7. POSTGRE_SQL_WRITER_NAME = 'postgresqlwriter'
  8. POSTGRE_SQL_WRITER_PARAMETER_BATCH_SIZE = 'batchSize'
  9. POSTGRE_SQL_WRITER_PARAMETER_CONNECTION = 'connection'
  10. POSTGRE_SQL_WRITER_PARAMETER_COLUMN = 'column'
  11. POSTGRE_SQL_WRITER_PARAMETER_DATABASE = 'database'
  12. POSTGRE_SQL_WRITER_PARAMETER_POST_SQL = 'postSql'
  13. POSTGRE_SQL_WRITER_PARAMETER_PRE_SQL = 'preSql'
  14. POSTGRE_SQL_WRITER_PARAMETER_TABLE = 'table'
  15. POSTGRE_SQL_WRITER_PARAMETER_WRITE_MODE = 'writeMode'
  16. class PostgreSQLWriter(Writer):
  17. def __init__(self, base_dir: str, config_parser: ConfigParser, start_time: str = None, stop_time: str = None):
  18. super(PostgreSQLWriter, self).__init__(base_dir, config_parser, start_time, stop_time)
  19. self.plugin_name = POSTGRE_SQL_WRITER_NAME
  20. def load_others(self):
  21. database = self.config_parser.get(self.plugin_type, POSTGRE_SQL_WRITER_PARAMETER_DATABASE)
  22. self.check_config(POSTGRE_SQL_WRITER_PARAMETER_DATABASE, database)
  23. table = self.config_parser.get(self.plugin_type, POSTGRE_SQL_WRITER_PARAMETER_TABLE)
  24. self.check_config(POSTGRE_SQL_WRITER_PARAMETER_TABLE, table)
  25. jdbc_url: str = self.parameter[DS_POSTGRE_SQL_JDBC_URL]
  26. matcher = re.search('jdbc:postgresql://(.+?)/(.+)', jdbc_url)
  27. if matcher:
  28. if database:
  29. jdbc_url = jdbc_url.replace(matcher.group(2), database)
  30. elif jdbc_url.endswith('/'):
  31. jdbc_url = f'{jdbc_url}{database}'
  32. else:
  33. jdbc_url = f'{jdbc_url}/{database}'
  34. connection = {
  35. DS_POSTGRE_SQL_JDBC_URL: jdbc_url,
  36. POSTGRE_SQL_WRITER_PARAMETER_TABLE: table.split(',')
  37. }
  38. self.parameter[POSTGRE_SQL_WRITER_PARAMETER_CONNECTION] = [connection]
  39. del self.parameter[DS_POSTGRE_SQL_JDBC_URL]
  40. self.parameter[POSTGRE_SQL_WRITER_PARAMETER_POST_SQL] = \
  41. self.config_parser.get(self.plugin_type, POSTGRE_SQL_WRITER_PARAMETER_POST_SQL).split(';') or []
  42. self.parameter[POSTGRE_SQL_WRITER_PARAMETER_PRE_SQL] = \
  43. self.config_parser.get(self.plugin_type, POSTGRE_SQL_WRITER_PARAMETER_PRE_SQL).split(';') or []
  44. self.parameter[POSTGRE_SQL_WRITER_PARAMETER_WRITE_MODE] = \
  45. self.config_parser.get(self.plugin_type, POSTGRE_SQL_WRITER_PARAMETER_WRITE_MODE) or 'insert'
  46. self.parameter[POSTGRE_SQL_WRITER_PARAMETER_BATCH_SIZE] = \
  47. self.config_parser.get(self.plugin_type, POSTGRE_SQL_WRITER_PARAMETER_BATCH_SIZE) or '1024'
  48. def load_column(self):
  49. columns = self.config_parser.get(self.plugin_type, POSTGRE_SQL_WRITER_PARAMETER_COLUMN).split(',')
  50. self.parameter[POSTGRE_SQL_WRITER_PARAMETER_COLUMN] = columns