# -*- coding: utf-8 -*-
from configparser import ConfigParser
from datetime import datetime, timedelta
from typing import Dict, List

from dw_base.datax.plugins.writer.writer import Writer

# hdfs writer
HDFS_WRITER_NAME = 'hdfswriter'
HDFS_WRITER_PARAMETER_COMPRESS = 'compress'
HDFS_WRITER_PARAMETER_ENCODING = 'encoding'
HDFS_WRITER_PARAMETER_FIELD_DELIMITER = 'fieldDelimiter'
HDFS_WRITER_PARAMETER_FILE_TYPE = 'fileType'
HDFS_WRITER_PARAMETER_PATH = 'path'
HDFS_WRITER_PARAMETER_WRITE_MODE = 'writeMode'
HDFS_WRITER_PARAMETER_FILE_NAME = 'fileName'


class HDFSWriter(Writer):
    """
    HDFSWriter works with four date fields:
    1. start_date: the start date of the data, passed in from the Reader; for a full load, pass 19700101
    2. stop_date: the end date of the data, passed in from the Reader
    3. dt: the partition date, equal to stop_date - 1 day
    4. biz_date: the file-name prefix; when start_date + 1 day == stop_date its value is start_date,
       otherwise it is ${start_date}-${stop_date - 1 day}
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
        super().__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = HDFS_WRITER_NAME

    def load_others(self):
        # Resolve the target path, substituting the ${dt} partition date
        # (stop_date - 1 day) when the placeholder is present.
        path = self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_PATH)
        self.check_config(HDFS_WRITER_PARAMETER_PATH, path)
        if '${dt}' in path:
            stop_at = datetime.strptime(self.stop_date, '%Y%m%d')
            dt = (stop_at - timedelta(days=1)).strftime('%Y%m%d')
            path = path.replace('${dt}', dt)
        self.parameter[HDFS_WRITER_PARAMETER_PATH] = path
        # Optional settings fall back to defaults; fallback=None keeps
        # ConfigParser.get() from raising NoOptionError when a key is absent,
        # so the `or` defaults below are actually reachable.
        self.parameter[HDFS_WRITER_PARAMETER_FILE_TYPE] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FILE_TYPE, fallback=None) or 'text'
        if self.config_parser.has_option(self.plugin_type, HDFS_WRITER_PARAMETER_COMPRESS):
            self.parameter[HDFS_WRITER_PARAMETER_COMPRESS] = \
                self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_COMPRESS)
        self.parameter[HDFS_WRITER_PARAMETER_ENCODING] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_ENCODING, fallback=None) or 'UTF-8'
        self.parameter[HDFS_WRITER_PARAMETER_WRITE_MODE] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_WRITE_MODE, fallback=None) or 'append'
        # The config stores the delimiter as the two-character sequence '\t';
        # convert it to a real tab before handing it to the writer.
        field_delimiter = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FIELD_DELIMITER, fallback=None) or '\\t'
        self.parameter[HDFS_WRITER_PARAMETER_FIELD_DELIMITER] = field_delimiter.replace('\\t', '\t')
        self.parameter['kerberosPrincipal'] = 'hdfs@LIXIAOYUN.COM'
        self.get_file_name()
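
    # For illustration, load_others() reads an INI section shaped roughly like
    # the [writer] fragment produced by generate_definition() below; all values
    # here are hypothetical:
    #
    #   [writer]
    #   path = /user/hive/warehouse/dw.db/orders/dt=${dt}
    #   fileType = orc
    #   encoding = UTF-8
    #   writeMode = truncate
    #   fieldDelimiter = \t
    #   fileName = orders_${biz_date}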

    def get_file_name(self):
        # biz_date is the file-name prefix: a single day for one-day windows,
        # otherwise the inclusive range start_date-(stop_date - 1 day).
        start_at = datetime.strptime(self.start_date, '%Y%m%d')
        stop_at = datetime.strptime(self.stop_date, '%Y%m%d')
        biz_date = self.start_date
        if stop_at - timedelta(days=1) > start_at:
            biz_date = f'{self.start_date}-{(stop_at - timedelta(days=1)).strftime("%Y%m%d")}'
        self.parameter[HDFS_WRITER_PARAMETER_FILE_NAME] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FILE_NAME).replace('${biz_date}', biz_date)
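
    # For example (hypothetical config value): with fileName = orders_${biz_date},
    # start_date='20240101', and stop_date='20240102', the resolved file name
    # is orders_20240101.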

    @staticmethod
    def generate_definition(hdfs_ds_name: str, hdfs_path: str,
                            hive_database: str, hive_table_name: str, partitioned: bool,
                            column_names: List[str], column_types: Dict[str, str]) -> str:
        if partitioned:
            # partitioned table: keep the ${dt} placeholder for load_others() to fill in
            path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}/dt=${{dt}}'
        else:
            # non-partitioned table
            path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}'
        column = []
        column_type = []
        for col_name in column_names:
            column.append(col_name)
            if col_name in column_types:
                column_type.append(f'{col_name}:{column_types.get(col_name)}')
        column_type = ','.join(column_type)
        definition = [
            '[writer]',
            f'dataSource = {hdfs_ds_name}',
            f'path = {path}',
            f'column = {",".join(column)}',
            f'columnType = {column_type}',
            'fileType = orc',
            # 'fileName = ${biz_date}',
            f'fileName = {hive_table_name}',
            'compress = NONE',
            'encoding = utf-8',
            '; writeMode supports append, nonConflict, and truncate',
            'writeMode = truncate',
            r'fieldDelimiter = \t'
        ]
        return '\n'.join(definition)
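
# A minimal usage sketch (all argument values below are hypothetical):
#
#   definition = HDFSWriter.generate_definition(
#       hdfs_ds_name='hdfs_prod',
#       hdfs_path='/user/hive/warehouse',
#       hive_database='dw',
#       hive_table_name='orders',
#       partitioned=True,
#       column_names=['order_id', 'amount'],
#       column_types={'order_id': 'bigint', 'amount': 'double'},
#   )
#
# which yields a [writer] INI fragment with
# path = /user/hive/warehouse/dw.db/orders/dt=${dt} and
# columnType = order_id:bigint,amount:double.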