import math import os import re import uuid from dataclasses import dataclass from typing import Any, Optional import cv2 from app.core.config import settings from app.core.logger import get_logger from app.schemas.models import CardInfoInput, CardInfoOutput import torch from PIL import Image from transformers import AutoImageProcessor, AutoModelForSemanticSegmentation logger = get_logger("VideoService") @dataclass class FrameCandidate: """ 候选帧数据类:记录了从视频中抽取的某一帧的所有评分维度。 使用 dataclass 让数据结构非常清晰。 """ frame: Any # 原始图像矩阵 (OpenCV BGR) time_ms: int # 该帧在视频中的时间戳 (毫秒) sharpness: float # 拉普拉斯清晰度绝对值 (越大越清晰) time_weight: float # 时间权重 (距离目标时间戳越近,权重越高) segmentation_used: bool = False # 是否成功启用了分割模型 has_card: bool = False # 画面中是否出现了卡 has_hand: bool = False # 画面中是否出现了手 card_area_ratio: float = 0.0 # 卡片占画面比例 hand_area_ratio: float = 0.0 # 手占画面比例 card_bbox: Optional[tuple[int, int, int, int]] = None # (x, y, w, h) 卡片/手的聚焦边界框 presence_score: float = 0.0 # 实体存在感得分 (卡和手面积越大,得分越高) sharpness_score: float = 0.0 # 归一化后的清晰度得分 (0~1) dwell_score: float = 0.0 # 停留得分 (连续出现的帧数越多得分越高,用于抗闪烁) base_score: float = 0.0 # 基础分 (不包含OCR) ocr_text: str = "" # OCR识别出的文本 ocr_score: float = 0.0 # OCR文本与预期卡片信息的匹配度得分 (0~1) final_score: float = 0.0 # 最终总分 @property def is_present(self) -> bool: """只要有卡或者有手,就算该实体在画面中存在""" return self.has_card or self.has_hand class VideoService: def __init__(self): # 高斯分布的 sigma,用于计算时间权重。更大表示对时间差容忍度更高 self.weight_sigma = 6.0 self.search_before_ms = settings.VIDEO_SEARCH_BEFORE_MS self.search_after_ms = settings.VIDEO_SEARCH_AFTER_MS self.analysis_fps = max(settings.VIDEO_ANALYSIS_FPS, 0.5) # 延迟加载组件,节省初始化时的内存占用 self._ocr_engine = None self._ocr_disabled = False self._ocr_runtime_warning_sent = False self._seg_processor = None self._seg_model = None self._seg_torch = None self._seg_pil_image = None self._seg_disabled = False self._seg_runtime_warning_sent = False def time_str_to_ms(self, time_str: str) -> int: """将格式为 'HH:MM:SS' 或 'MM:SS' 的字符串转换为毫秒""" try: parts = list(map(int, time_str.split(":"))) if len(parts) == 3: h, m, s = parts return (h * 3600 + m * 60 + s) * 1000 if len(parts) == 2: m, s = parts return (m * 60 + s) * 1000 return 0 except ValueError: return 0 def get_laplacian_sharpness(self, frame) -> float: """ 计算图像的拉普拉斯方差,这是业界最常用的无参考图像清晰度评估方法。 方差越大,说明边缘信息越丰富(越不模糊)。 """ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) return float(cv2.Laplacian(gray, cv2.CV_64F).var()) def calculate_weight(self, current_time_ms: int, target_time_ms: int) -> float: """利用高斯函数计算时间权重。距离 target_time_ms 越近,返回值越接近 1.0""" diff_seconds = abs(current_time_ms - target_time_ms) / 1000.0 return math.exp(-((diff_seconds ** 2) / (2 * self.weight_sigma ** 2))) def _analysis_stride(self, fps: float) -> int: """计算视频读取时的跳帧步长,确保处理速度匹配 VIDEO_ANALYSIS_FPS""" fps = fps if fps > 0 else 30.0 return max(1, int(round(fps / self.analysis_fps))) def _ensure_ocr_engine(self): """单例模式懒加载 OCR 引擎 (RapidOCR)""" if self._ocr_disabled: return None if self._ocr_engine is not None: return self._ocr_engine try: from rapidocr import RapidOCR self._ocr_engine = RapidOCR() except Exception as exc: self._ocr_disabled = True logger.warning(f"OCR disabled: init failed: {exc}") return None return self._ocr_engine def _ensure_segmentation_model(self): """单例模式懒加载 HuggingFace Segformer 语义分割模型""" if self._seg_disabled: return None if self._seg_processor is not None and self._seg_model is not None: return self._seg_processor, self._seg_model model_dir = settings.VIDEO_SEG_MODEL_DIR if not model_dir or not os.path.exists(model_dir): self._seg_disabled = True logger.warning(f"Segmentation disabled: model dir not found: {model_dir}") return None try: self._seg_processor = AutoImageProcessor.from_pretrained(model_dir) self._seg_model = AutoModelForSemanticSegmentation.from_pretrained(model_dir) self._seg_model.eval() # 开启评估模式 # 自动分配到 GPU (如果可用) 以加速推理 if torch.cuda.is_available(): self._seg_model = self._seg_model.to("cuda") self._seg_torch = torch self._seg_pil_image = Image except Exception as exc: self._seg_disabled = True logger.warning(f"Segmentation disabled: model loading failed: {exc}") return None return self._seg_processor, self._seg_model def _largest_bbox(self, mask) -> Optional[tuple[int, int, int, int]]: """从二进制掩码 (Mask) 中提取面积最大的连通区域的外接矩形""" if mask is None or not mask.any(): return None mask_uint8 = (mask.astype("uint8")) * 255 contours, _ = cv2.findContours(mask_uint8, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: return None largest = max(contours, key=cv2.contourArea) x, y, w, h = cv2.boundingRect(largest) # 过滤掉噪点 (宽或高小于20像素的通常是识别错误) if w < 20 or h < 20: return None return x, y, w, h def _expand_bbox( self, bbox: Optional[tuple[int, int, int, int]], width: int, height: int, margin_ratio: float = 0.08, ) -> Optional[tuple[int, int, int, int]]: """适度扩大 Bounding Box (增加 margin_ratio),防止目标边缘被裁掉,有利于后续 OCR""" if bbox is None: return None x, y, w, h = bbox margin_x = int(w * margin_ratio) margin_y = int(h * margin_ratio) x1 = max(0, x - margin_x) y1 = max(0, y - margin_y) x2 = min(width, x + w + margin_x) y2 = min(height, y + h + margin_y) return x1, y1, x2 - x1, y2 - y1 def _focus_region(self, frame, bbox: Optional[tuple[int, int, int, int]]): """裁剪出关注区域。如果没有有效 BBox,则返回原图,作为容错机制。""" height, width = frame.shape[:2] expanded = self._expand_bbox(bbox, width, height) if expanded is None: return frame x, y, w, h = expanded if w < 24 or h < 24: return frame return frame[y: y + h, x: x + w] def _compute_presence_score( self, segmentation_used: bool, has_card: bool, has_hand: bool, card_area_ratio: float, hand_area_ratio: float, ) -> float: """根据卡片和手的面积占比计算"存在感得分" (0.0 ~ 1.0)""" if not segmentation_used: return 0.0 # 对占比进行归一化,最大不超过 1.0 card_ratio = min(card_area_ratio / max(settings.VIDEO_MIN_CARD_AREA_RATIO, 1e-6), 1.0) hand_ratio = min(hand_area_ratio / max(settings.VIDEO_MIN_HAND_AREA_RATIO, 1e-6), 1.0) score = 0.0 if has_card: score += 0.70 * max(card_ratio, 0.35) # 卡片权重占 70% if has_hand: score += 0.30 * max(hand_ratio, 0.25) # 手的权重占 30% if has_card and has_hand: score += 0.10 # 卡和手同框,给予额外 10% 奖励分 return min(score, 1.0) def _analyze_segmentation(self, frame) -> dict[str, Any]: """对单帧图像进行语义分割分析,寻找卡片和手的区域""" if self._ensure_segmentation_model() is None: return { "segmentation_used": False, "has_card": False, "has_hand": False, "card_area_ratio": 0.0, "hand_area_ratio": 0.0, "card_bbox": None, } try: # OpenCV (BGR) 转换为 PIL 所需的 RGB rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) image = self._seg_pil_image.fromarray(rgb_frame) # 推理所需的数据需放到和模型同一设备上 (CPU or CUDA) device = next(self._seg_model.parameters()).device inputs = self._seg_processor(images=image, return_tensors="pt").to(device) with self._seg_torch.no_grad(): outputs = self._seg_model(**inputs) logits = outputs.logits # 上采样回原始分辨率 pred = self._seg_torch.nn.functional.interpolate( logits, size=image.size[::-1], mode="bilinear", align_corners=False, ).argmax(dim=1)[0].cpu().numpy() card_mask = pred == settings.VIDEO_CARD_LABEL_ID hand_mask = pred == settings.VIDEO_HAND_LABEL_ID card_area_ratio = float(card_mask.mean()) if card_mask.size else 0.0 hand_area_ratio = float(hand_mask.mean()) if hand_mask.size else 0.0 # [核心改进]: 提取两个 bbox card_bbox = self._largest_bbox(card_mask) hand_bbox = self._largest_bbox(hand_mask) has_card = card_area_ratio >= settings.VIDEO_MIN_CARD_AREA_RATIO has_hand = hand_area_ratio >= settings.VIDEO_MIN_HAND_AREA_RATIO # [核心改进]: 如果卡片太小/没切出来,但有手,把 focus box 降级到手的区域 # 因为手大概率握着卡片,对“手”周边做 OCR 也能有效提取卡面信息 focus_bbox = card_bbox if card_bbox is not None else hand_bbox # [核心改进]: 主动清理张量内存,防止长视频导致显存/内存溢出 del inputs, outputs, logits, pred if self._seg_torch.cuda.is_available(): self._seg_torch.cuda.empty_cache() return { "segmentation_used": True, "has_card": has_card, "has_hand": has_hand, "card_area_ratio": card_area_ratio, "hand_area_ratio": hand_area_ratio, "card_bbox": focus_bbox, # 返回 fallback 后的 bbox } except Exception as exc: if not self._seg_runtime_warning_sent: logger.warning(f"Segmentation runtime failure, fallback enabled: {exc}") self._seg_runtime_warning_sent = True return { "segmentation_used": False, "has_card": False, "has_hand": False, "card_area_ratio": 0.0, "hand_area_ratio": 0.0, "card_bbox": None, } def _build_candidate(self, frame, current_time_ms: int, target_time_ms: int) -> FrameCandidate: """整合单帧的所有基础分析数据(分割、清晰度、存在感等),构建候选对象""" seg_result = self._analyze_segmentation(frame) # 对裁剪后的有效区域求清晰度,比算全图清晰度更准确,抗背景干扰 focus_region = self._focus_region(frame, seg_result["card_bbox"]) sharpness = self.get_laplacian_sharpness(focus_region) presence_score = self._compute_presence_score( segmentation_used=seg_result["segmentation_used"], has_card=seg_result["has_card"], has_hand=seg_result["has_hand"], card_area_ratio=seg_result["card_area_ratio"], hand_area_ratio=seg_result["hand_area_ratio"], ) return FrameCandidate( frame=frame.copy(), time_ms=int(current_time_ms), sharpness=sharpness, time_weight=self.calculate_weight(current_time_ms, target_time_ms), segmentation_used=seg_result["segmentation_used"], has_card=seg_result["has_card"], has_hand=seg_result["has_hand"], card_area_ratio=seg_result["card_area_ratio"], hand_area_ratio=seg_result["hand_area_ratio"], card_bbox=seg_result["card_bbox"], presence_score=presence_score, ) def _collect_candidates( self, cap: cv2.VideoCapture, start_time_ms: int, end_time_ms: int, target_time_ms: int, fps: float, ) -> list[FrameCandidate]: """在指定时间窗口内滑动,按步长收集视频帧作为候选""" candidates: list[FrameCandidate] = [] analysis_stride = self._analysis_stride(fps) # 预估最大读取次数,防止视频结尾卡死死循环 max_reads = int((end_time_ms - start_time_ms) / 1000.0 * fps) + analysis_stride + 10 # 跳转到起始时间 (注意: OpenCV 的 POS_MSEC 某些视频源上可能不精准) cap.set(cv2.CAP_PROP_POS_MSEC, start_time_ms) read_count = 0 while read_count < max_reads: ret, frame = cap.read() if not ret: break current_time_ms = cap.get(cv2.CAP_PROP_POS_MSEC) if current_time_ms > end_time_ms: break # 按计算好的步长 (analysis_stride) 进行抽帧分析 if read_count % analysis_stride == 0: candidates.append(self._build_candidate(frame, int(current_time_ms), target_time_ms)) read_count += 1 return candidates def _assign_dwell_scores(self, candidates: list[FrameCandidate]) -> None: """ 计算"停留得分" (Dwell Score): 如果一个人抽到卡通常会停留展示一段时间。连续在帧中被检测到的实体,其停留得分会更高。 这能有效过滤掉发牌时一晃而过的模糊残影。 """ if not candidates or not any(candidate.segmentation_used for candidate in candidates): return target_frames = max(1, int(round(settings.VIDEO_DWELL_TARGET_SECONDS * self.analysis_fps))) index = 0 while index < len(candidates): if not candidates[index].is_present: index += 1 continue # 寻找连续出现(is_present = True)的片段 run_end = index while run_end + 1 < len(candidates) and candidates[run_end + 1].is_present: run_end += 1 run_length = run_end - index + 1 dwell_score = min(run_length / target_frames, 1.0) # 为这段连续的帧赋相同的停留分 for pos in range(index, run_end + 1): candidates[pos].dwell_score = dwell_score index = run_end + 1 def _normalize_sharpness(self, sharpness: float, max_sharpness: float) -> float: """对清晰度进行对数归一化处理。使用 log 防止极值(超锐化噪点)拉爆分数池""" if sharpness <= 0 or max_sharpness <= 0: return 0.0 denominator = math.log1p(max_sharpness) if denominator <= 0: return 0.0 return min(math.log1p(sharpness) / denominator, 1.0) def _normalize_text(self, text: str) -> str: """清洗文本:去点,全大写,仅保留英文、数字、中文字符""" if not text: return "" cleaned = text.replace(".", "") cleaned = re.sub(r"[^0-9A-Za-z\u4e00-\u9fff]+", " ", cleaned.upper()) return re.sub(r"\s+", " ", cleaned).strip() def _tokenize_text(self, text: str) -> list[str]: """将文本拆分为分词列表,去重去单字母(除非是数字)""" normalized = self._normalize_text(text) if not normalized: return [] tokens: list[str] = [] seen: set[str] = set() for token in normalized.split(): if len(token) == 1 and not token.isdigit(): continue if token in seen: continue seen.add(token) tokens.append(token) return tokens def _build_expected_text(self, card: CardInfoOutput) -> dict[str, Any]: """从输入的 json 信息中提取期望的卡片名字、系列号、编号等,作为 OCR 的对比基准""" name_tokens = self._tokenize_text(card.card_name_en or "") if not name_tokens and card.card_name_cn: name_tokens = self._tokenize_text(card.card_name_cn) all_series_tokens = self._tokenize_text(card.series or "") number_tokens = [token for token in all_series_tokens if token.isdigit()][:3] series_tokens = [token for token in all_series_tokens if not token.isdigit()] series_tokens.sort(key=len, reverse=True) return { "name_tokens": name_tokens[:4], "series_tokens": series_tokens[:6], "number_tokens": number_tokens, "has_expectation": bool(name_tokens or series_tokens or number_tokens), } def _extract_ocr_text(self, ocr_result: Any) -> str: """递归解析 RapidOCR 返回的复杂嵌套结构,将所有识别出的文本段落拼装成一个大字符串""" texts: list[str] = [] def visit(node: Any) -> None: if node is None: return if isinstance(node, str): stripped = node.strip() if stripped: texts.append(stripped) return if hasattr(node, "txts"): visit(getattr(node, "txts")) return if hasattr(node, "ocr_res"): visit(getattr(node, "ocr_res")) return if isinstance(node, dict): for value in node.values(): visit(value) return if isinstance(node, (list, tuple)): if len(node) >= 2 and isinstance(node[1], str): visit(node[1]) return for item in node: visit(item) visit(ocr_result) deduped: list[str] = [] seen: set[str] = set() for text in texts: if text in seen: continue seen.add(text) deduped.append(text) return " ".join(deduped) def _token_overlap_score(self, expected_tokens: list[str], ocr_tokens: list[str]) -> float: """计算期望 Token 和 OCR 识别 Token 之间的重叠得分,包含对子串(部分匹配)的兼容分""" if not expected_tokens or not ocr_tokens: return 0.0 score = 0.0 ocr_set = set(ocr_tokens) for token in expected_tokens: if token in ocr_set: score += 1.0 # 完全命中给 1 分 continue # 兼容:如果目标 token 是 OCR结果的子串,或者 OCR结果是 token的子串 (例如 "PIKACHU" 匹配出 "PIKACH") partial_match = any( len(other) >= 2 and (token in other or other in token) for other in ocr_set ) if partial_match: score += 0.6 # 部分匹配给 0.6 分 return min(score / len(expected_tokens), 1.0) def _score_ocr_match(self, ocr_text: str, expected: dict[str, Any]) -> float: """综合评判 OCR 识别文本与目标 JSON 信息的多维度匹配程度""" if not ocr_text or not expected["has_expectation"]: return 0.0 normalized_text = self._normalize_text(ocr_text) ocr_tokens = self._tokenize_text(ocr_text) if not ocr_tokens: return 0.0 name_tokens = expected["name_tokens"] series_tokens = expected["series_tokens"] number_tokens = expected["number_tokens"] name_score = self._token_overlap_score(name_tokens, ocr_tokens) if name_tokens: joined_name = " ".join(name_tokens) if joined_name and joined_name in normalized_text: name_score = 1.0 # 名字完全作为整体匹配上,直接满分 series_score = self._token_overlap_score(series_tokens, ocr_tokens) number_score = self._token_overlap_score(number_tokens, ocr_tokens) # 加权混合:卡片名字(60%) > 系列名(25%) > 卡片编号(15%) if name_tokens: return min(0.60 * name_score + 0.25 * series_score + 0.15 * number_score, 1.0) return min(0.65 * series_score + 0.35 * number_score, 1.0) def _run_ocr(self, frame, bbox: Optional[tuple[int, int, int, int]]) -> str: """调用 OCR 引擎对关注区域进行文本识别,加入抗弹幕干扰的分块策略""" engine = self._ensure_ocr_engine() if engine is None: return "" focus_region = self._focus_region(frame, bbox) if focus_region is None or focus_region.shape[0] == 0 or focus_region.shape[1] == 0: return "" texts: list[str] = [] # 1. 常规全图 OCR (可能被中间的弹幕压制,但能提取出分散的编号等) try: result_full = engine(focus_region) texts.append(self._extract_ocr_text(result_full)) except Exception as exc: if not self._ocr_runtime_warning_sent: logger.warning(f"OCR full region failure: {exc}") self._ocr_runtime_warning_sent = True # 2. 分块特写 OCR,避开中心弹幕区,降低识别阈值 h, w = focus_region.shape[:2] if h > 60 and w > 60: # A. 专门识别底部 40% (绝大多数球星卡球员名字、宝可梦卡信息在底部) try: bottom_half = focus_region[int(h * 0.6):h, :] result_bottom = engine(bottom_half) texts.append(self._extract_ocr_text(result_bottom)) except Exception: pass # B. 专门识别顶部 40% (通常有 Bowman 1st 标志、帕尼尼系列名、或宝可梦名字) try: top_half = focus_region[0:int(h * 0.4), :] result_top = engine(top_half) texts.append(self._extract_ocr_text(result_top)) except Exception: pass # 将全图、顶部、底部的识别结果合并(后续的 token_overlap_score 会自动处理去重) combined_text = " ".join(texts) # 3. 正则剔除常见的直播间高频干扰词 (防止误匹配) # 这里的词汇通常是海外拆卡直播间(Whatnot/TikTok)经常出现的系统提示语 ignore_words = r"(?i)\b(bought|break|hobby|jumbo|box|close|spot|nice|snack|packs)\b" combined_text = re.sub(ignore_words, " ", combined_text) return combined_text def _score_candidates( self, candidates: list[FrameCandidate], card_output: CardInfoOutput, ) -> None: """ 核心打分中枢:结合之前计算的各个单项分,得出最终排名分。 采用二次打分机制:先通过 Base Score 选出 Top K,再让 Top K 过一遍耗时的 OCR,得出 Final Score。 """ if not candidates: return self._assign_dwell_scores(candidates) # 只对画面里确认有卡/手的帧进行打分 scoring_candidates = [candidate for candidate in candidates if candidate.is_present] if not scoring_candidates: scoring_candidates = candidates # 找准当前窗口期的相对最大清晰度作为归一化基准 max_sharpness = max(candidate.sharpness for candidate in scoring_candidates) if scoring_candidates else 0.0 segmentation_used = any(candidate.segmentation_used for candidate in candidates) expected = self._build_expected_text(card_output) ocr_enabled = self._ensure_ocr_engine() is not None and expected["has_expectation"] # 1. 粗排:计算 Base Score for candidate in scoring_candidates: candidate.sharpness_score = self._normalize_sharpness(candidate.sharpness, max_sharpness) if segmentation_used: # 若启用了图像分割:存在感(40%) + 清晰度(25%) + 离目标时间点的近度(20%) + 画面稳定性(15%) candidate.base_score = ( 0.40 * candidate.presence_score + 0.25 * candidate.sharpness_score + 0.20 * candidate.time_weight + 0.15 * candidate.dwell_score ) else: # fallback: 没有分割模型,只能靠清晰度和时间权重 candidate.base_score = ( 0.55 * candidate.sharpness_score + 0.35 * candidate.time_weight + 0.10 * candidate.dwell_score ) # 2. 精排:使用 OCR 计算 Final Score if ocr_enabled: # 只有 Base Score 排名前 K 的优胜者才会执行 OCR(性能优化) top_candidates = sorted( scoring_candidates, key=lambda item: item.base_score, reverse=True, )[: max(1, settings.VIDEO_OCR_TOP_K)] for candidate in top_candidates: candidate.ocr_text = self._run_ocr(candidate.frame, candidate.card_bbox) candidate.ocr_score = self._score_ocr_match(candidate.ocr_text, expected) # 更新所有入围帧的 Final Score for candidate in scoring_candidates: if segmentation_used: # OCR占核心大头(40%),配合其他物理指标 candidate.final_score = ( 0.40 * candidate.ocr_score + 0.25 * candidate.presence_score + 0.20 * candidate.sharpness_score + 0.10 * candidate.time_weight + 0.05 * candidate.dwell_score ) else: candidate.final_score = ( 0.45 * candidate.ocr_score + 0.30 * candidate.sharpness_score + 0.20 * candidate.time_weight + 0.05 * candidate.dwell_score ) else: # 如果 OCR 不可用或没配置预期,则直接用 Base Score 作为终局分数 for candidate in scoring_candidates: candidate.final_score = candidate.base_score def _select_best_candidate( self, candidates: list[FrameCandidate], target_time_ms: int, ) -> Optional[FrameCandidate]: """选出最终最能代表"高光时刻"的帧""" if not candidates: return None # 核心逻辑:主比对 final_score;如果最终分一样(比如都为0),看清晰度;再一样,看谁离打点时间最近。 return max( candidates, key=lambda item: ( item.final_score, item.sharpness_score, -abs(item.time_ms - target_time_ms), ), ) def capture_frames(self, video_path: str, cards: list[CardInfoInput]) -> list[CardInfoOutput]: """业务主干:传入视频与目标卡片打点列表,输出高光图片及匹配信息""" if not os.path.exists(video_path): logger.error(f"Video file not found: {video_path}") raise FileNotFoundError(f"Video file not found: {video_path}") logger.info(f"Open video: {video_path}") logger.info(f"Cards to process: {len(cards)}") cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) if fps <= 0: fps = 30.0 output_list: list[CardInfoOutput] = [] success_count = 0 filtered_count = 0 for idx, card_input in enumerate(cards): card_output = CardInfoOutput(**card_input.dict()) target_time_ms = self.time_str_to_ms(card_output.time) # 以打点时间戳为锚,建立一个[过去几s 到 未来 几s] 的搜索窗口 start_time_ms = max(0, target_time_ms - self.search_before_ms) end_time_ms = target_time_ms + self.search_after_ms logger.info( f"[{idx + 1}/{len(cards)}] analyze {card_output.time} " f"for {card_output.card_name_cn or card_output.card_name_en or 'unknown'}" ) logger.info(f" search window: [{start_time_ms}ms ~ {end_time_ms}ms]") # 1. 在窗口内收集所有候选帧 candidates = self._collect_candidates( cap=cap, start_time_ms=start_time_ms, end_time_ms=end_time_ms, target_time_ms=target_time_ms, fps=fps, ) if not candidates: logger.warning(" no frames sampled in the target window") continue segmentation_used = any(candidate.segmentation_used for candidate in candidates) present_candidates = [candidate for candidate in candidates if candidate.is_present] # [需求点 1]: 如果使用了分割模型且这片窗口内完全找不到手/卡,直接判定无效数据 if segmentation_used and not present_candidates: filtered_count += 1 logger.info(" filtered out: no card/hand found around the timestamp") continue scoring_candidates = present_candidates if present_candidates else candidates # 2. 调用多维度评分枢纽给各个候选帧打分 self._score_candidates(candidates, card_output) # 3. 选出最匹配、最清晰的一张 best_candidate = self._select_best_candidate(scoring_candidates, target_time_ms) if best_candidate is None: logger.warning(" no usable candidate after scoring") continue # 4. 保存为 JPG,构造业务输出数据 filename = f"{uuid.uuid4()}_{best_candidate.time_ms}.jpg" save_path = os.path.join(settings.FRAMES_DIR, filename) try: cv2.imwrite(save_path, best_candidate.frame) image_url = f"{settings.BASE_URL}/static/frames/{filename}" card_output.frame_image_path = image_url output_list.append(card_output) success_count += 1 time_diff = (best_candidate.time_ms - target_time_ms) / 1000.0 logger.info( f" saved {filename} " f"(offset={time_diff:+.2f}s, sharpness={best_candidate.sharpness:.1f}, " f"presence={best_candidate.presence_score:.2f}, " f"ocr={best_candidate.ocr_score:.2f}, score={best_candidate.final_score:.2f})" ) except Exception as exc: logger.error(f" failed to save frame: {exc}") # 务必释放 OpenCV 句柄,避免被视频文件死锁 cap.release() logger.info( f"Frame capture finished. saved={success_count}, " f"filtered={filtered_count}, total={len(cards)}" ) return output_list