hive_diff_database.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. """
  2. Hive 表比较与同步脚本
  3. 功能:
  4. 1. 比较两个 Hive 集群中指定数据库的表结构,找出新集群中缺失的表。
  5. 2. 支持排除指定前缀或后缀的表。
  6. 3. 支持自动在新集群中创建缺失的表。
  7. 4. 支持在新集群中执行 `MSCK REPAIR TABLE` 修复分区元数据。
  8. 使用方法:
  9. 1.运行脚本:
  10. python script.py -d <database> [-p <exclude_prefixes>] [-s <exclude_suffixes>] [-a <auto_create>]
  11. 参数说明:
  12. -d, --database: 数据库名称(必填)。
  13. -p, --exclude_prefixes: 需要排除的表名前缀列表,用逗号分隔(例如:tmp_,bak_)。
  14. -s, --exclude_suffixes: 需要排除的表名后缀列表,用逗号分隔(例如:_old,_backup)。
  15. -a, --auto_create: 是否自动创建缺失的表(True/False,默认为 False)。
  16. 示例:
  17. 1. 比较数据库 ods 的表结构:
  18. python script.py -d ods
  19. 2. 排除前缀 tmp_,bak_ 和后缀 _old,_backup:
  20. python script.py -d ods -p tmp_,bak_ -s _old,_backup
  21. 3. 自动创建缺失的表:
  22. python script.py -d ods -a True
  23. 4. 排除前缀和后缀并自动创建缺失的表:
  24. python script.py -d ods -p tmp_,bak_ -s _old,_backup -a True
  25. """
  26. from pyhive import hive
  27. import argparse
  28. # 定义连接Hive的函数
  29. def get_hive_tables(host, port, database):
  30. """
  31. 连接Hive数据库并获取指定数据库中的所有表名
  32. :param host: Hive服务器地址
  33. :param port: Hive服务器端口
  34. :param database: 数据库名称
  35. :return: 表名列表
  36. """
  37. connection = hive.Connection(host=host, port=port, database=database)
  38. connection = hive.Connection(host=host, port=port, database=database, username="hive")
  39. cursor = connection.cursor()
  40. cursor.execute('SHOW TABLES')
  41. tables = cursor.fetchall()
  42. cursor.close()
  43. connection.close()
  44. return [table[0] for table in tables]
  45. def filter_tables(tables, exclude_prefixes=None, exclude_suffixes=None):
  46. """
  47. 过滤表名,排除指定的前缀和后缀
  48. :param tables: 表名列表
  49. :param exclude_prefixes: 需要排除的前缀列表
  50. :param exclude_suffixes: 需要排除的后缀列表
  51. :return: 过滤后的表名列表
  52. """
  53. if exclude_prefixes is None:
  54. exclude_prefixes = []
  55. if exclude_suffixes is None:
  56. exclude_suffixes = []
  57. filtered_tables = []
  58. for table in tables:
  59. # 检查表名是否以指定的前缀开头
  60. if any(table.startswith(prefix) for prefix in exclude_prefixes):
  61. continue
  62. # 检查表名是否以指定的后缀结尾
  63. if any(table.endswith(suffix) for suffix in exclude_suffixes):
  64. continue
  65. filtered_tables.append(table)
  66. return filtered_tables
  67. def get_create_table_sql(host, port, database, table_name):
  68. """
  69. 获取指定表的完整建表语句
  70. :param host: Hive服务器地址
  71. :param port: Hive服务器端口
  72. :param database: 数据库名称
  73. :param table_name: 表名
  74. :return: 完整的建表语句
  75. """
  76. connection = hive.Connection(host=host, port=port, database=database)
  77. connection = hive.Connection(host=host, port=port, database=database, username="hive")
  78. cursor = connection.cursor()
  79. cursor.execute(f'SHOW CREATE TABLE {database}.{table_name}')
  80. # 逐行读取建表语句
  81. create_table_sql = ""
  82. for row in cursor:
  83. create_table_sql += row[0] + "\n"
  84. cursor.close()
  85. connection.close()
  86. return create_table_sql.strip() # 去除末尾的换行符
  87. def execute_sql(host, port, database, sql):
  88. """
  89. 在指定的Hive服务器上执行SQL语句
  90. :param host: Hive服务器地址
  91. :param port: Hive服务器端口
  92. :param database: 数据库名称
  93. :param sql: 要执行的SQL语句
  94. """
  95. connection = hive.Connection(host=host, port=port, database=database)
  96. connection = hive.Connection(host=host, port=port, database=database, username="hive")
  97. cursor = connection.cursor()
  98. try:
  99. cursor.execute(sql)
  100. print(f"执行成功: {sql}")
  101. except Exception as e:
  102. print(f"执行失败: {sql}\n错误信息: {e}")
  103. finally:
  104. cursor.close()
  105. connection.close()
  106. def compare_hive_tables(database, exclude_prefixes=None, exclude_suffixes=None, auto_create=False):
  107. """
  108. 比较两个Hive数据库中的表名,找出缺失的表,并输出缺失表的完整建表语句
  109. :param database: 数据库名称
  110. :param exclude_prefixes: 需要排除的前缀列表
  111. :param exclude_suffixes: 需要排除的后缀列表
  112. :param auto_create: 是否自动在new_host上创建缺失的表
  113. """
  114. # Hive连接信息
  115. old_host = '192.168.30.3'
  116. new_host = '192.168.30.23'
  117. port = 10000
  118. # 获取两个数据库的表名
  119. old_tables = get_hive_tables(old_host, port, database)
  120. new_tables = get_hive_tables(new_host, port, database)
  121. # 过滤表名
  122. old_tables_filtered = filter_tables(old_tables, exclude_prefixes, exclude_suffixes)
  123. new_tables_filtered = filter_tables(new_tables, exclude_prefixes, exclude_suffixes)
  124. # 将表名转换为集合
  125. set_tables_old = set(old_tables_filtered)
  126. set_tables_new = set(new_tables_filtered)
  127. # 比较表结构并重新建表
  128. # common_tables = set_tables_old.intersection(set_tables_new)
  129. # if common_tables:
  130. # print("\n检查表结构并重新建表(如果结构不同):")
  131. # for table_name in common_tables:
  132. # try:
  133. # old_table_sql = get_create_table_sql(old_host, port, database, table_name)
  134. # new_table_sql = get_create_table_sql(new_host, port, database, table_name)
  135. # old_table_sql_c = old_table_sql.split("TBLPROPERTIES")[0]
  136. # new_table_sql_c = new_table_sql.split("TBLPROPERTIES")[0]
  137. # if old_table_sql_c != new_table_sql_c:
  138. # print(f"表结构不同,需要重新建表:{table_name}")
  139. # if auto_create:
  140. # # 删除旧表
  141. # drop_table_sql = f"DROP TABLE IF EXISTS {database}.{table_name}"
  142. # execute_sql(new_host, port, database, drop_table_sql)
  143. # # 创建新表
  144. # execute_sql(new_host, port, database, old_table_sql)
  145. # print(f"表 {table_name} 重新创建完成。")
  146. # else:
  147. # print(f"表结构相同,无需重新建表:{table_name}")
  148. # except Exception as e:
  149. # print(f"无法比较表 {table_name} 的结构: {e}")
  150. #
  151. # 找出缺失的表名
  152. missing_in_new = set_tables_old - set_tables_new
  153. # 打印缺失的表名
  154. print(f"新集群hive-({database})缺失的表: {missing_in_new}")
  155. # 获取并打印缺失表的完整建表语句
  156. if missing_in_new:
  157. print("\n缺失表的建表语句:")
  158. for table_name in missing_in_new:
  159. try:
  160. create_table_sql = get_create_table_sql(old_host, port, database, table_name)
  161. print(f"缺失的表:{table_name}=========================================")
  162. print(create_table_sql)
  163. # 如果启用自动建表功能,则在new_host上执行建表语句
  164. if auto_create:
  165. execute_sql(new_host, port, database, create_table_sql)
  166. except Exception as e:
  167. print(f"无法获取表 {table_name} 的建表语句: {e}")
  168. # 在new_host上执行MSCK REPAIR TABLE
  169. if auto_create:
  170. print("\n在new_host上执行MSCK REPAIR TABLE:")
  171. for table_name in old_tables:
  172. try:
  173. msck_sql = f"MSCK REPAIR TABLE {database}.{table_name}"
  174. print("执行MSCK REPAIR TABLE修复表:",msck_sql)
  175. execute_sql(new_host, port, database, msck_sql)
  176. except Exception as e:
  177. print(f"执行MSCK REPAIR TABLE失败: {database}.{table_name}\n错误信息: {e}")
  178. # 主程序
  179. if __name__ == "__main__":
  180. # 使用 argparse 解析命令行参数
  181. parser = argparse.ArgumentParser(
  182. description="比较两个Hive数据库中的表名,找出缺失的表,并输出缺失表的完整建表语句, 最后开始修复数据")
  183. parser.add_argument('-database', '-d', type=str, required=True, help="数据库名称")
  184. parser.add_argument('-exclude_prefixes', '-p', type=str, default="", help="需要排除的前缀列表,用逗号分隔")
  185. parser.add_argument('-exclude_suffixes', '-s', type=str, default="", help="需要排除的后缀列表,用逗号分隔")
  186. parser.add_argument('-auto_create', '-a', type=str, default="False",
  187. help="是否自动在new_host上创建缺失的表 (True/False)")
  188. # 解析参数
  189. args = parser.parse_args()
  190. # 处理 exclude_prefixes 和 exclude_suffixes
  191. exclude_prefixes = args.exclude_prefixes.split(",") if args.exclude_prefixes else []
  192. exclude_suffixes = args.exclude_suffixes.split(",") if args.exclude_suffixes else []
  193. print(f"exclude_prefixes: {exclude_prefixes}, exclude_suffixes: {exclude_suffixes}")
  194. # 处理 auto_create
  195. auto_create = args.auto_create.lower() == "true"
  196. # 调用比较函数
  197. compare_hive_tables(
  198. database=args.database,
  199. exclude_prefixes=exclude_prefixes,
  200. exclude_suffixes=exclude_suffixes,
  201. auto_create=auto_create
  202. )
  203. print("程序执行完成!")