camera.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. import logging
  2. import os
  3. import shutil
  4. import subprocess
  5. import time
  6. from concurrent.futures import ThreadPoolExecutor
  7. from datetime import datetime
  8. from threading import Lock, Timer
  9. from typing import Any, Dict
  10. from fastapi import APIRouter, HTTPException
  11. from pydantic import BaseModel
  12. from app.core.config import settings
  13. from app.services.storage import get_storage_service, sanitize_name
  14. logger = logging.getLogger(__name__)
  15. executor = ThreadPoolExecutor(max_workers=4)
  16. lock = Lock()
  17. # 摄像头任务状态。当前按摄像头维度维护最新一条任务。
  18. cam_status: Dict[str, Dict[str, Any]] = {}
  19. # 任务索引。按 camera_id + task_name 保存最近一条同名任务状态,便于外部服务查询结果。
  20. task_status_index: Dict[tuple[str, str], Dict[str, Any]] = {}
  21. active_statuses = {"starting", "recording", "stopping", "uploading"}
  22. router = APIRouter()
  23. class TaskRequest(BaseModel):
  24. camera_id: str
  25. task_name: str
  26. def _build_local_file(camera_id: str, task_name: str, started_at: datetime) -> str:
  27. """生成本地录像文件路径。"""
  28. safe_task_name = sanitize_name(task_name)
  29. timestamp = started_at.strftime("%Y-%m-%d_%H-%M-%S")
  30. file_name = f"{sanitize_name(camera_id)}_{safe_task_name}_{timestamp}.mp4"
  31. return os.path.join(settings.OUTPUT_DIR, file_name)
  32. def _build_response(task_status: Dict[str, Any]) -> Dict[str, Any]:
  33. """过滤内部字段,构造接口返回内容。"""
  34. response = {
  35. "status": task_status.get("status"),
  36. "camera_id": task_status.get("camera_id"),
  37. "task_name": task_status.get("task_name"),
  38. "start_time": task_status.get("start_time"),
  39. "storage_type": task_status.get("storage_type"),
  40. "local_file": task_status.get("local_file"),
  41. "bucket": task_status.get("bucket"),
  42. "object_name": task_status.get("object_name"),
  43. "download_url": task_status.get("download_url"),
  44. "stop_reason": task_status.get("stop_reason"),
  45. "error": task_status.get("error"),
  46. "local_file_deleted": task_status.get("local_file_deleted"),
  47. }
  48. return {key: value for key, value in response.items() if value is not None}
  49. def _build_task_key(camera_id: str, task_name: str) -> tuple[str, str]:
  50. """生成任务索引键。"""
  51. return camera_id, task_name
  52. def _get_ffmpeg_executable() -> str:
  53. """解析 FFmpeg 可执行文件路径。"""
  54. configured_path = settings.FFMPEG_PATH.strip()
  55. if not configured_path:
  56. raise RuntimeError("FFMPEG_PATH 未配置")
  57. if os.path.isfile(configured_path):
  58. return configured_path
  59. resolved_path = shutil.which(configured_path)
  60. if resolved_path:
  61. return resolved_path
  62. raise RuntimeError(
  63. f"未找到 FFmpeg 可执行文件,请安装 ffmpeg 或配置 FFMPEG_PATH。当前值: {configured_path}"
  64. )
  65. def _build_ffmpeg_command(rtsp_url: str, local_file: str) -> list[str]:
  66. """构造 FFmpeg 录像命令。"""
  67. ffmpeg_path = _get_ffmpeg_executable()
  68. return [
  69. ffmpeg_path,
  70. "-y",
  71. "-hide_banner",
  72. "-loglevel",
  73. settings.FFMPEG_LOGLEVEL,
  74. "-rtsp_transport",
  75. settings.FFMPEG_RTSP_TRANSPORT,
  76. "-i",
  77. rtsp_url,
  78. "-an",
  79. "-c:v",
  80. "libx264",
  81. "-preset",
  82. settings.FFMPEG_PRESET,
  83. "-pix_fmt",
  84. "yuv420p",
  85. "-vf",
  86. "scale=trunc(iw/2)*2:trunc(ih/2)*2",
  87. "-movflags",
  88. "+faststart",
  89. local_file,
  90. ]
  91. def _set_stop_flag(camera_id: str, stop_reason: str) -> Dict[str, Any] | None:
  92. """将任务标记为停止中,由后台线程自行收尾。"""
  93. with lock:
  94. task_status = cam_status.get(camera_id)
  95. if not task_status:
  96. return None
  97. if task_status.get("status") in {"uploaded", "completed", "failed"}:
  98. return task_status
  99. task_status["stop_flag"] = True
  100. task_status["stop_reason"] = stop_reason
  101. if task_status.get("status") != "uploading":
  102. task_status["status"] = "stopping"
  103. timer = task_status.get("timer")
  104. if timer:
  105. timer.cancel()
  106. return task_status
  107. def stop_task_internal(camera_id: str) -> None:
  108. """超时后自动停止任务。"""
  109. task_status = _set_stop_flag(camera_id, "达到最大录制时长,系统已自动停止任务")
  110. if task_status:
  111. logger.info(
  112. "摄像头任务超时,已请求停止。摄像头: %s,任务: %s",
  113. camera_id,
  114. task_status.get("task_name"),
  115. )
  116. def record_camera(camera_id: str, task_name: str, local_file: str) -> int:
  117. """使用 FFmpeg 将摄像头视频流录制为浏览器兼容的 mp4 文件。"""
  118. rtsp_url = settings.CAMERA_CONFIG[camera_id]
  119. command = _build_ffmpeg_command(rtsp_url, local_file)
  120. logger.info(
  121. "开始使用 FFmpeg 录制摄像头视频。摄像头: %s,任务: %s,输出文件: %s,FFmpeg: %s",
  122. camera_id,
  123. task_name,
  124. local_file,
  125. command[0],
  126. )
  127. creation_flags = getattr(subprocess, "CREATE_NO_WINDOW", 0)
  128. process = subprocess.Popen(
  129. command,
  130. stdin=subprocess.PIPE,
  131. stdout=subprocess.DEVNULL,
  132. stderr=subprocess.PIPE,
  133. text=True,
  134. encoding="utf-8",
  135. errors="ignore",
  136. creationflags=creation_flags,
  137. )
  138. stderr_output = ""
  139. try:
  140. while True:
  141. return_code = process.poll()
  142. if return_code is not None:
  143. break
  144. with lock:
  145. task_status = cam_status.get(camera_id)
  146. if not task_status or task_status.get("stop_flag"):
  147. logger.info("检测到停止标记,准备优雅停止 FFmpeg。摄像头: %s,任务: %s", camera_id, task_name)
  148. if process.stdin and not process.stdin.closed:
  149. try:
  150. process.stdin.write("q\n")
  151. process.stdin.flush()
  152. except OSError:
  153. logger.info("FFmpeg 标准输入已关闭,跳过优雅停止写入。摄像头: %s,任务: %s", camera_id, task_name)
  154. break
  155. time.sleep(0.2)
  156. finally:
  157. try:
  158. stderr_output = process.communicate(timeout=15)[1] or ""
  159. except subprocess.TimeoutExpired:
  160. logger.warning("FFmpeg 未能在预期时间内退出,准备强制结束。摄像头: %s,任务: %s", camera_id, task_name)
  161. process.kill()
  162. stderr_output = process.communicate(timeout=5)[1] or ""
  163. logger.info(
  164. "FFmpeg 录像进程已结束。摄像头: %s,任务: %s,退出码: %s,本地文件: %s",
  165. camera_id,
  166. task_name,
  167. process.returncode,
  168. local_file,
  169. )
  170. if process.returncode != 0:
  171. error_message = stderr_output.strip() or f"FFmpeg 退出码异常: {process.returncode}"
  172. raise RuntimeError(f"FFmpeg 录像失败: {error_message}")
  173. if not os.path.exists(local_file) or os.path.getsize(local_file) <= 0:
  174. raise RuntimeError(f"FFmpeg 未生成有效录像文件: {local_file}")
  175. file_size = os.path.getsize(local_file)
  176. logger.info(
  177. "FFmpeg 已生成浏览器兼容的 mp4 文件。摄像头: %s,任务: %s,文件大小: %s 字节",
  178. camera_id,
  179. task_name,
  180. file_size,
  181. )
  182. return file_size
  183. def run_recording_task(camera_id: str, task_name: str, local_file: str, started_at: datetime) -> None:
  184. """后台执行录像和 MinIO 上传。"""
  185. try:
  186. with lock:
  187. task_status = cam_status.get(camera_id)
  188. if task_status:
  189. task_status["status"] = "recording"
  190. record_camera(camera_id, task_name, local_file)
  191. with lock:
  192. task_status = cam_status.get(camera_id)
  193. if task_status:
  194. task_status["status"] = "uploading" if settings.STORAGE_TYPE == "minio" else "completed"
  195. storage_result = get_storage_service().save_recording(camera_id, task_name, local_file, started_at)
  196. with lock:
  197. task_status = cam_status.get(camera_id)
  198. if task_status:
  199. task_status.update(
  200. {
  201. "status": "uploaded" if storage_result.storage_type == "minio" else "completed",
  202. "storage_type": storage_result.storage_type,
  203. "bucket": storage_result.bucket,
  204. "object_name": storage_result.object_name,
  205. "download_url": storage_result.download_url,
  206. "local_file_deleted": storage_result.local_file_deleted,
  207. "error": None,
  208. }
  209. )
  210. task_status_index[_build_task_key(camera_id, task_name)] = task_status
  211. logger.info(
  212. "任务处理完成。摄像头: %s,任务: %s,最终状态: %s",
  213. camera_id,
  214. task_name,
  215. "uploaded" if storage_result.storage_type == "minio" else "completed",
  216. )
  217. except Exception as exc:
  218. logger.exception("摄像头任务执行失败。摄像头: %s,任务: %s,错误: %s", camera_id, task_name, exc)
  219. with lock:
  220. task_status = cam_status.get(camera_id)
  221. if task_status:
  222. task_status["status"] = "failed"
  223. task_status["error"] = str(exc)
  224. task_status_index[_build_task_key(camera_id, task_name)] = task_status
  225. finally:
  226. with lock:
  227. task_status = cam_status.get(camera_id)
  228. if task_status and task_status.get("timer"):
  229. task_status["timer"].cancel()
  230. @router.post("/start_task")
  231. def start_task(req: TaskRequest):
  232. camera_id = req.camera_id
  233. task_name = req.task_name
  234. logger.info("收到启动录像请求。摄像头: %s,任务: %s", camera_id, task_name)
  235. if camera_id not in settings.CAMERA_CONFIG:
  236. raise HTTPException(status_code=404, detail=f"摄像头 {camera_id} 未配置")
  237. try:
  238. ffmpeg_executable = _get_ffmpeg_executable()
  239. except RuntimeError as exc:
  240. logger.error("启动录像前检查 FFmpeg 失败: %s", exc)
  241. raise HTTPException(status_code=500, detail=str(exc))
  242. started_at = datetime.now()
  243. local_file = _build_local_file(camera_id, task_name, started_at)
  244. with lock:
  245. existing_status = cam_status.get(camera_id)
  246. if existing_status and existing_status.get("status") in active_statuses:
  247. logger.info(
  248. "摄像头已有任务在执行,拒绝重复启动。摄像头: %s,当前任务: %s,当前状态: %s",
  249. camera_id,
  250. existing_status.get("task_name"),
  251. existing_status.get("status"),
  252. )
  253. return _build_response(existing_status)
  254. timer = Timer(settings.MAX_TASK_SECONDS, lambda: stop_task_internal(camera_id))
  255. cam_status[camera_id] = {
  256. "camera_id": camera_id,
  257. "task_name": task_name,
  258. "start_time": started_at.strftime("%Y-%m-%d %H:%M:%S"),
  259. "storage_type": settings.STORAGE_TYPE,
  260. "status": "starting",
  261. "local_file": local_file,
  262. "bucket": None,
  263. "object_name": None,
  264. "download_url": None,
  265. "local_file_deleted": False,
  266. "stop_reason": None,
  267. "error": None,
  268. "timer": timer,
  269. "future": None,
  270. "stop_flag": False,
  271. }
  272. task_status_index[_build_task_key(camera_id, task_name)] = cam_status[camera_id]
  273. try:
  274. future = executor.submit(run_recording_task, camera_id, task_name, local_file, started_at)
  275. except Exception as exc:
  276. timer.cancel()
  277. cam_status.pop(camera_id, None)
  278. logger.exception("提交后台录像任务失败。摄像头: %s,任务: %s,错误: %s", camera_id, task_name, exc)
  279. raise HTTPException(status_code=500, detail="创建录像任务失败")
  280. cam_status[camera_id]["future"] = future
  281. timer.start()
  282. response = _build_response(cam_status[camera_id])
  283. logger.info(
  284. "录像任务已提交到后台线程。摄像头: %s,任务: %s,本地文件: %s,FFmpeg: %s",
  285. camera_id,
  286. task_name,
  287. local_file,
  288. ffmpeg_executable,
  289. )
  290. return response
  291. @router.post("/stop_task")
  292. def stop_task(req: TaskRequest):
  293. camera_id = req.camera_id
  294. task_name = req.task_name
  295. logger.info("收到停止录像请求。摄像头: %s,任务: %s", camera_id, task_name)
  296. with lock:
  297. task_status = cam_status.get(camera_id)
  298. if not task_status:
  299. return {"status": "not_running", "camera_id": camera_id}
  300. if task_status.get("task_name") != task_name:
  301. return {
  302. "status": "task_mismatch",
  303. "camera_id": camera_id,
  304. "running_task": task_status.get("task_name"),
  305. }
  306. task_status = _set_stop_flag(camera_id, "收到停止请求,等待录像线程安全结束")
  307. if not task_status:
  308. return {"status": "not_running", "camera_id": camera_id}
  309. logger.info("已向后台录像线程发送停止信号。摄像头: %s,任务: %s", camera_id, task_name)
  310. return _build_response(task_status)
  311. @router.get("/status/{camera_id}")
  312. def status(camera_id: str):
  313. with lock:
  314. task_status = cam_status.get(camera_id)
  315. if not task_status:
  316. return {"status": "idle", "camera_id": camera_id}
  317. return _build_response(task_status)
  318. @router.post("/query_task")
  319. def query_task(req: TaskRequest):
  320. """按 camera_id 和 task_name 查询任务状态。"""
  321. camera_id = req.camera_id
  322. task_name = req.task_name
  323. logger.info("收到任务查询请求。摄像头: %s,任务: %s", camera_id, task_name)
  324. with lock:
  325. task_status = task_status_index.get(_build_task_key(camera_id, task_name))
  326. if not task_status:
  327. return {
  328. "status": "not_found",
  329. "camera_id": camera_id,
  330. "task_name": task_name,
  331. }
  332. response = _build_response(task_status)
  333. logger.info(
  334. "任务查询完成。摄像头: %s,任务: %s,当前状态: %s",
  335. camera_id,
  336. task_name,
  337. response.get("status"),
  338. )
  339. return response