# video_service.py
import math
import os
import re
import uuid
from dataclasses import dataclass
from typing import Any, Optional
import cv2
import numpy as np
import difflib  # SequenceMatcher drives the fuzzy OCR token matching below
from app.core.config import settings
from app.core.logger import get_logger
from app.schemas.models import CardInfoInput, CardInfoOutput
import torch
from PIL import Image
from transformers import AutoImageProcessor, AutoModelForSemanticSegmentation

# Module-level logger used by every function and method in this file.
logger = get_logger("VideoService")
  17. @dataclass
  18. class FrameCandidate:
  19. """
  20. 候选帧数据类:记录了从视频中抽取的某一帧的所有评分维度。
  21. 使用 dataclass 让数据结构非常清晰。
  22. """
  23. frame: Any # 原始图像矩阵 (OpenCV BGR)
  24. time_ms: int # 该帧在视频中的时间戳 (毫秒)
  25. sharpness: float # 拉普拉斯清晰度绝对值 (越大越清晰)
  26. time_weight: float # 时间权重 (距离目标时间戳越近,权重越高)
  27. segmentation_used: bool = False # 是否成功启用了分割模型
  28. has_card: bool = False # 画面中是否出现了卡
  29. has_hand: bool = False # 画面中是否出现了手
  30. card_area_ratio: float = 0.0 # 卡片占画面比例
  31. hand_area_ratio: float = 0.0 # 手占画面比例
  32. card_bbox: Optional[tuple[int, int, int, int]] = None # (x, y, w, h) 卡片/手的聚焦边界框
  33. presence_score: float = 0.0 # 实体存在感得分 (卡和手面积越大,得分越高)
  34. sharpness_score: float = 0.0 # 归一化后的清晰度得分 (0~1)
  35. dwell_score: float = 0.0 # 停留得分 (连续出现的帧数越多得分越高,用于抗闪烁)
  36. base_score: float = 0.0 # 基础分 (不包含OCR)
  37. ocr_text: str = "" # OCR识别出的文本
  38. ocr_score: float = 0.0 # OCR文本与预期卡片信息的匹配度得分 (0~1)
  39. final_score: float = 0.0 # 最终总分
  40. @property
  41. def is_present(self) -> bool:
  42. """只要有卡或者有手,就算该实体在画面中存在"""
  43. return self.has_card or self.has_hand
  44. class VideoService:
  45. def __init__(self):
  46. # 高斯分布的 sigma,用于计算时间权重。更大表示对时间差容忍度更高
  47. self.weight_sigma = 6.0
  48. self.search_before_ms = settings.VIDEO_SEARCH_BEFORE_MS
  49. self.search_after_ms = settings.VIDEO_SEARCH_AFTER_MS
  50. self.analysis_fps = max(settings.VIDEO_ANALYSIS_FPS, 0.5)
  51. # 延迟加载组件,节省初始化时的内存占用
  52. self._ocr_engine = None
  53. self._ocr_disabled = False
  54. self._ocr_runtime_warning_sent = False
  55. self._seg_processor = None
  56. self._seg_model = None
  57. self._seg_torch = None
  58. self._seg_pil_image = None
  59. self._seg_disabled = False
  60. self._seg_runtime_warning_sent = False
  61. def time_str_to_ms(self, time_str: str) -> int:
  62. """将格式为 'HH:MM:SS' 或 'MM:SS' 的字符串转换为毫秒"""
  63. try:
  64. parts = list(map(int, time_str.split(":")))
  65. if len(parts) == 3:
  66. h, m, s = parts
  67. return (h * 3600 + m * 60 + s) * 1000
  68. if len(parts) == 2:
  69. m, s = parts
  70. return (m * 60 + s) * 1000
  71. return 0
  72. except ValueError:
  73. return 0
  74. def get_laplacian_sharpness(self, frame) -> float:
  75. """
  76. 计算图像的拉普拉斯方差,这是业界最常用的无参考图像清晰度评估方法。
  77. 方差越大,说明边缘信息越丰富(越不模糊)。
  78. """
  79. gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
  80. # 增加高斯模糊,过滤掉反光产生的噪点和高频毛刺
  81. blurred = cv2.GaussianBlur(gray, (3, 3), 0)
  82. return float(cv2.Laplacian(blurred, cv2.CV_64F).var())
  83. def calculate_weight(self, current_time_ms: int, target_time_ms: int) -> float:
  84. """利用高斯函数计算时间权重。距离 target_time_ms 越近,返回值越接近 1.0"""
  85. diff_seconds = abs(current_time_ms - target_time_ms) / 1000.0
  86. return math.exp(-((diff_seconds ** 2) / (2 * self.weight_sigma ** 2)))
  87. def _analysis_stride(self, fps: float) -> int:
  88. """计算视频读取时的跳帧步长,确保处理速度匹配 VIDEO_ANALYSIS_FPS"""
  89. fps = fps if fps > 0 else 30.0
  90. return max(1, int(round(fps / self.analysis_fps)))
  91. def _ensure_ocr_engine(self):
  92. """单例模式懒加载 OCR 引擎 (RapidOCR)"""
  93. if self._ocr_disabled:
  94. return None
  95. if self._ocr_engine is not None:
  96. return self._ocr_engine
  97. try:
  98. from rapidocr import RapidOCR
  99. self._ocr_engine = RapidOCR()
  100. except Exception as exc:
  101. self._ocr_disabled = True
  102. logger.warning(f"OCR disabled: init failed: {exc}")
  103. return None
  104. return self._ocr_engine
  105. def _ensure_segmentation_model(self):
  106. """单例模式懒加载 HuggingFace Segformer 语义分割模型"""
  107. if self._seg_disabled:
  108. return None
  109. if self._seg_processor is not None and self._seg_model is not None:
  110. return self._seg_processor, self._seg_model
  111. model_dir = settings.VIDEO_SEG_MODEL_DIR
  112. if not model_dir or not os.path.exists(model_dir):
  113. self._seg_disabled = True
  114. logger.warning(f"Segmentation disabled: model dir not found: {model_dir}")
  115. return None
  116. try:
  117. self._seg_processor = AutoImageProcessor.from_pretrained(model_dir)
  118. self._seg_model = AutoModelForSemanticSegmentation.from_pretrained(model_dir)
  119. self._seg_model.eval() # 开启评估模式
  120. # 自动分配到 GPU (如果可用) 以加速推理
  121. if torch.cuda.is_available():
  122. self._seg_model = self._seg_model.to("cuda")
  123. self._seg_torch = torch
  124. self._seg_pil_image = Image
  125. except Exception as exc:
  126. self._seg_disabled = True
  127. logger.warning(f"Segmentation disabled: model loading failed: {exc}")
  128. return None
  129. return self._seg_processor, self._seg_model
  130. def _largest_bbox(self, mask) -> Optional[tuple[int, int, int, int]]:
  131. """从二进制掩码 (Mask) 中提取面积最大的连通区域的外接矩形"""
  132. if mask is None or not mask.any():
  133. return None
  134. mask_uint8 = (mask.astype("uint8")) * 255
  135. contours, _ = cv2.findContours(mask_uint8, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
  136. if not contours:
  137. return None
  138. largest = max(contours, key=cv2.contourArea)
  139. x, y, w, h = cv2.boundingRect(largest)
  140. # 过滤掉噪点 (宽或高小于20像素的通常是识别错误)
  141. if w < 20 or h < 20:
  142. return None
  143. return x, y, w, h
  144. def _expand_bbox(
  145. self,
  146. bbox: Optional[tuple[int, int, int, int]],
  147. width: int,
  148. height: int,
  149. margin_ratio: float = 0.08,
  150. ) -> Optional[tuple[int, int, int, int]]:
  151. """适度扩大 Bounding Box (增加 margin_ratio),防止目标边缘被裁掉,有利于后续 OCR"""
  152. if bbox is None:
  153. return None
  154. x, y, w, h = bbox
  155. margin_x = int(w * margin_ratio)
  156. margin_y = int(h * margin_ratio)
  157. x1 = max(0, x - margin_x)
  158. y1 = max(0, y - margin_y)
  159. x2 = min(width, x + w + margin_x)
  160. y2 = min(height, y + h + margin_y)
  161. return x1, y1, x2 - x1, y2 - y1
  162. def _focus_region(self, frame, bbox: Optional[tuple[int, int, int, int]]):
  163. """裁剪出关注区域。如果没有有效 BBox,则返回原图,作为容错机制。"""
  164. height, width = frame.shape[:2]
  165. expanded = self._expand_bbox(bbox, width, height)
  166. if expanded is None:
  167. return frame
  168. x, y, w, h = expanded
  169. if w < 24 or h < 24:
  170. return frame
  171. return frame[y: y + h, x: x + w]
  172. def _compute_presence_score(
  173. self,
  174. segmentation_used: bool,
  175. has_card: bool,
  176. has_hand: bool,
  177. card_area_ratio: float,
  178. hand_area_ratio: float,
  179. ) -> float:
  180. """根据卡片和手的面积占比计算"存在感得分" (0.0 ~ 1.0)"""
  181. if not segmentation_used:
  182. return 0.0
  183. # 对占比进行归一化,最大不超过 1.0
  184. card_ratio = min(card_area_ratio / max(settings.VIDEO_MIN_CARD_AREA_RATIO, 1e-6), 1.0)
  185. hand_ratio = min(hand_area_ratio / max(settings.VIDEO_MIN_HAND_AREA_RATIO, 1e-6), 1.0)
  186. score = 0.0
  187. if has_card:
  188. score += 0.70 * max(card_ratio, 0.35) # 卡片权重占 70%
  189. if has_hand:
  190. score += 0.30 * max(hand_ratio, 0.25) # 手的权重占 30%
  191. if has_card and has_hand:
  192. score += 0.10 # 卡和手同框,给予额外 10% 奖励分
  193. return min(score, 1.0)
  194. def _batch_analyze_segmentation(self, frames: list[Any]) -> list[dict[str, Any]]:
  195. """批量对多张图像进行语义分割,极大提高 GPU 利用率"""
  196. if not frames or self._ensure_segmentation_model() is None:
  197. return [{"segmentation_used": False, "has_card": False, "has_hand": False,
  198. "card_area_ratio": 0.0, "hand_area_ratio": 0.0, "card_bbox": None}] * len(frames)
  199. try:
  200. pil_images = [self._seg_pil_image.fromarray(cv2.cvtColor(f, cv2.COLOR_BGR2RGB)) for f in frames]
  201. device = next(self._seg_model.parameters()).device
  202. results = []
  203. # 分块批处理,防止显存 OOM(比如 16 帧一个 Batch)
  204. batch_size = 16
  205. for i in range(0, len(pil_images), batch_size):
  206. batch_imgs = pil_images[i: i + batch_size]
  207. inputs = self._seg_processor(images=batch_imgs, return_tensors="pt").to(device)
  208. with self._seg_torch.no_grad():
  209. outputs = self._seg_model(**inputs)
  210. logits = outputs.logits
  211. # 批量上采样并取 argmax
  212. preds = self._seg_torch.nn.functional.interpolate(
  213. logits,
  214. size=batch_imgs[0].size[::-1], # 假设所有帧分辨率一样
  215. mode="bilinear",
  216. align_corners=False,
  217. ).argmax(dim=1).cpu().numpy()
  218. # 解析每张图的 Mask
  219. for pred in preds:
  220. card_mask = pred == settings.VIDEO_CARD_LABEL_ID
  221. hand_mask = pred == settings.VIDEO_HAND_LABEL_ID
  222. card_area = float(card_mask.mean()) if card_mask.size else 0.0
  223. hand_area = float(hand_mask.mean()) if hand_mask.size else 0.0
  224. card_bbox = self._largest_bbox(card_mask)
  225. hand_bbox = self._largest_bbox(hand_mask)
  226. focus_bbox = card_bbox if card_bbox is not None else hand_bbox
  227. results.append({
  228. "segmentation_used": True,
  229. "has_card": card_area >= settings.VIDEO_MIN_CARD_AREA_RATIO,
  230. "has_hand": hand_area >= settings.VIDEO_MIN_HAND_AREA_RATIO,
  231. "card_area_ratio": card_area,
  232. "hand_area_ratio": hand_area,
  233. "card_bbox": focus_bbox,
  234. })
  235. # 及时清理这批显存
  236. del inputs, outputs, logits, preds
  237. if self._seg_torch.cuda.is_available():
  238. self._seg_torch.cuda.empty_cache()
  239. return results
  240. except Exception as exc:
  241. logger.warning(f"Batch segmentation failed, fallback: {exc}")
  242. return [{"segmentation_used": False, "has_card": False, "has_hand": False,
  243. "card_area_ratio": 0.0, "hand_area_ratio": 0.0, "card_bbox": None}] * len(frames)
  244. def _analyze_segmentation(self, frame) -> dict[str, Any]:
  245. """对单帧图像进行语义分割分析,寻找卡片和手的区域"""
  246. if self._ensure_segmentation_model() is None:
  247. return {
  248. "segmentation_used": False,
  249. "has_card": False,
  250. "has_hand": False,
  251. "card_area_ratio": 0.0,
  252. "hand_area_ratio": 0.0,
  253. "card_bbox": None,
  254. }
  255. try:
  256. # OpenCV (BGR) 转换为 PIL 所需的 RGB
  257. rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
  258. image = self._seg_pil_image.fromarray(rgb_frame)
  259. # 推理所需的数据需放到和模型同一设备上 (CPU or CUDA)
  260. device = next(self._seg_model.parameters()).device
  261. inputs = self._seg_processor(images=image, return_tensors="pt").to(device)
  262. with self._seg_torch.no_grad():
  263. outputs = self._seg_model(**inputs)
  264. logits = outputs.logits
  265. # 上采样回原始分辨率
  266. pred = self._seg_torch.nn.functional.interpolate(
  267. logits,
  268. size=image.size[::-1],
  269. mode="bilinear",
  270. align_corners=False,
  271. ).argmax(dim=1)[0].cpu().numpy()
  272. card_mask = pred == settings.VIDEO_CARD_LABEL_ID
  273. hand_mask = pred == settings.VIDEO_HAND_LABEL_ID
  274. card_area_ratio = float(card_mask.mean()) if card_mask.size else 0.0
  275. hand_area_ratio = float(hand_mask.mean()) if hand_mask.size else 0.0
  276. # [核心改进]: 提取两个 bbox
  277. card_bbox = self._largest_bbox(card_mask)
  278. hand_bbox = self._largest_bbox(hand_mask)
  279. has_card = card_area_ratio >= settings.VIDEO_MIN_CARD_AREA_RATIO
  280. has_hand = hand_area_ratio >= settings.VIDEO_MIN_HAND_AREA_RATIO
  281. # [核心改进]: 如果卡片太小/没切出来,但有手,把 focus box 降级到手的区域
  282. # 因为手大概率握着卡片,对“手”周边做 OCR 也能有效提取卡面信息
  283. focus_bbox = card_bbox if card_bbox is not None else hand_bbox
  284. # [核心改进]: 主动清理张量内存,防止长视频导致显存/内存溢出
  285. del inputs, outputs, logits, pred
  286. if self._seg_torch.cuda.is_available():
  287. self._seg_torch.cuda.empty_cache()
  288. return {
  289. "segmentation_used": True,
  290. "has_card": has_card,
  291. "has_hand": has_hand,
  292. "card_area_ratio": card_area_ratio,
  293. "hand_area_ratio": hand_area_ratio,
  294. "card_bbox": focus_bbox, # 返回 fallback 后的 bbox
  295. }
  296. except Exception as exc:
  297. if not self._seg_runtime_warning_sent:
  298. logger.warning(f"Segmentation runtime failure, fallback enabled: {exc}")
  299. self._seg_runtime_warning_sent = True
  300. return {
  301. "segmentation_used": False,
  302. "has_card": False,
  303. "has_hand": False,
  304. "card_area_ratio": 0.0,
  305. "hand_area_ratio": 0.0,
  306. "card_bbox": None,
  307. }
  308. def _build_candidate(self, frame, current_time_ms: int, target_time_ms: int) -> FrameCandidate:
  309. """整合单帧的所有基础分析数据(分割、清晰度、存在感等),构建候选对象"""
  310. seg_result = self._analyze_segmentation(frame)
  311. # 对裁剪后的有效区域求清晰度,比算全图清晰度更准确,抗背景干扰
  312. focus_region = self._focus_region(frame, seg_result["card_bbox"])
  313. sharpness = self.get_laplacian_sharpness(focus_region)
  314. presence_score = self._compute_presence_score(
  315. segmentation_used=seg_result["segmentation_used"],
  316. has_card=seg_result["has_card"],
  317. has_hand=seg_result["has_hand"],
  318. card_area_ratio=seg_result["card_area_ratio"],
  319. hand_area_ratio=seg_result["hand_area_ratio"],
  320. )
  321. return FrameCandidate(
  322. frame=frame.copy(),
  323. time_ms=int(current_time_ms),
  324. sharpness=sharpness,
  325. time_weight=self.calculate_weight(current_time_ms, target_time_ms),
  326. segmentation_used=seg_result["segmentation_used"],
  327. has_card=seg_result["has_card"],
  328. has_hand=seg_result["has_hand"],
  329. card_area_ratio=seg_result["card_area_ratio"],
  330. hand_area_ratio=seg_result["hand_area_ratio"],
  331. card_bbox=seg_result["card_bbox"],
  332. presence_score=presence_score,
  333. )
  334. def _collect_candidates(
  335. self,
  336. cap: cv2.VideoCapture,
  337. start_time_ms: int,
  338. end_time_ms: int,
  339. target_time_ms: int,
  340. fps: float,
  341. ) -> list[FrameCandidate]:
  342. """在指定时间窗口内滑动,按步长收集视频帧作为候选"""
  343. candidates: list[FrameCandidate] = []
  344. raw_frames = []
  345. time_ms_list = []
  346. analysis_stride = self._analysis_stride(fps)
  347. # 预估最大读取次数,防止视频结尾卡死死循环
  348. max_reads = int((end_time_ms - start_time_ms) / 1000.0 * fps) + analysis_stride + 10
  349. # 跳转到起始时间 (注意: OpenCV 的 POS_MSEC 某些视频源上可能不精准)
  350. cap.set(cv2.CAP_PROP_POS_MSEC, start_time_ms)
  351. read_count = 0
  352. while read_count < max_reads:
  353. # 仅仅抓取下一帧的数据流,不进行耗时的图像解码
  354. ret = cap.grab()
  355. if not ret:
  356. break
  357. current_time_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
  358. if current_time_ms > end_time_ms:
  359. break
  360. # 到达步长,才真正解码成图像矩阵
  361. if read_count % analysis_stride == 0:
  362. ret, frame = cap.retrieve()
  363. if ret:
  364. raw_frames.append(frame.copy())
  365. time_ms_list.append(current_time_ms)
  366. read_count += 1
  367. if not raw_frames:
  368. return []
  369. # 1. 批量过分割模型
  370. seg_results = self._batch_analyze_segmentation(raw_frames)
  371. # 2. 遍历组装 Candidate 并计算清晰度
  372. for frame, time_ms, seg_res in zip(raw_frames, time_ms_list, seg_results):
  373. # 切割关注区域算清晰度
  374. focus_region = self._focus_region(frame, seg_res["card_bbox"])
  375. sharpness = self.get_laplacian_sharpness(focus_region)
  376. presence_score = self._compute_presence_score(
  377. seg_res["segmentation_used"], seg_res["has_card"],
  378. seg_res["has_hand"], seg_res["card_area_ratio"], seg_res["hand_area_ratio"]
  379. )
  380. candidates.append(FrameCandidate(
  381. frame=frame,
  382. time_ms=int(time_ms),
  383. sharpness=sharpness,
  384. time_weight=self.calculate_weight(time_ms, target_time_ms),
  385. presence_score=presence_score,
  386. **seg_res # 解包填入 has_card, card_bbox 等属性
  387. ))
  388. return candidates
  389. def _assign_dwell_scores(self, candidates: list[FrameCandidate]) -> None:
  390. """
  391. 计算"停留得分" (Dwell Score):
  392. 如果一个人抽到卡通常会停留展示一段时间。连续在帧中被检测到的实体,其停留得分会更高。
  393. 这能有效过滤掉发牌时一晃而过的模糊残影。
  394. """
  395. if not candidates or not any(candidate.segmentation_used for candidate in candidates):
  396. return
  397. target_frames = max(1, int(round(settings.VIDEO_DWELL_TARGET_SECONDS * self.analysis_fps)))
  398. index = 0
  399. while index < len(candidates):
  400. if not candidates[index].is_present:
  401. index += 1
  402. continue
  403. # 寻找连续出现(is_present = True)的片段
  404. run_end = index
  405. while run_end + 1 < len(candidates) and candidates[run_end + 1].is_present:
  406. run_end += 1
  407. run_length = run_end - index + 1
  408. dwell_score = min(run_length / target_frames, 1.0)
  409. # 为这段连续的帧赋相同的停留分
  410. for pos in range(index, run_end + 1):
  411. candidates[pos].dwell_score = dwell_score
  412. index = run_end + 1
  413. def _normalize_sharpness(self, sharpness: float, max_sharpness: float) -> float:
  414. """对清晰度进行对数归一化处理。使用 log 防止极值(超锐化噪点)拉爆分数池"""
  415. if sharpness <= 0 or max_sharpness <= 0:
  416. return 0.0
  417. denominator = math.log1p(max_sharpness)
  418. if denominator <= 0:
  419. return 0.0
  420. return min(math.log1p(sharpness) / denominator, 1.0)
  421. def _normalize_text(self, text: str) -> str:
  422. """清洗文本:去点,全大写,仅保留英文、数字、中文字符"""
  423. if not text:
  424. return ""
  425. cleaned = text.replace(".", "")
  426. cleaned = re.sub(r"[^0-9A-Za-z\u4e00-\u9fff]+", " ", cleaned.upper())
  427. return re.sub(r"\s+", " ", cleaned).strip()
  428. def _tokenize_text(self, text: str) -> list[str]:
  429. """将文本拆分为分词列表,去重去单字母(除非是数字)"""
  430. normalized = self._normalize_text(text)
  431. if not normalized:
  432. return []
  433. tokens: list[str] = []
  434. seen: set[str] = set()
  435. for token in normalized.split():
  436. if len(token) == 1 and not token.isdigit():
  437. continue
  438. if token in seen:
  439. continue
  440. seen.add(token)
  441. tokens.append(token)
  442. return tokens
  443. def _build_expected_text(self, card: CardInfoOutput) -> dict[str, Any]:
  444. """从输入的 json 信息中提取期望的卡片名字、系列号、编号等,作为 OCR 的对比基准"""
  445. name_tokens = self._tokenize_text(card.card_name_en or "")
  446. if not name_tokens and card.card_name_cn:
  447. name_tokens = self._tokenize_text(card.card_name_cn)
  448. all_series_tokens = self._tokenize_text(card.series or "")
  449. number_tokens = [token for token in all_series_tokens if token.isdigit()][:3]
  450. series_tokens = [token for token in all_series_tokens if not token.isdigit()]
  451. series_tokens.sort(key=len, reverse=True)
  452. return {
  453. "name_tokens": name_tokens[:4],
  454. "series_tokens": series_tokens[:6],
  455. "number_tokens": number_tokens,
  456. "has_expectation": bool(name_tokens or series_tokens or number_tokens),
  457. }
  458. def _extract_ocr_text(self, ocr_result: Any) -> str:
  459. """递归解析 RapidOCR 返回的复杂嵌套结构,将所有识别出的文本段落拼装成一个大字符串"""
  460. texts: list[str] = []
  461. def visit(node: Any) -> None:
  462. if node is None:
  463. return
  464. if isinstance(node, str):
  465. stripped = node.strip()
  466. if stripped:
  467. texts.append(stripped)
  468. return
  469. if hasattr(node, "txts"):
  470. visit(getattr(node, "txts"))
  471. return
  472. if hasattr(node, "ocr_res"):
  473. visit(getattr(node, "ocr_res"))
  474. return
  475. if isinstance(node, dict):
  476. for value in node.values():
  477. visit(value)
  478. return
  479. if isinstance(node, (list, tuple)):
  480. if len(node) >= 2 and isinstance(node[1], str):
  481. visit(node[1])
  482. return
  483. for item in node:
  484. visit(item)
  485. visit(ocr_result)
  486. deduped: list[str] = []
  487. seen: set[str] = set()
  488. for text in texts:
  489. if text in seen:
  490. continue
  491. seen.add(text)
  492. deduped.append(text)
  493. return " ".join(deduped)
  494. def _token_overlap_score(self, expected_tokens: list[str], ocr_tokens: list[str]) -> float:
  495. """计算期望 Token 和 OCR 识别 Token 之间的重叠得分,包含对子串(部分匹配)的兼容分"""
  496. if not expected_tokens or not ocr_tokens:
  497. return 0.0
  498. score = 0.0
  499. ocr_set = set(ocr_tokens)
  500. for token in expected_tokens:
  501. best_ratio = 0.0
  502. for other in ocr_tokens:
  503. # 计算字符串相似度 (0 到 1)
  504. ratio = difflib.SequenceMatcher(None, token, other).ratio()
  505. if ratio > best_ratio:
  506. best_ratio = ratio
  507. if best_ratio > 0.85:
  508. score += 1.0 # 相似度极高,视为完全命中
  509. elif best_ratio > 0.6:
  510. score += 0.6 # 存在一定错别字,给部分分
  511. return min(score / len(expected_tokens), 1.0)
  512. def _score_ocr_match(self, ocr_text: str, expected: dict[str, Any]) -> float:
  513. """综合评判 OCR 识别文本与目标 JSON 信息的多维度匹配程度"""
  514. if not ocr_text or not expected["has_expectation"]:
  515. return 0.0
  516. normalized_text = self._normalize_text(ocr_text)
  517. ocr_tokens = self._tokenize_text(ocr_text)
  518. if not ocr_tokens:
  519. return 0.0
  520. name_tokens = expected["name_tokens"]
  521. series_tokens = expected["series_tokens"]
  522. number_tokens = expected["number_tokens"]
  523. name_score = self._token_overlap_score(name_tokens, ocr_tokens)
  524. if name_tokens:
  525. joined_name = " ".join(name_tokens)
  526. if joined_name and joined_name in normalized_text:
  527. name_score = 1.0 # 名字完全作为整体匹配上,直接满分
  528. series_score = self._token_overlap_score(series_tokens, ocr_tokens)
  529. number_score = self._token_overlap_score(number_tokens, ocr_tokens)
  530. # 加权混合:卡片名字(60%) > 系列名(25%) > 卡片编号(15%)
  531. if name_tokens:
  532. return min(0.60 * name_score + 0.25 * series_score + 0.15 * number_score, 1.0)
  533. return min(0.65 * series_score + 0.35 * number_score, 1.0)
  534. def _run_ocr(self, frame, bbox: Optional[tuple[int, int, int, int]]) -> str:
  535. """调用 OCR 引擎对关注区域进行文本识别,加入抗弹幕干扰的分块策略"""
  536. engine = self._ensure_ocr_engine()
  537. if engine is None:
  538. return ""
  539. focus_region = self._focus_region(frame, bbox)
  540. if focus_region is None or focus_region.shape[0] == 0 or focus_region.shape[1] == 0:
  541. return ""
  542. texts: list[str] = []
  543. # 1. 常规全图 OCR (可能被中间的弹幕压制,但能提取出分散的编号等)
  544. try:
  545. result_full = engine(focus_region)
  546. texts.append(self._extract_ocr_text(result_full))
  547. except Exception as exc:
  548. if not self._ocr_runtime_warning_sent:
  549. logger.warning(f"OCR full region failure: {exc}")
  550. self._ocr_runtime_warning_sent = True
  551. # 2. 分块特写 OCR,避开中心弹幕区,降低识别阈值
  552. h, w = focus_region.shape[:2]
  553. if h > 60 and w > 60:
  554. # A. 专门识别底部 40% (绝大多数球星卡球员名字、宝可梦卡信息在底部)
  555. try:
  556. bottom_half = focus_region[int(h * 0.6):h, :]
  557. result_bottom = engine(bottom_half)
  558. texts.append(self._extract_ocr_text(result_bottom))
  559. except Exception:
  560. pass
  561. # B. 专门识别顶部 40% (通常有 Bowman 1st 标志、帕尼尼系列名、或宝可梦名字)
  562. try:
  563. top_half = focus_region[0:int(h * 0.4), :]
  564. result_top = engine(top_half)
  565. texts.append(self._extract_ocr_text(result_top))
  566. except Exception:
  567. pass
  568. # 将全图、顶部、底部的识别结果合并(后续的 token_overlap_score 会自动处理去重)
  569. combined_text = " ".join(texts)
  570. # 3. 正则剔除常见的直播间高频干扰词 (防止误匹配)
  571. # 这里的词汇通常是海外拆卡直播间(Whatnot/TikTok)经常出现的系统提示语
  572. ignore_words = r"(?i)\b(bought|break|hobby|jumbo|box|close|spot|nice|snack|packs)\b"
  573. combined_text = re.sub(ignore_words, " ", combined_text)
  574. return combined_text
  575. def _score_candidates(
  576. self,
  577. candidates: list[FrameCandidate],
  578. card_output: CardInfoOutput,
  579. ) -> None:
  580. """
  581. 核心打分中枢:结合之前计算的各个单项分,得出最终排名分。
  582. 采用二次打分机制:先通过 Base Score 选出 Top K,再让 Top K 过一遍耗时的 OCR,得出 Final Score。
  583. """
  584. if not candidates:
  585. return
  586. self._assign_dwell_scores(candidates)
  587. # 只对画面里确认有卡/手的帧进行打分
  588. scoring_candidates = [candidate for candidate in candidates if candidate.is_present]
  589. if not scoring_candidates:
  590. scoring_candidates = candidates
  591. # 改为使用 90 分位数,防止单帧反光噪点拉爆整个分数池
  592. if scoring_candidates:
  593. sharpnesses = [c.sharpness for c in scoring_candidates]
  594. max_sharpness = float(np.percentile(sharpnesses, 90))
  595. else:
  596. max_sharpness = 0.0
  597. segmentation_used = any(candidate.segmentation_used for candidate in candidates)
  598. expected = self._build_expected_text(card_output)
  599. ocr_enabled = self._ensure_ocr_engine() is not None and expected["has_expectation"]
  600. # 1. 粗排:计算 Base Score
  601. for candidate in scoring_candidates:
  602. candidate.sharpness_score = self._normalize_sharpness(candidate.sharpness, max_sharpness)
  603. if segmentation_used:
  604. # 若启用了图像分割:存在感(40%) + 清晰度(25%) + 离目标时间点的近度(20%) + 画面稳定性(15%)
  605. candidate.base_score = (
  606. 0.40 * candidate.presence_score
  607. + 0.25 * candidate.sharpness_score
  608. + 0.20 * candidate.time_weight
  609. + 0.15 * candidate.dwell_score
  610. )
  611. else:
  612. # fallback: 没有分割模型,只能靠清晰度和时间权重
  613. candidate.base_score = (
  614. 0.55 * candidate.sharpness_score
  615. + 0.35 * candidate.time_weight
  616. + 0.10 * candidate.dwell_score
  617. )
  618. # 2. 精排:使用 OCR 计算 Final Score
  619. if ocr_enabled:
  620. # 只有 Base Score 排名前 K 的优胜者才会执行 OCR(性能优化)
  621. top_candidates = sorted(
  622. scoring_candidates,
  623. key=lambda item: item.base_score,
  624. reverse=True,
  625. )[: max(1, settings.VIDEO_OCR_TOP_K)]
  626. for candidate in top_candidates:
  627. candidate.ocr_text = self._run_ocr(candidate.frame, candidate.card_bbox)
  628. candidate.ocr_score = self._score_ocr_match(candidate.ocr_text, expected)
  629. # 更新所有入围帧的 Final Score
  630. for candidate in scoring_candidates:
  631. if segmentation_used:
  632. # OCR占核心大头(40%),配合其他物理指标
  633. candidate.final_score = (
  634. 0.40 * candidate.ocr_score
  635. + 0.25 * candidate.presence_score
  636. + 0.20 * candidate.sharpness_score
  637. + 0.10 * candidate.time_weight
  638. + 0.05 * candidate.dwell_score
  639. )
  640. else:
  641. candidate.final_score = (
  642. 0.45 * candidate.ocr_score
  643. + 0.30 * candidate.sharpness_score
  644. + 0.20 * candidate.time_weight
  645. + 0.05 * candidate.dwell_score
  646. )
  647. else:
  648. # 如果 OCR 不可用或没配置预期,则直接用 Base Score 作为终局分数
  649. for candidate in scoring_candidates:
  650. candidate.final_score = candidate.base_score
  651. def _select_best_candidate(
  652. self,
  653. candidates: list[FrameCandidate],
  654. target_time_ms: int,
  655. ) -> Optional[FrameCandidate]:
  656. """选出最终最能代表"高光时刻"的帧"""
  657. if not candidates:
  658. return None
  659. # 核心逻辑:主比对 final_score;如果最终分一样(比如都为0),看清晰度;再一样,看谁离打点时间最近。
  660. return max(
  661. candidates,
  662. key=lambda item: (
  663. item.final_score,
  664. item.sharpness_score,
  665. -abs(item.time_ms - target_time_ms),
  666. ),
  667. )
  668. def capture_frames(self, video_path: str, cards: list[CardInfoInput]) -> list[CardInfoOutput]:
  669. """业务主干:传入视频与目标卡片打点列表,输出高光图片及匹配信息"""
  670. if not os.path.exists(video_path):
  671. logger.error(f"Video file not found: {video_path}")
  672. raise FileNotFoundError(f"Video file not found: {video_path}")
  673. logger.info(f"Open video: {video_path}")
  674. logger.info(f"Cards to process: {len(cards)}")
  675. cap = cv2.VideoCapture(video_path)
  676. fps = cap.get(cv2.CAP_PROP_FPS)
  677. if fps <= 0:
  678. fps = 30.0
  679. output_list: list[CardInfoOutput] = []
  680. success_count = 0
  681. filtered_count = 0
  682. for idx, card_input in enumerate(cards):
  683. card_output = CardInfoOutput(**card_input.dict())
  684. target_time_ms = self.time_str_to_ms(card_output.time)
  685. # 以打点时间戳为锚,建立一个[过去几s 到 未来 几s] 的搜索窗口
  686. start_time_ms = max(0, target_time_ms - self.search_before_ms)
  687. end_time_ms = target_time_ms + self.search_after_ms
  688. logger.info(
  689. f"[{idx + 1}/{len(cards)}] analyze {card_output.time} "
  690. f"for {card_output.card_name_cn or card_output.card_name_en or 'unknown'}"
  691. )
  692. logger.info(f" search window: [{start_time_ms}ms ~ {end_time_ms}ms]")
  693. # 1. 在窗口内收集所有候选帧
  694. candidates = self._collect_candidates(
  695. cap=cap,
  696. start_time_ms=start_time_ms,
  697. end_time_ms=end_time_ms,
  698. target_time_ms=target_time_ms,
  699. fps=fps,
  700. )
  701. if not candidates:
  702. logger.warning(" no frames sampled in the target window")
  703. continue
  704. segmentation_used = any(candidate.segmentation_used for candidate in candidates)
  705. present_candidates = [candidate for candidate in candidates if candidate.is_present]
  706. # [需求点 1]: 如果使用了分割模型且这片窗口内完全找不到手/卡,直接判定无效数据
  707. if segmentation_used and not present_candidates:
  708. filtered_count += 1
  709. logger.info(" filtered out: no card/hand found around the timestamp")
  710. continue
  711. scoring_candidates = present_candidates if present_candidates else candidates
  712. # 2. 调用多维度评分枢纽给各个候选帧打分
  713. self._score_candidates(candidates, card_output)
  714. # 3. 选出最匹配、最清晰的一张
  715. best_candidate = self._select_best_candidate(scoring_candidates, target_time_ms)
  716. if best_candidate is None:
  717. logger.warning(" no usable candidate after scoring")
  718. continue
  719. # 4. 保存为 JPG,构造业务输出数据
  720. filename = f"{uuid.uuid4()}_{best_candidate.time_ms}.jpg"
  721. save_path = os.path.join(settings.FRAMES_DIR, filename)
  722. try:
  723. cv2.imwrite(save_path, best_candidate.frame)
  724. image_url = f"{settings.BASE_URL}/static/frames/{filename}"
  725. card_output.frame_image_path = image_url
  726. output_list.append(card_output)
  727. success_count += 1
  728. time_diff = (best_candidate.time_ms - target_time_ms) / 1000.0
  729. logger.info(
  730. f" saved {filename} "
  731. f"(offset={time_diff:+.2f}s, sharpness={best_candidate.sharpness:.1f}, "
  732. f"presence={best_candidate.presence_score:.2f}, "
  733. f"ocr={best_candidate.ocr_score:.2f}, score={best_candidate.final_score:.2f})"
  734. )
  735. except Exception as exc:
  736. logger.error(f" failed to save frame: {exc}")
  737. # 务必释放 OpenCV 句柄,避免被视频文件死锁
  738. cap.release()
  739. logger.info(
  740. f"Frame capture finished. saved={success_count}, "
  741. f"filtered={filtered_count}, total={len(cards)}"
  742. )
  743. return output_list