sql_utils.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. import re
  4. from typing import List
  5. from dw_base import NORM_GRN, NORM_YEL
  6. from dw_base.utils.file_utils import read_file_lines
  7. from dw_base.utils.log_utils import pretty_print
  8. def check_parameter_substituted(sql: str, ignore: bool = False):
  9. """
  10. 检查是否有未替换的参数
  11. Args:
  12. sql: 要检查的SQL
  13. ignore: 有未替换的参数时是否忽略
  14. Returns:
  15. """
  16. sql = re.sub(r'[\r\n]+', ' ', sql)
  17. match = re.findall('\\${(.*?)}', sql)
  18. if match and len(match) > 0:
  19. parameters = set(match)
  20. if ignore:
  21. pretty_print(f'{NORM_YEL}Parameter {NORM_GRN}{", ".join(parameters)}{NORM_YEL} is not provided')
  22. else:
  23. raise Exception(f'Parameter {", ".join(parameters)} is not provided')
  24. def get_sql_list_from_file(sql_file: str, trim_comment: bool = False) -> List[str]:
  25. """
  26. 从文件读取SQL语句列表
  27. Args:
  28. sql_file: SQL文件
  29. trim_comment: 是否去除注释
  30. Returns: SQL语句列表
  31. """
  32. sql_lines = read_file_lines(sql_file)
  33. sql_list = []
  34. sql_buffer = ''
  35. for line in sql_lines:
  36. if trim_comment:
  37. if line.strip() == '' or line.strip().startswith('--'):
  38. continue
  39. if line.__contains__('--'):
  40. cleaned_line = line[:line.rindex('--')].strip()
  41. if len(cleaned_line) > 0:
  42. sql_buffer += cleaned_line
  43. continue
  44. if line.strip() == ';':
  45. # 新行是分号
  46. if sql_buffer != '':
  47. sql_list.append(sql_buffer)
  48. sql_buffer = ''
  49. continue
  50. if line.strip().endswith(';'):
  51. # 新行以分号结尾
  52. sql_list.append(sql_buffer + line.strip().strip(';'))
  53. sql_buffer = ''
  54. continue
  55. sql_buffer += line
  56. if sql_buffer != '':
  57. sql_list.append(sql_buffer)
  58. return sql_list