from app.utils.scheme import ImageType, CardImageResponse
from app.core.logger import get_logger
from typing import List, Dict, Any, Optional, Set, Tuple

logger = get_logger(__name__)

# Image types that take part in scoring (for the 14-image layout fusion is
# authoritative and ring is the fallback; coaxial contributes face deductions).
SCORE_SOURCE_IMAGE_TYPES: Set[str] = {
    ImageType.front_fusion.value,
    ImageType.back_fusion.value,
    ImageType.front_ring.value,
    ImageType.back_ring.value,
    ImageType.front_coaxial.value,
    ImageType.back_coaxial.value,
}

SCORE_BASE = 10.0
SCORE_MIN = 0.0
SCORE_MAX = 10.0


def _clamp_score(value: float) -> float:
    """Clamp a card score into [SCORE_MIN, SCORE_MAX]; scores are never negative."""
    return max(SCORE_MIN, min(SCORE_MAX, value))


def _as_float(value: Any) -> float:
    """Convert *value* to float; falsy or unconvertible input yields 0.0."""
    try:
        return float(value or 0)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() of an int too large for a double.
        return 0.0


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _get_reducts(src: Dict[str, Any], side: str) -> Dict[str, Any]:
    """Extract the deduction fields of one card side from its JSON payload.

    Handles both the newer stitch layout (``result.center_result.deduct_score``)
    and the legacy layout (``result.card_center_deduct_score``).

    Args:
        src: JSON payload of one image; anything that is not a dict is
            treated as empty.
        side: ``"front"`` or ``"back"``; used as the prefix of the
            ``{side}_corner/edge/face_deduct_score`` keys.

    Returns:
        A dict with the raw ``card_score`` (may be ``None``) and the float
        deductions ``total``, ``center``, ``corner``, ``edge`` and ``face``.
        Missing or malformed fields become ``0.0``.
    """
    # A JSON null (or any non-dict) at any nesting level is treated as "no
    # data" instead of raising AttributeError on the following .get().
    result = _as_dict(_as_dict(src).get("result"))
    defect_result = _as_dict(result.get("defect_result"))
    center_result = _as_dict(result.get("center_result"))

    center_deduct = center_result.get("deduct_score")
    if center_deduct is None:
        # Legacy layout keeps the center deduction directly under "result".
        center_deduct = result.get("card_center_deduct_score", 0)

    return {
        "card_score": result.get("card_score"),
        "total": _as_float(result.get("_used_compute_deduct_score", 0)),
        "center": _as_float(center_deduct),
        "corner": _as_float(defect_result.get(f"{side}_corner_deduct_score", 0)),
        "edge": _as_float(defect_result.get(f"{side}_edge_deduct_score", 0)),
        "face": _as_float(defect_result.get(f"{side}_face_deduct_score", 0)),
    }


def _find_image(images: List[CardImageResponse], image_type: str) -> Optional[CardImageResponse]:
    """Return the first image whose ``image_type`` equals *image_type*, else ``None``."""
    for img in images:
        if img.image_type == image_type:
            return img
    return None


def _resolve_side_score_image(
    images: List[CardImageResponse],
    side: str,
) -> Optional[CardImageResponse]:
    """Pick the scoring image of one side: fusion first, then ring.

    Any *side* other than ``"front"`` is resolved as the back side.
    """
    if side == "front":
        return (
            _find_image(images, ImageType.front_fusion.value)
            or _find_image(images, ImageType.front_ring.value)
        )
    return (
        _find_image(images, ImageType.back_fusion.value)
        or _find_image(images, ImageType.back_ring.value)
    )


def _resolve_side_detection_score(src: Dict[str, Any], side: str = "front") -> float:
    """Return the final score of one side, clamped to [SCORE_MIN, SCORE_MAX].

    - Newer stitch layout: ``result.card_score`` is used directly.
    - Legacy fallback: ``SCORE_BASE + result._used_compute_deduct_score``
      (the deduction is added as-is, so it is presumably stored as a
      non-positive number; confirm against the producer of the JSON).
    """
    reducts = _get_reducts(src, side)
    if reducts["card_score"] is not None:
        return _clamp_score(_as_float(reducts["card_score"]))
    return _clamp_score(SCORE_BASE + reducts["total"])


def _select_source(img: CardImageResponse, use_modified: bool) -> Any:
    """Return the JSON to score *img* from.

    With *use_modified* the image's ``modified_json`` wins when it is not
    ``None``; otherwise ``detection_json`` is used.
    """
    if use_modified and img.modified_json is not None:
        return img.modified_json
    return img.detection_json


def _detail_scores(
    side_images: Tuple[Tuple[CardImageResponse, str], ...],
    coaxial_images: Tuple[Tuple[Optional[CardImageResponse], str], ...],
    use_modified: bool,
) -> Tuple[float, float, float, float]:
    """Compute the clamped (center, corner, edge, face) item scores.

    Every item starts at SCORE_BASE. Each side image adds its four
    deductions; each present coaxial image adds only its face deduction.
    A failure on one image is logged and that image is skipped.
    """
    label = "modified_json" if use_modified else "detection_json"
    center = corner = edge = face = SCORE_BASE

    for img, side in side_images:
        try:
            reducts = _get_reducts(_select_source(img, use_modified) or {}, side)
        except Exception as e:
            logger.warning(f"解析 {label} 分项失败 (image_id={img.id}): {e}")
            continue
        center += reducts["center"]
        corner += reducts["corner"]
        edge += reducts["edge"]
        face += reducts["face"]

    for img, side in coaxial_images:
        if not img:
            continue
        try:
            face += _get_reducts(_select_source(img, use_modified) or {}, side)["face"]
        except Exception as e:
            logger.warning(f"解析 {label} 分数失败 (image_id={img.id}): {e}")

    return (
        _clamp_score(center),
        _clamp_score(corner),
        _clamp_score(edge),
        _clamp_score(face),
    )


def calculate_scores_from_images(images: List[CardImageResponse]) -> Dict[str, Any]:
    """Compute card scores from its images (14-image stitch and legacy 8-image sets).

    ``detection_score`` is the average of the front and back side scores,
    each taken from that side's fusion (or ring) JSON. A side's
    ``_used_compute_deduct_score`` is already the total deduction of the
    whole side, so it must not be combined with the base score twice.

    ``detection_score_detail`` items are SCORE_BASE plus the matching
    deduction fields of both sides (plus coaxial face deductions).

    The ``modified_*`` entries are filled the same way, only when at least
    one scoring image has a non-``None`` ``modified_json``; images without
    one fall back to their ``detection_json``.

    Args:
        images: All images of the card; non-scoring types are ignored.

    Returns:
        A dict with ``detection_score``, ``modified_score``,
        ``detection_score_detail``, ``modified_score_detail`` and
        ``is_edited``. Scores stay ``None`` when either side has no scoring
        image or when the calculation fails (the failure is logged).
    """
    scores = {
        "detection_score": None,
        "modified_score": None,
        "detection_score_detail": {
            "detection_center_score": None,
            "detection_corner_score": None,
            "detection_edge_score": None,
            "detection_face_score": None,
        },
        "modified_score_detail": {
            "modified_center_score": None,
            "modified_corner_score": None,
            "modified_edge_score": None,
            "modified_face_score": None,
        },
        "is_edited": False,
    }

    score_images = [img for img in images if img.image_type in SCORE_SOURCE_IMAGE_TYPES]
    front_side_img = _resolve_side_score_image(score_images, "front")
    back_side_img = _resolve_side_score_image(score_images, "back")
    front_coaxial = _find_image(score_images, ImageType.front_coaxial.value)
    back_coaxial = _find_image(score_images, ImageType.back_coaxial.value)

    # Both sides are required; without them no score can be produced.
    if not front_side_img or not back_side_img:
        return scores

    side_images = ((front_side_img, "front"), (back_side_img, "back"))
    coaxial_images = ((front_coaxial, "front"), (back_coaxial, "back"))

    try:
        front_side_score = _resolve_side_detection_score(
            front_side_img.detection_json or {}, "front"
        )
        back_side_score = _resolve_side_detection_score(
            back_side_img.detection_json or {}, "back"
        )
        scores["detection_score"] = _clamp_score((front_side_score + back_side_score) / 2)

        center, corner, edge, face = _detail_scores(
            side_images, coaxial_images, use_modified=False
        )
        scores["detection_score_detail"] = {
            "detection_center_score": center,
            "detection_corner_score": corner,
            "detection_edge_score": edge,
            "detection_face_score": face,
        }

        is_edited = any(
            img.modified_json is not None
            for img, _ in side_images + coaxial_images
            if img
        )
        scores["is_edited"] = is_edited

        if is_edited:
            modified_score = _clamp_score(
                (
                    _resolve_side_detection_score(
                        _select_source(front_side_img, True) or {}, "front"
                    )
                    + _resolve_side_detection_score(
                        _select_source(back_side_img, True) or {}, "back"
                    )
                )
                / 2
            )
            m_center, m_corner, m_edge, m_face = _detail_scores(
                side_images, coaxial_images, use_modified=True
            )
            scores["modified_score"] = modified_score
            scores["modified_score_detail"] = {
                "modified_center_score": m_center,
                "modified_corner_score": m_corner,
                "modified_edge_score": m_edge,
                "modified_face_score": m_face,
            }

        # Reuse the side scores computed above instead of resolving them again.
        logger.info(
            "算分结果: front_side=%.4f back_side=%.4f detection_score=%s",
            front_side_score,
            back_side_score,
            scores["detection_score"],
        )
    except Exception as e:
        # Best effort: return whatever was filled in before the failure.
        logger.error(f"计算分数过程异常: {e}")

    return scores