| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- import json
- import logging
- import threading
- import time
- from pathlib import Path
- from typing import Optional
- import paho.mqtt.client as mqtt
- from app.core.config import settings
- from app.services.camera_service import (
- CameraCaptureError,
- CameraUnavailableError,
- get_camera_service,
- )
# Module-level logger; it is also handed to the MQTT client via enable_logger().
logger = logging.getLogger(__name__)
class MqttCaptureError(RuntimeError):
    """Raised when an MQTT-coordinated capture fails."""
class MqttCaptureBusyError(MqttCaptureError):
    """Raised when another capture task is already running."""
# Module-wide lock shared by every MqttCaptureService instance. capture_pair()
# acquires it without blocking and raises MqttCaptureBusyError when it is
# already held, so only one capture runs at a time.
_capture_lock = threading.Lock()
class MqttCaptureService:
    """Connect to MQTT on demand and disconnect as soon as both images are taken.

    Every call to :meth:`capture_pair` builds its own short-lived MQTT client.
    The module-level ``_capture_lock`` is shared by all instances, so only one
    capture can run at a time.
    """

    def __init__(self) -> None:
        """Bind the MQTT settings, static-file settings and camera service."""
        self._mqtt = settings.mqtt
        self._static = settings.static
        self._camera = get_camera_service()

    def capture_pair(self) -> dict[str, object]:
        """Run one capture session and return the paths of both saved images.

        Returns:
            A dict with the keys ``request_id``, ``status``, ``front_path``
            and ``back_path``.

        Raises:
            MqttCaptureBusyError: Another capture currently holds the lock.
            MqttCaptureError: Connecting, publishing, waiting for completion
                or saving either image failed.
        """
        if not _capture_lock.acquire(blocking=False):
            raise MqttCaptureBusyError("当前已有一个 MQTT 拍照任务正在执行,请稍后再试。")
        # Everything after the acquire runs under try/finally so that a failure
        # during setup (directory creation, client construction, ...) cannot
        # leave the lock held and make every later request report "busy".
        try:
            return self._run_capture()
        finally:
            _capture_lock.release()

    def _run_capture(self) -> dict[str, object]:
        """Perform the MQTT session; the caller must already hold ``_capture_lock``."""
        self._static.ensure_directories()
        request_id = f"req-{int(time.time() * 1000)}"
        front_path = self._static.front_image_path
        back_path = self._static.back_image_path

        # Shared between this thread and the MQTT callbacks.
        #   connected: set once on_connect has run (successfully or not)
        #   done:      set on the first error or when status 4 is received
        state = {
            "connected": threading.Event(),
            "done": threading.Event(),
            "error": "",
            "status": None,
            "front_saved": False,
            "back_saved": False,
        }

        client = mqtt.Client(
            client_id=f"{self._mqtt.client_id_prefix}{int(time.time() * 1000)}",
            clean_session=True,
        )
        client.enable_logger(logger)

        def finish_with_error(message: str) -> None:
            # Only the first error is kept; later ones still mark the run done.
            if not state["error"]:
                state["error"] = message
            state["done"].set()

        def on_connect(client: mqtt.Client, userdata, flags, rc: int) -> None:
            if rc != 0:
                finish_with_error(f"MQTT 连接失败,返回码: {rc}")
                state["connected"].set()
                return

            for topic in (
                self._mqtt.status_topic,
                self._mqtt.error_topic,
                self._mqtt.camera_command_topic,
            ):
                result, _ = client.subscribe(topic, qos=1)
                if result != mqtt.MQTT_ERR_SUCCESS:
                    finish_with_error(f"订阅 MQTT topic 失败: {topic}")
                    break
            # Always wake the waiting thread; it checks state["error"] next.
            state["connected"].set()

        def on_disconnect(client: mqtt.Client, userdata, rc: int) -> None:
            if rc != 0 and not state["done"].is_set():
                finish_with_error(f"MQTT 连接意外断开,返回码: {rc}")

        def on_message(client: mqtt.Client, userdata, message: mqtt.MQTTMessage) -> None:
            payload_text = message.payload.decode("utf-8", errors="replace")
            logger.info("收到 MQTT 消息 topic=%s payload=%s", message.topic, payload_text)

            try:
                payload = json.loads(payload_text)
            except json.JSONDecodeError:
                finish_with_error(f"收到无法解析的 MQTT 消息: {message.topic}")
                return

            # Valid JSON that is not an object (list, number, string, ...) has
            # no .get(); without this guard the lookups below would raise
            # AttributeError inside the callback instead of failing the request.
            if not isinstance(payload, dict):
                finish_with_error(f"收到无法解析的 MQTT 消息: {message.topic}")
                return

            if message.topic == self._mqtt.status_topic:
                try:
                    state["status"] = int(payload.get("status"))
                except (TypeError, ValueError):
                    finish_with_error(f"收到非法状态消息: {payload}")
                    return

                if payload.get("error_code") not in (None, 0):
                    finish_with_error(f"机械臂返回错误状态: {payload}")
                    return

                # Status 4 is what this service treats as "flow finished".
                if state["status"] == 4:
                    state["done"].set()
                return

            if message.topic == self._mqtt.error_topic:
                finish_with_error(f"机械臂返回错误消息: {payload}")
                return

            # Anything else is ignored unless it is a capture command.
            if message.topic != self._mqtt.camera_command_topic or payload.get("cmd") != "capture":
                return

            image_id = str(payload.get("id", "")).strip()
            image_path = self._get_image_path(image_id, front_path, back_path)
            if image_path is None:
                error_message = f"收到未知的拍照编号: {image_id}"
                self._send_camera_response(client, 0, error_message)
                finish_with_error(error_message)
                return

            try:
                self._camera.capture_to_file(image_path)
            except (CameraUnavailableError, CameraCaptureError, OSError) as exc:
                error_message = f"保存图片 {image_id}.jpg 失败: {exc}"
                self._send_camera_response(client, 0, error_message)
                finish_with_error(error_message)
                return

            if image_id == "1":
                state["front_saved"] = True
            elif image_id == "2":
                state["back_saved"] = True
            self._send_camera_response(client, 1, "")

        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_message = on_message

        try:
            try:
                client.connect(
                    self._mqtt.broker,
                    self._mqtt.port,
                    self._mqtt.keep_alive_interval,
                )
            except Exception as exc:
                raise MqttCaptureError("连接 MQTT Broker 失败,请检查 broker 地址和网络。") from exc

            client.loop_start()
            if not state["connected"].wait(timeout=self._mqtt.connect_timeout_seconds):
                raise MqttCaptureError("连接 MQTT Broker 超时。")
            if state["error"]:
                raise MqttCaptureError(str(state["error"]))

            payload = {
                "cmd": "start",
                "cycles": self._mqtt.default_cycles,
                "request_id": request_id,
                "timestamp": int(time.time() * 1000),
            }
            publish_info = client.publish(self._mqtt.command_topic, json.dumps(payload), qos=1)
            if publish_info.rc != mqtt.MQTT_ERR_SUCCESS:
                raise MqttCaptureError("发送 start 指令失败。")
            # NOTE(review): called without a timeout - confirm this cannot
            # block indefinitely if the connection drops before the publish
            # is acknowledged.
            publish_info.wait_for_publish()
            logger.info("已发送机械臂 start 指令: %s", payload)

            if not state["done"].wait(timeout=self._mqtt.request_timeout_seconds):
                raise MqttCaptureError("等待机械臂拍照流程结束超时。")
            if state["error"]:
                raise MqttCaptureError(str(state["error"]))
            if state["status"] != 4:
                raise MqttCaptureError(f"流程未正常结束,最终 status={state['status']}")
            # Require both the in-session flag and the file on disk, so an
            # image left over from an earlier run is not accepted on its own.
            if not state["front_saved"] or not front_path.exists():
                raise MqttCaptureError("没有拿到 1.jpg")
            if not state["back_saved"] or not back_path.exists():
                raise MqttCaptureError("没有拿到 2.jpg")

            return {
                "request_id": request_id,
                "status": state["status"],
                "front_path": front_path,
                "back_path": back_path,
            }
        finally:
            # Best-effort teardown: log failures but never mask the real outcome.
            try:
                client.disconnect()
            except Exception:
                logger.warning("断开 MQTT 连接时发生异常。", exc_info=True)
            try:
                client.loop_stop()
            except Exception:
                logger.warning("停止 MQTT 循环时发生异常。", exc_info=True)

    def _get_image_path(self, image_id: str, front_path: Path, back_path: Path) -> Optional[Path]:
        """Map image id ``"1"``/``"2"`` to the front/back path; ``None`` otherwise."""
        if image_id == "1":
            return front_path
        if image_id == "2":
            return back_path
        return None

    def _send_camera_response(
        self,
        client: mqtt.Client,
        success: int,
        error_message: str,
    ) -> None:
        """Publish a capture acknowledgement to the camera response topic.

        The JSON payload carries ``success``, ``error_message`` and a
        millisecond ``timestamp``. The publish result is not checked.
        """
        payload = {
            "success": success,
            "error_message": error_message,
            "timestamp": int(time.time() * 1000),
        }
        client.publish(self._mqtt.camera_response_topic, json.dumps(payload), qos=1)
        logger.info("已发送拍照回执: %s", payload)
def get_mqtt_capture_service() -> MqttCaptureService:
    """Build and return a new ``MqttCaptureService`` instance."""
    service = MqttCaptureService()
    return service
|