from fastapi import APIRouter, HTTPException, Depends, Query from mysql.connector.pooling import PooledMySQLConnection from datetime import datetime from app.core.logger import get_logger from app.core.config import settings from app.core.database_loader import get_db_connection from app.crud import crud_card from app.utils.rating_report_utils import (get_active_json, save_rating_report_history, format_datetime, parse_json_value, crop_defect_image) logger = get_logger(__name__) router = APIRouter() @router.get("/generate", status_code=200, summary="生成评级报告数据") def generate_rating_report( cardNo: str, db_conn: PooledMySQLConnection = Depends(get_db_connection) ): if not cardNo or not cardNo.strip(): raise HTTPException(status_code=400, detail="cardNo 不能为空") # 根据cardNo 查询id try: with db_conn.cursor(buffered=True) as cursor: query_sql = f"SELECT id FROM {settings.DB_CARD_TABLE_NAME} WHERE cardNo = %s LIMIT 1" cursor.execute(query_sql, (cardNo,)) row = cursor.fetchone() except Exception as e: logger.error(f"创建卡牌失败: {e}") raise HTTPException(status_code=500, detail="数据库查询失败。") if not row: raise HTTPException( status_code=404, detail=f"未找到卡牌编号为 {cardNo} 的相关记录" ) card_id = row[0] rating_time_now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") # top_n_defects = 3 """ 根据 Card ID 生成评级报告 JSON """ # 1. 获取卡片详情 (复用 Crud 逻辑,确保能拿到所有图片) card_data = crud_card.get_card_with_details(db_conn, card_id) if not card_data: raise HTTPException(status_code=404, detail="未找到该卡片信息") # 初始化返回结构 response_data = { "backImageUrl": "", "frontImageUrl": "", "cardNo": cardNo, "centerBack": "", "centerFront": "", "measureLength": 0.0, "measureWidth": 0.0, "cornerBackNum": 0, "sideBackNum": 0, "surfaceBackNum": 0, "cornerFrontNum": 0, "sideFrontNum": 0, "surfaceFrontNum": 0, "scoreThreshold": float(card_data.get("modified_score") or 0), "evaluateNo": str(card_data.get("id")), "defectDetailList": [] } # 临时列表用于收集所有缺陷,最后排序取 Top N all_defects_collected = [] # 遍历图片寻找 Front Ring 和 Back Ring images = card_data.get("images", []) # 辅助字典:defect_type 到 统计字段 的映射 defect_map_keys = { "front_ring": { "corner": "cornerFrontNum", "edge": "sideFrontNum", "face": "surfaceFrontNum" }, "back_ring": { "corner": "cornerBackNum", "edge": "sideBackNum", "face": "surfaceBackNum" } } for img in images: img_type = img.image_type # 只处理环光图 if img_type not in ["front_ring", "back_ring"]: continue # 设置主图 URL if img_type == "front_ring": response_data["frontImageUrl"] = img.image_path elif img_type == "back_ring": response_data["backImageUrl"] = img.image_path # 获取有效 JSON json_data = get_active_json(img) if not json_data or "result" not in json_data: continue result_node = json_data["result"] # 1. 处理居中 (Center) center_inf = result_node.get("center_result", {}).get("box_result", {}).get("center_inference", {}) if center_inf: # 格式: L/R=47/53, T/B=51/49 (取整) # center_inference 包含 center_left, center_right, center_top, center_bottom c_str = ( f"L/R={int(round(center_inf.get('center_left', 0)))}/{int(round(center_inf.get('center_right', 0)))}, " f"T/B={int(round(center_inf.get('center_top', 0)))}/{int(round(center_inf.get('center_bottom', 0)))}" ) if img_type == "front_ring": response_data["centerFront"] = c_str # 2. 处理尺寸 (仅从正面取,或者只要有就取) - mm 转 cm,除以 10,保留2位 rw_mm = center_inf.get("real_width_mm", 0) rh_mm = center_inf.get("real_height_mm", 0) response_data["measureWidth"] = round(rw_mm / 10.0, 2) response_data["measureLength"] = round(rh_mm / 10.0, 2) else: response_data["centerBack"] = c_str # 2. 处理缺陷 (Defects) defects = result_node.get("defect_result", {}).get("defects", []) for defect in defects: # 过滤 edit_type == 'del' if defect.get("edit_type") == "del": continue d_type = defect.get("defect_type", "") # corner, edge, face d_label = defect.get("label", "") # scratch, wear, etc. # 统计数量 count_key = defect_map_keys.get(img_type, {}).get(d_type) if count_key: response_data[count_key] += 1 # 收集详细信息用于 Top N 列表 # 需要保存:缺陷对象本身,图片路径,正反面标识 side_str = "FRONT" if img_type == "front_ring" else "BACK" all_defects_collected.append({ "defect_data": defect, "image_path": img.image_path, "side": side_str, "area": defect.get("actual_area", 0) }) # 3. 处理 defectDetailList (Top N 切图) # 按实际面积从大到小排序 all_defects_collected.sort(key=lambda x: x["area"], reverse=True) # top_defects = all_defects_collected[:top_n_defects] top_defects = all_defects_collected final_defect_list = [] for idx, item in enumerate(top_defects, start=1): defect = item["defect_data"] side = item["side"] original_img_path = item["image_path"] # 构造 ID d_id = idx # 1, 2, 3 # 构造文件名: {card_id}_{seq_id}.jpg filename = f"{card_id}_{d_id}_{rating_time_now}.jpg" # 执行切图 min_rect = defect.get("min_rect") defect_img_url = "" location_str = "" if min_rect and len(min_rect) == 3: # 切图并保存 defect_img_url = crop_defect_image(original_img_path, min_rect, filename) # 计算 Location (中心坐标) # min_rect[0] 是 [x, y] cx, cy = min_rect[0] location_str = f"{int(cx)},{int(cy)}" # 构造 Type 字符串: defect_type + label (大写) # 例如: defect_type="edge", label="wear" -> "EDGE WEAR" d_type_raw = defect.get("defect_type", "") # d_label_raw = defect.get("label", "") type_str = f"{d_type_raw.upper()}".strip() type_str_map = {"CORNER": "CORNER", "EDGE": "SIDE", "FACE": "SURFACE"} type_str = type_str_map.get(type_str) final_defect_list.append({ "id": d_id, "side": side, "location": location_str, "type": type_str, "defectImgUrl": defect_img_url }) response_data["defectDetailList"] = final_defect_list report_name = f"{cardNo}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" rating_id = save_rating_report_history(db_conn, card_id, cardNo, report_name, response_data) response_data["ratingId"] = rating_id response_data["reportName"] = report_name return response_data @router.get("/history", status_code=200, summary="根据 cardNo 查询评级报告历史列表") def get_rating_report_history_list( cardNo: str, skip: int = Query(0, ge=0), page_num: int = Query(None, ge=1), limit: int = Query(100, ge=1, le=1000), db_conn: PooledMySQLConnection = Depends(get_db_connection) ): if not cardNo or not cardNo.strip(): raise HTTPException(status_code=400, detail="cardNo 不能为空") if page_num is not None: skip = (page_num - 1) * limit try: with db_conn.cursor(dictionary=True) as cursor: count_sql = ( f"SELECT COUNT(*) AS total " f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} " "WHERE cardNo = %s" ) cursor.execute(count_sql, (cardNo,)) count_row = cursor.fetchone() or {} total_count = int(count_row.get("total") or 0) query_sql = ( f"SELECT rating_id, card_id, cardNo, report_name, created_at " f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} " "WHERE cardNo = %s " "ORDER BY rating_id DESC " "LIMIT %s OFFSET %s" ) cursor.execute(query_sql, (cardNo, limit, skip)) rows = cursor.fetchall() except Exception as e: logger.error(f"查询评级报告历史列表失败: {e}") raise HTTPException(status_code=500, detail="查询评级报告历史列表失败。") history_list = [] for row in rows: history_list.append({ "ratingId": row.get("rating_id"), "cardId": row.get("card_id"), "cardNo": row.get("cardNo"), "reportName": row.get("report_name"), "createdAt": format_datetime(row.get("created_at")) }) return { "cardNo": cardNo, "data": { "total": total_count, "list": history_list } } @router.get("/history/{rating_id}", status_code=200, summary="根据 rating_id 查询单个评级报告历史") def get_rating_report_history_detail( rating_id: int, db_conn: PooledMySQLConnection = Depends(get_db_connection) ): try: with db_conn.cursor(dictionary=True) as cursor: query_sql = ( f"SELECT rating_id, card_id, cardNo, report_name, report_json, created_at, updated_at " f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} " "WHERE rating_id = %s LIMIT 1" ) cursor.execute(query_sql, (rating_id,)) row = cursor.fetchone() except Exception as e: logger.error(f"查询评级报告历史详情失败: {e}") raise HTTPException(status_code=500, detail="查询评级报告历史详情失败。") if not row: raise HTTPException(status_code=404, detail=f"未找到 rating_id={rating_id} 的历史记录") return { "ratingId": row.get("rating_id"), "cardId": row.get("card_id"), "cardNo": row.get("cardNo"), "reportName": row.get("report_name"), "createdAt": format_datetime(row.get("created_at")), "updatedAt": format_datetime(row.get("updated_at")), "reportData": parse_json_value(row.get("report_json")) }