# -*- coding:utf-8 -*-
from configparser import ConfigParser
from typing import Dict, List

from dw_base.datax.plugins.reader.reader import Reader

# hdfs reader
HDFS_READER_NAME = 'hdfsreader'
HDFS_READER_PARAMETER_COMPRESS = 'compress'
HDFS_READER_PARAMETER_ENCODING = 'encoding'
HDFS_READER_PARAMETER_FIELD_DELIMITER = 'fieldDelimiter'
HDFS_READER_PARAMETER_FILE_TYPE = 'fileType'
HDFS_READER_PARAMETER_NULL_FORMAT = 'nullFormat'
HDFS_READER_PARAMETER_PATH = 'path'


class HDFSReader(Reader):

    def __init__(self, base_dir: str, config_parser: ConfigParser,
                 start_date: str = None, stop_date: str = None):
        super().__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = HDFS_READER_NAME

    def load_others(self):
        path = self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_PATH)
        self.check_config(HDFS_READER_PARAMETER_PATH, path)
        # Substitute every supported date placeholder with the job's start date.
        for placeholder in ('${start_date}', '${start-date}', '${dt}'):
            if placeholder in path:
                path = path.replace(placeholder, self.start_date)
        self.parameter[HDFS_READER_PARAMETER_PATH] = path
        self.parameter[HDFS_READER_PARAMETER_FILE_TYPE] = \
            self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_FILE_TYPE) or 'text'
        self.parameter[HDFS_READER_PARAMETER_ENCODING] = \
            self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_ENCODING) or 'UTF-8'
        # compress defaults to empty (no compression); read from the compress key.
        self.parameter[HDFS_READER_PARAMETER_COMPRESS] = \
            self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_COMPRESS) or ''
        self.parameter[HDFS_READER_PARAMETER_NULL_FORMAT] = \
            self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_NULL_FORMAT) or ''
        field_delimiter = \
            self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_FIELD_DELIMITER) or '\t'
        # The config file stores tabs as the literal two characters "\t"; unescape them.
        self.parameter[HDFS_READER_PARAMETER_FIELD_DELIMITER] = field_delimiter.replace('\\t', '\t')

    @staticmethod
    def generate_definition(hdfs_ds_name: str, hdfs_path: str, hive_database: str,
                            hive_table_name: str, hive_table_comment: str,
                            partitioned: bool, column_names: List[str],
                            column_types: Dict[str, str]) -> str:
        if partitioned:
            # Partitioned table: keep the ${dt} placeholder literal (braces escaped
            # inside the f-string) so load_others() can substitute it at run time.
            path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}/dt=${{dt}}'
        else:
            # Non-partitioned table
            path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}'
        column = []
        column_type = []
        for col_name in column_names:
            column.append(col_name)
            # Only non-string columns need an explicit type annotation.
            if col_name in column_types:
                curr_type = column_types[col_name]
                if curr_type.lower() != 'string':
                    column_type.append(f'{col_name}:{curr_type.upper()}')
        definition = [
            f'# {hive_table_name}: {hive_table_comment}',
            '[reader]',
            f'dataSource = {hdfs_ds_name}',
            f'column = {",".join(column)}',
            f'columnType = {",".join(column_type)}',
            f'path = {path}',
            'fileType = orc',
            'encoding = UTF-8',
            r'fieldDelimiter = \t',
            'nullFormat =',
        ]
        return '\n'.join(definition)
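
# --- Usage sketch (illustrative only) ---------------------------------------
# A minimal example of the definition text generate_definition() emits. The
# data-source name, HDFS path, database, table, and column types below are
# hypothetical values, not part of the real project configuration.
if __name__ == '__main__':
    demo = HDFSReader.generate_definition(
        hdfs_ds_name='hdfs_prod',          # hypothetical data source name
        hdfs_path='/user/hive/warehouse',  # hypothetical warehouse root
        hive_database='demo_db',
        hive_table_name='orders',
        hive_table_comment='order fact table',
        partitioned=True,
        column_names=['order_id', 'amount', 'remark'],
        column_types={'order_id': 'bigint', 'amount': 'double', 'remark': 'string'},
    )
    # Prints an INI-style [reader] section whose path ends in /dt=${dt};
    # string columns are omitted from columnType, so only order_id and
    # amount appear there.
    print(demo)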