mqtt_capture_service.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. import json
  2. import logging
  3. import threading
  4. import time
  5. from pathlib import Path
  6. from typing import Optional
  7. import paho.mqtt.client as mqtt
  8. from app.core.config import settings
  9. from app.services.camera_service import (
  10. CameraCaptureError,
  11. CameraUnavailableError,
  12. get_camera_service,
  13. )
  14. logger = logging.getLogger(__name__)
  15. class MqttCaptureError(RuntimeError):
  16. """MQTT 联动拍照失败。"""
  17. class MqttCaptureBusyError(MqttCaptureError):
  18. """当前已有任务在执行。"""
  19. _capture_lock = threading.Lock()
  20. class MqttCaptureService:
  21. """按请求临时连接 MQTT,拍完两张图后立即断开。"""
  22. def __init__(self) -> None:
  23. self._mqtt = settings.mqtt
  24. self._static = settings.static
  25. self._camera = get_camera_service()
  26. def capture_pair(self) -> dict[str, object]:
  27. if not _capture_lock.acquire(blocking=False):
  28. raise MqttCaptureBusyError("当前已有一个 MQTT 拍照任务正在执行,请稍后再试。")
  29. self._static.ensure_directories()
  30. request_id = f"req-{int(time.time() * 1000)}"
  31. front_path = self._static.front_image_path
  32. back_path = self._static.back_image_path
  33. state = {
  34. "connected": threading.Event(),
  35. "done": threading.Event(),
  36. "error": "",
  37. "status": None,
  38. "front_saved": False,
  39. "back_saved": False,
  40. }
  41. client = mqtt.Client(
  42. client_id=f"{self._mqtt.client_id_prefix}{int(time.time() * 1000)}",
  43. clean_session=True,
  44. )
  45. client.enable_logger(logger)
  46. def finish_with_error(message: str) -> None:
  47. if not state["error"]:
  48. state["error"] = message
  49. state["done"].set()
  50. def on_connect(client: mqtt.Client, userdata, flags, rc: int) -> None:
  51. if rc != 0:
  52. finish_with_error(f"MQTT 连接失败,返回码: {rc}")
  53. state["connected"].set()
  54. return
  55. for topic in (
  56. self._mqtt.status_topic,
  57. self._mqtt.error_topic,
  58. self._mqtt.camera_command_topic,
  59. ):
  60. result, _ = client.subscribe(topic, qos=1)
  61. if result != mqtt.MQTT_ERR_SUCCESS:
  62. finish_with_error(f"订阅 MQTT topic 失败: {topic}")
  63. break
  64. state["connected"].set()
  65. def on_disconnect(client: mqtt.Client, userdata, rc: int) -> None:
  66. if rc != 0 and not state["done"].is_set():
  67. finish_with_error(f"MQTT 连接意外断开,返回码: {rc}")
  68. def on_message(client: mqtt.Client, userdata, message: mqtt.MQTTMessage) -> None:
  69. payload_text = message.payload.decode("utf-8", errors="replace")
  70. logger.info("收到 MQTT 消息 topic=%s payload=%s", message.topic, payload_text)
  71. try:
  72. payload = json.loads(payload_text)
  73. except json.JSONDecodeError:
  74. finish_with_error(f"收到无法解析的 MQTT 消息: {message.topic}")
  75. return
  76. if message.topic == self._mqtt.status_topic:
  77. try:
  78. state["status"] = int(payload.get("status"))
  79. except (TypeError, ValueError):
  80. finish_with_error(f"收到非法状态消息: {payload}")
  81. return
  82. if payload.get("error_code") not in (None, 0):
  83. finish_with_error(f"机械臂返回错误状态: {payload}")
  84. return
  85. if state["status"] == 4:
  86. state["done"].set()
  87. return
  88. if message.topic == self._mqtt.error_topic:
  89. finish_with_error(f"机械臂返回错误消息: {payload}")
  90. return
  91. if message.topic != self._mqtt.camera_command_topic or payload.get("cmd") != "capture":
  92. return
  93. image_id = str(payload.get("id", "")).strip()
  94. image_path = self._get_image_path(image_id, front_path, back_path)
  95. if image_path is None:
  96. error_message = f"收到未知的拍照编号: {image_id}"
  97. self._send_camera_response(client, 0, error_message)
  98. finish_with_error(error_message)
  99. return
  100. try:
  101. self._camera.capture_to_file(image_path)
  102. except (CameraUnavailableError, CameraCaptureError, OSError) as exc:
  103. error_message = f"保存图片 {image_id}.jpg 失败: {exc}"
  104. self._send_camera_response(client, 0, error_message)
  105. finish_with_error(error_message)
  106. return
  107. if image_id == "1":
  108. state["front_saved"] = True
  109. if image_id == "2":
  110. state["back_saved"] = True
  111. self._send_camera_response(client, 1, "")
  112. client.on_connect = on_connect
  113. client.on_disconnect = on_disconnect
  114. client.on_message = on_message
  115. try:
  116. try:
  117. client.connect(
  118. self._mqtt.broker,
  119. self._mqtt.port,
  120. self._mqtt.keep_alive_interval,
  121. )
  122. except Exception as exc:
  123. raise MqttCaptureError("连接 MQTT Broker 失败,请检查 broker 地址和网络。") from exc
  124. client.loop_start()
  125. if not state["connected"].wait(timeout=self._mqtt.connect_timeout_seconds):
  126. raise MqttCaptureError("连接 MQTT Broker 超时。")
  127. if state["error"]:
  128. raise MqttCaptureError(str(state["error"]))
  129. payload = {
  130. "cmd": "start",
  131. "cycles": self._mqtt.default_cycles,
  132. "request_id": request_id,
  133. "timestamp": int(time.time() * 1000),
  134. }
  135. publish_info = client.publish(self._mqtt.command_topic, json.dumps(payload), qos=1)
  136. if publish_info.rc != mqtt.MQTT_ERR_SUCCESS:
  137. raise MqttCaptureError("发送 start 指令失败。")
  138. publish_info.wait_for_publish()
  139. logger.info("已发送机械臂 start 指令: %s", payload)
  140. if not state["done"].wait(timeout=self._mqtt.request_timeout_seconds):
  141. raise MqttCaptureError("等待机械臂拍照流程结束超时。")
  142. if state["error"]:
  143. raise MqttCaptureError(str(state["error"]))
  144. if state["status"] != 4:
  145. raise MqttCaptureError(f"流程未正常结束,最终 status={state['status']}")
  146. if not state["front_saved"] or not front_path.exists():
  147. raise MqttCaptureError("没有拿到 1.jpg")
  148. if not state["back_saved"] or not back_path.exists():
  149. raise MqttCaptureError("没有拿到 2.jpg")
  150. return {
  151. "request_id": request_id,
  152. "status": state["status"],
  153. "front_path": front_path,
  154. "back_path": back_path,
  155. }
  156. finally:
  157. try:
  158. client.disconnect()
  159. except Exception:
  160. logger.warning("断开 MQTT 连接时发生异常。", exc_info=True)
  161. try:
  162. client.loop_stop()
  163. except Exception:
  164. logger.warning("停止 MQTT 循环时发生异常。", exc_info=True)
  165. _capture_lock.release()
  166. def _get_image_path(self, image_id: str, front_path: Path, back_path: Path) -> Optional[Path]:
  167. if image_id == "1":
  168. return front_path
  169. if image_id == "2":
  170. return back_path
  171. return None
  172. def _send_camera_response(
  173. self,
  174. client: mqtt.Client,
  175. success: int,
  176. error_message: str,
  177. ) -> None:
  178. payload = {
  179. "success": success,
  180. "error_message": error_message,
  181. "timestamp": int(time.time() * 1000),
  182. }
  183. client.publish(self._mqtt.camera_response_topic, json.dumps(payload), qos=1)
  184. logger.info("已发送拍照回执: %s", payload)
  185. def get_mqtt_capture_service() -> MqttCaptureService:
  186. return MqttCaptureService()