#!/usr/bin/env python3
# -*- coding:utf-8 -*-
- """
- -- 设置 SparkSession 名称(血缘分析)
- SET spark.app.name=crl_dwd.dwd_crl_xxx;
- -- 设置 Spark 配置
- SET spark.xxx.yyy.zzz=xyz;
- -- 引用 UDF
- ADD FILE dw_base/spark/udf/spark_xxx_udf.py;
- -- 声明变量
- SET DT_START=20210101;
- SET TOPIC=xxx;
- SET KEY=CourtAnn;
- SET ODS_TABLE=crl_ods.ods_crl_xxx;
- -- 查看数据行数
- SET LIMIT=1000;
- """
import os
import sys
from typing import Dict, List

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)

from dw_base import *
from dw_base.common.config_constants import K_DT, KL_HELP
from dw_base.common.container import ValueContainer
from dw_base.spark.spark_sql import SparkSQL
from dw_base.spark.spark_utils import analyse_session_name
from dw_base.utils.common_utils import exist
from dw_base.utils.config_utils import parse_args
from dw_base.utils.datetime_utils import get_yesterday, get_date_range
from dw_base.utils.file_utils import list_files
from dw_base.utils.log_utils import pretty_print, CURRENT_LOG_FILE, get_log_file_path


def usage(code: int):
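    """Print the command-line usage text and exit with the given code."""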
    print(
        f'{NORM_MGT}Usage: {sys.argv[0]}\n'
        f'{NORM_CYN}\t[-h/-H/--h/--H/--help] Print this help message{DO_RESET}'
    )
    print(
        f'{NORM_MGT}Usage: {sys.argv[0]}\n'
        f'{NORM_GRN}\t<[-]-f< /=>sql file> SQL file to execute; may be passed multiple times, files run in the given order; mutually exclusive with -d\n'
        f'{NORM_GRN}\t<[-]-d< /=>sql file directory> SQL directory to execute; may be passed multiple times, directories are processed in the given order\n'
        f'{NORM_CYN}\t[[-]-u< /=>udf file] Referenced UDF file (.py); if the same function name is defined in several UDF files, the one passed earlier wins\n'
        f'{NORM_CYN}\t[[-]-sc spark-config] Custom Spark configuration, in the form config:config-value or config=config-value\n'
        f'{NORM_CYN}\t[[-]-dt< /=>date with format] Date in %Y%m%d / yyyyMMdd format (command-line value overrides the default)\n'
        f'{NORM_CYN}\t    The date may be passed in four forms:\n'
        f'{NORM_CYN}\t    1. 20211101, a single specific date\n'
        f'{NORM_CYN}\t    2. 20211101-, from 20211101 up to yesterday\n'
        f'{NORM_CYN}\t    3. 20211101-20211107, from 20211101 to 20211107\n'
        f'{NORM_CYN}\t    4. 20211101,20211103, the discrete dates 20211101 and 20211103\n'
        f'{NORM_CYN}\t[[-]-p sql-parameter] SQL script parameter, in the form parameter:parameter-value or parameter=parameter-value\n'
        f'{NORM_CYN}\t[[-]-n] When running multiple SQL files serially, a failure does not stop the remaining files, but the program exits with a non-zero code (the number of failed files)\n'
        f'{NORM_CYN}\t[[-]-0] When running multiple SQL files serially, a failure does not stop the remaining files and the program still exits normally (use with caution)\n'
        f'{NORM_CYN}\t    If neither of the two flags above is given, execution of the remaining files stops as soon as one SQL file fails'
        f'{DO_RESET}'
    )
    sys.exit(code)


def parse_cli_spark_config() -> Dict[str, str]:
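    """Parse the -sc command-line entries into a {config: value} dict.

    Each entry is expected as config=config-value or config:config-value;
    malformed entries are reported and skipped, and a key passed more than
    once keeps the value passed last.
    """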
    cli_spark_config = {}
    spark_configs = CONFIG.get('sc', [])
    if isinstance(spark_configs, str):
        spark_configs = [spark_configs]
    for elem in spark_configs:
        if '=' in elem:
            spark_config, config_value = elem.split('=', 1)
        elif ':' in elem:
            spark_config, config_value = elem.split(':', 1)
        else:
            pretty_print(f'{NORM_YEL}Invalid Spark configuration {NORM_GRN}{elem}')
            continue
        if spark_config in cli_spark_config:
            pretty_print(f'{NORM_YEL}Spark configuration {NORM_GRN}{spark_config}'
                         f'{NORM_YEL} was passed more than once; the previous value {NORM_GRN}{cli_spark_config[spark_config]} '
                         f'{NORM_YEL}will be overwritten with the new value {NORM_GRN}{config_value}')
        cli_spark_config[spark_config] = config_value
    return cli_spark_config


def parse_sql_parameters() -> Dict[str, str]:
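    """Parse the -p command-line entries into a {parameter: value} dict.

    Each entry is expected as parameter=parameter-value or
    parameter:parameter-value; the key 'dt' is ignored here because the
    run date is handled separately via -dt.
    """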
    sql_parameters = {}
    parameter_configs = CONFIG.get('p', [])
    if isinstance(parameter_configs, str):
        parameter_configs = [parameter_configs]
    for elem in parameter_configs:
        if '=' in elem:
            parameter_key, parameter_value = elem.split('=', 1)
        elif ':' in elem:
            parameter_key, parameter_value = elem.split(':', 1)
        else:
            pretty_print(f'{NORM_YEL}Invalid SQL parameter {NORM_GRN}{elem}')
            continue
        if parameter_key == 'dt':
            continue
        if parameter_key in sql_parameters:
            pretty_print(f'{NORM_YEL}SQL parameter {NORM_GRN}{parameter_key}'
                         f'{NORM_YEL} was passed more than once; the previous value {NORM_GRN}{sql_parameters[parameter_key]} '
                         f'{NORM_YEL}will be overwritten with the new value {NORM_GRN}{parameter_value}')
        sql_parameters[parameter_key] = parameter_value
    return sql_parameters


def run_sql_file(sql_file: str, dt: str, is_last: bool):
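    """Execute a single SQL file (or export its result) for the date dt.

    When the 'export' flag is set, the last SQL file of a run is exported
    via SparkSQL.export_data instead of being executed. Failures increment
    ERROR_COUNT and are re-raised unless -n or -0 was passed.
    """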
    try:
        spark_session_name = CONFIG.get('ssn', analyse_session_name(sql_file))
        sql_file_name = os.path.splitext(os.path.basename(sql_file))[0]
        CURRENT_LOG_FILE.set(get_log_file_path('spark-sql', dt, sql_file_name))
        spark_sql = SparkSQL(spark_session_name, udf_files=udf_files, extra_spark_config=extra_spark_config)
        is_export = CONFIG.get('export')
        if is_last and is_export is True:
            delimiter = CONFIG.get('delimiter', ',')
            spark_sql.export_data(sql_file_name, sql_file, truncate=False,
                                  delimiter=delimiter, dt=dt, partition=1, **sql_parameters)
        else:
            spark_sql.execute(sql_file, check_parameter=True, dt=dt, **sql_parameters)
    except Exception as e:
        message = f'Failed to execute SQL file {sql_file} (dt={dt})'
        if IS_RUN_IN_RELEASE_DIR and IS_RUN_BY_RELEASE_USER:
            message += (f' (the log is available at http://{HOST}/log/spark-sql/{dt}/'
                        f'{os.path.basename(CURRENT_LOG_FILE.get())})')
        print(message)
        ERROR_COUNT.set(ERROR_COUNT.get() + 1)
        if CONTINUE_ALL_ON_ERROR is True or CONTINUE_NEXT_ON_ERROR is True:
            return
        raise e


def run_sql_files(sql_files: List[str], dt: str):
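    """Run the SQL files in order for dt, skipping table-creation scripts."""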
    for index, sql_file in enumerate(sql_files):
        if os.path.basename(sql_file) in ('create_table.sql', 'create-table.sql'):
            continue
        sql_file_dir = os.path.basename(os.path.dirname(sql_file))
        if sql_file_dir in ('create_table', 'create-table'):
            continue
        run_sql_file(sql_file, dt, index == len(sql_files) - 1)


def run_sql_directories(dt: str):
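    """Run every .sql file (sorted by name) found in each -d directory for dt."""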
    directories = CONFIG.get('d', [])
    if isinstance(directories, str):
        directories = [directories]
    for each_dir in directories:
        sql_files_in_dir = list_files(each_dir, extension='.sql')
        sql_files_in_dir = sorted(sql_files_in_dir)
        if len(sql_files_in_dir) == 0:
            pretty_print(f'{NORM_RED}No executable SQL files were found in directory {NORM_GRN}{each_dir}')
            usage(1)
        run_sql_files(sql_files_in_dir, dt)


if __name__ == '__main__':
    pretty_print(f'{NORM_MGT}{sys.argv[0]} received arguments: {NORM_GRN}{" ".join(sys.argv[1:])}')
    CONFIG, _ = parse_args(sys.argv[1:])
    # No arguments given, or help requested
    if len(sys.argv) == 1 or exist(CONFIG, KL_HELP):
        usage(0)
    CONTINUE_NEXT_ON_ERROR = CONFIG.get('n')
    CONTINUE_ALL_ON_ERROR = CONFIG.get('0')
    ERROR_COUNT = ValueContainer(0)
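    # Resolve -dt into the list of dates to process (a single date, an open or
    # closed range, or a discrete list); defaults to yesterday when not given.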
    date_range = get_date_range(CONFIG.get(K_DT, get_yesterday()))
    udf_files = CONFIG.get('u', [])
    if isinstance(udf_files, str):
        udf_files = [udf_files]
    if COMMON_SPARK_UDF_FILE not in udf_files:
        udf_files.insert(0, COMMON_SPARK_UDF_FILE)
    extra_spark_config = parse_cli_spark_config()
    sql_parameters = parse_sql_parameters()
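    # Explicit SQL files (-f) take precedence over directories (-d); every SQL
    # file is executed once per date in the resolved date range.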
    if 'f' in CONFIG:
        arg_sql_files = CONFIG.get('f', [])
        if isinstance(arg_sql_files, str):
            arg_sql_files = [arg_sql_files]
        if len(arg_sql_files) == 0:
            pretty_print(f'{NORM_RED}No executable SQL files were provided')
            usage(1)
        for each_dt in date_range:
            run_sql_files(arg_sql_files, each_dt)
    elif 'd' in CONFIG:
        for each_dt in date_range:
            run_sql_directories(each_dt)
    else:
        usage(0)
    if not CONTINUE_ALL_ON_ERROR:
        sys.exit(ERROR_COUNT.get())