| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- import cv2
- import time
- from datetime import datetime
- import os
- import uuid
- from fastapi import APIRouter, HTTPException
- from pydantic import BaseModel
- from concurrent.futures import ThreadPoolExecutor
- from threading import Lock, Timer
- from app.core.config import settings
# -------------------- Configuration --------------------
# Thread pool that runs the recording loops (at most four at a time).
executor = ThreadPoolExecutor(max_workers=4)

# Held while reading or updating ``cam_status``.
lock = Lock()

# Per-camera task state, keyed by camera id.
cam_status = {}

router = APIRouter()
class TaskRequest(BaseModel):
    """Request body accepted by the start-task and stop-task endpoints."""

    # Camera identifier; the start endpoint looks it up in
    # ``settings.CAMERA_CONFIG``.
    camera_id: str
    # Caller-chosen name of the recording task.
    task_name: str
# -------------------- Recording function --------------------
def record_camera(camera_id: str, task_name: str, output_file: str):
    """Record one camera's RTSP stream into ``output_file``.

    Intended to run in a worker thread. Frames are copied from the stream to
    an MP4 file until the stream stops delivering frames, or until the
    camera's ``cam_status`` entry is missing or has ``stop_flag`` set.

    Whatever the outcome, the capture and writer handles are released, and
    the ``cam_status`` entry belonging to this recording is removed (with its
    timer cancelled) so the camera can be started again.

    Args:
        camera_id: Key into ``settings.CAMERA_CONFIG`` yielding the RTSP URL.
        task_name: Task label; used in log output only.
        output_file: Path of the video file to write.

    Raises:
        RuntimeError: If the camera stream or the output file cannot be
            opened.
    """
    rtsp_url = settings.CAMERA_CONFIG[camera_id]
    cap = None
    out = None
    try:
        cap = cv2.VideoCapture(rtsp_url)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # reduce latency
        if not cap.isOpened():
            raise RuntimeError(f"无法连接摄像头 {camera_id}")

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        # ``not (fps > 0)`` also catches NaN, which ``fps <= 0`` lets through.
        if not (fps > 0):
            fps = 25

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height))
        # A writer that failed to open silently discards every frame, so
        # fail loudly instead of "recording" nothing.
        if not out.isOpened():
            raise RuntimeError(f"无法创建录像文件 {output_file}")

        print(f"[{camera_id}] 任务 {task_name} 开始录制...")
        while True:
            ret, frame = cap.read()
            if not ret:
                print(f"[{camera_id}] 丢帧,停止录像")
                break
            out.write(frame)

            # Check whether the task has been flagged to stop.
            with lock:
                status = cam_status.get(camera_id)
                should_stop = not status or status.get("stop_flag")
            if should_stop:
                break
            time.sleep(0.001)
    except Exception as exc:
        # Nobody awaits the future, so without this the failure is invisible.
        print(f"[{camera_id}] 任务 {task_name} 异常终止: {exc}")
        raise
    finally:
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
        # Drop this recording's state entry; otherwise the camera would look
        # busy forever. The file check avoids removing a different task's
        # entry.
        with lock:
            status = cam_status.get(camera_id)
            if status and status.get("file") == output_file:
                del cam_status[camera_id]
                timer = status.get("timer")
                if timer is not None:
                    timer.cancel()

    print(f"[{camera_id}] 任务 {task_name} 完成,文件: {output_file}")
def stop_task_internal(camera_id: str):
    """Flag the camera's registered task to stop and cancel its timer.

    Does nothing when no task is registered for ``camera_id``.
    """
    with lock:
        entry = cam_status.get(camera_id)
        if entry:
            entry["stop_flag"] = True
            entry["timer"].cancel()
# -------------------- API endpoints --------------------
@router.post("/start_task")
def start_task(req: TaskRequest):
    """Start recording a configured camera unless it is already recording.

    Args:
        req: Camera id and task name of the recording to start.

    Returns:
        ``{"status": "running", "task_name": ...}`` when a recording for the
        camera is still in progress, otherwise
        ``{"status": "started", "file": ...}`` with the output path.

    Raises:
        HTTPException: 404 if the camera id is not in
            ``settings.CAMERA_CONFIG``.
    """
    camera_id = req.camera_id
    task_name = req.task_name
    if camera_id not in settings.CAMERA_CONFIG:
        raise HTTPException(status_code=404, detail=f"摄像头 {camera_id} 未配置")

    # task_name is client-supplied and becomes part of a file name: replace
    # anything that is not alphanumeric, "-" or "_" so it cannot carry path
    # separators or "..".
    safe_task_name = "".join(
        ch if ch.isalnum() or ch in "-_" else "_" for ch in task_name
    )
    now_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    filename = f"{camera_id}_{safe_task_name}_{now_time}.mp4"
    output_file = os.path.join(settings.OUTPUT_DIR, filename)
    if settings.OUTPUT_DIR:
        # The video writer cannot create missing directories itself.
        os.makedirs(settings.OUTPUT_DIR, exist_ok=True)

    with lock:
        existing = cam_status.get(camera_id)
        if existing is not None:
            previous = existing.get("future")
            if previous is None or not previous.done():
                return {"status": "running", "task_name": existing["task_name"]}
            # The previous recording has ended (finished or crashed); its
            # entry is stale and must not block a new task.
            existing["timer"].cancel()

        # Submit first: if this raises, no state is registered and no timer
        # is left running. The worker takes ``lock`` before it looks at
        # ``cam_status``, so it cannot see the state before it is stored.
        future = executor.submit(record_camera, camera_id, task_name, output_file)
        timer = Timer(settings.MAX_TASK_SECONDS, stop_task_internal, args=(camera_id,))
        timer.start()
        cam_status[camera_id] = {
            "task_name": task_name,
            "start_time": time.time(),
            "timer": timer,
            "future": future,
            "file": output_file,
            "stop_flag": False,
        }
    return {"status": "started", "file": output_file}
@router.post("/stop_task")
def stop_task(req: TaskRequest):
    """Request that the named task on a camera stop recording.

    Returns ``not_running`` when no task is registered for the camera,
    ``task_mismatch`` (with the registered task's name) when the names
    differ, and otherwise flags the task to stop, cancels its timer and
    returns ``stopped`` with the output file path.
    """
    with lock:
        entry = cam_status.get(req.camera_id)
        if not entry:
            return {"status": "not_running"}

        running_task = entry["task_name"]
        if running_task != req.task_name:
            return {"status": "task_mismatch", "running_task": running_task}

        # Flag the task to stop; the recording loop polls this flag.
        entry["stop_flag"] = True
        entry["timer"].cancel()
        return {"status": "stopped", "file": entry["file"]}
@router.get("/status/{camera_id}")
def status(camera_id: str):
    """Report the task registered for a camera, or ``idle`` if there is none."""
    with lock:
        entry = cam_status.get(camera_id)
        if entry:
            details = {key: entry[key] for key in ("task_name", "start_time", "file")}
            return {"status": "running", **details}
    return {"status": "idle"}
|