import mysql.connector from mysql.connector import errorcode from .config import settings from app.core.logger import get_logger logger = get_logger(__name__) # 全局的连接池 db_connection_pool = None import mysql.connector from mysql.connector import errorcode from .config import settings from app.core.logger import get_logger logger = get_logger(__name__) # 全局的连接池 db_connection_pool = None def init_database(): """ 初始化数据库:如果数据库或表不存在,则创建它们。 """ logger.info("--- 开始初始化数据库 ---") try: cnx = mysql.connector.connect(**settings.DATABASE_CONFIG) cursor = cnx.cursor() cursor.execute(f"CREATE DATABASE IF NOT EXISTS {settings.DB_NAME} DEFAULT CHARACTER SET 'utf8mb4'") logger.info(f"数据库 '{settings.DB_NAME}' 已准备就绪。") cnx.database = settings.DB_NAME # 1. 创建 card_images 表 (因为它被 cards 表引用) card_images_table = ( f"CREATE TABLE IF NOT EXISTS `{settings.DB_IMAGE_TABLE_NAME}` (" " `image_id` INT AUTO_INCREMENT PRIMARY KEY," " `image_name` VARCHAR(255) NULL," " `image_path` VARCHAR(512) NOT NULL," " `detection_json` JSON NOT NULL COMMENT '原始检测JSON数据'," " `modified_json` JSON NULL COMMENT '修改后的JSON数据'," " `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP," " `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'JSON数据最后修改时间'" ") ENGINE=InnoDB COMMENT='存储所有卡牌的图片及检测数据'" ) cursor.execute(card_images_table) logger.info(f"数据表 '{settings.DB_IMAGE_TABLE_NAME}' 已准备就绪。") # 2. 创建 cards 表 (不带外键) cards_table = ( f"CREATE TABLE IF NOT EXISTS `{settings.DB_CARD_TABLE_NAME}` (" " `card_id` INT AUTO_INCREMENT PRIMARY KEY," " `card_name` VARCHAR(255) NULL COMMENT '卡牌的通用名称'," " `front_face_id` INT NULL," " `back_face_id` INT NULL," " `front_edge_id` INT NULL," " `back_edge_id` INT NULL," " `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP," " `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" ") ENGINE=InnoDB COMMENT='存储实体卡牌及其核心图片引用'" ) cursor.execute(cards_table) logger.info(f"数据表 '{settings.DB_CARD_TABLE_NAME}' 已准备就绪。") # 3. 为 cards 表添加外键约束 (如果它们不存在) # 这是一个更健壮的方法,可以重复运行而不会出错 foreign_keys = { "fk_front_face": "front_face_id", "fk_back_face": "back_face_id", "fk_front_edge": "front_edge_id", "fk_back_edge": "back_edge_id" } for fk_name, col_name in foreign_keys.items(): try: # 检查外键是否存在 cursor.execute(f""" SELECT COUNT(*) FROM information_schema.KEY_COLUMN_USAGE WHERE TABLE_SCHEMA = '{settings.DB_NAME}' AND TABLE_NAME = '{settings.DB_CARD_TABLE_NAME}' AND CONSTRAINT_NAME = '{fk_name}'; """) if cursor.fetchone()[0] == 0: alter_query = ( f"ALTER TABLE `{settings.DB_CARD_TABLE_NAME}` " f"ADD CONSTRAINT `{fk_name}` " f"FOREIGN KEY (`{col_name}`) " f"REFERENCES `{settings.DB_IMAGE_TABLE_NAME}`(`image_id`) " "ON DELETE SET NULL" ) cursor.execute(alter_query) logger.info(f"为表 '{settings.DB_CARD_TABLE_NAME}' 添加了外键 '{fk_name}'。") except mysql.connector.Error as err: # 1060: Duplicate column name; 1061: Duplicate key name; 1826: Duplicate foreign key if err.errno in [1060, 1061, 1826]: logger.warning(f"外键 '{fk_name}' 可能已存在。跳过。") else: raise err except mysql.connector.Error as err: logger.error(f"数据库初始化失败: {err}") exit(1) finally: if 'cursor' in locals() and cursor: cursor.close() if 'cnx' in locals() and cnx.is_connected(): cnx.close() logger.info("--- 数据库初始化完成 ---") def load_database_pool(): """ 在应用启动时创建数据库连接池。 """ global db_connection_pool if db_connection_pool is None: logger.info("--- 创建数据库连接池 ---") try: db_connection_pool = mysql.connector.pooling.MySQLConnectionPool( pool_name="mypool", pool_size=5, # 池中保持的连接数 **settings.DATABASE_CONFIG_WITH_DB ) logger.info("--- 数据库连接池创建成功 ---") except mysql.connector.Error as err: logger.error(f"创建数据库连接池失败: {err}") exit(1) def close_database_pool(): """ 在应用关闭时,不需要手动关闭连接池,连接器会自动处理。 这个函数留作备用。 """ logger.info("--- 数据库连接池将自动关闭 ---") # --- FastAPI 依赖注入 --- def get_db_connection(): """ 一个FastAPI依赖项,用于从池中获取数据库连接。 它确保连接在使用后返回到池中。 """ if db_connection_pool is None: raise RuntimeError("数据库连接池未初始化") db_conn = None try: db_conn = db_connection_pool.get_connection() yield db_conn except mysql.connector.Error as err: logger.error(f"获取数据库连接失败: {err}") # 这里可以根据需要抛出HTTPException finally: if db_conn and db_conn.is_connected(): db_conn.close()