| 123456789101112131415161718192021222324252627282930313233343536373839404142 |
- # -*- coding:utf-8 -*-
- from configparser import ConfigParser
- from dw_base.datax.datax_constants import *
- from dw_base.datax.plugins.reader.reader import Reader
- from dw_base.utils.datetime_utils import parse_datetime, date_to_timestamp
# ---------------------------------------------------------------------------
# DataX MongoDB reader: plugin name and the parameter keys it writes into the
# reader's "parameter" section.
# ---------------------------------------------------------------------------
MONGO_READER_NAME = 'mongodbreader'

# Parameter keys (also used as option names in the job's config section).
MONGO_READER_PARAMETER_DB_NAME = 'dbName'
MONGO_READER_PARAMETER_COLLECTION_NAME = 'collectionName'
MONGO_READER_PARAMETER_QUERY = 'query'
class MongoReader(Reader):
    """Reader plugin that fills the DataX ``mongodbreader`` parameter section.

    It reads ``dbName``, ``collectionName`` and an optional ``query`` from the
    plugin's section of the job config and stores them in ``self.parameter``.
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
        super(MongoReader, self).__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = MONGO_READER_NAME

    @staticmethod
    def _object_id_bound(date_str: str) -> str:
        """Return a 24-hex-digit ObjectId string whose timestamp part encodes *date_str*.

        An ObjectId is 12 bytes. The leading 4 bytes are a timestamp in seconds
        (8 hex digits); the remaining 8 bytes (16 hex digits) are zeroed here so
        the value works as a range bound.
        """
        # NOTE(review): assumes date_to_timestamp returns seconds, not
        # milliseconds (the original code made the same assumption).
        seconds = int(date_to_timestamp(date_str))
        # Zero-pad to 8 digits. A bare hex() would drop leading zeros and
        # produce an invalid (< 24 digit) ObjectId for early timestamps.
        return '{:08x}'.format(seconds) + '0' * 16

    @staticmethod
    def _day_bound(date_str: str) -> str:
        """Return *date_str* normalised to a ``YYYY-MM-DD`` string."""
        return parse_datetime(date_str).strftime('%Y-%m-%d')

    def load_others(self):
        """Populate ``dbName``, ``collectionName`` and ``query`` in ``self.parameter``.

        ``dbName`` and ``collectionName`` are read from the plugin's config
        section and validated with ``check_config``.

        When ``start_date`` equals ``ALL_DATA_DATE``, the query is ``'{}'``.
        Otherwise the optional ``query`` option is read and its
        ``${start_date}`` / ``${stop_date}`` placeholders are substituted:
        with ObjectId hex strings if the query mentions ``ObjectId``, else
        with ``YYYY-MM-DD`` dates.

        A missing or empty ``query`` option leaves the query parameter unset.
        """
        section = self.plugin_type

        db_name = self.config_parser.get(section, MONGO_READER_PARAMETER_DB_NAME)
        self.check_config(MONGO_READER_PARAMETER_DB_NAME, db_name)
        collection_name = self.config_parser.get(section, MONGO_READER_PARAMETER_COLLECTION_NAME)
        self.check_config(MONGO_READER_PARAMETER_COLLECTION_NAME, collection_name)
        self.parameter[MONGO_READER_PARAMETER_DB_NAME] = db_name
        self.parameter[MONGO_READER_PARAMETER_COLLECTION_NAME] = collection_name

        if self.start_date == ALL_DATA_DATE:
            # Full load: match every document.
            self.parameter[MONGO_READER_PARAMETER_QUERY] = '{}'
            return

        # The query is optional. Use a fallback so a missing option does not
        # raise NoOptionError.
        query = self.config_parser.get(section, MONGO_READER_PARAMETER_QUERY, fallback=None)
        if not query:
            return

        to_bound = self._object_id_bound if 'ObjectId' in query else self._day_bound
        placeholders = (('${start_date}', self.start_date), ('${stop_date}', self.stop_date))
        for placeholder, date_str in placeholders:
            # Only format dates that are actually referenced, so an unused
            # (possibly unset) date cannot break the substitution.
            if placeholder in query:
                query = query.replace(placeholder, to_bound(date_str))
        self.parameter[MONGO_READER_PARAMETER_QUERY] = query
|