|
|
@@ -3,7 +3,7 @@
|
|
|
import inspect
|
|
|
import re
|
|
|
from importlib import import_module
|
|
|
-from typing import List, Union, Dict, Any, Tuple
|
|
|
+from typing import List, Optional, Union, Dict, Any, Tuple
|
|
|
|
|
|
from pyspark.sql import Row, SparkSession, DataFrame
|
|
|
|
|
|
@@ -17,6 +17,23 @@ from dw_base.utils.sql_utils import get_sql_list_from_file, check_parameter_subs
|
|
|
HDFS_EXPORT_DATA_PATH = '/hdfs-mnt/export-data'
|
|
|
|
|
|
|
|
|
def _load_spark_conf_file(path: str) -> Dict[str, str]:
    """Parse a Spark-style conf file into a ``{key: value}`` dict.

    Each meaningful line has the form ``key value``: the key is the first
    whitespace-delimited token and the value is the remainder of the line
    (it may itself contain spaces). Blank lines and lines starting with
    ``#`` are ignored. A line with a key but no value is skipped; it does
    not discard the entries parsed from other lines. When the same key
    appears more than once, the last occurrence wins.

    :param path: path of the conf file to read
    :return: the parsed entries; an empty dict when ``path`` is not an
        existing regular file
    """
    if not os.path.isfile(path):
        return {}
    config: Dict[str, str] = {}
    # Decode explicitly instead of relying on the locale default, so the result
    # is the same on every host. 'utf-8-sig' also drops a leading BOM that would
    # otherwise become part of the first key; errors='replace' keeps a stray
    # non-UTF-8 byte (typically inside a comment) from raising.
    with open(path, 'r', encoding='utf-8-sig', errors='replace') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            # Split on the first run of whitespace only: the value keeps its
            # own internal spaces (e.g. several JVM options).
            parts = line.split(None, 1)
            if len(parts) != 2:
                continue
            config[parts[0]] = parts[1]
    return config
|
|
|
+
|
|
|
+
|
|
|
class SparkSQL(object):
|
|
|
"""
|
|
|
封装执行 Spark 相关操作的类, 相关参数说明:
|
|
|
@@ -49,16 +66,16 @@ class SparkSQL(object):
|
|
|
def __init__(self,
|
|
|
session_name: str = 'spark',
|
|
|
master: str = 'yarn',
|
|
|
- spark_yarn_queue: str = 'spark',
|
|
|
- spark_driver_memory: str = '2g',
|
|
|
- spark_executor_memory: str = '6g',
|
|
|
- spark_executor_memory_overhead: str = '512',
|
|
|
- spark_driver_cores: int = 2,
|
|
|
- spark_executor_cores: int = 2,
|
|
|
- spark_executor_instances: int = 15,
|
|
|
- spark_driver_max_result_size='4g',
|
|
|
- spark_shuffle_partitions=200,
|
|
|
- spark_default_parallelism=200,
|
|
|
+ spark_yarn_queue: Optional[str] = None,
|
|
|
+ spark_driver_memory: Optional[str] = None,
|
|
|
+ spark_executor_memory: Optional[str] = None,
|
|
|
+ spark_executor_memory_overhead: Optional[str] = None,
|
|
|
+ spark_driver_cores: Optional[int] = None,
|
|
|
+ spark_executor_cores: Optional[int] = None,
|
|
|
+ spark_executor_instances: Optional[int] = None,
|
|
|
+ spark_driver_max_result_size: Optional[str] = None,
|
|
|
+ spark_shuffle_partitions: Optional[int] = None,
|
|
|
+ spark_default_parallelism: Optional[int] = None,
|
|
|
extra_spark_config: Dict[str, Any] = None,
|
|
|
udf_files: List[str] = None,
|
|
|
resource_files: List[str] = None):
|
|
|
@@ -125,44 +142,41 @@ class SparkSQL(object):
|
|
|
if self._spark_session:
|
|
|
return
|
|
|
pretty_print(f'{NORM_MGT}基于用户 {NORM_GRN}{USER}{NORM_MGT} 创建 SparkSession')
|
|
|
- # for element in os.environ:
|
|
|
- # pretty_print(f'{NORM_MGT}Environment {NORM_GRN}{element} => {os.environ[element]}')
|
|
|
builder = SparkSession.builder \
|
|
|
.appName(self._session_name) \
|
|
|
- .master(self._master) \
|
|
|
- .config('hive.exec.orc.default.block.size', 134217728) \
|
|
|
- .config('spark.debug.maxToStringFields', 5000) \
|
|
|
- .config('spark.default.parallelism', self._spark_default_parallelism) \
|
|
|
- .config('spark.driver.cores', self._spark_driver_cores) \
|
|
|
- .config('spark.driver.maxResultSize', self._spark_driver_max_result_size) \
|
|
|
- .config('spark.driver.memory', self._spark_driver_memory) \
|
|
|
- .config('spark.dynamicAllocation.enabled', False) \
|
|
|
- .config('spark.files.ignoreCorruptFiles', True) \
|
|
|
- .config('spark.executor.cores', self._spark_executor_cores) \
|
|
|
- .config('spark.executor.instances', self._spark_executor_instances) \
|
|
|
- .config('spark.executor.memory', self._spark_executor_memory) \
|
|
|
- .config('spark.executor.memoryOverhead', self._spark_executor_memory_overhead) \
|
|
|
- .config('spark.sql.adaptive.enabled', 'true') \
|
|
|
- .config('spark.sql.broadcastTimeout', -1) \
|
|
|
- .config('spark.sql.codegen.wholeStage', 'false') \
|
|
|
- .config('spark.sql.execution.arrow.enabled', True) \
|
|
|
- .config('spark.sql.execution.arrow.fallback.enabled', True) \
|
|
|
- .config('spark.sql.files.ignoreCorruptFiles', True) \
|
|
|
- .config('spark.sql.shuffle.partitions', self._spark_shuffle_partitions) \
|
|
|
- .config('spark.sql.statistics.fallBackToHdfs', True) \
|
|
|
- .config('spark.yarn.queue', self._spark_yarn_queue) \
|
|
|
- .config('spark.port.maxRetries', 999)
|
|
|
+ .master(self._master)
|
|
|
+ # L1:conf/spark-defaults.conf(底层)+ conf/spark-tuning.conf(调优,相同 key 覆盖 defaults)
|
|
|
+ l1_defaults = {}
|
|
|
+ l1_defaults.update(_load_spark_conf_file(f'{PROJECT_ROOT_PATH}/conf/spark-defaults.conf'))
|
|
|
+ l1_defaults.update(_load_spark_conf_file(f'{PROJECT_ROOT_PATH}/conf/spark-tuning.conf'))
|
|
|
+ pretty_print(f'{NORM_MGT}L1 加载 {NORM_GRN}{len(l1_defaults)}{NORM_MGT} 条 conf 默认')
|
|
|
+ for key, value in l1_defaults.items():
|
|
|
+ builder.config(key, value)
|
|
|
+ # L2:SQL 内 SET(仅 spark.sql.* 生效,资源类参数 session 启动后不可变)
|
|
|
+ for key, value in self._final_spark_config.items():
|
|
|
+ pretty_print(f'{NORM_MGT}L2 应用 SQL SET {NORM_GRN}{key} => {str(value)}')
|
|
|
+ builder.config(key, value)
|
|
|
+ # L3:构造函数显式传参 + extra_spark_config
|
|
|
+ l3_overrides: Dict[str, Any] = {}
|
|
|
+ for conf_key, attr_val in (
|
|
|
+ ('spark.yarn.queue', self._spark_yarn_queue),
|
|
|
+ ('spark.driver.memory', self._spark_driver_memory),
|
|
|
+ ('spark.executor.memory', self._spark_executor_memory),
|
|
|
+ ('spark.executor.memoryOverhead', self._spark_executor_memory_overhead),
|
|
|
+ ('spark.driver.cores', self._spark_driver_cores),
|
|
|
+ ('spark.executor.cores', self._spark_executor_cores),
|
|
|
+ ('spark.executor.instances', self._spark_executor_instances),
|
|
|
+ ('spark.driver.maxResultSize', self._spark_driver_max_result_size),
|
|
|
+ ('spark.sql.shuffle.partitions', self._spark_shuffle_partitions),
|
|
|
+ ('spark.default.parallelism', self._spark_default_parallelism),
|
|
|
+ ):
|
|
|
+ if attr_val is not None:
|
|
|
+ l3_overrides[conf_key] = attr_val
|
|
|
if self._extra_spark_config:
|
|
|
- for spark_config, config_value in self._extra_spark_config.items():
|
|
|
- if self._final_spark_config.__contains__(spark_config):
|
|
|
- pretty_print(f'{NORM_YEL}构造函数传入的 Spark 配置 {NORM_GRN}{spark_config} => {config_value} '
|
|
|
- f'{NORM_YEL}覆盖了在 SQL 文件中定义的配置 '
|
|
|
- f'{NORM_GRN}{spark_config} => {self._final_spark_config[spark_config]}')
|
|
|
- self._final_spark_config[spark_config] = config_value
|
|
|
- if self._final_spark_config:
|
|
|
- for key, value in self._final_spark_config.items():
|
|
|
- pretty_print(f'{NORM_MGT}添加自定义 Spark 配置 {NORM_GRN}{key} => {str(value)}')
|
|
|
- builder.config(key, value)
|
|
|
+ l3_overrides.update(self._extra_spark_config)
|
|
|
+ for key, value in l3_overrides.items():
|
|
|
+ pretty_print(f'{NORM_MGT}L3 应用构造参数/extra {NORM_GRN}{key} => {str(value)}')
|
|
|
+ builder.config(key, value)
|
|
|
pretty_print(f'{NORM_MGT}创建 SparkSession')
|
|
|
self._spark_session = builder.enableHiveSupport().getOrCreate()
|
|
|
self._spark_session.sparkContext._jsc.hadoopConfiguration().set('mapred.max.split.size', '33554432')
|