#!/usr/bin/env /usr/bin/python3 # -*- coding:utf-8 -*- import re from typing import List from dw_base import NORM_GRN, NORM_YEL from dw_base.utils.file_utils import read_file_lines from dw_base.utils.log_utils import pretty_print def check_parameter_substituted(sql: str, ignore: bool = False): """ 检查是否有未替换的参数 Args: sql: 要检查的SQL ignore: 有未替换的参数时是否忽略 Returns: """ sql = re.sub(r'[\r\n]+', ' ', sql) match = re.findall('\\${(.*?)}', sql) if match and len(match) > 0: parameters = set(match) if ignore: pretty_print(f'{NORM_YEL}Parameter {NORM_GRN}{", ".join(parameters)}{NORM_YEL} is not provided') else: raise Exception(f'Parameter {", ".join(parameters)} is not provided') def get_sql_list_from_file(sql_file: str, trim_comment: bool = False) -> List[str]: """ 从文件读取SQL语句列表 Args: sql_file: SQL文件 trim_comment: 是否去除注释 Returns: SQL语句列表 """ sql_lines = read_file_lines(sql_file) sql_list = [] sql_buffer = '' for line in sql_lines: if trim_comment: if line.strip() == '' or line.strip().startswith('--'): continue if line.__contains__('--'): cleaned_line = line[:line.rindex('--')].strip() if len(cleaned_line) > 0: sql_buffer += cleaned_line continue if line.strip() == ';': # 新行是分号 if sql_buffer != '': sql_list.append(sql_buffer) sql_buffer = '' continue if line.strip().endswith(';'): # 新行以分号结尾 sql_list.append(sql_buffer + line.strip().strip(';')) sql_buffer = '' continue # if line.strip().__contains__(';'): # # 新行含有分号(比较复杂的逻辑,如 like '%abc;def%'),如果分号左边的单引号个数是奇数个,应认为分号是作为参数的(没有实现,先一刀切认为是语句结尾吧) # parts = line.split(';') # sql_list.append(sql_buffer + parts[0]) # for index in range(1, len(parts) - 1): # sql_list.append(parts[index]) # sql_buffer = parts[-1] # continue sql_buffer += line if sql_buffer != '': sql_list.append(sql_buffer) return sql_list