rating_report_utils.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. import copy
  2. import io
  3. import json
  4. from collections import Counter
  5. from datetime import datetime
  6. from typing import Any, Dict, List, Optional, Tuple
  7. from fastapi import HTTPException
  8. from PIL import Image
  9. from mysql.connector.pooling import PooledMySQLConnection
  10. from app.core.config import settings
  11. from app.core.logger import get_logger
  12. from app.core.minio_client import minio_client
  13. logger = get_logger(__name__)
  def parse_json_value(raw_value: Any) -> Dict[str, Any]:
      """Coerce a raw JSON value into a dictionary.

      Args:
          raw_value: ``None``, an already-decoded ``dict``, or JSON text given as
              ``str``/``bytes``/``bytearray``.

      Returns:
          The dictionary itself when ``raw_value`` is a ``dict`` (no copy is
          made), the decoded JSON object when the text parses to one, and an
          empty dictionary in every other case (``None``, unsupported types,
          malformed text, or JSON that decodes to a non-object such as a list
          or a number).
      """
      if isinstance(raw_value, dict):
          return raw_value
      if not isinstance(raw_value, (str, bytes, bytearray)):
          return {}
      try:
          parsed = json.loads(raw_value)
      except ValueError:
          # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
          return {}
      # Valid JSON is not necessarily an object; callers rely on dict methods.
      return parsed if isinstance(parsed, dict) else {}
  def format_datetime(dt: Any) -> str:
      """Render a ``datetime`` as ``YYYY-MM-DD HH:MM:SS``.

      Any value that is not a ``datetime`` instance yields an empty string.
      """
      if not isinstance(dt, datetime):
          return ""
      return dt.strftime("%Y-%m-%d %H:%M:%S")
  29. def _normalize_compare_value(value: Any) -> Any:
  30. """
  31. 归一化对比数据,避免浮点精度和容器类型差异造成误判。
  32. """
  33. if isinstance(value, float):
  34. return round(value, 4)
  35. if isinstance(value, tuple):
  36. return [_normalize_compare_value(item) for item in value]
  37. if isinstance(value, list):
  38. return [_normalize_compare_value(item) for item in value]
  39. if isinstance(value, dict):
  40. return {key: _normalize_compare_value(val) for key, val in value.items()}
  41. return value
  def build_defect_compare_key(defect: Dict[str, Any], side: str = "", location: str = "") -> str:
      """Build a stable comparison key for a raw defect.

      The key is ``side|location|label|actual_area``. An explicitly passed
      ``side`` / ``location`` takes precedence over the value stored on the
      defect; missing parts become empty strings. ``actual_area`` is normalized
      first so float noise does not change the key.
      """
      area = _normalize_compare_value(defect.get("actual_area"))
      defect_label = defect.get("label") or ""

      resolved_side = side
      if not resolved_side:
          resolved_side = defect.get("side") or ""

      resolved_location = location
      if not resolved_location:
          resolved_location = defect.get("location") or ""

      return f"{resolved_side}|{resolved_location}|{defect_label}|{area}"
  52. def _build_report_defect_compare_key(defect_item: Dict[str, Any]) -> str:
  53. """
  54. 为历史报告里的 defectDetailList 项构造对比键。
  55. 新数据优先复用 compareKey,老数据回退到 side/location/label/actual_area 组合。
  56. """
  57. compare_key = defect_item.get("compareKey")
  58. if compare_key:
  59. return compare_key
  60. actual_area = defect_item.get("actualArea")
  61. if actual_area is None:
  62. actual_area = defect_item.get("actual_area")
  63. actual_area = _normalize_compare_value(actual_area)
  64. label = defect_item.get("label")
  65. if label is None:
  66. label = defect_item.get("type") or ""
  67. side = defect_item.get("side") or ""
  68. location = defect_item.get("location") or ""
  69. return f"{side}|{location}|{label}|{actual_area}"
  def enrich_report_for_compare(report_json: Any) -> Dict[str, Any]:
      """Return a private copy of a stored report with ``compareKey`` on every defect.

      Args:
          report_json: The report as a dict or as JSON text.

      Returns:
          A deep copy of the decoded report. ``defectDetailList`` is always a
          list in the result: it is replaced by ``[]`` when missing or not a
          list, and every dict entry in it gets a ``compareKey`` (existing keys
          are kept, legacy entries get a derived one). Non-dict entries are left
          untouched. Input that does not decode to a JSON object is treated as an
          empty report.
      """
      parsed = parse_json_value(report_json)
      # Stored JSON may decode to a non-object (e.g. a list); treat that as an
      # empty report instead of failing on the dict access below.
      report_data: Dict[str, Any] = copy.deepcopy(parsed) if isinstance(parsed, dict) else {}

      defect_list = report_data.get("defectDetailList")
      if not isinstance(defect_list, list):
          report_data["defectDetailList"] = []
          return report_data

      for defect_item in defect_list:
          if isinstance(defect_item, dict):
              defect_item["compareKey"] = _build_report_defect_compare_key(defect_item)
      return report_data
  def build_report_compare_snapshot(report_json: Any) -> Dict[str, Any]:
      """Build the normalized snapshot used to de-duplicate report history.

      Each defect is reduced to ``side``, ``type``, ``location`` and
      ``compareKey`` so that volatile per-defect fields (such as
      ``defectImgUrl``) do not make two otherwise identical reports differ. The
      reduced defects are sorted by ``compareKey`` so list order does not matter.
      All other top-level report fields are kept, and the whole structure is
      passed through ``_normalize_compare_value``.

      Args:
          report_json: The report as a dict or as JSON text.

      Returns:
          A normalized dictionary suitable for ``==`` comparison.
      """
      report_data = enrich_report_for_compare(report_json)

      normalized_defects = [
          {
              "side": defect_item.get("side") or "",
              "type": defect_item.get("type") or "",
              "location": defect_item.get("location") or "",
              "compareKey": defect_item.get("compareKey") or "",
          }
          for defect_item in report_data.get("defectDetailList", [])
          if isinstance(defect_item, dict)
      ]
      # Sort on the string form so a legacy non-string compareKey cannot raise
      # TypeError when mixed with string keys.
      normalized_defects.sort(key=lambda item: str(item["compareKey"]))

      # A shallow copy is enough: only the defect list is replaced here, and
      # _normalize_compare_value rebuilds every nested container anyway.
      compare_snapshot = dict(report_data)
      compare_snapshot["defectDetailList"] = normalized_defects
      return _normalize_compare_value(compare_snapshot)
  104. def _filter_common_defects(defect_list: List[Dict[str, Any]], common_counter: Counter) -> List[Dict[str, Any]]:
  105. """
  106. 逐项扣减重复计数,只移除两边真正重合的那一部分。
  107. """
  108. filtered_list = []
  109. for defect_item in defect_list:
  110. compare_key = defect_item.get("compareKey") or _build_report_defect_compare_key(defect_item)
  111. if common_counter[compare_key] > 0:
  112. common_counter[compare_key] -= 1
  113. continue
  114. filtered_list.append(defect_item)
  115. return filtered_list
  def remove_common_defects(
      left_report_json: Any,
      right_report_json: Any
  ) -> Tuple[Dict[str, Any], Dict[str, Any], int]:
      """Strip the defects two stored reports have in common, keeping only the differences.

      Both reports are enriched with ``compareKey`` first. Defects are matched by
      key as a multiset, so a key appearing twice on one side and once on the
      other leaves one occurrence behind. Non-dict entries of
      ``defectDetailList`` are discarded.

      Returns:
          ``(left_report, right_report, common_count)`` where both reports carry
          only their unmatched defects and ``common_count`` is the number of
          matched pairs.
      """
      def _key_of(item: Dict[str, Any]) -> Any:
          return item.get("compareKey") or _build_report_defect_compare_key(item)

      left_report = enrich_report_for_compare(left_report_json)
      right_report = enrich_report_for_compare(right_report_json)

      left_defects = [entry for entry in left_report.get("defectDetailList", []) if isinstance(entry, dict)]
      right_defects = [entry for entry in right_report.get("defectDetailList", []) if isinstance(entry, dict)]

      # Multiset intersection: per key, the smaller of the two occurrence counts.
      shared = Counter(map(_key_of, left_defects)) & Counter(map(_key_of, right_defects))
      shared_total = sum(shared.values())

      # Each side consumes its own copy of the shared counts.
      left_report["defectDetailList"] = _filter_common_defects(left_defects, Counter(shared))
      right_report["defectDetailList"] = _filter_common_defects(right_defects, Counter(shared))
      return left_report, right_report, shared_total
  def save_rating_report_history(
      db_conn: PooledMySQLConnection,
      card_id: int,
      card_no: str,
      report_name: str,
      report_json: Dict[str, Any]
  ) -> Dict[str, Any]:
      """Store a rating report as a new history row unless it repeats the latest one.

      The new report is compared, via its normalized snapshot, with the most
      recent history row of the same card only. If they are equal nothing is
      written.

      Args:
          db_conn: Open database connection; committed on insert, rolled back on failure.
          card_id: Card identifier used to look up the latest history row.
          card_no: Card number stored with the row and used in log messages.
          report_name: Name stored with the new row.
          report_json: Report payload, serialized to JSON text for storage.

      Returns:
          ``{"rating_id", "report_name", "created"}``. ``created`` is ``False``
          and the values describe the existing latest row when the report is
          unchanged; otherwise ``created`` is ``True`` and ``rating_id`` is the
          id of the inserted row.

      Raises:
          HTTPException: status 500 on any failure, chained to the original error.
      """
      try:
          current_snapshot = build_report_compare_snapshot(report_json)
          # The table name comes from application settings, not from request
          # input; all values are bound as query parameters.
          table_name = settings.RATING_REPORT_HISTORY_TABLE_NAME
          with db_conn.cursor(dictionary=True) as cursor:
              # Compare against the previous entry only: "same as last time" means skip.
              latest_sql = (
                  "SELECT rating_id, report_name, report_json "
                  f"FROM {table_name} "
                  "WHERE card_id = %s "
                  "ORDER BY rating_id DESC LIMIT 1"
              )
              cursor.execute(latest_sql, (card_id,))
              latest_row = cursor.fetchone()
              if latest_row:
                  latest_snapshot = build_report_compare_snapshot(latest_row.get("report_json"))
                  if latest_snapshot == current_snapshot:
                      logger.info(
                          f"评级报告历史未发生变化,跳过保存: cardNo={card_no}, rating_id={latest_row.get('rating_id')}"
                      )
                      return {
                          "rating_id": latest_row.get("rating_id"),
                          "report_name": latest_row.get("report_name"),
                          "created": False
                      }

              insert_sql = (
                  f"INSERT INTO {table_name} "
                  "(card_id, cardNo, report_name, report_json) "
                  "VALUES (%s, %s, %s, %s)"
              )
              cursor.execute(
                  insert_sql,
                  (card_id, card_no, report_name, json.dumps(report_json, ensure_ascii=False))
              )
              db_conn.commit()
              new_rating_id = cursor.lastrowid
              logger.info(f"评级报告历史保存成功: cardNo={card_no}, rating_id={new_rating_id}")
              return {
                  "rating_id": new_rating_id,
                  "report_name": report_name,
                  "created": True
              }
      except Exception as e:
          # A failing rollback (e.g. lost connection) must not hide the original error.
          try:
              db_conn.rollback()
          except Exception as rollback_error:
              logger.warning(f"回滚评级报告历史事务失败: {rollback_error}")
          # NOTE(review): assumes the project logger exposes the standard
          # ``exception`` method (logs the message with the traceback) — confirm.
          logger.exception(f"保存评级报告历史失败: {e}")
          raise HTTPException(status_code=500, detail="保存评级报告历史失败") from e
  def get_active_json(image_data: Any) -> Optional[Dict]:
      """Return the JSON in effect for an image, preferring ``modified_json``.

      ``image_data`` may be an object exposing ``modified_json`` /
      ``detection_json`` attributes or a mapping with those keys. A falsy
      ``image_data`` yields ``None``; otherwise ``modified_json`` is returned
      when truthy, else ``detection_json``.
      """
      if not image_data:
          return None

      # Attribute access for model-like objects, key access for plain mappings.
      if hasattr(image_data, "modified_json"):
          edited, detected = image_data.modified_json, image_data.detection_json
      else:
          edited, detected = image_data.get("modified_json"), image_data.get("detection_json")

      return edited or detected
  def crop_defect_image(original_image_path_str: str, min_rect: List, output_filename: str) -> str:
      """Crop a defect region out of an image stored in MinIO and upload the crop.

      The source image is fetched from MinIO, a crop window centred on the
      defect is cut out, and the result is uploaded as a JPEG under
      ``/DefectImage/<output_filename>``.

      Args:
          original_image_path_str: Full URL of the source image; the
              ``settings.DATA_HOST_URL`` part is stripped to obtain the object path.
          min_rect: ``((center_x, center_y), (width, height), angle)`` describing
              the defect's rotated bounding rectangle.
          output_filename: File name of the uploaded crop.

      Returns:
          The full URL of the uploaded crop, or ``""`` when the source image
          cannot be fetched or any later step fails (failures are logged, never
          raised).
      """
      try:
          # Turn the full URL into a MinIO object path, e.g. /Data/xxx.jpg.
          rel_path = original_image_path_str.replace(settings.DATA_HOST_URL, "")
          rel_path = "/" + rel_path.lstrip('/\\')
          object_name = f"{settings.MINIO_BASE_PREFIX}{rel_path}"

          response = None
          try:
              response = minio_client.get_object(settings.MINIO_BUCKET, object_name)
              image_bytes = response.read()
          except Exception as e:
              logger.warning(f"从 MinIO 获取原图失败: {object_name} -> {e}")
              return ""
          finally:
              # Always hand the connection back, even when read() fails.
              if response is not None:
                  response.close()
                  response.release_conn()

          with Image.open(io.BytesIO(image_bytes)) as img:
              img_w, img_h = img.size
              (center_x, center_y), (rect_w, rect_h), angle = min_rect
              center_x = int(center_x)
              center_y = int(center_y)
              rect_w = int(rect_w)
              rect_h = int(rect_h)

              # Scale grows from 1.0 for an axis-aligned rectangle to 1.5 at 45 degrees.
              resize_scale = 1.5 - abs(abs(angle % 90) - 45) / 90
              # The crop window is never smaller than 100 px per side.
              side_length = max(max(rect_w, rect_h) * resize_scale, 100)
              half_side = side_length / 2

              # Clamp the window to the image, so crops near an edge may not be square.
              left = max(0, center_x - half_side)
              top = max(0, center_y - half_side)
              right = min(img_w, center_x + half_side)
              bottom = min(img_h, center_y + half_side)
              cropped_img = img.crop((left, top, right, bottom))

              # Pillow's JPEG writer rejects modes such as RGBA, LA and P
              # (typical for PNG sources); convert those to RGB first.
              if cropped_img.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
                  cropped_img = cropped_img.convert("RGB")

              out_bytes = io.BytesIO()
              cropped_img.save(out_bytes, format="JPEG", quality=95)
              out_bytes.seek(0)

          out_rel_path = f"/DefectImage/{output_filename}"
          out_object_name = f"{settings.MINIO_BASE_PREFIX}{out_rel_path}"
          minio_client.put_object(
              settings.MINIO_BUCKET,
              out_object_name,
              out_bytes,
              len(out_bytes.getvalue()),
              content_type="image/jpeg"
          )
          return settings.get_full_url(out_rel_path)
      except Exception as e:
          logger.error(f"裁剪并上传缺陷图失败: {e}")
          return ""