from datetime import datetime
import os
from typing import Optional, List

from fastapi import APIRouter, Depends, HTTPException, Query
from mysql.connector.pooling import PooledMySQLConnection

from app.core.config import settings
from app.core.logger import get_logger
from app.core.database_loader import get_db_connection
from app.utils.scheme import CardDetailResponse, CardImageResponse, CardListDetailResponse

logger = get_logger(__name__)
router = APIRouter()
db_dependency = Depends(get_db_connection)

# Starting value of a card's total score before per-image adjustments are added.
_BASE_SCORE = 10.0
# Total scores are only computed for cards that have exactly this many images.
_REQUIRED_IMAGE_COUNT = 4


def _score_adjustment(payload):
    """Return the numeric adjustment stored at ``result._used_compute_deduct_score``.

    Missing keys and falsy values count as 0. Raises (AttributeError,
    TypeError, ValueError, ...) when *payload* is not dict-like or the stored
    value is not convertible to float; callers decide how to handle that.
    """
    value = payload.get("result", {}).get("_used_compute_deduct_score", 0)
    return float(value or 0)


def _detection_payload(img):
    """Return the detection JSON of an image record."""
    return img.detection_json


def _effective_payload(img):
    """Return the modified JSON of an image, falling back to its detection JSON."""
    return img.modified_json if img.modified_json is not None else img.detection_json


def _total_score(images, pick_payload, label):
    """Sum the base score and the adjustment of every image.

    *pick_payload* selects which JSON payload of an image is read and *label*
    names that payload in the warning message. An image whose payload cannot
    be parsed is logged and contributes nothing, so one bad record does not
    discard the whole score.
    """
    total = _BASE_SCORE
    for img in images:
        try:
            total += _score_adjustment(pick_payload(img))
        except Exception as e:
            logger.warning(f"解析 {label} 分数失败 (image_id={img.id}): {e}")
    return total


def _remove_image_files(relative_paths):
    """Best-effort removal of image files stored relative to ``settings.BASE_PATH``.

    Failures are logged and never raised: this runs after the database
    deletion has been committed, so a leftover file must not turn a
    successful delete into an error response.
    """
    for path in relative_paths:
        if not path:
            # NULL / empty image_path: there is no file to remove.
            continue
        # Strip leading separators so the stored path is always joined
        # underneath BASE_PATH instead of replacing it.
        absolute_path = settings.BASE_PATH / path.lstrip('/\\')
        try:
            os.remove(absolute_path)
        except FileNotFoundError:
            # Already gone; nothing to do (also avoids an exists/remove race).
            continue
        except OSError as e:
            logger.error(f"删除文件失败 {absolute_path}: {e}")
        else:
            logger.info(f"图片文件已删除: {absolute_path}")


@router.post("/created", response_model=CardDetailResponse, status_code=201, summary="创建一个新的卡牌记录")
def create_card(
    card_name: Optional[str] = Query(None, description="卡牌的名称"),
    db_conn: PooledMySQLConnection = db_dependency
):
    """Create a new card record that has no images attached yet.

    Inserts one row with the given name, commits, and returns the new card
    with an empty image list. The timestamps in the response are taken from
    the application clock, not read back from the database.

    Raises:
        HTTPException: 500 when the insert (or building the response) fails;
            the transaction is rolled back first.
    """
    cursor = None
    try:
        cursor = db_conn.cursor()
        query = f"INSERT INTO {settings.DB_CARD_TABLE_NAME} (card_name) VALUES (%s)"
        cursor.execute(query, (card_name,))
        db_conn.commit()
        new_id = cursor.lastrowid
        logger.info(f"新卡牌已创建, ID: {new_id}")

        # Take a single timestamp so created_at and updated_at are identical
        # for a freshly created card.
        now = datetime.now()
        return CardDetailResponse(
            id=new_id,
            card_name=card_name,
            created_at=now,
            updated_at=now,
            images=[]
        )
    except Exception as e:
        db_conn.rollback()
        logger.error(f"创建卡牌失败: {e}")
        raise HTTPException(status_code=500, detail="数据库插入失败。") from e
    finally:
        if cursor:
            cursor.close()


@router.get("/query/{id}", response_model=CardDetailResponse, summary="获取指定卡牌的详细信息")
def get_card_details(id: int, db_conn: PooledMySQLConnection = db_dependency):
    """Return a card's metadata together with all images linked to it.

    When the card has exactly four images, ``detection_score`` is the base
    score plus the adjustment of every image's detection JSON, and
    ``modified_score`` is the same sum using each image's modified JSON
    (falling back to its detection JSON). ``modified_score`` stays ``None``
    when no image has a modified JSON. Both scores are ``None`` for any other
    image count.

    Raises:
        HTTPException: 404 when no card has this id, 500 on any other failure.
    """
    cursor = None
    try:
        cursor = db_conn.cursor(dictionary=True)

        # 1. Fetch the card row.
        query_card = f"SELECT * FROM {settings.DB_CARD_TABLE_NAME} WHERE id = %s"
        cursor.execute(query_card, (id,))
        card_data = cursor.fetchone()
        if not card_data:
            raise HTTPException(status_code=404, detail=f"ID为 {id} 的卡牌未找到。")

        # 2. Fetch every image linked to the card.
        query_images = f"SELECT * FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id = %s"
        cursor.execute(query_images, (id,))
        images = [CardImageResponse.model_validate(row) for row in cursor.fetchall()]

        # 3. Compute total scores (only for a complete set of images).
        card_data["detection_score"] = None
        card_data["modified_score"] = None
        if len(images) == _REQUIRED_IMAGE_COUNT:
            try:
                card_data["detection_score"] = _total_score(
                    images, _detection_payload, "detection_json"
                )
                if any(img.modified_json is not None for img in images):
                    card_data["modified_score"] = _total_score(
                        images, _effective_payload, "modified_json"
                    )
            except Exception as e:
                # Scores are optional extras: log and still return the card.
                logger.error(f"计算分数过程异常: {e}")

        # 4. Assemble the final response.
        card_response = CardDetailResponse.model_validate(card_data)
        card_response.images = images
        return card_response
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) are not query failures;
        # pass them through without logging them as errors.
        raise
    except Exception as e:
        logger.error(f"查询卡牌详情失败 ({id}): {e}")
        raise HTTPException(status_code=500, detail="数据库查询失败。") from e
    finally:
        if cursor:
            cursor.close()


@router.get("/card_list", response_model=List[CardListDetailResponse], summary="获取卡牌列表")
def list_cards_detailed(
    start_id: Optional[int] = Query(None, description="筛选条件:起始 card_id"),
    end_id: Optional[int] = Query(None, description="筛选条件:结束 card_id"),
    skip: int = Query(0, ge=0, description="分页:跳过的记录数"),
    limit: int = Query(100, ge=1, le=1000, description="分页:每页的记录数"),
    db_conn: PooledMySQLConnection = db_dependency
):
    """Return basic card information, newest id first.

    Supports an optional inclusive id range (``start_id`` / ``end_id``) and
    pagination through ``skip`` / ``limit``.

    Raises:
        HTTPException: 500 when the query or response validation fails.
    """
    cursor = None
    try:
        cursor = db_conn.cursor(dictionary=True)

        base_query = f"SELECT id, card_name, created_at, updated_at FROM {settings.DB_CARD_TABLE_NAME}"

        # Build the WHERE clause from fixed fragments only; user-supplied
        # values always travel as bound parameters.
        conditions = []
        params = []
        if start_id is not None:
            conditions.append("id >= %s")
            params.append(start_id)
        if end_id is not None:
            conditions.append("id <= %s")
            params.append(end_id)
        if conditions:
            base_query += " WHERE " + " AND ".join(conditions)

        base_query += " ORDER BY id DESC LIMIT %s OFFSET %s"
        params.extend([limit, skip])

        cursor.execute(base_query, tuple(params))
        results = cursor.fetchall()
        return [CardListDetailResponse.model_validate(row) for row in results]
    except Exception as e:
        logger.error(f"查询卡牌列表失败: {e}")
        raise HTTPException(status_code=500, detail="获取数据列表失败。") from e
    finally:
        if cursor:
            cursor.close()


@router.delete("/delete/{id}", status_code=200, summary="删除卡牌及其所有关联图片")
def delete_card(id: int, db_conn: PooledMySQLConnection = db_dependency):
    """Delete a card, its image records and the image files on disk.

    Only the card row is deleted explicitly. The image rows are expected to
    be removed by an ON DELETE CASCADE constraint (stated by the original
    author; the schema is not visible here -- TODO confirm).

    Files are removed only after the database transaction has committed, so
    a failed commit can never leave rows that point at missing files. File
    removal is best-effort and failures are only logged.

    Raises:
        HTTPException: 404 when no card has this id, 500 when the database
            deletion fails (the transaction is rolled back in both cases).
    """
    cursor = None
    try:
        cursor = db_conn.cursor()

        # 1. Collect the file paths first; the rows vanish with the card.
        query_paths = f"SELECT image_path FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id = %s"
        cursor.execute(query_paths, (id,))
        image_paths_to_delete = [row[0] for row in cursor.fetchall()]

        # 2. Delete the card row.
        query_delete_card = f"DELETE FROM {settings.DB_CARD_TABLE_NAME} WHERE id = %s"
        cursor.execute(query_delete_card, (id,))
        if cursor.rowcount == 0:
            raise HTTPException(status_code=404, detail=f"ID为 {id} 的卡牌未找到。")

        # 3. Make the deletion durable before touching the filesystem.
        db_conn.commit()
    except HTTPException:
        # Not a failure worth an error log; just end the open transaction.
        db_conn.rollback()
        raise
    except Exception as e:
        db_conn.rollback()
        logger.error(f"删除卡牌失败 ({id}): {e}")
        raise HTTPException(status_code=500, detail="删除卡牌失败。") from e
    finally:
        if cursor:
            cursor.close()

    # 4. The rows are gone for good; now clean up the files.
    _remove_image_files(image_paths_to_delete)

    logger.info(f"ID {id} 的卡牌和关联数据已成功删除。")
    return {"message": f"成功删除卡牌 ID {id} 及其所有关联数据"}