| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import cv2
- import os
- import uuid
- import math
- from app.core.config import settings
- from app.core.logger import get_logger
- from app.schemas.models import CardInfoInput, CardInfoOutput
- logger = get_logger("VideoService")
- class VideoService:
- def __init__(self):
- # 高斯函数中的 sigma (标准差) 决定了时间权重的下降速度。
- self.weight_sigma = 10.0
- def time_str_to_ms(self, time_str: str) -> int:
- try:
- parts = list(map(int, time_str.split(':')))
- if len(parts) == 3:
- h, m, s = parts
- return (h * 3600 + m * 60 + s) * 1000
- elif len(parts) == 2:
- m, s = parts
- return (m * 60 + s) * 1000
- return 0
- except ValueError:
- return 0
- def get_laplacian_sharpness(self, frame) -> float:
- """
- 计算图像的拉普拉斯方差。
- 方差越大,代表图像包含的高频边缘信息越多,也就意味着对焦越准、越清晰。
- """
- gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
- return cv2.Laplacian(gray, cv2.CV_64F).var()
- def calculate_weight(self, current_time_ms: int, target_time_ms: int) -> float:
- """
- 计算时间权重:使用高斯衰减函数。距离目标时间越近,权重越高。
- """
- diff_seconds = abs(current_time_ms - target_time_ms) / 1000.0
- return math.exp(- (diff_seconds ** 2) / (2 * self.weight_sigma ** 2))
- def capture_frames(self, video_path: str, cards: list[CardInfoInput]) -> list[CardInfoOutput]:
- if not os.path.exists(video_path):
- logger.error(f"❌ 找不到视频文件: {video_path}")
- raise FileNotFoundError(f"Video file not found: {video_path}")
- logger.info(f"🎬 打开视频文件: {video_path}")
- logger.info(f"📋 待处理卡片数量: {len(cards)}")
- cap = cv2.VideoCapture(video_path)
- # 获取视频帧率,用于计算安全边界
- fps = cap.get(cv2.CAP_PROP_FPS)
- if fps <= 0:
- fps = 30.0
- output_list = []
- success_count = 0
- for idx, card_input in enumerate(cards):
- card_output = CardInfoOutput(**card_input.dict())
- target_time_ms = self.time_str_to_ms(card_output.time)
- # 设定搜索窗口区间: [目标时间 - 1秒, 目标时间 + 4秒]
- start_time_ms = max(0, target_time_ms - 1000)
- end_time_ms = target_time_ms + 4000
- logger.info(
- f"📸[{idx + 1}/{len(cards)}] 智能截取 {card_output.time} ({target_time_ms}ms) - {card_output.card_name_cn or '未知卡名'}")
- logger.info(f" => 搜索区间: [{start_time_ms}ms ~ {end_time_ms}ms]")
- # 定位到窗口开始时间
- cap.set(cv2.CAP_PROP_POS_MSEC, start_time_ms)
- best_frame = None
- best_score = -1.0
- best_time_ms = start_time_ms
- best_sharpness = 0.0
- # 保护机制:最多读取这么多次,防止由于视频末尾造成的无限死循环
- max_reads = int((end_time_ms - start_time_ms) / 1000.0 * fps) + 30
- read_count = 0
- while read_count < max_reads:
- current_pos_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
- # 超出窗口最大时间,停止当前卡片的搜索
- if current_pos_ms > end_time_ms:
- break
- ret, frame = cap.read()
- if not ret:
- break # 视频结束
- # 计算原图清晰度
- sharpness = self.get_laplacian_sharpness(frame)
- # 计算时间偏移带来的衰减权重
- weight = self.calculate_weight(current_pos_ms, target_time_ms)
- # 综合评分 = 清晰度 * 时间权重
- score = sharpness * weight
- # 更新最佳候选帧
- if score > best_score:
- best_score = score
- best_frame = frame
- best_time_ms = current_pos_ms
- best_sharpness = sharpness
- read_count += 1
- # 保存最清晰的一张
- if best_frame is not None:
- filename = f"{uuid.uuid4()}_{int(best_time_ms)}.jpg"
- save_path = os.path.join(settings.FRAMES_DIR, filename)
- try:
- cv2.imwrite(save_path, best_frame)
- image_url = f"{settings.BASE_URL}/static/frames/{filename}"
- card_output.frame_image_path = image_url
- success_count += 1
- time_diff = (best_time_ms - target_time_ms) / 1000.0
- logger.info(
- f" ✅ 保存成功: {filename} (偏移: {time_diff:+.2f}s, 清晰度: {best_sharpness:.1f}, 综合分: {best_score:.1f})")
- except Exception as e:
- logger.error(f" ❌ 保存图片失败: {e}")
- else:
- logger.warning(f" ⚠️ 无法在窗口内读取到视频帧 (可能时间戳超出视频长度)")
- output_list.append(card_output)
- cap.release()
- logger.info(f"🏁 截取任务结束. 成功: {success_count}, 总数: {len(cards)}")
- return output_list
|