hdfs_data_source.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536
  1. # -*- coding:utf-8 -*-
  2. from typing import Dict
  3. from dw_base.datax.datasources.data_source import DataSource
  4. # HDFS Data Source
# Identifier for the HDFS source type (assigned to self.source_type below).
DS_TYPE_HDFS = 'hdfs'
# Key names consumed from the datasource file (assigned to self.keys;
# presumably validated/read by the DataSource base class — verify there).
DS_HDFS_KEYS = ['defaultFS']
# Optional config-file section carrying extra Hadoop client properties
# (e.g. HA keys copied from hdfs-site.xml).
DS_HDFS_SECTION_HADOOP_CONFIG = 'hadoop_config'
# DataX job parameter name under which that section is injected.
DS_HDFS_PARAMETER_HADOOP_CONFIG = 'hadoopConfig'
  9. class HDFSDataSource(DataSource):
  10. def __init__(self, ds_file: str):
  11. super(HDFSDataSource, self).__init__(ds_file)
  12. self.source_type = DS_TYPE_HDFS
  13. self.keys = DS_HDFS_KEYS
  14. def get_datasource_dict(self) -> Dict[str, str]:
  15. ds_dict = super(HDFSDataSource, self).get_datasource_dict()
  16. # HA 集群下,运维把 hdfs-site.xml 里的 HA keys 原样写进 [hadoop_config] 节;
  17. # DataX JVM 不读 HADOOP_CONF_DIR,必须靠 hadoopConfig 块注入才能解析 nameservice
  18. if self.config_parser.has_section(DS_HDFS_SECTION_HADOOP_CONFIG):
  19. ds_dict[DS_HDFS_PARAMETER_HADOOP_CONFIG] = dict(
  20. self.config_parser.items(DS_HDFS_SECTION_HADOOP_CONFIG)
  21. )
  22. return ds_dict
  23. @staticmethod
  24. def generate_definition(default_fs: str) -> str:
  25. lines = [
  26. '[base]',
  27. 'defaultFS = %s' % default_fs
  28. ]
  29. return '\n'.join(lines)