| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- # -*- coding:utf-8 -*-
- import os
- import pwd
- import re
- from configparser import ConfigParser
- from datetime import datetime
- from typing import Dict
- from dw_base.datax.datasources.data_source import DataSource
- from dw_base.datax.datasources.data_source_factory import DataSourceFactory
- from dw_base.datax.datax_constants import *
class Plugin(object):
    """Base class for a DataX plugin (reader or writer) built from one ini section.

    Subclasses must set ``plugin_type`` (the ``config_parser`` section to read,
    e.g. the reader or writer section) and ``plugin_name``, and implement
    ``load_others()`` to add plugin-specific parameters.

    ``configure()`` is the entry point. It resolves the data source, assembles
    ``self.parameter`` and returns
    ``{PLUGIN_NAME: plugin_name, PLUGIN_PARAMETER: parameter}``.
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
        """
        :param base_dir: directory the data source ini path is resolved against
            (``<base_dir>/../datasource/<ds_type>/<ds_file>.ini``).
        :param config_parser: parsed job ini; options are read from the section
            named by ``plugin_type``.
        :param start_date: ``yyyyMMdd`` or ``ALL_DATA_DATE``. It is stored via
            ``str()``, so ``None`` becomes ``'None'`` and is rejected by ``check()``.
        :param stop_date: same format as ``start_date``.
        """
        self.base_dir = base_dir
        self.config_parser = config_parser
        self.start_date = str(start_date)
        self.stop_date = str(stop_date)
        self.parameter = {}  # DataX "parameter" dict being assembled
        self.config = {}  # final plugin dict returned by configure()
        self.ds_file = None  # data source name from the section's data-source option
        self.ds_type = None  # prefix of the data source file name before the first '-'
        self.ds_file_path = None  # resolved path of the data source ini file
        self.datasource = None  # type: DataSource
        self.plugin_type = None  # set by subclasses; also the config section name
        self.plugin_name = None  # set by concrete subclasses
        self.columns = []  # stripped column names from the column option
        self.add_columns = []  # (name, type) pairs from the add-column option (reader only)

    def init(self):
        """Validate the plugin, then locate and load its data source definition.

        :raises ValueError: see ``check()``.
        :raises FileNotFoundError: if the data source ini file does not exist
            or is not a regular file.
        """
        self.check()
        self.ds_file = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_DATA_SOURCE)
        # The data source type is the last path component up to its first '-'.
        self.ds_type = self.ds_file.split('/')[-1].split('-')[0]
        self.ds_file_path = f'{self.base_dir}/../datasource/{self.ds_type}/{self.ds_file}.ini'
        # isfile() is False for missing paths too, so it covers the existence check.
        if not os.path.isfile(self.ds_file_path):
            raise FileNotFoundError(self.ds_file_path)
        self.datasource = DataSourceFactory.get_data_source(self.ds_type, self.ds_file_path)

    def check(self):
        """Validate the subclass attributes and the date range.

        :raises ValueError: if ``plugin_name`` or ``plugin_type`` is unset, or if a
            date is neither ``ALL_DATA_DATE`` nor starts with 8 digits.
        """
        if not self.plugin_name:
            raise ValueError('Plugin.plugin_name at every implemented class of Plugin should be specified.')
        if not self.plugin_type:
            raise ValueError('Plugin.plugin_type at every type of Plugin should be specified.')
        # NOTE(review): re.match anchors only at the start, so any value that
        # begins with 8 digits (e.g. '2023010112') passes, not just exact yyyyMMdd.
        if not (self.start_date == ALL_DATA_DATE or re.match(r'\d{8}', self.start_date)):
            raise ValueError('start_date %s must be format of yyyyMMdd or equal %s ' % (self.start_date, ALL_DATA_DATE))
        if not (self.stop_date == ALL_DATA_DATE or re.match(r'\d{8}', self.stop_date)):
            raise ValueError('stop_date %s must be format of yyyyMMdd or equals %s ' % (self.stop_date, ALL_DATA_DATE))

    def check_config(self, key, value):
        """Raise ``ValueError`` if the config ``value`` for ``key`` is empty or missing."""
        if not value:
            raise ValueError('config %s of %s %s not provided' % (key, self.plugin_type, self.plugin_name))

    def load_data_source(self):
        """Copy every entry returned by ``self.datasource.parse()`` into ``self.parameter``."""
        ds_dict: Dict[str, str] = self.datasource.parse()
        self.parameter.update(ds_dict)

    def load_column(self):
        """Build the column parameter from this section's column options.

        Each comma-separated name in the column option becomes a dict with the
        column index, name and type. The type defaults to ``COLUMN_TYPE_STRING``.
        A format entry and splitter/item-type entries are added when they are
        configured for that column.

        For the reader section only, an optional add-column option
        (``name:type,...``) may append a generated ``batch_id`` constant column.
        The same column is then appended to the writer section of
        ``self.config_parser``, which mutates the shared config.
        """
        raw_columns = self.config_parser.get(self.plugin_type, PLUGIN_PARAMETER_COLUMN)
        self.columns = [col.strip() for col in raw_columns.split(',')]
        column_type = self.get_column_type()
        column_format = self.get_column_format()
        splitter_item_type = self.get_splitter_item_type()

        if not isinstance(self.columns, list):
            raise Exception('columns mast be a list.')
        if isinstance(self.columns[0], str):
            columns_info = []
            for i, column in enumerate(self.columns):
                column_info = {
                    PLUGIN_PARAMETER_COLUMN_N_INDEX: i,
                    PLUGIN_NAME: column,
                    PLUGIN_PARAMETER_COLUMN_N_TYPE: column_type.get(column, COLUMN_TYPE_STRING),
                }
                if column in column_format:
                    column_info[PLUGIN_PARAMETER_COLUMN_N_FORMAT] = column_format[column]
                if column in splitter_item_type:
                    # The value is "<splitter>_<itemType>". Splitting on the last '_'
                    # keeps splitters that themselves contain '_' intact.
                    splitter, item_type = splitter_item_type[column].rsplit('_', 1)
                    column_info[PLUGIN_PARAMETER_COLUMN_N_SPLITTER] = splitter
                    column_info[PLUGIN_PARAMETER_COLUMN_N_ITEM_TYPE] = item_type
                columns_info.append(column_info)
        elif isinstance(self.columns[0], dict):
            # Data integration in DataWorks: the columns are already full column dicts.
            columns_info = self.columns
        else:
            raise TypeError('column element type error. ')

        # addColumns option from the ini file (currently only supported for
        # hdfsReader; other readers are untested).
        if (self.plugin_type == JOB_CONTENT_N_READER
                and self.config_parser.has_option(JOB_CONTENT_N_READER, PLUGIN_PARAMETER_ADD_COLUMN)):
            raw_add_columns = self.config_parser.get(self.plugin_type, PLUGIN_PARAMETER_ADD_COLUMN)
            parsed = [item.split(':') for item in raw_add_columns.split(',')]
            # Strip the entries the same way the regular column names are stripped.
            self.add_columns = [(parts[0].strip(), parts[1].strip()) for parts in parsed]
            for name, col_type in self.add_columns:
                # Only batch_id is supported. For any other name the reader has no
                # value to emit, so it is skipped.
                if name != "batch_id":
                    continue
                batch_id = datetime.now().strftime('%Y%m%d%H%M')
                columns_info.append({"value": batch_id, "type": col_type})
                # Expose batch_id as an output parameter to the scheduler ("ds",
                # presumably DolphinScheduler) so backend batch jobs can refresh ES.
                print('${setValue(batch_id=%s)}' % batch_id)
                # Append the same column to the writer section so the writer also
                # receives the added value.
                writer_columns = self.config_parser.get(JOB_CONTENT_N_WRITER, PLUGIN_PARAMETER_COLUMN)
                writer_types = self.config_parser.get(JOB_CONTENT_N_WRITER, GEN_CONFIG_KEY_COLUMN_TYPE)
                self.config_parser.set(
                    JOB_CONTENT_N_WRITER, PLUGIN_PARAMETER_COLUMN,
                    value=f"{writer_columns},{name}"
                )
                self.config_parser.set(
                    JOB_CONTENT_N_WRITER, GEN_CONFIG_KEY_COLUMN_TYPE,
                    value=f"{writer_types},{name}:{col_type}"
                )
        self.parameter[PLUGIN_PARAMETER_COLUMN] = columns_info

    @staticmethod
    def _parse_mapping(raw: str, separator: str) -> Dict[str, str]:
        """Parse ``"key<sep>value,key<sep>value"`` into a dict; an empty/None ``raw`` gives ``{}``.

        Only the first ``separator`` in each item is used, so values may contain it.
        Keys are stripped so they match the stripped names in ``self.columns``.

        :raises ValueError: if an item does not contain ``separator``.
        """
        mapping = {}
        if not raw:
            return mapping
        for item in raw.split(','):
            key, value = item.split(separator, 1)
            mapping[key.strip()] = value
        return mapping

    def get_column_format(self) -> Dict[str, str]:
        """Return ``{column: format}`` from the optional column-format option (``col##fmt,...``)."""
        if not self.config_parser.has_option(self.plugin_type, GEN_CONFIG_KEY_COLUMN_FORMAT):
            return {}
        raw = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_FORMAT)
        return self._parse_mapping(raw, '##')

    def get_splitter_item_type(self) -> Dict[str, str]:
        """Return ``{column: "<splitter>_<itemType>"}`` from the optional splitter/item-type option."""
        if not self.config_parser.has_option(self.plugin_type, GEN_CONFIG_KEY_COLUMN_SPLITTER_ITEM_TYPE):
            return {}
        raw = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_SPLITTER_ITEM_TYPE)
        return self._parse_mapping(raw, ':')

    def get_column_type(self) -> Dict[str, str]:
        """Return ``{column: type}`` from the column-type option (``col:type,...``).

        The option is required: ``config_parser.get`` raises (e.g.
        ``configparser.NoOptionError``) when it is absent.
        """
        raw = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_TYPE)
        # Type names are identifiers, so surrounding whitespace is never meaningful.
        return {name: c_type.strip() for name, c_type in self._parse_mapping(raw, ':').items()}

    def load_parameter(self):
        """Fill ``self.parameter``: data source entries, columns, then plugin-specific extras."""
        self.load_data_source()
        self.load_column()
        self.load_others()

    def assemble_plugin(self):
        """Load all parameters and store the name/parameter pair in ``self.config``."""
        self.load_parameter()
        self.config[PLUGIN_NAME] = self.plugin_name
        self.config[PLUGIN_PARAMETER] = self.parameter

    def configure(self):
        """Initialise the plugin, assemble its config and return ``self.config``."""
        self.init()
        self.assemble_plugin()
        return self.config

    def load_others(self):
        """Add plugin-specific parameters; concrete subclasses must override this."""
        raise NotImplementedError('please implement this method in a specified writer.')
|