|
@@ -1,16 +1,101 @@
|
|
|
from app.utils.scheme import ImageType, CardImageResponse
|
|
from app.utils.scheme import ImageType, CardImageResponse
|
|
|
from app.core.logger import get_logger
|
|
from app.core.logger import get_logger
|
|
|
-from typing import List, Dict, Any, Optional
|
|
|
|
|
|
|
+from typing import List, Dict, Any, Optional, Set, Tuple
|
|
|
|
|
|
|
|
logger = get_logger(__name__)
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
|
|
|
|
+# 参与算分的图片类型(14 图以 fusion 为准,ring 兜底)
|
|
|
|
|
+SCORE_SOURCE_IMAGE_TYPES: Set[str] = {
|
|
|
|
|
+ ImageType.front_fusion.value,
|
|
|
|
|
+ ImageType.back_fusion.value,
|
|
|
|
|
+ ImageType.front_ring.value,
|
|
|
|
|
+ ImageType.back_ring.value,
|
|
|
|
|
+ ImageType.front_coaxial.value,
|
|
|
|
|
+ ImageType.back_coaxial.value,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+SCORE_BASE = 10.0
|
|
|
|
|
+SCORE_MIN = 0.0
|
|
|
|
|
+SCORE_MAX = 10.0
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _clamp_score(value: float) -> float:
|
|
|
|
|
+ """卡牌分数不允许为负,且落在 [0, 10]。"""
|
|
|
|
|
+ return max(SCORE_MIN, min(SCORE_MAX, value))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _as_float(value: Any) -> float:
|
|
|
|
|
+ try:
|
|
|
|
|
+ return float(value or 0)
|
|
|
|
|
+ except (TypeError, ValueError):
|
|
|
|
|
+ return 0.0
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _get_reducts(src: Dict[str, Any], side: str) -> Dict[str, float]:
|
|
|
|
|
+ """从单面 JSON 提取扣分字段(兼容 stitch 新版与旧版结构)。"""
|
|
|
|
|
+ result = src.get("result", {}) if isinstance(src, dict) else {}
|
|
|
|
|
+ defect_result = result.get("defect_result", {}) or {}
|
|
|
|
|
+ center_result = result.get("center_result", {}) or {}
|
|
|
|
|
+
|
|
|
|
|
+ center_deduct = center_result.get("deduct_score")
|
|
|
|
|
+ if center_deduct is None:
|
|
|
|
|
+ center_deduct = result.get("card_center_deduct_score", 0)
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ "card_score": result.get("card_score"),
|
|
|
|
|
+ "total": _as_float(result.get("_used_compute_deduct_score", 0)),
|
|
|
|
|
+ "center": _as_float(center_deduct),
|
|
|
|
|
+ "corner": _as_float(defect_result.get(f"{side}_corner_deduct_score", 0)),
|
|
|
|
|
+ "edge": _as_float(defect_result.get(f"{side}_edge_deduct_score", 0)),
|
|
|
|
|
+ "face": _as_float(defect_result.get(f"{side}_face_deduct_score", 0)),
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _find_image(images: List[CardImageResponse], image_type: str) -> Optional[CardImageResponse]:
|
|
|
|
|
+ for img in images:
|
|
|
|
|
+ if img.image_type == image_type:
|
|
|
|
|
+ return img
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _resolve_side_score_image(
|
|
|
|
|
+ images: List[CardImageResponse],
|
|
|
|
|
+ side: str,
|
|
|
|
|
+) -> Optional[CardImageResponse]:
|
|
|
|
|
+ """每面优先 fusion(stitch JSON 落库处),其次 ring。"""
|
|
|
|
|
+ if side == "front":
|
|
|
|
|
+ return (
|
|
|
|
|
+ _find_image(images, ImageType.front_fusion.value)
|
|
|
|
|
+ or _find_image(images, ImageType.front_ring.value)
|
|
|
|
|
+ )
|
|
|
|
|
+ return (
|
|
|
|
|
+ _find_image(images, ImageType.back_fusion.value)
|
|
|
|
|
+ or _find_image(images, ImageType.back_ring.value)
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _resolve_side_detection_score(src: Dict[str, Any], side: str = "front") -> float:
|
|
|
|
|
+ """
|
|
|
|
|
+ 单面最终分:
|
|
|
|
|
+ - stitch 新版:直接使用 result.card_score(已是该面最终得分)
|
|
|
|
|
+ - 旧版兜底:10 + _used_compute_deduct_score
|
|
|
|
|
+ """
|
|
|
|
|
+ reducts = _get_reducts(src, side)
|
|
|
|
|
+ if reducts["card_score"] is not None:
|
|
|
|
|
+ return _clamp_score(_as_float(reducts["card_score"]))
|
|
|
|
|
+ return _clamp_score(SCORE_BASE + reducts["total"])
|
|
|
|
|
+
|
|
|
|
|
|
|
|
def calculate_scores_from_images(images: List[CardImageResponse]) -> Dict[str, Any]:
|
|
def calculate_scores_from_images(images: List[CardImageResponse]) -> Dict[str, Any]:
|
|
|
"""
|
|
"""
|
|
|
- 根据图片计算分数(兼容 14 图与历史 8 图)。
|
|
|
|
|
- - 必要条件:front_ring + back_ring 存在
|
|
|
|
|
- - face 分数来源:同面 stripe(1~4) / coaxial / fusion,按可用图累加
|
|
|
|
|
- - 不再依赖 coaxial 必须存在
|
|
|
|
|
|
|
+ 根据图片计算分数(兼容 14 图 stitch 与历史 8 图)。
|
|
|
|
|
+
|
|
|
|
|
+ detection_score:
|
|
|
|
|
+ 优先取正/反面 fusion(或 ring)JSON 中的 card_score 平均;
|
|
|
|
|
+ 两面 _used_compute_deduct_score 已是整面总扣分,不可再与 10 分基准相加两次。
|
|
|
|
|
+
|
|
|
|
|
+ detection_score_detail:
|
|
|
|
|
+ 各分项仍按 10 分基准 + 对应扣分字段计算。
|
|
|
"""
|
|
"""
|
|
|
scores = {
|
|
scores = {
|
|
|
"detection_score": None,
|
|
"detection_score": None,
|
|
@@ -30,132 +115,144 @@ def calculate_scores_from_images(images: List[CardImageResponse]) -> Dict[str, A
|
|
|
"is_edited": False,
|
|
"is_edited": False,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- def _as_float(value: Any) -> float:
|
|
|
|
|
- try:
|
|
|
|
|
- return float(value or 0)
|
|
|
|
|
- except (TypeError, ValueError):
|
|
|
|
|
- return 0.0
|
|
|
|
|
-
|
|
|
|
|
- def _get_reducts(src: Dict[str, Any], side: str) -> Dict[str, float]:
|
|
|
|
|
- defect_result = src.get("result", {}).get("defect_result", {})
|
|
|
|
|
- center_result = src.get("result", {}).get("center_result", {})
|
|
|
|
|
- return {
|
|
|
|
|
- "total": _as_float(src.get("result", {}).get("_used_compute_deduct_score", 0)),
|
|
|
|
|
- "center": _as_float(center_result.get("deduct_score", 0)),
|
|
|
|
|
- "corner": _as_float(defect_result.get(f"{side}_corner_deduct_score", 0)),
|
|
|
|
|
- "edge": _as_float(defect_result.get(f"{side}_edge_deduct_score", 0)),
|
|
|
|
|
- "face": _as_float(defect_result.get(f"{side}_face_deduct_score", 0)),
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ score_images = [img for img in images if img.image_type in SCORE_SOURCE_IMAGE_TYPES]
|
|
|
|
|
+ front_side_img = _resolve_side_score_image(score_images, "front")
|
|
|
|
|
+ back_side_img = _resolve_side_score_image(score_images, "back")
|
|
|
|
|
+ front_coaxial = _find_image(score_images, ImageType.front_coaxial.value)
|
|
|
|
|
+ back_coaxial = _find_image(score_images, ImageType.back_coaxial.value)
|
|
|
|
|
|
|
|
- front_ring: Optional[CardImageResponse] = None
|
|
|
|
|
- back_ring: Optional[CardImageResponse] = None
|
|
|
|
|
- front_face_imgs: List[CardImageResponse] = []
|
|
|
|
|
- back_face_imgs: List[CardImageResponse] = []
|
|
|
|
|
-
|
|
|
|
|
- for img in images:
|
|
|
|
|
- image_type = img.image_type
|
|
|
|
|
- if image_type == ImageType.front_ring:
|
|
|
|
|
- front_ring = img
|
|
|
|
|
- elif image_type == ImageType.back_ring:
|
|
|
|
|
- back_ring = img
|
|
|
|
|
-
|
|
|
|
|
- if image_type in [
|
|
|
|
|
- ImageType.front_stripe1, ImageType.front_stripe2,
|
|
|
|
|
- ImageType.front_stripe3, ImageType.front_stripe4,
|
|
|
|
|
- ImageType.front_coaxial, ImageType.front_fusion,
|
|
|
|
|
- ]:
|
|
|
|
|
- front_face_imgs.append(img)
|
|
|
|
|
- elif image_type in [
|
|
|
|
|
- ImageType.back_stripe1, ImageType.back_stripe2,
|
|
|
|
|
- ImageType.back_stripe3, ImageType.back_stripe4,
|
|
|
|
|
- ImageType.back_coaxial, ImageType.back_fusion,
|
|
|
|
|
- ]:
|
|
|
|
|
- back_face_imgs.append(img)
|
|
|
|
|
-
|
|
|
|
|
- # ring 是当前模型计算中心/角/边的必要数据
|
|
|
|
|
- if not front_ring or not back_ring:
|
|
|
|
|
|
|
+ if not front_side_img or not back_side_img:
|
|
|
return scores
|
|
return scores
|
|
|
|
|
|
|
|
|
|
+ def _apply_side_detail(
|
|
|
|
|
+ side_img: CardImageResponse,
|
|
|
|
|
+ side: str,
|
|
|
|
|
+ center_score: float,
|
|
|
|
|
+ corner_score: float,
|
|
|
|
|
+ edge_score: float,
|
|
|
|
|
+ face_score: float,
|
|
|
|
|
+ ) -> Tuple[float, float, float, float]:
|
|
|
|
|
+ reducts = _get_reducts(side_img.detection_json or {}, side)
|
|
|
|
|
+ return (
|
|
|
|
|
+ center_score + reducts["center"],
|
|
|
|
|
+ corner_score + reducts["corner"],
|
|
|
|
|
+ edge_score + reducts["edge"],
|
|
|
|
|
+ face_score + reducts["face"],
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
try:
|
|
try:
|
|
|
- # ---------- detection_score ----------
|
|
|
|
|
- detection_score = 10.0
|
|
|
|
|
- detection_center_score = 10.0
|
|
|
|
|
- detection_corner_score = 10.0
|
|
|
|
|
- detection_edge_score = 10.0
|
|
|
|
|
- detection_face_score = 10.0
|
|
|
|
|
-
|
|
|
|
|
- for ring_img, side in ((front_ring, "front"), (back_ring, "back")):
|
|
|
|
|
|
|
+ front_src = front_side_img.detection_json or {}
|
|
|
|
|
+ back_src = back_side_img.detection_json or {}
|
|
|
|
|
+ front_side_score = _resolve_side_detection_score(front_src, "front")
|
|
|
|
|
+ back_side_score = _resolve_side_detection_score(back_src, "back")
|
|
|
|
|
+ scores["detection_score"] = _clamp_score((front_side_score + back_side_score) / 2)
|
|
|
|
|
+
|
|
|
|
|
+ detection_center_score = SCORE_BASE
|
|
|
|
|
+ detection_corner_score = SCORE_BASE
|
|
|
|
|
+ detection_edge_score = SCORE_BASE
|
|
|
|
|
+ detection_face_score = SCORE_BASE
|
|
|
|
|
+
|
|
|
|
|
+ for side_img, side in ((front_side_img, "front"), (back_side_img, "back")):
|
|
|
|
|
+ try:
|
|
|
|
|
+ detection_center_score, detection_corner_score, detection_edge_score, detection_face_score = _apply_side_detail(
|
|
|
|
|
+ side_img, side,
|
|
|
|
|
+ detection_center_score, detection_corner_score, detection_edge_score, detection_face_score,
|
|
|
|
|
+ )
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.warning(f"解析 detection_json 分项失败 (image_id={side_img.id}): {e}")
|
|
|
|
|
+
|
|
|
|
|
+ if front_coaxial:
|
|
|
try:
|
|
try:
|
|
|
- reducts = _get_reducts(ring_img.detection_json or {}, side)
|
|
|
|
|
- detection_score += reducts["total"]
|
|
|
|
|
- detection_center_score += reducts["center"]
|
|
|
|
|
- detection_corner_score += reducts["corner"]
|
|
|
|
|
- detection_edge_score += reducts["edge"]
|
|
|
|
|
|
|
+ reducts = _get_reducts(front_coaxial.detection_json or {}, "front")
|
|
|
detection_face_score += reducts["face"]
|
|
detection_face_score += reducts["face"]
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- logger.warning(f"解析 detection_json 分数失败 (image_id={ring_img.id}): {e}")
|
|
|
|
|
|
|
+ logger.warning(f"解析 detection_json 分数失败 (image_id={front_coaxial.id}): {e}")
|
|
|
|
|
|
|
|
- # 同面 face 辅助图(stripe/coaxial/fusion)叠加 face 与 total
|
|
|
|
|
- for side, face_imgs in (("front", front_face_imgs), ("back", back_face_imgs)):
|
|
|
|
|
- for img in face_imgs:
|
|
|
|
|
- try:
|
|
|
|
|
- reducts = _get_reducts(img.detection_json or {}, side)
|
|
|
|
|
- detection_score += reducts["total"]
|
|
|
|
|
- detection_face_score += reducts["face"]
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.warning(f"解析 detection_json 分数失败 (image_id={img.id}): {e}")
|
|
|
|
|
|
|
+ if back_coaxial:
|
|
|
|
|
+ try:
|
|
|
|
|
+ reducts = _get_reducts(back_coaxial.detection_json or {}, "back")
|
|
|
|
|
+ detection_face_score += reducts["face"]
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.warning(f"解析 detection_json 分数失败 (image_id={back_coaxial.id}): {e}")
|
|
|
|
|
|
|
|
- scores["detection_score"] = detection_score
|
|
|
|
|
scores["detection_score_detail"] = {
|
|
scores["detection_score_detail"] = {
|
|
|
- "detection_center_score": detection_center_score,
|
|
|
|
|
- "detection_corner_score": detection_corner_score,
|
|
|
|
|
- "detection_edge_score": detection_edge_score,
|
|
|
|
|
- "detection_face_score": detection_face_score,
|
|
|
|
|
|
|
+ "detection_center_score": _clamp_score(detection_center_score),
|
|
|
|
|
+ "detection_corner_score": _clamp_score(detection_corner_score),
|
|
|
|
|
+ "detection_edge_score": _clamp_score(detection_edge_score),
|
|
|
|
|
+ "detection_face_score": _clamp_score(detection_face_score),
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- # ---------- modified_score ----------
|
|
|
|
|
- modified_score = 10.0
|
|
|
|
|
- modified_center_score = 10.0
|
|
|
|
|
- modified_corner_score = 10.0
|
|
|
|
|
- modified_edge_score = 10.0
|
|
|
|
|
- modified_face_score = 10.0
|
|
|
|
|
-
|
|
|
|
|
- all_score_images = [front_ring, back_ring] + front_face_imgs + back_face_imgs
|
|
|
|
|
- is_edited = any(img.modified_json is not None for img in all_score_images if img is not None)
|
|
|
|
|
|
|
+ score_sources = [front_side_img, back_side_img, front_coaxial, back_coaxial]
|
|
|
|
|
+ is_edited = any(
|
|
|
|
|
+ img and img.modified_json is not None
|
|
|
|
|
+ for img in score_sources if img is not None
|
|
|
|
|
+ )
|
|
|
scores["is_edited"] = is_edited
|
|
scores["is_edited"] = is_edited
|
|
|
|
|
|
|
|
if is_edited:
|
|
if is_edited:
|
|
|
- for ring_img, side in ((front_ring, "front"), (back_ring, "back")):
|
|
|
|
|
- src = ring_img.modified_json if ring_img.modified_json is not None else ring_img.detection_json
|
|
|
|
|
|
|
+ front_m_src = (
|
|
|
|
|
+ front_side_img.modified_json
|
|
|
|
|
+ if front_side_img.modified_json is not None
|
|
|
|
|
+ else front_side_img.detection_json
|
|
|
|
|
+ ) or {}
|
|
|
|
|
+ back_m_src = (
|
|
|
|
|
+ back_side_img.modified_json
|
|
|
|
|
+ if back_side_img.modified_json is not None
|
|
|
|
|
+ else back_side_img.detection_json
|
|
|
|
|
+ ) or {}
|
|
|
|
|
+ modified_score = _clamp_score(
|
|
|
|
|
+ (
|
|
|
|
|
+ _resolve_side_detection_score(front_m_src, "front")
|
|
|
|
|
+ + _resolve_side_detection_score(back_m_src, "back")
|
|
|
|
|
+ ) / 2
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ modified_center_score = SCORE_BASE
|
|
|
|
|
+ modified_corner_score = SCORE_BASE
|
|
|
|
|
+ modified_edge_score = SCORE_BASE
|
|
|
|
|
+ modified_face_score = SCORE_BASE
|
|
|
|
|
+
|
|
|
|
|
+ for side_img, side in ((front_side_img, "front"), (back_side_img, "back")):
|
|
|
|
|
+ src = side_img.modified_json if side_img.modified_json is not None else side_img.detection_json
|
|
|
try:
|
|
try:
|
|
|
reducts = _get_reducts(src or {}, side)
|
|
reducts = _get_reducts(src or {}, side)
|
|
|
- modified_score += reducts["total"]
|
|
|
|
|
modified_center_score += reducts["center"]
|
|
modified_center_score += reducts["center"]
|
|
|
modified_corner_score += reducts["corner"]
|
|
modified_corner_score += reducts["corner"]
|
|
|
modified_edge_score += reducts["edge"]
|
|
modified_edge_score += reducts["edge"]
|
|
|
modified_face_score += reducts["face"]
|
|
modified_face_score += reducts["face"]
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- logger.warning(f"解析 modified_json 分数失败 (image_id={ring_img.id}): {e}")
|
|
|
|
|
-
|
|
|
|
|
- for side, face_imgs in (("front", front_face_imgs), ("back", back_face_imgs)):
|
|
|
|
|
- for img in face_imgs:
|
|
|
|
|
- src = img.modified_json if img.modified_json is not None else img.detection_json
|
|
|
|
|
- try:
|
|
|
|
|
- reducts = _get_reducts(src or {}, side)
|
|
|
|
|
- modified_score += reducts["total"]
|
|
|
|
|
- modified_face_score += reducts["face"]
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.warning(f"解析 modified_json 分数失败 (image_id={img.id}): {e}")
|
|
|
|
|
|
|
+ logger.warning(f"解析 modified_json 分项失败 (image_id={side_img.id}): {e}")
|
|
|
|
|
+
|
|
|
|
|
+ if front_coaxial:
|
|
|
|
|
+ src = front_coaxial.modified_json if front_coaxial.modified_json is not None else front_coaxial.detection_json
|
|
|
|
|
+ try:
|
|
|
|
|
+ modified_face_score += _get_reducts(src or {}, "front")["face"]
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.warning(f"解析 modified_json 分数失败 (image_id={front_coaxial.id}): {e}")
|
|
|
|
|
+
|
|
|
|
|
+ if back_coaxial:
|
|
|
|
|
+ src = back_coaxial.modified_json if back_coaxial.modified_json is not None else back_coaxial.detection_json
|
|
|
|
|
+ try:
|
|
|
|
|
+ modified_face_score += _get_reducts(src or {}, "back")["face"]
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.warning(f"解析 modified_json 分数失败 (image_id={back_coaxial.id}): {e}")
|
|
|
|
|
|
|
|
scores["modified_score"] = modified_score
|
|
scores["modified_score"] = modified_score
|
|
|
scores["modified_score_detail"] = {
|
|
scores["modified_score_detail"] = {
|
|
|
- "modified_center_score": modified_center_score,
|
|
|
|
|
- "modified_corner_score": modified_corner_score,
|
|
|
|
|
- "modified_edge_score": modified_edge_score,
|
|
|
|
|
- "modified_face_score": modified_face_score,
|
|
|
|
|
|
|
+ "modified_center_score": _clamp_score(modified_center_score),
|
|
|
|
|
+ "modified_corner_score": _clamp_score(modified_corner_score),
|
|
|
|
|
+ "modified_edge_score": _clamp_score(modified_edge_score),
|
|
|
|
|
+ "modified_face_score": _clamp_score(modified_face_score),
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ logger.info(
|
|
|
|
|
+ "算分结果: front_side=%.4f back_side=%.4f detection_score=%s",
|
|
|
|
|
+ _resolve_side_detection_score(front_src, "front"),
|
|
|
|
|
+ _resolve_side_detection_score(back_src, "back"),
|
|
|
|
|
+ scores["detection_score"],
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.error(f"计算分数过程异常: {e}")
|
|
logger.error(f"计算分数过程异常: {e}")
|
|
|
|
|
|
|
|
- return scores
|
|
|
|
|
|
|
+ return scores
|