rating_report_utils.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. import copy
  2. import io
  3. import json
  4. from time import perf_counter
  5. from collections import Counter
  6. from datetime import datetime
  7. from typing import Any, Dict, List, Optional, Tuple
  8. from fastapi import HTTPException
  9. from PIL import Image
  10. from mysql.connector.pooling import PooledMySQLConnection
  11. from app.core.config import settings
  12. from app.core.logger import get_logger
  13. from app.core.minio_client import minio_client
  14. logger = get_logger(__name__)
  15. def parse_json_value(raw_value: Any) -> Dict[str, Any]:
  16. if raw_value is None:
  17. return {}
  18. if isinstance(raw_value, dict):
  19. return raw_value
  20. if isinstance(raw_value, str):
  21. try:
  22. return json.loads(raw_value)
  23. except json.JSONDecodeError:
  24. return {}
  25. return {}
  26. def format_datetime(dt: Any) -> str:
  27. if isinstance(dt, datetime):
  28. return dt.strftime("%Y-%m-%d %H:%M:%S")
  29. return ""
  30. def _normalize_compare_value(value: Any) -> Any:
  31. """
  32. 归一化对比数据,避免浮点精度和容器类型差异造成误判。
  33. """
  34. if isinstance(value, float):
  35. return round(value, 4)
  36. if isinstance(value, tuple):
  37. return [_normalize_compare_value(item) for item in value]
  38. if isinstance(value, list):
  39. return [_normalize_compare_value(item) for item in value]
  40. if isinstance(value, dict):
  41. return {key: _normalize_compare_value(val) for key, val in value.items()}
  42. return value
  43. def build_defect_compare_key(defect: Dict[str, Any], side: str = "", location: str = "") -> str:
  44. """
  45. 为原始缺陷构造稳定的对比键。
  46. 按用户需求,简化为 actual_area + label + side + location 的组合。
  47. """
  48. actual_area = _normalize_compare_value(defect.get("actual_area"))
  49. label = defect.get("label") or ""
  50. final_side = side or defect.get("side") or ""
  51. final_location = location or defect.get("location") or ""
  52. return f"{final_side}|{final_location}|{label}|{actual_area}"
  53. def _build_report_defect_compare_key(defect_item: Dict[str, Any]) -> str:
  54. """
  55. 为历史报告里的 defectDetailList 项构造对比键。
  56. 新数据优先复用 compareKey,老数据回退到 side/location/label/actual_area 组合。
  57. """
  58. compare_key = defect_item.get("compareKey")
  59. if compare_key:
  60. return compare_key
  61. actual_area = defect_item.get("actualArea")
  62. if actual_area is None:
  63. actual_area = defect_item.get("actual_area")
  64. actual_area = _normalize_compare_value(actual_area)
  65. label = defect_item.get("label")
  66. if label is None:
  67. label = defect_item.get("type") or ""
  68. side = defect_item.get("side") or ""
  69. location = defect_item.get("location") or ""
  70. return f"{side}|{location}|{label}|{actual_area}"
  71. def enrich_report_for_compare(report_json: Any) -> Dict[str, Any]:
  72. """
  73. 给历史报告补齐 compareKey,兼容旧数据。
  74. """
  75. report_data = copy.deepcopy(parse_json_value(report_json))
  76. defect_list = report_data.get("defectDetailList")
  77. if not isinstance(defect_list, list):
  78. report_data["defectDetailList"] = []
  79. return report_data
  80. for defect_item in defect_list:
  81. if not isinstance(defect_item, dict):
  82. continue
  83. defect_item["compareKey"] = _build_report_defect_compare_key(defect_item)
  84. return report_data
  85. def build_report_compare_snapshot(report_json: Any) -> Dict[str, Any]:
  86. """
  87. 生成用于“历史去重”的归一化快照。
  88. 这里会忽略 defectImgUrl 等波动字段,只保留真正反映缺陷是否变化的部分。
  89. """
  90. report_data = enrich_report_for_compare(report_json)
  91. compare_snapshot = copy.deepcopy(report_data)
  92. normalized_defects = []
  93. for defect_item in report_data.get("defectDetailList", []):
  94. if not isinstance(defect_item, dict):
  95. continue
  96. normalized_defects.append({
  97. "side": defect_item.get("side") or "",
  98. "type": defect_item.get("type") or "",
  99. "location": defect_item.get("location") or "",
  100. "compareKey": defect_item.get("compareKey") or "",
  101. })
  102. normalized_defects.sort(key=lambda item: item["compareKey"])
  103. compare_snapshot["defectDetailList"] = normalized_defects
  104. return _normalize_compare_value(compare_snapshot)
  105. def _filter_common_defects(defect_list: List[Dict[str, Any]], common_counter: Counter) -> List[Dict[str, Any]]:
  106. """
  107. 逐项扣减重复计数,只移除两边真正重合的那一部分。
  108. """
  109. filtered_list = []
  110. for defect_item in defect_list:
  111. compare_key = defect_item.get("compareKey") or _build_report_defect_compare_key(defect_item)
  112. if common_counter[compare_key] > 0:
  113. common_counter[compare_key] -= 1
  114. continue
  115. filtered_list.append(defect_item)
  116. return filtered_list
  117. def remove_common_defects(
  118. left_report_json: Any,
  119. right_report_json: Any
  120. ) -> Tuple[Dict[str, Any], Dict[str, Any], int]:
  121. """
  122. 移除两份历史报告中相同的 defectDetailList 项,只保留差异项。
  123. """
  124. left_report = enrich_report_for_compare(left_report_json)
  125. right_report = enrich_report_for_compare(right_report_json)
  126. left_defects = [item for item in left_report.get("defectDetailList", []) if isinstance(item, dict)]
  127. right_defects = [item for item in right_report.get("defectDetailList", []) if isinstance(item, dict)]
  128. left_counter = Counter(item.get("compareKey") or _build_report_defect_compare_key(item) for item in left_defects)
  129. right_counter = Counter(item.get("compareKey") or _build_report_defect_compare_key(item) for item in right_defects)
  130. common_counter = left_counter & right_counter
  131. common_count = sum(common_counter.values())
  132. left_report["defectDetailList"] = _filter_common_defects(left_defects, common_counter.copy())
  133. right_report["defectDetailList"] = _filter_common_defects(right_defects, common_counter.copy())
  134. return left_report, right_report, common_count
  135. def save_rating_report_history(
  136. db_conn: PooledMySQLConnection,
  137. card_id: int,
  138. card_no: str,
  139. report_name: str,
  140. report_json: Dict[str, Any]
  141. ) -> Dict[str, Any]:
  142. try:
  143. current_snapshot = build_report_compare_snapshot(report_json)
  144. with db_conn.cursor(dictionary=True) as cursor:
  145. # 只和上一条历史比较,满足“当前和上一次一样就不存”的需求。
  146. latest_sql = (
  147. f"SELECT rating_id, report_name, report_json "
  148. f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} "
  149. "WHERE card_id = %s "
  150. "ORDER BY rating_id DESC LIMIT 1"
  151. )
  152. cursor.execute(latest_sql, (card_id,))
  153. latest_row = cursor.fetchone()
  154. if latest_row:
  155. latest_snapshot = build_report_compare_snapshot(latest_row.get("report_json"))
  156. if latest_snapshot == current_snapshot:
  157. logger.info(
  158. f"评级报告历史未发生变化,跳过保存: cardNo={card_no}, rating_id={latest_row.get('rating_id')}"
  159. )
  160. return {
  161. "rating_id": latest_row.get("rating_id"),
  162. "report_name": latest_row.get("report_name"),
  163. "created": False
  164. }
  165. insert_sql = (
  166. f"INSERT INTO {settings.RATING_REPORT_HISTORY_TABLE_NAME} "
  167. "(card_id, cardNo, report_name, report_json) "
  168. "VALUES (%s, %s, %s, %s)"
  169. )
  170. cursor.execute(
  171. insert_sql,
  172. (card_id, card_no, report_name, json.dumps(report_json, ensure_ascii=False))
  173. )
  174. db_conn.commit()
  175. logger.info(f"评级报告历史保存成功: cardNo={card_no}, rating_id={cursor.lastrowid}")
  176. return {
  177. "rating_id": cursor.lastrowid,
  178. "report_name": report_name,
  179. "created": True
  180. }
  181. except Exception as e:
  182. db_conn.rollback()
  183. logger.error(f"保存评级报告历史失败: {e}")
  184. raise HTTPException(status_code=500, detail="保存评级报告历史失败")
  185. def get_active_json(image_data: Any) -> Optional[Dict]:
  186. """获取有效的 json 数据,优先使用 modified_json。"""
  187. if not image_data:
  188. return None
  189. # image_data 可能是 Pydantic 对象或者字典,这里做兼容。
  190. if hasattr(image_data, "modified_json"):
  191. modified_json = image_data.modified_json
  192. detection_json = image_data.detection_json
  193. else:
  194. modified_json = image_data.get("modified_json")
  195. detection_json = image_data.get("detection_json")
  196. if modified_json:
  197. return modified_json
  198. return detection_json
  199. def crop_defect_image(original_image_path_str: str, min_rect: List, output_filename: str) -> str:
  200. """
  201. 通过 MinIO 裁剪缺陷图,生成正方形缺陷图并回传最终 URL。
  202. """
  203. start_time = perf_counter()
  204. try:
  205. # 把完整 URL 转成 MinIO 对象路径,例如 /Data/xxx.jpg。
  206. rel_path = original_image_path_str.replace(settings.DATA_HOST_URL, "")
  207. rel_path = "/" + rel_path.lstrip('/\\')
  208. object_name = f"{settings.MINIO_BASE_PREFIX}{rel_path}"
  209. try:
  210. response = minio_client.get_object(settings.MINIO_BUCKET, object_name)
  211. image_bytes = response.read()
  212. response.close()
  213. response.release_conn()
  214. except Exception as e:
  215. logger.warning(f"从 MinIO 获取原图失败: {object_name} -> {e}")
  216. return ""
  217. with Image.open(io.BytesIO(image_bytes)) as img:
  218. img_w, img_h = img.size
  219. if img_w <= 1 or img_h <= 1:
  220. return ""
  221. (center_x, center_y), (rect_w, rect_h), angle = min_rect
  222. center_x = int(center_x)
  223. center_y = int(center_y)
  224. rect_w = int(rect_w)
  225. rect_h = int(rect_h)
  226. resize_scale = 1.5 - abs(abs(angle % 90) - 45) / 90
  227. side_length = max(max(rect_w, rect_h) * resize_scale, 100)
  228. half_side = side_length / 2
  229. left = int(max(0, min(img_w - 1, center_x - half_side)))
  230. top = int(max(0, min(img_h - 1, center_y - half_side)))
  231. right = int(min(img_w, max(left + 1, center_x + half_side)))
  232. bottom = int(min(img_h, max(top + 1, center_y + half_side)))
  233. if right <= left or bottom <= top:
  234. logger.warning(
  235. f"裁剪坐标无效,跳过: img_size=({img_w},{img_h}), "
  236. f"crop=({left},{top},{right},{bottom}), min_rect={min_rect}"
  237. )
  238. return ""
  239. cropped_img = img.crop((left, top, right, bottom))
  240. out_bytes = io.BytesIO()
  241. cropped_img.save(out_bytes, format="JPEG", quality=95)
  242. out_bytes.seek(0)
  243. out_rel_path = f"/DefectImage/{output_filename}"
  244. out_object_name = f"{settings.MINIO_BASE_PREFIX}{out_rel_path}"
  245. minio_client.put_object(
  246. settings.MINIO_BUCKET,
  247. out_object_name,
  248. out_bytes,
  249. len(out_bytes.getvalue()),
  250. content_type="image/jpeg"
  251. )
  252. result_url = settings.get_full_url(out_rel_path)
  253. logger.info(
  254. "耗时埋点 crop_defect_image: source=%s output=%s elapsed_ms=%.2f",
  255. object_name, out_object_name, (perf_counter() - start_time) * 1000
  256. )
  257. return result_url
  258. except Exception as e:
  259. logger.error(f"裁剪并上传缺陷图失败: {e}")
  260. logger.info(
  261. "耗时埋点 crop_defect_image: source=%s output=%s elapsed_ms=%.2f",
  262. original_image_path_str, output_filename, (perf_counter() - start_time) * 1000
  263. )
  264. return ""