# -*- coding:utf-8 -*-
"""DataX datasource definition for HDFS."""
from typing import Any, Dict

from dw_base.datax.datasources.data_source import DataSource

# HDFS Data Source
DS_TYPE_HDFS = 'hdfs'
# Keys the base DataSource is expected to validate/read from the definition file.
DS_HDFS_KEYS = ['defaultFS']
DS_HDFS_SECTION_HADOOP_CONFIG = 'hadoop_config'
DS_HDFS_PARAMETER_HADOOP_CONFIG = 'hadoopConfig'


class HDFSDataSource(DataSource):
    """HDFS-backed DataX datasource parsed from an ini-style definition file."""

    def __init__(self, ds_file: str):
        """Load the datasource definition from *ds_file* and mark it as HDFS.

        :param ds_file: path to the ini-style datasource definition file
            (parsed by the base class — presumably into ``self.config_parser``;
            confirm against ``DataSource``).
        """
        super(HDFSDataSource, self).__init__(ds_file)
        self.source_type = DS_TYPE_HDFS
        # Copy the module-level list: assigning DS_HDFS_KEYS directly would
        # alias the shared constant, so an in-place mutation on one instance
        # would silently leak into every other instance.
        self.keys = list(DS_HDFS_KEYS)

    def get_datasource_dict(self) -> Dict[str, Any]:
        """Build the parameter dict handed to the DataX job template.

        Returns the base-class dict, optionally extended with a nested
        ``hadoopConfig`` dict. The annotation is ``Dict[str, Any]`` (not
        ``Dict[str, str]``) because that nested value is itself a dict.
        """
        ds_dict = super(HDFSDataSource, self).get_datasource_dict()
        # On HA clusters, ops copy the HA keys from hdfs-site.xml verbatim
        # into the [hadoop_config] section; the DataX JVM does not read
        # HADOOP_CONF_DIR, so the nameservice can only be resolved by
        # injecting a hadoopConfig block into the job parameters.
        if self.config_parser.has_section(DS_HDFS_SECTION_HADOOP_CONFIG):
            ds_dict[DS_HDFS_PARAMETER_HADOOP_CONFIG] = dict(
                self.config_parser.items(DS_HDFS_SECTION_HADOOP_CONFIG)
            )
        return ds_dict

    @staticmethod
    def generate_definition(default_fs: str) -> str:
        """Render a minimal HDFS datasource definition file body.

        :param default_fs: value for the ``defaultFS`` key, e.g.
            ``hdfs://namenode:8020`` or an HA nameservice URI.
        :return: ini-style text with a single ``[base]`` section.
        """
        lines = [
            '[base]',
            'defaultFS = %s' % default_fs,
        ]
        return '\n'.join(lines)