# video_service.py
  1. import math
  2. import os
  3. import re
  4. import uuid
  5. from dataclasses import dataclass
  6. from typing import Any, Optional
  7. import cv2
  8. from app.core.config import settings
  9. from app.core.logger import get_logger
  10. from app.schemas.models import CardInfoInput, CardInfoOutput
  11. import torch
  12. from PIL import Image
  13. from transformers import AutoImageProcessor, AutoModelForSemanticSegmentation
  14. logger = get_logger("VideoService")
  @dataclass
  class FrameCandidate:
      """One sampled video frame together with every score computed for it.

      The first four fields are required; all remaining fields default to
      "nothing detected / not scored yet" and are filled in by the scoring
      pipeline.
      """

      # --- raw sample -------------------------------------------------------
      # Original image matrix (OpenCV BGR).
      frame: Any
      # Timestamp of the frame inside the video, in milliseconds.
      time_ms: int
      # Laplacian variance of the focus region (larger means sharper).
      sharpness: float
      # Temporal weight: closer to the target timestamp gives a higher value.
      time_weight: float

      # --- segmentation result ----------------------------------------------
      # Whether the segmentation model actually ran for this frame.
      segmentation_used: bool = False
      # Whether a card was detected in the frame.
      has_card: bool = False
      # Whether a hand was detected in the frame.
      has_hand: bool = False
      # Fraction of the frame covered by the card.
      card_area_ratio: float = 0.0
      # Fraction of the frame covered by the hand.
      hand_area_ratio: float = 0.0
      # (x, y, w, h) focus bounding box of the card (or of the hand as fallback).
      card_bbox: Optional[tuple[int, int, int, int]] = None

      # --- derived scores ---------------------------------------------------
      # Presence score derived from the card/hand detections.
      presence_score: float = 0.0
      # Sharpness normalised to the 0..1 range.
      sharpness_score: float = 0.0
      # Dwell score: longer consecutive runs of detections score higher
      # (used to suppress flicker).
      dwell_score: float = 0.0
      # Base score (everything except OCR).
      base_score: float = 0.0
      # Text recognised by OCR.
      ocr_text: str = ""
      # Agreement between the OCR text and the expected card info (0..1).
      ocr_score: float = 0.0
      # Final overall score.
      final_score: float = 0.0

      @property
      def is_present(self) -> bool:
          """Return True when the frame shows a card or a hand (either suffices)."""
          return self.has_card or self.has_hand
  class VideoService:
      """Pick the best still frame of a card reveal around each timestamp.

      For every requested card the service samples frames in a window around
      the card's timestamp, scores them (segmentation presence, Laplacian
      sharpness, temporal proximity, dwell time and, when available, OCR
      agreement with the expected card text) and writes the winning frame to
      ``settings.FRAMES_DIR``.
      """

      # Words stripped from OCR output before matching. They are frequent
      # system/overlay phrases in overseas card-break livestreams
      # (Whatnot/TikTok) and would otherwise cause false matches.
      _OCR_IGNORE_WORDS = re.compile(
          r"(?i)\b(bought|break|hobby|jumbo|box|close|spot|nice|snack|packs)\b"
      )

      def __init__(self):
          """Read the search-window settings and reset all lazily built state."""
          # Sigma (in seconds) of the Gaussian used for the time weight; a
          # larger value tolerates a bigger distance from the target timestamp.
          self.weight_sigma = 6.0
          self.search_before_ms = settings.VIDEO_SEARCH_BEFORE_MS
          self.search_after_ms = settings.VIDEO_SEARCH_AFTER_MS
          # Never analyse at less than 0.5 frames per second.
          self.analysis_fps = max(settings.VIDEO_ANALYSIS_FPS, 0.5)

          # Heavy components are created on first use.
          self._ocr_engine = None
          self._ocr_disabled = False
          self._ocr_runtime_warning_sent = False
          self._seg_processor = None
          self._seg_model = None
          self._seg_torch = None
          self._seg_pil_image = None
          self._seg_disabled = False
          self._seg_runtime_warning_sent = False

      def time_str_to_ms(self, time_str: str) -> int:
          """Convert ``'HH:MM:SS'`` or ``'MM:SS'`` to milliseconds.

          The seconds field may carry a fractional part (``'01:02.5'``).
          Anything that cannot be parsed, including ``None``, a single
          field, or a non-finite seconds value, yields ``0``.
          """
          try:
              parts = time_str.strip().split(":")
              if len(parts) == 3:
                  hours = int(parts[0])
                  minutes = int(parts[1])
                  seconds = float(parts[2])
              elif len(parts) == 2:
                  hours = 0
                  minutes = int(parts[0])
                  seconds = float(parts[1])
              else:
                  return 0
          except (AttributeError, TypeError, ValueError):
              # AttributeError/TypeError: time_str is not a string.
              return 0
          if not math.isfinite(seconds):
              # float() accepts "nan"/"inf"; they are not valid timestamps.
              return 0
          return int(round((hours * 3600 + minutes * 60 + seconds) * 1000))

      def get_laplacian_sharpness(self, frame) -> float:
          """Return the variance of the Laplacian of the grayscale frame.

          A larger variance means richer edge information, i.e. a less
          blurry image. ``frame`` is converted with ``COLOR_BGR2GRAY``.
          """
          gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
          return float(cv2.Laplacian(gray, cv2.CV_64F).var())

      def calculate_weight(self, current_time_ms: int, target_time_ms: int) -> float:
          """Return a Gaussian time weight in (0, 1]; 1.0 exactly at the target."""
          diff_seconds = abs(current_time_ms - target_time_ms) / 1000.0
          return math.exp(-((diff_seconds ** 2) / (2 * self.weight_sigma ** 2)))

      def _analysis_stride(self, fps: float) -> int:
          """Return how many decoded frames correspond to one analysed frame.

          A non-positive ``fps`` is treated as 30. The result is at least 1.
          """
          fps = fps if fps > 0 else 30.0
          return max(1, int(round(fps / self.analysis_fps)))

      def _ensure_ocr_engine(self):
          """Create the RapidOCR engine on first use and cache it.

          Returns the engine, or ``None`` when OCR has been disabled because
          a previous initialisation attempt failed.
          """
          if self._ocr_disabled:
              return None
          if self._ocr_engine is not None:
              return self._ocr_engine
          try:
              from rapidocr import RapidOCR

              self._ocr_engine = RapidOCR()
          except Exception as exc:
              # Import or construction failed: disable OCR for this instance
              # so the failing initialisation is not retried for every frame.
              self._ocr_disabled = True
              logger.warning(f"OCR disabled: init failed: {exc}")
              return None
          return self._ocr_engine

      def _ensure_segmentation_model(self):
          """Load the semantic-segmentation model on first use and cache it.

          Returns ``(processor, model)``, or ``None`` when segmentation is
          disabled (model directory missing or loading failed).
          """
          if self._seg_disabled:
              return None
          if self._seg_processor is not None and self._seg_model is not None:
              return self._seg_processor, self._seg_model

          model_dir = settings.VIDEO_SEG_MODEL_DIR
          if not model_dir or not os.path.exists(model_dir):
              self._seg_disabled = True
              logger.warning(f"Segmentation disabled: model dir not found: {model_dir}")
              return None

          try:
              self._seg_processor = AutoImageProcessor.from_pretrained(model_dir)
              self._seg_model = AutoModelForSemanticSegmentation.from_pretrained(model_dir)
              self._seg_model.eval()  # inference mode
              # Move the model to the GPU when one is available.
              if torch.cuda.is_available():
                  self._seg_model = self._seg_model.to("cuda")
              self._seg_torch = torch
              self._seg_pil_image = Image
          except Exception as exc:
              self._seg_disabled = True
              logger.warning(f"Segmentation disabled: model loading failed: {exc}")
              return None
          return self._seg_processor, self._seg_model

      def _largest_bbox(self, mask) -> Optional[tuple[int, int, int, int]]:
          """Return the bounding rectangle of the largest contour in ``mask``.

          Returns ``None`` for a missing or empty mask, when no contour is
          found, or when the rectangle is under 20 pixels wide or high
          (treated as noise).
          """
          if mask is None or not mask.any():
              return None
          mask_uint8 = (mask.astype("uint8")) * 255
          contours, _ = cv2.findContours(mask_uint8, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
          if not contours:
              return None
          largest = max(contours, key=cv2.contourArea)
          x, y, w, h = cv2.boundingRect(largest)
          if w < 20 or h < 20:
              return None
          return x, y, w, h

      def _expand_bbox(
          self,
          bbox: Optional[tuple[int, int, int, int]],
          width: int,
          height: int,
          margin_ratio: float = 0.08,
      ) -> Optional[tuple[int, int, int, int]]:
          """Grow ``bbox`` by ``margin_ratio`` per side, clamped to the image.

          The margin keeps the target's edges from being cropped away, which
          helps the later OCR step. ``None`` is passed through unchanged.
          """
          if bbox is None:
              return None
          x, y, w, h = bbox
          margin_x = int(w * margin_ratio)
          margin_y = int(h * margin_ratio)
          x1 = max(0, x - margin_x)
          y1 = max(0, y - margin_y)
          x2 = min(width, x + w + margin_x)
          y2 = min(height, y + h + margin_y)
          return x1, y1, x2 - x1, y2 - y1

      def _focus_region(self, frame, bbox: Optional[tuple[int, int, int, int]]):
          """Crop ``frame`` to the expanded ``bbox``.

          Falls back to the whole frame when there is no box or the expanded
          box is under 24 pixels in either dimension.
          """
          height, width = frame.shape[:2]
          expanded = self._expand_bbox(bbox, width, height)
          if expanded is None:
              return frame
          x, y, w, h = expanded
          if w < 24 or h < 24:
              return frame
          return frame[y: y + h, x: x + w]

      def _compute_presence_score(
          self,
          segmentation_used: bool,
          has_card: bool,
          has_hand: bool,
          card_area_ratio: float,
          hand_area_ratio: float,
      ) -> float:
          """Return a 0..1 "presence" score from the card/hand detections.

          The card contributes up to 0.70 and the hand up to 0.30, with a
          0.10 bonus when both are visible; the total is capped at 1.0.
          Without segmentation the score is always 0.0.
          """
          if not segmentation_used:
              return 0.0
          # Area ratios are normalised against the configured minimum ratios
          # and capped at 1.0.
          # NOTE(review): when has_card/has_hand come from the same minimum
          # thresholds (as in _analyze_segmentation), these normalised values
          # are already 1.0 whenever the flag is set, so the 0.35/0.25 floors
          # never apply and the score does not grow with area. Confirm
          # whether a separate saturation ratio was intended.
          card_ratio = min(card_area_ratio / max(settings.VIDEO_MIN_CARD_AREA_RATIO, 1e-6), 1.0)
          hand_ratio = min(hand_area_ratio / max(settings.VIDEO_MIN_HAND_AREA_RATIO, 1e-6), 1.0)
          score = 0.0
          if has_card:
              score += 0.70 * max(card_ratio, 0.35)
          if has_hand:
              score += 0.30 * max(hand_ratio, 0.25)
          if has_card and has_hand:
              score += 0.10
          return min(score, 1.0)

      @staticmethod
      def _empty_segmentation_result() -> dict[str, Any]:
          """Return the result used when segmentation is unavailable or failed."""
          return {
              "segmentation_used": False,
              "has_card": False,
              "has_hand": False,
              "card_area_ratio": 0.0,
              "hand_area_ratio": 0.0,
              "card_bbox": None,
          }

      def _analyze_segmentation(self, frame) -> dict[str, Any]:
          """Run semantic segmentation on one frame and locate card and hand.

          Returns a dict with the keys ``segmentation_used``, ``has_card``,
          ``has_hand``, ``card_area_ratio``, ``hand_area_ratio`` and
          ``card_bbox``. ``card_bbox`` is the card's box, or the hand's box
          when no card box was found. Any runtime failure is logged once and
          the "segmentation not used" result is returned.
          """
          if self._ensure_segmentation_model() is None:
              return self._empty_segmentation_result()

          try:
              # OpenCV delivers BGR; PIL expects RGB.
              rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
              image = self._seg_pil_image.fromarray(rgb_frame)

              # Inputs must live on the same device as the model.
              device = next(self._seg_model.parameters()).device
              inputs = self._seg_processor(images=image, return_tensors="pt").to(device)
              with self._seg_torch.no_grad():
                  outputs = self._seg_model(**inputs)
              logits = outputs.logits

              # Upsample the logits back to the original resolution
              # (PIL size is (width, height), hence the reversal).
              pred = self._seg_torch.nn.functional.interpolate(
                  logits,
                  size=image.size[::-1],
                  mode="bilinear",
                  align_corners=False,
              ).argmax(dim=1)[0].cpu().numpy()

              card_mask = pred == settings.VIDEO_CARD_LABEL_ID
              hand_mask = pred == settings.VIDEO_HAND_LABEL_ID
              card_area_ratio = float(card_mask.mean()) if card_mask.size else 0.0
              hand_area_ratio = float(hand_mask.mean()) if hand_mask.size else 0.0

              card_bbox = self._largest_bbox(card_mask)
              hand_bbox = self._largest_bbox(hand_mask)
              has_card = card_area_ratio >= settings.VIDEO_MIN_CARD_AREA_RATIO
              has_hand = hand_area_ratio >= settings.VIDEO_MIN_HAND_AREA_RATIO

              # When no usable card box exists, focus on the hand instead:
              # the hand is most likely holding the card, so OCR around it
              # can still pick up the card text.
              focus_bbox = card_bbox if card_bbox is not None else hand_bbox

              # Drop tensor references eagerly so long videos do not
              # accumulate GPU/CPU memory.
              del inputs, outputs, logits, pred
              if self._seg_torch.cuda.is_available():
                  self._seg_torch.cuda.empty_cache()

              return {
                  "segmentation_used": True,
                  "has_card": has_card,
                  "has_hand": has_hand,
                  "card_area_ratio": card_area_ratio,
                  "hand_area_ratio": hand_area_ratio,
                  "card_bbox": focus_bbox,
              }
          except Exception as exc:
              if not self._seg_runtime_warning_sent:
                  logger.warning(f"Segmentation runtime failure, fallback enabled: {exc}")
                  self._seg_runtime_warning_sent = True
              return self._empty_segmentation_result()

      def _build_candidate(self, frame, current_time_ms: int, target_time_ms: int) -> "FrameCandidate":
          """Build a candidate with segmentation, sharpness and presence data."""
          seg_result = self._analyze_segmentation(frame)
          # Sharpness is measured on the cropped focus region rather than the
          # whole frame so the background does not dominate the measurement.
          focus_region = self._focus_region(frame, seg_result["card_bbox"])
          sharpness = self.get_laplacian_sharpness(focus_region)
          presence_score = self._compute_presence_score(
              segmentation_used=seg_result["segmentation_used"],
              has_card=seg_result["has_card"],
              has_hand=seg_result["has_hand"],
              card_area_ratio=seg_result["card_area_ratio"],
              hand_area_ratio=seg_result["hand_area_ratio"],
          )
          return FrameCandidate(
              frame=frame.copy(),
              time_ms=int(current_time_ms),
              sharpness=sharpness,
              time_weight=self.calculate_weight(current_time_ms, target_time_ms),
              segmentation_used=seg_result["segmentation_used"],
              has_card=seg_result["has_card"],
              has_hand=seg_result["has_hand"],
              card_area_ratio=seg_result["card_area_ratio"],
              hand_area_ratio=seg_result["hand_area_ratio"],
              card_bbox=seg_result["card_bbox"],
              presence_score=presence_score,
          )

      def _collect_candidates(
          self,
          cap: "cv2.VideoCapture",
          start_time_ms: int,
          end_time_ms: int,
          target_time_ms: int,
          fps: float,
      ) -> "list[FrameCandidate]":
          """Sample frames from ``[start_time_ms, end_time_ms]`` as candidates.

          Every ``_analysis_stride(fps)``-th decoded frame is analysed.
          Reading stops at the end of the window, at end of stream, or after
          an estimated maximum number of reads.
          """
          candidates: list = []
          analysis_stride = self._analysis_stride(fps)
          # Upper bound on reads so a misbehaving stream cannot loop forever.
          max_reads = int((end_time_ms - start_time_ms) / 1000.0 * fps) + analysis_stride + 10

          # NOTE: seeking via CAP_PROP_POS_MSEC may be imprecise for some
          # video sources.
          cap.set(cv2.CAP_PROP_POS_MSEC, start_time_ms)

          read_count = 0
          while read_count < max_reads:
              ret, frame = cap.read()
              if not ret:
                  break
              current_time_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
              if current_time_ms > end_time_ms:
                  break
              if read_count % analysis_stride == 0:
                  candidates.append(self._build_candidate(frame, int(current_time_ms), target_time_ms))
              read_count += 1
          return candidates

      def _assign_dwell_scores(self, candidates: "list[FrameCandidate]") -> None:
          """Assign a dwell score to every run of consecutive "present" frames.

          A revealed card is usually held up for a while, so a longer
          uninterrupted run of detections scores higher; this filters out
          blurry ghosts that only flash by while cards are being dealt. Each
          frame in a run receives ``min(run_length / target_frames, 1.0)``.
          Nothing is changed when no candidate used segmentation.
          """
          if not candidates or not any(candidate.segmentation_used for candidate in candidates):
              return

          target_frames = max(1, int(round(settings.VIDEO_DWELL_TARGET_SECONDS * self.analysis_fps)))
          index = 0
          total = len(candidates)
          while index < total:
              if not candidates[index].is_present:
                  index += 1
                  continue
              # Extend the run while the following frame is also present.
              run_end = index
              while run_end + 1 < total and candidates[run_end + 1].is_present:
                  run_end += 1
              dwell_score = min((run_end - index + 1) / target_frames, 1.0)
              for candidate in candidates[index: run_end + 1]:
                  candidate.dwell_score = dwell_score
              index = run_end + 1

      def _normalize_sharpness(self, sharpness: float, max_sharpness: float) -> float:
          """Return ``log1p(sharpness) / log1p(max_sharpness)`` capped at 1.0.

          The logarithm keeps extreme values (over-sharpened noise) from
          dominating. Non-positive inputs yield 0.0.
          """
          if sharpness <= 0 or max_sharpness <= 0:
              return 0.0
          denominator = math.log1p(max_sharpness)
          if denominator <= 0:
              return 0.0
          return min(math.log1p(sharpness) / denominator, 1.0)

      def _normalize_text(self, text: str) -> str:
          """Drop dots, upper-case, keep only ASCII alphanumerics and CJK.

          Every other run of characters becomes a single space; the result
          is stripped.
          """
          if not text:
              return ""
          cleaned = text.replace(".", "")
          cleaned = re.sub(r"[^0-9A-Za-z\u4e00-\u9fff]+", " ", cleaned.upper())
          return re.sub(r"\s+", " ", cleaned).strip()

      def _tokenize_text(self, text: str) -> list[str]:
          """Split normalised text into unique tokens, preserving order.

          Single-character tokens are dropped unless they are digits.
          """
          normalized = self._normalize_text(text)
          if not normalized:
              return []
          kept = (
              token
              for token in normalized.split()
              if len(token) != 1 or token.isdigit()
          )
          # dict.fromkeys removes duplicates while keeping first-seen order.
          return list(dict.fromkeys(kept))

      def _build_expected_text(self, card: "CardInfoOutput") -> dict[str, Any]:
          """Derive the expected name/series/number tokens used to judge OCR.

          The English name is preferred; the Chinese name is used only when
          the English one yields no tokens. Digit-only tokens of the series
          string are treated as numbers, the rest as series words (longest
          first).
          """
          name_tokens = self._tokenize_text(card.card_name_en or "")
          if not name_tokens and card.card_name_cn:
              name_tokens = self._tokenize_text(card.card_name_cn)

          all_series_tokens = self._tokenize_text(card.series or "")
          number_tokens = [token for token in all_series_tokens if token.isdigit()][:3]
          series_tokens = sorted(
              (token for token in all_series_tokens if not token.isdigit()),
              key=len,
              reverse=True,
          )
          return {
              "name_tokens": name_tokens[:4],
              "series_tokens": series_tokens[:6],
              "number_tokens": number_tokens,
              "has_expectation": bool(name_tokens or series_tokens or number_tokens),
          }

      def _extract_ocr_text(self, ocr_result: Any) -> str:
          """Flatten a nested OCR result into one space-joined string.

          The result is walked recursively: strings are collected, objects
          exposing ``txts`` or ``ocr_res`` are unwrapped, dict values and
          sequence items are visited, and a sequence whose second item is a
          string is treated as a single detection carrying that text.
          Duplicate strings are removed, keeping first-seen order.
          """
          texts: list[str] = []

          def visit(node: Any) -> None:
              if node is None:
                  return
              if isinstance(node, str):
                  stripped = node.strip()
                  if stripped:
                      texts.append(stripped)
                  return
              if hasattr(node, "txts"):
                  visit(getattr(node, "txts"))
                  return
              if hasattr(node, "ocr_res"):
                  visit(getattr(node, "ocr_res"))
                  return
              if isinstance(node, dict):
                  for value in node.values():
                      visit(value)
                  return
              if isinstance(node, (list, tuple)):
                  if len(node) >= 2 and isinstance(node[1], str):
                      visit(node[1])
                      return
                  for item in node:
                      visit(item)

          visit(ocr_result)
          return " ".join(dict.fromkeys(texts))

      def _token_overlap_score(self, expected_tokens: list[str], ocr_tokens: list[str]) -> float:
          """Return the 0..1 overlap between expected and recognised tokens.

          An exact hit is worth 1.0. A substring relation in either
          direction with an OCR token of length >= 2 (e.g. "PIKACHU" vs
          "PIKACH") is worth 0.6. The sum is divided by the number of
          expected tokens.
          """
          if not expected_tokens or not ocr_tokens:
              return 0.0
          score = 0.0
          ocr_set = set(ocr_tokens)
          for token in expected_tokens:
              if token in ocr_set:
                  score += 1.0
                  continue
              partial_match = any(
                  len(other) >= 2 and (token in other or other in token)
                  for other in ocr_set
              )
              if partial_match:
                  score += 0.6
          return min(score / len(expected_tokens), 1.0)

      def _score_ocr_match(self, ocr_text: str, expected: dict[str, Any]) -> float:
          """Score how well ``ocr_text`` matches the expected card info (0..1).

          With name tokens the weights are name 60%, series 25%, number 15%;
          without them, series 65% and number 35%. The name score is forced
          to 1.0 when the joined name appears verbatim in the normalised
          OCR text.
          """
          if not ocr_text or not expected["has_expectation"]:
              return 0.0
          normalized_text = self._normalize_text(ocr_text)
          ocr_tokens = self._tokenize_text(ocr_text)
          if not ocr_tokens:
              return 0.0

          name_tokens = expected["name_tokens"]
          series_tokens = expected["series_tokens"]
          number_tokens = expected["number_tokens"]

          name_score = self._token_overlap_score(name_tokens, ocr_tokens)
          if name_tokens:
              joined_name = " ".join(name_tokens)
              if joined_name and joined_name in normalized_text:
                  name_score = 1.0
          series_score = self._token_overlap_score(series_tokens, ocr_tokens)
          number_score = self._token_overlap_score(number_tokens, ocr_tokens)

          if name_tokens:
              return min(0.60 * name_score + 0.25 * series_score + 0.15 * number_score, 1.0)
          return min(0.65 * series_score + 0.35 * number_score, 1.0)

      def _run_ocr(self, frame, bbox: Optional[tuple[int, int, int, int]]) -> str:
          """Run OCR on the focus region and return the cleaned, merged text.

          The whole focus region is recognised first. When the region is
          larger than 60x60 pixels, the bottom 40% and the top 40% are
          recognised separately as well, which avoids chat overlays in the
          centre of the picture. A failure in any pass is logged (once per
          service instance) and the remaining passes still run. Returns ""
          when OCR is unavailable or the region is empty.
          """
          engine = self._ensure_ocr_engine()
          if engine is None:
              return ""
          focus_region = self._focus_region(frame, bbox)
          if focus_region is None or focus_region.shape[0] == 0 or focus_region.shape[1] == 0:
              return ""

          # (label, image) pairs, processed in order.
          regions = [("full region", focus_region)]
          h, w = focus_region.shape[:2]
          if h > 60 and w > 60:
              # Bottom 40%: most player names / card details are printed there.
              regions.append(("bottom strip", focus_region[int(h * 0.6):h, :]))
              # Top 40%: usually carries brand/series marks or the card name.
              regions.append(("top strip", focus_region[0:int(h * 0.4), :]))

          texts: list[str] = []
          for label, region in regions:
              try:
                  texts.append(self._extract_ocr_text(engine(region)))
              except Exception as exc:
                  # Best effort: one failing pass must not discard the
                  # others, but it should not vanish silently either.
                  if not self._ocr_runtime_warning_sent:
                      logger.warning(f"OCR {label} failure: {exc}")
                      self._ocr_runtime_warning_sent = True

          # Duplicates across passes are harmless: tokenisation removes them.
          combined_text = " ".join(texts)
          return self._OCR_IGNORE_WORDS.sub(" ", combined_text)

      def _score_candidates(
          self,
          candidates: "list[FrameCandidate]",
          card_output: "CardInfoOutput",
      ) -> None:
          """Fill in base and final scores for the candidates, in place.

          Scoring is two-stage: a cheap base score ranks all frames, then
          only the top ``settings.VIDEO_OCR_TOP_K`` frames go through the
          expensive OCR pass that feeds the final score. Only frames showing
          a card or hand are scored; if there are none, all frames are.
          """
          if not candidates:
              return

          self._assign_dwell_scores(candidates)

          scoring_candidates = [candidate for candidate in candidates if candidate.is_present]
          if not scoring_candidates:
              scoring_candidates = candidates

          # Sharpness is normalised against the sharpest frame of this window.
          max_sharpness = max(
              (candidate.sharpness for candidate in scoring_candidates),
              default=0.0,
          )
          segmentation_used = any(candidate.segmentation_used for candidate in candidates)
          expected = self._build_expected_text(card_output)
          ocr_enabled = self._ensure_ocr_engine() is not None and expected["has_expectation"]

          # Stage 1: base score.
          for candidate in scoring_candidates:
              candidate.sharpness_score = self._normalize_sharpness(candidate.sharpness, max_sharpness)
              if segmentation_used:
                  # presence 40% + sharpness 25% + time proximity 20% + dwell 15%
                  candidate.base_score = (
                      0.40 * candidate.presence_score
                      + 0.25 * candidate.sharpness_score
                      + 0.20 * candidate.time_weight
                      + 0.15 * candidate.dwell_score
                  )
              else:
                  # No segmentation: rely on sharpness and time proximity.
                  candidate.base_score = (
                      0.55 * candidate.sharpness_score
                      + 0.35 * candidate.time_weight
                      + 0.10 * candidate.dwell_score
                  )

          if not ocr_enabled:
              # OCR unavailable or nothing to compare against: the base
              # score is the final score.
              for candidate in scoring_candidates:
                  candidate.final_score = candidate.base_score
              return

          # Stage 2: OCR only for the best base scores (performance).
          top_candidates = sorted(
              scoring_candidates,
              key=lambda item: item.base_score,
              reverse=True,
          )[: max(1, settings.VIDEO_OCR_TOP_K)]
          for candidate in top_candidates:
              candidate.ocr_text = self._run_ocr(candidate.frame, candidate.card_bbox)
              candidate.ocr_score = self._score_ocr_match(candidate.ocr_text, expected)

          for candidate in scoring_candidates:
              if segmentation_used:
                  candidate.final_score = (
                      0.40 * candidate.ocr_score
                      + 0.25 * candidate.presence_score
                      + 0.20 * candidate.sharpness_score
                      + 0.10 * candidate.time_weight
                      + 0.05 * candidate.dwell_score
                  )
              else:
                  candidate.final_score = (
                      0.45 * candidate.ocr_score
                      + 0.30 * candidate.sharpness_score
                      + 0.20 * candidate.time_weight
                      + 0.05 * candidate.dwell_score
                  )

      def _select_best_candidate(
          self,
          candidates: "list[FrameCandidate]",
          target_time_ms: int,
      ) -> "Optional[FrameCandidate]":
          """Return the highest-ranked candidate, or ``None`` for an empty list.

          Ranking key: final score, then sharpness score, then closeness to
          ``target_time_ms``.
          """
          if not candidates:
              return None
          return max(
              candidates,
              key=lambda item: (
                  item.final_score,
                  item.sharpness_score,
                  -abs(item.time_ms - target_time_ms),
              ),
          )

      def _save_frame(self, candidate: "FrameCandidate") -> Optional[str]:
          """Write the candidate's frame as a JPG into ``settings.FRAMES_DIR``.

          Returns the generated file name, or ``None`` when the image could
          not be written (the failure is logged).
          """
          filename = f"{uuid.uuid4()}_{candidate.time_ms}.jpg"
          save_path = os.path.join(settings.FRAMES_DIR, filename)
          try:
              os.makedirs(settings.FRAMES_DIR, exist_ok=True)
              # cv2.imwrite reports most failures through its boolean return
              # value rather than by raising.
              if not cv2.imwrite(save_path, candidate.frame):
                  logger.error(f" failed to save frame: cv2.imwrite returned False for {save_path}")
                  return None
          except Exception as exc:
              logger.error(f" failed to save frame: {exc}")
              return None
          return filename

      def capture_frames(self, video_path: str, cards: "list[CardInfoInput]") -> "list[CardInfoOutput]":
          """Extract one highlight frame per card and return the enriched cards.

          Args:
              video_path: Path of the video file to analyse.
              cards: Cards with the timestamp at which each one appears.

          Returns:
              One output per card whose frame was saved, with
              ``frame_image_path`` set to the public URL of the image.
              Cards are skipped when no frame could be sampled, when
              segmentation ran but found neither card nor hand in the
              window, or when the image could not be written.

          Raises:
              FileNotFoundError: If ``video_path`` does not exist.
          """
          if not os.path.exists(video_path):
              logger.error(f"Video file not found: {video_path}")
              raise FileNotFoundError(f"Video file not found: {video_path}")

          logger.info(f"Open video: {video_path}")
          logger.info(f"Cards to process: {len(cards)}")

          output_list: list = []
          success_count = 0
          filtered_count = 0

          cap = cv2.VideoCapture(video_path)
          try:
              if not cap.isOpened():
                  logger.error(f"Failed to open video: {video_path}")
                  return output_list

              fps = cap.get(cv2.CAP_PROP_FPS)
              if not fps > 0:  # also catches NaN
                  fps = 30.0

              for idx, card_input in enumerate(cards):
                  card_output = CardInfoOutput(**card_input.dict())
                  target_time_ms = self.time_str_to_ms(card_output.time)
                  # Search window anchored at the card's timestamp.
                  start_time_ms = max(0, target_time_ms - self.search_before_ms)
                  end_time_ms = target_time_ms + self.search_after_ms

                  logger.info(
                      f"[{idx + 1}/{len(cards)}] analyze {card_output.time} "
                      f"for {card_output.card_name_cn or card_output.card_name_en or 'unknown'}"
                  )
                  logger.info(f" search window: [{start_time_ms}ms ~ {end_time_ms}ms]")

                  # 1. Sample candidate frames inside the window.
                  candidates = self._collect_candidates(
                      cap=cap,
                      start_time_ms=start_time_ms,
                      end_time_ms=end_time_ms,
                      target_time_ms=target_time_ms,
                      fps=fps,
                  )
                  if not candidates:
                      logger.warning(" no frames sampled in the target window")
                      continue

                  segmentation_used = any(candidate.segmentation_used for candidate in candidates)
                  present_candidates = [candidate for candidate in candidates if candidate.is_present]

                  # Segmentation ran but saw neither hand nor card anywhere
                  # in the window: treat the timestamp as invalid.
                  if segmentation_used and not present_candidates:
                      filtered_count += 1
                      logger.info(" filtered out: no card/hand found around the timestamp")
                      continue

                  scoring_candidates = present_candidates if present_candidates else candidates

                  # 2. Score, then 3. pick the best matching, sharpest frame.
                  self._score_candidates(candidates, card_output)
                  best_candidate = self._select_best_candidate(scoring_candidates, target_time_ms)
                  if best_candidate is None:
                      logger.warning(" no usable candidate after scoring")
                      continue

                  # 4. Persist the frame; only report cards whose image exists.
                  filename = self._save_frame(best_candidate)
                  if filename is None:
                      continue

                  card_output.frame_image_path = f"{settings.BASE_URL}/static/frames/{filename}"
                  output_list.append(card_output)
                  success_count += 1

                  time_diff = (best_candidate.time_ms - target_time_ms) / 1000.0
                  logger.info(
                      f" saved {filename} "
                      f"(offset={time_diff:+.2f}s, sharpness={best_candidate.sharpness:.1f}, "
                      f"presence={best_candidate.presence_score:.2f}, "
                      f"ocr={best_candidate.ocr_score:.2f}, score={best_candidate.final_score:.2f})"
                  )
          finally:
              # Always release the capture handle, even when analysis raises.
              cap.release()

          logger.info(
              f"Frame capture finished. saved={success_count}, "
              f"filtered={filtered_count}, total={len(cards)}"
          )
          return output_list