mysql_writer.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. # -*- coding:utf-8 -*-
  2. import re
  3. from configparser import ConfigParser
  4. from dw_base.datax.datax_constants import *
  5. from dw_base.datax.plugins.writer.writer import Writer
  6. # mysql writer
  7. MYSQL_WRITER_NAME = 'mysqlwriter'
  8. MYSQL_WRITER_PARAMETER_BATCH_SIZE = 'batchSize'
  9. MYSQL_WRITER_PARAMETER_CONNECTION = 'connection'
  10. MYSQL_WRITER_PARAMETER_COLUMN = 'column'
  11. MYSQL_WRITER_PARAMETER_DATABASE = 'database'
  12. MYSQL_WRITER_PARAMETER_POST_SQL = 'postSql'
  13. MYSQL_WRITER_PARAMETER_PRE_SQL = 'preSql'
  14. MYSQL_WRITER_PARAMETER_TABLE = 'table'
  15. MYSQL_WRITER_PARAMETER_WRITE_MODE = 'writeMode'
  16. class MySQLWriter(Writer):
  17. def __init__(self, base_dir: str, config_parser: ConfigParser, start_time: str = None, stop_time: str = None):
  18. super(MySQLWriter, self).__init__(base_dir, config_parser, start_time, stop_time)
  19. self.plugin_name = MYSQL_WRITER_NAME
  20. def load_others(self):
  21. database = self.config_parser.get(self.plugin_type, MYSQL_WRITER_PARAMETER_DATABASE)
  22. self.check_config(MYSQL_WRITER_PARAMETER_DATABASE, database)
  23. table = self.config_parser.get(self.plugin_type, MYSQL_WRITER_PARAMETER_TABLE)
  24. self.check_config(MYSQL_WRITER_PARAMETER_TABLE, table)
  25. jdbc_url: str = self.parameter[DS_MYSQL_JDBC_URL]
  26. matcher = re.search('jdbc:mysql://(.+?)/(.+)', jdbc_url)
  27. pre_sql = self.config_parser.get(self.plugin_type, MYSQL_WRITER_PARAMETER_PRE_SQL)
  28. if matcher:
  29. jdbc_url = jdbc_url.replace(matcher.group(2), database)
  30. elif jdbc_url.endswith('/'):
  31. jdbc_url = f'{jdbc_url}{database}'
  32. else:
  33. jdbc_url = f'{jdbc_url}/{database}'
  34. if pre_sql.__contains__('${dt}'):
  35. pre_sql = pre_sql.replace('${dt}', self.start_date)
  36. connection = {
  37. DS_MYSQL_JDBC_URL: f'{jdbc_url}?useSSL=false',
  38. MYSQL_WRITER_PARAMETER_TABLE: table.split(',')
  39. }
  40. self.parameter[MYSQL_WRITER_PARAMETER_CONNECTION] = [connection]
  41. del self.parameter[DS_MYSQL_JDBC_URL]
  42. self.parameter[MYSQL_WRITER_PARAMETER_POST_SQL] = \
  43. self.config_parser.get(self.plugin_type, MYSQL_WRITER_PARAMETER_POST_SQL).split(';') or []
  44. self.parameter[MYSQL_WRITER_PARAMETER_PRE_SQL] = \
  45. pre_sql.split(';') or []
  46. self.parameter[MYSQL_WRITER_PARAMETER_WRITE_MODE] = \
  47. self.config_parser.get(self.plugin_type, MYSQL_WRITER_PARAMETER_WRITE_MODE) or 'insert'
  48. self.parameter[MYSQL_WRITER_PARAMETER_BATCH_SIZE] = \
  49. self.config_parser.get(self.plugin_type, MYSQL_WRITER_PARAMETER_BATCH_SIZE) or '1024'
  50. def load_column(self):
  51. columns = self.config_parser.get(self.plugin_type, MYSQL_WRITER_PARAMETER_COLUMN).split(',')
  52. self.parameter[MYSQL_WRITER_PARAMETER_COLUMN] = columns