mqtt_test.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import paho.mqtt.client as mqtt
  2. import json
  3. import time
  4. import picamera2
  5. import cv2
  6. picam2 = picamera2.Picamera2()
  7. picam2.configure(picam2.create_still_configuration(
  8. main={"size": (2048, 2048),
  9. "format": "RGB888"}
  10. ))
  11. picam2.start()
  12. # MQTT 服务器配置
  13. MQTT_BROKER = "192.168.77.132" # 机械臂的IP地址
  14. MQTT_PORT = 1883 # 端口
  15. MQTT_KEEP_ALIVE_INTERVAL = 60 # 保持连接时间间隔
  16. CLIENT_ID = "arm_client_001" # 客户端ID
  17. # 定义 topic
  18. # 发送
  19. TOPIC_COMMAND = "arm_card_dealer/command"
  20. TOPIC_CAMERA_RESPONSE = "arm_card_dealer/camera/response"
  21. # 接收
  22. TOPIC_STATUS = "arm_card_dealer/status"
  23. TOPIC_ERROR = "arm_card_dealer/error"
  24. TOPIC_CAMERA_COMMAND = "arm_card_dealer/camera/command"
  25. def capture(img_path: str):
  26. frame = picam2.capture_array() # 捕获图像
  27. # 保存图片查看结果
  28. cv2.imwrite(img_path, frame)
  29. return True
  30. # 定义发送指令函数
  31. def send_command(cmd, request_id, cycles=-1):
  32. payload = {
  33. "cmd": cmd,
  34. "cycles": cycles,
  35. "request_id": request_id,
  36. "timestamp": int(time.time() * 1000) # 当前时间戳
  37. }
  38. client.publish(TOPIC_COMMAND, json.dumps(payload), qos=1)
  39. print(f"Sent command: {payload}")
  40. # success integer 拍照是否成功(1=成功,0=失败)
  41. def send_camera_response(success=1, error_message=''):
  42. payload = {
  43. "success": success,
  44. "error_message": "",
  45. "timestamp": int(time.time() * 1000)
  46. }
  47. client.publish(TOPIC_CAMERA_RESPONSE, json.dumps(payload), qos=1)
  48. print(f"Sent camera response: {payload}")
  49. # 定义处理接收到的消息的回调函数
  50. def on_message(client, userdata, msg):
  51. print('------------------')
  52. print(f"[Get Topic: {msg.topic}: {msg.payload.decode()}]")
  53. # 处理状态消息
  54. if msg.topic == TOPIC_STATUS:
  55. status_data = json.loads(msg.payload.decode())
  56. print("statue:", status_data)
  57. # 处理错误消息
  58. elif msg.topic == TOPIC_ERROR:
  59. error_data = json.loads(msg.payload.decode())
  60. print("error:", error_data)
  61. elif msg.topic == TOPIC_CAMERA_COMMAND:
  62. camera_command_data = json.loads(msg.payload.decode())
  63. print("arm request capture: ", camera_command_data)
  64. # 拍照
  65. if camera_command_data.get("cmd") == "capture":
  66. img_name = camera_command_data.get("id", "0")
  67. print(f"--- start capture: {img_name}[{time.time()}] ---")
  68. capture(f"{img_name}.jpg")
  69. print(f"--- save img {img_name}[{time.time()}] ---")
  70. # 通知拍照完成
  71. send_camera_response(success=1)
  72. # 初始化 MQTT 客户端
  73. client = mqtt.Client(CLIENT_ID)
  74. # 设置回调函数
  75. client.on_message = on_message
  76. # 连接到 MQTT broker
  77. client.connect(MQTT_BROKER, MQTT_PORT, MQTT_KEEP_ALIVE_INTERVAL)
  78. # 订阅相关主题
  79. client.subscribe(TOPIC_STATUS, qos=1)
  80. client.subscribe(TOPIC_ERROR, qos=1)
  81. client.subscribe(TOPIC_CAMERA_COMMAND, qos=1)
  82. print("--已订阅主题--")
  83. # 发送 START 命令,假设 request_id 为 "req-2024-001"
  84. send_command("start", "req-2026-001", cycles=1)
  85. # 开始监听 MQTT 消息
  86. client.loop_start()
  87. # 在这里保持长时间运行,等待响应
  88. time.sleep(70) # 可以根据实际需求调整
  89. # 停止监听
  90. client.loop_stop()