import cv2 import time from datetime import datetime import os import uuid from fastapi import APIRouter, HTTPException from pydantic import BaseModel from concurrent.futures import ThreadPoolExecutor from threading import Lock, Timer from app.core.config import settings # -------------------- 配置 -------------------- executor = ThreadPoolExecutor(max_workers=4) lock = Lock() # 摄像头任务状态 cam_status = {} router = APIRouter() class TaskRequest(BaseModel): camera_id: str task_name: str # -------------------- 录像函数 -------------------- def record_camera(camera_id: str, task_name: str, output_file: str): rtsp_url = settings.CAMERA_CONFIG[camera_id] cap = cv2.VideoCapture(rtsp_url) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 减少延迟 if not cap.isOpened(): raise RuntimeError(f"无法连接摄像头 {camera_id}") frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) if fps <= 0: fps = 25 fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height)) print(f"[{camera_id}] 任务 {task_name} 开始录制...") try: while True: ret, frame = cap.read() if not ret: print(f"[{camera_id}] 丢帧,停止录像") break out.write(frame) # 检查任务是否被标记停止 with lock: status = cam_status.get(camera_id) if not status or status.get("stop_flag"): break time.sleep(0.001) finally: cap.release() out.release() print(f"[{camera_id}] 任务 {task_name} 完成,文件: {output_file}") def stop_task_internal(camera_id: str): with lock: status = cam_status.get(camera_id) if not status: return status["stop_flag"] = True status["timer"].cancel() # -------------------- API接口 -------------------- @router.post("/start_task") def start_task(req: TaskRequest): camera_id = req.camera_id task_name = req.task_name if camera_id not in settings.CAMERA_CONFIG: raise HTTPException(status_code=404, detail=f"摄像头 {camera_id} 未配置") with lock: if camera_id in cam_status: return {"status": "running", "task_name": cam_status[camera_id]["task_name"]} # 输出文件名 now_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") filename = f"{camera_id}_{task_name}_{now_time}.mp4" output_file = os.path.join(settings.OUTPUT_DIR, filename) # 记录状态 stop_flag = False timer = Timer(settings.MAX_TASK_SECONDS, lambda: stop_task_internal(camera_id)) timer.start() future = executor.submit(record_camera, camera_id, task_name, output_file) cam_status[camera_id] = { "task_name": task_name, "start_time": time.time(), "timer": timer, "future": future, "file": output_file, "stop_flag": stop_flag } return {"status": "started", "file": output_file} @router.post("/stop_task") def stop_task(req: TaskRequest): camera_id = req.camera_id task_name = req.task_name with lock: status = cam_status.get(camera_id) if not status: return {"status": "not_running"} if status["task_name"] != task_name: return {"status": "task_mismatch", "running_task": status["task_name"]} # 标记停止 status["stop_flag"] = True status["timer"].cancel() return {"status": "stopped", "file": status["file"]} @router.get("/status/{camera_id}") def status(camera_id: str): with lock: status = cam_status.get(camera_id) if not status: return {"status": "idle"} return { "status": "running", "task_name": status["task_name"], "start_time": status["start_time"], "file": status["file"] }