import json import logging import threading import time from datetime import datetime from pathlib import Path from typing import Any, Optional import paho.mqtt.client as mqtt from app.core.config import settings from app.services.camera_service import ( CameraCaptureError, CameraUnavailableError, get_camera_service, ) logger = logging.getLogger(__name__) class MqttCaptureError(RuntimeError): """MQTT 联动拍照失败。""" class MqttCaptureBusyError(MqttCaptureError): """当前已有任务在执行。""" _capture_lock = threading.Lock() class MqttCaptureService: """按请求临时连接 MQTT,拍完两张图后立即断开。""" def __init__(self) -> None: self._mqtt = settings.mqtt self._static = settings.static self._cloud = settings.cloud self._camera = get_camera_service() def capture_pair(self) -> dict[str, object]: if not _capture_lock.acquire(blocking=False): raise MqttCaptureBusyError("当前已有一个 MQTT 拍照任务正在执行,请稍后再试。") self._static.ensure_directories() front_path = self._static.front_image_path back_path = self._static.back_image_path request_id = f"req-{int(time.time() * 1000)}" state: dict[str, Any] = { "connected": threading.Event(), "done": threading.Event(), "recognized": threading.Event(), "error": "", "status": None, "front_saved": False, "back_saved": False, "recognizedInfoDTO": None, } client = mqtt.Client( client_id=f"{self._mqtt.client_id_prefix}{int(time.time() * 1000)}", clean_session=True, ) client.enable_logger(logger) def finish_with_error(message: str) -> None: if not state["error"]: state["error"] = message state["done"].set() def on_connect(client: mqtt.Client, userdata, flags, rc: int) -> None: if rc != 0: finish_with_error(f"MQTT 连接失败,返回码: {rc}") state["connected"].set() return for topic in ( self._mqtt.status_topic, self._mqtt.error_topic, self._mqtt.camera_command_topic, ): result, _ = client.subscribe(topic, qos=1) if result != mqtt.MQTT_ERR_SUCCESS: finish_with_error(f"订阅 MQTT topic 失败: {topic}") break state["connected"].set() def on_disconnect(client: mqtt.Client, userdata, rc: int) -> None: if rc != 0 and not state["done"].is_set(): finish_with_error(f"MQTT 连接意外断开,返回码: {rc}") def on_message(client: mqtt.Client, userdata, message: mqtt.MQTTMessage) -> None: payload_text = message.payload.decode("utf-8", errors="replace") logger.info("收到 MQTT 消息 topic=%s payload=%s", message.topic, payload_text) try: payload = json.loads(payload_text) except json.JSONDecodeError: finish_with_error(f"收到无法解析的 MQTT 消息: {message.topic}") return if message.topic == self._mqtt.status_topic: try: state["status"] = int(payload.get("status")) except (TypeError, ValueError): finish_with_error(f"收到非法状态消息: {payload}") return if payload.get("error_code") not in (None, 0): finish_with_error(f"机械臂返回错误状态: {payload}") return if state["status"] == 4: state["done"].set() return if message.topic == self._mqtt.error_topic: finish_with_error(f"机械臂返回错误消息: {payload}") return if message.topic != self._mqtt.camera_command_topic or payload.get("cmd") != "capture": return image_id = str(payload.get("id", "")).strip() image_path = self._get_image_path(image_id, front_path, back_path) if image_path is None: error_message = f"收到未知的拍照编号: {image_id}" self._send_camera_response(client, 0, error_message) finish_with_error(error_message) return try: self._camera.capture_to_file(image_path) except (CameraUnavailableError, CameraCaptureError, OSError) as exc: error_message = f"保存图片 {image_id}.jpg 失败: {exc}" self._send_camera_response(client, 0, error_message) finish_with_error(error_message) return if image_id == "1": state["front_saved"] = True threading.Thread( target=self._recognize_front_image, args=(front_path, state), daemon=True, ).start() if image_id == "2": state["back_saved"] = True self._send_camera_response(client, 1, "") client.on_connect = on_connect client.on_disconnect = on_disconnect client.on_message = on_message try: try: client.connect( self._mqtt.broker, self._mqtt.port, self._mqtt.keep_alive_interval, ) except Exception as exc: raise MqttCaptureError("连接 MQTT Broker 失败,请检查 broker 地址和网络。") from exc client.loop_start() if not state["connected"].wait(timeout=self._mqtt.connect_timeout_seconds): raise MqttCaptureError("连接 MQTT Broker 超时。") if state["error"]: raise MqttCaptureError(str(state["error"])) payload = { "cmd": "start", "cycles": self._mqtt.default_cycles, "request_id": request_id, "timestamp": int(time.time() * 1000), } publish_info = client.publish(self._mqtt.command_topic, json.dumps(payload), qos=1) if publish_info.rc != mqtt.MQTT_ERR_SUCCESS: raise MqttCaptureError("发送 start 指令失败。") publish_info.wait_for_publish() logger.info("已发送机械臂 start 指令: %s", payload) if not state["done"].wait(timeout=self._mqtt.request_timeout_seconds): raise MqttCaptureError("等待机械臂拍照流程结束超时。") if state["error"]: raise MqttCaptureError(str(state["error"])) if state["status"] != 4: raise MqttCaptureError(f"流程未正常结束,最终 status={state['status']}") if not state["front_saved"] or not front_path.exists(): raise MqttCaptureError("没有拿到正面图片") if not state["back_saved"] or not back_path.exists(): raise MqttCaptureError("没有拿到背面图片") state["recognized"].wait(timeout=self._cloud.recognize_timeout_seconds) return { "card_front_url": self._upload_image_to_cloud(front_path, "front"), "card_back_url": self._upload_image_to_cloud(back_path, "back"), "recognizedInfoDTO": state["recognizedInfoDTO"], } finally: try: client.disconnect() except Exception: logger.warning("断开 MQTT 连接时发生异常。", exc_info=True) try: client.loop_stop() except Exception: logger.warning("停止 MQTT 循环时发生异常。", exc_info=True) _capture_lock.release() def _get_image_path(self, image_id: str, front_path: Path, back_path: Path) -> Optional[Path]: if image_id == "1": return front_path if image_id == "2": return back_path return None def _send_camera_response( self, client: mqtt.Client, success: int, error_message: str, ) -> None: payload = { "success": success, "error_message": error_message, "timestamp": int(time.time() * 1000), } client.publish(self._mqtt.camera_response_topic, json.dumps(payload), qos=1) logger.info("已发送拍照回执: %s", payload) def _recognize_front_image(self, front_path: Path, state: dict[str, Any]) -> None: try: state["recognizedInfoDTO"] = self._recognize_card_info(front_path) finally: state["recognized"].set() def _upload_image_to_cloud(self, local_file_path: Path, side: str) -> Optional[str]: if not local_file_path.exists(): return None try: from minio import Minio except ImportError: logger.warning("未安装 minio 依赖,跳过上传。") return None try: minio_client = Minio( self._cloud.minio_endpoint, access_key=self._cloud.minio_access_key, secret_key=self._cloud.minio_secret_key, secure=self._cloud.minio_secure, ) timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f") object_name = f"raspi_{side}_{timestamp}.jpg" cloud_path = f"{self._cloud.minio_base_prefix}/{object_name}" minio_client.fput_object( self._cloud.minio_bucket, cloud_path, str(local_file_path), content_type="image/jpeg", ) return f"{self._cloud.minio_public_base_url}/{object_name}" except Exception: logger.warning("上传图片到 MinIO 失败: %s", local_file_path, exc_info=True) return None def _recognize_card_info(self, local_file_path: Path) -> Optional[dict[str, str]]: if not local_file_path.exists(): return None try: import requests except ImportError: logger.warning("未安装 requests 依赖,跳过卡牌识别。") return None try: with local_file_path.open("rb") as image_file: response = requests.post( self._cloud.recognize_api_url, files={ "imgFile": ( local_file_path.name, image_file, "image/jpeg", ) }, timeout=self._cloud.recognize_timeout_seconds, ) if response.status_code != 200: logger.warning("卡牌识别接口返回异常状态码: %s", response.status_code) return None result_list = response.json() if not result_list or not isinstance(result_list, list): logger.warning("卡牌识别接口返回了空结果或非法格式。") return None detail = (result_list[0] or {}).get("detail") or {} path = detail.get("path") or "" if path: path = f"{self._cloud.data_prefix}{path}" return { "year": detail.get("year", ""), "series": detail.get("series", ""), "cardSet": detail.get("cardset", ""), "player": detail.get("player", ""), "confidentialId": detail.get("no", ""), "limitId": "", "path": path, } except Exception: logger.warning("卡牌识别请求失败: %s", local_file_path, exc_info=True) return None def get_mqtt_capture_service() -> MqttCaptureService: return MqttCaptureService()