#!/usr/bin/env /usr/bin/python3 # -*- coding:utf-8 -*- import os from typing import Dict from dw_base import NORM_GRN, NORM_MGT, NORM_YEL from dw_base import USER from dw_base.utils.log_utils import pretty_print from dw_base.utils.sql_utils import get_sql_list_from_file def analyse_spark_config(sql_file: str) -> Dict[str, str]: spark_config = {} sql_list = get_sql_list_from_file(sql_file, trim_comment=True) for sql in sql_list: if not sql.startswith('SET '): continue spark_config, config_value = sql.replace('SET ', '').strip().split('=') pretty_print(f'{NORM_MGT}从SQL文件 {NORM_GRN}{sql_file} ' f'{NORM_MGT}中解析到 Spark 配置 {NORM_GRN}{spark_config} => {config_value}') if spark_config.__contains__(spark_config): pretty_print(f'{NORM_YEL}Spark 配置 {NORM_GRN}{spark_config} ' f'{NORM_YEL}重复提供,原值 {NORM_GRN}{spark_config[spark_config]} ' f'{NORM_YEL}将被覆盖为新值 {NORM_GRN}{config_value}') spark_config[spark_config] = config_value return spark_config def analyse_session_name(sql_file: str) -> str: """ 根据当前用户目录名和待执行的sql文件, 组装当前任务的名称 Args: sql_file: 待执行的sql文件路径 Returns: spark sql任务名, 可根据此任务名在 http://cdh2.lixiaoyun.com:8088/cluster 中查看任务进度和状态 """ file_name = os.path.basename(sql_file) file_name_without_ext = os.path.splitext(file_name)[0] if file_name_without_ext.__contains__('insert2'): table_name = file_name_without_ext.split('insert2')[1] table_name_splits = table_name.split('_') if len(table_name_splits) > 2: layer = table_name_splits[0] project = table_name_splits[1] return f'{project}_{layer}.{table_name}' if USER == 'root': return file_name_without_ext return f'{USER}_{file_name_without_ext}'