#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
Example header of a SQL file executed by this script:

-- Set the SparkSession name (used for lineage analysis)
SET spark.app.name=crl_dwd.dwd_crl_xxx;
-- Set a Spark configuration value
SET spark.xxx.yyy.zzz=xyz;
-- Reference a UDF file
ADD FILE dw_base/spark/udf/spark_xxx_udf.py;
-- Declare variables
SET DT_START=20210101;
SET TOPIC=xxx;
SET KEY=CourtAnn;
SET ODS_TABLE=crl_ods.ods_crl_xxx;
-- Number of rows to preview
SET LIMIT=1000;
"""
import os
import sys
from typing import Dict, List

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)

from dw_base import *
from dw_base.common.config_constants import K_DT, KL_HELP
from dw_base.common.container import ValueContainer
from dw_base.spark.spark_sql import SparkSQL
from dw_base.spark.spark_utils import analyse_session_name
from dw_base.utils.common_utils import exist
from dw_base.utils.config_utils import parse_args
from dw_base.utils.datetime_utils import get_yesterday, get_date_range
from dw_base.utils.file_utils import list_files
from dw_base.utils.log_utils import pretty_print, CURRENT_LOG_FILE, get_log_file_path


def usage(code: int):
    """Print the usage message and exit with the given code."""
    print(
        f'{NORM_MGT}Usage: {sys.argv[0]}\n'
        f'{NORM_CYN}\t[-h/-H/--h/--H/--help] print this usage message{DO_RESET}'
    )
    print(
        f'{NORM_MGT}Usage: {sys.argv[0]}\n'
        f'{NORM_GRN}\t<[-]-f< /=>sql file> SQL file to execute; may be given multiple times, files run in the given order; mutually exclusive with -d\n'
        f'{NORM_GRN}\t<[-]-d< /=>sql file directory> directory of SQL files to execute; may be given multiple times, directories run in the given order\n'
        f'{NORM_CYN}\t[[-]-u< /=>udf file] referenced UDF file (.py); if several UDF files define functions with the same name, only the first definition is used\n'
        f'{NORM_CYN}\t[[-]-sc spark-config] custom Spark configuration, in the form config:config-value or config=config-value\n'
        f'{NORM_CYN}\t[[-]-dt< /=>date with format] date in %Y%m%d / yyyyMMdd format (command line takes precedence over the default)\n'
        f'{NORM_CYN}\t    the date can be given in four forms:\n'
        f'{NORM_CYN}\t    1. 20211101, a single date\n'
        f'{NORM_CYN}\t    2. 20211101-, from 20211101 through yesterday\n'
        f'{NORM_CYN}\t    3. 20211101-20211107, from 20211101 through 20211107\n'
        f'{NORM_CYN}\t    4. 20211101,20211103, the discrete dates 20211101 and 20211103\n'
        f'{NORM_CYN}\t[[-]-p sql-parameter] SQL script parameter, in the form parameter:parameter-value or parameter=parameter-value\n'
        f'{NORM_CYN}\t[[-]-n] when several SQL files are run serially, one failure does not stop the remaining files, but the program finally exits abnormally, i.e. returns a non-zero value (the number of failed files)\n'
        f'{NORM_CYN}\t[[-]-0] when several SQL files are run serially, one failure does not stop the remaining files, and the program finally exits normally (use with caution)\n'
        f'{NORM_CYN}\t    if neither of the two flags above is given, any SQL file failure stops the execution of all remaining files'
        f'{DO_RESET}'
    )
    sys.exit(code)
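# Illustrative examples of how the arguments above are expected to expand
# (hypothetical values; the actual behaviour is defined by parse_args,
# get_date_range and the parse_* helpers below):
#   -dt 20211101-20211103          -> run once per day: 20211101, 20211102, 20211103
#   -sc spark.executor.memory=8g   -> extra Spark config {'spark.executor.memory': '8g'}
#   -p  TOPIC:CourtAnn             -> SQL parameter {'TOPIC': 'CourtAnn'}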
def parse_cli_spark_config() -> Dict[str, str]:
    """Parse the -sc arguments into a Spark-config dict."""
    cli_spark_config = {}
    spark_configs = CONFIG.get('sc', [])
    if isinstance(spark_configs, str):
        spark_configs = [spark_configs]
    for elem in spark_configs:
        # Accept both config=value and config:value; split only on the first
        # separator so values may themselves contain '=' or ':'
        if '=' in elem:
            spark_config, config_value = elem.split('=', 1)
        elif ':' in elem:
            spark_config, config_value = elem.split(':', 1)
        else:
            pretty_print(f'{NORM_YEL}Invalid Spark configuration {NORM_GRN}{elem}')
            continue
        if spark_config in cli_spark_config:
            pretty_print(f'{NORM_YEL}Spark configuration {NORM_GRN}{spark_config}'
                         f'{NORM_YEL} was given more than once on the command line, '
                         f'{NORM_YEL}the old value {NORM_GRN}{cli_spark_config[spark_config]} '
                         f'{NORM_YEL}will be overwritten by the new value {NORM_GRN}{config_value}')
        cli_spark_config[spark_config] = config_value
    return cli_spark_config


def parse_sql_parameters() -> Dict[str, str]:
    """Parse the -p arguments into a SQL-parameter dict (dt is handled separately)."""
    sql_parameters = {}
    parameter_configs = CONFIG.get('p', [])
    if isinstance(parameter_configs, str):
        parameter_configs = [parameter_configs]
    for elem in parameter_configs:
        if '=' in elem:
            parameter_key, parameter_value = elem.split('=', 1)
        elif ':' in elem:
            parameter_key, parameter_value = elem.split(':', 1)
        else:
            pretty_print(f'{NORM_YEL}Invalid SQL parameter {NORM_GRN}{elem}')
            continue
        # dt is driven by the -dt argument, not by -p
        if parameter_key == 'dt':
            continue
        if parameter_key in sql_parameters:
            pretty_print(f'{NORM_YEL}SQL parameter {NORM_GRN}{parameter_key}'
                         f'{NORM_YEL} was given more than once on the command line, '
                         f'{NORM_YEL}the old value {NORM_GRN}{sql_parameters[parameter_key]} '
                         f'{NORM_YEL}will be overwritten by the new value {NORM_GRN}{parameter_value}')
        sql_parameters[parameter_key] = parameter_value
    return sql_parameters


def run_sql_file(sql_file: str, dt: str, is_last: bool):
    """Execute one SQL file for the given dt; is_last marks the last file of a batch."""
    try:
        spark_session_name = CONFIG.get('ssn', analyse_session_name(sql_file))
        sql_file_name = os.path.splitext(os.path.basename(sql_file))[0]
        CURRENT_LOG_FILE.set(get_log_file_path('spark-sql', dt, sql_file_name))
        spark_sql = SparkSQL(spark_session_name, udf_files=udf_files, extra_spark_config=extra_spark_config)
        is_export = CONFIG.get('export')
        if is_last and is_export is True:
            # The last file of a batch may export its result instead of just executing
            delimiter = CONFIG.get('delimiter', ',')
            spark_sql.export_data(sql_file_name, sql_file, truncate=False, delimiter=delimiter,
                                  dt=dt, partition=1, **sql_parameters)
        else:
            spark_sql.execute(sql_file, check_parameter=True, dt=dt, **sql_parameters)
    except Exception as e:
        message = f'Failed to execute SQL file {sql_file} (dt={dt})'
        if IS_RUN_IN_RELEASE_DIR and IS_RUN_BY_RELEASE_USER:
            message += f' (see the log at http://{HOST}/log/spark-sql/{dt}/{os.path.basename(CURRENT_LOG_FILE.get())})'
        print(message)
        ERROR_COUNT.set(ERROR_COUNT.get() + 1)
        # Keep going only when -0 or -n was passed; otherwise re-raise and stop
        if CONTINUE_ALL_ON_ERROR is True or CONTINUE_NEXT_ON_ERROR is True:
            return
        raise e


def run_sql_files(sql_files: List[str], dt: str):
    """Execute the given SQL files in order, skipping create-table (DDL) scripts."""
    for index, sql_file in enumerate(sql_files):
        if os.path.basename(sql_file) in ('create_table.sql', 'create-table.sql'):
            continue
        # Also skip files that live in a create_table/create-table directory
        sql_file_dir = os.path.basename(os.path.dirname(sql_file))
        if sql_file_dir in ('create_table', 'create-table'):
            continue
        run_sql_file(sql_file, dt, index + 1 == len(sql_files))


def run_sql_directories(dt: str):
    """Execute all SQL files found in each directory passed via -d."""
    directories = CONFIG.get('d', [])
    if isinstance(directories, str):
        directories = [directories]
    for each_dir in directories:
        sql_files_in_dir = list_files(each_dir, extension='.sql')
        sql_files_in_dir = sorted(sql_files_in_dir)
        if len(sql_files_in_dir) == 0:
            pretty_print(f'{NORM_RED}No executable SQL files found in directory {NORM_GRN}{each_dir}')
            usage(1)
        run_sql_files(sql_files_in_dir, dt)
if __name__ == '__main__':
    pretty_print(f'{NORM_MGT}{sys.argv[0]} received arguments: {NORM_GRN}{" ".join(sys.argv[1:])}')
    CONFIG, _ = parse_args(sys.argv[1:])
    # No arguments at all, or help requested
    if len(sys.argv) == 1 or exist(CONFIG, KL_HELP):
        usage(0)
    CONTINUE_NEXT_ON_ERROR = CONFIG.get('n')
    CONTINUE_ALL_ON_ERROR = CONFIG.get('0')
    ERROR_COUNT = ValueContainer(0)
    # -dt from the command line wins; otherwise default to yesterday
    date_range = get_date_range(CONFIG.get(K_DT, get_yesterday()))
    udf_files = CONFIG.get('u', [])
    if isinstance(udf_files, str):
        udf_files = [udf_files]
    # The common UDF file is always loaded first
    if COMMON_SPARK_UDF_FILE not in udf_files:
        udf_files.insert(0, COMMON_SPARK_UDF_FILE)
    extra_spark_config = parse_cli_spark_config()
    sql_parameters = parse_sql_parameters()
    if 'f' in CONFIG:
        arg_sql_files = CONFIG.get('f', [])
        if isinstance(arg_sql_files, str):
            arg_sql_files = [arg_sql_files]
        if len(arg_sql_files) == 0:
            pretty_print(f'{NORM_RED}No executable SQL files provided')
            usage(1)
        for each_dt in date_range:
            run_sql_files(arg_sql_files, each_dt)
    elif 'd' in CONFIG:
        for each_dt in date_range:
            run_sql_directories(each_dt)
    else:
        usage(0)
    # Unless -0 was passed, report the number of failed files as the exit code
    if not CONTINUE_ALL_ON_ERROR:
        sys.exit(ERROR_COUNT.get())
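# Example invocations (illustrative only; the script name and SQL paths below are
# hypothetical placeholders):
#   python3 run_spark_sql.py -f dwd/dwd_crl_xxx.sql -dt 20210101-
#   python3 run_spark_sql.py -d dwd/crl -dt 20211101-20211107 -n \
#       -sc spark.sql.shuffle.partitions=200 -p KEY=CourtAnn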