| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- import cv2
- import os
- import uuid
- from app.core.config import settings
- from app.core.logger import get_logger
- from app.schemas.models import CardInfoInput, CardInfoOutput
- logger = get_logger("VideoService")
- class VideoService:
- def time_str_to_ms(self, time_str: str) -> int:
- try:
- parts = list(map(int, time_str.split(':')))
- if len(parts) == 3:
- h, m, s = parts
- return (h * 3600 + m * 60 + s) * 1000
- elif len(parts) == 2:
- m, s = parts
- return (m * 60 + s) * 1000
- return 0
- except ValueError:
- return 0
- def capture_frames(self, video_path: str, cards: list[CardInfoInput]) -> list[CardInfoOutput]:
- if not os.path.exists(video_path):
- logger.error(f"❌ 找不到视频文件: {video_path}")
- raise FileNotFoundError(f"Video file not found: {video_path}")
- logger.info(f"🎬 打开视频文件: {video_path}")
- logger.info(f"📋 待处理卡片数量: {len(cards)}")
- cap = cv2.VideoCapture(video_path)
- output_list = []
- success_count = 0
- for idx, card_input in enumerate(cards):
- # 将 Input 模型转为 Output 模型 (此时 path 为 None)
- card_output = CardInfoOutput(**card_input.dict())
- time_ms = self.time_str_to_ms(card_output.time)
- logger.info(
- f"📸 [{idx + 1}/{len(cards)}] 正在截取 {card_output.time} ({time_ms}ms) - {card_output.card_name_cn or '未知卡名'}")
- # 设定位置
- cap.set(cv2.CAP_PROP_POS_MSEC, time_ms)
- ret, frame = cap.read()
- if ret:
- filename = f"{uuid.uuid4()}_{time_ms}.jpg"
- save_path = os.path.join(settings.FRAMES_DIR, filename)
- try:
- cv2.imwrite(save_path, frame)
- card_output.frame_image_path = save_path
- success_count += 1
- logger.info(f" ✅ 保存成功: {filename}")
- except Exception as e:
- logger.error(f" ❌ 保存图片失败: {e}")
- else:
- logger.warning(f" ⚠️ 无法读取视频帧 (可能时间戳超出视频长度)")
- output_list.append(card_output)
- cap.release()
- logger.info(f"🏁 截取任务结束. 成功: {success_count}, 总数: {len(cards)}")
- return output_list
|