mongo_reader.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. # -*- coding:utf-8 -*-
  2. from configparser import ConfigParser
  3. from dw_base.datax.datax_constants import *
  4. from dw_base.datax.plugins.reader.reader import Reader
  5. from dw_base.utils.datetime_utils import parse_datetime, date_to_timestamp
  6. # mongo reader
  7. MONGO_READER_NAME = 'mongodbreader'
  8. MONGO_READER_PARAMETER_COLLECTION_NAME = 'collectionName'
  9. MONGO_READER_PARAMETER_DB_NAME = 'dbName'
  10. MONGO_READER_PARAMETER_QUERY = 'query'
  11. class MongoReader(Reader):
  12. def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
  13. super(MongoReader, self).__init__(base_dir, config_parser, start_date, stop_date)
  14. self.plugin_name = MONGO_READER_NAME
  15. def load_others(self):
  16. db_name = self.config_parser.get(self.plugin_type, MONGO_READER_PARAMETER_DB_NAME)
  17. self.check_config(MONGO_READER_PARAMETER_DB_NAME, db_name)
  18. collection_name = self.config_parser.get(self.plugin_type, MONGO_READER_PARAMETER_COLLECTION_NAME)
  19. self.check_config(MONGO_READER_PARAMETER_COLLECTION_NAME, collection_name)
  20. self.parameter[MONGO_READER_PARAMETER_DB_NAME] = db_name
  21. self.parameter[MONGO_READER_PARAMETER_COLLECTION_NAME] = collection_name
  22. if self.start_date == ALL_DATA_DATE:
  23. self.parameter[MONGO_READER_PARAMETER_QUERY] = '{}'
  24. else:
  25. query = self.config_parser.get(self.plugin_type, MONGO_READER_PARAMETER_QUERY)
  26. if query:
  27. if 'ObjectId' in query:
  28. start_dt_str = hex(int(date_to_timestamp(self.start_date)))[2:] + '0000000000000000'
  29. stop_dt_str = hex(int(date_to_timestamp(self.stop_date)))[2:] + '0000000000000000'
  30. else:
  31. start_dt_str = parse_datetime(self.start_date).strftime('%Y-%m-%d')
  32. stop_dt_str = parse_datetime(self.stop_date).strftime('%Y-%m-%d')
  33. query = query.replace('${start_date}', start_dt_str)
  34. query = query.replace('${stop_date}', stop_dt_str)
  35. self.parameter[MONGO_READER_PARAMETER_QUERY] = query