# mongo_writer.py
  1. # -*- coding:utf-8 -*-
  2. from configparser import ConfigParser
  3. from typing import List, Dict
  4. from dw_base.datax.plugins.writer.writer import Writer
  5. # mongo writer
  6. MONGO_WRITER_NAME = 'mongodbwriter'
  7. MONGO_WRITER_PARAMETER_DB_NAME = 'dbName'
  8. MONGO_WRITER_PARAMETER_COLLECTION_NAME = 'collectionName'
  9. MONGO_SPECIAL_WORDS_DICT = {
  10. 'company_name': 'COMPANYNAME',
  11. 'pid': 'PID',
  12. 'uncid': 'UNCID',
  13. 'unc_id': 'UNCID',
  14. 'url': 'URL',
  15. 'url_desc': 'URL_DESC',
  16. 'web_name': 'WEBNAME',
  17. }
  18. # isReplace, replaceKey
  19. MONGO_WRITER_PARAMETER_WRITE_MODE = 'writeMode'
  20. class MongoWriter(Writer):
  21. def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
  22. super(MongoWriter, self).__init__(base_dir, config_parser, start_date, stop_date)
  23. self.plugin_name = MONGO_WRITER_NAME
  24. def load_others(self):
  25. db_name = self.config_parser.get(self.plugin_type, MONGO_WRITER_PARAMETER_DB_NAME)
  26. self.check_config(MONGO_WRITER_PARAMETER_DB_NAME, db_name)
  27. self.parameter[MONGO_WRITER_PARAMETER_DB_NAME] = db_name
  28. collection_name = self.config_parser.get(self.plugin_type, MONGO_WRITER_PARAMETER_COLLECTION_NAME)
  29. self.check_config(MONGO_WRITER_PARAMETER_COLLECTION_NAME, collection_name)
  30. self.parameter[MONGO_WRITER_PARAMETER_COLLECTION_NAME] = collection_name
  31. upsert_info_str = self.config_parser.get(self.plugin_type, MONGO_WRITER_PARAMETER_WRITE_MODE)
  32. upsert_info = {}
  33. if upsert_info_str:
  34. for item in upsert_info_str.split(','):
  35. k, v = item.split(':')
  36. upsert_info[k] = v
  37. self.parameter[MONGO_WRITER_PARAMETER_WRITE_MODE] = upsert_info
  38. @staticmethod
  39. def generate_definition(mongo_ds_name: str, mongo_database: str, mongo_collection: str,
  40. column_names: List[str], column_types: Dict[str, str], pk_fields: List[str]) -> str:
  41. column = []
  42. column_type = []
  43. column_format = []
  44. for col_name in column_names:
  45. column.append(col_name)
  46. if column_types.__contains__(col_name):
  47. curr_type = column_types.get(col_name)
  48. curr_type_upper = curr_type.upper()
  49. if curr_type_upper != 'STRING':
  50. column_type.append(f'{col_name}:{curr_type_upper}')
  51. if curr_type_upper == 'DATE':
  52. column_format.append(f'{col_name}##yyyy-MM-dd HH:mm:ss')
  53. write_mode = f'isReplace:true,replaceKey:{"_".join(pk_fields)}'
  54. definition = [
  55. '[writer]',
  56. 'dataSource = %s' % mongo_ds_name,
  57. f'dbName = {mongo_database}',
  58. f'collectionName = {mongo_collection}',
  59. f'column = {",".join(column)}',
  60. f'columnType = {",".join(column_type)}',
  61. f'columnFormat = {",".join(column_format)}',
  62. f'writeMode = {write_mode}'
  63. ]
  64. return '\n'.join(definition)