from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, Query from mysql.connector.pooling import PooledMySQLConnection from app.core.config import settings from app.core.database_loader import get_db_connection from app.core.logger import get_logger from app.crud import crud_card from app.utils.rating_report_utils import ( build_defect_compare_key, crop_defect_image, format_datetime, get_active_json, parse_json_value, remove_common_defects, save_rating_report_history, ) logger = get_logger(__name__) router = APIRouter() def _build_history_detail_response(row: dict, report_data: dict) -> dict: """ 统一历史详情返回结构,避免不同接口拼装不一致。 """ return { "ratingId": row.get("rating_id"), "cardId": row.get("card_id"), "cardNo": row.get("cardNo"), "reportName": row.get("report_name"), "createdAt": format_datetime(row.get("created_at")), "updatedAt": format_datetime(row.get("updated_at")), "reportData": report_data, } @router.get("/generate", status_code=200, summary="生成评级报告数据") def generate_rating_report( cardNo: str, db_conn: PooledMySQLConnection = Depends(get_db_connection) ): if not cardNo or not cardNo.strip(): raise HTTPException(status_code=400, detail="cardNo 不能为空") try: with db_conn.cursor(buffered=True) as cursor: query_sql = f"SELECT id FROM {settings.DB_CARD_TABLE_NAME} WHERE cardNo = %s LIMIT 1" cursor.execute(query_sql, (cardNo,)) row = cursor.fetchone() except Exception as e: logger.error(f"根据 cardNo 查询卡牌失败: cardNo={cardNo}, error={e}") raise HTTPException(status_code=500, detail="数据库查询失败") if not row: raise HTTPException(status_code=404, detail=f"未找到 cardNo={cardNo} 的卡牌记录") card_id = row[0] rating_time_now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") logger.info(f"开始生成评级报告: cardNo={cardNo}, card_id={card_id}") # 复用卡牌详情查询逻辑,直接拿到当前用于生成报告的图片和缺陷数据。 card_data = crud_card.get_card_with_details(db_conn, card_id) if not card_data: raise HTTPException(status_code=404, detail="未找到该卡牌信息") response_data = { "backImageUrl": "", "frontImageUrl": "", "cardNo": cardNo, "centerBack": "", "centerFront": "", "measureLength": 0.0, "measureWidth": 0.0, "cornerBackNum": 0, "sideBackNum": 0, "surfaceBackNum": 0, "cornerFrontNum": 0, "sideFrontNum": 0, "surfaceFrontNum": 0, "scoreThreshold": float(card_data.get("modified_score") or 0), "evaluateNo": str(card_data.get("id")), "defectDetailList": [] } # 先统一收集缺陷,再按面积排序 all_defects_collected = [] images = card_data.get("images", []) defect_map_keys = { "front_ring": { "corner": "cornerFrontNum", "edge": "sideFrontNum", "face": "surfaceFrontNum" }, "back_ring": { "corner": "cornerBackNum", "edge": "sideBackNum", "face": "surfaceBackNum" } } for img in images: img_type = img.image_type if img_type not in ["front_ring", "back_ring"]: continue if img_type == "front_ring": response_data["frontImageUrl"] = img.image_path elif img_type == "back_ring": response_data["backImageUrl"] = img.image_path json_data = get_active_json(img) if not json_data or "result" not in json_data: continue result_node = json_data["result"] center_inf = result_node.get("center_result", {}).get("box_result", {}).get("center_inference", {}) if center_inf: center_text = ( f"L/R={int(round(center_inf.get('center_left', 0)))}/{int(round(center_inf.get('center_right', 0)))}, " f"T/B={int(round(center_inf.get('center_top', 0)))}/{int(round(center_inf.get('center_bottom', 0)))}" ) if img_type == "front_ring": response_data["centerFront"] = center_text response_data["measureWidth"] = round(center_inf.get("real_width_mm", 0) / 10.0, 2) response_data["measureLength"] = round(center_inf.get("real_height_mm", 0) / 10.0, 2) else: response_data["centerBack"] = center_text defects = result_node.get("defect_result", {}).get("defects", []) for defect in defects: if defect.get("edit_type") == "del": continue defect_type = defect.get("defect_type", "") count_key = defect_map_keys.get(img_type, {}).get(defect_type) if count_key: response_data[count_key] += 1 side_str = "FRONT" if img_type == "front_ring" else "BACK" all_defects_collected.append({ "defect_data": defect, "image_path": img.image_path, "side": side_str, "area": defect.get("actual_area", 0) }) all_defects_collected.sort(key=lambda item: item["area"], reverse=True) final_defect_list = [] for idx, item in enumerate(all_defects_collected, start=1): defect = item["defect_data"] side = item["side"] original_img_path = item["image_path"] defect_id = idx filename = f"{card_id}_{defect_id}_{rating_time_now}.jpg" min_rect = defect.get("min_rect") defect_img_url = "" location_str = "" if min_rect and len(min_rect) == 3: defect_img_url = crop_defect_image(original_img_path, min_rect, filename) center_x, center_y = min_rect[0] location_str = f"{int(center_x)},{int(center_y)}" raw_type = f"{defect.get('defect_type', '').upper()}".strip() type_str_map = { "CORNER": "CORNER", "EDGE": "SIDE", "FACE": "SURFACE" } type_str = type_str_map.get(raw_type) # compareKey 使用面积、标签、正反面和 location。 compare_key = build_defect_compare_key(defect, side, location_str) final_defect_list.append({ "id": defect_id, "compareKey": compare_key, "side": side, "location": location_str, "type": type_str, "defectImgUrl": defect_img_url }) response_data["defectDetailList"] = final_defect_list report_name = f"{cardNo}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" history_result = save_rating_report_history(db_conn, card_id, cardNo, report_name, response_data) response_data["ratingId"] = history_result["rating_id"] response_data["reportName"] = history_result["report_name"] response_data["historySaved"] = history_result["created"] if history_result["created"]: logger.info( f"评级报告生成并保存成功: cardNo={cardNo}, rating_id={history_result['rating_id']}, " f"defect_count={len(final_defect_list)}" ) else: logger.info( f"评级报告生成成功,但内容与上一条历史一致,未重复保存: cardNo={cardNo}, " f"rating_id={history_result['rating_id']}, defect_count={len(final_defect_list)}" ) return response_data @router.get("/history", status_code=200, summary="根据 cardNo 查询评级报告历史列表") def get_rating_report_history_list( cardNo: str, skip: int = Query(0, ge=0), page_num: int = Query(None, ge=1), limit: int = Query(100, ge=1, le=1000), db_conn: PooledMySQLConnection = Depends(get_db_connection) ): if not cardNo or not cardNo.strip(): raise HTTPException(status_code=400, detail="cardNo 不能为空") if page_num is not None: skip = (page_num - 1) * limit try: with db_conn.cursor(dictionary=True) as cursor: count_sql = ( f"SELECT COUNT(*) AS total " f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} " "WHERE cardNo = %s" ) cursor.execute(count_sql, (cardNo,)) count_row = cursor.fetchone() or {} total_count = int(count_row.get("total") or 0) query_sql = ( f"SELECT rating_id, card_id, cardNo, report_name, created_at " f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} " "WHERE cardNo = %s " "ORDER BY rating_id DESC " "LIMIT %s OFFSET %s" ) cursor.execute(query_sql, (cardNo, limit, skip)) rows = cursor.fetchall() except Exception as e: logger.error(f"查询评级报告历史列表失败: cardNo={cardNo}, error={e}") raise HTTPException(status_code=500, detail="查询评级报告历史列表失败") history_list = [] for row in rows: history_list.append({ "ratingId": row.get("rating_id"), "cardId": row.get("card_id"), "cardNo": row.get("cardNo"), "reportName": row.get("report_name"), "createdAt": format_datetime(row.get("created_at")) }) return { "cardNo": cardNo, "data": { "total": total_count, "list": history_list } } @router.get("/history/compare", status_code=200, summary="根据两个 rating_id 对比历史缺陷差异") def compare_rating_report_history( rating_id1: int = Query(..., description="第一个历史记录 ID"), rating_id2: int = Query(..., description="第二个历史记录 ID"), db_conn: PooledMySQLConnection = Depends(get_db_connection) ): try: with db_conn.cursor(dictionary=True) as cursor: query_sql = ( f"SELECT rating_id, card_id, cardNo, report_name, report_json, created_at, updated_at " f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} " "WHERE rating_id IN (%s, %s)" ) cursor.execute(query_sql, (rating_id1, rating_id2)) rows = cursor.fetchall() except Exception as e: logger.error( f"查询评级报告历史对比数据失败: rating_id1={rating_id1}, rating_id2={rating_id2}, error={e}" ) raise HTTPException(status_code=500, detail="查询评级报告历史对比数据失败") row_map = {row.get("rating_id"): row for row in rows} missing_ids = [rating_id for rating_id in [rating_id1, rating_id2] if rating_id not in row_map] if missing_ids: missing_id_text = ",".join(str(item) for item in missing_ids) raise HTTPException(status_code=404, detail=f"未找到 rating_id={missing_id_text} 的历史记录") left_row = row_map[rating_id1] right_row = row_map[rating_id2] left_report, right_report, common_count = remove_common_defects( left_row.get("report_json"), right_row.get("report_json") ) left_only_count = len(left_report.get("defectDetailList", [])) right_only_count = len(right_report.get("defectDetailList", [])) logger.info( f"评级报告历史对比完成: rating_id1={rating_id1}, rating_id2={rating_id2}, " f"same_count={common_count}, left_only_count={left_only_count}, right_only_count={right_only_count}" ) return { "comparisonSummary": { "sameDefectCount": common_count, "leftOnlyCount": left_only_count, "rightOnlyCount": right_only_count }, "left": _build_history_detail_response(left_row, left_report), "right": _build_history_detail_response(right_row, right_report) } @router.get("/history/{rating_id}", status_code=200, summary="根据 rating_id 查询单个评级报告历史") def get_rating_report_history_detail( rating_id: int, db_conn: PooledMySQLConnection = Depends(get_db_connection) ): try: with db_conn.cursor(dictionary=True) as cursor: query_sql = ( f"SELECT rating_id, card_id, cardNo, report_name, report_json, created_at, updated_at " f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} " "WHERE rating_id = %s LIMIT 1" ) cursor.execute(query_sql, (rating_id,)) row = cursor.fetchone() except Exception as e: logger.error(f"查询评级报告历史详情失败: rating_id={rating_id}, error={e}") raise HTTPException(status_code=500, detail="查询评级报告历史详情失败") if not row: raise HTTPException(status_code=404, detail=f"未找到 rating_id={rating_id} 的历史记录") return _build_history_detail_response(row, parse_json_value(row.get("report_json")))