import json import logging import threading import time from pathlib import Path from typing import Optional import paho.mqtt.client as mqtt from app.core.config import settings from app.services.camera_service import ( CameraCaptureError, CameraUnavailableError, get_camera_service, ) logger = logging.getLogger(__name__) class MqttCaptureError(RuntimeError): """MQTT 联动拍照失败。""" class MqttCaptureBusyError(MqttCaptureError): """当前已有任务在执行。""" _capture_lock = threading.Lock() class MqttCaptureService: """按请求临时连接 MQTT,拍完两张图后立即断开。""" def __init__(self) -> None: self._mqtt = settings.mqtt self._static = settings.static self._camera = get_camera_service() def capture_pair(self) -> dict[str, object]: if not _capture_lock.acquire(blocking=False): raise MqttCaptureBusyError("当前已有一个 MQTT 拍照任务正在执行,请稍后再试。") self._static.ensure_directories() request_id = f"req-{int(time.time() * 1000)}" front_path = self._static.front_image_path back_path = self._static.back_image_path state = { "connected": threading.Event(), "done": threading.Event(), "error": "", "status": None, "front_saved": False, "back_saved": False, } client = mqtt.Client( client_id=f"{self._mqtt.client_id_prefix}{int(time.time() * 1000)}", clean_session=True, ) client.enable_logger(logger) def finish_with_error(message: str) -> None: if not state["error"]: state["error"] = message state["done"].set() def on_connect(client: mqtt.Client, userdata, flags, rc: int) -> None: if rc != 0: finish_with_error(f"MQTT 连接失败,返回码: {rc}") state["connected"].set() return for topic in ( self._mqtt.status_topic, self._mqtt.error_topic, self._mqtt.camera_command_topic, ): result, _ = client.subscribe(topic, qos=1) if result != mqtt.MQTT_ERR_SUCCESS: finish_with_error(f"订阅 MQTT topic 失败: {topic}") break state["connected"].set() def on_disconnect(client: mqtt.Client, userdata, rc: int) -> None: if rc != 0 and not state["done"].is_set(): finish_with_error(f"MQTT 连接意外断开,返回码: {rc}") def on_message(client: mqtt.Client, userdata, message: mqtt.MQTTMessage) -> None: payload_text = message.payload.decode("utf-8", errors="replace") logger.info("收到 MQTT 消息 topic=%s payload=%s", message.topic, payload_text) try: payload = json.loads(payload_text) except json.JSONDecodeError: finish_with_error(f"收到无法解析的 MQTT 消息: {message.topic}") return if message.topic == self._mqtt.status_topic: try: state["status"] = int(payload.get("status")) except (TypeError, ValueError): finish_with_error(f"收到非法状态消息: {payload}") return if payload.get("error_code") not in (None, 0): finish_with_error(f"机械臂返回错误状态: {payload}") return if state["status"] == 4: state["done"].set() return if message.topic == self._mqtt.error_topic: finish_with_error(f"机械臂返回错误消息: {payload}") return if message.topic != self._mqtt.camera_command_topic or payload.get("cmd") != "capture": return image_id = str(payload.get("id", "")).strip() image_path = self._get_image_path(image_id, front_path, back_path) if image_path is None: error_message = f"收到未知的拍照编号: {image_id}" self._send_camera_response(client, 0, error_message) finish_with_error(error_message) return try: self._camera.capture_to_file(image_path) except (CameraUnavailableError, CameraCaptureError, OSError) as exc: error_message = f"保存图片 {image_id}.jpg 失败: {exc}" self._send_camera_response(client, 0, error_message) finish_with_error(error_message) return if image_id == "1": state["front_saved"] = True if image_id == "2": state["back_saved"] = True self._send_camera_response(client, 1, "") client.on_connect = on_connect client.on_disconnect = on_disconnect client.on_message = on_message try: try: client.connect( self._mqtt.broker, self._mqtt.port, self._mqtt.keep_alive_interval, ) except Exception as exc: raise MqttCaptureError("连接 MQTT Broker 失败,请检查 broker 地址和网络。") from exc client.loop_start() if not state["connected"].wait(timeout=self._mqtt.connect_timeout_seconds): raise MqttCaptureError("连接 MQTT Broker 超时。") if state["error"]: raise MqttCaptureError(str(state["error"])) payload = { "cmd": "start", "cycles": self._mqtt.default_cycles, "request_id": request_id, "timestamp": int(time.time() * 1000), } publish_info = client.publish(self._mqtt.command_topic, json.dumps(payload), qos=1) if publish_info.rc != mqtt.MQTT_ERR_SUCCESS: raise MqttCaptureError("发送 start 指令失败。") publish_info.wait_for_publish() logger.info("已发送机械臂 start 指令: %s", payload) if not state["done"].wait(timeout=self._mqtt.request_timeout_seconds): raise MqttCaptureError("等待机械臂拍照流程结束超时。") if state["error"]: raise MqttCaptureError(str(state["error"])) if state["status"] != 4: raise MqttCaptureError(f"流程未正常结束,最终 status={state['status']}") if not state["front_saved"] or not front_path.exists(): raise MqttCaptureError("没有拿到 1.jpg") if not state["back_saved"] or not back_path.exists(): raise MqttCaptureError("没有拿到 2.jpg") return { "request_id": request_id, "status": state["status"], "front_path": front_path, "back_path": back_path, } finally: try: client.disconnect() except Exception: logger.warning("断开 MQTT 连接时发生异常。", exc_info=True) try: client.loop_stop() except Exception: logger.warning("停止 MQTT 循环时发生异常。", exc_info=True) _capture_lock.release() def _get_image_path(self, image_id: str, front_path: Path, back_path: Path) -> Optional[Path]: if image_id == "1": return front_path if image_id == "2": return back_path return None def _send_camera_response( self, client: mqtt.Client, success: int, error_message: str, ) -> None: payload = { "success": success, "error_message": error_message, "timestamp": int(time.time() * 1000), } client.publish(self._mqtt.camera_response_topic, json.dumps(payload), qos=1) logger.info("已发送拍照回执: %s", payload) def get_mqtt_capture_service() -> MqttCaptureService: return MqttCaptureService()