# -*- coding:utf-8 -*-
from configparser import ConfigParser

from dw_base.datax.datax_constants import *
from dw_base.datax.plugins.reader.reader import Reader
from dw_base.utils.datetime_utils import parse_datetime, date_to_timestamp

# mongo reader
MONGO_READER_NAME = 'mongodbreader'
MONGO_READER_PARAMETER_COLLECTION_NAME = 'collectionName'
MONGO_READER_PARAMETER_DB_NAME = 'dbName'
MONGO_READER_PARAMETER_QUERY = 'query'


class MongoReader(Reader):
    """Reader plugin that fills in the parameters of a ``mongodbreader`` job.

    The database name, collection name and (optionally) a query template are
    read from the plugin's section of the supplied ``ConfigParser`` and stored
    in ``self.parameter``.
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser,
                 start_date: str = None, stop_date: str = None):
        """Initialise the base reader and set the plugin name.

        All arguments are forwarded unchanged to ``Reader.__init__``.
        """
        super().__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = MONGO_READER_NAME

    @staticmethod
    def _object_id_boundary(date_str):
        """Return an ObjectId-style hex string for the timestamp of ``date_str``.

        The timestamp is rendered as at least 8 hex digits (zero-padded, so
        small timestamps still yield a full-length prefix) followed by 16
        zero digits.
        """
        # NOTE(review): assumes date_to_timestamp returns epoch seconds, which
        # is what the ObjectId timestamp prefix expects -- confirm.
        seconds = int(date_to_timestamp(date_str))
        return format(seconds, '08x') + '0' * 16

    @staticmethod
    def _plain_date_boundary(date_str):
        """Return ``date_str`` re-formatted as ``YYYY-MM-DD``."""
        return parse_datetime(date_str).strftime('%Y-%m-%d')

    def load_others(self):
        """Populate ``self.parameter`` with the MongoDB-specific settings.

        ``dbName`` and ``collectionName`` are read from the config section
        named by ``self.plugin_type``, passed through ``self.check_config``
        and stored. The query is then resolved as follows:

        * if ``self.start_date`` equals ``ALL_DATA_DATE`` the query is the
          match-everything document ``'{}'``;
        * otherwise the configured query template, when non-empty, has its
          ``${start_date}`` / ``${stop_date}`` placeholders substituted and is
          stored. Templates containing ``ObjectId`` receive ObjectId-style hex
          boundaries; all others receive ``YYYY-MM-DD`` dates.
        """
        section = self.plugin_type

        db_name = self.config_parser.get(section, MONGO_READER_PARAMETER_DB_NAME)
        self.check_config(MONGO_READER_PARAMETER_DB_NAME, db_name)
        collection_name = self.config_parser.get(
            section, MONGO_READER_PARAMETER_COLLECTION_NAME)
        self.check_config(MONGO_READER_PARAMETER_COLLECTION_NAME, collection_name)

        self.parameter[MONGO_READER_PARAMETER_DB_NAME] = db_name
        self.parameter[MONGO_READER_PARAMETER_COLLECTION_NAME] = collection_name

        # ALL_DATA_DATE is presumably provided by the datax_constants star
        # import; it marks a full (unfiltered) extraction.
        if self.start_date == ALL_DATA_DATE:
            self.parameter[MONGO_READER_PARAMETER_QUERY] = '{}'
            return

        query = self.config_parser.get(section, MONGO_READER_PARAMETER_QUERY)
        if not query:
            # No template configured: leave the query parameter unset.
            return

        # Queries that filter on ObjectId need hex boundaries; everything
        # else is filtered with plain date strings.
        if 'ObjectId' in query:
            to_boundary = self._object_id_boundary
        else:
            to_boundary = self._plain_date_boundary

        query = query.replace('${start_date}', to_boundary(self.start_date))
        query = query.replace('${stop_date}', to_boundary(self.stop_date))
        self.parameter[MONGO_READER_PARAMETER_QUERY] = query