import math
import os
import re
import uuid
import difflib
from dataclasses import dataclass
from typing import Any, Optional

import cv2
import numpy as np
import torch
from PIL import Image
from transformers import AutoImageProcessor, AutoModelForSemanticSegmentation

from app.core.config import settings
from app.core.logger import get_logger
from app.schemas.models import CardInfoInput, CardInfoOutput

logger = get_logger("VideoService")


@dataclass
class FrameCandidate:
    """Candidate frame sampled from the video, with every scoring dimension.

    Using a dataclass keeps the per-frame bookkeeping explicit and inspectable.
    """

    frame: Any                 # raw image matrix (OpenCV BGR)
    time_ms: int               # timestamp of this frame in the video (milliseconds)
    sharpness: float           # absolute Laplacian sharpness (higher = sharper)
    time_weight: float         # time weight (closer to target timestamp = higher)
    segmentation_used: bool = False  # whether the segmentation model ran successfully
    has_card: bool = False           # a card is visible in the frame
    has_hand: bool = False           # a hand is visible in the frame
    card_area_ratio: float = 0.0     # fraction of the frame covered by the card
    hand_area_ratio: float = 0.0     # fraction of the frame covered by the hand
    card_bbox: Optional[tuple[int, int, int, int]] = None  # (x, y, w, h) focus box for card/hand
    presence_score: float = 0.0      # entity presence score (larger card/hand area = higher)
    sharpness_score: float = 0.0     # normalized sharpness score (0~1)
    dwell_score: float = 0.0         # dwell score (longer consecutive presence = higher; anti-flicker)
    base_score: float = 0.0          # base score (without OCR)
    ocr_text: str = ""               # text recognized by OCR
    ocr_score: float = 0.0           # OCR match score against the expected card info (0~1)
    final_score: float = 0.0         # final combined score

    @property
    def is_present(self) -> bool:
        """The entity counts as present if either a card or a hand is visible."""
        return self.has_card or self.has_hand


class VideoService:
    def __init__(self):
        # Sigma of the Gaussian used for the time weight; larger = more
        # tolerance for frames far from the target timestamp.
        self.weight_sigma = 6.0
        self.search_before_ms = settings.VIDEO_SEARCH_BEFORE_MS
        self.search_after_ms = settings.VIDEO_SEARCH_AFTER_MS
        self.analysis_fps = max(settings.VIDEO_ANALYSIS_FPS, 0.5)
        # Lazily-loaded components, to keep construction cheap.
        self._ocr_engine = None
        self._ocr_disabled = False
        self._ocr_runtime_warning_sent = False
        self._seg_processor = None
        self._seg_model = None
        self._seg_torch = None
        self._seg_pil_image = None
        self._seg_disabled = False
        self._seg_runtime_warning_sent = False

    def time_str_to_ms(self, time_str: str) -> int:
        """Convert an 'HH:MM:SS' or 'MM:SS' string to milliseconds.

        Returns 0 for malformed input rather than raising.
        """
        try:
            parts = list(map(int, time_str.split(":")))
            if len(parts) == 3:
                h, m, s = parts
                return (h * 3600 + m * 60 + s) * 1000
            if len(parts) == 2:
                m, s = parts
                return (m * 60 + s) * 1000
            return 0
        except ValueError:
            return 0

    def get_laplacian_sharpness(self, frame) -> float:
        """Variance of the Laplacian — the standard no-reference sharpness metric.

        A larger variance means richer edge information (less blur).
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # A light Gaussian blur filters out glare noise and high-frequency specks.
        blurred = cv2.GaussianBlur(gray, (3, 3), 0)
        return float(cv2.Laplacian(blurred, cv2.CV_64F).var())

    def calculate_weight(self, current_time_ms: int, target_time_ms: int) -> float:
        """Gaussian time weight: approaches 1.0 as the frame nears target_time_ms."""
        diff_seconds = abs(current_time_ms - target_time_ms) / 1000.0
        return math.exp(-((diff_seconds ** 2) / (2 * self.weight_sigma ** 2)))

    def _analysis_stride(self, fps: float) -> int:
        """Frame-skip stride so that sampling matches VIDEO_ANALYSIS_FPS."""
        fps = fps if fps > 0 else 30.0
        return max(1, int(round(fps / self.analysis_fps)))

    def _ensure_ocr_engine(self):
        """Lazily initialize the OCR engine (RapidOCR), singleton-style.

        Returns the engine, or None if initialization failed (OCR disabled).
        """
        if self._ocr_disabled:
            return None
        if self._ocr_engine is not None:
            return self._ocr_engine
        try:
            from rapidocr import RapidOCR
            self._ocr_engine = RapidOCR()
        except Exception as exc:
            self._ocr_disabled = True
            logger.warning(f"OCR disabled: init failed: {exc}")
            return None
        return self._ocr_engine

    def _ensure_segmentation_model(self):
        """Lazily load the HuggingFace Segformer semantic-segmentation model.

        Returns (processor, model), or None if loading failed / the model
        directory is missing (segmentation disabled from then on).
        """
        if self._seg_disabled:
            return None
        if self._seg_processor is not None and self._seg_model is not None:
            return self._seg_processor, self._seg_model
        model_dir = settings.VIDEO_SEG_MODEL_DIR
        if not model_dir or not os.path.exists(model_dir):
            self._seg_disabled = True
            logger.warning(f"Segmentation disabled: model dir not found: {model_dir}")
            return None
        try:
            self._seg_processor = AutoImageProcessor.from_pretrained(model_dir)
            self._seg_model = AutoModelForSemanticSegmentation.from_pretrained(model_dir)
            self._seg_model.eval()  # inference mode
            # Move to GPU when available to speed up inference.
            if torch.cuda.is_available():
                self._seg_model = self._seg_model.to("cuda")
            self._seg_torch = torch
            self._seg_pil_image = Image
        except Exception as exc:
            self._seg_disabled = True
            logger.warning(f"Segmentation disabled: model loading failed: {exc}")
            return None
        return self._seg_processor, self._seg_model

    def _largest_bbox(self, mask) -> Optional[tuple[int, int, int, int]]:
        """Bounding rect of the largest connected component in a boolean mask.

        Returns None when the mask is empty or the largest region is too
        small to be meaningful.
        """
        if mask is None or not mask.any():
            return None
        mask_uint8 = (mask.astype("uint8")) * 255
        contours, _ = cv2.findContours(mask_uint8, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not contours:
            return None
        largest = max(contours, key=cv2.contourArea)
        x, y, w, h = cv2.boundingRect(largest)
        # Filter out noise: regions under 20 px in either dimension are
        # almost always misdetections.
        if w < 20 or h < 20:
            return None
        return x, y, w, h

    def _expand_bbox(
        self,
        bbox: Optional[tuple[int, int, int, int]],
        width: int,
        height: int,
        margin_ratio: float = 0.08,
    ) -> Optional[tuple[int, int, int, int]]:
        """Grow the bbox by margin_ratio (clamped to the image) so that target
        edges are not clipped — this helps downstream OCR."""
        if bbox is None:
            return None
        x, y, w, h = bbox
        margin_x = int(w * margin_ratio)
        margin_y = int(h * margin_ratio)
        x1 = max(0, x - margin_x)
        y1 = max(0, y - margin_y)
        x2 = min(width, x + w + margin_x)
        y2 = min(height, y + h + margin_y)
        return x1, y1, x2 - x1, y2 - y1

    def _focus_region(self, frame, bbox: Optional[tuple[int, int, int, int]]):
        """Crop the region of interest; fall back to the full frame when no
        valid bbox exists (fault tolerance)."""
        height, width = frame.shape[:2]
        expanded = self._expand_bbox(bbox, width, height)
        if expanded is None:
            return frame
        x, y, w, h = expanded
        if w < 24 or h < 24:
            return frame
        return frame[y: y + h, x: x + w]

    def _compute_presence_score(
        self,
        segmentation_used: bool,
        has_card: bool,
        has_hand: bool,
        card_area_ratio: float,
        hand_area_ratio: float,
    ) -> float:
        """Presence score (0.0 ~ 1.0) from the card and hand area ratios."""
        if not segmentation_used:
            return 0.0
        # Normalize the ratios, capped at 1.0.
        card_ratio = min(card_area_ratio / max(settings.VIDEO_MIN_CARD_AREA_RATIO, 1e-6), 1.0)
        hand_ratio = min(hand_area_ratio / max(settings.VIDEO_MIN_HAND_AREA_RATIO, 1e-6), 1.0)
        score = 0.0
        if has_card:
            score += 0.70 * max(card_ratio, 0.35)  # card contributes 70%
        if has_hand:
            score += 0.30 * max(hand_ratio, 0.25)  # hand contributes 30%
        if has_card and has_hand:
            score += 0.10  # bonus when card and hand appear together
        return min(score, 1.0)

    def _batch_analyze_segmentation(self, frames: list[Any]) -> list[dict[str, Any]]:
        """Run semantic segmentation over many frames in batches to maximize
        GPU utilization.

        Returns one result dict per input frame; on any failure, falls back to
        "segmentation unused" results for every frame.
        """

        def fallback_results() -> list[dict[str, Any]]:
            # [fix] Build a FRESH dict per frame. The original used
            # `[{...}] * len(frames)`, which repeats one shared dict object —
            # mutating a single entry would silently mutate all of them.
            return [
                {
                    "segmentation_used": False,
                    "has_card": False,
                    "has_hand": False,
                    "card_area_ratio": 0.0,
                    "hand_area_ratio": 0.0,
                    "card_bbox": None,
                }
                for _ in frames
            ]

        if not frames or self._ensure_segmentation_model() is None:
            return fallback_results()
        try:
            pil_images = [
                self._seg_pil_image.fromarray(cv2.cvtColor(f, cv2.COLOR_BGR2RGB))
                for f in frames
            ]
            device = next(self._seg_model.parameters()).device
            results = []
            # Chunked batching to avoid GPU OOM (e.g. 16 frames per batch).
            batch_size = 16
            for i in range(0, len(pil_images), batch_size):
                batch_imgs = pil_images[i: i + batch_size]
                inputs = self._seg_processor(images=batch_imgs, return_tensors="pt").to(device)
                with self._seg_torch.no_grad():
                    outputs = self._seg_model(**inputs)
                logits = outputs.logits
                # Batched upsampling followed by argmax.
                preds = self._seg_torch.nn.functional.interpolate(
                    logits,
                    size=batch_imgs[0].size[::-1],  # assumes all frames share one resolution
                    mode="bilinear",
                    align_corners=False,
                ).argmax(dim=1).cpu().numpy()
                # Parse the mask of every image in the batch.
                for pred in preds:
                    card_mask = pred == settings.VIDEO_CARD_LABEL_ID
                    hand_mask = pred == settings.VIDEO_HAND_LABEL_ID
                    card_area = float(card_mask.mean()) if card_mask.size else 0.0
                    hand_area = float(hand_mask.mean()) if hand_mask.size else 0.0
                    card_bbox = self._largest_bbox(card_mask)
                    hand_bbox = self._largest_bbox(hand_mask)
                    focus_bbox = card_bbox if card_bbox is not None else hand_bbox
                    results.append({
                        "segmentation_used": True,
                        "has_card": card_area >= settings.VIDEO_MIN_CARD_AREA_RATIO,
                        "has_hand": hand_area >= settings.VIDEO_MIN_HAND_AREA_RATIO,
                        "card_area_ratio": card_area,
                        "hand_area_ratio": hand_area,
                        "card_bbox": focus_bbox,
                    })
                # Free this batch's GPU memory promptly.
                del inputs, outputs, logits, preds
                if self._seg_torch.cuda.is_available():
                    self._seg_torch.cuda.empty_cache()
            return results
        except Exception as exc:
            logger.warning(f"Batch segmentation failed, fallback: {exc}")
            return fallback_results()

    def _analyze_segmentation(self, frame) -> dict[str, Any]:
        """Semantic-segmentation analysis of a single frame, locating the card
        and hand regions."""
        if self._ensure_segmentation_model() is None:
            return {
                "segmentation_used": False,
                "has_card": False,
                "has_hand": False,
                "card_area_ratio": 0.0,
                "hand_area_ratio": 0.0,
                "card_bbox": None,
            }
        try:
            # OpenCV (BGR) -> RGB for PIL.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image = self._seg_pil_image.fromarray(rgb_frame)
            # Inputs must live on the same device as the model (CPU or CUDA).
            device = next(self._seg_model.parameters()).device
            inputs = self._seg_processor(images=image, return_tensors="pt").to(device)
            with self._seg_torch.no_grad():
                outputs = self._seg_model(**inputs)
            logits = outputs.logits
            # Upsample back to the original resolution.
            pred = self._seg_torch.nn.functional.interpolate(
                logits,
                size=image.size[::-1],
                mode="bilinear",
                align_corners=False,
            ).argmax(dim=1)[0].cpu().numpy()
            card_mask = pred == settings.VIDEO_CARD_LABEL_ID
            hand_mask = pred == settings.VIDEO_HAND_LABEL_ID
            card_area_ratio = float(card_mask.mean()) if card_mask.size else 0.0
            hand_area_ratio = float(hand_mask.mean()) if hand_mask.size else 0.0
            # Extract both bboxes.
            card_bbox = self._largest_bbox(card_mask)
            hand_bbox = self._largest_bbox(hand_mask)
            has_card = card_area_ratio >= settings.VIDEO_MIN_CARD_AREA_RATIO
            has_hand = hand_area_ratio >= settings.VIDEO_MIN_HAND_AREA_RATIO
            # If the card is too small / not segmented but a hand is present,
            # fall back to the hand region: the hand very likely holds the
            # card, so OCR around the hand still extracts card-face info.
            focus_bbox = card_bbox if card_bbox is not None else hand_bbox
            # Proactively free tensors to avoid GPU/host memory growth over
            # long videos.
            del inputs, outputs, logits, pred
            if self._seg_torch.cuda.is_available():
                self._seg_torch.cuda.empty_cache()
            return {
                "segmentation_used": True,
                "has_card": has_card,
                "has_hand": has_hand,
                "card_area_ratio": card_area_ratio,
                "hand_area_ratio": hand_area_ratio,
                "card_bbox": focus_bbox,  # the fallback-resolved bbox
            }
        except Exception as exc:
            if not self._seg_runtime_warning_sent:
                logger.warning(f"Segmentation runtime failure, fallback enabled: {exc}")
                self._seg_runtime_warning_sent = True
            return {
                "segmentation_used": False,
                "has_card": False,
                "has_hand": False,
                "card_area_ratio": 0.0,
                "hand_area_ratio": 0.0,
                "card_bbox": None,
            }

    def _build_candidate(self, frame, current_time_ms: int, target_time_ms: int) -> FrameCandidate:
        """Combine all per-frame analysis (segmentation, sharpness, presence)
        into a FrameCandidate for a single frame."""
        seg_result = self._analyze_segmentation(frame)
        # Measure sharpness on the cropped focus region: more accurate than
        # full-frame sharpness and robust to busy backgrounds.
        focus_region = self._focus_region(frame, seg_result["card_bbox"])
        sharpness = self.get_laplacian_sharpness(focus_region)
        presence_score = self._compute_presence_score(
            segmentation_used=seg_result["segmentation_used"],
            has_card=seg_result["has_card"],
            has_hand=seg_result["has_hand"],
            card_area_ratio=seg_result["card_area_ratio"],
            hand_area_ratio=seg_result["hand_area_ratio"],
        )
        return FrameCandidate(
            frame=frame.copy(),
            time_ms=int(current_time_ms),
            sharpness=sharpness,
            time_weight=self.calculate_weight(current_time_ms, target_time_ms),
            segmentation_used=seg_result["segmentation_used"],
            has_card=seg_result["has_card"],
            has_hand=seg_result["has_hand"],
            card_area_ratio=seg_result["card_area_ratio"],
            hand_area_ratio=seg_result["hand_area_ratio"],
            card_bbox=seg_result["card_bbox"],
            presence_score=presence_score,
        )

    def _collect_candidates(
        self,
        cap: cv2.VideoCapture,
        start_time_ms: int,
        end_time_ms: int,
        target_time_ms: int,
        fps: float,
    ) -> list[FrameCandidate]:
        """Slide through the given time window and collect frames at the
        analysis stride as candidates."""
        candidates: list[FrameCandidate] = []
        raw_frames = []
        time_ms_list = []
        analysis_stride = self._analysis_stride(fps)
        # Cap the number of reads to avoid an infinite loop near end-of-video.
        max_reads = int((end_time_ms - start_time_ms) / 1000.0 * fps) + analysis_stride + 10
        # Seek to the window start (NOTE: POS_MSEC can be imprecise on some
        # video sources).
        cap.set(cv2.CAP_PROP_POS_MSEC, start_time_ms)
        read_count = 0
        while read_count < max_reads:
            # grab() only advances the stream — no costly decode yet.
            ret = cap.grab()
            if not ret:
                break
            current_time_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
            if current_time_ms > end_time_ms:
                break
            # Decode into an image matrix only on stride boundaries.
            if read_count % analysis_stride == 0:
                ret, frame = cap.retrieve()
                if ret:
                    raw_frames.append(frame.copy())
                    time_ms_list.append(current_time_ms)
            read_count += 1
        if not raw_frames:
            return []
        # 1. Batch the frames through the segmentation model.
        seg_results = self._batch_analyze_segmentation(raw_frames)
        # 2. Assemble candidates and compute sharpness per frame.
        for frame, time_ms, seg_res in zip(raw_frames, time_ms_list, seg_results):
            # Sharpness is measured on the cropped focus region.
            focus_region = self._focus_region(frame, seg_res["card_bbox"])
            sharpness = self.get_laplacian_sharpness(focus_region)
            presence_score = self._compute_presence_score(
                seg_res["segmentation_used"],
                seg_res["has_card"],
                seg_res["has_hand"],
                seg_res["card_area_ratio"],
                seg_res["hand_area_ratio"],
            )
            candidates.append(FrameCandidate(
                frame=frame,
                time_ms=int(time_ms),
                sharpness=sharpness,
                time_weight=self.calculate_weight(time_ms, target_time_ms),
                presence_score=presence_score,
                **seg_res,  # unpacks has_card, card_bbox, etc.
            ))
        return candidates

    def _assign_dwell_scores(self, candidates: list[FrameCandidate]) -> None:
        """Compute "dwell scores": a person showing a pulled card usually holds
        it up for a while, so entities detected across consecutive frames score
        higher. This filters out blurry frames where a card merely flashes by
        during dealing.
        """
        if not candidates or not any(candidate.segmentation_used for candidate in candidates):
            return
        target_frames = max(1, int(round(settings.VIDEO_DWELL_TARGET_SECONDS * self.analysis_fps)))
        index = 0
        while index < len(candidates):
            if not candidates[index].is_present:
                index += 1
                continue
            # Find the run of consecutive frames with is_present == True.
            run_end = index
            while run_end + 1 < len(candidates) and candidates[run_end + 1].is_present:
                run_end += 1
            run_length = run_end - index + 1
            dwell_score = min(run_length / target_frames, 1.0)
            # Every frame in the run shares the same dwell score.
            for pos in range(index, run_end + 1):
                candidates[pos].dwell_score = dwell_score
            index = run_end + 1

    def _normalize_sharpness(self, sharpness: float, max_sharpness: float) -> float:
        """Log-normalize sharpness into 0~1; log keeps outliers (over-sharpened
        noise) from dominating the score pool."""
        if sharpness <= 0 or max_sharpness <= 0:
            return 0.0
        denominator = math.log1p(max_sharpness)
        if denominator <= 0:
            return 0.0
        return min(math.log1p(sharpness) / denominator, 1.0)

    def _normalize_text(self, text: str) -> str:
        """Clean text: drop dots, uppercase, keep only Latin letters, digits,
        and CJK characters."""
        if not text:
            return ""
        cleaned = text.replace(".", "")
        cleaned = re.sub(r"[^0-9A-Za-z\u4e00-\u9fff]+", " ", cleaned.upper())
        return re.sub(r"\s+", " ", cleaned).strip()

    def _tokenize_text(self, text: str) -> list[str]:
        """Tokenize text, dropping duplicates and single letters (single
        digits are kept)."""
        normalized = self._normalize_text(text)
        if not normalized:
            return []
        tokens: list[str] = []
        seen: set[str] = set()
        for token in normalized.split():
            if len(token) == 1 and not token.isdigit():
                continue
            if token in seen:
                continue
            seen.add(token)
            tokens.append(token)
        return tokens

    def _build_expected_text(self, card: CardInfoOutput) -> dict[str, Any]:
        """Extract the expected card name, series, and number tokens from the
        input card info to serve as the OCR comparison baseline."""
        name_tokens = self._tokenize_text(card.card_name_en or "")
        if not name_tokens and card.card_name_cn:
            name_tokens = self._tokenize_text(card.card_name_cn)
        all_series_tokens = self._tokenize_text(card.series or "")
        number_tokens = [token for token in all_series_tokens if token.isdigit()][:3]
        series_tokens = [token for token in all_series_tokens if not token.isdigit()]
        series_tokens.sort(key=len, reverse=True)
        return {
            "name_tokens": name_tokens[:4],
            "series_tokens": series_tokens[:6],
            "number_tokens": number_tokens,
            "has_expectation": bool(name_tokens or series_tokens or number_tokens),
        }

    def _extract_ocr_text(self, ocr_result: Any) -> str:
        """Recursively walk RapidOCR's nested result structure and join every
        recognized text fragment into one string (deduplicated, order kept)."""
        texts: list[str] = []

        def visit(node: Any) -> None:
            if node is None:
                return
            if isinstance(node, str):
                stripped = node.strip()
                if stripped:
                    texts.append(stripped)
                return
            if hasattr(node, "txts"):
                visit(getattr(node, "txts"))
                return
            if hasattr(node, "ocr_res"):
                visit(getattr(node, "ocr_res"))
                return
            if isinstance(node, dict):
                for value in node.values():
                    visit(value)
                return
            if isinstance(node, (list, tuple)):
                # A (box, text, ...) tuple: take the text element only.
                if len(node) >= 2 and isinstance(node[1], str):
                    visit(node[1])
                    return
                for item in node:
                    visit(item)

        visit(ocr_result)
        deduped: list[str] = []
        seen: set[str] = set()
        for text in texts:
            if text in seen:
                continue
            seen.add(text)
            deduped.append(text)
        return " ".join(deduped)

    def _token_overlap_score(self, expected_tokens: list[str], ocr_tokens: list[str]) -> float:
        """Overlap score between expected tokens and OCR tokens, with partial
        credit for near-matches (fuzzy string similarity)."""
        if not expected_tokens or not ocr_tokens:
            return 0.0
        score = 0.0
        for token in expected_tokens:
            best_ratio = 0.0
            for other in ocr_tokens:
                # String similarity in [0, 1].
                ratio = difflib.SequenceMatcher(None, token, other).ratio()
                if ratio > best_ratio:
                    best_ratio = ratio
            if best_ratio > 0.85:
                score += 1.0  # near-exact: full credit
            elif best_ratio > 0.6:
                score += 0.6  # some OCR typos: partial credit
        return min(score / len(expected_tokens), 1.0)

    def _score_ocr_match(self, ocr_text: str, expected: dict[str, Any]) -> float:
        """Overall match score between OCR text and the expected card info,
        blending name, series, and number dimensions."""
        if not ocr_text or not expected["has_expectation"]:
            return 0.0
        normalized_text = self._normalize_text(ocr_text)
        ocr_tokens = self._tokenize_text(ocr_text)
        if not ocr_tokens:
            return 0.0
        name_tokens = expected["name_tokens"]
        series_tokens = expected["series_tokens"]
        number_tokens = expected["number_tokens"]
        name_score = self._token_overlap_score(name_tokens, ocr_tokens)
        if name_tokens:
            joined_name = " ".join(name_tokens)
            if joined_name and joined_name in normalized_text:
                name_score = 1.0  # whole name matched verbatim: full marks
        series_score = self._token_overlap_score(series_tokens, ocr_tokens)
        number_score = self._token_overlap_score(number_tokens, ocr_tokens)
        # Weighted blend: card name (60%) > series (25%) > card number (15%).
        if name_tokens:
            return min(0.60 * name_score + 0.25 * series_score + 0.15 * number_score, 1.0)
        return min(0.65 * series_score + 0.35 * number_score, 1.0)

    def _run_ocr(self, frame, bbox: Optional[tuple[int, int, int, int]]) -> str:
        """Run OCR over the focus region, with a tiled strategy that dodges
        on-screen chat overlays."""
        engine = self._ensure_ocr_engine()
        if engine is None:
            return ""
        focus_region = self._focus_region(frame, bbox)
        if focus_region is None or focus_region.shape[0] == 0 or focus_region.shape[1] == 0:
            return ""
        texts: list[str] = []
        # 1. Full-region OCR (may be drowned out by mid-frame chat, but can
        #    still pick up scattered numbering).
        try:
            result_full = engine(focus_region)
            texts.append(self._extract_ocr_text(result_full))
        except Exception as exc:
            if not self._ocr_runtime_warning_sent:
                logger.warning(f"OCR full region failure: {exc}")
                self._ocr_runtime_warning_sent = True
        # 2. Tiled close-up OCR, skipping the central chat band.
        h, w = focus_region.shape[:2]
        if h > 60 and w > 60:
            # A. Bottom 40% (player names on sports cards / Pokemon card info
            #    usually sit at the bottom).
            try:
                bottom_half = focus_region[int(h * 0.6):h, :]
                result_bottom = engine(bottom_half)
                texts.append(self._extract_ocr_text(result_bottom))
            except Exception:
                pass
            # B. Top 40% (Bowman 1st logos, Panini series names, or the
            #    Pokemon name usually sit at the top).
            try:
                top_half = focus_region[0:int(h * 0.4), :]
                result_top = engine(top_half)
                texts.append(self._extract_ocr_text(result_top))
            except Exception:
                pass
        # Merge the full/top/bottom results (token_overlap_score dedupes later).
        combined_text = " ".join(texts)
        # 3. Strip frequent live-stream filler words to avoid false matches
        #    (common system phrases in overseas break streams, e.g.
        #    Whatnot/TikTok).
        ignore_words = r"(?i)\b(bought|break|hobby|jumbo|box|close|spot|nice|snack|packs)\b"
        combined_text = re.sub(ignore_words, " ", combined_text)
        return combined_text

    def _score_candidates(
        self,
        candidates: list[FrameCandidate],
        card_output: CardInfoOutput,
    ) -> None:
        """Central scoring hub: combines the individual sub-scores into a
        final ranking score. Two-stage scoring: a cheap Base Score selects the
        Top K, then only those run the expensive OCR to get the Final Score.
        """
        if not candidates:
            return
        self._assign_dwell_scores(candidates)
        # Score only frames where a card/hand is confirmed present.
        scoring_candidates = [candidate for candidate in candidates if candidate.is_present]
        if not scoring_candidates:
            scoring_candidates = candidates
        # 90th percentile instead of the max, so a single glare spike cannot
        # dominate the normalization pool.
        if scoring_candidates:
            sharpnesses = [c.sharpness for c in scoring_candidates]
            max_sharpness = float(np.percentile(sharpnesses, 90))
        else:
            max_sharpness = 0.0
        segmentation_used = any(candidate.segmentation_used for candidate in candidates)
        expected = self._build_expected_text(card_output)
        ocr_enabled = self._ensure_ocr_engine() is not None and expected["has_expectation"]
        # 1. Coarse ranking: compute the Base Score.
        for candidate in scoring_candidates:
            candidate.sharpness_score = self._normalize_sharpness(candidate.sharpness, max_sharpness)
            if segmentation_used:
                # With segmentation: presence (40%) + sharpness (25%) +
                # closeness to the target time (20%) + stability (15%).
                candidate.base_score = (
                    0.40 * candidate.presence_score
                    + 0.25 * candidate.sharpness_score
                    + 0.20 * candidate.time_weight
                    + 0.15 * candidate.dwell_score
                )
            else:
                # Fallback without segmentation: sharpness and time weight only.
                candidate.base_score = (
                    0.55 * candidate.sharpness_score
                    + 0.35 * candidate.time_weight
                    + 0.10 * candidate.dwell_score
                )
        # 2. Fine ranking: Final Score via OCR.
        if ocr_enabled:
            # Only the top-K by Base Score run OCR (performance).
            top_candidates = sorted(
                scoring_candidates,
                key=lambda item: item.base_score,
                reverse=True,
            )[: max(1, settings.VIDEO_OCR_TOP_K)]
            for candidate in top_candidates:
                candidate.ocr_text = self._run_ocr(candidate.frame, candidate.card_bbox)
                candidate.ocr_score = self._score_ocr_match(candidate.ocr_text, expected)
            # Update the Final Score of every scored frame.
            for candidate in scoring_candidates:
                if segmentation_used:
                    # OCR carries the bulk (40%), blended with the physical
                    # metrics.
                    candidate.final_score = (
                        0.40 * candidate.ocr_score
                        + 0.25 * candidate.presence_score
                        + 0.20 * candidate.sharpness_score
                        + 0.10 * candidate.time_weight
                        + 0.05 * candidate.dwell_score
                    )
                else:
                    candidate.final_score = (
                        0.45 * candidate.ocr_score
                        + 0.30 * candidate.sharpness_score
                        + 0.20 * candidate.time_weight
                        + 0.05 * candidate.dwell_score
                    )
        else:
            # OCR unavailable or nothing expected: Base Score is final.
            for candidate in scoring_candidates:
                candidate.final_score = candidate.base_score

    def _select_best_candidate(
        self,
        candidates: list[FrameCandidate],
        target_time_ms: int,
    ) -> Optional[FrameCandidate]:
        """Pick the frame that best represents the "highlight moment"."""
        if not candidates:
            return None
        # Primary key final_score; ties (e.g. all zero) fall back to sharpness,
        # then to whichever frame is closest to the annotated time.
        return max(
            candidates,
            key=lambda item: (
                item.final_score,
                item.sharpness_score,
                -abs(item.time_ms - target_time_ms),
            ),
        )

    def capture_frames(self, video_path: str, cards: list[CardInfoInput]) -> list[CardInfoOutput]:
        """Main workflow: given a video and annotated card timestamps, output
        highlight images plus matching info.

        Raises FileNotFoundError when the video path does not exist; returns
        an empty list when the video cannot be opened.
        """
        if not os.path.exists(video_path):
            logger.error(f"Video file not found: {video_path}")
            raise FileNotFoundError(f"Video file not found: {video_path}")
        logger.info(f"Open video: {video_path}")
        logger.info(f"Cards to process: {len(cards)}")
        cap = cv2.VideoCapture(video_path)
        # [fix] Fail fast when OpenCV cannot open the container instead of
        # grinding through an unreadable stream.
        if not cap.isOpened():
            logger.error(f"Failed to open video: {video_path}")
            cap.release()
            return []
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30.0
        output_list: list[CardInfoOutput] = []
        success_count = 0
        filtered_count = 0
        # [fix] try/finally guarantees the OpenCV handle is released even if a
        # card iteration raises, so the video file is never left locked.
        try:
            for idx, card_input in enumerate(cards):
                card_output = CardInfoOutput(**card_input.dict())
                target_time_ms = self.time_str_to_ms(card_output.time)
                # Anchor a search window of [a few seconds before, a few
                # seconds after] around the annotated timestamp.
                start_time_ms = max(0, target_time_ms - self.search_before_ms)
                end_time_ms = target_time_ms + self.search_after_ms
                logger.info(
                    f"[{idx + 1}/{len(cards)}] analyze {card_output.time} "
                    f"for {card_output.card_name_cn or card_output.card_name_en or 'unknown'}"
                )
                logger.info(f" search window: [{start_time_ms}ms ~ {end_time_ms}ms]")
                # 1. Collect all candidate frames in the window.
                candidates = self._collect_candidates(
                    cap=cap,
                    start_time_ms=start_time_ms,
                    end_time_ms=end_time_ms,
                    target_time_ms=target_time_ms,
                    fps=fps,
                )
                if not candidates:
                    logger.warning(" no frames sampled in the target window")
                    continue
                segmentation_used = any(candidate.segmentation_used for candidate in candidates)
                present_candidates = [candidate for candidate in candidates if candidate.is_present]
                # [requirement 1] When segmentation ran and no card/hand was
                # found in the whole window, treat the annotation as invalid.
                if segmentation_used and not present_candidates:
                    filtered_count += 1
                    logger.info(" filtered out: no card/hand found around the timestamp")
                    continue
                scoring_candidates = present_candidates if present_candidates else candidates
                # 2. Multi-dimensional scoring of the candidates.
                self._score_candidates(candidates, card_output)
                # 3. Pick the best-matching, sharpest frame.
                best_candidate = self._select_best_candidate(scoring_candidates, target_time_ms)
                if best_candidate is None:
                    logger.warning(" no usable candidate after scoring")
                    continue
                # 4. Save as JPG and build the output record.
                filename = f"{uuid.uuid4()}_{best_candidate.time_ms}.jpg"
                save_path = os.path.join(settings.FRAMES_DIR, filename)
                try:
                    cv2.imwrite(save_path, best_candidate.frame)
                    # [fix] The public URL must reference the actual saved
                    # file name; the original interpolated a broken
                    # placeholder instead of {filename}.
                    image_url = f"{settings.BASE_URL}/static/frames/{filename}"
                    card_output.frame_image_path = image_url
                    output_list.append(card_output)
                    success_count += 1
                    time_diff = (best_candidate.time_ms - target_time_ms) / 1000.0
                    logger.info(
                        f" saved {filename} "
                        f"(offset={time_diff:+.2f}s, sharpness={best_candidate.sharpness:.1f}, "
                        f"presence={best_candidate.presence_score:.2f}, "
                        f"ocr={best_candidate.ocr_score:.2f}, score={best_candidate.final_score:.2f})"
                    )
                except Exception as exc:
                    logger.error(f" failed to save frame: {exc}")
        finally:
            # Always release the OpenCV handle to avoid locking the video file.
            cap.release()
        logger.info(
            f"Frame capture finished. saved={success_count}, "
            f"filtered={filtered_count}, total={len(cards)}"
        )
        return output_list