# spark-sql-starter.py
#!/usr/bin/env /usr/bin/python3
# -*- coding:utf-8 -*-
"""
-- 设置 SparkSession 名称(血缘分析)
SET spark.app.name=crl_dwd.dwd_crl_xxx;
-- 设置 Spark 配置
SET spark.xxx.yyy.zzz=xyz;
-- 引用 UDF
ADD FILE dw_base/udf/business/spark_xxx_udf.py;
-- 声明变量
SET DT_START=20210101;
SET TOPIC=xxx;
SET KEY=CourtAnn;
SET ODS_TABLE=crl_ods.ods_crl_xxx;
-- 查看数据行数
SET LIMIT=1000;
"""
import os
import sys
from typing import Dict, List

# Make the project root (the parent of this script's directory) importable so
# the dw_base package resolves no matter what the working directory is.
# NOTE: this must run BEFORE the dw_base imports below.
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)

# NOTE(review): the star import presumably provides the color constants
# (NORM_*, DO_RESET) plus HOST, COMMON_SPARK_UDF_FILE and the IS_RUN_* flags
# used throughout this script — verify against dw_base/__init__.py.
from dw_base import *
from dw_base.common.config_constants import K_DT, KL_HELP
from dw_base.common.container import ValueContainer
from dw_base.spark.spark_sql import SparkSQL
from dw_base.spark.spark_utils import analyse_session_name
from dw_base.utils.common_utils import exist
from dw_base.utils.config_utils import parse_args
from dw_base.utils.datetime_utils import get_yesterday, get_date_range
from dw_base.utils.file_utils import list_files
from dw_base.utils.log_utils import pretty_print, CURRENT_LOG_FILE, get_log_file_path
  33. def usage(code: int):
  34. print(
  35. f'{NORM_MGT}Usage: {sys.argv[0]}\n'
  36. f'{NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法{DO_RESET}'
  37. )
  38. print(
  39. f'{NORM_MGT}Usage: {sys.argv[0]}\n'
  40. f'{NORM_GRN}\t<[-]-f< /=>sql file> 要执行的SQL文件,可传入多个,按传入顺序执行,与参数-d互斥\n'
  41. f'{NORM_GRN}\t<[-]-d< /=>sql file directory> 要执行的SQL文件夹,可传入多个,按传入顺序执行\n'
  42. f'{NORM_CYN}\t[[-]-u< /=>udf file] 被引用到的udf文件(.py),注意多个udf文件内定义的方法有重名时,只会取靠前的\n'
  43. f'{NORM_CYN}\t[[-]-sc spark-config] Spark自定义配置,格式为config:config-value或config=config-value\n'
  44. f'{NORM_CYN}\t[[-]-dt< /=>date with format] %Y%m%d 或 yyyyMMdd 格式的日期(命令行 > 默认)\n'
  45. f'{NORM_CYN}\t 可以以四种形式传入日期:\n'
  46. f'{NORM_CYN}\t 1. 20211101,表示具体日期\n'
  47. f'{NORM_CYN}\t 2. 20211101-,表示20211101至昨天\n'
  48. f'{NORM_CYN}\t 3. 20211101-20211107,表示20211101至20211107\n'
  49. f'{NORM_CYN}\t 4. 20211101,20211103,表示离散的日期20211101、20211103\n'
  50. f'{NORM_CYN}\t[[-]-p sql-parameter] SQL脚本的参数,格式为parameter:parameter-value或parameter=parameter-value\n'
  51. f'{NORM_CYN}\t[[-]-n] 在串行执行多个SQL文件时,一个失败并不影响后续文件的执行,但最后程序会非正常结束,即返回非0值(失败的文件个数)\n'
  52. f'{NORM_CYN}\t[[-]-0] 在串行执行多个SQL文件时,一个失败并不影响后续文件的执行,且最后程序正常结束(请谨慎使用)\n'
  53. f'{NORM_CYN}\t 上述两个参数都不传的情况下,某个SQL文件执行失败时,后续文件都不再执行'
  54. f'{DO_RESET}'
  55. )
  56. exit(code)
  57. def parse_cli_spark_config() -> Dict[str, str]:
  58. cli_spark_config = {}
  59. spark_configs = CONFIG.get('sc', [])
  60. if isinstance(spark_configs, str):
  61. spark_configs = [spark_configs]
  62. for elem in spark_configs:
  63. if elem.__contains__('='):
  64. spark_config, config_value = elem.split('=')
  65. elif elem.__contains__(':'):
  66. spark_config, config_value = elem.split(':')
  67. else:
  68. pretty_print(f'{NORM_YEL}无效的 Spark 配置 {NORM_GRN}{elem}')
  69. continue
  70. if cli_spark_config.__contains__(spark_config):
  71. pretty_print(f'{NORM_YEL}命令行多次传入了 Spark 配置 {NORM_GRN}{spark_config}'
  72. f'{NORM_YEL}, 原值 {NORM_GRN}{cli_spark_config[spark_config]} '
  73. f'{NORM_YEL}将被覆盖为新值 {NORM_GRN}{config_value}')
  74. cli_spark_config[spark_config] = config_value
  75. return cli_spark_config
  76. def parse_sql_parameters() -> Dict[str, str]:
  77. sql_parameters = {}
  78. parameter_configs = CONFIG.get('p', [])
  79. if isinstance(parameter_configs, str):
  80. parameter_configs = [parameter_configs]
  81. for elem in parameter_configs:
  82. if elem.__contains__('='):
  83. parameter_key, parameter_value = elem.split('=')
  84. elif elem.__contains__(':'):
  85. parameter_key, parameter_value = elem.split(':')
  86. else:
  87. pretty_print(f'{NORM_YEL}无效的 SQL 参数 {NORM_GRN}{elem}')
  88. continue
  89. if parameter_key == 'dt':
  90. continue
  91. if sql_parameters.__contains__(parameter_key):
  92. pretty_print(f'{NORM_YEL}命令行多次传入了 SQL 参数 {NORM_GRN}{parameter_key}'
  93. f'{NORM_YEL}, 原值 {NORM_GRN}{sql_parameters[parameter_key]} '
  94. f'{NORM_YEL}将被覆盖为新值 {NORM_GRN}{parameter_value}')
  95. sql_parameters[parameter_key] = parameter_value
  96. return sql_parameters
  97. def run_sql_file(sql_file: str, dt: str, is_last: bool):
  98. try:
  99. spark_session_name = CONFIG.get('ssn', analyse_session_name(sql_file))
  100. sql_file_name = os.path.splitext(os.path.basename(sql_file))[0]
  101. CURRENT_LOG_FILE.set(get_log_file_path('spark-sql', dt, sql_file_name))
  102. spark_sql = SparkSQL(spark_session_name, udf_files=udf_files, extra_spark_config=extra_spark_config)
  103. is_export = CONFIG.get('export')
  104. if is_last and is_export is True:
  105. sql_file_name = os.path.splitext(os.path.basename(sql_file))[0]
  106. delimiter = CONFIG.get('delimiter', ',')
  107. spark_sql.export_data(sql_file_name, sql_file, truncate=False,
  108. delimiter=delimiter, dt=dt, partition=1, **sql_parameters)
  109. else:
  110. spark_sql.execute(sql_file, check_parameter=True, dt=dt, **sql_parameters)
  111. except Exception as e:
  112. message = f'执行SQL文件 {sql_file} 失败(dt={dt})'
  113. if IS_RUN_IN_RELEASE_DIR and IS_RUN_BY_RELEASE_USER:
  114. message += f'(可访问网址 http://{HOST}/log/spark-sql/{dt}/{os.path.basename(CURRENT_LOG_FILE.get())} 查看日志)'
  115. print(message)
  116. ERROR_COUNT.set(ERROR_COUNT.get() + 1)
  117. if CONTINUE_ALL_ON_ERROR and CONTINUE_ALL_ON_ERROR is True:
  118. return
  119. elif CONTINUE_NEXT_ON_ERROR and CONTINUE_NEXT_ON_ERROR is True:
  120. return
  121. else:
  122. raise e
  123. def run_sql_files(sql_files: List[str], dt: str):
  124. for index in range(len(sql_files)):
  125. sql_file = sql_files[index]
  126. if sql_file == 'create_table.sql' or sql_file == 'create-table.sql':
  127. continue
  128. sql_file_dir = os.path.basename(os.path.basename(sql_file))
  129. if sql_file_dir == 'create_table' or sql_file_dir == 'create-table':
  130. continue
  131. run_sql_file(sql_file, dt, len(sql_files) == index + 1)
  132. def run_sql_directories(dt: str):
  133. directories = CONFIG.get('d', [])
  134. if isinstance(directories, str):
  135. directories = [directories]
  136. for each_dir in directories:
  137. sql_files_in_dir = list_files(each_dir, extension='.sql')
  138. sql_files_in_dir = sorted(sql_files_in_dir)
  139. if len(sql_files_in_dir) == 0:
  140. pretty_print(f'{NORM_RED}文件夹 {NORM_GRN}{each_dir}{NORM_RED} 中未找到任何可执行的SQL文件')
  141. usage(1)
  142. run_sql_files(sql_files_in_dir, dt)
  143. if __name__ == '__main__':
  144. pretty_print(f'{NORM_MGT}{sys.argv[0]} 收到参数:{NORM_GRN}{" ".join(sys.argv[1:])}')
  145. CONFIG, _ = parse_args(sys.argv[1:])
  146. # 未提供任何参数或查看帮助
  147. if len(sys.argv) == 1 or exist(CONFIG, KL_HELP):
  148. usage(0)
  149. CONTINUE_NEXT_ON_ERROR = CONFIG.get('n')
  150. CONTINUE_ALL_ON_ERROR = CONFIG.get('0')
  151. ERROR_COUNT = ValueContainer(0)
  152. date_range = get_date_range(CONFIG.get(K_DT, get_yesterday()))
  153. udf_files = CONFIG.get('u', [])
  154. if isinstance(udf_files, str):
  155. udf_files = [udf_files]
  156. if not udf_files.__contains__(COMMON_SPARK_UDF_FILE):
  157. udf_files.insert(0, COMMON_SPARK_UDF_FILE)
  158. extra_spark_config = parse_cli_spark_config()
  159. sql_parameters = parse_sql_parameters()
  160. if CONFIG.__contains__('f'):
  161. arg_sql_files = CONFIG.get('f', [])
  162. if isinstance(arg_sql_files, str):
  163. arg_sql_files = [arg_sql_files]
  164. if len(arg_sql_files) == 0:
  165. pretty_print(f'{NORM_RED}未提供任何可执行的SQL文件')
  166. usage(1)
  167. for each_dt in date_range:
  168. run_sql_files(arg_sql_files, each_dt)
  169. elif CONFIG.__contains__('d'):
  170. for each_dt in date_range:
  171. run_sql_directories(each_dt)
  172. else:
  173. usage(0)
  174. if not CONTINUE_ALL_ON_ERROR or CONTINUE_ALL_ON_ERROR is False:
  175. exit(ERROR_COUNT.get())