| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- import json
- import logging
- import threading
- import time
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Optional
- import paho.mqtt.client as mqtt
- from app.core.config import settings
- from app.services.camera_service import (
- CameraCaptureError,
- CameraUnavailableError,
- get_camera_service,
- )
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class MqttCaptureError(RuntimeError):
    """Raised when the MQTT-coordinated capture flow fails."""
class MqttCaptureBusyError(MqttCaptureError):
    """Raised when another capture task is already running."""
# Module-wide lock; capture_pair() acquires it without blocking so that a
# second request fails fast instead of queueing behind a running capture.
_capture_lock = threading.Lock()
class MqttCaptureService:
    """Connect to MQTT per request and disconnect once the capture flow ends.

    Each call to ``capture_pair`` creates its own MQTT client, publishes a
    ``start`` command, saves the front (id ``"1"``) and back (id ``"2"``)
    images as ``capture`` commands arrive, and tears the client down again.
    """

    def __init__(self) -> None:
        """Bind the MQTT/static/cloud settings sections and the camera service."""
        self._mqtt = settings.mqtt
        self._static = settings.static
        self._cloud = settings.cloud
        self._camera = get_camera_service()

    def capture_pair(self) -> dict[str, object]:
        """Run one capture flow and return the upload and recognition results.

        Publishes a ``start`` command, answers ``capture`` commands by saving
        the requested image, and waits until a status message with
        ``status == 4`` (or an error) ends the flow.

        Returns:
            A dict with ``card_front_url`` and ``card_back_url`` (each the
            result of ``_upload_image_to_cloud``, which may be ``None``) and
            ``recognizedInfoDTO`` (the recognition result, or ``None``).

        Raises:
            MqttCaptureBusyError: Another capture flow currently holds the
                module-level lock.
            MqttCaptureError: Connecting, subscribing or publishing failed,
                the flow reported an error or timed out, or an image is
                missing at the end.
        """
        if not _capture_lock.acquire(blocking=False):
            raise MqttCaptureBusyError("当前已有一个 MQTT 拍照任务正在执行,请稍后再试。")

        # Everything after the acquire runs inside try/finally so that a
        # failure during setup (directory creation, client construction)
        # cannot leave the lock held and block every later request.
        client = None
        try:
            self._static.ensure_directories()
            front_path = self._static.front_image_path
            back_path = self._static.back_image_path
            request_id = f"req-{int(time.time() * 1000)}"

            # Shared between this method, the MQTT callbacks and the
            # recognition thread.
            state: dict[str, Any] = {
                "connected": threading.Event(),
                "done": threading.Event(),
                "recognized": threading.Event(),
                "error": "",
                "status": None,
                "front_saved": False,
                "back_saved": False,
                "recognizedInfoDTO": None,
            }

            client = mqtt.Client(
                client_id=f"{self._mqtt.client_id_prefix}{int(time.time() * 1000)}",
                clean_session=True,
            )
            client.enable_logger(logger)

            def finish_with_error(message: str) -> None:
                # Keep only the first error, then wake the waiting caller.
                if not state["error"]:
                    state["error"] = message
                state["done"].set()

            def on_connect(client: "mqtt.Client", userdata, flags, rc: int) -> None:
                if rc != 0:
                    finish_with_error(f"MQTT 连接失败,返回码: {rc}")
                    state["connected"].set()
                    return

                for topic in (
                    self._mqtt.status_topic,
                    self._mqtt.error_topic,
                    self._mqtt.camera_command_topic,
                ):
                    result, _ = client.subscribe(topic, qos=1)
                    if result != mqtt.MQTT_ERR_SUCCESS:
                        finish_with_error(f"订阅 MQTT topic 失败: {topic}")
                        break
                # Set even after a subscribe failure; the caller checks
                # state["error"] right after the connect wait returns.
                state["connected"].set()

            def on_disconnect(client: "mqtt.Client", userdata, rc: int) -> None:
                if rc != 0 and not state["done"].is_set():
                    finish_with_error(f"MQTT 连接意外断开,返回码: {rc}")

            def on_message(
                client: "mqtt.Client",
                userdata,
                message: "mqtt.MQTTMessage",
            ) -> None:
                payload_text = message.payload.decode("utf-8", errors="replace")
                logger.info(
                    "收到 MQTT 消息 topic=%s payload=%s", message.topic, payload_text
                )

                # Non-object JSON (a list, a number, ...) is rejected here as
                # well; the code below relies on payload.get().
                payload = self._parse_payload(payload_text)
                if payload is None:
                    finish_with_error(f"收到无法解析的 MQTT 消息: {message.topic}")
                    return

                if message.topic == self._mqtt.status_topic:
                    try:
                        state["status"] = int(payload.get("status"))
                    except (TypeError, ValueError):
                        finish_with_error(f"收到非法状态消息: {payload}")
                        return
                    if payload.get("error_code") not in (None, 0):
                        finish_with_error(f"机械臂返回错误状态: {payload}")
                        return
                    # Status 4 is treated as the normal end of the flow.
                    if state["status"] == 4:
                        state["done"].set()
                    return

                if message.topic == self._mqtt.error_topic:
                    finish_with_error(f"机械臂返回错误消息: {payload}")
                    return

                if (
                    message.topic != self._mqtt.camera_command_topic
                    or payload.get("cmd") != "capture"
                ):
                    return

                image_id = str(payload.get("id", "")).strip()
                image_path = self._get_image_path(image_id, front_path, back_path)
                if image_path is None:
                    error_message = f"收到未知的拍照编号: {image_id}"
                    self._send_camera_response(client, 0, error_message)
                    finish_with_error(error_message)
                    return

                try:
                    self._camera.capture_to_file(image_path)
                except (CameraUnavailableError, CameraCaptureError, OSError) as exc:
                    error_message = f"保存图片 {image_id}.jpg 失败: {exc}"
                    self._send_camera_response(client, 0, error_message)
                    finish_with_error(error_message)
                    return

                if image_id == "1":
                    state["front_saved"] = True
                    # Recognition starts in a daemon thread as soon as the
                    # front image is saved.
                    threading.Thread(
                        target=self._recognize_front_image,
                        args=(front_path, state),
                        daemon=True,
                    ).start()
                if image_id == "2":
                    state["back_saved"] = True
                self._send_camera_response(client, 1, "")

            client.on_connect = on_connect
            client.on_disconnect = on_disconnect
            client.on_message = on_message

            try:
                client.connect(
                    self._mqtt.broker,
                    self._mqtt.port,
                    self._mqtt.keep_alive_interval,
                )
            except Exception as exc:
                raise MqttCaptureError(
                    "连接 MQTT Broker 失败,请检查 broker 地址和网络。"
                ) from exc

            client.loop_start()
            if not state["connected"].wait(timeout=self._mqtt.connect_timeout_seconds):
                raise MqttCaptureError("连接 MQTT Broker 超时。")
            if state["error"]:
                raise MqttCaptureError(str(state["error"]))

            payload = {
                "cmd": "start",
                "cycles": self._mqtt.default_cycles,
                "request_id": request_id,
                "timestamp": int(time.time() * 1000),
            }
            publish_info = client.publish(
                self._mqtt.command_topic, json.dumps(payload), qos=1
            )
            if publish_info.rc != mqtt.MQTT_ERR_SUCCESS:
                raise MqttCaptureError("发送 start 指令失败。")
            publish_info.wait_for_publish()
            logger.info("已发送机械臂 start 指令: %s", payload)

            if not state["done"].wait(timeout=self._mqtt.request_timeout_seconds):
                raise MqttCaptureError("等待机械臂拍照流程结束超时。")
            if state["error"]:
                raise MqttCaptureError(str(state["error"]))
            if state["status"] != 4:
                raise MqttCaptureError(f"流程未正常结束,最终 status={state['status']}")
            if not state["front_saved"] or not front_path.exists():
                raise MqttCaptureError("没有拿到正面图片")
            if not state["back_saved"] or not back_path.exists():
                raise MqttCaptureError("没有拿到背面图片")

            # Bounded wait for the background recognition; if it has not
            # finished in time, recognizedInfoDTO may still be None.
            state["recognized"].wait(timeout=self._cloud.recognize_timeout_seconds)
            return {
                "card_front_url": self._upload_image_to_cloud(front_path, "front"),
                "card_back_url": self._upload_image_to_cloud(back_path, "back"),
                "recognizedInfoDTO": state["recognizedInfoDTO"],
            }
        finally:
            if client is not None:
                try:
                    client.disconnect()
                except Exception:
                    logger.warning("断开 MQTT 连接时发生异常。", exc_info=True)
                try:
                    client.loop_stop()
                except Exception:
                    logger.warning("停止 MQTT 循环时发生异常。", exc_info=True)
            _capture_lock.release()

    @staticmethod
    def _parse_payload(payload_text: str) -> Optional[dict[str, Any]]:
        """Decode *payload_text* as a JSON object.

        Returns:
            The decoded dict, or ``None`` when the text is not valid JSON or
            the decoded value is not a JSON object.
        """
        try:
            payload = json.loads(payload_text)
        except json.JSONDecodeError:
            return None
        return payload if isinstance(payload, dict) else None

    def _get_image_path(
        self, image_id: str, front_path: Path, back_path: Path
    ) -> Optional[Path]:
        """Map a capture id to its target path: ``"1"`` front, ``"2"`` back.

        Returns ``None`` for any other id.
        """
        if image_id == "1":
            return front_path
        if image_id == "2":
            return back_path
        return None

    def _send_camera_response(
        self,
        client: "mqtt.Client",
        success: int,
        error_message: str,
    ) -> None:
        """Publish a capture acknowledgement on the camera response topic.

        Args:
            client: MQTT client used to publish.
            success: Value sent as ``success``; this class passes 1 after a
                saved image and 0 on failure.
            error_message: Text sent as ``error_message``; empty on success.
        """
        payload = {
            "success": success,
            "error_message": error_message,
            "timestamp": int(time.time() * 1000),
        }
        client.publish(self._mqtt.camera_response_topic, json.dumps(payload), qos=1)
        logger.info("已发送拍照回执: %s", payload)

    def _recognize_front_image(self, front_path: Path, state: dict[str, Any]) -> None:
        """Store the recognition result in *state* and signal completion.

        ``state["recognized"]`` is set even if recognition raises, so a
        waiter is always released.
        """
        try:
            state["recognizedInfoDTO"] = self._recognize_card_info(front_path)
        finally:
            state["recognized"].set()

    def _upload_image_to_cloud(self, local_file_path: Path, side: str) -> Optional[str]:
        """Upload an image to MinIO and return its public URL.

        Best-effort: returns ``None`` when the file does not exist, when the
        ``minio`` package is not installed, or when the upload fails (the
        failure is logged as a warning).

        Args:
            local_file_path: Image file to upload.
            side: Label embedded in the object name (``"front"``/``"back"``).
        """
        if not local_file_path.exists():
            return None

        try:
            from minio import Minio
        except ImportError:
            logger.warning("未安装 minio 依赖,跳过上传。")
            return None

        try:
            minio_client = Minio(
                self._cloud.minio_endpoint,
                access_key=self._cloud.minio_access_key,
                secret_key=self._cloud.minio_secret_key,
                secure=self._cloud.minio_secure,
            )
            timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")
            object_name = f"raspi_{side}_{timestamp}.jpg"
            cloud_path = f"{self._cloud.minio_base_prefix}/{object_name}"
            minio_client.fput_object(
                self._cloud.minio_bucket,
                cloud_path,
                str(local_file_path),
                content_type="image/jpeg",
            )
            # NOTE(review): the object is stored under minio_base_prefix but
            # the URL is built from object_name only; presumably
            # minio_public_base_url already includes that prefix - confirm.
            return f"{self._cloud.minio_public_base_url}/{object_name}"
        except Exception:
            logger.warning("上传图片到 MinIO 失败: %s", local_file_path, exc_info=True)
            return None

    def _recognize_card_info(self, local_file_path: Path) -> Optional[dict[str, str]]:
        """Send the image to the recognition API and map its first result.

        Best-effort: returns ``None`` when the file does not exist, when the
        ``requests`` package is not installed, on a non-200 response, on an
        empty or non-list result, or on any request/parsing error (logged as
        a warning).

        Returns:
            A dict with ``year``, ``series``, ``cardSet``, ``player``,
            ``confidentialId``, ``limitId`` and ``path`` taken from the first
            result's ``detail`` entry, or ``None``.
        """
        if not local_file_path.exists():
            return None

        try:
            import requests
        except ImportError:
            logger.warning("未安装 requests 依赖,跳过卡牌识别。")
            return None

        try:
            with local_file_path.open("rb") as image_file:
                response = requests.post(
                    self._cloud.recognize_api_url,
                    files={
                        "imgFile": (
                            local_file_path.name,
                            image_file,
                            "image/jpeg",
                        )
                    },
                    timeout=self._cloud.recognize_timeout_seconds,
                )

            if response.status_code != 200:
                logger.warning("卡牌识别接口返回异常状态码: %s", response.status_code)
                return None

            result_list = response.json()
            if not result_list or not isinstance(result_list, list):
                logger.warning("卡牌识别接口返回了空结果或非法格式。")
                return None

            detail = (result_list[0] or {}).get("detail") or {}
            path = detail.get("path") or ""
            if path:
                # Only a non-empty path gets the configured data prefix.
                path = f"{self._cloud.data_prefix}{path}"

            return {
                "year": detail.get("year", ""),
                "series": detail.get("series", ""),
                "cardSet": detail.get("cardset", ""),
                "player": detail.get("player", ""),
                "confidentialId": detail.get("no", ""),
                "limitId": "",
                "path": path,
            }
        except Exception:
            logger.warning("卡牌识别请求失败: %s", local_file_path, exc_info=True)
            return None
def get_mqtt_capture_service() -> MqttCaptureService:
    """Return a new ``MqttCaptureService`` instance."""
    return MqttCaptureService()
|