import os
import uuid

import cv2

from app.core.config import settings
from app.core.logger import get_logger
from app.schemas.models import CardInfoInput, CardInfoOutput

logger = get_logger("VideoService")


class VideoService:
    """Extract still frames from a video file at the timestamps given by cards."""

    def time_str_to_ms(self, time_str: str) -> int:
        """Convert an ``HH:MM:SS`` or ``MM:SS`` timestamp into milliseconds.

        Args:
            time_str: Colon-separated timestamp made of integer fields.

        Returns:
            The timestamp in milliseconds, or ``0`` when a field is not an
            integer or the number of fields is neither 2 nor 3.
        """
        try:
            parts = [int(field) for field in time_str.split(':')]
        except ValueError:
            return 0

        if len(parts) == 3:
            h, m, s = parts
            return (h * 3600 + m * 60 + s) * 1000
        if len(parts) == 2:
            m, s = parts
            return (m * 60 + s) * 1000
        return 0

    def capture_frames(self, video_path: str, cards: list[CardInfoInput]) -> list[CardInfoOutput]:
        """Grab one frame per card and save it as a JPEG under ``settings.FRAMES_DIR``.

        Each input card is copied into a ``CardInfoOutput``. When the frame at
        the card's timestamp is read and written successfully, the output's
        ``frame_image_path`` is set to the saved file; otherwise the output is
        still returned, just without that assignment.

        Args:
            video_path: Path of the video file to read.
            cards: Cards whose ``time`` field selects the frame to capture.

        Returns:
            One ``CardInfoOutput`` per input card, in input order.

        Raises:
            FileNotFoundError: If ``video_path`` does not exist.
        """
        if not os.path.exists(video_path):
            logger.error(f"❌ 找不到视频文件: {video_path}")
            raise FileNotFoundError(f"Video file not found: {video_path}")

        logger.info(f"🎬 打开视频文件: {video_path}")
        logger.info(f"📋 待处理卡片数量: {len(cards)}")

        cap = cv2.VideoCapture(video_path)
        output_list = []
        success_count = 0

        # try/finally guarantees the capture handle is released even if
        # building an output model or seeking raises midway through the loop.
        try:
            if not cap.isOpened():
                # The file exists but could not be opened for decoding. Every
                # read below will fail; log the real cause so the per-card
                # warnings are not misread as out-of-range timestamps.
                logger.error(f"❌ 无法打开视频文件: {video_path}")

            for idx, card_input in enumerate(cards):
                # Convert the input model to the output model
                # (the frame path is not set yet at this point).
                card_output = CardInfoOutput(**card_input.dict())

                time_ms = self.time_str_to_ms(card_output.time)

                logger.info(
                    f"📸 [{idx + 1}/{len(cards)}] 正在截取 {card_output.time} ({time_ms}ms) - {card_output.card_name_cn or '未知卡名'}")

                # Seek to the requested position (in milliseconds).
                cap.set(cv2.CAP_PROP_POS_MSEC, time_ms)
                ret, frame = cap.read()

                if ret:
                    filename = f"{uuid.uuid4()}_{time_ms}.jpg"
                    save_path = os.path.join(settings.FRAMES_DIR, filename)

                    try:
                        written = cv2.imwrite(save_path, frame)
                    except Exception as e:
                        # Best effort: one failed write must not abort the batch.
                        logger.error(f" ❌ 保存图片失败: {e}")
                    else:
                        # imwrite signals most failures (missing directory,
                        # unwritable path) by returning False, not by raising.
                        if written:
                            card_output.frame_image_path = save_path
                            success_count += 1
                            logger.info(f" ✅ 保存成功: {filename}")
                        else:
                            logger.error(f" ❌ 保存图片失败: cv2.imwrite returned False ({save_path})")
                else:
                    logger.warning(f" ⚠️ 无法读取视频帧 (可能时间戳超出视频长度)")

                output_list.append(card_output)
        finally:
            cap.release()

        logger.info(f"🏁 截取任务结束. 成功: {success_count}, 总数: {len(cards)}")
        return output_list