""" Hive 表比较与同步脚本 功能: 1. 比较两个 Hive 集群中指定数据库的表结构,找出新集群中缺失的表。 2. 支持排除指定前缀或后缀的表。 3. 支持自动在新集群中创建缺失的表。 4. 支持在新集群中执行 `MSCK REPAIR TABLE` 修复分区元数据。 使用方法: 1.运行脚本: python script.py -d [-p ] [-s ] [-a ] 参数说明: -d, --database: 数据库名称(必填)。 -p, --exclude_prefixes: 需要排除的表名前缀列表,用逗号分隔(例如:tmp_,bak_)。 -s, --exclude_suffixes: 需要排除的表名后缀列表,用逗号分隔(例如:_old,_backup)。 -a, --auto_create: 是否自动创建缺失的表(True/False,默认为 False)。 示例: 1. 比较数据库 ods 的表结构: python script.py -d ods 2. 排除前缀 tmp_,bak_ 和后缀 _old,_backup: python script.py -d ods -p tmp_,bak_ -s _old,_backup 3. 自动创建缺失的表: python script.py -d ods -a True 4. 排除前缀和后缀并自动创建缺失的表: python script.py -d ods -p tmp_,bak_ -s _old,_backup -a True """ from pyhive import hive import argparse # 定义连接Hive的函数 def get_hive_tables(host, port, database): """ 连接Hive数据库并获取指定数据库中的所有表名 :param host: Hive服务器地址 :param port: Hive服务器端口 :param database: 数据库名称 :return: 表名列表 """ connection = hive.Connection(host=host, port=port, database=database) connection = hive.Connection(host=host, port=port, database=database, username="hive") cursor = connection.cursor() cursor.execute('SHOW TABLES') tables = cursor.fetchall() cursor.close() connection.close() return [table[0] for table in tables] def filter_tables(tables, exclude_prefixes=None, exclude_suffixes=None): """ 过滤表名,排除指定的前缀和后缀 :param tables: 表名列表 :param exclude_prefixes: 需要排除的前缀列表 :param exclude_suffixes: 需要排除的后缀列表 :return: 过滤后的表名列表 """ if exclude_prefixes is None: exclude_prefixes = [] if exclude_suffixes is None: exclude_suffixes = [] filtered_tables = [] for table in tables: # 检查表名是否以指定的前缀开头 if any(table.startswith(prefix) for prefix in exclude_prefixes): continue # 检查表名是否以指定的后缀结尾 if any(table.endswith(suffix) for suffix in exclude_suffixes): continue filtered_tables.append(table) return filtered_tables def get_create_table_sql(host, port, database, table_name): """ 获取指定表的完整建表语句 :param host: Hive服务器地址 :param port: Hive服务器端口 :param database: 数据库名称 :param table_name: 表名 :return: 完整的建表语句 """ connection = hive.Connection(host=host, port=port, database=database) connection = hive.Connection(host=host, port=port, database=database, username="hive") cursor = connection.cursor() cursor.execute(f'SHOW CREATE TABLE {database}.{table_name}') # 逐行读取建表语句 create_table_sql = "" for row in cursor: create_table_sql += row[0] + "\n" cursor.close() connection.close() return create_table_sql.strip() # 去除末尾的换行符 def execute_sql(host, port, database, sql): """ 在指定的Hive服务器上执行SQL语句 :param host: Hive服务器地址 :param port: Hive服务器端口 :param database: 数据库名称 :param sql: 要执行的SQL语句 """ connection = hive.Connection(host=host, port=port, database=database) connection = hive.Connection(host=host, port=port, database=database, username="hive") cursor = connection.cursor() try: cursor.execute(sql) print(f"执行成功: {sql}") except Exception as e: print(f"执行失败: {sql}\n错误信息: {e}") finally: cursor.close() connection.close() def compare_hive_tables(database, exclude_prefixes=None, exclude_suffixes=None, auto_create=False): """ 比较两个Hive数据库中的表名,找出缺失的表,并输出缺失表的完整建表语句 :param database: 数据库名称 :param exclude_prefixes: 需要排除的前缀列表 :param exclude_suffixes: 需要排除的后缀列表 :param auto_create: 是否自动在new_host上创建缺失的表 """ # Hive连接信息 old_host = '192.168.30.3' new_host = '192.168.30.23' port = 10000 # 获取两个数据库的表名 old_tables = get_hive_tables(old_host, port, database) new_tables = get_hive_tables(new_host, port, database) # 过滤表名 old_tables_filtered = filter_tables(old_tables, exclude_prefixes, exclude_suffixes) new_tables_filtered = filter_tables(new_tables, exclude_prefixes, exclude_suffixes) # 将表名转换为集合 set_tables_old = set(old_tables_filtered) set_tables_new = set(new_tables_filtered) # 比较表结构并重新建表 # common_tables = set_tables_old.intersection(set_tables_new) # if common_tables: # print("\n检查表结构并重新建表(如果结构不同):") # for table_name in common_tables: # try: # old_table_sql = get_create_table_sql(old_host, port, database, table_name) # new_table_sql = get_create_table_sql(new_host, port, database, table_name) # old_table_sql_c = old_table_sql.split("TBLPROPERTIES")[0] # new_table_sql_c = new_table_sql.split("TBLPROPERTIES")[0] # if old_table_sql_c != new_table_sql_c: # print(f"表结构不同,需要重新建表:{table_name}") # if auto_create: # # 删除旧表 # drop_table_sql = f"DROP TABLE IF EXISTS {database}.{table_name}" # execute_sql(new_host, port, database, drop_table_sql) # # 创建新表 # execute_sql(new_host, port, database, old_table_sql) # print(f"表 {table_name} 重新创建完成。") # else: # print(f"表结构相同,无需重新建表:{table_name}") # except Exception as e: # print(f"无法比较表 {table_name} 的结构: {e}") # # 找出缺失的表名 missing_in_new = set_tables_old - set_tables_new # 打印缺失的表名 print(f"新集群hive-({database})缺失的表: {missing_in_new}") # 获取并打印缺失表的完整建表语句 if missing_in_new: print("\n缺失表的建表语句:") for table_name in missing_in_new: try: create_table_sql = get_create_table_sql(old_host, port, database, table_name) print(f"缺失的表:{table_name}=========================================") print(create_table_sql) # 如果启用自动建表功能,则在new_host上执行建表语句 if auto_create: execute_sql(new_host, port, database, create_table_sql) except Exception as e: print(f"无法获取表 {table_name} 的建表语句: {e}") # 在new_host上执行MSCK REPAIR TABLE if auto_create: print("\n在new_host上执行MSCK REPAIR TABLE:") for table_name in old_tables: try: msck_sql = f"MSCK REPAIR TABLE {database}.{table_name}" print("执行MSCK REPAIR TABLE修复表:",msck_sql) execute_sql(new_host, port, database, msck_sql) except Exception as e: print(f"执行MSCK REPAIR TABLE失败: {database}.{table_name}\n错误信息: {e}") # 主程序 if __name__ == "__main__": # 使用 argparse 解析命令行参数 parser = argparse.ArgumentParser( description="比较两个Hive数据库中的表名,找出缺失的表,并输出缺失表的完整建表语句, 最后开始修复数据") parser.add_argument('-database', '-d', type=str, required=True, help="数据库名称") parser.add_argument('-exclude_prefixes', '-p', type=str, default="", help="需要排除的前缀列表,用逗号分隔") parser.add_argument('-exclude_suffixes', '-s', type=str, default="", help="需要排除的后缀列表,用逗号分隔") parser.add_argument('-auto_create', '-a', type=str, default="False", help="是否自动在new_host上创建缺失的表 (True/False)") # 解析参数 args = parser.parse_args() # 处理 exclude_prefixes 和 exclude_suffixes exclude_prefixes = args.exclude_prefixes.split(",") if args.exclude_prefixes else [] exclude_suffixes = args.exclude_suffixes.split(",") if args.exclude_suffixes else [] print(f"exclude_prefixes: {exclude_prefixes}, exclude_suffixes: {exclude_suffixes}") # 处理 auto_create auto_create = args.auto_create.lower() == "true" # 调用比较函数 compare_hive_tables( database=args.database, exclude_prefixes=exclude_prefixes, exclude_suffixes=exclude_suffixes, auto_create=auto_create ) print("程序执行完成!")