# kafka_writer.py
  1. # -*- coding:utf-8 -*-
  2. import re
  3. from configparser import ConfigParser
  4. from typing import List, Dict
  5. from dw_base.datax.plugins.writer.writer import Writer
# kafka writer
# Plugin identifier assigned to self.plugin_name in KafkaWriter.__init__.
KAFKA_WRITER_NAME = 'kafka-writer'
# Config option keys read from the writer section of the job config file.
# NOTE(review): 'brokers' is not referenced in this file — presumably consumed
# by the base Writer or elsewhere; confirm before removing.
KAFKA_WRITER_PARAMETER_BROKERS = 'brokers'
KAFKA_WRITER_PARAMETER_EXTRA_CONFIG = 'extraConfig'
KAFKA_WRITER_PARAMETER_TOPIC = 'topic'
KAFKA_WRITER_PARAMETER_KEY = 'key'
KAFKA_WRITER_HIVE_ES_COLUMN_MAPPING = 'columnMapping'
KAFKA_WRITER_SOURCE_NAME = 'sourceName'
  14. class KafkaWriter(Writer):
  15. def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
  16. super().__init__(base_dir, config_parser, start_date, stop_date)
  17. self.plugin_name = KAFKA_WRITER_NAME
  18. def load_others(self):
  19. self.parameter[KAFKA_WRITER_PARAMETER_TOPIC] = self.config_parser.get(self.plugin_type,
  20. KAFKA_WRITER_PARAMETER_TOPIC)
  21. self.parameter[KAFKA_WRITER_PARAMETER_KEY] = self.config_parser.get(self.plugin_type,
  22. KAFKA_WRITER_PARAMETER_KEY)
  23. extra_config_raw = self.config_parser.get(self.plugin_type, KAFKA_WRITER_PARAMETER_EXTRA_CONFIG)
  24. extra_config = {}
  25. for kv in extra_config_raw.split(','):
  26. splits = kv.split(':')
  27. if len(splits) != 2:
  28. continue
  29. extra_config[splits[0]] = splits[1]
  30. self.parameter[KAFKA_WRITER_PARAMETER_EXTRA_CONFIG] = extra_config
  31. if self.config_parser.has_option(self.plugin_type, KAFKA_WRITER_HIVE_ES_COLUMN_MAPPING):
  32. self.parameter[KAFKA_WRITER_HIVE_ES_COLUMN_MAPPING] = self.config_parser.get(self.plugin_type,
  33. KAFKA_WRITER_HIVE_ES_COLUMN_MAPPING)
  34. else:
  35. self.parameter[KAFKA_WRITER_HIVE_ES_COLUMN_MAPPING] = ''
  36. if self.config_parser.has_option(self.plugin_type, KAFKA_WRITER_SOURCE_NAME):
  37. self.parameter[KAFKA_WRITER_SOURCE_NAME] = self.config_parser.get(self.plugin_type,
  38. KAFKA_WRITER_SOURCE_NAME)
  39. else:
  40. self.parameter[KAFKA_WRITER_SOURCE_NAME] = ''
  41. @staticmethod
  42. def generate_definition(kafka_ds_name: str,
  43. kafka_topic: str,
  44. kafka_key: str,
  45. source_name: str,
  46. column_names: List[str],
  47. column_types: Dict[str, str],
  48. column_mapping: str) -> str:
  49. column_type = []
  50. column_name_mapping = {'pid': 'pid:ID', 'id': 'pid:ID', 'esId': 'es_id:esId'}
  51. column_type_mapping = ['string', 'double', 'int', 'long', 'bigint', 'boolean']
  52. for col_name in column_names:
  53. if column_name_mapping.keys().__contains__(col_name):
  54. column_type.append(column_name_mapping.get(col_name))
  55. continue
  56. if column_types.keys().__contains__(col_name):
  57. col_type = column_types.get(col_name)
  58. if col_type.lower() in column_type_mapping:
  59. column_type.append(f'{col_name}:{col_type.upper()}')
  60. else:
  61. if re.search(r'array<struct|array<string|struct<', col_type.lower()):
  62. column_type.append(f'{col_name}:{col_type.replace(",", "#").replace(":", "@")}')
  63. else:
  64. column_type.append(f'{col_name}:JSON')
  65. definition = [
  66. '[writer]',
  67. 'dataSource = %s' % kafka_ds_name,
  68. f'topic = {kafka_topic}',
  69. f'key = {kafka_key}',
  70. f'sourceName = {source_name}',
  71. f'column = {",".join(column_names)}',
  72. f'columnType = {",".join(column_type)}',
  73. f'columnMapping = {column_mapping}',
  74. f'extraConfig = auto.commit.interval.ms:5000',
  75. ]
  76. return '\n'.join(definition)