plugin.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. # -*- coding:utf-8 -*-
  2. import os
  3. import pwd
  4. import re
  5. from configparser import ConfigParser
  6. from datetime import datetime
  7. from typing import Dict
  8. from dw_base.datax.datasources.data_source import DataSource
  9. from dw_base.datax.datasources.data_source_factory import DataSourceFactory
  10. from dw_base.datax.datax_constants import *
  11. class Plugin(object):
  12. def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
  13. self.base_dir = base_dir
  14. self.config_parser = config_parser
  15. self.start_date = str(start_date)
  16. self.stop_date = str(stop_date)
  17. self.parameter = {}
  18. self.config = {}
  19. self.ds_file = None
  20. self.ds_type = None
  21. self.ds_file_path = None
  22. self.datasource = None # type: DataSource
  23. self.plugin_type = None
  24. self.plugin_name = None
  25. self.columns = []
  26. self.add_columns = []
  27. def init(self):
  28. self.check()
  29. self.ds_file = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_DATA_SOURCE)
  30. # ds_file 形如 {db_type}/{env}-{实例简称},首段即 db_type(= 父目录名)
  31. self.ds_type = self.ds_file.split('/')[0]
  32. self.ds_file_path = f'{self.base_dir}/../datasource/{self.ds_file}.ini'
  33. if not os.path.exists(self.ds_file_path) or not os.path.isfile(self.ds_file_path):
  34. raise FileNotFoundError(self.ds_file_path)
  35. self.datasource = DataSourceFactory.get_data_source(self.ds_type, self.ds_file_path)
  36. def check(self):
  37. if not self.plugin_name:
  38. raise ValueError('Plugin.plugin_name at every implemented class of Plugin should be specified.')
  39. if not self.plugin_type:
  40. raise ValueError('Plugin.plugin_type at every type of Plugin should be specified.')
  41. if not (self.start_date == ALL_DATA_DATE or re.match(r'\d{8}', self.start_date)):
  42. raise ValueError('start_date %s must be format of yyyyMMdd or equal %s ' % (self.start_date, ALL_DATA_DATE))
  43. if not (self.stop_date == ALL_DATA_DATE or re.match(r'\d{8}', self.stop_date)):
  44. raise ValueError('stop_date %s must be format of yyyyMMdd or equals %s ' % (self.stop_date, ALL_DATA_DATE))
  45. def check_config(self, key, value):
  46. if not value:
  47. raise ValueError('config %s of %s %s not provided' % (key, self.plugin_type, self.plugin_name))
  48. def load_data_source(self):
  49. ds_dict: Dict[str, str] = self.datasource.parse()
  50. for key, value in ds_dict.items():
  51. self.parameter[key] = value
  52. def load_column(self):
  53. columns_info = []
  54. self.columns = [
  55. col.strip() for col in self.config_parser.get(self.plugin_type, PLUGIN_PARAMETER_COLUMN).split(',')
  56. ]
  57. column_type = self.get_column_type()
  58. column_format = self.get_column_format()
  59. splitter_item_type = self.get_splitter_item_type()
  60. if isinstance(self.columns, list):
  61. if isinstance(self.columns[0], str):
  62. for i, column in enumerate(self.columns):
  63. column_info = {
  64. PLUGIN_PARAMETER_COLUMN_N_INDEX: i,
  65. PLUGIN_NAME: column,
  66. PLUGIN_PARAMETER_COLUMN_N_TYPE: column_type.get(column, COLUMN_TYPE_STRING)
  67. }
  68. if column in column_format.keys():
  69. column_info[PLUGIN_PARAMETER_COLUMN_N_FORMAT] = column_format[column]
  70. if column in splitter_item_type.keys():
  71. splitter, item_type = splitter_item_type[column].split('_')
  72. column_info[PLUGIN_PARAMETER_COLUMN_N_SPLITTER] = splitter
  73. column_info[PLUGIN_PARAMETER_COLUMN_N_ITEM_TYPE] = item_type
  74. columns_info.append(column_info)
  75. elif isinstance(self.columns[0], dict):
  76. # data works中的数据集成
  77. columns_info = self.columns
  78. else:
  79. raise TypeError('column element type error. ')
  80. else:
  81. raise Exception('columns mast be a list.')
  82. # 新增ini文件 addColumns处理逻辑(目前仅支持hdfsReader、其他未测试)
  83. if (self.plugin_type == JOB_CONTENT_N_READER
  84. and self.config_parser.has_option(JOB_CONTENT_N_READER, PLUGIN_PARAMETER_ADD_COLUMN)):
  85. self.add_columns = [
  86. (col.split(":")[0],col.split(":")[1]) for col in self.config_parser.get(self.plugin_type, PLUGIN_PARAMETER_ADD_COLUMN).split(',')
  87. ]
  88. for column in self.add_columns:
  89. if column[0] == "batch_id":
  90. batch_id = datetime.now().strftime('%Y%m%d%H%M')
  91. columns_info_add = {"value": batch_id,"type":column[1]}
  92. columns_info.append(columns_info_add)
  93. # ds捕获参数batch_id(方便后端批处理刷新es)
  94. print('${setValue(batch_id=%s)}' % batch_id)
  95. # write add
  96. col_value = self.config_parser.get(JOB_CONTENT_N_WRITER, PLUGIN_PARAMETER_COLUMN)
  97. col_type_value = self.config_parser.get(JOB_CONTENT_N_WRITER, GEN_CONFIG_KEY_COLUMN_TYPE)
  98. self.config_parser.set(
  99. JOB_CONTENT_N_WRITER, PLUGIN_PARAMETER_COLUMN,
  100. value=f"{col_value},{column[0]}"
  101. )
  102. self.config_parser.set(
  103. JOB_CONTENT_N_WRITER, GEN_CONFIG_KEY_COLUMN_TYPE,
  104. value=f"{col_type_value},{column[0]}:{column[1]}"
  105. )
  106. self.parameter[PLUGIN_PARAMETER_COLUMN] = columns_info
  107. def get_column_format(self):
  108. column_format = {}
  109. if self.config_parser.has_option(self.plugin_type, GEN_CONFIG_KEY_COLUMN_FORMAT):
  110. c_format = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_FORMAT)
  111. if c_format:
  112. for item in c_format.split(','):
  113. k, v = item.split('##')
  114. column_format[k] = v
  115. return column_format
  116. def get_splitter_item_type(self):
  117. splitter_item_type = {}
  118. if self.config_parser.has_option(self.plugin_type, GEN_CONFIG_KEY_COLUMN_SPLITTER_ITEM_TYPE):
  119. splitter_item = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_SPLITTER_ITEM_TYPE)
  120. if splitter_item:
  121. for item in splitter_item.split(','):
  122. k, v = item.split(':')
  123. splitter_item_type[k] = v
  124. return splitter_item_type
  125. def get_column_type(self):
  126. column_type = {}
  127. try:
  128. c_types = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_TYPE)
  129. if c_types:
  130. for item in c_types.split(','):
  131. k, v = item.split(':')
  132. column_type[k] = v
  133. except Exception as e:
  134. raise e
  135. return column_type
  136. def load_parameter(self):
  137. self.load_data_source()
  138. self.load_column()
  139. self.load_others()
  140. def assemble_plugin(self):
  141. self.load_parameter()
  142. self.config[PLUGIN_NAME] = self.plugin_name
  143. self.config[PLUGIN_PARAMETER] = self.parameter
  144. def configure(self):
  145. self.init()
  146. self.assemble_plugin()
  147. return self.config
  148. def load_others(self):
  149. raise NotImplementedError('please implement this method in a specified writer.')