# -*- coding:utf-8 -*-
from configparser import ConfigParser
from typing import Dict, List, Optional

from dw_base.datax.plugins.writer.writer import Writer

# mongo writer
MONGO_WRITER_NAME = 'mongodbwriter'
MONGO_WRITER_PARAMETER_DB_NAME = 'dbName'
MONGO_WRITER_PARAMETER_COLLECTION_NAME = 'collectionName'
# Lower-case column names mapped to upper-case spellings. Not referenced in
# this module; kept unchanged because other modules may import it.
MONGO_SPECIAL_WORDS_DICT = {
    'company_name': 'COMPANYNAME',
    'pid': 'PID',
    'uncid': 'UNCID',
    'unc_id': 'UNCID',
    'url': 'URL',
    'url_desc': 'URL_DESC',
    'web_name': 'WEBNAME',
}
# Config option holding comma-separated ``key:value`` pairs
# (isReplace, replaceKey).
MONGO_WRITER_PARAMETER_WRITE_MODE = 'writeMode'


class MongoWriter(Writer):
    """Writer plugin configuration for the ``mongodbwriter`` plugin.

    Reads the Mongo-specific options (``dbName``, ``collectionName``,
    ``writeMode``) from the config parser and can also render a matching
    ``[writer]`` definition as text.
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser,
                 start_date: Optional[str] = None,
                 stop_date: Optional[str] = None):
        """Initialise the base writer and set the plugin name.

        All arguments are forwarded unchanged to ``Writer.__init__``.
        """
        super().__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = MONGO_WRITER_NAME

    @staticmethod
    def _parse_write_mode(raw: Optional[str]) -> Dict[str, str]:
        """Turn ``'k1:v1,k2:v2'`` into ``{'k1': 'v1', 'k2': 'v2'}``.

        Whitespace around items, keys and values is ignored, empty items
        (e.g. from a trailing comma) are skipped, and only the first ``:``
        of an item separates key from value.

        Raises:
            ValueError: if a non-empty item contains no ``:``.
        """
        parsed: Dict[str, str] = {}
        if not raw:
            return parsed
        for item in raw.split(','):
            item = item.strip()
            if not item:
                continue
            key, sep, value = item.partition(':')
            if not sep:
                raise ValueError(
                    f'invalid {MONGO_WRITER_PARAMETER_WRITE_MODE} item '
                    f'{item!r}: expected "key:value"'
                )
            parsed[key.strip()] = value.strip()
        return parsed

    def load_others(self):
        """Load the Mongo-specific options into ``self.parameter``.

        ``dbName`` and ``collectionName`` are read from the
        ``self.plugin_type`` section, passed through ``self.check_config``
        (defined on the base class; presumably validates the value -- confirm
        there) and stored as-is. ``writeMode`` is parsed into a dict; an
        empty value yields an empty dict.
        """
        section = self.plugin_type
        for option in (MONGO_WRITER_PARAMETER_DB_NAME,
                       MONGO_WRITER_PARAMETER_COLLECTION_NAME):
            value = self.config_parser.get(section, option)
            self.check_config(option, value)
            self.parameter[option] = value

        write_mode_raw = self.config_parser.get(
            section, MONGO_WRITER_PARAMETER_WRITE_MODE)
        self.parameter[MONGO_WRITER_PARAMETER_WRITE_MODE] = \
            self._parse_write_mode(write_mode_raw)

    @staticmethod
    def generate_definition(mongo_ds_name: str,
                            mongo_database: str,
                            mongo_collection: str,
                            column_names: List[str],
                            column_types: Dict[str, str],
                            pk_fields: List[str]) -> str:
        """Render a ``[writer]`` definition for a Mongo collection.

        Args:
            mongo_ds_name: value written to ``dataSource``.
            mongo_database: value written to ``dbName``.
            mongo_collection: value written to ``collectionName``.
            column_names: columns, in output order.
            column_types: column name -> type name. Columns that are absent,
                mapped to ``None``, or typed ``STRING`` (case-insensitive)
                are left out of ``columnType``.
            pk_fields: key fields, joined with ``_`` into ``replaceKey``.

        Returns:
            The definition as newline-joined ``key = value`` lines.
        """
        column = list(column_names)
        column_type = []
        column_format = []
        for col_name in column:
            curr_type = column_types.get(col_name)
            if curr_type is None:
                continue
            curr_type_upper = curr_type.upper()
            if curr_type_upper == 'STRING':
                continue
            column_type.append(f'{col_name}:{curr_type_upper}')
            if curr_type_upper == 'DATE':
                # DATE columns additionally get an explicit format entry.
                column_format.append(f'{col_name}##yyyy-MM-dd HH:mm:ss')

        write_mode = f'isReplace:true,replaceKey:{"_".join(pk_fields)}'
        definition = [
            '[writer]',
            f'dataSource = {mongo_ds_name}',
            f'dbName = {mongo_database}',
            f'collectionName = {mongo_collection}',
            f'column = {",".join(column)}',
            f'columnType = {",".join(column_type)}',
            f'columnFormat = {",".join(column_format)}',
            f'writeMode = {write_mode}',
        ]
        return '\n'.join(definition)