import cv2 import os import uuid import math from app.core.config import settings from app.core.logger import get_logger from app.schemas.models import CardInfoInput, CardInfoOutput logger = get_logger("VideoService") class VideoService: def __init__(self): # 高斯函数中的 sigma (标准差) 决定了时间权重的下降速度。 self.weight_sigma = 10.0 def time_str_to_ms(self, time_str: str) -> int: try: parts = list(map(int, time_str.split(':'))) if len(parts) == 3: h, m, s = parts return (h * 3600 + m * 60 + s) * 1000 elif len(parts) == 2: m, s = parts return (m * 60 + s) * 1000 return 0 except ValueError: return 0 def get_laplacian_sharpness(self, frame) -> float: """ 计算图像的拉普拉斯方差。 方差越大,代表图像包含的高频边缘信息越多,也就意味着对焦越准、越清晰。 """ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) return cv2.Laplacian(gray, cv2.CV_64F).var() def calculate_weight(self, current_time_ms: int, target_time_ms: int) -> float: """ 计算时间权重:使用高斯衰减函数。距离目标时间越近,权重越高。 """ diff_seconds = abs(current_time_ms - target_time_ms) / 1000.0 return math.exp(- (diff_seconds ** 2) / (2 * self.weight_sigma ** 2)) def capture_frames(self, video_path: str, cards: list[CardInfoInput]) -> list[CardInfoOutput]: if not os.path.exists(video_path): logger.error(f"❌ 找不到视频文件: {video_path}") raise FileNotFoundError(f"Video file not found: {video_path}") logger.info(f"🎬 打开视频文件: {video_path}") logger.info(f"📋 待处理卡片数量: {len(cards)}") cap = cv2.VideoCapture(video_path) # 获取视频帧率,用于计算安全边界 fps = cap.get(cv2.CAP_PROP_FPS) if fps <= 0: fps = 30.0 output_list = [] success_count = 0 for idx, card_input in enumerate(cards): card_output = CardInfoOutput(**card_input.dict()) target_time_ms = self.time_str_to_ms(card_output.time) # 设定搜索窗口区间: [目标时间 - 1秒, 目标时间 + 4秒] start_time_ms = max(0, target_time_ms - 1000) end_time_ms = target_time_ms + 4000 logger.info( f"📸[{idx + 1}/{len(cards)}] 智能截取 {card_output.time} ({target_time_ms}ms) - {card_output.card_name_cn or '未知卡名'}") logger.info(f" => 搜索区间: [{start_time_ms}ms ~ {end_time_ms}ms]") # 定位到窗口开始时间 cap.set(cv2.CAP_PROP_POS_MSEC, start_time_ms) best_frame = None best_score = -1.0 best_time_ms = start_time_ms best_sharpness = 0.0 # 保护机制:最多读取这么多次,防止由于视频末尾造成的无限死循环 max_reads = int((end_time_ms - start_time_ms) / 1000.0 * fps) + 30 read_count = 0 while read_count < max_reads: current_pos_ms = cap.get(cv2.CAP_PROP_POS_MSEC) # 超出窗口最大时间,停止当前卡片的搜索 if current_pos_ms > end_time_ms: break ret, frame = cap.read() if not ret: break # 视频结束 # 计算原图清晰度 sharpness = self.get_laplacian_sharpness(frame) # 计算时间偏移带来的衰减权重 weight = self.calculate_weight(current_pos_ms, target_time_ms) # 综合评分 = 清晰度 * 时间权重 score = sharpness * weight # 更新最佳候选帧 if score > best_score: best_score = score best_frame = frame best_time_ms = current_pos_ms best_sharpness = sharpness read_count += 1 # 保存最清晰的一张 if best_frame is not None: filename = f"{uuid.uuid4()}_{int(best_time_ms)}.jpg" save_path = os.path.join(settings.FRAMES_DIR, filename) try: cv2.imwrite(save_path, best_frame) image_url = f"{settings.BASE_URL}/static/frames/{filename}" card_output.frame_image_path = image_url success_count += 1 time_diff = (best_time_ms - target_time_ms) / 1000.0 logger.info( f" ✅ 保存成功: {filename} (偏移: {time_diff:+.2f}s, 清晰度: {best_sharpness:.1f}, 综合分: {best_score:.1f})") except Exception as e: logger.error(f" ❌ 保存图片失败: {e}") else: logger.warning(f" ⚠️ 无法在窗口内读取到视频帧 (可能时间戳超出视频长度)") output_list.append(card_output) cap.release() logger.info(f"🏁 截取任务结束. 成功: {success_count}, 总数: {len(cards)}") return output_list