card_score_calculate.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
import math
from typing import List, Dict, Any, Optional, Set, Tuple

from app.core.logger import get_logger
from app.utils.scheme import ImageType, CardImageResponse
  4. logger = get_logger(__name__)
  5. # 参与算分的图片类型(14 图以 fusion 为准,ring 兜底)
  6. SCORE_SOURCE_IMAGE_TYPES: Set[str] = {
  7. ImageType.front_fusion.value,
  8. ImageType.back_fusion.value,
  9. ImageType.front_ring.value,
  10. ImageType.back_ring.value,
  11. ImageType.front_coaxial.value,
  12. ImageType.back_coaxial.value,
  13. }
  14. SCORE_BASE = 10.0
  15. SCORE_MIN = 0.0
  16. SCORE_MAX = 10.0
  17. def _clamp_score(value: float) -> float:
  18. """卡牌分数不允许为负,且落在 [0, 10]。"""
  19. return max(SCORE_MIN, min(SCORE_MAX, value))
  20. def _as_float(value: Any) -> float:
  21. try:
  22. return float(value or 0)
  23. except (TypeError, ValueError):
  24. return 0.0
  25. def _get_reducts(src: Dict[str, Any], side: str) -> Dict[str, float]:
  26. """从单面 JSON 提取扣分字段(兼容 stitch 新版与旧版结构)。"""
  27. result = src.get("result", {}) if isinstance(src, dict) else {}
  28. defect_result = result.get("defect_result", {}) or {}
  29. center_result = result.get("center_result", {}) or {}
  30. center_deduct = center_result.get("deduct_score")
  31. if center_deduct is None:
  32. center_deduct = result.get("card_center_deduct_score", 0)
  33. return {
  34. "card_score": result.get("card_score"),
  35. "total": _as_float(result.get("_used_compute_deduct_score", 0)),
  36. "center": _as_float(center_deduct),
  37. "corner": _as_float(defect_result.get(f"{side}_corner_deduct_score", 0)),
  38. "edge": _as_float(defect_result.get(f"{side}_edge_deduct_score", 0)),
  39. "face": _as_float(defect_result.get(f"{side}_face_deduct_score", 0)),
  40. }
  41. def _find_image(images: List[CardImageResponse], image_type: str) -> Optional[CardImageResponse]:
  42. for img in images:
  43. if img.image_type == image_type:
  44. return img
  45. return None
  46. def _resolve_side_score_image(
  47. images: List[CardImageResponse],
  48. side: str,
  49. ) -> Optional[CardImageResponse]:
  50. """每面优先 fusion(stitch JSON 落库处),其次 ring。"""
  51. if side == "front":
  52. return (
  53. _find_image(images, ImageType.front_fusion.value)
  54. or _find_image(images, ImageType.front_ring.value)
  55. )
  56. return (
  57. _find_image(images, ImageType.back_fusion.value)
  58. or _find_image(images, ImageType.back_ring.value)
  59. )
  60. def _resolve_side_detection_score(src: Dict[str, Any], side: str = "front") -> float:
  61. """
  62. 单面最终分:
  63. - stitch 新版:直接使用 result.card_score(已是该面最终得分)
  64. - 旧版兜底:10 + _used_compute_deduct_score
  65. """
  66. reducts = _get_reducts(src, side)
  67. if reducts["card_score"] is not None:
  68. return _clamp_score(_as_float(reducts["card_score"]))
  69. return _clamp_score(SCORE_BASE + reducts["total"])
  70. def calculate_scores_from_images(images: List[CardImageResponse]) -> Dict[str, Any]:
  71. """
  72. 根据图片计算分数(兼容 14 图 stitch 与历史 8 图)。
  73. detection_score:
  74. 优先取正/反面 fusion(或 ring)JSON 中的 card_score 平均;
  75. 两面 _used_compute_deduct_score 已是整面总扣分,不可再与 10 分基准相加两次。
  76. detection_score_detail:
  77. 各分项仍按 10 分基准 + 对应扣分字段计算。
  78. """
  79. scores = {
  80. "detection_score": None,
  81. "modified_score": None,
  82. "detection_score_detail": {
  83. "detection_center_score": None,
  84. "detection_corner_score": None,
  85. "detection_edge_score": None,
  86. "detection_face_score": None
  87. },
  88. "modified_score_detail": {
  89. "modified_center_score": None,
  90. "modified_corner_score": None,
  91. "modified_edge_score": None,
  92. "modified_face_score": None
  93. },
  94. "is_edited": False,
  95. }
  96. score_images = [img for img in images if img.image_type in SCORE_SOURCE_IMAGE_TYPES]
  97. front_side_img = _resolve_side_score_image(score_images, "front")
  98. back_side_img = _resolve_side_score_image(score_images, "back")
  99. front_coaxial = _find_image(score_images, ImageType.front_coaxial.value)
  100. back_coaxial = _find_image(score_images, ImageType.back_coaxial.value)
  101. if not front_side_img or not back_side_img:
  102. return scores
  103. def _apply_side_detail(
  104. side_img: CardImageResponse,
  105. side: str,
  106. center_score: float,
  107. corner_score: float,
  108. edge_score: float,
  109. face_score: float,
  110. ) -> Tuple[float, float, float, float]:
  111. reducts = _get_reducts(side_img.detection_json or {}, side)
  112. return (
  113. center_score + reducts["center"],
  114. corner_score + reducts["corner"],
  115. edge_score + reducts["edge"],
  116. face_score + reducts["face"],
  117. )
  118. try:
  119. front_src = front_side_img.detection_json or {}
  120. back_src = back_side_img.detection_json or {}
  121. front_side_score = _resolve_side_detection_score(front_src, "front")
  122. back_side_score = _resolve_side_detection_score(back_src, "back")
  123. scores["detection_score"] = _clamp_score((front_side_score + back_side_score) / 2)
  124. detection_center_score = SCORE_BASE
  125. detection_corner_score = SCORE_BASE
  126. detection_edge_score = SCORE_BASE
  127. detection_face_score = SCORE_BASE
  128. for side_img, side in ((front_side_img, "front"), (back_side_img, "back")):
  129. try:
  130. detection_center_score, detection_corner_score, detection_edge_score, detection_face_score = _apply_side_detail(
  131. side_img, side,
  132. detection_center_score, detection_corner_score, detection_edge_score, detection_face_score,
  133. )
  134. except Exception as e:
  135. logger.warning(f"解析 detection_json 分项失败 (image_id={side_img.id}): {e}")
  136. if front_coaxial:
  137. try:
  138. reducts = _get_reducts(front_coaxial.detection_json or {}, "front")
  139. detection_face_score += reducts["face"]
  140. except Exception as e:
  141. logger.warning(f"解析 detection_json 分数失败 (image_id={front_coaxial.id}): {e}")
  142. if back_coaxial:
  143. try:
  144. reducts = _get_reducts(back_coaxial.detection_json or {}, "back")
  145. detection_face_score += reducts["face"]
  146. except Exception as e:
  147. logger.warning(f"解析 detection_json 分数失败 (image_id={back_coaxial.id}): {e}")
  148. scores["detection_score_detail"] = {
  149. "detection_center_score": _clamp_score(detection_center_score),
  150. "detection_corner_score": _clamp_score(detection_corner_score),
  151. "detection_edge_score": _clamp_score(detection_edge_score),
  152. "detection_face_score": _clamp_score(detection_face_score),
  153. }
  154. score_sources = [front_side_img, back_side_img, front_coaxial, back_coaxial]
  155. is_edited = any(
  156. img and img.modified_json is not None
  157. for img in score_sources if img is not None
  158. )
  159. scores["is_edited"] = is_edited
  160. if is_edited:
  161. front_m_src = (
  162. front_side_img.modified_json
  163. if front_side_img.modified_json is not None
  164. else front_side_img.detection_json
  165. ) or {}
  166. back_m_src = (
  167. back_side_img.modified_json
  168. if back_side_img.modified_json is not None
  169. else back_side_img.detection_json
  170. ) or {}
  171. modified_score = _clamp_score(
  172. (
  173. _resolve_side_detection_score(front_m_src, "front")
  174. + _resolve_side_detection_score(back_m_src, "back")
  175. ) / 2
  176. )
  177. modified_center_score = SCORE_BASE
  178. modified_corner_score = SCORE_BASE
  179. modified_edge_score = SCORE_BASE
  180. modified_face_score = SCORE_BASE
  181. for side_img, side in ((front_side_img, "front"), (back_side_img, "back")):
  182. src = side_img.modified_json if side_img.modified_json is not None else side_img.detection_json
  183. try:
  184. reducts = _get_reducts(src or {}, side)
  185. modified_center_score += reducts["center"]
  186. modified_corner_score += reducts["corner"]
  187. modified_edge_score += reducts["edge"]
  188. modified_face_score += reducts["face"]
  189. except Exception as e:
  190. logger.warning(f"解析 modified_json 分项失败 (image_id={side_img.id}): {e}")
  191. if front_coaxial:
  192. src = front_coaxial.modified_json if front_coaxial.modified_json is not None else front_coaxial.detection_json
  193. try:
  194. modified_face_score += _get_reducts(src or {}, "front")["face"]
  195. except Exception as e:
  196. logger.warning(f"解析 modified_json 分数失败 (image_id={front_coaxial.id}): {e}")
  197. if back_coaxial:
  198. src = back_coaxial.modified_json if back_coaxial.modified_json is not None else back_coaxial.detection_json
  199. try:
  200. modified_face_score += _get_reducts(src or {}, "back")["face"]
  201. except Exception as e:
  202. logger.warning(f"解析 modified_json 分数失败 (image_id={back_coaxial.id}): {e}")
  203. scores["modified_score"] = modified_score
  204. scores["modified_score_detail"] = {
  205. "modified_center_score": _clamp_score(modified_center_score),
  206. "modified_corner_score": _clamp_score(modified_corner_score),
  207. "modified_edge_score": _clamp_score(modified_edge_score),
  208. "modified_face_score": _clamp_score(modified_face_score),
  209. }
  210. logger.info(
  211. "算分结果: front_side=%.4f back_side=%.4f detection_score=%s",
  212. _resolve_side_detection_score(front_src, "front"),
  213. _resolve_side_detection_score(back_src, "back"),
  214. scores["detection_score"],
  215. )
  216. except Exception as e:
  217. logger.error(f"计算分数过程异常: {e}")
  218. return scores