# -*- coding:utf-8 -*-
import re
from configparser import ConfigParser
from typing import List, Dict

from dw_base.datax.plugins.writer.writer import Writer

# kafka writer
KAFKA_WRITER_NAME = 'kafka-writer'
KAFKA_WRITER_PARAMETER_BROKERS = 'brokers'
KAFKA_WRITER_PARAMETER_EXTRA_CONFIG = 'extraConfig'
KAFKA_WRITER_PARAMETER_TOPIC = 'topic'
KAFKA_WRITER_PARAMETER_KEY = 'key'
KAFKA_WRITER_HIVE_ES_COLUMN_MAPPING = 'columnMapping'
KAFKA_WRITER_SOURCE_NAME = 'sourceName'


class KafkaWriter(Writer):
    """Kafka writer plugin.

    Reads its parameters from the config parser section named by
    ``self.plugin_type`` and stores them in ``self.parameter``.
    NOTE(review): ``plugin_type`` and ``parameter`` are presumably set up by
    the ``Writer`` base class -- confirm against its implementation.
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
        """Initialise the base writer and tag this plugin as ``kafka-writer``.

        All arguments are forwarded unchanged to ``Writer.__init__``.
        """
        super().__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = KAFKA_WRITER_NAME

    @staticmethod
    def _parse_extra_config(raw: str) -> Dict[str, str]:
        """Parse a ``key1:value1,key2:value2`` string into a dict.

        Each comma-separated entry is split on its FIRST colon only, so values
        that themselves contain colons (``host:port``, URLs) are kept instead
        of being dropped. Whitespace around keys and values is stripped.
        Entries without any colon are ignored.

        Values cannot contain commas, because the comma is the entry
        separator of this format.
        """
        extra_config = {}
        for entry in raw.split(','):
            key, separator, value = entry.partition(':')
            if not separator:
                # Not a key:value pair (e.g. empty string or stray token).
                continue
            extra_config[key.strip()] = value.strip()
        return extra_config

    def load_others(self):
        """Load the Kafka-specific options into ``self.parameter``.

        ``topic``, ``key`` and ``extraConfig`` are required: a missing option
        raises the usual ``configparser`` error. ``columnMapping`` and
        ``sourceName`` are optional and default to an empty string.
        """
        section = self.plugin_type
        read = self.config_parser.get

        self.parameter[KAFKA_WRITER_PARAMETER_TOPIC] = read(section, KAFKA_WRITER_PARAMETER_TOPIC)
        self.parameter[KAFKA_WRITER_PARAMETER_KEY] = read(section, KAFKA_WRITER_PARAMETER_KEY)
        self.parameter[KAFKA_WRITER_PARAMETER_EXTRA_CONFIG] = self._parse_extra_config(
            read(section, KAFKA_WRITER_PARAMETER_EXTRA_CONFIG)
        )

        # Optional settings: absent options fall back to ''.
        for option in (KAFKA_WRITER_HIVE_ES_COLUMN_MAPPING, KAFKA_WRITER_SOURCE_NAME):
            self.parameter[option] = read(section, option, fallback='')

    @staticmethod
    def generate_definition(kafka_ds_name: str, kafka_topic: str, kafka_key: str, source_name: str,
                            column_names: List[str], column_types: Dict[str, str], column_mapping: str) -> str:
        """Return a definition string built from the given Kafka and column arguments.

        NOTE(review): the output format is intentionally not described here;
        confirm it against the full implementation and document it -- TODO.
        """
column_type = [] column_name_mapping = {'pid': 'pid:ID', 'id': 'pid:ID', 'esId': 'es_id:esId'} column_type_mapping = ['string', 'double', 'int', 'long', 'bigint', 'boolean'] for col_name in column_names: if column_name_mapping.keys().__contains__(col_name): column_type.append(column_name_mapping.get(col_name)) continue if column_types.keys().__contains__(col_name): col_type = column_types.get(col_name) if col_type.lower() in column_type_mapping: column_type.append(f'{col_name}:{col_type.upper()}') else: if re.search(r'array