# -*- coding:utf-8 -*-
import re
from configparser import ConfigParser
from typing import Dict, List, Optional

from dw_base.datax.plugins.writer.writer import Writer

# Kafka writer plugin name and the parameter keys it reads.
KAFKA_WRITER_NAME = 'kafka-writer'
KAFKA_WRITER_PARAMETER_BROKERS = 'brokers'
KAFKA_WRITER_PARAMETER_EXTRA_CONFIG = 'extraConfig'
KAFKA_WRITER_PARAMETER_TOPIC = 'topic'
KAFKA_WRITER_PARAMETER_KEY = 'key'
KAFKA_WRITER_HIVE_ES_COLUMN_MAPPING = 'columnMapping'
KAFKA_WRITER_SOURCE_NAME = 'sourceName'


class KafkaWriter(Writer):
    def __init__(self, base_dir: str, config_parser: ConfigParser,
                 start_date: Optional[str] = None, stop_date: Optional[str] = None):
        super().__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = KAFKA_WRITER_NAME

    def load_others(self):
        self.parameter[KAFKA_WRITER_PARAMETER_TOPIC] = self.config_parser.get(
            self.plugin_type, KAFKA_WRITER_PARAMETER_TOPIC)
        self.parameter[KAFKA_WRITER_PARAMETER_KEY] = self.config_parser.get(
            self.plugin_type, KAFKA_WRITER_PARAMETER_KEY)
        # extraConfig is a comma-separated list of 'key:value' pairs,
        # e.g. 'acks:all,linger.ms:5'; malformed entries are skipped.
        extra_config_raw = self.config_parser.get(self.plugin_type, KAFKA_WRITER_PARAMETER_EXTRA_CONFIG)
        extra_config = {}
        for kv in extra_config_raw.split(','):
            splits = kv.split(':')
            if len(splits) != 2:
                continue
            extra_config[splits[0]] = splits[1]
        self.parameter[KAFKA_WRITER_PARAMETER_EXTRA_CONFIG] = extra_config
        # columnMapping and sourceName are optional; fall back to empty strings.
        self.parameter[KAFKA_WRITER_HIVE_ES_COLUMN_MAPPING] = self.config_parser.get(
            self.plugin_type, KAFKA_WRITER_HIVE_ES_COLUMN_MAPPING, fallback='')
        self.parameter[KAFKA_WRITER_SOURCE_NAME] = self.config_parser.get(
            self.plugin_type, KAFKA_WRITER_SOURCE_NAME, fallback='')
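
    # A minimal sketch of the config section load_others expects, assuming
    # self.plugin_type resolves to 'writer' (as the output of
    # generate_definition below suggests); the values are hypothetical:
    #
    #   [writer]
    #   topic = user_events
    #   key = pid
    #   extraConfig = acks:all,linger.ms:5
    #   columnMapping = name1:alias1
    #   sourceName = hive_user_table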

    @staticmethod
    def generate_definition(kafka_ds_name: str,
                            kafka_topic: str,
                            kafka_key: str,
                            source_name: str,
                            column_names: List[str],
                            column_types: Dict[str, str],
                            column_mapping: str) -> str:
        column_type = []
        # Columns with special semantics are renamed outright; all others are
        # emitted as 'name:TYPE' according to their Hive type.
        column_name_mapping = {'pid': 'pid:ID', 'id': 'pid:ID', 'esId': 'es_id:esId'}
        primitive_types = ['string', 'double', 'int', 'long', 'bigint', 'boolean']
        for col_name in column_names:
            if col_name in column_name_mapping:
                column_type.append(column_name_mapping[col_name])
                continue
            # Columns with no known type are left out of columnType entirely.
            if col_name in column_types:
                col_type = column_types[col_name]
                if col_type.lower() in primitive_types:
                    column_type.append(f'{col_name}:{col_type.upper()}')
                elif re.search(r'array<struct|array<string|struct<', col_type.lower()):
                    # Nested Hive types: escape ',' and ':' so they do not
                    # collide with the definition file's own delimiters.
                    column_type.append(f'{col_name}:{col_type.replace(",", "#").replace(":", "@")}')
                else:
                    column_type.append(f'{col_name}:JSON')
        definition = [
            '[writer]',
            f'dataSource = {kafka_ds_name}',
            f'topic = {kafka_topic}',
            f'key = {kafka_key}',
            f'sourceName = {source_name}',
            f'column = {",".join(column_names)}',
            f'columnType = {",".join(column_type)}',
            f'columnMapping = {column_mapping}',
            'extraConfig = auto.commit.interval.ms:5000',
        ]
        return '\n'.join(definition)
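

# A minimal usage sketch of generate_definition; the data-source name, columns,
# and Hive types below are hypothetical and only illustrate the round trip from
# column metadata to a [writer] definition block.
if __name__ == '__main__':
    demo = KafkaWriter.generate_definition(
        kafka_ds_name='kafka_prod',  # hypothetical data-source name
        kafka_topic='user_events',
        kafka_key='pid',
        source_name='hive_user_table',
        column_names=['id', 'age', 'tags'],
        column_types={'age': 'int', 'tags': 'array<string>'},
        column_mapping='',
    )
    # 'id' is renamed via column_name_mapping, 'age' is a primitive type, and
    # 'tags' matches the nested-type regex ('array<string>' has no ',' or ':'
    # to escape), so columnType becomes 'pid:ID,age:INT,tags:array<string>'.
    print(demo)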