| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- import paho.mqtt.client as mqtt
- import json
- import time
- import picamera2
- import cv2
- picam2 = picamera2.Picamera2()
- picam2.configure(picam2.create_still_configuration(
- main={"size": (2048, 2048),
- "format": "RGB888"}
- ))
- picam2.start()
- # MQTT 服务器配置
- MQTT_BROKER = "192.168.77.132" # 机械臂的IP地址
- MQTT_PORT = 1883 # 端口
- MQTT_KEEP_ALIVE_INTERVAL = 60 # 保持连接时间间隔
- CLIENT_ID = "arm_client_001" # 客户端ID
- # 定义 topic
- # 发送
- TOPIC_COMMAND = "arm_card_dealer/command"
- TOPIC_CAMERA_RESPONSE = "arm_card_dealer/camera/response"
- # 接收
- TOPIC_STATUS = "arm_card_dealer/status"
- TOPIC_ERROR = "arm_card_dealer/error"
- TOPIC_CAMERA_COMMAND = "arm_card_dealer/camera/command"
- def capture(img_path: str):
- frame = picam2.capture_array() # 捕获图像
- # 保存图片查看结果
- cv2.imwrite(img_path, frame)
- return True
- # 定义发送指令函数
- def send_command(cmd, request_id, cycles=-1):
- payload = {
- "cmd": cmd,
- "cycles": cycles,
- "request_id": request_id,
- "timestamp": int(time.time() * 1000) # 当前时间戳
- }
- client.publish(TOPIC_COMMAND, json.dumps(payload), qos=1)
- print(f"Sent command: {payload}")
- # success integer 拍照是否成功(1=成功,0=失败)
- def send_camera_response(success=1, error_message=''):
- payload = {
- "success": success,
- "error_message": "",
- "timestamp": int(time.time() * 1000)
- }
- client.publish(TOPIC_CAMERA_RESPONSE, json.dumps(payload), qos=1)
- print(f"Sent camera response: {payload}")
- # 定义处理接收到的消息的回调函数
- def on_message(client, userdata, msg):
- print('------------------')
- print(f"[Get Topic: {msg.topic}: {msg.payload.decode()}]")
- # 处理状态消息
- if msg.topic == TOPIC_STATUS:
- status_data = json.loads(msg.payload.decode())
- print("statue:", status_data)
- # 处理错误消息
- elif msg.topic == TOPIC_ERROR:
- error_data = json.loads(msg.payload.decode())
- print("error:", error_data)
- elif msg.topic == TOPIC_CAMERA_COMMAND:
- camera_command_data = json.loads(msg.payload.decode())
- print("arm request capture: ", camera_command_data)
- # 拍照
- if camera_command_data.get("cmd") == "capture":
- img_name = camera_command_data.get("id", "0")
- capture(f"{img_name}.jpg")
- time.sleep(0.5)
- print(f"--- save img {img_name} ---")
- # 通知拍照完成
- send_camera_response(success=1)
- # 初始化 MQTT 客户端
- client = mqtt.Client(CLIENT_ID)
- # 设置回调函数
- client.on_message = on_message
- # 连接到 MQTT broker
- client.connect(MQTT_BROKER, MQTT_PORT, MQTT_KEEP_ALIVE_INTERVAL)
- # 订阅相关主题
- client.subscribe(TOPIC_STATUS, qos=1)
- client.subscribe(TOPIC_ERROR, qos=1)
- client.subscribe(TOPIC_CAMERA_COMMAND, qos=1)
- print("--已订阅主题--")
- # 发送 START 命令,假设 request_id 为 "req-2024-001"
- send_command("start", "req-2026-001", cycles=1)
- # 开始监听 MQTT 消息
- client.loop_start()
- # 在这里保持长时间运行,等待响应
- time.sleep(70) # 可以根据实际需求调整
- # 停止监听
- client.loop_stop()
|