| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789 |
- import math
- import os
- import re
- import uuid
- from dataclasses import dataclass
- from typing import Any, Optional
- import cv2
- from app.core.config import settings
- from app.core.logger import get_logger
- from app.schemas.models import CardInfoInput, CardInfoOutput
- logger = get_logger("VideoService")
@dataclass
class FrameCandidate:
    """A single sampled video frame plus every score computed for it.

    Only ``frame``, ``time_ms``, ``sharpness`` and ``time_weight`` are
    required; all other fields default to "nothing detected / not scored yet"
    and are filled in by the analysis and scoring steps.
    """

    # --- raw sample ---------------------------------------------------------
    frame: Any  # image matrix of the frame (OpenCV BGR)
    time_ms: int  # timestamp of the frame inside the video, in milliseconds
    sharpness: float  # Laplacian-variance sharpness (larger means sharper)
    time_weight: float  # closeness to the target timestamp (nearer -> higher)

    # --- segmentation results -----------------------------------------------
    segmentation_used: bool = False  # whether the segmentation model ran for this frame
    has_card: bool = False  # a card was detected in the frame
    has_hand: bool = False  # a hand was detected in the frame
    card_area_ratio: float = 0.0  # fraction of the frame covered by card pixels
    hand_area_ratio: float = 0.0  # fraction of the frame covered by hand pixels
    card_bbox: Optional[tuple[int, int, int, int]] = None  # (x, y, w, h) focus box of the card/hand

    # --- scores ---------------------------------------------------------------
    presence_score: float = 0.0  # grows with the card/hand area
    sharpness_score: float = 0.0  # sharpness normalised to 0..1
    dwell_score: float = 0.0  # longer consecutive presence -> higher (anti-flicker)
    base_score: float = 0.0  # combined score without OCR
    ocr_text: str = ""  # text recognised by OCR
    ocr_score: float = 0.0  # how well the OCR text matches the expected card info (0..1)
    final_score: float = 0.0  # final ranking score

    @property
    def is_present(self) -> bool:
        """Report presence: a card or a hand in the frame is enough."""
        if self.has_card:
            return self.has_card
        return self.has_hand
- class VideoService:
def __init__(self):
    """Read the tunables from ``settings`` and reset all lazily created components."""
    # Sigma of the Gaussian used for the time weight; 10.0 is fairly tolerant
    # of frames that lie some distance away from the marked timestamp.
    self.weight_sigma = 10.0
    self.search_before_ms = settings.VIDEO_SEARCH_BEFORE_MS
    self.search_after_ms = settings.VIDEO_SEARCH_AFTER_MS
    # The analysis rate is clamped so it never drops below 0.5 frames/second.
    self.analysis_fps = max(settings.VIDEO_ANALYSIS_FPS, 0.5)

    # Heavy components start out unset to keep construction cheap.
    self._ocr_engine = None
    self._ocr_disabled = self._ocr_runtime_warning_sent = False

    self._seg_processor = self._seg_model = None
    self._seg_torch = self._seg_pil_image = None
    self._seg_disabled = self._seg_runtime_warning_sent = False
def time_str_to_ms(self, time_str: str) -> int:
    """Convert an ``HH:MM:SS`` or ``MM:SS`` timestamp into milliseconds.

    The seconds field may carry a fractional part (``"01:02.5"``). Anything
    that cannot be parsed -- wrong number of fields, non-numeric text, or a
    non-string value such as ``None`` -- yields 0 instead of raising, which
    keeps the best-effort contract of the original implementation.

    Args:
        time_str: Timestamp text, e.g. ``"01:02:03"`` or ``"02:03"``.

    Returns:
        The timestamp in whole milliseconds, or 0 when it is unparsable.
    """
    try:
        fields = time_str.strip().split(":")
        if len(fields) not in (2, 3):
            return 0
        *leading, seconds_field = fields
        # Hours/minutes must be integers; only the seconds may be fractional.
        leading_values = [int(part) for part in leading]
        minutes = leading_values[-1]
        hours = leading_values[0] if len(leading_values) == 2 else 0
        total_seconds = hours * 3600 + minutes * 60 + float(seconds_field)
        return int(round(total_seconds * 1000))
    except (AttributeError, TypeError, ValueError, OverflowError):
        # AttributeError/TypeError: input was not a string.
        # ValueError/OverflowError: non-numeric, NaN or infinite fields.
        return 0
def get_laplacian_sharpness(self, frame) -> float:
    """Return the variance of the Laplacian of ``frame`` as a sharpness measure.

    The frame is converted from BGR to grayscale first. A larger variance
    means more edge detail, i.e. a less blurry image.
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    laplacian = cv2.Laplacian(grayscale, cv2.CV_64F)
    return float(laplacian.var())
def calculate_weight(self, current_time_ms: int, target_time_ms: int) -> float:
    """Gaussian time weight: 1.0 at the target timestamp, decaying with distance.

    The distance is measured in seconds and ``self.weight_sigma`` is the
    standard deviation of the Gaussian.
    """
    offset_seconds = abs(current_time_ms - target_time_ms) / 1000.0
    exponent = -((offset_seconds ** 2) / (2 * self.weight_sigma ** 2))
    return math.exp(exponent)
- def _analysis_stride(self, fps: float) -> int:
- """计算视频读取时的跳帧步长,确保处理速度匹配 VIDEO_ANALYSIS_FPS"""
- fps = fps if fps > 0 else 30.0
- return max(1, int(round(fps / self.analysis_fps)))
- def _ensure_ocr_engine(self):
- """单例模式懒加载 OCR 引擎 (RapidOCR)"""
- if self._ocr_disabled:
- return None
- if self._ocr_engine is not None:
- return self._ocr_engine
- try:
- from rapidocr import RapidOCR
- self._ocr_engine = RapidOCR()
- except Exception as exc:
- self._ocr_disabled = True
- logger.warning(f"OCR disabled: init failed: {exc}")
- return None
- return self._ocr_engine
- def _ensure_segmentation_model(self):
- """单例模式懒加载 HuggingFace Segformer 语义分割模型"""
- if self._seg_disabled:
- return None
- if self._seg_processor is not None and self._seg_model is not None:
- return self._seg_processor, self._seg_model
- model_dir = settings.VIDEO_SEG_MODEL_DIR
- if not model_dir or not os.path.exists(model_dir):
- self._seg_disabled = True
- logger.warning(f"Segmentation disabled: model dir not found: {model_dir}")
- return None
- try:
- import torch
- from PIL import Image
- from transformers import AutoImageProcessor, AutoModelForSemanticSegmentation
- self._seg_processor = AutoImageProcessor.from_pretrained(model_dir)
- self._seg_model = AutoModelForSemanticSegmentation.from_pretrained(model_dir)
- self._seg_model.eval() # 开启评估模式
- # 自动分配到 GPU (如果可用) 以加速推理
- if torch.cuda.is_available():
- self._seg_model = self._seg_model.to("cuda")
- self._seg_torch = torch
- self._seg_pil_image = Image
- except Exception as exc:
- self._seg_disabled = True
- logger.warning(f"Segmentation disabled: model loading failed: {exc}")
- return None
- return self._seg_processor, self._seg_model
- def _largest_bbox(self, mask) -> Optional[tuple[int, int, int, int]]:
- """从二进制掩码 (Mask) 中提取面积最大的连通区域的外接矩形"""
- if mask is None or not mask.any():
- return None
- mask_uint8 = (mask.astype("uint8")) * 255
- contours, _ = cv2.findContours(mask_uint8, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
- if not contours:
- return None
- largest = max(contours, key=cv2.contourArea)
- x, y, w, h = cv2.boundingRect(largest)
- # 过滤掉噪点 (宽或高小于20像素的通常是识别错误)
- if w < 20 or h < 20:
- return None
- return x, y, w, h
- def _expand_bbox(
- self,
- bbox: Optional[tuple[int, int, int, int]],
- width: int,
- height: int,
- margin_ratio: float = 0.08,
- ) -> Optional[tuple[int, int, int, int]]:
- """适度扩大 Bounding Box (增加 margin_ratio),防止目标边缘被裁掉,有利于后续 OCR"""
- if bbox is None:
- return None
- x, y, w, h = bbox
- margin_x = int(w * margin_ratio)
- margin_y = int(h * margin_ratio)
- x1 = max(0, x - margin_x)
- y1 = max(0, y - margin_y)
- x2 = min(width, x + w + margin_x)
- y2 = min(height, y + h + margin_y)
- return x1, y1, x2 - x1, y2 - y1
- def _focus_region(self, frame, bbox: Optional[tuple[int, int, int, int]]):
- """裁剪出关注区域。如果没有有效 BBox,则返回原图,作为容错机制。"""
- height, width = frame.shape[:2]
- expanded = self._expand_bbox(bbox, width, height)
- if expanded is None:
- return frame
- x, y, w, h = expanded
- if w < 24 or h < 24:
- return frame
- return frame[y: y + h, x: x + w]
- def _compute_presence_score(
- self,
- segmentation_used: bool,
- has_card: bool,
- has_hand: bool,
- card_area_ratio: float,
- hand_area_ratio: float,
- ) -> float:
- """根据卡片和手的面积占比计算"存在感得分" (0.0 ~ 1.0)"""
- if not segmentation_used:
- return 0.0
- # 对占比进行归一化,最大不超过 1.0
- card_ratio = min(card_area_ratio / max(settings.VIDEO_MIN_CARD_AREA_RATIO, 1e-6), 1.0)
- hand_ratio = min(hand_area_ratio / max(settings.VIDEO_MIN_HAND_AREA_RATIO, 1e-6), 1.0)
- score = 0.0
- if has_card:
- score += 0.70 * max(card_ratio, 0.35) # 卡片权重占 70%
- if has_hand:
- score += 0.30 * max(hand_ratio, 0.25) # 手的权重占 30%
- if has_card and has_hand:
- score += 0.10 # 卡和手同框,给予额外 10% 奖励分
- return min(score, 1.0)
- def _analyze_segmentation(self, frame) -> dict[str, Any]:
- """对单帧图像进行语义分割分析,寻找卡片和手的区域"""
- if self._ensure_segmentation_model() is None:
- return {
- "segmentation_used": False,
- "has_card": False,
- "has_hand": False,
- "card_area_ratio": 0.0,
- "hand_area_ratio": 0.0,
- "card_bbox": None,
- }
- try:
- # OpenCV (BGR) 转换为 PIL 所需的 RGB
- rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
- image = self._seg_pil_image.fromarray(rgb_frame)
- # 推理所需的数据需放到和模型同一设备上 (CPU or CUDA)
- device = next(self._seg_model.parameters()).device
- inputs = self._seg_processor(images=image, return_tensors="pt").to(device)
- with self._seg_torch.no_grad():
- outputs = self._seg_model(**inputs)
- logits = outputs.logits
- # 上采样回原始分辨率
- pred = self._seg_torch.nn.functional.interpolate(
- logits,
- size=image.size[::-1],
- mode="bilinear",
- align_corners=False,
- ).argmax(dim=1)[0].cpu().numpy()
- card_mask = pred == settings.VIDEO_CARD_LABEL_ID
- hand_mask = pred == settings.VIDEO_HAND_LABEL_ID
- card_area_ratio = float(card_mask.mean()) if card_mask.size else 0.0
- hand_area_ratio = float(hand_mask.mean()) if hand_mask.size else 0.0
- # [核心改进]: 提取两个 bbox
- card_bbox = self._largest_bbox(card_mask)
- hand_bbox = self._largest_bbox(hand_mask)
- has_card = card_area_ratio >= settings.VIDEO_MIN_CARD_AREA_RATIO
- has_hand = hand_area_ratio >= settings.VIDEO_MIN_HAND_AREA_RATIO
- # [核心改进]: 如果卡片太小/没切出来,但有手,把 focus box 降级到手的区域
- # 因为手大概率握着卡片,对“手”周边做 OCR 也能有效提取卡面信息
- focus_bbox = card_bbox if card_bbox is not None else hand_bbox
- # [核心改进]: 主动清理张量内存,防止长视频导致显存/内存溢出
- del inputs, outputs, logits, pred
- if self._seg_torch.cuda.is_available():
- self._seg_torch.cuda.empty_cache()
- return {
- "segmentation_used": True,
- "has_card": has_card,
- "has_hand": has_hand,
- "card_area_ratio": card_area_ratio,
- "hand_area_ratio": hand_area_ratio,
- "card_bbox": focus_bbox, # 返回 fallback 后的 bbox
- }
- except Exception as exc:
- if not self._seg_runtime_warning_sent:
- logger.warning(f"Segmentation runtime failure, fallback enabled: {exc}")
- self._seg_runtime_warning_sent = True
- return {
- "segmentation_used": False,
- "has_card": False,
- "has_hand": False,
- "card_area_ratio": 0.0,
- "hand_area_ratio": 0.0,
- "card_bbox": None,
- }
def _build_candidate(self, frame, current_time_ms: int, target_time_ms: int) -> FrameCandidate:
    """Analyse one frame and bundle the results into a ``FrameCandidate``.

    Combines the segmentation result, the sharpness of the focus region, the
    presence score and the time weight. The candidate stores a copy of the
    frame.
    """
    seg = self._analyze_segmentation(frame)

    # Sharpness is measured on the cropped focus region rather than on the
    # whole frame, so a busy background has less influence on it.
    region = self._focus_region(frame, seg["card_bbox"])
    region_sharpness = self.get_laplacian_sharpness(region)

    presence_inputs = {
        key: seg[key]
        for key in (
            "segmentation_used",
            "has_card",
            "has_hand",
            "card_area_ratio",
            "hand_area_ratio",
        )
    }
    presence = self._compute_presence_score(**presence_inputs)

    return FrameCandidate(
        frame=frame.copy(),
        time_ms=int(current_time_ms),
        sharpness=region_sharpness,
        time_weight=self.calculate_weight(current_time_ms, target_time_ms),
        segmentation_used=seg["segmentation_used"],
        has_card=seg["has_card"],
        has_hand=seg["has_hand"],
        card_area_ratio=seg["card_area_ratio"],
        hand_area_ratio=seg["hand_area_ratio"],
        card_bbox=seg["card_bbox"],
        presence_score=presence,
    )
def _collect_candidates(
    self,
    cap: cv2.VideoCapture,
    start_time_ms: int,
    end_time_ms: int,
    target_time_ms: int,
    fps: float,
) -> list[FrameCandidate]:
    """Walk the time window and turn every ``stride``-th frame into a candidate.

    Frames are advanced with ``grab()`` and only the frames that are actually
    analysed are fetched with ``retrieve()``, so skipped frames do not pay for
    the retrieve step. (``read()`` is the combination of both calls.)

    Args:
        cap: An opened video capture; its position is changed by this call.
        start_time_ms: Start of the search window in milliseconds.
        end_time_ms: End of the search window in milliseconds (inclusive).
        target_time_ms: Marked timestamp used for the time weight.
        fps: Frame rate of the video; non-positive values fall back to 30.0.

    Returns:
        The candidates in playback order (possibly empty).
    """
    candidates: list[FrameCandidate] = []
    effective_fps = fps if fps > 0 else 30.0
    analysis_stride = self._analysis_stride(effective_fps)

    # Upper bound on grabbed frames so a misbehaving source cannot loop forever.
    max_reads = int((end_time_ms - start_time_ms) / 1000.0 * effective_fps) + analysis_stride + 10

    # Seek to the window start (POS_MSEC seeking may be imprecise on some sources).
    cap.set(cv2.CAP_PROP_POS_MSEC, start_time_ms)

    for read_count in range(max_reads):
        if not cap.grab():
            break
        current_time_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
        if current_time_ms > end_time_ms:
            break
        # Only every ``analysis_stride``-th frame is analysed.
        if read_count % analysis_stride != 0:
            continue
        ret, frame = cap.retrieve()
        if not ret:
            break
        candidates.append(self._build_candidate(frame, int(current_time_ms), target_time_ms))

    return candidates
def _assign_dwell_scores(self, candidates: list[FrameCandidate]) -> None:
    """Give every run of consecutive "present" candidates a shared dwell score.

    A card that was really pulled is usually held in view for a while, so
    longer runs of frames with a card/hand score higher; this suppresses
    blurry one-frame flashes. The score is ``run_length / target_frames``
    capped at 1.0. Nothing is changed when the list is empty or when no
    candidate used segmentation.
    """
    if not candidates:
        return
    if not any(item.segmentation_used for item in candidates):
        return

    target_frames = max(1, int(round(settings.VIDEO_DWELL_TARGET_SECONDS * self.analysis_fps)))
    total = len(candidates)
    run_start = None

    # Iterate one position past the end so a run touching the end is closed too.
    for pos in range(total + 1):
        present = pos < total and candidates[pos].is_present
        if present:
            if run_start is None:
                run_start = pos
            continue
        if run_start is None:
            continue
        # The run [run_start, pos) just ended: all members share one score.
        shared_score = min((pos - run_start) / target_frames, 1.0)
        for member in candidates[run_start:pos]:
            member.dwell_score = shared_score
        run_start = None
- def _normalize_sharpness(self, sharpness: float, max_sharpness: float) -> float:
- """对清晰度进行对数归一化处理。使用 log 防止极值(超锐化噪点)拉爆分数池"""
- if sharpness <= 0 or max_sharpness <= 0:
- return 0.0
- denominator = math.log1p(max_sharpness)
- if denominator <= 0:
- return 0.0
- return min(math.log1p(sharpness) / denominator, 1.0)
- def _normalize_text(self, text: str) -> str:
- """清洗文本:去点,全大写,仅保留英文、数字、中文字符"""
- if not text:
- return ""
- cleaned = text.replace(".", "")
- cleaned = re.sub(r"[^0-9A-Za-z\u4e00-\u9fff]+", " ", cleaned.upper())
- return re.sub(r"\s+", " ", cleaned).strip()
- def _tokenize_text(self, text: str) -> list[str]:
- """将文本拆分为分词列表,去重去单字母(除非是数字)"""
- normalized = self._normalize_text(text)
- if not normalized:
- return []
- tokens: list[str] = []
- seen: set[str] = set()
- for token in normalized.split():
- if len(token) == 1 and not token.isdigit():
- continue
- if token in seen:
- continue
- seen.add(token)
- tokens.append(token)
- return tokens
def _build_expected_text(self, card: CardInfoOutput) -> dict[str, Any]:
    """Derive the tokens OCR output is compared against from the card info.

    The English name is preferred and the Chinese name is used only when the
    English one yields no tokens. ``card.series`` is split into purely
    numeric tokens (the first three are kept) and the remaining tokens (the
    six longest are kept). ``has_expectation`` tells whether any token exists.
    """
    name_tokens = self._tokenize_text(card.card_name_en or "")
    if not name_tokens and card.card_name_cn:
        name_tokens = self._tokenize_text(card.card_name_cn)

    numeric: list[str] = []
    textual: list[str] = []
    for token in self._tokenize_text(card.series or ""):
        (numeric if token.isdigit() else textual).append(token)

    number_tokens = numeric[:3]
    # Longest first (stable), so the most distinctive series words survive the cut.
    series_tokens = sorted(textual, key=len, reverse=True)

    return {
        "name_tokens": name_tokens[:4],
        "series_tokens": series_tokens[:6],
        "number_tokens": number_tokens,
        "has_expectation": bool(name_tokens or series_tokens or number_tokens),
    }
- def _extract_ocr_text(self, ocr_result: Any) -> str:
- """递归解析 RapidOCR 返回的复杂嵌套结构,将所有识别出的文本段落拼装成一个大字符串"""
- texts: list[str] = []
- def visit(node: Any) -> None:
- if node is None:
- return
- if isinstance(node, str):
- stripped = node.strip()
- if stripped:
- texts.append(stripped)
- return
- if hasattr(node, "txts"):
- visit(getattr(node, "txts"))
- return
- if hasattr(node, "ocr_res"):
- visit(getattr(node, "ocr_res"))
- return
- if isinstance(node, dict):
- for value in node.values():
- visit(value)
- return
- if isinstance(node, (list, tuple)):
- if len(node) >= 2 and isinstance(node[1], str):
- visit(node[1])
- return
- for item in node:
- visit(item)
- visit(ocr_result)
- deduped: list[str] = []
- seen: set[str] = set()
- for text in texts:
- if text in seen:
- continue
- seen.add(text)
- deduped.append(text)
- return " ".join(deduped)
- def _token_overlap_score(self, expected_tokens: list[str], ocr_tokens: list[str]) -> float:
- """计算期望 Token 和 OCR 识别 Token 之间的重叠得分,包含对子串(部分匹配)的兼容分"""
- if not expected_tokens or not ocr_tokens:
- return 0.0
- score = 0.0
- ocr_set = set(ocr_tokens)
- for token in expected_tokens:
- if token in ocr_set:
- score += 1.0 # 完全命中给 1 分
- continue
- # 兼容:如果目标 token 是 OCR结果的子串,或者 OCR结果是 token的子串 (例如 "PIKACHU" 匹配出 "PIKACH")
- partial_match = any(
- len(other) >= 2 and (token in other or other in token)
- for other in ocr_set
- )
- if partial_match:
- score += 0.6 # 部分匹配给 0.6 分
- return min(score / len(expected_tokens), 1.0)
- def _score_ocr_match(self, ocr_text: str, expected: dict[str, Any]) -> float:
- """综合评判 OCR 识别文本与目标 JSON 信息的多维度匹配程度"""
- if not ocr_text or not expected["has_expectation"]:
- return 0.0
- normalized_text = self._normalize_text(ocr_text)
- ocr_tokens = self._tokenize_text(ocr_text)
- if not ocr_tokens:
- return 0.0
- name_tokens = expected["name_tokens"]
- series_tokens = expected["series_tokens"]
- number_tokens = expected["number_tokens"]
- name_score = self._token_overlap_score(name_tokens, ocr_tokens)
- if name_tokens:
- joined_name = " ".join(name_tokens)
- if joined_name and joined_name in normalized_text:
- name_score = 1.0 # 名字完全作为整体匹配上,直接满分
- series_score = self._token_overlap_score(series_tokens, ocr_tokens)
- number_score = self._token_overlap_score(number_tokens, ocr_tokens)
- # 加权混合:卡片名字(60%) > 系列名(25%) > 卡片编号(15%)
- if name_tokens:
- return min(0.60 * name_score + 0.25 * series_score + 0.15 * number_score, 1.0)
- return min(0.65 * series_score + 0.35 * number_score, 1.0)
- def _run_ocr(self, frame, bbox: Optional[tuple[int, int, int, int]]) -> str:
- """调用 OCR 引擎对关注区域进行文本识别,加入抗弹幕干扰的分块策略"""
- engine = self._ensure_ocr_engine()
- if engine is None:
- return ""
- focus_region = self._focus_region(frame, bbox)
- if focus_region is None or focus_region.shape[0] == 0 or focus_region.shape[1] == 0:
- return ""
- texts: list[str] = []
- # 1. 常规全图 OCR (可能被中间的弹幕压制,但能提取出分散的编号等)
- try:
- result_full = engine(focus_region)
- texts.append(self._extract_ocr_text(result_full))
- except Exception as exc:
- if not self._ocr_runtime_warning_sent:
- logger.warning(f"OCR full region failure: {exc}")
- self._ocr_runtime_warning_sent = True
- # 2. 分块特写 OCR,避开中心弹幕区,降低识别阈值
- h, w = focus_region.shape[:2]
- if h > 60 and w > 60:
- # A. 专门识别底部 40% (绝大多数球星卡球员名字、宝可梦卡信息在底部)
- try:
- bottom_half = focus_region[int(h * 0.6):h, :]
- result_bottom = engine(bottom_half)
- texts.append(self._extract_ocr_text(result_bottom))
- except Exception:
- pass
- # B. 专门识别顶部 40% (通常有 Bowman 1st 标志、帕尼尼系列名、或宝可梦名字)
- try:
- top_half = focus_region[0:int(h * 0.4), :]
- result_top = engine(top_half)
- texts.append(self._extract_ocr_text(result_top))
- except Exception:
- pass
- # 将全图、顶部、底部的识别结果合并(后续的 token_overlap_score 会自动处理去重)
- combined_text = " ".join(texts)
- # 3. 正则剔除常见的直播间高频干扰词 (防止误匹配)
- # 这里的词汇通常是海外拆卡直播间(Whatnot/TikTok)经常出现的系统提示语
- ignore_words = r"(?i)\b(bought|break|hobby|jumbo|box|close|spot|nice|snack|packs)\b"
- combined_text = re.sub(ignore_words, " ", combined_text)
- return combined_text
def _score_candidates(
    self,
    candidates: list[FrameCandidate],
    card_output: CardInfoOutput,
) -> None:
    """Fill in ``sharpness_score``, ``base_score`` and ``final_score`` in place.

    Scoring has two stages: a cheap base score for every scored frame, then
    -- when OCR is available and the card yields expected text -- OCR on the
    top ``settings.VIDEO_OCR_TOP_K`` frames by base score (at least one),
    followed by the final score. Only frames showing a card/hand are scored,
    unless no frame does, in which case all frames are scored.
    """
    if not candidates:
        return

    self._assign_dwell_scores(candidates)

    present = [item for item in candidates if item.is_present]
    scoring_candidates = present or candidates

    # Sharpness is normalised against the sharpest frame in this window.
    max_sharpness = max(item.sharpness for item in scoring_candidates)
    segmentation_used = any(item.segmentation_used for item in candidates)
    expected = self._build_expected_text(card_output)
    ocr_enabled = self._ensure_ocr_engine() is not None and expected["has_expectation"]

    def base_score_of(item: FrameCandidate) -> float:
        if segmentation_used:
            # presence 40% + sharpness 25% + time proximity 20% + dwell 15%
            return (
                0.40 * item.presence_score
                + 0.25 * item.sharpness_score
                + 0.20 * item.time_weight
                + 0.15 * item.dwell_score
            )
        # No segmentation: rely mainly on sharpness and time proximity.
        return (
            0.55 * item.sharpness_score
            + 0.35 * item.time_weight
            + 0.10 * item.dwell_score
        )

    def final_score_of(item: FrameCandidate) -> float:
        if segmentation_used:
            # OCR carries the largest share (40%) next to the physical cues.
            return (
                0.40 * item.ocr_score
                + 0.25 * item.presence_score
                + 0.20 * item.sharpness_score
                + 0.10 * item.time_weight
                + 0.05 * item.dwell_score
            )
        return (
            0.45 * item.ocr_score
            + 0.30 * item.sharpness_score
            + 0.20 * item.time_weight
            + 0.05 * item.dwell_score
        )

    # Stage 1: coarse ranking.
    for item in scoring_candidates:
        item.sharpness_score = self._normalize_sharpness(item.sharpness, max_sharpness)
        item.base_score = base_score_of(item)

    if not ocr_enabled:
        # Without OCR (or without expected text) the base score is final.
        for item in scoring_candidates:
            item.final_score = item.base_score
        return

    # Stage 2: OCR is expensive, so only the best frames by base score get it.
    ranked = sorted(scoring_candidates, key=lambda item: item.base_score, reverse=True)
    for item in ranked[: max(1, settings.VIDEO_OCR_TOP_K)]:
        item.ocr_text = self._run_ocr(item.frame, item.card_bbox)
        item.ocr_score = self._score_ocr_match(item.ocr_text, expected)

    for item in scoring_candidates:
        item.final_score = final_score_of(item)
def _select_best_candidate(
    self,
    candidates: list[FrameCandidate],
    target_time_ms: int,
) -> Optional[FrameCandidate]:
    """Pick the winning frame, or ``None`` when there are no candidates.

    Candidates are ordered by final score, then by sharpness score, then by
    closeness to ``target_time_ms``; on a complete tie the earliest
    candidate in the list wins.
    """
    if not candidates:
        return None

    def rank(item: FrameCandidate):
        closeness = -abs(item.time_ms - target_time_ms)
        return item.final_score, item.sharpness_score, closeness

    winner = None
    winner_rank = None
    for item in candidates:
        item_rank = rank(item)
        # Strictly greater only, so the first of several equals is kept.
        if winner is None or item_rank > winner_rank:
            winner, winner_rank = item, item_rank
    return winner
def capture_frames(self, video_path: str, cards: list[CardInfoInput]) -> list[CardInfoOutput]:
    """Save the best frame for every marked card and return the enriched cards.

    For each card a search window around its timestamp is analysed, the
    candidates are scored and the winner is written as a JPG into
    ``settings.FRAMES_DIR``. Cards for which no frame could be sampled, that
    were filtered out, or whose image could not be written are left out of
    the result.

    Args:
        video_path: Path of the video file.
        cards: Cards with their marked timestamps.

    Returns:
        One ``CardInfoOutput`` (with ``frame_image_path`` set) per saved frame.

    Raises:
        FileNotFoundError: If ``video_path`` does not exist.
    """
    if not os.path.exists(video_path):
        logger.error(f"Video file not found: {video_path}")
        raise FileNotFoundError(f"Video file not found: {video_path}")

    logger.info(f"Open video: {video_path}")
    logger.info(f"Cards to process: {len(cards)}")

    output_list: list[CardInfoOutput] = []
    filtered_count = 0

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            # Nothing can be sampled from an unopened capture; report it once
            # instead of emitting a warning per card.
            logger.error(f"Failed to open video: {video_path}")
        else:
            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                fps = 30.0
            # Make sure the output directory exists before any frame is written.
            os.makedirs(settings.FRAMES_DIR, exist_ok=True)

            for idx, card_input in enumerate(cards):
                progress = f"[{idx + 1}/{len(cards)}]"
                card_output, filtered = self._capture_card_frame(cap, fps, card_input, progress)
                if filtered:
                    filtered_count += 1
                if card_output is not None:
                    output_list.append(card_output)
    finally:
        # Always release the OpenCV handle, even when processing raised.
        cap.release()

    logger.info(
        f"Frame capture finished. saved={len(output_list)}, "
        f"filtered={filtered_count}, total={len(cards)}"
    )
    return output_list


def _capture_card_frame(self, cap, fps: float, card_input, progress: str):
    """Find, score and save the best frame for a single card.

    Returns ``(card_output, filtered)``: ``card_output`` is the enriched card
    when a frame was saved and ``None`` otherwise; ``filtered`` is True when
    segmentation ran but found no card/hand anywhere in the window.
    """
    card_output = CardInfoOutput(**card_input.dict())
    target_time_ms = self.time_str_to_ms(card_output.time)

    # Search window anchored at the marked timestamp.
    start_time_ms = max(0, target_time_ms - self.search_before_ms)
    end_time_ms = target_time_ms + self.search_after_ms

    logger.info(
        f"{progress} analyze {card_output.time} "
        f"for {card_output.card_name_cn or card_output.card_name_en or 'unknown'}"
    )
    logger.info(f" search window: [{start_time_ms}ms ~ {end_time_ms}ms]")

    # 1. Sample candidate frames inside the window.
    candidates = self._collect_candidates(
        cap=cap,
        start_time_ms=start_time_ms,
        end_time_ms=end_time_ms,
        target_time_ms=target_time_ms,
        fps=fps,
    )
    if not candidates:
        logger.warning(" no frames sampled in the target window")
        return None, False

    segmentation_used = any(candidate.segmentation_used for candidate in candidates)
    present_candidates = [candidate for candidate in candidates if candidate.is_present]

    # Segmentation ran but saw neither card nor hand: treat the mark as invalid.
    if segmentation_used and not present_candidates:
        logger.info(" filtered out: no card/hand found around the timestamp")
        return None, True

    scoring_candidates = present_candidates if present_candidates else candidates

    # 2. Score the candidates, 3. pick the winner.
    self._score_candidates(candidates, card_output)
    best_candidate = self._select_best_candidate(scoring_candidates, target_time_ms)
    if best_candidate is None:
        logger.warning(" no usable candidate after scoring")
        return None, False

    # 4. Write the JPG and fill in the output record.
    filename = f"{uuid.uuid4()}_{best_candidate.time_ms}.jpg"
    save_path = os.path.join(settings.FRAMES_DIR, filename)
    try:
        # cv2.imwrite reports failure through its return value, so a False
        # result is turned into an error instead of being counted as saved.
        if not cv2.imwrite(save_path, best_candidate.frame):
            raise OSError(f"cv2.imwrite returned False for {save_path}")
    except Exception as exc:
        logger.error(f" failed to save frame: {exc}")
        return None, False

    card_output.frame_image_path = f"{settings.BASE_URL}/static/frames/{filename}"
    time_diff = (best_candidate.time_ms - target_time_ms) / 1000.0
    logger.info(
        f" saved {filename} "
        f"(offset={time_diff:+.2f}s, sharpness={best_candidate.sharpness:.1f}, "
        f"presence={best_candidate.presence_score:.2f}, "
        f"ocr={best_candidate.ocr_score:.2f}, score={best_candidate.final_score:.2f})"
    )
    return card_output, False
|