# -*- coding:utf-8 -*-
from configparser import ConfigParser
from typing import Dict, List, Optional

from dw_base.datax.plugins.reader.reader import Reader

# hdfs reader option names
HDFS_READER_NAME = 'hdfsreader'
HDFS_READER_PARAMETER_COMPRESS = 'compress'
HDFS_READER_PARAMETER_ENCODING = 'encoding'
HDFS_READER_PARAMETER_FIELD_DELIMITER = 'fieldDelimiter'
HDFS_READER_PARAMETER_FILE_TYPE = 'fileType'
HDFS_READER_PARAMETER_NULL_FORMAT = 'nullFormat'
HDFS_READER_PARAMETER_PATH = 'path'
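
# Illustrative only: a hedged sketch of the config section load_others() reads via
# ConfigParser (option names are the constants above; the section name assumes
# self.plugin_type is 'reader', matching what generate_definition() writes; values
# are hypothetical, and ${start_date}/${start-date}/${dt} in path are replaced with
# the task's start date):
#
#   [reader]
#   path = /user/hive/warehouse/dw.db/orders/dt=${dt}
#   fileType = orc
#   encoding = UTF-8
#   compress =
#   nullFormat =
#   fieldDelimiter = \t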


class HDFSReader(Reader):
    def __init__(self, base_dir: str, config_parser: ConfigParser,
                 start_date: Optional[str] = None, stop_date: Optional[str] = None):
        super().__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = HDFS_READER_NAME
    def load_others(self):
        # Resolve the HDFS path, substituting any supported date placeholder
        # with the task's start date.
        path = self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_PATH)
        self.check_config(HDFS_READER_PARAMETER_PATH, path)
        if '${start_date}' in path:
            path = path.replace('${start_date}', self.start_date)
        if '${start-date}' in path:
            path = path.replace('${start-date}', self.start_date)
        if '${dt}' in path:
            path = path.replace('${dt}', self.start_date)
        self.parameter[HDFS_READER_PARAMETER_PATH] = path
        self.parameter[HDFS_READER_PARAMETER_FILE_TYPE] = \
            self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_FILE_TYPE) or 'text'
        self.parameter[HDFS_READER_PARAMETER_ENCODING] = \
            self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_ENCODING) or 'UTF-8'
        self.parameter[HDFS_READER_PARAMETER_COMPRESS] = \
            self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_COMPRESS) or ''
        self.parameter[HDFS_READER_PARAMETER_NULL_FORMAT] = \
            self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_NULL_FORMAT) or ''
        # Default to tab, and turn a literal "\t" from the config file into a real tab character.
        field_delimiter = \
            self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_FIELD_DELIMITER) or '\t'
        self.parameter[HDFS_READER_PARAMETER_FIELD_DELIMITER] = field_delimiter.replace('\\t', '\t')
    @staticmethod
    def generate_definition(hdfs_ds_name: str, hdfs_path: str,
                            hive_database: str, hive_table_name: str, hive_table_comment: str,
                            partitioned: bool,
                            column_names: List[str], column_types: Dict[str, str]) -> str:
        if partitioned:
            # Partitioned table: keep the ${dt} placeholder for later substitution.
            path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}/dt=' + '${dt}'
        else:
            # Non-partitioned table
            path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}'
        column = []
        column_type = []
        for col_name in column_names:
            column.append(col_name)
            # Only non-string columns need an explicit type override in columnType.
            if col_name in column_types:
                curr_type = column_types[col_name]
                if curr_type.lower() != 'string':
                    column_type.append(f'{col_name}:{curr_type.upper()}')
        definition = [
            f'# {hive_table_name}: {hive_table_comment}',
            '[reader]',
            f'dataSource = {hdfs_ds_name}',
            f'column = {",".join(column)}',
            f'columnType = {",".join(column_type)}',
            f'path = {path}',
            'fileType = orc',
            'encoding = UTF-8',
            r'fieldDelimiter = \t',
            'nullFormat ='
        ]
        return '\n'.join(definition)
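

# Illustrative usage sketch: exercising the static generate_definition() helper with
# hypothetical table metadata (data source name, warehouse path, table and columns are
# made-up examples) to show the [reader] section it produces. Running it as a script
# assumes the dw_base package is importable.
if __name__ == '__main__':
    sample = HDFSReader.generate_definition(
        hdfs_ds_name='my_hdfs',                      # hypothetical data source name
        hdfs_path='/user/hive/warehouse',            # hypothetical warehouse root
        hive_database='dw',
        hive_table_name='orders',
        hive_table_comment='order fact table',
        partitioned=True,
        column_names=['order_id', 'amount', 'created_at'],
        column_types={'order_id': 'bigint', 'amount': 'double', 'created_at': 'string'},
    )
    print(sample)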