|
|
@@ -1,7 +1,12 @@
|
|
|
-import cv2
|
|
|
+import math
|
|
|
import os
|
|
|
+import re
|
|
|
import uuid
|
|
|
-import math
|
|
|
+from dataclasses import dataclass
|
|
|
+from typing import Any, Optional
|
|
|
+
|
|
|
+import cv2
|
|
|
+
|
|
|
from app.core.config import settings
|
|
|
from app.core.logger import get_logger
|
|
|
from app.schemas.models import CardInfoInput, CardInfoOutput
|
|
|
@@ -9,18 +14,64 @@ from app.schemas.models import CardInfoInput, CardInfoOutput
|
|
|
logger = get_logger("VideoService")
|
|
|
|
|
|
|
|
|
+@dataclass
|
|
|
+class FrameCandidate:
|
|
|
+ """
|
|
|
+ 候选帧数据类:记录了从视频中抽取的某一帧的所有评分维度。
|
|
|
+ 使用 dataclass 让数据结构非常清晰。
|
|
|
+ """
|
|
|
+ frame: Any # 原始图像矩阵 (OpenCV BGR)
|
|
|
+ time_ms: int # 该帧在视频中的时间戳 (毫秒)
|
|
|
+ sharpness: float # 拉普拉斯清晰度绝对值 (越大越清晰)
|
|
|
+ time_weight: float # 时间权重 (距离目标时间戳越近,权重越高)
|
|
|
+ segmentation_used: bool = False # 是否成功启用了分割模型
|
|
|
+ has_card: bool = False # 画面中是否出现了卡
|
|
|
+ has_hand: bool = False # 画面中是否出现了手
|
|
|
+ card_area_ratio: float = 0.0 # 卡片占画面比例
|
|
|
+ hand_area_ratio: float = 0.0 # 手占画面比例
|
|
|
+ card_bbox: Optional[tuple[int, int, int, int]] = None # (x, y, w, h) 卡片/手的聚焦边界框
|
|
|
+ presence_score: float = 0.0 # 实体存在感得分 (卡和手面积越大,得分越高)
|
|
|
+ sharpness_score: float = 0.0 # 归一化后的清晰度得分 (0~1)
|
|
|
+ dwell_score: float = 0.0 # 停留得分 (连续出现的帧数越多得分越高,用于抗闪烁)
|
|
|
+ base_score: float = 0.0 # 基础分 (不包含OCR)
|
|
|
+ ocr_text: str = "" # OCR识别出的文本
|
|
|
+ ocr_score: float = 0.0 # OCR文本与预期卡片信息的匹配度得分 (0~1)
|
|
|
+ final_score: float = 0.0 # 最终总分
|
|
|
+
|
|
|
+ @property
|
|
|
+ def is_present(self) -> bool:
|
|
|
+ """只要有卡或者有手,就算该实体在画面中存在"""
|
|
|
+ return self.has_card or self.has_hand
|
|
|
+
|
|
|
+
|
|
|
class VideoService:
|
|
|
def __init__(self):
|
|
|
- # 高斯函数中的 sigma (标准差) 决定了时间权重的下降速度。
|
|
|
+ # 高斯分布的 sigma,用于计算时间权重。10.0 表示对时间差容忍度较高
|
|
|
self.weight_sigma = 10.0
|
|
|
+ self.search_before_ms = settings.VIDEO_SEARCH_BEFORE_MS
|
|
|
+ self.search_after_ms = settings.VIDEO_SEARCH_AFTER_MS
|
|
|
+ self.analysis_fps = max(settings.VIDEO_ANALYSIS_FPS, 0.5)
|
|
|
+
|
|
|
+ # 延迟加载组件,节省初始化时的内存占用
|
|
|
+ self._ocr_engine = None
|
|
|
+ self._ocr_disabled = False
|
|
|
+ self._ocr_runtime_warning_sent = False
|
|
|
+
|
|
|
+ self._seg_processor = None
|
|
|
+ self._seg_model = None
|
|
|
+ self._seg_torch = None
|
|
|
+ self._seg_pil_image = None
|
|
|
+ self._seg_disabled = False
|
|
|
+ self._seg_runtime_warning_sent = False
|
|
|
|
|
|
def time_str_to_ms(self, time_str: str) -> int:
|
|
|
+ """将格式为 'HH:MM:SS' 或 'MM:SS' 的字符串转换为毫秒"""
|
|
|
try:
|
|
|
- parts = list(map(int, time_str.split(':')))
|
|
|
+ parts = list(map(int, time_str.split(":")))
|
|
|
if len(parts) == 3:
|
|
|
h, m, s = parts
|
|
|
return (h * 3600 + m * 60 + s) * 1000
|
|
|
- elif len(parts) == 2:
|
|
|
+ if len(parts) == 2:
|
|
|
m, s = parts
|
|
|
return (m * 60 + s) * 1000
|
|
|
return 0
|
|
|
@@ -29,110 +80,676 @@ class VideoService:
|
|
|
|
|
|
def get_laplacian_sharpness(self, frame) -> float:
|
|
|
"""
|
|
|
- 计算图像的拉普拉斯方差。
|
|
|
- 方差越大,代表图像包含的高频边缘信息越多,也就意味着对焦越准、越清晰。
|
|
|
+ 计算图像的拉普拉斯方差,这是业界最常用的无参考图像清晰度评估方法。
|
|
|
+ 方差越大,说明边缘信息越丰富(越不模糊)。
|
|
|
"""
|
|
|
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
|
|
- return cv2.Laplacian(gray, cv2.CV_64F).var()
|
|
|
+ return float(cv2.Laplacian(gray, cv2.CV_64F).var())
|
|
|
|
|
|
def calculate_weight(self, current_time_ms: int, target_time_ms: int) -> float:
|
|
|
+ """利用高斯函数计算时间权重。距离 target_time_ms 越近,返回值越接近 1.0"""
|
|
|
+ diff_seconds = abs(current_time_ms - target_time_ms) / 1000.0
|
|
|
+ return math.exp(-((diff_seconds ** 2) / (2 * self.weight_sigma ** 2)))
|
|
|
+
|
|
|
+ def _analysis_stride(self, fps: float) -> int:
|
|
|
+ """计算视频读取时的跳帧步长,确保处理速度匹配 VIDEO_ANALYSIS_FPS"""
|
|
|
+ fps = fps if fps > 0 else 30.0
|
|
|
+ return max(1, int(round(fps / self.analysis_fps)))
|
|
|
+
|
|
|
+ def _ensure_ocr_engine(self):
|
|
|
+ """单例模式懒加载 OCR 引擎 (RapidOCR)"""
|
|
|
+ if self._ocr_disabled:
|
|
|
+ return None
|
|
|
+ if self._ocr_engine is not None:
|
|
|
+ return self._ocr_engine
|
|
|
+
|
|
|
+ try:
|
|
|
+ from rapidocr import RapidOCR
|
|
|
+ self._ocr_engine = RapidOCR()
|
|
|
+ except Exception as exc:
|
|
|
+ self._ocr_disabled = True
|
|
|
+ logger.warning(f"OCR disabled: init failed: {exc}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ return self._ocr_engine
|
|
|
+
|
|
|
+ def _ensure_segmentation_model(self):
|
|
|
+ """单例模式懒加载 HuggingFace Segformer 语义分割模型"""
|
|
|
+ if self._seg_disabled:
|
|
|
+ return None
|
|
|
+ if self._seg_processor is not None and self._seg_model is not None:
|
|
|
+ return self._seg_processor, self._seg_model
|
|
|
+
|
|
|
+ model_dir = settings.VIDEO_SEG_MODEL_DIR
|
|
|
+ if not model_dir or not os.path.exists(model_dir):
|
|
|
+ self._seg_disabled = True
|
|
|
+ logger.warning(f"Segmentation disabled: model dir not found: {model_dir}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ try:
|
|
|
+ import torch
|
|
|
+ from PIL import Image
|
|
|
+ from transformers import AutoImageProcessor, AutoModelForSemanticSegmentation
|
|
|
+
|
|
|
+ self._seg_processor = AutoImageProcessor.from_pretrained(model_dir)
|
|
|
+ self._seg_model = AutoModelForSemanticSegmentation.from_pretrained(model_dir)
|
|
|
+ self._seg_model.eval() # 开启评估模式
|
|
|
+
|
|
|
+ # 自动分配到 GPU (如果可用) 以加速推理
|
|
|
+ if torch.cuda.is_available():
|
|
|
+ self._seg_model = self._seg_model.to("cuda")
|
|
|
+
|
|
|
+ self._seg_torch = torch
|
|
|
+ self._seg_pil_image = Image
|
|
|
+ except Exception as exc:
|
|
|
+ self._seg_disabled = True
|
|
|
+ logger.warning(f"Segmentation disabled: model loading failed: {exc}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ return self._seg_processor, self._seg_model
|
|
|
+
|
|
|
+ def _largest_bbox(self, mask) -> Optional[tuple[int, int, int, int]]:
|
|
|
+ """从二进制掩码 (Mask) 中提取面积最大的连通区域的外接矩形"""
|
|
|
+ if mask is None or not mask.any():
|
|
|
+ return None
|
|
|
+
|
|
|
+ mask_uint8 = (mask.astype("uint8")) * 255
|
|
|
+ contours, _ = cv2.findContours(mask_uint8, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
|
|
+ if not contours:
|
|
|
+ return None
|
|
|
+
|
|
|
+ largest = max(contours, key=cv2.contourArea)
|
|
|
+ x, y, w, h = cv2.boundingRect(largest)
|
|
|
+
|
|
|
+ # 过滤掉噪点 (宽或高小于20像素的通常是识别错误)
|
|
|
+ if w < 20 or h < 20:
|
|
|
+ return None
|
|
|
+ return x, y, w, h
|
|
|
+
|
|
|
+ def _expand_bbox(
|
|
|
+ self,
|
|
|
+ bbox: Optional[tuple[int, int, int, int]],
|
|
|
+ width: int,
|
|
|
+ height: int,
|
|
|
+ margin_ratio: float = 0.08,
|
|
|
+ ) -> Optional[tuple[int, int, int, int]]:
|
|
|
+ """适度扩大 Bounding Box (增加 margin_ratio),防止目标边缘被裁掉,有利于后续 OCR"""
|
|
|
+ if bbox is None:
|
|
|
+ return None
|
|
|
+
|
|
|
+ x, y, w, h = bbox
|
|
|
+ margin_x = int(w * margin_ratio)
|
|
|
+ margin_y = int(h * margin_ratio)
|
|
|
+
|
|
|
+ x1 = max(0, x - margin_x)
|
|
|
+ y1 = max(0, y - margin_y)
|
|
|
+ x2 = min(width, x + w + margin_x)
|
|
|
+ y2 = min(height, y + h + margin_y)
|
|
|
+ return x1, y1, x2 - x1, y2 - y1
|
|
|
+
|
|
|
+ def _focus_region(self, frame, bbox: Optional[tuple[int, int, int, int]]):
|
|
|
+ """裁剪出关注区域。如果没有有效 BBox,则返回原图,作为容错机制。"""
|
|
|
+ height, width = frame.shape[:2]
|
|
|
+ expanded = self._expand_bbox(bbox, width, height)
|
|
|
+ if expanded is None:
|
|
|
+ return frame
|
|
|
+
|
|
|
+ x, y, w, h = expanded
|
|
|
+ if w < 24 or h < 24:
|
|
|
+ return frame
|
|
|
+ return frame[y: y + h, x: x + w]
|
|
|
+
|
|
|
+ def _compute_presence_score(
|
|
|
+ self,
|
|
|
+ segmentation_used: bool,
|
|
|
+ has_card: bool,
|
|
|
+ has_hand: bool,
|
|
|
+ card_area_ratio: float,
|
|
|
+ hand_area_ratio: float,
|
|
|
+ ) -> float:
|
|
|
+ """根据卡片和手的面积占比计算"存在感得分" (0.0 ~ 1.0)"""
|
|
|
+ if not segmentation_used:
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ # 对占比进行归一化,最大不超过 1.0
|
|
|
+ card_ratio = min(card_area_ratio / max(settings.VIDEO_MIN_CARD_AREA_RATIO, 1e-6), 1.0)
|
|
|
+ hand_ratio = min(hand_area_ratio / max(settings.VIDEO_MIN_HAND_AREA_RATIO, 1e-6), 1.0)
|
|
|
+
|
|
|
+ score = 0.0
|
|
|
+ if has_card:
|
|
|
+ score += 0.70 * max(card_ratio, 0.35) # 卡片权重占 70%
|
|
|
+ if has_hand:
|
|
|
+ score += 0.30 * max(hand_ratio, 0.25) # 手的权重占 30%
|
|
|
+ if has_card and has_hand:
|
|
|
+ score += 0.10 # 卡和手同框,给予额外 10% 奖励分
|
|
|
+
|
|
|
+ return min(score, 1.0)
|
|
|
+
|
|
|
+ def _analyze_segmentation(self, frame) -> dict[str, Any]:
|
|
|
+ """对单帧图像进行语义分割分析,寻找卡片和手的区域"""
|
|
|
+ if self._ensure_segmentation_model() is None:
|
|
|
+ return {
|
|
|
+ "segmentation_used": False,
|
|
|
+ "has_card": False,
|
|
|
+ "has_hand": False,
|
|
|
+ "card_area_ratio": 0.0,
|
|
|
+ "hand_area_ratio": 0.0,
|
|
|
+ "card_bbox": None,
|
|
|
+ }
|
|
|
+
|
|
|
+ try:
|
|
|
+ # OpenCV (BGR) 转换为 PIL 所需的 RGB
|
|
|
+ rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
|
|
+ image = self._seg_pil_image.fromarray(rgb_frame)
|
|
|
+
|
|
|
+ # 推理所需的数据需放到和模型同一设备上 (CPU or CUDA)
|
|
|
+ device = next(self._seg_model.parameters()).device
|
|
|
+ inputs = self._seg_processor(images=image, return_tensors="pt").to(device)
|
|
|
+
|
|
|
+ with self._seg_torch.no_grad():
|
|
|
+ outputs = self._seg_model(**inputs)
|
|
|
+
|
|
|
+ logits = outputs.logits
|
|
|
+
|
|
|
+ # 上采样回原始分辨率
|
|
|
+ pred = self._seg_torch.nn.functional.interpolate(
|
|
|
+ logits,
|
|
|
+ size=image.size[::-1],
|
|
|
+ mode="bilinear",
|
|
|
+ align_corners=False,
|
|
|
+ ).argmax(dim=1)[0].cpu().numpy()
|
|
|
+
|
|
|
+ card_mask = pred == settings.VIDEO_CARD_LABEL_ID
|
|
|
+ hand_mask = pred == settings.VIDEO_HAND_LABEL_ID
|
|
|
+
|
|
|
+ card_area_ratio = float(card_mask.mean()) if card_mask.size else 0.0
|
|
|
+ hand_area_ratio = float(hand_mask.mean()) if hand_mask.size else 0.0
|
|
|
+
|
|
|
+ # [核心改进]: 提取两个 bbox
|
|
|
+ card_bbox = self._largest_bbox(card_mask)
|
|
|
+ hand_bbox = self._largest_bbox(hand_mask)
|
|
|
+
|
|
|
+ has_card = card_area_ratio >= settings.VIDEO_MIN_CARD_AREA_RATIO
|
|
|
+ has_hand = hand_area_ratio >= settings.VIDEO_MIN_HAND_AREA_RATIO
|
|
|
+
|
|
|
+ # [核心改进]: 如果卡片太小/没切出来,但有手,把 focus box 降级到手的区域
|
|
|
+ # 因为手大概率握着卡片,对“手”周边做 OCR 也能有效提取卡面信息
|
|
|
+ focus_bbox = card_bbox if card_bbox is not None else hand_bbox
|
|
|
+
|
|
|
+ # [核心改进]: 主动清理张量内存,防止长视频导致显存/内存溢出
|
|
|
+ del inputs, outputs, logits, pred
|
|
|
+ if self._seg_torch.cuda.is_available():
|
|
|
+ self._seg_torch.cuda.empty_cache()
|
|
|
+
|
|
|
+ return {
|
|
|
+ "segmentation_used": True,
|
|
|
+ "has_card": has_card,
|
|
|
+ "has_hand": has_hand,
|
|
|
+ "card_area_ratio": card_area_ratio,
|
|
|
+ "hand_area_ratio": hand_area_ratio,
|
|
|
+ "card_bbox": focus_bbox, # 返回 fallback 后的 bbox
|
|
|
+ }
|
|
|
+ except Exception as exc:
|
|
|
+ if not self._seg_runtime_warning_sent:
|
|
|
+ logger.warning(f"Segmentation runtime failure, fallback enabled: {exc}")
|
|
|
+ self._seg_runtime_warning_sent = True
|
|
|
+
|
|
|
+ return {
|
|
|
+ "segmentation_used": False,
|
|
|
+ "has_card": False,
|
|
|
+ "has_hand": False,
|
|
|
+ "card_area_ratio": 0.0,
|
|
|
+ "hand_area_ratio": 0.0,
|
|
|
+ "card_bbox": None,
|
|
|
+ }
|
|
|
+
|
|
|
+ def _build_candidate(self, frame, current_time_ms: int, target_time_ms: int) -> FrameCandidate:
|
|
|
+ """整合单帧的所有基础分析数据(分割、清晰度、存在感等),构建候选对象"""
|
|
|
+ seg_result = self._analyze_segmentation(frame)
|
|
|
+
|
|
|
+ # 对裁剪后的有效区域求清晰度,比算全图清晰度更准确,抗背景干扰
|
|
|
+ focus_region = self._focus_region(frame, seg_result["card_bbox"])
|
|
|
+ sharpness = self.get_laplacian_sharpness(focus_region)
|
|
|
+
|
|
|
+ presence_score = self._compute_presence_score(
|
|
|
+ segmentation_used=seg_result["segmentation_used"],
|
|
|
+ has_card=seg_result["has_card"],
|
|
|
+ has_hand=seg_result["has_hand"],
|
|
|
+ card_area_ratio=seg_result["card_area_ratio"],
|
|
|
+ hand_area_ratio=seg_result["hand_area_ratio"],
|
|
|
+ )
|
|
|
+
|
|
|
+ return FrameCandidate(
|
|
|
+ frame=frame.copy(),
|
|
|
+ time_ms=int(current_time_ms),
|
|
|
+ sharpness=sharpness,
|
|
|
+ time_weight=self.calculate_weight(current_time_ms, target_time_ms),
|
|
|
+ segmentation_used=seg_result["segmentation_used"],
|
|
|
+ has_card=seg_result["has_card"],
|
|
|
+ has_hand=seg_result["has_hand"],
|
|
|
+ card_area_ratio=seg_result["card_area_ratio"],
|
|
|
+ hand_area_ratio=seg_result["hand_area_ratio"],
|
|
|
+ card_bbox=seg_result["card_bbox"],
|
|
|
+ presence_score=presence_score,
|
|
|
+ )
|
|
|
+
|
|
|
+ def _collect_candidates(
|
|
|
+ self,
|
|
|
+ cap: cv2.VideoCapture,
|
|
|
+ start_time_ms: int,
|
|
|
+ end_time_ms: int,
|
|
|
+ target_time_ms: int,
|
|
|
+ fps: float,
|
|
|
+ ) -> list[FrameCandidate]:
|
|
|
+ """在指定时间窗口内滑动,按步长收集视频帧作为候选"""
|
|
|
+ candidates: list[FrameCandidate] = []
|
|
|
+ analysis_stride = self._analysis_stride(fps)
|
|
|
+
|
|
|
+ # 预估最大读取次数,防止视频结尾卡死死循环
|
|
|
+ max_reads = int((end_time_ms - start_time_ms) / 1000.0 * fps) + analysis_stride + 10
|
|
|
+
|
|
|
+ # 跳转到起始时间 (注意: OpenCV 的 POS_MSEC 某些视频源上可能不精准)
|
|
|
+ cap.set(cv2.CAP_PROP_POS_MSEC, start_time_ms)
|
|
|
+
|
|
|
+ read_count = 0
|
|
|
+ while read_count < max_reads:
|
|
|
+ ret, frame = cap.read()
|
|
|
+ if not ret:
|
|
|
+ break
|
|
|
+
|
|
|
+ current_time_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
|
|
|
+ if current_time_ms > end_time_ms:
|
|
|
+ break
|
|
|
+
|
|
|
+ # 按计算好的步长 (analysis_stride) 进行抽帧分析
|
|
|
+ if read_count % analysis_stride == 0:
|
|
|
+ candidates.append(self._build_candidate(frame, int(current_time_ms), target_time_ms))
|
|
|
+
|
|
|
+ read_count += 1
|
|
|
+
|
|
|
+ return candidates
|
|
|
+
|
|
|
+ def _assign_dwell_scores(self, candidates: list[FrameCandidate]) -> None:
|
|
|
"""
|
|
|
- 计算时间权重:使用高斯衰减函数。距离目标时间越近,权重越高。
|
|
|
+ 计算"停留得分" (Dwell Score):
|
|
|
+ 如果一个人抽到卡通常会停留展示一段时间。连续在帧中被检测到的实体,其停留得分会更高。
|
|
|
+ 这能有效过滤掉发牌时一晃而过的模糊残影。
|
|
|
"""
|
|
|
- diff_seconds = abs(current_time_ms - target_time_ms) / 1000.0
|
|
|
- return math.exp(- (diff_seconds ** 2) / (2 * self.weight_sigma ** 2))
|
|
|
+ if not candidates or not any(candidate.segmentation_used for candidate in candidates):
|
|
|
+ return
|
|
|
+
|
|
|
+ target_frames = max(1, int(round(settings.VIDEO_DWELL_TARGET_SECONDS * self.analysis_fps)))
|
|
|
+ index = 0
|
|
|
+
|
|
|
+ while index < len(candidates):
|
|
|
+ if not candidates[index].is_present:
|
|
|
+ index += 1
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 寻找连续出现(is_present = True)的片段
|
|
|
+ run_end = index
|
|
|
+ while run_end + 1 < len(candidates) and candidates[run_end + 1].is_present:
|
|
|
+ run_end += 1
|
|
|
+
|
|
|
+ run_length = run_end - index + 1
|
|
|
+ dwell_score = min(run_length / target_frames, 1.0)
|
|
|
+
|
|
|
+ # 为这段连续的帧赋相同的停留分
|
|
|
+ for pos in range(index, run_end + 1):
|
|
|
+ candidates[pos].dwell_score = dwell_score
|
|
|
+
|
|
|
+ index = run_end + 1
|
|
|
+
|
|
|
+ def _normalize_sharpness(self, sharpness: float, max_sharpness: float) -> float:
|
|
|
+ """对清晰度进行对数归一化处理。使用 log 防止极值(超锐化噪点)拉爆分数池"""
|
|
|
+ if sharpness <= 0 or max_sharpness <= 0:
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ denominator = math.log1p(max_sharpness)
|
|
|
+ if denominator <= 0:
|
|
|
+ return 0.0
|
|
|
+ return min(math.log1p(sharpness) / denominator, 1.0)
|
|
|
+
|
|
|
+ def _normalize_text(self, text: str) -> str:
|
|
|
+ """清洗文本:去点,全大写,仅保留英文、数字、中文字符"""
|
|
|
+ if not text:
|
|
|
+ return ""
|
|
|
+ cleaned = text.replace(".", "")
|
|
|
+ cleaned = re.sub(r"[^0-9A-Za-z\u4e00-\u9fff]+", " ", cleaned.upper())
|
|
|
+ return re.sub(r"\s+", " ", cleaned).strip()
|
|
|
+
|
|
|
+ def _tokenize_text(self, text: str) -> list[str]:
|
|
|
+ """将文本拆分为分词列表,去重去单字母(除非是数字)"""
|
|
|
+ normalized = self._normalize_text(text)
|
|
|
+ if not normalized:
|
|
|
+ return []
|
|
|
+
|
|
|
+ tokens: list[str] = []
|
|
|
+ seen: set[str] = set()
|
|
|
+ for token in normalized.split():
|
|
|
+ if len(token) == 1 and not token.isdigit():
|
|
|
+ continue
|
|
|
+ if token in seen:
|
|
|
+ continue
|
|
|
+ seen.add(token)
|
|
|
+ tokens.append(token)
|
|
|
+ return tokens
|
|
|
+
|
|
|
+ def _build_expected_text(self, card: CardInfoOutput) -> dict[str, Any]:
|
|
|
+ """从输入的 json 信息中提取期望的卡片名字、系列号、编号等,作为 OCR 的对比基准"""
|
|
|
+ name_tokens = self._tokenize_text(card.card_name_en or "")
|
|
|
+ if not name_tokens and card.card_name_cn:
|
|
|
+ name_tokens = self._tokenize_text(card.card_name_cn)
|
|
|
+
|
|
|
+ all_series_tokens = self._tokenize_text(card.series or "")
|
|
|
+ number_tokens = [token for token in all_series_tokens if token.isdigit()][:3]
|
|
|
+ series_tokens = [token for token in all_series_tokens if not token.isdigit()]
|
|
|
+ series_tokens.sort(key=len, reverse=True)
|
|
|
+
|
|
|
+ return {
|
|
|
+ "name_tokens": name_tokens[:4],
|
|
|
+ "series_tokens": series_tokens[:6],
|
|
|
+ "number_tokens": number_tokens,
|
|
|
+ "has_expectation": bool(name_tokens or series_tokens or number_tokens),
|
|
|
+ }
|
|
|
+
|
|
|
+ def _extract_ocr_text(self, ocr_result: Any) -> str:
|
|
|
+ """递归解析 RapidOCR 返回的复杂嵌套结构,将所有识别出的文本段落拼装成一个大字符串"""
|
|
|
+ texts: list[str] = []
|
|
|
+
|
|
|
+ def visit(node: Any) -> None:
|
|
|
+ if node is None:
|
|
|
+ return
|
|
|
+ if isinstance(node, str):
|
|
|
+ stripped = node.strip()
|
|
|
+ if stripped:
|
|
|
+ texts.append(stripped)
|
|
|
+ return
|
|
|
+ if hasattr(node, "txts"):
|
|
|
+ visit(getattr(node, "txts"))
|
|
|
+ return
|
|
|
+ if hasattr(node, "ocr_res"):
|
|
|
+ visit(getattr(node, "ocr_res"))
|
|
|
+ return
|
|
|
+ if isinstance(node, dict):
|
|
|
+ for value in node.values():
|
|
|
+ visit(value)
|
|
|
+ return
|
|
|
+ if isinstance(node, (list, tuple)):
|
|
|
+ if len(node) >= 2 and isinstance(node[1], str):
|
|
|
+ visit(node[1])
|
|
|
+ return
|
|
|
+ for item in node:
|
|
|
+ visit(item)
|
|
|
+
|
|
|
+ visit(ocr_result)
|
|
|
+
|
|
|
+ deduped: list[str] = []
|
|
|
+ seen: set[str] = set()
|
|
|
+ for text in texts:
|
|
|
+ if text in seen:
|
|
|
+ continue
|
|
|
+ seen.add(text)
|
|
|
+ deduped.append(text)
|
|
|
+ return " ".join(deduped)
|
|
|
+
|
|
|
+ def _token_overlap_score(self, expected_tokens: list[str], ocr_tokens: list[str]) -> float:
|
|
|
+ """计算期望 Token 和 OCR 识别 Token 之间的重叠得分,包含对子串(部分匹配)的兼容分"""
|
|
|
+ if not expected_tokens or not ocr_tokens:
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ score = 0.0
|
|
|
+ ocr_set = set(ocr_tokens)
|
|
|
+ for token in expected_tokens:
|
|
|
+ if token in ocr_set:
|
|
|
+ score += 1.0 # 完全命中给 1 分
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 兼容:如果目标 token 是 OCR结果的子串,或者 OCR结果是 token的子串 (例如 "PIKACHU" 匹配出 "PIKACH")
|
|
|
+ partial_match = any(
|
|
|
+ len(other) >= 2 and (token in other or other in token)
|
|
|
+ for other in ocr_set
|
|
|
+ )
|
|
|
+ if partial_match:
|
|
|
+ score += 0.6 # 部分匹配给 0.6 分
|
|
|
+
|
|
|
+ return min(score / len(expected_tokens), 1.0)
|
|
|
+
|
|
|
+ def _score_ocr_match(self, ocr_text: str, expected: dict[str, Any]) -> float:
|
|
|
+ """综合评判 OCR 识别文本与目标 JSON 信息的多维度匹配程度"""
|
|
|
+ if not ocr_text or not expected["has_expectation"]:
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ normalized_text = self._normalize_text(ocr_text)
|
|
|
+ ocr_tokens = self._tokenize_text(ocr_text)
|
|
|
+ if not ocr_tokens:
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ name_tokens = expected["name_tokens"]
|
|
|
+ series_tokens = expected["series_tokens"]
|
|
|
+ number_tokens = expected["number_tokens"]
|
|
|
+
|
|
|
+ name_score = self._token_overlap_score(name_tokens, ocr_tokens)
|
|
|
+ if name_tokens:
|
|
|
+ joined_name = " ".join(name_tokens)
|
|
|
+ if joined_name and joined_name in normalized_text:
|
|
|
+ name_score = 1.0 # 名字完全作为整体匹配上,直接满分
|
|
|
+
|
|
|
+ series_score = self._token_overlap_score(series_tokens, ocr_tokens)
|
|
|
+ number_score = self._token_overlap_score(number_tokens, ocr_tokens)
|
|
|
+
|
|
|
+ # 加权混合:卡片名字(60%) > 系列名(25%) > 卡片编号(15%)
|
|
|
+ if name_tokens:
|
|
|
+ return min(0.60 * name_score + 0.25 * series_score + 0.15 * number_score, 1.0)
|
|
|
+ return min(0.65 * series_score + 0.35 * number_score, 1.0)
|
|
|
+
|
|
|
+ def _run_ocr(self, frame, bbox: Optional[tuple[int, int, int, int]]) -> str:
|
|
|
+ """调用 OCR 引擎对关注区域(卡片或手)进行文本识别"""
|
|
|
+ engine = self._ensure_ocr_engine()
|
|
|
+ if engine is None:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ focus_region = self._focus_region(frame, bbox)
|
|
|
+ try:
|
|
|
+ result = engine(focus_region)
|
|
|
+ return self._extract_ocr_text(result)
|
|
|
+ except Exception as exc:
|
|
|
+ if not self._ocr_runtime_warning_sent:
|
|
|
+ logger.warning(f"OCR runtime failure, fallback enabled: {exc}")
|
|
|
+ self._ocr_runtime_warning_sent = True
|
|
|
+ return ""
|
|
|
+
|
|
|
+ def _score_candidates(
|
|
|
+ self,
|
|
|
+ candidates: list[FrameCandidate],
|
|
|
+ card_output: CardInfoOutput,
|
|
|
+ ) -> None:
|
|
|
+ """
|
|
|
+ 核心打分中枢:结合之前计算的各个单项分,得出最终排名分。
|
|
|
+ 采用二次打分机制:先通过 Base Score 选出 Top K,再让 Top K 过一遍耗时的 OCR,得出 Final Score。
|
|
|
+ """
|
|
|
+ if not candidates:
|
|
|
+ return
|
|
|
+
|
|
|
+ self._assign_dwell_scores(candidates)
|
|
|
+
|
|
|
+ # 只对画面里确认有卡/手的帧进行打分
|
|
|
+ scoring_candidates = [candidate for candidate in candidates if candidate.is_present]
|
|
|
+ if not scoring_candidates:
|
|
|
+ scoring_candidates = candidates
|
|
|
+
|
|
|
+ # 找准当前窗口期的相对最大清晰度作为归一化基准
|
|
|
+ max_sharpness = max(candidate.sharpness for candidate in scoring_candidates) if scoring_candidates else 0.0
|
|
|
+ segmentation_used = any(candidate.segmentation_used for candidate in candidates)
|
|
|
+
|
|
|
+ expected = self._build_expected_text(card_output)
|
|
|
+ ocr_enabled = self._ensure_ocr_engine() is not None and expected["has_expectation"]
|
|
|
+
|
|
|
+ # 1. 粗排:计算 Base Score
|
|
|
+ for candidate in scoring_candidates:
|
|
|
+ candidate.sharpness_score = self._normalize_sharpness(candidate.sharpness, max_sharpness)
|
|
|
+ if segmentation_used:
|
|
|
+ # 若启用了图像分割:存在感(40%) + 清晰度(25%) + 离目标时间点的近度(20%) + 画面稳定性(15%)
|
|
|
+ candidate.base_score = (
|
|
|
+ 0.40 * candidate.presence_score
|
|
|
+ + 0.25 * candidate.sharpness_score
|
|
|
+ + 0.20 * candidate.time_weight
|
|
|
+ + 0.15 * candidate.dwell_score
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # fallback: 没有分割模型,只能靠清晰度和时间权重
|
|
|
+ candidate.base_score = (
|
|
|
+ 0.55 * candidate.sharpness_score
|
|
|
+ + 0.35 * candidate.time_weight
|
|
|
+ + 0.10 * candidate.dwell_score
|
|
|
+ )
|
|
|
+
|
|
|
+ # 2. 精排:使用 OCR 计算 Final Score
|
|
|
+ if ocr_enabled:
|
|
|
+ # 只有 Base Score 排名前 K 的优胜者才会执行 OCR(性能优化)
|
|
|
+ top_candidates = sorted(
|
|
|
+ scoring_candidates,
|
|
|
+ key=lambda item: item.base_score,
|
|
|
+ reverse=True,
|
|
|
+ )[: max(1, settings.VIDEO_OCR_TOP_K)]
|
|
|
+
|
|
|
+ for candidate in top_candidates:
|
|
|
+ candidate.ocr_text = self._run_ocr(candidate.frame, candidate.card_bbox)
|
|
|
+ candidate.ocr_score = self._score_ocr_match(candidate.ocr_text, expected)
|
|
|
+
|
|
|
+ # 更新所有入围帧的 Final Score
|
|
|
+ for candidate in scoring_candidates:
|
|
|
+ if segmentation_used:
|
|
|
+ # OCR占核心大头(40%),配合其他物理指标
|
|
|
+ candidate.final_score = (
|
|
|
+ 0.40 * candidate.ocr_score
|
|
|
+ + 0.25 * candidate.presence_score
|
|
|
+ + 0.20 * candidate.sharpness_score
|
|
|
+ + 0.10 * candidate.time_weight
|
|
|
+ + 0.05 * candidate.dwell_score
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ candidate.final_score = (
|
|
|
+ 0.45 * candidate.ocr_score
|
|
|
+ + 0.30 * candidate.sharpness_score
|
|
|
+ + 0.20 * candidate.time_weight
|
|
|
+ + 0.05 * candidate.dwell_score
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # 如果 OCR 不可用或没配置预期,则直接用 Base Score 作为终局分数
|
|
|
+ for candidate in scoring_candidates:
|
|
|
+ candidate.final_score = candidate.base_score
|
|
|
+
|
|
|
+ def _select_best_candidate(
|
|
|
+ self,
|
|
|
+ candidates: list[FrameCandidate],
|
|
|
+ target_time_ms: int,
|
|
|
+ ) -> Optional[FrameCandidate]:
|
|
|
+ """选出最终最能代表"高光时刻"的帧"""
|
|
|
+ if not candidates:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # 核心逻辑:主比对 final_score;如果最终分一样(比如都为0),看清晰度;再一样,看谁离打点时间最近。
|
|
|
+ return max(
|
|
|
+ candidates,
|
|
|
+ key=lambda item: (
|
|
|
+ item.final_score,
|
|
|
+ item.sharpness_score,
|
|
|
+ -abs(item.time_ms - target_time_ms),
|
|
|
+ ),
|
|
|
+ )
|
|
|
|
|
|
def capture_frames(self, video_path: str, cards: list[CardInfoInput]) -> list[CardInfoOutput]:
|
|
|
+ """业务主干:传入视频与目标卡片打点列表,输出高光图片及匹配信息"""
|
|
|
if not os.path.exists(video_path):
|
|
|
- logger.error(f"❌ 找不到视频文件: {video_path}")
|
|
|
+ logger.error(f"Video file not found: {video_path}")
|
|
|
raise FileNotFoundError(f"Video file not found: {video_path}")
|
|
|
|
|
|
- logger.info(f"🎬 打开视频文件: {video_path}")
|
|
|
- logger.info(f"📋 待处理卡片数量: {len(cards)}")
|
|
|
+ logger.info(f"Open video: {video_path}")
|
|
|
+ logger.info(f"Cards to process: {len(cards)}")
|
|
|
|
|
|
cap = cv2.VideoCapture(video_path)
|
|
|
- # 获取视频帧率,用于计算安全边界
|
|
|
fps = cap.get(cv2.CAP_PROP_FPS)
|
|
|
if fps <= 0:
|
|
|
fps = 30.0
|
|
|
|
|
|
- output_list = []
|
|
|
+ output_list: list[CardInfoOutput] = []
|
|
|
success_count = 0
|
|
|
+ filtered_count = 0
|
|
|
|
|
|
for idx, card_input in enumerate(cards):
|
|
|
card_output = CardInfoOutput(**card_input.dict())
|
|
|
target_time_ms = self.time_str_to_ms(card_output.time)
|
|
|
|
|
|
- # 设定搜索窗口区间: [目标时间 - 1秒, 目标时间 + 4秒]
|
|
|
- start_time_ms = max(0, target_time_ms - 1000)
|
|
|
- end_time_ms = target_time_ms + 4000
|
|
|
+ # 以打点时间戳为锚,建立一个[过去几s 到 未来 几s] 的搜索窗口
|
|
|
+ start_time_ms = max(0, target_time_ms - self.search_before_ms)
|
|
|
+ end_time_ms = target_time_ms + self.search_after_ms
|
|
|
|
|
|
logger.info(
|
|
|
- f"📸[{idx + 1}/{len(cards)}] 智能截取 {card_output.time} ({target_time_ms}ms) - {card_output.card_name_cn or '未知卡名'}")
|
|
|
- logger.info(f" => 搜索区间: [{start_time_ms}ms ~ {end_time_ms}ms]")
|
|
|
-
|
|
|
- # 定位到窗口开始时间
|
|
|
- cap.set(cv2.CAP_PROP_POS_MSEC, start_time_ms)
|
|
|
-
|
|
|
- best_frame = None
|
|
|
- best_score = -1.0
|
|
|
- best_time_ms = start_time_ms
|
|
|
- best_sharpness = 0.0
|
|
|
-
|
|
|
- # 保护机制:最多读取这么多次,防止由于视频末尾造成的无限死循环
|
|
|
- max_reads = int((end_time_ms - start_time_ms) / 1000.0 * fps) + 30
|
|
|
- read_count = 0
|
|
|
-
|
|
|
- while read_count < max_reads:
|
|
|
- current_pos_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
|
|
|
-
|
|
|
- # 超出窗口最大时间,停止当前卡片的搜索
|
|
|
- if current_pos_ms > end_time_ms:
|
|
|
- break
|
|
|
-
|
|
|
- ret, frame = cap.read()
|
|
|
- if not ret:
|
|
|
- break # 视频结束
|
|
|
-
|
|
|
- # 计算原图清晰度
|
|
|
- sharpness = self.get_laplacian_sharpness(frame)
|
|
|
- # 计算时间偏移带来的衰减权重
|
|
|
- weight = self.calculate_weight(current_pos_ms, target_time_ms)
|
|
|
-
|
|
|
- # 综合评分 = 清晰度 * 时间权重
|
|
|
- score = sharpness * weight
|
|
|
-
|
|
|
- # 更新最佳候选帧
|
|
|
- if score > best_score:
|
|
|
- best_score = score
|
|
|
- best_frame = frame
|
|
|
- best_time_ms = current_pos_ms
|
|
|
- best_sharpness = sharpness
|
|
|
-
|
|
|
- read_count += 1
|
|
|
-
|
|
|
- # 保存最清晰的一张
|
|
|
- if best_frame is not None:
|
|
|
- filename = f"{uuid.uuid4()}_{int(best_time_ms)}.jpg"
|
|
|
- save_path = os.path.join(settings.FRAMES_DIR, filename)
|
|
|
-
|
|
|
- try:
|
|
|
- cv2.imwrite(save_path, best_frame)
|
|
|
-
|
|
|
- image_url = f"{settings.BASE_URL}/static/frames/{filename}"
|
|
|
- card_output.frame_image_path = image_url
|
|
|
- success_count += 1
|
|
|
-
|
|
|
- time_diff = (best_time_ms - target_time_ms) / 1000.0
|
|
|
- logger.info(
|
|
|
- f" ✅ 保存成功: {filename} (偏移: {time_diff:+.2f}s, 清晰度: {best_sharpness:.1f}, 综合分: {best_score:.1f})")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f" ❌ 保存图片失败: {e}")
|
|
|
- else:
|
|
|
- logger.warning(f" ⚠️ 无法在窗口内读取到视频帧 (可能时间戳超出视频长度)")
|
|
|
+ f"[{idx + 1}/{len(cards)}] analyze {card_output.time} "
|
|
|
+ f"for {card_output.card_name_cn or card_output.card_name_en or 'unknown'}"
|
|
|
+ )
|
|
|
+ logger.info(f" search window: [{start_time_ms}ms ~ {end_time_ms}ms]")
|
|
|
+
|
|
|
+ # 1. 在窗口内收集所有候选帧
|
|
|
+ candidates = self._collect_candidates(
|
|
|
+ cap=cap,
|
|
|
+ start_time_ms=start_time_ms,
|
|
|
+ end_time_ms=end_time_ms,
|
|
|
+ target_time_ms=target_time_ms,
|
|
|
+ fps=fps,
|
|
|
+ )
|
|
|
+
|
|
|
+ if not candidates:
|
|
|
+ logger.warning(" no frames sampled in the target window")
|
|
|
+ continue
|
|
|
+
|
|
|
+ segmentation_used = any(candidate.segmentation_used for candidate in candidates)
|
|
|
+ present_candidates = [candidate for candidate in candidates if candidate.is_present]
|
|
|
+
|
|
|
+ # [需求点 1]: 如果使用了分割模型且这片窗口内完全找不到手/卡,直接判定无效数据
|
|
|
+ if segmentation_used and not present_candidates:
|
|
|
+ filtered_count += 1
|
|
|
+ logger.info(" filtered out: no card/hand found around the timestamp")
|
|
|
+ continue
|
|
|
+
|
|
|
+ scoring_candidates = present_candidates if present_candidates else candidates
|
|
|
+
|
|
|
+ # 2. 调用多维度评分枢纽给各个候选帧打分
|
|
|
+ self._score_candidates(candidates, card_output)
|
|
|
+
|
|
|
+ # 3. 选出最匹配、最清晰的一张
|
|
|
+ best_candidate = self._select_best_candidate(scoring_candidates, target_time_ms)
|
|
|
+
|
|
|
+ if best_candidate is None:
|
|
|
+ logger.warning(" no usable candidate after scoring")
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 4. 保存为 JPG,构造业务输出数据
|
|
|
+ filename = f"{uuid.uuid4()}_{best_candidate.time_ms}.jpg"
|
|
|
+ save_path = os.path.join(settings.FRAMES_DIR, filename)
|
|
|
+
|
|
|
+ try:
|
|
|
+ cv2.imwrite(save_path, best_candidate.frame)
|
|
|
+ image_url = f"{settings.BASE_URL}/static/frames/{filename}"
|
|
|
+ card_output.frame_image_path = image_url
|
|
|
+ output_list.append(card_output)
|
|
|
+ success_count += 1
|
|
|
|
|
|
- output_list.append(card_output)
|
|
|
+ time_diff = (best_candidate.time_ms - target_time_ms) / 1000.0
|
|
|
+ logger.info(
|
|
|
+ f" saved {filename} "
|
|
|
+ f"(offset={time_diff:+.2f}s, sharpness={best_candidate.sharpness:.1f}, "
|
|
|
+ f"presence={best_candidate.presence_score:.2f}, "
|
|
|
+ f"ocr={best_candidate.ocr_score:.2f}, score={best_candidate.final_score:.2f})"
|
|
|
+ )
|
|
|
+ except Exception as exc:
|
|
|
+ logger.error(f" failed to save frame: {exc}")
|
|
|
|
|
|
+ # 务必释放 OpenCV 句柄,避免被视频文件死锁
|
|
|
cap.release()
|
|
|
- logger.info(f"🏁 截取任务结束. 成功: {success_count}, 总数: {len(cards)}")
|
|
|
- return output_list
|
|
|
+ logger.info(
|
|
|
+ f"Frame capture finished. saved={success_count}, "
|
|
|
+ f"filtered={filtered_count}, total={len(cards)}"
|
|
|
+ )
|
|
|
+ return output_list
|