mysql_utils.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # -*- coding:utf-8 -*-
  2. import json
  3. import re
  4. from typing import Dict, List
  5. import pymysql
  6. class MySQLColumn(object):
  7. def __init__(self,
  8. column_name: str,
  9. column_type: str,
  10. column_comment: str,
  11. ordinal_position: str,
  12. is_nullable: bool):
  13. self.COLUMN_NAME = column_name
  14. self.COLUMN_TYPE = column_type
  15. self.COLUMN_COMMENT = column_comment
  16. self.ORDINAL_POSITION = ordinal_position
  17. self.IS_NULLABLE = is_nullable
  18. self._dict = {
  19. 'COLUMN_NAME': column_name,
  20. 'COLUMN_TYPE': column_type,
  21. 'COLUMN_COMMENT': column_comment,
  22. 'ORDINAL_POSITION': ordinal_position,
  23. 'IS_NULLABLE': is_nullable,
  24. }
  25. def __str__(self):
  26. return json.dumps(self._dict, ensure_ascii=False)
  27. class MySQLHandler:
  28. def __init__(self, host: str, port: int, username: str, password: str, database: str = None):
  29. """
  30. MySQL工具包
  31. Args:
  32. host: 实例地址
  33. port: 端口
  34. username: 用户名
  35. password: 密码
  36. """
  37. self.jdbcUrl = "jdbc:mysql://%s:%s" % (host, port)
  38. self.username = username
  39. self.password = password
  40. self.connection = pymysql.connect(
  41. host=host,
  42. port=port,
  43. user=username,
  44. password=password,
  45. database=database,
  46. charset='utf8'
  47. )
  48. self.connection.autocommit(True)
  49. def list_tables(self,
  50. database: str = None,
  51. exclude_regex: List[str] = None,
  52. table_regex: List[str] = None) -> Dict[str, str]:
  53. """
  54. 列出指定数据库中的表
  55. Args:
  56. database: 数据库名称
  57. exclude_regex: 不要的数据表正则
  58. table_regex: 想要的数据表正则
  59. Returns: 表及注释
  60. """
  61. assert database is not None
  62. curs = self.connection.cursor()
  63. curs.execute('SET NAMES utf8')
  64. curs.execute(f'use {database}')
  65. sql = "SELECT TABLE_NAME, TABLE_COMMENT " \
  66. " FROM information_schema.TABLES " \
  67. f" WHERE TABLE_SCHEMA='{database}' AND TABLE_TYPE = 'BASE TABLE'"
  68. curs.execute(sql)
  69. rows = curs.fetchall()
  70. tables = {}
  71. for each_row in rows:
  72. if exclude_regex:
  73. exclude = False
  74. for regex in exclude_regex:
  75. if re.match(regex, each_row[0]):
  76. exclude = True
  77. break
  78. if exclude:
  79. continue
  80. if table_regex:
  81. match = False
  82. for regex in table_regex:
  83. if re.match(regex, each_row[0]):
  84. match = True
  85. break
  86. if not match:
  87. continue
  88. tables[each_row[0]] = each_row[1]
  89. return tables
  90. def list_columns(self, database: str, table_name: str) -> List[MySQLColumn]:
  91. """
  92. 列出指定数据库、指定表的字段及字段的其他信息
  93. Args:
  94. database: 数据库
  95. table_name: 表
  96. Returns: 字段及字段的其他信息
  97. """
  98. assert database is not None
  99. assert table_name is not None
  100. curs = self.connection.cursor()
  101. curs.execute('SET NAMES utf8')
  102. curs.execute(f'use {database}')
  103. detail_names = ['COLUMN_TYPE', 'COLUMN_COMMENT', 'ORDINAL_POSITION', 'IS_NULLABLE']
  104. sql = "SELECT COLUMN_NAME, %s" \
  105. " FROM information_schema.COLUMNS " \
  106. " WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'"
  107. sql = sql % (', '.join(detail_names), database, table_name)
  108. curs.execute(sql)
  109. rows = curs.fetchall()
  110. columns = []
  111. for each_row in rows:
  112. column_name = each_row[0]
  113. column_type = each_row[1]
  114. column_comment = each_row[2]
  115. ordinal_position = each_row[3]
  116. is_nullable = each_row[4]
  117. mysql_column = MySQLColumn(column_name, column_type, column_comment, ordinal_position, is_nullable)
  118. columns.append(mysql_column)
  119. return columns
  120. def query(self, sql: str):
  121. curs = self.connection.cursor()
  122. curs.execute('SET NAMES utf8')
  123. curs.execute(sql)
  124. rows = curs.fetchall()
  125. return rows
  126. def query_column_hive_metadata(self, table_name: str):
  127. curs = self.connection.cursor()
  128. curs.execute('SET NAMES utf8')
  129. sql = f'SELECT' \
  130. f' t.TBL_NAME,' \
  131. f' c.COLUMN_NAME,' \
  132. f' c.TYPE_NAME,' \
  133. f' c.`COMMENT` ' \
  134. f'FROM ' \
  135. f' hive.TBLS t' \
  136. f' LEFT JOIN hive.SDS s ON t.SD_ID = s.SD_ID' \
  137. f' LEFT JOIN hive.COLUMNS_V2 c ON s.CD_ID = c.CD_ID' \
  138. f' LEFT JOIN hive.TBLS tbs ON s.SD_ID = tbs.SD_ID ' \
  139. f'WHERE t.TBL_NAME = "{table_name}"'
  140. curs.execute(sql)
  141. column_info = curs.fetchall()
  142. return column_info
  143. def query_tbl_hive_metadata(self, table_name: str):
  144. curs = self.connection.cursor()
  145. curs.execute('SET NAMES utf8')
  146. sql = f'SELECT' \
  147. f' tp.PARAM_KEY,' \
  148. f' tp.PARAM_VALUE ' \
  149. f'FROM' \
  150. f' hive.TABLE_PARAMS tp' \
  151. f' LEFT JOIN hive.TBLS t ON tp.TBL_ID = t.TBL_ID ' \
  152. f'WHERE' \
  153. f' t.TBL_NAME = "{table_name}"'
  154. curs.execute(sql)
  155. column_info = curs.fetchall()
  156. return column_info
  157. if __name__ == '__main__':
  158. mysql_handler = MySQLHandler(
  159. 'rm-m5e76y41wq677ogz7.mysql.rds.aliyuncs.com',
  160. 3306,
  161. 'bigdata_sync',
  162. '76iW6SG2K6RGN2X68EQb'
  163. )
  164. database_ame = 'ik_bms_production'
  165. tables = mysql_handler.list_tables(database_ame)
  166. for table_name, table_comment in tables.items():
  167. print(f'{table_name}\t{table_comment}')
  168. columns = mysql_handler.list_columns(database_ame, table_name)
  169. for col in columns:
  170. print(col)
  171. break