from fastapi import APIRouter, HTTPException, Depends, Query from mysql.connector.pooling import PooledMySQLConnection import os import json import math from pathlib import Path from typing import List, Dict, Any, Optional from PIL import Image from app.core.logger import get_logger from app.core.config import settings from app.core.database_loader import get_db_connection from app.crud import crud_card from app.utils.scheme import ImageType logger = get_logger(__name__) router = APIRouter() # 定义新的缺陷图片存储路径 DEFECT_IMAGE_DIR = settings.DEFECT_IMAGE_DIR def _get_active_json(image_data: Any) -> Optional[Dict]: """获取有效的json数据,优先 modified_json""" if not image_data: return None # image_data 可能是 Pydantic 对象或 字典,做兼容处理 if hasattr(image_data, "modified_json"): mj = image_data.modified_json dj = image_data.detection_json else: mj = image_data.get("modified_json") dj = image_data.get("detection_json") # 注意:根据 schema.py,这里读出来已经是 dict 了,不需要 json.loads # 如果数据库里存的是 null,读出来是 None if mj: return mj return dj def _crop_defect_image(original_image_path_str: str, min_rect: List, output_filename: str) -> str: """ 切割缺陷图片为正方形 min_rect 结构: [[center_x, center_y], [width, height], angle] """ try: # 构建绝对路径 # 假设 original_image_path_str 是 "/Data/..." 格式 rel_path = original_image_path_str.lstrip('/\\') full_path = settings.BASE_PATH / rel_path if not full_path.exists(): logger.warning(f"原图不存在: {full_path}") return "" with Image.open(full_path) as img: img_w, img_h = img.size # 解析 min_rect # min_rect[0] 是中心点 [x, y] # min_rect[1] 是宽高 [w, h] center_x, center_y = min_rect[0] rect_w, rect_h = min_rect[1] # 确定裁剪的正方形边长:取宽高的最大值,并适当外扩 (例如 1.5 倍) 以展示周围环境 # 如果缺陷非常小,设置一个最小尺寸(例如 100px),避免切图太模糊 side_length = max(rect_w, rect_h) * 1.5 side_length = max(side_length, 100) half_side = side_length / 2 # 计算裁剪框 (left, top, right, bottom) left = center_x - half_side top = center_y - half_side right = center_x + half_side bottom = center_y + half_side # 边界检查,防止超出图片范围 # 如果只是想保持正方形,超出部分可以填黑,或者简单的移动框的位置 # 这里简单处理:如果超出边界,就移动框,实在移不动就截断 if left < 0: right -= left # 往右移 left = 0 if top < 0: bottom -= top # 往下移 top = 0 if right > img_w: left -= (right - img_w) # 往左移 right = img_w if bottom > img_h: top -= (bottom - img_h) # 往上移 bottom = img_h # 再次检查防止负数(比如图片本身比框还小) left = max(0, left) top = max(0, top) right = min(img_w, right) bottom = min(img_h, bottom) crop_box = (left, top, right, bottom) cropped_img = img.crop(crop_box) # 保存 save_path = DEFECT_IMAGE_DIR / output_filename cropped_img.save(save_path, quality=95) # 返回 URL 路径 (相对于项目根目录的 web 路径) return f"/DefectImage/{output_filename}" except Exception as e: logger.error(f"切割图片失败: {e}") return "" @router.get("/generate", status_code=200, summary="生成评级报告数据") def generate_rating_report( card_id: int, db_conn: PooledMySQLConnection = Depends(get_db_connection) ): top_n_defects = 3 """ 根据 Card ID 生成评级报告 JSON """ # 1. 获取卡片详情 (复用 Crud 逻辑,确保能拿到所有图片) card_data = crud_card.get_card_with_details(db_conn, card_id) if not card_data: raise HTTPException(status_code=404, detail="未找到该卡片信息") # 初始化返回结构 response_data = { "backImageUrl": "", "frontImageUrl": "", "cardNo": "", "centerBack": "", "centerFront": "", "measureLength": 0.0, "measureWidth": 0.0, "cornerBackNum": 0, "sideBackNum": 0, "surfaceBackNum": 0, "cornerFrontNum": 0, "sideFrontNum": 0, "surfaceFrontNum": 0, "popNum": 0, # 暂时无数据来源,置0 "scoreThreshold": float(card_data.get("detection_score") or 0), "evaluateNo": str(card_data.get("id")), "recognizedInfoDTO": { "cardSet": "", "player": "", "series": "", "year": "" }, "defectDetailList": [] } # 临时列表用于收集所有缺陷,最后排序取 Top N all_defects_collected = [] # 遍历图片寻找 Front Ring 和 Back Ring images = card_data.get("images", []) # 辅助字典:defect_type 到 统计字段 的映射 defect_map_keys = { "front_ring": { "corner": "cornerFrontNum", "edge": "sideFrontNum", "face": "surfaceFrontNum" }, "back_ring": { "corner": "cornerBackNum", "edge": "sideBackNum", "face": "surfaceBackNum" } } for img in images: img_type = img.image_type # 只处理环光图 if img_type not in ["front_ring", "back_ring"]: continue # 设置主图 URL if img_type == "front_ring": response_data["frontImageUrl"] = img.image_path elif img_type == "back_ring": response_data["backImageUrl"] = img.image_path # 获取有效 JSON json_data = _get_active_json(img) if not json_data or "result" not in json_data: continue result_node = json_data["result"] # 1. 处理居中 (Center) center_inf = result_node.get("center_result", {}).get("box_result", {}).get("center_inference", {}) if center_inf: # 格式: L/R=47/53, T/B=51/49 (取整) # center_inference 包含 center_left, center_right, center_top, center_bottom c_str = ( f"L/R={int(round(center_inf.get('center_left', 0)))}/{int(round(center_inf.get('center_right', 0)))}, " f"T/B={int(round(center_inf.get('center_top', 0)))}/{int(round(center_inf.get('center_bottom', 0)))}" ) if img_type == "front_ring": response_data["centerFront"] = c_str # 2. 处理尺寸 (仅从正面取,或者只要有就取) - mm 转 cm,除以 10,保留2位 rw_mm = center_inf.get("real_width_mm", 0) rh_mm = center_inf.get("real_height_mm", 0) response_data["measureWidth"] = round(rw_mm / 10.0, 2) response_data["measureLength"] = round(rh_mm / 10.0, 2) else: response_data["centerBack"] = c_str # 2. 处理缺陷 (Defects) defects = result_node.get("defect_result", {}).get("defects", []) for defect in defects: # 过滤 edit_type == 'del' if defect.get("edit_type") == "del": continue d_type = defect.get("defect_type", "") # corner, edge, face d_label = defect.get("label", "") # scratch, wear, etc. # 统计数量 count_key = defect_map_keys.get(img_type, {}).get(d_type) if count_key: response_data[count_key] += 1 # 收集详细信息用于 Top N 列表 # 需要保存:缺陷对象本身,图片路径,正反面标识 side_str = "FRONT" if img_type == "front_ring" else "BACK" all_defects_collected.append({ "defect_data": defect, "image_path": img.image_path, "side": side_str, "area": defect.get("actual_area", 0) }) # 3. 处理 defectDetailList (Top N 切图) # 按实际面积从大到小排序 all_defects_collected.sort(key=lambda x: x["area"], reverse=True) top_defects = all_defects_collected[:top_n_defects] final_defect_list = [] for idx, item in enumerate(top_defects, start=1): defect = item["defect_data"] side = item["side"] original_img_path = item["image_path"] # 构造 ID d_id = idx # 1, 2, 3 # 构造文件名: {card_id}_{seq_id}.jpg filename = f"{card_id}_{d_id}.jpg" # 执行切图 min_rect = defect.get("min_rect") defect_img_url = "" location_str = "" if min_rect and len(min_rect) == 3: # 切图并保存 defect_img_url = _crop_defect_image(original_img_path, min_rect, filename) # 计算 Location (中心坐标) # min_rect[0] 是 [x, y] cx, cy = min_rect[0] location_str = f"{int(cx)},{int(cy)}" # 构造 Type 字符串: defect_type + label (大写) # 例如: defect_type="edge", label="wear" -> "EDGE WEAR" d_type_raw = defect.get("defect_type", "") d_label_raw = defect.get("label", "") type_str = f"{d_type_raw.upper()} {d_label_raw.upper()}".strip() final_defect_list.append({ "id": d_id, "side": side, "location": location_str, "type": type_str, "defectImgUrl": defect_img_url }) response_data["defectDetailList"] = final_defect_list return response_data