| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- import asyncio
- import aiohttp
- from minio_client import minio_client, get_full_url
- from services import MINIO_BUCKET, MINIO_BASE_PREFIX, RECOGNIZE_API_URL, DATA_PREFIX
- import time
- import datetime
- import os
async def upload_image_to_cloud(local_file_path: str) -> str:
    """Upload a local JPEG to the MinIO bucket and return its URL.

    The object is named ``raspi_<YYYY-MM-DD_HH-MM-SS>.jpg`` and stored under
    ``MINIO_BASE_PREFIX`` inside ``MINIO_BUCKET``.

    Args:
        local_file_path: Path of the image file on the local filesystem.

    Returns:
        The result of ``get_full_url(object_name)`` on success. ``None`` is
        returned when the file does not exist or the upload raises (the
        ``str`` annotation is kept unchanged for interface compatibility).
    """
    print(f"正在上传: {local_file_path} 到 minio")
    if not os.path.exists(local_file_path):
        print("错误: 本地文件不存在")
        return None

    try:
        # Timestamp embedded in the object name, e.g. 2023-10-27_10-00-00.
        formatted_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        object_name = f"raspi_{formatted_time}.jpg"
        # Object keys use forward slashes even if os.path.join emitted
        # backslashes (Windows).
        cloud_path = os.path.join(MINIO_BASE_PREFIX, object_name).replace("\\", "/")

        # fput_object is invoked synchronously (it was never awaited), so run
        # it in the default executor to keep the event loop responsive while
        # the file is being transferred.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            lambda: minio_client.fput_object(
                MINIO_BUCKET,
                cloud_path,
                local_file_path,
                content_type="image/jpeg",
            ),
        )
        print(f"成功上传 {object_name} 到 {MINIO_BUCKET}")

        # NOTE(review): the URL is built from object_name, not cloud_path;
        # presumably get_full_url adds the prefix itself - confirm.
        return get_full_url(object_name)
    except asyncio.CancelledError:
        # On Python 3.7 CancelledError derives from Exception; never swallow
        # task cancellation in the generic handler below.
        raise
    except Exception as e:
        print(f"上传异常: {e}")
        return None
def _card_info_from_detail(detail: dict) -> dict:
    """Map the recognition service's ``detail`` object to a card-info dict."""
    raw_path = detail.get("path", "")
    # Prefix only a real path. The previous check
    # (`path != "" or path is not None`) was always true, so an empty or
    # missing path became "<DATA_PREFIX>" or "<DATA_PREFIX>None".
    path = f"{DATA_PREFIX}{raw_path}" if raw_path else ""
    return {
        "year": detail.get("year", ""),
        "series": detail.get("series", ""),
        "cardSet": detail.get("cardset", ""),
        "player": detail.get("player", ""),
        # Field mapping: the service's "no" is exposed as "confidentialId".
        "confidentialId": detail.get("no", ""),
        # Not present in the source data; intentionally left empty.
        "limitId": "",
        "path": path,
    }


async def recognize_card_info(local_file_path: str) -> dict:
    """Send an image to the card-recognition endpoint and return card fields.

    The file is posted to ``RECOGNIZE_API_URL`` as multipart/form-data under
    the field name ``imgFile``. The response is expected to be a JSON list;
    only the ``detail`` object of its first element is used.

    Args:
        local_file_path: Path of the image file on the local filesystem.

    Returns:
        On success, a dict with the keys ``year``, ``series``, ``cardSet``,
        ``player``, ``confidentialId``, ``limitId`` and ``path`` (``path`` is
        prefixed with ``DATA_PREFIX`` when non-empty). On any failure
        (missing file, non-200 status, unexpected payload, exception) a dict
        with the first six keys set to ``""`` is returned.
        NOTE(review): failure results carry no ``path`` key - confirm callers
        do not index it unconditionally.
    """
    print(f"[Recognize] 正在识别卡片: {local_file_path} ...")
    empty_card_info = {
        "year": "",
        "series": "",
        "cardSet": "",
        "player": "",
        "confidentialId": "",
        "limitId": "",
    }

    if not os.path.exists(local_file_path):
        print("[Recognize] 文件不存在,返回空信息")
        return empty_card_info

    try:
        # The `with` block guarantees the file handle is closed even when the
        # request fails before the upload body has been consumed.
        with open(local_file_path, 'rb') as img_file:
            # Build the multipart/form-data body. The endpoint is called like
            # `curl -F 'imgFile=@...'`, so the field name must be 'imgFile'.
            data = aiohttp.FormData()
            data.add_field(
                'imgFile',
                img_file,
                filename=os.path.basename(local_file_path),
                content_type='image/jpeg',
            )
            # aiohttp deprecates bare numeric timeouts; use ClientTimeout.
            request_timeout = aiohttp.ClientTimeout(total=15)

            async with aiohttp.ClientSession() as session:
                async with session.post(
                    RECOGNIZE_API_URL, data=data, timeout=request_timeout
                ) as response:
                    if response.status != 200:
                        print(f"[Recognize] 接口报错: Status {response.status}")
                        text = await response.text()
                        print(f"[Recognize] 错误内容: {text}")
                        return empty_card_info

                    result_list = await response.json()

                    # The payload must be a non-empty list.
                    if not result_list or not isinstance(result_list, list):
                        print("[Recognize] 返回格式不是列表或为空")
                        return empty_card_info

                    # Only the first recognition result is used.
                    first_item = result_list[0]
                    info = _card_info_from_detail(first_item.get("detail", {}))
                    print(f"[Recognize] 识别成功: {first_item.get('tag', 'No Tag')}")
                    return info
    except Exception as e:
        # Best-effort contract: log the failure and fall back to empty info.
        print(f"[Recognize] 请求发生异常: {e}")
        import traceback
        traceback.print_exc()
        return empty_card_info
|