| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- import re
- from typing import List
- from dw_base import NORM_GRN, NORM_YEL
- from dw_base.utils.file_utils import read_file_lines
- from dw_base.utils.log_utils import pretty_print
- def check_parameter_substituted(sql: str, ignore: bool = False):
- """
- 检查是否有未替换的参数
- Args:
- sql: 要检查的SQL
- ignore: 有未替换的参数时是否忽略
- Returns:
- """
- sql = re.sub(r'[\r\n]+', ' ', sql)
- match = re.findall('\\${(.*?)}', sql)
- if match and len(match) > 0:
- parameters = set(match)
- if ignore:
- pretty_print(f'{NORM_YEL}Parameter {NORM_GRN}{", ".join(parameters)}{NORM_YEL} is not provided')
- else:
- raise Exception(f'Parameter {", ".join(parameters)} is not provided')
- def get_sql_list_from_file(sql_file: str, trim_comment: bool = False) -> List[str]:
- """
- 从文件读取SQL语句列表
- Args:
- sql_file: SQL文件
- trim_comment: 是否去除注释
- Returns: SQL语句列表
- """
- sql_lines = read_file_lines(sql_file)
- sql_list = []
- sql_buffer = ''
- for line in sql_lines:
- if trim_comment:
- if line.strip() == '' or line.strip().startswith('--'):
- continue
- if line.__contains__('--'):
- cleaned_line = line[:line.rindex('--')].strip()
- if len(cleaned_line) > 0:
- sql_buffer += cleaned_line
- continue
- if line.strip() == ';':
- # 新行是分号
- if sql_buffer != '':
- sql_list.append(sql_buffer)
- sql_buffer = ''
- continue
- if line.strip().endswith(';'):
- # 新行以分号结尾
- sql_list.append(sql_buffer + line.strip().strip(';'))
- sql_buffer = ''
- continue
- sql_buffer += line
- if sql_buffer != '':
- sql_list.append(sql_buffer)
- return sql_list
|