# -*- coding:utf-8 -*-
from typing import Any, Dict, Optional

from dw_base.datax.datasources.data_source import DataSource
# HDFS data source: type tag, required definition keys, and the INI section /
# DataX parameter names used to pass Hadoop HA configuration through.
DS_TYPE_HDFS = 'hdfs'
DS_HDFS_KEYS = ['defaultFS']
DS_HDFS_SECTION_HADOOP_CONFIG = 'hadoop_config'
DS_HDFS_PARAMETER_HADOOP_CONFIG = 'hadoopConfig'
class HDFSDataSource(DataSource):
    """HDFS data source described by an INI-style definition file.

    Adds HDFS-specific behavior on top of :class:`DataSource`: tags the
    source as ``hdfs``, requires the ``defaultFS`` key, and forwards an
    optional ``[hadoop_config]`` section to DataX.
    """

    def __init__(self, ds_file: str):
        """
        :param ds_file: path of the INI definition file parsed by the base class.
        """
        super().__init__(ds_file)
        self.source_type = DS_TYPE_HDFS
        self.keys = DS_HDFS_KEYS

    def get_datasource_dict(self) -> Dict[str, Any]:
        """Build the DataX parameter dict for this HDFS source.

        In an HA cluster, ops copy the HA keys from hdfs-site.xml verbatim
        into the ``[hadoop_config]`` section of the definition file. The
        DataX JVM does not read HADOOP_CONF_DIR, so the nameservice can only
        be resolved by injecting a ``hadoopConfig`` block here.

        :return: base parameters plus, when present, a nested
            ``hadoopConfig`` dict (hence ``Dict[str, Any]``: the value under
            that key is itself a dict, not a string).
        """
        ds_dict = super().get_datasource_dict()
        if self.config_parser.has_section(DS_HDFS_SECTION_HADOOP_CONFIG):
            ds_dict[DS_HDFS_PARAMETER_HADOOP_CONFIG] = dict(
                self.config_parser.items(DS_HDFS_SECTION_HADOOP_CONFIG)
            )
        return ds_dict

    @staticmethod
    def generate_definition(default_fs: str,
                            hadoop_config: Optional[Dict[str, str]] = None) -> str:
        """Render the INI definition text for an HDFS data source.

        :param default_fs: value written as ``defaultFS`` in the ``[base]``
            section.
        :param hadoop_config: optional HA keys (e.g. copied from
            hdfs-site.xml) written verbatim into a ``[hadoop_config]``
            section, which :meth:`get_datasource_dict` reads back. Omitted
            when ``None``/empty, preserving the original single-section
            output.
        :return: the definition file content.
        """
        lines = [
            '[base]',
            'defaultFS = %s' % default_fs,
        ]
        if hadoop_config:
            lines.append('[hadoop_config]')
            for key, value in hadoop_config.items():
                lines.append('%s = %s' % (key, value))
        return '\n'.join(lines)