| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- from datetime import datetime
- from fastapi import APIRouter, Depends, HTTPException, Query
- from mysql.connector.pooling import PooledMySQLConnection
- from app.core.config import settings
- from app.core.database_loader import get_db_connection
- from app.core.logger import get_logger
- from app.crud import crud_card
- from app.utils.rating_report_utils import (
- build_defect_compare_key,
- crop_defect_image,
- format_datetime,
- get_active_json,
- parse_json_value,
- remove_common_defects,
- save_rating_report_history,
- )
- logger = get_logger(__name__)
- router = APIRouter()
- def _build_history_detail_response(row: dict, report_data: dict) -> dict:
- """
- 统一历史详情返回结构,避免不同接口拼装不一致。
- """
- return {
- "ratingId": row.get("rating_id"),
- "cardId": row.get("card_id"),
- "cardNo": row.get("cardNo"),
- "reportName": row.get("report_name"),
- "createdAt": format_datetime(row.get("created_at")),
- "updatedAt": format_datetime(row.get("updated_at")),
- "reportData": report_data,
- }
- @router.get("/generate", status_code=200, summary="生成评级报告数据")
- def generate_rating_report(
- cardNo: str,
- db_conn: PooledMySQLConnection = Depends(get_db_connection)
- ):
- if not cardNo or not cardNo.strip():
- raise HTTPException(status_code=400, detail="cardNo 不能为空")
- try:
- with db_conn.cursor(buffered=True) as cursor:
- query_sql = f"SELECT id FROM {settings.DB_CARD_TABLE_NAME} WHERE cardNo = %s LIMIT 1"
- cursor.execute(query_sql, (cardNo,))
- row = cursor.fetchone()
- except Exception as e:
- logger.error(f"根据 cardNo 查询卡牌失败: cardNo={cardNo}, error={e}")
- raise HTTPException(status_code=500, detail="数据库查询失败")
- if not row:
- raise HTTPException(status_code=404, detail=f"未找到 cardNo={cardNo} 的卡牌记录")
- card_id = row[0]
- rating_time_now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
- logger.info(f"开始生成评级报告: cardNo={cardNo}, card_id={card_id}")
- # 复用卡牌详情查询逻辑,直接拿到当前用于生成报告的图片和缺陷数据。
- card_data = crud_card.get_card_with_details(db_conn, card_id)
- if not card_data:
- raise HTTPException(status_code=404, detail="未找到该卡牌信息")
- response_data = {
- "backImageUrl": "",
- "frontImageUrl": "",
- "cardNo": cardNo,
- "centerBack": "",
- "centerFront": "",
- "measureLength": 0.0,
- "measureWidth": 0.0,
- "cornerBackNum": 0,
- "sideBackNum": 0,
- "surfaceBackNum": 0,
- "cornerFrontNum": 0,
- "sideFrontNum": 0,
- "surfaceFrontNum": 0,
- "scoreThreshold": float(card_data.get("modified_score") or 0),
- "evaluateNo": str(card_data.get("id")),
- "defectDetailList": []
- }
- # 先统一收集缺陷,再按面积排序
- all_defects_collected = []
- images = card_data.get("images", [])
- defect_map_keys = {
- "front_ring": {
- "corner": "cornerFrontNum",
- "edge": "sideFrontNum",
- "face": "surfaceFrontNum"
- },
- "back_ring": {
- "corner": "cornerBackNum",
- "edge": "sideBackNum",
- "face": "surfaceBackNum"
- }
- }
- for img in images:
- img_type = img.image_type
- if img_type not in ["front_ring", "back_ring"]:
- continue
- if img_type == "front_ring":
- response_data["frontImageUrl"] = img.image_path
- elif img_type == "back_ring":
- response_data["backImageUrl"] = img.image_path
- json_data = get_active_json(img)
- if not json_data or "result" not in json_data:
- continue
- result_node = json_data["result"]
- center_inf = result_node.get("center_result", {}).get("box_result", {}).get("center_inference", {})
- if center_inf:
- center_text = (
- f"L/R={int(round(center_inf.get('center_left', 0)))}/{int(round(center_inf.get('center_right', 0)))}, "
- f"T/B={int(round(center_inf.get('center_top', 0)))}/{int(round(center_inf.get('center_bottom', 0)))}"
- )
- if img_type == "front_ring":
- response_data["centerFront"] = center_text
- response_data["measureWidth"] = round(center_inf.get("real_width_mm", 0) / 10.0, 2)
- response_data["measureLength"] = round(center_inf.get("real_height_mm", 0) / 10.0, 2)
- else:
- response_data["centerBack"] = center_text
- defects = result_node.get("defect_result", {}).get("defects", [])
- for defect in defects:
- if defect.get("edit_type") == "del":
- continue
- defect_type = defect.get("defect_type", "")
- count_key = defect_map_keys.get(img_type, {}).get(defect_type)
- if count_key:
- response_data[count_key] += 1
- side_str = "FRONT" if img_type == "front_ring" else "BACK"
- all_defects_collected.append({
- "defect_data": defect,
- "image_path": img.image_path,
- "side": side_str,
- "area": defect.get("actual_area", 0)
- })
- all_defects_collected.sort(key=lambda item: item["area"], reverse=True)
- final_defect_list = []
- for idx, item in enumerate(all_defects_collected, start=1):
- defect = item["defect_data"]
- side = item["side"]
- original_img_path = item["image_path"]
- defect_id = idx
- filename = f"{card_id}_{defect_id}_{rating_time_now}.jpg"
- min_rect = defect.get("min_rect")
- defect_img_url = ""
- location_str = ""
- if min_rect and len(min_rect) == 3:
- defect_img_url = crop_defect_image(original_img_path, min_rect, filename)
- center_x, center_y = min_rect[0]
- location_str = f"{int(center_x)},{int(center_y)}"
- raw_type = f"{defect.get('defect_type', '').upper()}".strip()
- type_str_map = {
- "CORNER": "CORNER",
- "EDGE": "SIDE",
- "FACE": "SURFACE"
- }
- type_str = type_str_map.get(raw_type)
- # compareKey 使用面积、标签、正反面和 location。
- compare_key = build_defect_compare_key(defect, side, location_str)
- final_defect_list.append({
- "id": defect_id,
- "compareKey": compare_key,
- "side": side,
- "location": location_str,
- "type": type_str,
- "defectImgUrl": defect_img_url
- })
- response_data["defectDetailList"] = final_defect_list
- report_name = f"{cardNo}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
- history_result = save_rating_report_history(db_conn, card_id, cardNo, report_name, response_data)
- response_data["ratingId"] = history_result["rating_id"]
- response_data["reportName"] = history_result["report_name"]
- response_data["historySaved"] = history_result["created"]
- if history_result["created"]:
- logger.info(
- f"评级报告生成并保存成功: cardNo={cardNo}, rating_id={history_result['rating_id']}, "
- f"defect_count={len(final_defect_list)}"
- )
- else:
- logger.info(
- f"评级报告生成成功,但内容与上一条历史一致,未重复保存: cardNo={cardNo}, "
- f"rating_id={history_result['rating_id']}, defect_count={len(final_defect_list)}"
- )
- return response_data
- @router.get("/history", status_code=200, summary="根据 cardNo 查询评级报告历史列表")
- def get_rating_report_history_list(
- cardNo: str,
- skip: int = Query(0, ge=0),
- page_num: int = Query(None, ge=1),
- limit: int = Query(100, ge=1, le=1000),
- db_conn: PooledMySQLConnection = Depends(get_db_connection)
- ):
- if not cardNo or not cardNo.strip():
- raise HTTPException(status_code=400, detail="cardNo 不能为空")
- if page_num is not None:
- skip = (page_num - 1) * limit
- try:
- with db_conn.cursor(dictionary=True) as cursor:
- count_sql = (
- f"SELECT COUNT(*) AS total "
- f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} "
- "WHERE cardNo = %s"
- )
- cursor.execute(count_sql, (cardNo,))
- count_row = cursor.fetchone() or {}
- total_count = int(count_row.get("total") or 0)
- query_sql = (
- f"SELECT rating_id, card_id, cardNo, report_name, created_at "
- f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} "
- "WHERE cardNo = %s "
- "ORDER BY rating_id DESC "
- "LIMIT %s OFFSET %s"
- )
- cursor.execute(query_sql, (cardNo, limit, skip))
- rows = cursor.fetchall()
- except Exception as e:
- logger.error(f"查询评级报告历史列表失败: cardNo={cardNo}, error={e}")
- raise HTTPException(status_code=500, detail="查询评级报告历史列表失败")
- history_list = []
- for row in rows:
- history_list.append({
- "ratingId": row.get("rating_id"),
- "cardId": row.get("card_id"),
- "cardNo": row.get("cardNo"),
- "reportName": row.get("report_name"),
- "createdAt": format_datetime(row.get("created_at"))
- })
- return {
- "cardNo": cardNo,
- "data": {
- "total": total_count,
- "list": history_list
- }
- }
- @router.get("/history/compare", status_code=200, summary="根据两个 rating_id 对比历史缺陷差异")
- def compare_rating_report_history(
- rating_id1: int = Query(..., description="第一个历史记录 ID"),
- rating_id2: int = Query(..., description="第二个历史记录 ID"),
- db_conn: PooledMySQLConnection = Depends(get_db_connection)
- ):
- try:
- with db_conn.cursor(dictionary=True) as cursor:
- query_sql = (
- f"SELECT rating_id, card_id, cardNo, report_name, report_json, created_at, updated_at "
- f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} "
- "WHERE rating_id IN (%s, %s)"
- )
- cursor.execute(query_sql, (rating_id1, rating_id2))
- rows = cursor.fetchall()
- except Exception as e:
- logger.error(
- f"查询评级报告历史对比数据失败: rating_id1={rating_id1}, rating_id2={rating_id2}, error={e}"
- )
- raise HTTPException(status_code=500, detail="查询评级报告历史对比数据失败")
- row_map = {row.get("rating_id"): row for row in rows}
- missing_ids = [rating_id for rating_id in [rating_id1, rating_id2] if rating_id not in row_map]
- if missing_ids:
- missing_id_text = ",".join(str(item) for item in missing_ids)
- raise HTTPException(status_code=404, detail=f"未找到 rating_id={missing_id_text} 的历史记录")
- left_row = row_map[rating_id1]
- right_row = row_map[rating_id2]
- left_report, right_report, common_count = remove_common_defects(
- left_row.get("report_json"),
- right_row.get("report_json")
- )
- left_only_count = len(left_report.get("defectDetailList", []))
- right_only_count = len(right_report.get("defectDetailList", []))
- logger.info(
- f"评级报告历史对比完成: rating_id1={rating_id1}, rating_id2={rating_id2}, "
- f"same_count={common_count}, left_only_count={left_only_count}, right_only_count={right_only_count}"
- )
- return {
- "comparisonSummary": {
- "sameDefectCount": common_count,
- "leftOnlyCount": left_only_count,
- "rightOnlyCount": right_only_count
- },
- "left": _build_history_detail_response(left_row, left_report),
- "right": _build_history_detail_response(right_row, right_report)
- }
- @router.get("/history/{rating_id}", status_code=200, summary="根据 rating_id 查询单个评级报告历史")
- def get_rating_report_history_detail(
- rating_id: int,
- db_conn: PooledMySQLConnection = Depends(get_db_connection)
- ):
- try:
- with db_conn.cursor(dictionary=True) as cursor:
- query_sql = (
- f"SELECT rating_id, card_id, cardNo, report_name, report_json, created_at, updated_at "
- f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} "
- "WHERE rating_id = %s LIMIT 1"
- )
- cursor.execute(query_sql, (rating_id,))
- row = cursor.fetchone()
- except Exception as e:
- logger.error(f"查询评级报告历史详情失败: rating_id={rating_id}, error={e}")
- raise HTTPException(status_code=500, detail="查询评级报告历史详情失败")
- if not row:
- raise HTTPException(status_code=404, detail=f"未找到 rating_id={rating_id} 的历史记录")
- return _build_history_detail_response(row, parse_json_value(row.get("report_json")))
|