import asyncio import aiohttp from minio_client import minio_client, get_full_url from services import MINIO_BUCKET, MINIO_BASE_PREFIX, RECOGNIZE_API_URL, DATA_PREFIX import time import datetime import os async def upload_image_to_cloud(local_file_path: str) -> str: """ 实际调用图片服务器上传接口 """ print(f"正在上传: {local_file_path} 到 minio") if not os.path.exists(local_file_path): print("错误: 本地文件不存在") return None try: # 格式化输出:2023-10-27 10:00:00 formatted_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") object_name = f"raspi_{formatted_time}.jpg" cloud_path = os.path.join(MINIO_BASE_PREFIX, object_name).replace("\\", "/") minio_client.fput_object(MINIO_BUCKET, cloud_path, local_file_path, content_type="image/jpeg") print(f"成功上传 {object_name} 到 {MINIO_BUCKET}") img_url = get_full_url(object_name) return img_url except Exception as e: print(f"上传异常: {e}") return None async def recognize_card_info(local_file_path: str) -> dict: """ 调用卡片识别接口 """ print(f"[Recognize] 正在识别卡片: {local_file_path} ...") EMPTY_CARD_INFO = { "year": "", "series": "", "cardSet": "", "player": "", "confidentialId": "", "limitId": "" } if not os.path.exists(local_file_path): print("[Recognize] 文件不存在,返回空信息") return EMPTY_CARD_INFO try: async with aiohttp.ClientSession() as session: # 构造 Multipart/form-data data = aiohttp.FormData() # 根据 curl -F 'imgFile=@...',字段名必须是 'imgFile' data.add_field('imgFile', open(local_file_path, 'rb'), filename=os.path.basename(local_file_path), content_type='image/jpeg') # 发送请求 async with session.post(RECOGNIZE_API_URL, data=data, timeout=15) as response: if response.status != 200: print(f"[Recognize] 接口报错: Status {response.status}") text = await response.text() print(f"[Recognize] 错误内容: {text}") return EMPTY_CARD_INFO # 解析 JSON result_list = await response.json() # 校验返回数据是否有效 if not result_list or not isinstance(result_list, list): print("[Recognize] 返回格式不是列表或为空") return EMPTY_CARD_INFO # 取第一个结果 first_item = result_list[0] detail = first_item.get("detail", {}) # 映射字段 # mapping: no -> confidentialId path = detail.get("path", "") if path!="" or path is not None: path = f"{DATA_PREFIX}{path}" info = { "year": detail.get("year", ""), "series": detail.get("series", ""), "cardSet": detail.get("cardset", ""), "player": detail.get("player", ""), "confidentialId": detail.get("no", ""), "limitId": "", # 源数据没有,留空 "path": path } print(f"[Recognize] 识别成功: {first_item.get('tag', 'No Tag')}") return info except Exception as e: print(f"[Recognize] 请求发生异常: {e}") import traceback traceback.print_exc() return EMPTY_CARD_INFO