| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412 |
- import logging
- import os
- import shutil
- import subprocess
- import time
- from concurrent.futures import ThreadPoolExecutor
- from datetime import datetime
- from threading import Lock, Timer
- from typing import Any, Dict
- from fastapi import APIRouter, HTTPException
- from pydantic import BaseModel
- from app.core.config import settings
- from app.services.storage import get_storage_service, sanitize_name
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Thread pool that runs the background recording tasks submitted by start_task
# (at most 4 worker threads).
executor = ThreadPoolExecutor(max_workers=4)
# Taken by the functions in this module around accesses to cam_status and
# task_status_index.
lock = Lock()
# Per-camera task state. Only the most recent task of each camera is kept.
cam_status: Dict[str, Dict[str, Any]] = {}
# Task index. Keeps the latest status of each (camera_id, task_name) pair so that
# external services can query the result of a task.
task_status_index: Dict[tuple[str, str], Dict[str, Any]] = {}
# Statuses for which start_task refuses to start another task on the same camera.
active_statuses = {"starting", "recording", "stopping", "uploading"}
# Router on which the endpoints of this module are registered.
router = APIRouter()
# Request body shared by the start_task, stop_task and query_task endpoints.
class TaskRequest(BaseModel):
    # Camera identifier; start_task requires it to be a key of settings.CAMERA_CONFIG.
    camera_id: str
    # Caller-chosen task name; it becomes part of the recording file name and of
    # the (camera_id, task_name) task index key.
    task_name: str
def _build_local_file(camera_id: str, task_name: str, started_at: datetime) -> str:
    """Return the local path of the recording file for a task.

    The file is placed in ``settings.OUTPUT_DIR`` and is named
    ``<camera>_<task>_<YYYY-MM-DD_HH-MM-SS>.mp4``, where the camera and task
    parts have been passed through ``sanitize_name``.
    """
    task_part = sanitize_name(task_name)
    stamp = started_at.strftime("%Y-%m-%d_%H-%M-%S")
    camera_part = sanitize_name(camera_id)
    recording_name = "{}_{}_{}.mp4".format(camera_part, task_part, stamp)
    return os.path.join(settings.OUTPUT_DIR, recording_name)
- def _build_response(task_status: Dict[str, Any]) -> Dict[str, Any]:
- """过滤内部字段,构造接口返回内容。"""
- response = {
- "status": task_status.get("status"),
- "camera_id": task_status.get("camera_id"),
- "task_name": task_status.get("task_name"),
- "start_time": task_status.get("start_time"),
- "storage_type": task_status.get("storage_type"),
- "local_file": task_status.get("local_file"),
- "bucket": task_status.get("bucket"),
- "object_name": task_status.get("object_name"),
- "download_url": task_status.get("download_url"),
- "stop_reason": task_status.get("stop_reason"),
- "error": task_status.get("error"),
- "local_file_deleted": task_status.get("local_file_deleted"),
- }
- return {key: value for key, value in response.items() if value is not None}
- def _build_task_key(camera_id: str, task_name: str) -> tuple[str, str]:
- """生成任务索引键。"""
- return camera_id, task_name
def _get_ffmpeg_executable() -> str:
    """Resolve the FFmpeg executable configured in ``settings.FFMPEG_PATH``.

    The configured value is stripped of surrounding whitespace. If it names an
    existing file it is returned as is; otherwise it is looked up with
    ``shutil.which``.

    Raises:
        RuntimeError: If the setting is empty or no executable can be found.
    """
    candidate = settings.FFMPEG_PATH.strip()
    if not candidate:
        raise RuntimeError("FFMPEG_PATH 未配置")
    # An existing file wins; anything else is treated as a command name to search for.
    located = candidate if os.path.isfile(candidate) else shutil.which(candidate)
    if not located:
        raise RuntimeError(
            f"未找到 FFmpeg 可执行文件,请安装 ffmpeg 或配置 FFMPEG_PATH。当前值: {candidate}"
        )
    return located
def _build_ffmpeg_command(rtsp_url: str, local_file: str) -> list[str]:
    """Build the FFmpeg argument list that records ``rtsp_url`` into ``local_file``.

    The executable comes from ``_get_ffmpeg_executable``; log level, RTSP
    transport and x264 preset come from ``settings``.

    Raises:
        RuntimeError: Propagated from ``_get_ffmpeg_executable``.
    """
    executable = _get_ffmpeg_executable()
    # Global options: overwrite the output without asking, quiet banner, log level.
    global_options = [
        executable,
        "-y",
        "-hide_banner",
        "-loglevel",
        settings.FFMPEG_LOGLEVEL,
    ]
    # Input options: the RTSP transport must precede the "-i" it applies to.
    input_options = [
        "-rtsp_transport",
        settings.FFMPEG_RTSP_TRANSPORT,
        "-i",
        rtsp_url,
    ]
    # Output options: no audio, H.264 video in yuv420p, width/height rounded
    # down to even numbers, faststart flag for the mp4 container.
    output_options = [
        "-an",
        "-c:v",
        "libx264",
        "-preset",
        settings.FFMPEG_PRESET,
        "-pix_fmt",
        "yuv420p",
        "-vf",
        "scale=trunc(iw/2)*2:trunc(ih/2)*2",
        "-movflags",
        "+faststart",
    ]
    return [*global_options, *input_options, *output_options, local_file]
def _set_stop_flag(camera_id: str, stop_reason: str) -> Dict[str, Any] | None:
    """Ask the camera's current task to stop; the background thread finishes up itself.

    Returns ``None`` when the camera has no task entry. A task whose status is
    already ``uploaded``, ``completed`` or ``failed`` is returned unchanged.
    Otherwise the stop flag and stop reason are recorded, the status becomes
    ``stopping`` (unless it is ``uploading``), the task's timer is cancelled and
    the state dict is returned.
    """
    finished_statuses = {"uploaded", "completed", "failed"}
    with lock:
        current = cam_status.get(camera_id)
        if not current:
            return None
        if current.get("status") not in finished_statuses:
            current["stop_flag"] = True
            current["stop_reason"] = stop_reason
            # An upload in progress keeps its status; only the flag is recorded.
            if current.get("status") != "uploading":
                current["status"] = "stopping"
            pending_timer = current.get("timer")
            if pending_timer:
                pending_timer.cancel()
        return current
def stop_task_internal(camera_id: str) -> None:
    """Request a stop for the camera's task once the maximum duration is reached.

    Used as the callback of the timer created in ``start_task``. Nothing is
    logged when the camera has no task entry.
    """
    stopped = _set_stop_flag(camera_id, "达到最大录制时长,系统已自动停止任务")
    if not stopped:
        return
    logger.info(
        "摄像头任务超时,已请求停止。摄像头: %s,任务: %s",
        camera_id,
        stopped.get("task_name"),
    )
def record_camera(camera_id: str, task_name: str, local_file: str) -> int:
    """Record a camera's RTSP stream into a browser-compatible mp4 file with FFmpeg.

    FFmpeg runs as a child process. Every 0.2 seconds this function checks
    whether the process has exited, or whether the camera's entry in
    ``cam_status`` is gone or carries ``stop_flag``; in that case ``q`` is
    written to FFmpeg's stdin so that it can finish the file on its own.
    FFmpeg then gets 15 seconds to exit before it is killed.

    FFmpeg's stderr is written to a temporary file rather than a pipe. Nothing
    reads a pipe while the polling loop runs, so enough log output would fill
    the OS pipe buffer and block FFmpeg in the middle of a recording. Only the
    last 64 KiB of that output is kept for the error message.

    Args:
        camera_id: Key of the camera in ``settings.CAMERA_CONFIG``.
        task_name: Task name, used for logging only.
        local_file: Path of the mp4 file FFmpeg writes.

    Returns:
        Size of the recorded file in bytes.

    Raises:
        KeyError: If ``camera_id`` is not in ``settings.CAMERA_CONFIG``.
        RuntimeError: If FFmpeg cannot be located, exits with a non-zero code,
            or leaves no non-empty output file.
        subprocess.TimeoutExpired: If FFmpeg is still alive 5 seconds after
            being killed.
    """
    # Function-scope import: tempfile is not among the module's imports.
    import tempfile

    stderr_tail_bytes = 64 * 1024

    rtsp_url = settings.CAMERA_CONFIG[camera_id]
    command = _build_ffmpeg_command(rtsp_url, local_file)
    logger.info(
        "开始使用 FFmpeg 录制摄像头视频。摄像头: %s,任务: %s,输出文件: %s,FFmpeg: %s",
        camera_id,
        task_name,
        local_file,
        command[0],
    )
    # CREATE_NO_WINDOW only exists on Windows; 0 means "no flags" elsewhere.
    creation_flags = getattr(subprocess, "CREATE_NO_WINDOW", 0)
    stderr_output = ""
    with tempfile.TemporaryFile() as stderr_file:
        process = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=stderr_file,
            text=True,
            encoding="utf-8",
            errors="ignore",
            creationflags=creation_flags,
        )
        try:
            while process.poll() is None:
                with lock:
                    task_status = cam_status.get(camera_id)
                if not task_status or task_status.get("stop_flag"):
                    logger.info("检测到停止标记,准备优雅停止 FFmpeg。摄像头: %s,任务: %s", camera_id, task_name)
                    if process.stdin and not process.stdin.closed:
                        try:
                            # "q" on stdin asks FFmpeg to quit and finalize the file.
                            process.stdin.write("q\n")
                            process.stdin.flush()
                        except OSError:
                            logger.info("FFmpeg 标准输入已关闭,跳过优雅停止写入。摄像头: %s,任务: %s", camera_id, task_name)
                    break
                time.sleep(0.2)
        finally:
            try:
                # communicate() closes stdin and waits; there is no output pipe to read.
                process.communicate(timeout=15)
            except subprocess.TimeoutExpired:
                logger.warning("FFmpeg 未能在预期时间内退出,准备强制结束。摄像头: %s,任务: %s", camera_id, task_name)
                process.kill()
                process.communicate(timeout=5)
            # Read back the tail of what FFmpeg logged to stderr.
            stderr_file.seek(0, os.SEEK_END)
            stderr_size = stderr_file.tell()
            stderr_file.seek(max(0, stderr_size - stderr_tail_bytes))
            stderr_output = (
                stderr_file.read().decode("utf-8", errors="ignore").replace("\r\n", "\n")
            )
    logger.info(
        "FFmpeg 录像进程已结束。摄像头: %s,任务: %s,退出码: %s,本地文件: %s",
        camera_id,
        task_name,
        process.returncode,
        local_file,
    )
    if process.returncode != 0:
        error_message = stderr_output.strip() or f"FFmpeg 退出码异常: {process.returncode}"
        raise RuntimeError(f"FFmpeg 录像失败: {error_message}")
    if not os.path.exists(local_file) or os.path.getsize(local_file) <= 0:
        raise RuntimeError(f"FFmpeg 未生成有效录像文件: {local_file}")
    file_size = os.path.getsize(local_file)
    logger.info(
        "FFmpeg 已生成浏览器兼容的 mp4 文件。摄像头: %s,任务: %s,文件大小: %s 字节",
        camera_id,
        task_name,
        file_size,
    )
    return file_size
def run_recording_task(camera_id: str, task_name: str, local_file: str, started_at: datetime) -> None:
    """Record the camera in the background and hand the file to the storage service.

    Status flow written to the task's state dict: ``recording`` ->
    ``uploading`` (minio) or ``completed`` (other storage types) -> ``uploaded``
    / ``completed`` together with the storage result. Any exception is logged
    and turns the task into ``failed`` with the error text. The task's timer is
    cancelled at the end in every case.

    The state dict is fetched from ``cam_status`` once, at the start, and that
    same dict is updated afterwards. The camera slot may be taken over by a
    newer task as soon as this task's status is no longer active; fetching the
    slot again would then write this task's results into the newer task and
    cancel the newer task's timer.

    Args:
        camera_id: Camera the task belongs to.
        task_name: Name of the task.
        local_file: Path of the local recording file.
        started_at: Start time of the task, passed on to the storage service.
    """
    own_status: Dict[str, Any] | None = None
    task_key = _build_task_key(camera_id, task_name)
    try:
        with lock:
            # While the status is "starting" the slot cannot be replaced, so
            # this entry is the one start_task created for this task.
            own_status = cam_status.get(camera_id)
            # Do not hide a stop request that arrived before the thread started.
            if own_status and not own_status.get("stop_flag"):
                own_status["status"] = "recording"
        record_camera(camera_id, task_name, local_file)
        with lock:
            if own_status:
                own_status["status"] = "uploading" if settings.STORAGE_TYPE == "minio" else "completed"
        storage_result = get_storage_service().save_recording(camera_id, task_name, local_file, started_at)
        final_status = "uploaded" if storage_result.storage_type == "minio" else "completed"
        with lock:
            if own_status:
                own_status.update(
                    {
                        "status": final_status,
                        "storage_type": storage_result.storage_type,
                        "bucket": storage_result.bucket,
                        "object_name": storage_result.object_name,
                        "download_url": storage_result.download_url,
                        "local_file_deleted": storage_result.local_file_deleted,
                        "error": None,
                    }
                )
                # Keep an entry that is already indexed: it is either this dict
                # or a newer task with the same name.
                task_status_index.setdefault(task_key, own_status)
        logger.info(
            "任务处理完成。摄像头: %s,任务: %s,最终状态: %s",
            camera_id,
            task_name,
            final_status,
        )
    except Exception as exc:
        logger.exception("摄像头任务执行失败。摄像头: %s,任务: %s,错误: %s", camera_id, task_name, exc)
        with lock:
            if own_status:
                own_status["status"] = "failed"
                own_status["error"] = str(exc)
                task_status_index.setdefault(task_key, own_status)
    finally:
        with lock:
            if own_status and own_status.get("timer"):
                own_status["timer"].cancel()
@router.post("/start_task")
def start_task(req: TaskRequest):
    # Start a recording task for a camera.
    #
    # Responds with 404 when the camera is not in settings.CAMERA_CONFIG and with
    # 500 when FFmpeg cannot be located or the background task cannot be
    # submitted. If the camera already has a task in one of active_statuses,
    # that task's status is returned and nothing new is started. Otherwise a new
    # state dict is registered in cam_status and task_status_index, the
    # recording is submitted to the thread pool, and a timer is started that
    # stops the task after settings.MAX_TASK_SECONDS.
    camera_id = req.camera_id
    task_name = req.task_name
    logger.info("收到启动录像请求。摄像头: %s,任务: %s", camera_id, task_name)
    if camera_id not in settings.CAMERA_CONFIG:
        raise HTTPException(status_code=404, detail=f"摄像头 {camera_id} 未配置")
    try:
        ffmpeg_executable = _get_ffmpeg_executable()
    except RuntimeError as exc:
        logger.error("启动录像前检查 FFmpeg 失败: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    started_at = datetime.now()
    local_file = _build_local_file(camera_id, task_name, started_at)
    task_key = _build_task_key(camera_id, task_name)
    with lock:
        existing_status = cam_status.get(camera_id)
        if existing_status and existing_status.get("status") in active_statuses:
            logger.info(
                "摄像头已有任务在执行,拒绝重复启动。摄像头: %s,当前任务: %s,当前状态: %s",
                camera_id,
                existing_status.get("task_name"),
                existing_status.get("status"),
            )
            return _build_response(existing_status)
        timer = Timer(settings.MAX_TASK_SECONDS, stop_task_internal, args=(camera_id,))
        task_status = {
            "camera_id": camera_id,
            "task_name": task_name,
            "start_time": started_at.strftime("%Y-%m-%d %H:%M:%S"),
            "storage_type": settings.STORAGE_TYPE,
            "status": "starting",
            "local_file": local_file,
            "bucket": None,
            "object_name": None,
            "download_url": None,
            "local_file_deleted": False,
            "stop_reason": None,
            "error": None,
            "timer": timer,
            "future": None,
            "stop_flag": False,
        }
        cam_status[camera_id] = task_status
        task_status_index[task_key] = task_status
        try:
            # The worker's first step takes the lock, so it cannot observe the
            # state dict before this block has finished setting it up.
            future = executor.submit(run_recording_task, camera_id, task_name, local_file, started_at)
        except Exception as exc:
            timer.cancel()
            cam_status.pop(camera_id, None)
            # Remove the index entry as well; otherwise query_task would report
            # this never-started task as "starting" forever.
            task_status_index.pop(task_key, None)
            logger.exception("提交后台录像任务失败。摄像头: %s,任务: %s,错误: %s", camera_id, task_name, exc)
            raise HTTPException(status_code=500, detail="创建录像任务失败") from exc
        task_status["future"] = future
        timer.start()
        response = _build_response(task_status)
    logger.info(
        "录像任务已提交到后台线程。摄像头: %s,任务: %s,本地文件: %s,FFmpeg: %s",
        camera_id,
        task_name,
        local_file,
        ffmpeg_executable,
    )
    return response
@router.post("/stop_task")
def stop_task(req: TaskRequest):
    # Ask the named task of a camera to stop.
    #
    # Returns "not_running" when the camera has no task entry and
    # "task_mismatch" (with the name of the current task) when the entry belongs
    # to a different task. Otherwise the stop flag is set and the task's current
    # status is returned; the recording thread finishes the task by itself.
    camera_id, task_name = req.camera_id, req.task_name
    logger.info("收到停止录像请求。摄像头: %s,任务: %s", camera_id, task_name)
    with lock:
        current = cam_status.get(camera_id)
        current_name = current.get("task_name") if current else None
    if not current:
        return {"status": "not_running", "camera_id": camera_id}
    if current_name != task_name:
        return {
            "status": "task_mismatch",
            "camera_id": camera_id,
            "running_task": current_name,
        }
    # _set_stop_flag takes the lock itself, so it is called after releasing it.
    flagged = _set_stop_flag(camera_id, "收到停止请求,等待录像线程安全结束")
    if flagged:
        logger.info("已向后台录像线程发送停止信号。摄像头: %s,任务: %s", camera_id, task_name)
        return _build_response(flagged)
    return {"status": "not_running", "camera_id": camera_id}
@router.get("/status/{camera_id}")
def status(camera_id: str):
    # Return the status of the camera's most recent task, or "idle" when the
    # camera has no task entry.
    with lock:
        latest = cam_status.get(camera_id)
    if latest:
        return _build_response(latest)
    return {"status": "idle", "camera_id": camera_id}
@router.post("/query_task")
def query_task(req: TaskRequest):
    """Query a task's status by camera_id and task_name."""
    camera_id = req.camera_id
    task_name = req.task_name
    logger.info("收到任务查询请求。摄像头: %s,任务: %s", camera_id, task_name)
    lookup_key = _build_task_key(camera_id, task_name)
    with lock:
        found = task_status_index.get(lookup_key)
    if found:
        result = _build_response(found)
        logger.info(
            "任务查询完成。摄像头: %s,任务: %s,当前状态: %s",
            camera_id,
            task_name,
            result.get("status"),
        )
        return result
    # No task with this name has been indexed for the camera.
    return {
        "status": "not_found",
        "camera_id": camera_id,
        "task_name": task_name,
    }
|