| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- import os
- from typing import Dict
- from dw_base import NORM_GRN, NORM_MGT, NORM_YEL
- from dw_base import USER
- from dw_base.utils.log_utils import pretty_print
- from dw_base.utils.sql_utils import get_sql_list_from_file
- def analyse_spark_config(sql_file: str) -> Dict[str, str]:
- spark_config = {}
- sql_list = get_sql_list_from_file(sql_file, trim_comment=True)
- for sql in sql_list:
- if not sql.startswith('SET '):
- continue
- spark_config, config_value = sql.replace('SET ', '').strip().split('=')
- pretty_print(f'{NORM_MGT}从SQL文件 {NORM_GRN}{sql_file} '
- f'{NORM_MGT}中解析到 Spark 配置 {NORM_GRN}{spark_config} => {config_value}')
- if spark_config.__contains__(spark_config):
- pretty_print(f'{NORM_YEL}Spark 配置 {NORM_GRN}{spark_config} '
- f'{NORM_YEL}重复提供,原值 {NORM_GRN}{spark_config[spark_config]} '
- f'{NORM_YEL}将被覆盖为新值 {NORM_GRN}{config_value}')
- spark_config[spark_config] = config_value
- return spark_config
- def analyse_session_name(sql_file: str) -> str:
- """
- 根据当前用户目录名和待执行的sql文件, 组装当前任务的名称
- Args:
- sql_file: 待执行的sql文件路径
- Returns:
- spark sql任务名, 可根据此任务名在 http://cdh2.lixiaoyun.com:8088/cluster 中查看任务进度和状态
- """
- file_name = os.path.basename(sql_file)
- file_name_without_ext = os.path.splitext(file_name)[0]
- if file_name_without_ext.__contains__('insert2'):
- table_name = file_name_without_ext.split('insert2')[1]
- table_name_splits = table_name.split('_')
- if len(table_name_splits) > 2:
- layer = table_name_splits[0]
- project = table_name_splits[1]
- return f'{project}_{layer}.{table_name}'
- if USER == 'root':
- return file_name_without_ext
- return f'{USER}_{file_name_without_ext}'
|