# -*- coding:utf-8 -*-
from configparser import ConfigParser
from datetime import datetime, timedelta
from typing import Dict, List

from dw_base.datax.plugins.writer.writer import Writer

# hdfs writer
HDFS_WRITER_NAME = 'hdfswriter'
HDFS_WRITER_PARAMETER_COMPRESS = 'compress'
HDFS_WRITER_PARAMETER_ENCODING = 'encoding'
HDFS_WRITER_PARAMETER_FIELD_DELIMITER = 'fieldDelimiter'
HDFS_WRITER_PARAMETER_FILE_TYPE = 'fileType'
HDFS_WRITER_PARAMETER_PATH = 'path'
HDFS_WRITER_PARAMETER_WRITE_MODE = 'writeMode'
HDFS_WRITER_PARAMETER_FILE_NAME = 'fileName'


class HDFSWriter(Writer):
    """
    HDFSWriter carries four date fields:

    1. start_date: the start date of the data, passed in from the Reader; for a full load,
       pass 19700101
    2. stop_date: the end date of the data, passed in from the Reader
    3. dt: the partition date, equal to start_date (the business day); the partition may
       contain next-day drift data (per ADR-03, raw data does not correct partition drift:
       everything captured by the wide window [start, stop) lands in this partition)
    4. biz_date: the file-name prefix; when start_date + 1 day == stop_date its value is
       start_date, otherwise ${start_date}-${stop_date - 1 day}
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser,
                 start_date: str = None, stop_date: str = None):
        super().__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = HDFS_WRITER_NAME

    def load_others(self):
        path = self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_PATH)
        self.check_config(HDFS_WRITER_PARAMETER_PATH, path)
        # Substitute the partition placeholder with the business day.
        if '${dt}' in path:
            path = path.replace('${dt}', self.start_date)
        self.parameter[HDFS_WRITER_PARAMETER_PATH] = path
        # Optional settings fall back to defaults when absent or left empty.
        self.parameter[HDFS_WRITER_PARAMETER_FILE_TYPE] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FILE_TYPE,
                                   fallback='text') or 'text'
        if self.config_parser.has_option(self.plugin_type, HDFS_WRITER_PARAMETER_COMPRESS):
            self.parameter[HDFS_WRITER_PARAMETER_COMPRESS] = \
                self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_COMPRESS)
        self.parameter[HDFS_WRITER_PARAMETER_ENCODING] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_ENCODING,
                                   fallback='UTF-8') or 'UTF-8'
        self.parameter[HDFS_WRITER_PARAMETER_WRITE_MODE] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_WRITE_MODE,
                                   fallback='append') or 'append'
        # Config files carry the delimiter as the two literal characters '\t';
        # turn that into a real tab, and default to tab when unset or empty.
        field_delimiter = self.config_parser.get(
            self.plugin_type, HDFS_WRITER_PARAMETER_FIELD_DELIMITER, fallback='\\t')
        self.parameter[HDFS_WRITER_PARAMETER_FIELD_DELIMITER] = \
            field_delimiter.replace('\\t', '\t') or '\t'
        self.get_file_name()

    def get_file_name(self):
        start_at = datetime.strptime(self.start_date, '%Y%m%d')
        stop_at = datetime.strptime(self.stop_date, '%Y%m%d')
        # A one-day window [start, start + 1 day) is named with start_date alone;
        # a longer window is named '<start_date>-<stop_date - 1 day>'.
        biz_date = self.start_date
        if stop_at - timedelta(days=1) > start_at:
            biz_date = f'{self.start_date}-{(stop_at - timedelta(days=1)).strftime("%Y%m%d")}'
        self.parameter[HDFS_WRITER_PARAMETER_FILE_NAME] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FILE_NAME) \
                .replace('${biz_date}', biz_date)

    @staticmethod
    def generate_definition(hdfs_ds_name: str, hdfs_path: str, hive_database: str,
                            hive_table_name: str, partitioned: bool,
                            column_names: List[str],
                            column_types: Dict[str, str]) -> str:
        if partitioned:
            # Partitioned table: keep the literal ${dt} placeholder in the path
            # (the doubled braces escape it inside the f-string).
            path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}/dt=${{dt}}'
        else:
            # Non-partitioned table
            path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}'
        column = []
        column_type = []
        for col_name in column_names:
            column.append(col_name)
            if col_name in column_types:
                column_type.append(f'{col_name}:{column_types.get(col_name)}')
        definition = [
            '[writer]',
            f'dataSource = {hdfs_ds_name}',
            f'path = {path}',
            f'column = {",".join(column)}',
            f'columnType = {",".join(column_type)}',
            'fileType = orc',
            # 'fileName = ${biz_date}',
            f'fileName = {hive_table_name}',
            'compress = NONE',
            'encoding = utf-8',
            ';writeMode supports append, nonConflict, and truncate',
            'writeMode = truncate',
            r'fieldDelimiter = \t',
        ]
        return '\n'.join(definition)
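

# ---------------------------------------------------------------------------
# Usage sketch (hypothetical, not shipped with the plugin): shows the INI
# [writer] block that generate_definition emits. Every value below — the data
# source name, warehouse root, database, table, and columns — is an invented
# example, not something defined by this module.
if __name__ == '__main__':
    ini_block = HDFSWriter.generate_definition(
        hdfs_ds_name='hdfs_demo',              # assumed data source name
        hdfs_path='/user/hive/warehouse',      # assumed warehouse root
        hive_database='demo_db',
        hive_table_name='orders',
        partitioned=True,
        column_names=['order_id', 'amount'],
        column_types={'order_id': 'bigint', 'amount': 'double'},
    )
    # Expected path line: /user/hive/warehouse/demo_db.db/orders/dt=${dt}
    print(ini_block)
    # The biz_date naming rule from get_file_name, for reference:
    #   window [20240101, 20240102) -> file prefix '20240101'
    #   window [20240101, 20240104) -> file prefix '20240101-20240103'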