| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- # -*- coding:utf-8 -*-
- from configparser import ConfigParser
- from typing import List, Dict
- from dw_base.datax.plugins.writer.writer import Writer
# --- DataX MongoDB writer plugin ------------------------------------------
# Plugin name assigned to MongoWriter.plugin_name.
MONGO_WRITER_NAME = 'mongodbwriter'

# Option names read from the writer section of the job config and copied,
# under the same key, into the plugin's `parameter` mapping.
MONGO_WRITER_PARAMETER_DB_NAME = 'dbName'
MONGO_WRITER_PARAMETER_COLLECTION_NAME = 'collectionName'

# snake_case field name -> upper-case spelling. Not referenced in this chunk;
# presumably used elsewhere to rename columns before writing (verify).
MONGO_SPECIAL_WORDS_DICT = dict(
    company_name='COMPANYNAME',
    pid='PID',
    uncid='UNCID',
    unc_id='UNCID',
    url='URL',
    url_desc='URL_DESC',
    web_name='WEBNAME',
)

# Option holding comma-separated `key:value` pairs; the keys used by
# MongoWriter.generate_definition are `isReplace` and `replaceKey`.
MONGO_WRITER_PARAMETER_WRITE_MODE = 'writeMode'
class MongoWriter(Writer):
    """DataX ``mongodbwriter`` plugin configuration.

    Reads the MongoDB-specific options (database, collection, write mode) from
    the writer section of the job config. It can also render such a section
    from table metadata via :meth:`generate_definition`.
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
        super().__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = MONGO_WRITER_NAME

    def load_others(self):
        """Load the Mongo-specific options into ``self.parameter``.

        ``dbName`` and ``collectionName`` are read from the ``self.plugin_type``
        section, passed to ``check_config`` (inherited from ``Writer``) and
        stored under the same keys. ``writeMode`` is optional. When it is missing
        or empty, an empty dict is stored. Otherwise its ``k1:v1,k2:v2`` text is
        parsed into a dict of strings.

        Raises:
            ValueError: if a ``writeMode`` entry is not of the form ``key:value``.
        """
        section = self.plugin_type

        db_name = self.config_parser.get(section, MONGO_WRITER_PARAMETER_DB_NAME)
        self.check_config(MONGO_WRITER_PARAMETER_DB_NAME, db_name)
        self.parameter[MONGO_WRITER_PARAMETER_DB_NAME] = db_name

        collection_name = self.config_parser.get(section, MONGO_WRITER_PARAMETER_COLLECTION_NAME)
        self.check_config(MONGO_WRITER_PARAMETER_COLLECTION_NAME, collection_name)
        self.parameter[MONGO_WRITER_PARAMETER_COLLECTION_NAME] = collection_name

        # writeMode is optional: treat a missing option like an empty one
        # instead of letting ConfigParser raise NoOptionError.
        write_mode_str = self.config_parser.get(section, MONGO_WRITER_PARAMETER_WRITE_MODE, fallback='')
        self.parameter[MONGO_WRITER_PARAMETER_WRITE_MODE] = self._parse_write_mode(write_mode_str)

    @staticmethod
    def _parse_write_mode(write_mode_str: str) -> Dict[str, str]:
        """Parse ``"k1:v1,k2:v2"`` into ``{"k1": "v1", "k2": "v2"}``.

        The parser applies these rules:

        - Whitespace around keys and values is stripped.
        - Empty entries (e.g. from a trailing comma) are skipped.
        - Only the first ``:`` separates key from value.
        - Values are kept as strings.
        - A falsy input (``""`` or ``None``) yields ``{}``.

        Raises:
            ValueError: if a non-empty entry contains no ``:``.
        """
        result = {}
        if not write_mode_str:
            return result
        for raw_item in write_mode_str.split(','):
            item = raw_item.strip()
            if not item:
                continue
            key, sep, value = item.partition(':')
            if not sep:
                raise ValueError(
                    f'invalid {MONGO_WRITER_PARAMETER_WRITE_MODE} entry {item!r}, expected "key:value"')
            result[key.strip()] = value.strip()
        return result

    @staticmethod
    def generate_definition(mongo_ds_name: str, mongo_database: str, mongo_collection: str,
                            column_names: List[str], column_types: Dict[str, str], pk_fields: List[str]) -> str:
        """Render a ``[writer]`` config section for a Mongo writer job.

        Args:
            mongo_ds_name: value for ``dataSource``.
            mongo_database: value for ``dbName``.
            mongo_collection: value for ``collectionName``.
            column_names: columns to write, in order; all go into ``column``.
            column_types: column name -> type name. Columns whose upper-cased
                type is not ``STRING`` are listed in ``columnType`` as
                ``name:TYPE``. ``DATE`` columns also get a ``columnFormat``
                entry ``name##yyyy-MM-dd HH:mm:ss``. Columns absent from this
                mapping are omitted from ``columnType``.
            pk_fields: joined with ``_`` to form ``replaceKey``.

        Returns:
            The section text, one ``key = value`` line per option, joined by
            newlines (no trailing newline).
        """
        # Materialize once so an arbitrary iterable is only consumed a single time.
        column = list(column_names)
        column_type = []
        column_format = []
        for col_name in column:
            if col_name not in column_types:
                continue
            type_upper = column_types[col_name].upper()
            if type_upper == 'STRING':
                continue
            column_type.append(f'{col_name}:{type_upper}')
            if type_upper == 'DATE':
                column_format.append(f'{col_name}##yyyy-MM-dd HH:mm:ss')

        # NOTE(review): with empty pk_fields this emits "replaceKey:" (empty
        # key) while still requesting replace mode -- confirm callers always
        # pass at least one pk field.
        replace_key = '_'.join(pk_fields)
        write_mode = f'isReplace:true,replaceKey:{replace_key}'

        definition = [
            '[writer]',
            f'dataSource = {mongo_ds_name}',
            f'dbName = {mongo_database}',
            f'collectionName = {mongo_collection}',
            f'column = {",".join(column)}',
            f'columnType = {",".join(column_type)}',
            f'columnFormat = {",".join(column_format)}',
            f'writeMode = {write_mode}',
        ]
        return '\n'.join(definition)
|