# -*- coding:utf-8 -*-
import os
import pwd
import re
from configparser import ConfigParser
from datetime import datetime
from typing import Dict

from dw_base.datax.datasources.data_source import DataSource
from dw_base.datax.datasources.data_source_factory import DataSourceFactory
from dw_base.datax.datax_constants import *


class Plugin(object):
    """Base class for a DataX plugin (reader or writer) driven by an ini config.

    Subclasses must set ``plugin_name`` and ``plugin_type`` and implement
    :meth:`load_others`.  :meth:`configure` is the entry point: it validates
    state, resolves the data-source ini file, assembles the parameter dict
    (data source + columns + plugin-specific extras) and returns the final
    plugin config dict.
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser,
                 start_date: str = None, stop_date: str = None):
        """Store configuration inputs; no I/O happens until :meth:`init`.

        :param base_dir: directory the datasource ini files are resolved
            relative to (``{base_dir}/../datasource/...``).
        :param config_parser: parsed job ini file.
        :param start_date: yyyyMMdd or ``ALL_DATA_DATE``.
        :param stop_date: yyyyMMdd or ``ALL_DATA_DATE``.
        """
        self.base_dir = base_dir
        self.config_parser = config_parser
        # Dates are normalized to strings; str(None) -> 'None', which check()
        # later rejects, so a missing date fails fast with a clear message.
        self.start_date = str(start_date)
        self.stop_date = str(stop_date)
        self.parameter = {}
        self.config = {}
        self.ds_file = None
        self.ds_type = None
        self.ds_file_path = None
        self.datasource = None  # type: DataSource
        # Must be overridden by subclasses; check() enforces this.
        self.plugin_type = None
        self.plugin_name = None
        self.columns = []
        self.add_columns = []

    def init(self):
        """Validate state and resolve + instantiate the plugin's data source.

        :raises FileNotFoundError: if the datasource ini file does not exist.
        :raises ValueError: propagated from :meth:`check`.
        """
        self.check()
        self.ds_file = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_DATA_SOURCE)
        # The datasource type is encoded as the prefix of the file name,
        # e.g. 'mysql-prod' -> 'mysql'.
        self.ds_type = self.ds_file.split('/')[-1].split('-')[0]
        self.ds_file_path = f'{self.base_dir}/../datasource/{self.ds_type}/{self.ds_file}.ini'
        # os.path.isfile already returns False for nonexistent paths, so the
        # former extra os.path.exists test was redundant.
        if not os.path.isfile(self.ds_file_path):
            raise FileNotFoundError(self.ds_file_path)
        self.datasource = DataSourceFactory.get_data_source(self.ds_type, self.ds_file_path)

    def check(self):
        """Ensure subclass attributes are set and dates are well-formed.

        :raises ValueError: when plugin_name/plugin_type is unset or a date is
            neither ``ALL_DATA_DATE`` nor exactly eight digits.
        """
        if not self.plugin_name:
            raise ValueError('Plugin.plugin_name at every implemented class of Plugin should be specified.')
        if not self.plugin_type:
            raise ValueError('Plugin.plugin_type at every type of Plugin should be specified.')
        # fullmatch (not match) so trailing garbage like '20210101abc' is
        # rejected, as the error message promises.
        if not (self.start_date == ALL_DATA_DATE or re.fullmatch(r'\d{8}', self.start_date)):
            raise ValueError('start_date %s must be format of yyyyMMdd or equal %s '
                             % (self.start_date, ALL_DATA_DATE))
        if not (self.stop_date == ALL_DATA_DATE or re.fullmatch(r'\d{8}', self.stop_date)):
            raise ValueError('stop_date %s must be format of yyyyMMdd or equals %s '
                             % (self.stop_date, ALL_DATA_DATE))

    def check_config(self, key, value):
        """Raise if a required config *value* for *key* is empty/falsy."""
        if not value:
            raise ValueError('config %s of %s %s not provided'
                             % (key, self.plugin_type, self.plugin_name))

    def load_data_source(self):
        """Merge the parsed datasource key/values into the parameter dict."""
        ds_dict: Dict[str, str] = self.datasource.parse()
        self.parameter.update(ds_dict)

    def load_column(self):
        """Build the DataX column descriptors and store them in ``parameter``.

        Reads the column list from the plugin section, enriches each column
        with its type/format/splitter info, then applies the optional
        ``addColumns`` logic (currently only tested with hdfsReader).
        """
        columns_info = []
        self.columns = [
            col.strip()
            for col in self.config_parser.get(self.plugin_type, PLUGIN_PARAMETER_COLUMN).split(',')
        ]
        column_type = self.get_column_type()
        column_format = self.get_column_format()
        splitter_item_type = self.get_splitter_item_type()
        if isinstance(self.columns, list):
            if isinstance(self.columns[0], str):
                for i, column in enumerate(self.columns):
                    # Type defaults to string when not configured explicitly.
                    column_info = {
                        PLUGIN_PARAMETER_COLUMN_N_INDEX: i,
                        PLUGIN_NAME: column,
                        PLUGIN_PARAMETER_COLUMN_N_TYPE: column_type.get(column, COLUMN_TYPE_STRING)
                    }
                    if column in column_format:
                        column_info[PLUGIN_PARAMETER_COLUMN_N_FORMAT] = column_format[column]
                    if column in splitter_item_type:
                        # Config value is '<splitter>_<item_type>'.
                        splitter, item_type = splitter_item_type[column].split('_')
                        column_info[PLUGIN_PARAMETER_COLUMN_N_SPLITTER] = splitter
                        column_info[PLUGIN_PARAMETER_COLUMN_N_ITEM_TYPE] = item_type
                    columns_info.append(column_info)
            elif isinstance(self.columns[0], dict):
                # DataWorks-style data integration: columns already described.
                columns_info = self.columns
            else:
                raise TypeError('column element type error.')
        else:
            raise Exception('columns must be a list.')
        # addColumns handling from the ini file (only hdfsReader has been
        # tested; other readers are untested).
        if (self.plugin_type == JOB_CONTENT_N_READER
                and self.config_parser.has_option(JOB_CONTENT_N_READER, PLUGIN_PARAMETER_ADD_COLUMN)):
            self.add_columns = [
                (col.split(":")[0], col.split(":")[1])
                for col in self.config_parser.get(self.plugin_type, PLUGIN_PARAMETER_ADD_COLUMN).split(',')
            ]
            for column in self.add_columns:
                if column[0] == "batch_id":
                    batch_id = datetime.now().strftime('%Y%m%d%H%M')
                    columns_info.append({"value": batch_id, "type": column[1]})
                    # Surface batch_id to the scheduler (so downstream batch
                    # jobs, e.g. ES refresh, can capture it via setValue).
                    print('${setValue(batch_id=%s)}' % batch_id)
                    # Also append the added column to the writer section so
                    # reader and writer column lists stay in sync.
                    col_value = self.config_parser.get(JOB_CONTENT_N_WRITER, PLUGIN_PARAMETER_COLUMN)
                    col_type_value = self.config_parser.get(JOB_CONTENT_N_WRITER, GEN_CONFIG_KEY_COLUMN_TYPE)
                    self.config_parser.set(
                        JOB_CONTENT_N_WRITER, PLUGIN_PARAMETER_COLUMN,
                        value=f"{col_value},{column[0]}"
                    )
                    self.config_parser.set(
                        JOB_CONTENT_N_WRITER, GEN_CONFIG_KEY_COLUMN_TYPE,
                        value=f"{col_type_value},{column[0]}:{column[1]}"
                    )
        self.parameter[PLUGIN_PARAMETER_COLUMN] = columns_info

    def get_column_format(self):
        """Return {column: format} parsed from the optional column_format key.

        Entries are comma-separated ``name##format`` pairs; missing option
        yields an empty dict.
        """
        column_format = {}
        if self.config_parser.has_option(self.plugin_type, GEN_CONFIG_KEY_COLUMN_FORMAT):
            c_format = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_FORMAT)
            if c_format:
                for item in c_format.split(','):
                    k, v = item.split('##')
                    column_format[k] = v
        return column_format

    def get_splitter_item_type(self):
        """Return {column: 'splitter_itemtype'} from the optional splitter key.

        Entries are comma-separated ``name:value`` pairs; missing option
        yields an empty dict.
        """
        splitter_item_type = {}
        if self.config_parser.has_option(self.plugin_type, GEN_CONFIG_KEY_COLUMN_SPLITTER_ITEM_TYPE):
            splitter_item = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_SPLITTER_ITEM_TYPE)
            if splitter_item:
                for item in splitter_item.split(','):
                    k, v = item.split(':')
                    splitter_item_type[k] = v
        return splitter_item_type

    def get_column_type(self):
        """Return {column: type} parsed from the mandatory column_type key.

        Entries are comma-separated ``name:type`` pairs.  Unlike the format /
        splitter lookups, a missing option propagates configparser's
        NoOptionError (the former ``except Exception as e: raise e`` wrapper
        was a no-op and has been removed).
        """
        column_type = {}
        c_types = self.config_parser.get(self.plugin_type, GEN_CONFIG_KEY_COLUMN_TYPE)
        if c_types:
            for item in c_types.split(','):
                k, v = item.split(':')
                column_type[k] = v
        return column_type

    def load_parameter(self):
        """Populate ``parameter`` from datasource, columns and subclass extras."""
        self.load_data_source()
        self.load_column()
        self.load_others()

    def assemble_plugin(self):
        """Fill ``config`` with the plugin name and the assembled parameters."""
        self.load_parameter()
        self.config[PLUGIN_NAME] = self.plugin_name
        self.config[PLUGIN_PARAMETER] = self.parameter

    def configure(self):
        """Run the full pipeline (init -> assemble) and return the config dict."""
        self.init()
        self.assemble_plugin()
        return self.config

    def load_others(self):
        """Hook for plugin-specific parameters; subclasses must implement."""
        raise NotImplementedError('please implement this method in a specified writer.')