| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- """
- Hive 表比较与同步脚本
- 功能:
- 1. 比较两个 Hive 集群中指定数据库的表结构,找出新集群中缺失的表。
- 2. 支持排除指定前缀或后缀的表。
- 3. 支持自动在新集群中创建缺失的表。
- 4. 支持在新集群中执行 `MSCK REPAIR TABLE` 修复分区元数据。
- 使用方法:
- 1.运行脚本:
- python script.py -d <database> [-p <exclude_prefixes>] [-s <exclude_suffixes>] [-a <auto_create>]
- 参数说明:
- -d, --database: 数据库名称(必填)。
- -p, --exclude_prefixes: 需要排除的表名前缀列表,用逗号分隔(例如:tmp_,bak_)。
- -s, --exclude_suffixes: 需要排除的表名后缀列表,用逗号分隔(例如:_old,_backup)。
- -a, --auto_create: 是否自动创建缺失的表(True/False,默认为 False)。
- 示例:
- 1. 比较数据库 ods 的表结构:
- python script.py -d ods
- 2. 排除前缀 tmp_,bak_ 和后缀 _old,_backup:
- python script.py -d ods -p tmp_,bak_ -s _old,_backup
- 3. 自动创建缺失的表:
- python script.py -d ods -a True
- 4. 排除前缀和后缀并自动创建缺失的表:
- python script.py -d ods -p tmp_,bak_ -s _old,_backup -a True
- """
- from pyhive import hive
- import argparse
- # 定义连接Hive的函数
- def get_hive_tables(host, port, database):
- """
- 连接Hive数据库并获取指定数据库中的所有表名
- :param host: Hive服务器地址
- :param port: Hive服务器端口
- :param database: 数据库名称
- :return: 表名列表
- """
- connection = hive.Connection(host=host, port=port, database=database)
- connection = hive.Connection(host=host, port=port, database=database, username="hive")
- cursor = connection.cursor()
- cursor.execute('SHOW TABLES')
- tables = cursor.fetchall()
- cursor.close()
- connection.close()
- return [table[0] for table in tables]
- def filter_tables(tables, exclude_prefixes=None, exclude_suffixes=None):
- """
- 过滤表名,排除指定的前缀和后缀
- :param tables: 表名列表
- :param exclude_prefixes: 需要排除的前缀列表
- :param exclude_suffixes: 需要排除的后缀列表
- :return: 过滤后的表名列表
- """
- if exclude_prefixes is None:
- exclude_prefixes = []
- if exclude_suffixes is None:
- exclude_suffixes = []
- filtered_tables = []
- for table in tables:
- # 检查表名是否以指定的前缀开头
- if any(table.startswith(prefix) for prefix in exclude_prefixes):
- continue
- # 检查表名是否以指定的后缀结尾
- if any(table.endswith(suffix) for suffix in exclude_suffixes):
- continue
- filtered_tables.append(table)
- return filtered_tables
- def get_create_table_sql(host, port, database, table_name):
- """
- 获取指定表的完整建表语句
- :param host: Hive服务器地址
- :param port: Hive服务器端口
- :param database: 数据库名称
- :param table_name: 表名
- :return: 完整的建表语句
- """
- connection = hive.Connection(host=host, port=port, database=database)
- connection = hive.Connection(host=host, port=port, database=database, username="hive")
- cursor = connection.cursor()
- cursor.execute(f'SHOW CREATE TABLE {database}.{table_name}')
- # 逐行读取建表语句
- create_table_sql = ""
- for row in cursor:
- create_table_sql += row[0] + "\n"
- cursor.close()
- connection.close()
- return create_table_sql.strip() # 去除末尾的换行符
- def execute_sql(host, port, database, sql):
- """
- 在指定的Hive服务器上执行SQL语句
- :param host: Hive服务器地址
- :param port: Hive服务器端口
- :param database: 数据库名称
- :param sql: 要执行的SQL语句
- """
- connection = hive.Connection(host=host, port=port, database=database)
- connection = hive.Connection(host=host, port=port, database=database, username="hive")
- cursor = connection.cursor()
- try:
- cursor.execute(sql)
- print(f"执行成功: {sql}")
- except Exception as e:
- print(f"执行失败: {sql}\n错误信息: {e}")
- finally:
- cursor.close()
- connection.close()
- def compare_hive_tables(database, exclude_prefixes=None, exclude_suffixes=None, auto_create=False):
- """
- 比较两个Hive数据库中的表名,找出缺失的表,并输出缺失表的完整建表语句
- :param database: 数据库名称
- :param exclude_prefixes: 需要排除的前缀列表
- :param exclude_suffixes: 需要排除的后缀列表
- :param auto_create: 是否自动在new_host上创建缺失的表
- """
- # Hive连接信息
- old_host = '192.168.30.3'
- new_host = '192.168.30.23'
- port = 10000
- # 获取两个数据库的表名
- old_tables = get_hive_tables(old_host, port, database)
- new_tables = get_hive_tables(new_host, port, database)
- # 过滤表名
- old_tables_filtered = filter_tables(old_tables, exclude_prefixes, exclude_suffixes)
- new_tables_filtered = filter_tables(new_tables, exclude_prefixes, exclude_suffixes)
- # 将表名转换为集合
- set_tables_old = set(old_tables_filtered)
- set_tables_new = set(new_tables_filtered)
- # 比较表结构并重新建表
- # common_tables = set_tables_old.intersection(set_tables_new)
- # if common_tables:
- # print("\n检查表结构并重新建表(如果结构不同):")
- # for table_name in common_tables:
- # try:
- # old_table_sql = get_create_table_sql(old_host, port, database, table_name)
- # new_table_sql = get_create_table_sql(new_host, port, database, table_name)
- # old_table_sql_c = old_table_sql.split("TBLPROPERTIES")[0]
- # new_table_sql_c = new_table_sql.split("TBLPROPERTIES")[0]
- # if old_table_sql_c != new_table_sql_c:
- # print(f"表结构不同,需要重新建表:{table_name}")
- # if auto_create:
- # # 删除旧表
- # drop_table_sql = f"DROP TABLE IF EXISTS {database}.{table_name}"
- # execute_sql(new_host, port, database, drop_table_sql)
- # # 创建新表
- # execute_sql(new_host, port, database, old_table_sql)
- # print(f"表 {table_name} 重新创建完成。")
- # else:
- # print(f"表结构相同,无需重新建表:{table_name}")
- # except Exception as e:
- # print(f"无法比较表 {table_name} 的结构: {e}")
- #
- # 找出缺失的表名
- missing_in_new = set_tables_old - set_tables_new
- # 打印缺失的表名
- print(f"新集群hive-({database})缺失的表: {missing_in_new}")
- # 获取并打印缺失表的完整建表语句
- if missing_in_new:
- print("\n缺失表的建表语句:")
- for table_name in missing_in_new:
- try:
- create_table_sql = get_create_table_sql(old_host, port, database, table_name)
- print(f"缺失的表:{table_name}=========================================")
- print(create_table_sql)
- # 如果启用自动建表功能,则在new_host上执行建表语句
- if auto_create:
- execute_sql(new_host, port, database, create_table_sql)
- except Exception as e:
- print(f"无法获取表 {table_name} 的建表语句: {e}")
- # 在new_host上执行MSCK REPAIR TABLE
- if auto_create:
- print("\n在new_host上执行MSCK REPAIR TABLE:")
- for table_name in old_tables:
- try:
- msck_sql = f"MSCK REPAIR TABLE {database}.{table_name}"
- print("执行MSCK REPAIR TABLE修复表:",msck_sql)
- execute_sql(new_host, port, database, msck_sql)
- except Exception as e:
- print(f"执行MSCK REPAIR TABLE失败: {database}.{table_name}\n错误信息: {e}")
- # 主程序
- if __name__ == "__main__":
- # 使用 argparse 解析命令行参数
- parser = argparse.ArgumentParser(
- description="比较两个Hive数据库中的表名,找出缺失的表,并输出缺失表的完整建表语句, 最后开始修复数据")
- parser.add_argument('-database', '-d', type=str, required=True, help="数据库名称")
- parser.add_argument('-exclude_prefixes', '-p', type=str, default="", help="需要排除的前缀列表,用逗号分隔")
- parser.add_argument('-exclude_suffixes', '-s', type=str, default="", help="需要排除的后缀列表,用逗号分隔")
- parser.add_argument('-auto_create', '-a', type=str, default="False",
- help="是否自动在new_host上创建缺失的表 (True/False)")
- # 解析参数
- args = parser.parse_args()
- # 处理 exclude_prefixes 和 exclude_suffixes
- exclude_prefixes = args.exclude_prefixes.split(",") if args.exclude_prefixes else []
- exclude_suffixes = args.exclude_suffixes.split(",") if args.exclude_suffixes else []
- print(f"exclude_prefixes: {exclude_prefixes}, exclude_suffixes: {exclude_suffixes}")
- # 处理 auto_create
- auto_create = args.auto_create.lower() == "true"
- # 调用比较函数
- compare_hive_tables(
- database=args.database,
- exclude_prefixes=exclude_prefixes,
- exclude_suffixes=exclude_suffixes,
- auto_create=auto_create
- )
- print("程序执行完成!")
|