import io
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from mysql.connector.pooling import PooledMySQLConnection
from PIL import Image

# NOTE: the project imports keep their original relative order so that any
# import-time side effects run in the same sequence as before.
from app.core.minio_client import minio_client
from app.core.logger import get_logger
from app.core.config import settings
from app.core.database_loader import get_db_connection
from app.crud import crud_card
from app.utils.scheme import ImageType

logger = get_logger(__name__)

router = APIRouter()

# Number of largest defects that get a cropped thumbnail in the report.
_TOP_N_DEFECTS = 3

# defect_type -> name of the counter field in the response, per card side.
_FRONT_COUNT_KEYS = {
    "corner": "cornerFrontNum",
    "edge": "sideFrontNum",
    "face": "surfaceFrontNum",
}
_BACK_COUNT_KEYS = {
    "corner": "cornerBackNum",
    "edge": "sideBackNum",
    "face": "surfaceBackNum",
}

# Image modes handed to the JPEG encoder unchanged; anything else (RGBA, P,
# LA, ...) is converted to RGB first because saving it as JPEG would fail.
_JPEG_SAFE_MODES = ("L", "RGB", "CMYK")


def _get_active_json(image_data: Any) -> Optional[Dict]:
    """Return the JSON payload to use for an image, preferring the edited one.

    ``image_data`` may be an object exposing ``modified_json`` /
    ``detection_json`` attributes or a plain dict with those keys. The
    values are returned as-is (no ``json.loads``); a source comment states
    they are already parsed.

    Returns ``modified_json`` when it is truthy, otherwise
    ``detection_json`` (which may be ``None``). Returns ``None`` for falsy
    ``image_data``.
    """
    if not image_data:
        return None

    if hasattr(image_data, "modified_json"):
        modified = image_data.modified_json
        detected = image_data.detection_json
    else:
        modified = image_data.get("modified_json")
        detected = image_data.get("detection_json")

    return modified if modified else detected


def _dig(node: Any, *keys: str) -> Any:
    """Follow ``keys`` through nested dicts, returning ``None`` on any miss.

    Unlike chained ``.get(key, {})`` calls this also tolerates a key that is
    present but holds ``None`` (a JSON ``null``).
    """
    for key in keys:
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node


def _format_center(center_inf: Dict) -> str:
    """Format centering ratios as ``L/R=47/53, T/B=51/49`` (rounded ints)."""

    def pct(key: str) -> int:
        # A missing or null ratio is reported as 0.
        return int(round(center_inf.get(key) or 0))

    return (
        f"L/R={pct('center_left')}/{pct('center_right')}, "
        f"T/B={pct('center_top')}/{pct('center_bottom')}"
    )


def _crop_defect_image(original_image_path_str: str, min_rect: List, output_filename: str) -> str:
    """Crop a square region around a defect and upload it to MinIO.

    Args:
        original_image_path_str: Full URL (or relative path) of the source
            image; ``settings.DATA_HOST_URL`` is stripped to obtain the
            object path inside the bucket.
        min_rect: Rotated-rectangle description; ``min_rect[0]`` is the
            ``(x, y)`` centre and ``min_rect[1]`` the ``(w, h)`` size.
        output_filename: File name stored under ``/DefectImage/``.

    Returns:
        The full URL of the uploaded crop, or ``""`` when the source image
        cannot be fetched or any step of cropping/uploading fails (the
        failure is logged, never raised).
    """
    try:
        # Reduce the incoming URL to a bucket-relative path such as
        # "/Data/xxx.jpg".
        rel_path = original_image_path_str.replace(settings.DATA_HOST_URL, "")
        rel_path = "/" + rel_path.lstrip('/\\')
        object_name = f"{settings.MINIO_BASE_PREFIX}{rel_path}"

        # 1. Fetch the original image bytes from MinIO.
        response = None
        try:
            response = minio_client.get_object(settings.MINIO_BUCKET, object_name)
            image_bytes = response.read()
        except Exception as e:
            logger.warning(f"从MinIO获取原图失败: {object_name} -> {e}")
            return ""
        finally:
            # Always hand the connection back to the pool, even when
            # read() fails part-way through.
            if response is not None:
                response.close()
                response.release_conn()

        # 2. Crop in memory with PIL.
        with Image.open(io.BytesIO(image_bytes)) as img:
            img_w, img_h = img.size
            center_x, center_y = min_rect[0]
            rect_w, rect_h = min_rect[1]

            # Square window: 1.5x the longer side, at least 100 px, clamped
            # to the image bounds (so it may end up non-square at edges).
            side_length = max(max(rect_w, rect_h) * 1.5, 100)
            half_side = side_length / 2

            left = max(0, center_x - half_side)
            top = max(0, center_y - half_side)
            right = min(img_w, center_x + half_side)
            bottom = min(img_h, center_y + half_side)

            cropped_img = img.crop((left, top, right, bottom))

        # JPEG cannot store alpha/palette modes; convert those to RGB.
        if cropped_img.mode not in _JPEG_SAFE_MODES:
            cropped_img = cropped_img.convert("RGB")

        # 3. Encode the crop into a memory buffer and upload it.
        out_bytes = io.BytesIO()
        cropped_img.save(out_bytes, format="JPEG", quality=95)
        out_bytes.seek(0)

        out_rel_path = f"/DefectImage/{output_filename}"
        out_object_name = f"{settings.MINIO_BASE_PREFIX}{out_rel_path}"

        minio_client.put_object(
            settings.MINIO_BUCKET,
            out_object_name,
            out_bytes,
            len(out_bytes.getvalue()),
            content_type="image/jpeg"
        )

        return settings.get_full_url(out_rel_path)

    except Exception as e:
        # Best effort: a failed thumbnail must not fail the whole report.
        logger.error(f"切割并上传图片失败: {e}")
        return ""


@router.get("/generate", status_code=200, summary="生成评级报告数据")
def generate_rating_report(
    cardNo: str,
    db_conn: PooledMySQLConnection = Depends(get_db_connection)
):
    # Build the rating-report JSON for the card identified by ``cardNo``.
    #
    # Documented with comments rather than a docstring on purpose: FastAPI
    # publishes an endpoint's docstring as its OpenAPI description, and the
    # original endpoint had none.
    #
    # Raises HTTPException:
    #   400 - cardNo is empty or blank
    #   404 - no card row for cardNo, or no card details for its id
    #   500 - the id lookup query failed
    if not cardNo or not cardNo.strip():
        raise HTTPException(status_code=400, detail="cardNo 不能为空")

    # Resolve cardNo -> card id. Only the database work sits inside the
    # try block so that the 404 below is not swallowed and turned into 500.
    try:
        with db_conn.cursor(buffered=True) as cursor:
            query_sql = f"SELECT id FROM {settings.DB_CARD_TABLE_NAME} WHERE cardNo = %s LIMIT 1"
            cursor.execute(query_sql, (cardNo,))
            row = cursor.fetchone()
    except Exception as e:
        logger.error(f"查询卡牌失败: {e}")
        raise HTTPException(status_code=500, detail="数据库查询失败。") from e

    if not row:
        raise HTTPException(
            status_code=404,
            detail=f"未找到卡号为 {cardNo} 的相关记录"
        )
    card_id = row[0]

    # 1. Load the card with all of its images through the shared CRUD helper.
    card_data = crud_card.get_card_with_details(db_conn, card_id)
    if not card_data:
        raise HTTPException(status_code=404, detail="未找到该卡片信息")

    response_data = {
        "backImageUrl": "",
        "frontImageUrl": "",
        "cardNo": cardNo,
        "centerBack": "",
        "centerFront": "",
        "measureLength": 0.0,
        "measureWidth": 0.0,
        "cornerBackNum": 0,
        "sideBackNum": 0,
        "surfaceBackNum": 0,
        "cornerFrontNum": 0,
        "sideFrontNum": 0,
        "surfaceFrontNum": 0,
        "scoreThreshold": float(card_data.get("detection_score") or 0),
        "evaluateNo": str(card_data.get("id")),
        "defectDetailList": []
    }

    # Every non-deleted defect from both sides; sorted later to pick Top N.
    all_defects_collected = []

    for img in card_data.get("images") or []:
        img_type = img.image_type

        # Only the ring-light images contribute to the report.
        if img_type not in ("front_ring", "back_ring"):
            continue
        is_front = img_type == "front_ring"

        if is_front:
            response_data["frontImageUrl"] = img.image_path
        else:
            response_data["backImageUrl"] = img.image_path

        json_data = _get_active_json(img)
        if not json_data or "result" not in json_data:
            continue
        result_node = json_data["result"]

        # 2a. Centering, plus physical size taken from the front image.
        center_inf = _dig(result_node, "center_result", "box_result", "center_inference")
        if center_inf:
            c_str = _format_center(center_inf)
            if is_front:
                response_data["centerFront"] = c_str
                # Millimetres -> centimetres, two decimals.
                rw_mm = center_inf.get("real_width_mm") or 0
                rh_mm = center_inf.get("real_height_mm") or 0
                response_data["measureWidth"] = round(rw_mm / 10.0, 2)
                response_data["measureLength"] = round(rh_mm / 10.0, 2)
            else:
                response_data["centerBack"] = c_str

        # 2b. Defects: count per type and collect for the Top-N list.
        count_keys = _FRONT_COUNT_KEYS if is_front else _BACK_COUNT_KEYS
        side_str = "FRONT" if is_front else "BACK"

        for defect in _dig(result_node, "defect_result", "defects") or []:
            # Defects the reviewer deleted are ignored entirely.
            if defect.get("edit_type") == "del":
                continue

            count_key = count_keys.get(defect.get("defect_type", ""))
            if count_key:
                response_data[count_key] += 1

            all_defects_collected.append({
                "defect_data": defect,
                "image_path": img.image_path,
                "side": side_str,
                # A null area sorts as 0 instead of breaking the sort.
                "area": defect.get("actual_area") or 0
            })

    # 3. defectDetailList: crop thumbnails for the largest defects.
    all_defects_collected.sort(key=lambda x: x["area"], reverse=True)

    final_defect_list = []
    for d_id, item in enumerate(all_defects_collected[:_TOP_N_DEFECTS], start=1):
        defect = item["defect_data"]

        # Thumbnail file name: {card_id}_{sequence}.jpg
        filename = f"{card_id}_{d_id}.jpg"

        min_rect = defect.get("min_rect")
        defect_img_url = ""
        location_str = ""

        if min_rect and len(min_rect) == 3:
            defect_img_url = _crop_defect_image(item["image_path"], min_rect, filename)

            # Location is the rectangle centre, min_rect[0] == [x, y].
            cx, cy = min_rect[0]
            location_str = f"{int(cx)},{int(cy)}"

        # Type label, e.g. defect_type="edge", label="wear" -> "EDGE WEAR".
        d_type_raw = defect.get("defect_type") or ""
        d_label_raw = defect.get("label") or ""
        type_str = f"{d_type_raw.upper()} {d_label_raw.upper()}".strip()

        final_defect_list.append({
            "id": d_id,
            "side": item["side"],
            "location": location_str,
            "type": type_str,
            "defectImgUrl": defect_img_url
        })

    response_data["defectDetailList"] = final_defect_list

    return response_data