rating_report.py 9.9 KB


  1. from fastapi import APIRouter, HTTPException, Depends, Query
  2. from mysql.connector.pooling import PooledMySQLConnection
  3. import os
  4. import json
  5. import math
  6. from pathlib import Path
  7. from typing import List, Dict, Any, Optional
  8. from PIL import Image
  9. from app.core.logger import get_logger
  10. from app.core.config import settings
  11. from app.core.database_loader import get_db_connection
  12. from app.crud import crud_card
  13. from app.utils.scheme import ImageType
  14. logger = get_logger(__name__)
  15. router = APIRouter()
  16. # 定义新的缺陷图片存储路径
  17. DEFECT_IMAGE_DIR = settings.DEFECT_IMAGE_DIR
  18. def _get_active_json(image_data: Any) -> Optional[Dict]:
  19. """获取有效的json数据,优先 modified_json"""
  20. if not image_data:
  21. return None
  22. # image_data 可能是 Pydantic 对象或 字典,做兼容处理
  23. if hasattr(image_data, "modified_json"):
  24. mj = image_data.modified_json
  25. dj = image_data.detection_json
  26. else:
  27. mj = image_data.get("modified_json")
  28. dj = image_data.get("detection_json")
  29. # 注意:根据 schema.py,这里读出来已经是 dict 了,不需要 json.loads
  30. # 如果数据库里存的是 null,读出来是 None
  31. if mj:
  32. return mj
  33. return dj
  34. def _crop_defect_image(original_image_path_str: str, min_rect: List, output_filename: str) -> str:
  35. """
  36. 切割缺陷图片为正方形
  37. min_rect 结构: [[center_x, center_y], [width, height], angle]
  38. """
  39. try:
  40. # 构建绝对路径
  41. # 假设 original_image_path_str 是 "/Data/..." 格式
  42. rel_path = original_image_path_str.lstrip('/\\')
  43. full_path = settings.BASE_PATH / rel_path
  44. if not full_path.exists():
  45. logger.warning(f"原图不存在: {full_path}")
  46. return ""
  47. with Image.open(full_path) as img:
  48. img_w, img_h = img.size
  49. # 解析 min_rect
  50. # min_rect[0] 是中心点 [x, y]
  51. # min_rect[1] 是宽高 [w, h]
  52. center_x, center_y = min_rect[0]
  53. rect_w, rect_h = min_rect[1]
  54. # 确定裁剪的正方形边长:取宽高的最大值,并适当外扩 (例如 1.5 倍) 以展示周围环境
  55. # 如果缺陷非常小,设置一个最小尺寸(例如 100px),避免切图太模糊
  56. side_length = max(rect_w, rect_h) * 1.5
  57. side_length = max(side_length, 100)
  58. half_side = side_length / 2
  59. # 计算裁剪框 (left, top, right, bottom)
  60. left = center_x - half_side
  61. top = center_y - half_side
  62. right = center_x + half_side
  63. bottom = center_y + half_side
  64. # 边界检查,防止超出图片范围
  65. # 如果只是想保持正方形,超出部分可以填黑,或者简单的移动框的位置
  66. # 这里简单处理:如果超出边界,就移动框,实在移不动就截断
  67. if left < 0:
  68. right -= left # 往右移
  69. left = 0
  70. if top < 0:
  71. bottom -= top # 往下移
  72. top = 0
  73. if right > img_w:
  74. left -= (right - img_w) # 往左移
  75. right = img_w
  76. if bottom > img_h:
  77. top -= (bottom - img_h) # 往上移
  78. bottom = img_h
  79. # 再次检查防止负数(比如图片本身比框还小)
  80. left = max(0, left)
  81. top = max(0, top)
  82. right = min(img_w, right)
  83. bottom = min(img_h, bottom)
  84. crop_box = (left, top, right, bottom)
  85. cropped_img = img.crop(crop_box)
  86. # 保存
  87. save_path = DEFECT_IMAGE_DIR / output_filename
  88. cropped_img.save(save_path, quality=95)
  89. # 返回 URL 路径 (相对于项目根目录的 web 路径)
  90. return f"/DefectImage/{output_filename}"
  91. except Exception as e:
  92. logger.error(f"切割图片失败: {e}")
  93. return ""
  94. @router.get("/generate", status_code=200, summary="生成评级报告数据")
  95. def generate_rating_report(
  96. card_id: int,
  97. db_conn: PooledMySQLConnection = Depends(get_db_connection)
  98. ):
  99. top_n_defects = 3
  100. """
  101. 根据 Card ID 生成评级报告 JSON
  102. """
  103. # 1. 获取卡片详情 (复用 Crud 逻辑,确保能拿到所有图片)
  104. card_data = crud_card.get_card_with_details(db_conn, card_id)
  105. if not card_data:
  106. raise HTTPException(status_code=404, detail="未找到该卡片信息")
  107. # 初始化返回结构
  108. response_data = {
  109. "backImageUrl": "",
  110. "frontImageUrl": "",
  111. "cardNo": "",
  112. "centerBack": "",
  113. "centerFront": "",
  114. "measureLength": 0.0,
  115. "measureWidth": 0.0,
  116. "cornerBackNum": 0,
  117. "sideBackNum": 0,
  118. "surfaceBackNum": 0,
  119. "cornerFrontNum": 0,
  120. "sideFrontNum": 0,
  121. "surfaceFrontNum": 0,
  122. "popNum": 0, # 暂时无数据来源,置0
  123. "scoreThreshold": float(card_data.get("detection_score") or 0),
  124. "evaluateNo": str(card_data.get("id")),
  125. "recognizedInfoDTO": {
  126. "cardSet": "",
  127. "player": "",
  128. "series": "",
  129. "year": ""
  130. },
  131. "defectDetailList": []
  132. }
  133. # 临时列表用于收集所有缺陷,最后排序取 Top N
  134. all_defects_collected = []
  135. # 遍历图片寻找 Front Ring 和 Back Ring
  136. images = card_data.get("images", [])
  137. # 辅助字典:defect_type 到 统计字段 的映射
  138. defect_map_keys = {
  139. "front_ring": {
  140. "corner": "cornerFrontNum",
  141. "edge": "sideFrontNum",
  142. "face": "surfaceFrontNum"
  143. },
  144. "back_ring": {
  145. "corner": "cornerBackNum",
  146. "edge": "sideBackNum",
  147. "face": "surfaceBackNum"
  148. }
  149. }
  150. for img in images:
  151. img_type = img.image_type
  152. # 只处理环光图
  153. if img_type not in ["front_ring", "back_ring"]:
  154. continue
  155. # 设置主图 URL
  156. if img_type == "front_ring":
  157. response_data["frontImageUrl"] = img.image_path
  158. elif img_type == "back_ring":
  159. response_data["backImageUrl"] = img.image_path
  160. # 获取有效 JSON
  161. json_data = _get_active_json(img)
  162. if not json_data or "result" not in json_data:
  163. continue
  164. result_node = json_data["result"]
  165. # 1. 处理居中 (Center)
  166. center_inf = result_node.get("center_result", {}).get("box_result", {}).get("center_inference", {})
  167. if center_inf:
  168. # 格式: L/R=47/53, T/B=51/49 (取整)
  169. # center_inference 包含 center_left, center_right, center_top, center_bottom
  170. c_str = (
  171. f"L/R={int(round(center_inf.get('center_left', 0)))}/{int(round(center_inf.get('center_right', 0)))}, "
  172. f"T/B={int(round(center_inf.get('center_top', 0)))}/{int(round(center_inf.get('center_bottom', 0)))}"
  173. )
  174. if img_type == "front_ring":
  175. response_data["centerFront"] = c_str
  176. # 2. 处理尺寸 (仅从正面取,或者只要有就取) - mm 转 cm,除以 10,保留2位
  177. rw_mm = center_inf.get("real_width_mm", 0)
  178. rh_mm = center_inf.get("real_height_mm", 0)
  179. response_data["measureWidth"] = round(rw_mm / 10.0, 2)
  180. response_data["measureLength"] = round(rh_mm / 10.0, 2)
  181. else:
  182. response_data["centerBack"] = c_str
  183. # 2. 处理缺陷 (Defects)
  184. defects = result_node.get("defect_result", {}).get("defects", [])
  185. for defect in defects:
  186. # 过滤 edit_type == 'del'
  187. if defect.get("edit_type") == "del":
  188. continue
  189. d_type = defect.get("defect_type", "") # corner, edge, face
  190. d_label = defect.get("label", "") # scratch, wear, etc.
  191. # 统计数量
  192. count_key = defect_map_keys.get(img_type, {}).get(d_type)
  193. if count_key:
  194. response_data[count_key] += 1
  195. # 收集详细信息用于 Top N 列表
  196. # 需要保存:缺陷对象本身,图片路径,正反面标识
  197. side_str = "FRONT" if img_type == "front_ring" else "BACK"
  198. all_defects_collected.append({
  199. "defect_data": defect,
  200. "image_path": img.image_path,
  201. "side": side_str,
  202. "area": defect.get("actual_area", 0)
  203. })
  204. # 3. 处理 defectDetailList (Top N 切图)
  205. # 按实际面积从大到小排序
  206. all_defects_collected.sort(key=lambda x: x["area"], reverse=True)
  207. top_defects = all_defects_collected[:top_n_defects]
  208. final_defect_list = []
  209. for idx, item in enumerate(top_defects, start=1):
  210. defect = item["defect_data"]
  211. side = item["side"]
  212. original_img_path = item["image_path"]
  213. # 构造 ID
  214. d_id = idx # 1, 2, 3
  215. # 构造文件名: {card_id}_{seq_id}.jpg
  216. filename = f"{card_id}_{d_id}.jpg"
  217. # 执行切图
  218. min_rect = defect.get("min_rect")
  219. defect_img_url = ""
  220. location_str = ""
  221. if min_rect and len(min_rect) == 3:
  222. # 切图并保存
  223. defect_img_url = _crop_defect_image(original_img_path, min_rect, filename)
  224. # 计算 Location (中心坐标)
  225. # min_rect[0] 是 [x, y]
  226. cx, cy = min_rect[0]
  227. location_str = f"{int(cx)},{int(cy)}"
  228. # 构造 Type 字符串: defect_type + label (大写)
  229. # 例如: defect_type="edge", label="wear" -> "EDGE WEAR"
  230. d_type_raw = defect.get("defect_type", "")
  231. d_label_raw = defect.get("label", "")
  232. type_str = f"{d_type_raw.upper()} {d_label_raw.upper()}".strip()
  233. final_defect_list.append({
  234. "id": d_id,
  235. "side": side,
  236. "location": location_str,
  237. "type": type_str,
  238. "defectImgUrl": defect_img_url
  239. })
  240. response_data["defectDetailList"] = final_defect_list
  241. return response_data