# mqtt_capture_service.py
  1. import json
  2. import logging
  3. import threading
  4. import time
  5. from datetime import datetime
  6. from pathlib import Path
  7. from typing import Any, Optional
  8. import paho.mqtt.client as mqtt
  9. from app.core.config import settings
  10. from app.services.camera_service import (
  11. CameraCaptureError,
  12. CameraUnavailableError,
  13. get_camera_service,
  14. )
  15. logger = logging.getLogger(__name__)
  16. class MqttCaptureError(RuntimeError):
  17. """MQTT 联动拍照失败。"""
  18. class MqttCaptureBusyError(MqttCaptureError):
  19. """当前已有任务在执行。"""
  20. _capture_lock = threading.Lock()
  21. class MqttCaptureService:
  22. """按请求临时连接 MQTT,拍完两张图后立即断开。"""
  23. def __init__(self) -> None:
  24. self._mqtt = settings.mqtt
  25. self._static = settings.static
  26. self._cloud = settings.cloud
  27. self._camera = get_camera_service()
  28. def capture_pair(self) -> dict[str, object]:
  29. if not _capture_lock.acquire(blocking=False):
  30. raise MqttCaptureBusyError("当前已有一个 MQTT 拍照任务正在执行,请稍后再试。")
  31. self._static.ensure_directories()
  32. front_path = self._static.front_image_path
  33. back_path = self._static.back_image_path
  34. request_id = f"req-{int(time.time() * 1000)}"
  35. state: dict[str, Any] = {
  36. "connected": threading.Event(),
  37. "done": threading.Event(),
  38. "recognized": threading.Event(),
  39. "error": "",
  40. "status": None,
  41. "front_saved": False,
  42. "back_saved": False,
  43. "recognizedInfoDTO": None,
  44. }
  45. client = mqtt.Client(
  46. client_id=f"{self._mqtt.client_id_prefix}{int(time.time() * 1000)}",
  47. clean_session=True,
  48. )
  49. client.enable_logger(logger)
  50. def finish_with_error(message: str) -> None:
  51. if not state["error"]:
  52. state["error"] = message
  53. state["done"].set()
  54. def on_connect(client: mqtt.Client, userdata, flags, rc: int) -> None:
  55. if rc != 0:
  56. finish_with_error(f"MQTT 连接失败,返回码: {rc}")
  57. state["connected"].set()
  58. return
  59. for topic in (
  60. self._mqtt.status_topic,
  61. self._mqtt.error_topic,
  62. self._mqtt.camera_command_topic,
  63. ):
  64. result, _ = client.subscribe(topic, qos=1)
  65. if result != mqtt.MQTT_ERR_SUCCESS:
  66. finish_with_error(f"订阅 MQTT topic 失败: {topic}")
  67. break
  68. state["connected"].set()
  69. def on_disconnect(client: mqtt.Client, userdata, rc: int) -> None:
  70. if rc != 0 and not state["done"].is_set():
  71. finish_with_error(f"MQTT 连接意外断开,返回码: {rc}")
  72. def on_message(client: mqtt.Client, userdata, message: mqtt.MQTTMessage) -> None:
  73. payload_text = message.payload.decode("utf-8", errors="replace")
  74. logger.info("收到 MQTT 消息 topic=%s payload=%s", message.topic, payload_text)
  75. try:
  76. payload = json.loads(payload_text)
  77. except json.JSONDecodeError:
  78. finish_with_error(f"收到无法解析的 MQTT 消息: {message.topic}")
  79. return
  80. if message.topic == self._mqtt.status_topic:
  81. try:
  82. state["status"] = int(payload.get("status"))
  83. except (TypeError, ValueError):
  84. finish_with_error(f"收到非法状态消息: {payload}")
  85. return
  86. if payload.get("error_code") not in (None, 0):
  87. finish_with_error(f"机械臂返回错误状态: {payload}")
  88. return
  89. if state["status"] == 4:
  90. state["done"].set()
  91. return
  92. if message.topic == self._mqtt.error_topic:
  93. finish_with_error(f"机械臂返回错误消息: {payload}")
  94. return
  95. if message.topic != self._mqtt.camera_command_topic or payload.get("cmd") != "capture":
  96. return
  97. image_id = str(payload.get("id", "")).strip()
  98. image_path = self._get_image_path(image_id, front_path, back_path)
  99. if image_path is None:
  100. error_message = f"收到未知的拍照编号: {image_id}"
  101. self._send_camera_response(client, 0, error_message)
  102. finish_with_error(error_message)
  103. return
  104. try:
  105. self._camera.capture_to_file(image_path)
  106. except (CameraUnavailableError, CameraCaptureError, OSError) as exc:
  107. error_message = f"保存图片 {image_id}.jpg 失败: {exc}"
  108. self._send_camera_response(client, 0, error_message)
  109. finish_with_error(error_message)
  110. return
  111. if image_id == "1":
  112. state["front_saved"] = True
  113. threading.Thread(
  114. target=self._recognize_front_image,
  115. args=(front_path, state),
  116. daemon=True,
  117. ).start()
  118. if image_id == "2":
  119. state["back_saved"] = True
  120. self._send_camera_response(client, 1, "")
  121. client.on_connect = on_connect
  122. client.on_disconnect = on_disconnect
  123. client.on_message = on_message
  124. try:
  125. try:
  126. client.connect(
  127. self._mqtt.broker,
  128. self._mqtt.port,
  129. self._mqtt.keep_alive_interval,
  130. )
  131. except Exception as exc:
  132. raise MqttCaptureError("连接 MQTT Broker 失败,请检查 broker 地址和网络。") from exc
  133. client.loop_start()
  134. if not state["connected"].wait(timeout=self._mqtt.connect_timeout_seconds):
  135. raise MqttCaptureError("连接 MQTT Broker 超时。")
  136. if state["error"]:
  137. raise MqttCaptureError(str(state["error"]))
  138. payload = {
  139. "cmd": "start",
  140. "cycles": self._mqtt.default_cycles,
  141. "request_id": request_id,
  142. "timestamp": int(time.time() * 1000),
  143. }
  144. publish_info = client.publish(self._mqtt.command_topic, json.dumps(payload), qos=1)
  145. if publish_info.rc != mqtt.MQTT_ERR_SUCCESS:
  146. raise MqttCaptureError("发送 start 指令失败。")
  147. publish_info.wait_for_publish()
  148. logger.info("已发送机械臂 start 指令: %s", payload)
  149. if not state["done"].wait(timeout=self._mqtt.request_timeout_seconds):
  150. raise MqttCaptureError("等待机械臂拍照流程结束超时。")
  151. if state["error"]:
  152. raise MqttCaptureError(str(state["error"]))
  153. if state["status"] != 4:
  154. raise MqttCaptureError(f"流程未正常结束,最终 status={state['status']}")
  155. if not state["front_saved"] or not front_path.exists():
  156. raise MqttCaptureError("没有拿到正面图片")
  157. if not state["back_saved"] or not back_path.exists():
  158. raise MqttCaptureError("没有拿到背面图片")
  159. state["recognized"].wait(timeout=self._cloud.recognize_timeout_seconds)
  160. return {
  161. "card_front_url": self._upload_image_to_cloud(front_path, "front"),
  162. "card_back_url": self._upload_image_to_cloud(back_path, "back"),
  163. "recognizedInfoDTO": state["recognizedInfoDTO"],
  164. }
  165. finally:
  166. try:
  167. client.disconnect()
  168. except Exception:
  169. logger.warning("断开 MQTT 连接时发生异常。", exc_info=True)
  170. try:
  171. client.loop_stop()
  172. except Exception:
  173. logger.warning("停止 MQTT 循环时发生异常。", exc_info=True)
  174. _capture_lock.release()
  175. def _get_image_path(self, image_id: str, front_path: Path, back_path: Path) -> Optional[Path]:
  176. if image_id == "1":
  177. return front_path
  178. if image_id == "2":
  179. return back_path
  180. return None
  181. def _send_camera_response(
  182. self,
  183. client: mqtt.Client,
  184. success: int,
  185. error_message: str,
  186. ) -> None:
  187. payload = {
  188. "success": success,
  189. "error_message": error_message,
  190. "timestamp": int(time.time() * 1000),
  191. }
  192. client.publish(self._mqtt.camera_response_topic, json.dumps(payload), qos=1)
  193. logger.info("已发送拍照回执: %s", payload)
  194. def _recognize_front_image(self, front_path: Path, state: dict[str, Any]) -> None:
  195. try:
  196. state["recognizedInfoDTO"] = self._recognize_card_info(front_path)
  197. finally:
  198. state["recognized"].set()
  199. def _upload_image_to_cloud(self, local_file_path: Path, side: str) -> Optional[str]:
  200. if not local_file_path.exists():
  201. return None
  202. try:
  203. from minio import Minio
  204. except ImportError:
  205. logger.warning("未安装 minio 依赖,跳过上传。")
  206. return None
  207. try:
  208. minio_client = Minio(
  209. self._cloud.minio_endpoint,
  210. access_key=self._cloud.minio_access_key,
  211. secret_key=self._cloud.minio_secret_key,
  212. secure=self._cloud.minio_secure,
  213. )
  214. timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")
  215. object_name = f"raspi_{side}_{timestamp}.jpg"
  216. cloud_path = f"{self._cloud.minio_base_prefix}/{object_name}"
  217. minio_client.fput_object(
  218. self._cloud.minio_bucket,
  219. cloud_path,
  220. str(local_file_path),
  221. content_type="image/jpeg",
  222. )
  223. return f"{self._cloud.minio_public_base_url}/{object_name}"
  224. except Exception:
  225. logger.warning("上传图片到 MinIO 失败: %s", local_file_path, exc_info=True)
  226. return None
  227. def _recognize_card_info(self, local_file_path: Path) -> Optional[dict[str, str]]:
  228. if not local_file_path.exists():
  229. return None
  230. try:
  231. import requests
  232. except ImportError:
  233. logger.warning("未安装 requests 依赖,跳过卡牌识别。")
  234. return None
  235. try:
  236. with local_file_path.open("rb") as image_file:
  237. response = requests.post(
  238. self._cloud.recognize_api_url,
  239. files={
  240. "imgFile": (
  241. local_file_path.name,
  242. image_file,
  243. "image/jpeg",
  244. )
  245. },
  246. timeout=self._cloud.recognize_timeout_seconds,
  247. )
  248. if response.status_code != 200:
  249. logger.warning("卡牌识别接口返回异常状态码: %s", response.status_code)
  250. return None
  251. result_list = response.json()
  252. if not result_list or not isinstance(result_list, list):
  253. logger.warning("卡牌识别接口返回了空结果或非法格式。")
  254. return None
  255. detail = (result_list[0] or {}).get("detail") or {}
  256. path = detail.get("path") or ""
  257. if path:
  258. path = f"{self._cloud.data_prefix}{path}"
  259. return {
  260. "year": detail.get("year", ""),
  261. "series": detail.get("series", ""),
  262. "cardSet": detail.get("cardset", ""),
  263. "player": detail.get("player", ""),
  264. "confidentialId": detail.get("no", ""),
  265. "limitId": "",
  266. "path": path,
  267. }
  268. except Exception:
  269. logger.warning("卡牌识别请求失败: %s", local_file_path, exc_info=True)
  270. return None
  271. def get_mqtt_capture_service() -> MqttCaptureService:
  272. return MqttCaptureService()