plugin.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. # -*- coding:utf-8 -*-
  2. import os
  3. import re
  4. from configparser import ConfigParser
  5. from datetime import datetime
  6. from typing import Dict
  7. from dw_base.datax.datasources.data_source import DataSource
  8. from dw_base.datax.datasources.data_source_factory import DataSourceFactory
  9. from dw_base.datax.datax_constants import *
  class Plugin(object):
      """Base class that assembles the ``name``/``parameter`` config of one DataX plugin.

      Subclasses are expected to set ``plugin_type`` and ``plugin_name`` (both
      are validated in :meth:`check`) and to implement :meth:`load_others`.
      :meth:`configure` drives the whole build: ``init`` ->
      ``load_data_source`` -> ``load_column`` -> ``load_others``.
      """

      def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
          """Store the generator inputs and reset all derived state.

          :param base_dir: directory from which ``../datasource/<ds_file>.ini`` is resolved.
          :param config_parser: parsed job configuration, read per plugin-type section.
          :param start_date: ``yyyyMMdd`` value or ``ALL_DATA_DATE``; coerced with ``str()``.
          :param stop_date: same rules as ``start_date``.
          """
          self.base_dir = base_dir
          self.config_parser = config_parser
          # NOTE: str(None) is the literal 'None'; check() accepts it only if it equals ALL_DATA_DATE.
          self.start_date = str(start_date)
          self.stop_date = str(stop_date)
          self.parameter = {}
          self.config = {}
          self.ds_file = None
          self.ds_type = None
          self.ds_file_path = None
          self.datasource = None  # type: DataSource
          self.plugin_type = None
          self.plugin_name = None
          self.columns = []
          self.add_columns = []

      def init(self):
          """Validate the plugin and resolve its data source definition.

          :raises ValueError: propagated from :meth:`check`.
          :raises FileNotFoundError: when the data source ``.ini`` is not a regular file.
          """
          self.check()
          self.ds_file = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_DATA_SOURCE)
          # ds_file looks like {db_type}/{env}-{instance alias}; its first segment is the
          # db_type (= the parent directory name).
          self.ds_type = self.ds_file.split('/')[0]
          self.ds_file_path = f'{self.base_dir}/../datasource/{self.ds_file}.ini'
          # isfile() is already False for a missing path, so no separate exists() test is needed.
          if not os.path.isfile(self.ds_file_path):
              raise FileNotFoundError(self.ds_file_path)
          self.datasource = DataSourceFactory.get_data_source(self.ds_type, self.ds_file_path)

      @staticmethod
      def _is_valid_date(value):
          """Return True when ``value`` is ``ALL_DATA_DATE`` or exactly eight digits."""
          # fullmatch (not match): a longer string such as '202401011' must be rejected.
          return value == ALL_DATA_DATE or re.fullmatch(r'\d{8}', value) is not None

      def check(self):
          """Ensure name, type and both dates are usable.

          :raises ValueError: when ``plugin_name`` or ``plugin_type`` is unset, or a
              date is neither ``ALL_DATA_DATE`` nor exactly ``yyyyMMdd``.
          """
          if not self.plugin_name:
              raise ValueError('Plugin.plugin_name at every implemented class of Plugin should be specified.')
          if not self.plugin_type:
              raise ValueError('Plugin.plugin_type at every type of Plugin should be specified.')
          if not self._is_valid_date(self.start_date):
              raise ValueError('start_date %s must be format of yyyyMMdd or equal %s ' % (self.start_date, ALL_DATA_DATE))
          if not self._is_valid_date(self.stop_date):
              raise ValueError('stop_date %s must be format of yyyyMMdd or equals %s ' % (self.stop_date, ALL_DATA_DATE))

      def check_config(self, key, value):
          """Raise ``ValueError`` naming ``key`` when ``value`` is empty or missing."""
          if not value:
              raise ValueError('config %s of %s %s not provided' % (key, self.plugin_type, self.plugin_name))

      def load_data_source(self):
          """Copy every entry returned by ``self.datasource.parse()`` into ``self.parameter``."""
          ds_dict: Dict[str, str] = self.datasource.parse()
          self.parameter.update(ds_dict)

      def load_column(self):
          """Build the per-column descriptors and store them in ``self.parameter``.

          Each configured column yields a dict holding its index, name and type
          (``COLUMN_TYPE_STRING`` when no type is configured), plus the optional
          format and splitter/item-type entries. For a reader that declares add
          columns, those are processed afterwards by :meth:`_load_add_columns`.
          """
          raw_columns = self.config_parser.get(self.plugin_type, PLUGIN_PARAMETER_COLUMN)
          # str.split always returns a non-empty list of str, so no type checks are needed here.
          self.columns = [col.strip() for col in raw_columns.split(',')]
          column_type = self.get_column_type()
          column_format = self.get_column_format()
          splitter_item_type = self.get_splitter_item_type()

          columns_info = []
          for i, column in enumerate(self.columns):
              column_info = {
                  PLUGIN_PARAMETER_COLUMN_N_INDEX: i,
                  PLUGIN_NAME: column,
                  PLUGIN_PARAMETER_COLUMN_N_TYPE: column_type.get(column, COLUMN_TYPE_STRING),
              }
              if column in column_format:
                  column_info[PLUGIN_PARAMETER_COLUMN_N_FORMAT] = column_format[column]
              if column in splitter_item_type:
                  # Value is '<splitter>_<item type>'. Split on the LAST underscore so that
                  # the splitter itself may be '_' (e.g. '__string').
                  splitter, item_type = splitter_item_type[column].rsplit('_', 1)
                  column_info[PLUGIN_PARAMETER_COLUMN_N_SPLITTER] = splitter
                  column_info[PLUGIN_PARAMETER_COLUMN_N_ITEM_TYPE] = item_type
              columns_info.append(column_info)

          # addColumns handling from the ini file (currently only supported for hdfsReader;
          # other readers are untested).
          if (self.plugin_type == JOB_CONTENT_N_READER
                  and self.config_parser.has_option(JOB_CONTENT_N_READER, PLUGIN_PARAMETER_ADD_COLUMN)):
              self._load_add_columns(columns_info)
          self.parameter[PLUGIN_PARAMETER_COLUMN] = columns_info

      def _load_add_columns(self, columns_info):
          """Parse the reader's ``name:type`` add columns and apply the ``batch_id`` one.

          For a ``batch_id`` entry a constant column holding the current
          ``%Y%m%d%H%M`` timestamp is appended to ``columns_info``, the value is
          printed as a ``${setValue(...)}`` line, and the column is appended to
          the writer section's column and column-type options. Entries with any
          other name are only recorded in ``self.add_columns``.

          :raises IndexError: when an entry has no ``:`` separator.
          """
          raw_add_columns = self.config_parser.get(self.plugin_type, PLUGIN_PARAMETER_ADD_COLUMN)
          self.add_columns = []
          for col in raw_add_columns.split(','):
              parts = col.split(':')
              # Strip like regular column names, so ' batch_id:string' is still recognised.
              self.add_columns.append((parts[0].strip(), parts[1].strip()))

          for name, col_type in self.add_columns:
              if name != "batch_id":
                  continue
              batch_id = datetime.now().strftime('%Y%m%d%H%M')
              columns_info.append({"value": batch_id, "type": col_type})
              # Lets the scheduler (ds) capture batch_id as a parameter, so the backend
              # batch job can refresh ES.
              print('${setValue(batch_id=%s)}' % batch_id)
              # Mirror the extra column into the writer configuration.
              col_value = self.config_parser.get(JOB_CONTENT_N_WRITER, PLUGIN_PARAMETER_COLUMN)
              col_type_value = self.config_parser.get(JOB_CONTENT_N_WRITER, GEN_CONFIG_KEY_COLUMN_TYPE)
              self.config_parser.set(
                  JOB_CONTENT_N_WRITER, PLUGIN_PARAMETER_COLUMN,
                  value=f"{col_value},{name}"
              )
              self.config_parser.set(
                  JOB_CONTENT_N_WRITER, GEN_CONFIG_KEY_COLUMN_TYPE,
                  value=f"{col_type_value},{name}:{col_type}"
              )

      @staticmethod
      def _parse_pairs(raw, separator, strip_value=False):
          """Parse ``'k<sep>v,k<sep>v'`` into a dict; empty or ``None`` input gives ``{}``.

          Keys are stripped so they line up with the stripped column names. Only
          the first ``separator`` of an item splits it, so the value may contain
          the separator again.

          :raises ValueError: when an item does not contain ``separator``.
          """
          pairs = {}
          if not raw:
              return pairs
          for item in raw.split(','):
              key, value = item.split(separator, 1)
              pairs[key.strip()] = value.strip() if strip_value else value
          return pairs

      def get_column_format(self):
          """Return ``{column: format}`` from the optional ``name##format`` list."""
          if not self.config_parser.has_option(self.plugin_type, GEN_CONFIG_KEY_COLUMN_FORMAT):
              return {}
          return self._parse_pairs(
              self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_FORMAT), '##')

      def get_splitter_item_type(self):
          """Return ``{column: '<splitter>_<item type>'}`` from the optional ``name:value`` list."""
          if not self.config_parser.has_option(self.plugin_type, GEN_CONFIG_KEY_COLUMN_SPLITTER_ITEM_TYPE):
              return {}
          # The value is kept verbatim because the splitter may itself be whitespace.
          return self._parse_pairs(
              self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_SPLITTER_ITEM_TYPE), ':')

      def get_column_type(self):
          """Return ``{column: type}`` from the ``name:type`` list.

          Unlike the format and splitter options this one is read without a
          ``has_option`` guard, so a missing option propagates the
          ``configparser`` error raised by ``get``.
          """
          return self._parse_pairs(
              self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_TYPE), ':', strip_value=True)

      def load_parameter(self):
          """Populate ``self.parameter``: data source first, then columns, then plugin specifics."""
          self.load_data_source()
          self.load_column()
          self.load_others()

      def assemble_plugin(self):
          """Load all parameters and store the plugin name and parameters in ``self.config``."""
          self.load_parameter()
          self.config[PLUGIN_NAME] = self.plugin_name
          self.config[PLUGIN_PARAMETER] = self.parameter

      def configure(self):
          """Run :meth:`init` and :meth:`assemble_plugin`, then return ``self.config``."""
          self.init()
          self.assemble_plugin()
          return self.config

      def load_others(self):
          """Hook for plugin-specific parameters; must be overridden.

          :raises NotImplementedError: always, in this base class.
          """
          raise NotImplementedError('please implement this method in a specified writer.')