Ver código fonte

refactor(dw_base): HDFSDataSource 支持 hadoopConfig

- `hdfs_data_source.py` 覆写 `get_datasource_dict()`:检测 ini
  的 `[hadoop_config]` 节,存在则以 dict 形式塞进 `hadoopConfig`
  key,由 `Plugin.load_data_source()` 自动注入到 DataX json
- `__init__.py` 清理死代码 `os.environ['HADOOP_CONF_DIR']`:
  新 CDH 环境实测证实该 env 对 DataX JVM 无效(datax.py 不
  把 conf 目录入 classpath,Hadoop Configuration 拿不到 HA 参数)

动机:为 DataX 连 HA HDFS 打底。老 ini 只支持
`defaultFS`,在 worker 侧 classpath 不含 hdfs-site.xml 时无法
解析 nameservice;新方案让 HA 参数显式写进 ini、经由 json
`hadoopConfig` 块注入,零运维依赖。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 2 semanas atrás
pai
commit
13d0688972
2 arquivos alterados com 14 adições e 1 exclusões
  1. 1 1
      dw_base/__init__.py
  2. 13 0
      dw_base/datax/datasources/hdfs_data_source.py

+ 1 - 1
dw_base/__init__.py

@@ -13,7 +13,7 @@ def cow_says():
     os.system(f'source {PROJECT_ROOT_PATH}/bin/common/functions.sh')
 
 
-os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
+# HADOOP_CONF_DIR 对 DataX 无效(datax.py 不把 conf 目录加入 classpath),改由 ini [hadoop_config] 节注入
 # os.environ['HIVE_CONF_DIR'] = '/etc/hive/conf'
 # os.environ['JAVA_HOME'] = '/usr/local/java'
 os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

+ 13 - 0
dw_base/datax/datasources/hdfs_data_source.py

@@ -1,11 +1,14 @@
 # -*- coding:utf-8 -*-
 
+from typing import Dict
 
 from dw_base.datax.datasources.data_source import DataSource
 
 # HDFS Data Source
 DS_TYPE_HDFS = 'hdfs'
 DS_HDFS_KEYS = ['defaultFS']
+DS_HDFS_SECTION_HADOOP_CONFIG = 'hadoop_config'
+DS_HDFS_PARAMETER_HADOOP_CONFIG = 'hadoopConfig'
 
 
 class HDFSDataSource(DataSource):
@@ -14,6 +17,16 @@ class HDFSDataSource(DataSource):
         self.source_type = DS_TYPE_HDFS
         self.keys = DS_HDFS_KEYS
 
+    def get_datasource_dict(self) -> Dict[str, str]:
+        ds_dict = super(HDFSDataSource, self).get_datasource_dict()
+        # HA 集群下,运维把 hdfs-site.xml 里的 HA keys 原样写进 [hadoop_config] 节;
+        # DataX JVM 不读 HADOOP_CONF_DIR,必须靠 hadoopConfig 块注入才能解析 nameservice
+        if self.config_parser.has_section(DS_HDFS_SECTION_HADOOP_CONFIG):
+            ds_dict[DS_HDFS_PARAMETER_HADOOP_CONFIG] = dict(
+                self.config_parser.items(DS_HDFS_SECTION_HADOOP_CONFIG)
+            )
+        return ds_dict
+
     @staticmethod
     def generate_definition(default_fs: str) -> str:
         lines = [