import asyncio
import json
from typing import Any, Dict, List, Optional, Tuple, Union

import aiohttp
from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile

from app.core.config import settings
from app.core.logger import get_logger
from app.utils.scheme import CardType, ImageType

logger = get_logger(__name__)
router = APIRouter()

# Generous timeouts: a single import chains rectification, inference and
# uploads through the same client session.
IMPORT_REQUEST_TIMEOUT = aiohttp.ClientTimeout(
    total=3600,
    connect=30,
    sock_connect=30,
    sock_read=1800,
)

# Form-field order expected by the inference service's stitch endpoint:
# ring + gray + stripe1..4.
STITCH_FORM_FIELDS = ["ring", "gray", "stripe1", "stripe2", "stripe3", "stripe4"]

# All image_type values belonging to one side (front/back), keyed by role.
SIDE_IMAGE_TYPES: Dict[str, Dict[str, str]] = {
    "front": {
        "fusion": ImageType.front_fusion.value,
        "ring": ImageType.front_ring.value,
        "gray": ImageType.front_gray.value,
        "stripe1": ImageType.front_stripe1.value,
        "stripe2": ImageType.front_stripe2.value,
        "stripe3": ImageType.front_stripe3.value,
        "stripe4": ImageType.front_stripe4.value,
    },
    "back": {
        "fusion": ImageType.back_fusion.value,
        "ring": ImageType.back_ring.value,
        "gray": ImageType.back_gray.value,
        "stripe1": ImageType.back_stripe1.value,
        "stripe2": ImageType.back_stripe2.value,
        "stripe3": ImageType.back_stripe3.value,
        "stripe4": ImageType.back_stripe4.value,
    },
}

# Images stored in the main table (they carry a JSON payload).
MAIN_IMAGE_TYPES = {ImageType.front_fusion.value, ImageType.back_fusion.value}


def _flat_image_types() -> List[str]:
    """Return the 14 image_type values in endpoint-parameter order.

    For each side (front, then back) the order is fusion followed by the
    fields of STITCH_FORM_FIELDS. Used for validation and log output.
    """
    return [
        SIDE_IMAGE_TYPES[side][role]
        for side in ("front", "back")
        for role in ("fusion", *STITCH_FORM_FIELDS)
    ]


# --- Internal helpers ---


async def _post_form(
    session: aiohttp.ClientSession,
    url: str,
    files: List[Union[Tuple[str, bytes, str], Tuple[str, bytes, str, str]]],
    params: Optional[Dict[str, Any]] = None,
    form_fields: Optional[Dict[str, Any]] = None,
) -> Tuple[int, bytes]:
    """Send a multipart POST and return ``(status, raw_body)``.

    Args:
        session: Client session used for the request.
        url: Target URL.
        files: Items of the form ``(field_name, file_bytes, filename)`` or
            ``(field_name, file_bytes, filename, content_type)``. When the
            content type is omitted, ``image/jpeg`` is used.
        params: Optional query-string parameters.
        form_fields: Optional plain form fields; values are stringified.

    Returns:
        The HTTP status code and the response body bytes. Non-OK statuses
        are logged but NOT raised; the caller decides how to react.

    Raises:
        Exception: Any error raised while sending or reading the request is
            logged and re-raised unchanged.
    """
    form_data = aiohttp.FormData()
    if form_fields:
        for key, value in form_fields.items():
            form_data.add_field(key, str(value))

    for file_item in files:
        if len(file_item) == 4:
            field_name, file_bytes, filename, content_type = file_item
        else:
            field_name, file_bytes, filename = file_item
            content_type = "image/jpeg"
        form_data.add_field(
            field_name,
            file_bytes,
            filename=filename or f"{field_name}.jpg",
            content_type=content_type,
        )

    try:
        async with session.post(url, data=form_data, params=params) as response:
            response_content = await response.read()
            if not response.ok:
                logger.error(
                    "[API Error] %s -> Status: %s, Msg: %s",
                    url,
                    response.status,
                    response_content.decode("utf-8", errors="ignore")[:200],
                )
            return response.status, response_content
    except Exception as e:
        logger.error("[Conn Error] %s -> %s", url, e)
        raise


async def call_stitch_inference(
    session: aiohttp.ClientSession,
    side: str,
    card_name: str,
    side_bytes: Dict[str, Tuple[bytes, str]],
) -> Dict[str, Any]:
    """Call the ``stitch_score_inference`` endpoint for one side (front/back).

    Args:
        session: Client session used for the request.
        side: Sent as the ``score_type`` query parameter.
        card_name: Sent as the ``card_name`` query parameter.
        side_bytes: Must contain every key of STITCH_FORM_FIELDS; each value
            is ``(bytes, filename)``.

    Returns:
        The decoded JSON body of the inference response.

    Raises:
        HTTPException: 400 if a required field is missing or empty; 500 if
            the service answers with status >= 300 or the body is not
            decodable JSON.
    """
    missing = [k for k in STITCH_FORM_FIELDS if k not in side_bytes]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"{side} 面缺少推理所需图片字段: {missing}",
        )

    files: List[Tuple[str, bytes, str]] = []
    for field in STITCH_FORM_FIELDS:
        f_bytes, f_name = side_bytes[field]
        if not f_bytes:
            raise HTTPException(status_code=400, detail=f"{side} 面 {field} 文件内容为空: {f_name}")
        files.append((field, f_bytes, f_name))

    inference_base_url = settings.SCORE_UPDATE_SERVER_URL
    url = f"{inference_base_url}/api/card_score/stitch_score_inference"
    params = {"score_type": side, "card_name": card_name}

    logger.info(
        "调用 stitch_score_inference: side=%s card_name=%s fields=%s",
        side,
        card_name,
        ",".join(STITCH_FORM_FIELDS),
    )
    status, body = await _post_form(session, url=url, files=files, params=params)
    if status >= 300:
        raise HTTPException(status_code=500, detail=f"{side} 面 stitch 推理失败 status={status}")

    try:
        return json.loads(body)
    except ValueError as e:
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError json.loads raises for undecodable bytes.
        raise HTTPException(status_code=500, detail=f"{side} 面推理结果解析失败: {e}") from e


async def call_rectify_and_center(
    session: aiohttp.ClientSession,
    image_type: str,
    file_bytes: bytes,
    filename: str,
) -> Tuple[bytes, str]:
    """Call the inference service's rectify-and-center endpoint.

    Args:
        session: Client session used for the request.
        image_type: Used in log/error messages and as a filename fallback.
        file_bytes: Raw image content; must be non-empty.
        filename: Original filename (may be empty).

    Returns:
        ``(response_body_bytes, "<name_root>_rectified.jpg")`` where
        ``name_root`` is the filename without its last extension, or
        ``image_type`` when no filename was given.

    Raises:
        HTTPException: 400 if the image is empty; 500 if the service answers
            with status >= 300.
    """
    if not file_bytes:
        raise HTTPException(status_code=400, detail=f"图片文件 {image_type} 内容为空: {filename}")

    inference_base_url = settings.SCORE_UPDATE_SERVER_URL
    url = f"{inference_base_url}/api/card_inference/card_rectify_and_center"
    logger.info("调用 card_rectify_and_center: image_type=%s filename=%s", image_type, filename)

    status, body = await _post_form(
        session,
        url=url,
        files=[("file", file_bytes, filename or f"{image_type}.jpg")],
    )
    if status >= 300:
        raise HTTPException(status_code=500, detail=f"图片转正居中失败: {image_type} status={status}")

    name_root = filename.rsplit(".", 1)[0] if filename else image_type
    return body, f"{name_root}_rectified.jpg"


async def rectify_side_images(
    session: aiohttp.ClientSession,
    side: str,
    side_bytes: Dict[str, Tuple[bytes, str]],
) -> Dict[str, Tuple[bytes, str]]:
    """Rectify and center the six stitch input images of one side.

    The six requests are issued concurrently via ``asyncio.gather``.

    Args:
        session: Client session shared by all requests.
        side: ``"front"`` or ``"back"`` (a key of SIDE_IMAGE_TYPES).
        side_bytes: Maps each STITCH_FORM_FIELDS entry to ``(bytes, filename)``.

    Returns:
        A dict with the same keys whose values are the rectified
        ``(bytes, filename)`` pairs.
    """
    side_map = SIDE_IMAGE_TYPES[side]

    async def rectify_one(field: str) -> Tuple[str, Tuple[bytes, str]]:
        file_bytes, filename = side_bytes[field]
        image_type = side_map[field]
        rectified_bytes, rectified_filename = await call_rectify_and_center(
            session=session,
            image_type=image_type,
            file_bytes=file_bytes,
            filename=filename,
        )
        return field, (rectified_bytes, rectified_filename)

    logger.info("%s 面开始转正居中: fields=%s", side, ",".join(STITCH_FORM_FIELDS))
    rectified_pairs = await asyncio.gather(*[rectify_one(field) for field in STITCH_FORM_FIELDS])
    logger.info("%s 面转正居中完成", side)
    return dict(rectified_pairs)


async def create_card_record(
    session: aiohttp.ClientSession,
    base_url: str,
    card_name: str,
    cardNo: Optional[str],
    card_type: CardType,
) -> int:
    """Create a new card record by calling this service's own API.

    Args:
        session: Client session used for the request.
        base_url: Base URL of this service (no trailing slash).
        card_name: Sent as the ``card_name`` query parameter.
        cardNo: Sent as ``cardNo`` only when truthy.
        card_type: Its ``.value`` is sent as ``card_type``.

    Returns:
        The ``id`` field of the 201 response body.

    Raises:
        HTTPException: With the upstream status when it is not 201, or 500
            when the 201 response carries no ``id``.
    """
    url = f"{base_url}{settings.API_PREFIX}/cards/created"
    params = {"card_name": card_name, "card_type": card_type.value}
    if cardNo:
        params["cardNo"] = cardNo

    async with session.post(url, params=params) as response:
        if response.status == 201:
            data = await response.json()
            card_id = data.get("id")
            if card_id is None:
                # Without an id every later upload would target ".../None".
                raise HTTPException(status_code=500, detail=f"创建卡牌记录失败: 响应缺少 id 字段: {data}")
            return card_id
        text = await response.text()
        raise HTTPException(status_code=response.status, detail=f"创建卡牌记录失败: {text}")


async def upload_fusion_image(
    session: aiohttp.ClientSession,
    base_url: str,
    card_id: int,
    image_type: str,
    file_bytes: bytes,
    filename: str,
    score_json: Dict[str, Any],
):
    """Upload a fusion image together with its score JSON (main-image endpoint).

    The JSON is serialized and attached as the ``json_data_file`` part.

    Raises:
        HTTPException: 500 when the endpoint does not answer 201.
    """
    url = f"{base_url}{settings.API_PREFIX}/images/insert/{card_id}"
    form_fields = {
        "image_type": image_type,
    }
    json_bytes = json.dumps(score_json, ensure_ascii=False).encode("utf-8")
    status, body = await _post_form(
        session,
        url=url,
        files=[
            ("image", file_bytes, filename or f"{image_type}.jpg"),
            ("json_data_file", json_bytes, f"{image_type}_score.json", "application/json"),
        ],
        form_fields=form_fields,
    )
    if status != 201:
        logger.error(
            "[融合图上传失败] image_type=%s status=%s msg=%s",
            image_type,
            status,
            body.decode("utf-8", errors="ignore")[:200],
        )
        raise HTTPException(status_code=500, detail=f"融合图保存失败: {image_type}")


async def upload_auxiliary_image(
    session: aiohttp.ClientSession,
    base_url: str,
    card_id: int,
    image_type: str,
    file_bytes: bytes,
    filename: str,
):
    """Upload a ring / gray / stripe auxiliary image (gray endpoint, no JSON).

    Raises:
        HTTPException: 500 when the endpoint does not answer 201.
    """
    url = f"{base_url}{settings.API_PREFIX}/images/insert/gray/{card_id}"
    form_fields = {"image_type": image_type}
    status, body = await _post_form(
        session,
        url=url,
        files=[("image", file_bytes, filename or f"{image_type}.jpg")],
        form_fields=form_fields,
    )
    if status != 201:
        logger.error(
            "[辅助图上传失败] image_type=%s status=%s msg=%s",
            image_type,
            status,
            body.decode("utf-8", errors="ignore")[:200],
        )
        raise HTTPException(status_code=500, detail=f"辅助图保存失败: {image_type}")


def _collect_side_bytes(
    side: str,
    bytes_map: Dict[str, Tuple[bytes, str]],
) -> Optional[Dict[str, Tuple[bytes, str]]]:
    """Extract the six stitch inputs of one side from ``bytes_map``.

    Args:
        side: ``"front"`` or ``"back"`` (a key of SIDE_IMAGE_TYPES).
        bytes_map: Maps image_type to ``(bytes, filename)``.

    Returns:
        A dict keyed by STITCH_FORM_FIELDS, or ``None`` if any of the six
        images is absent (meaning the side cannot be scored).
    """
    side_map = SIDE_IMAGE_TYPES[side]
    side_bytes: Dict[str, Tuple[bytes, str]] = {}
    for field in STITCH_FORM_FIELDS:
        image_type = side_map[field]
        if image_type not in bytes_map:
            return None
        side_bytes[field] = bytes_map[image_type]
    return side_bytes


async def _run_import_flow(
    local_base_url: str,
    card_name: str,
    cardNo: Optional[str],
    card_type: CardType,
    strict_mode: bool,
    bytes_map: Dict[str, Tuple[bytes, str]],
) -> Dict[str, Any]:
    """Shared import flow used by both endpoints.

    Steps: (1) per side, rectify the six stitch inputs and run stitch
    inference when all six are present; (2) create the card record;
    (3) upload every provided image sequentially.

    Args:
        local_base_url: Base URL of this service.
        card_name: Card name.
        cardNo: Optional card number.
        card_type: Card type enum.
        strict_mode: When true, all 14 images are required.
        bytes_map: Maps image_type to ``(bytes, filename)``.

    Returns:
        A dict with ``message``, ``card_id``, ``card_name`` and ``cardNo``.

    Raises:
        HTTPException: 400 when strict mode is violated or no image was
            provided; otherwise whatever the helper calls raise.
    """
    expected_types = _flat_image_types()
    provided = [t for t in expected_types if t in bytes_map]
    missing = [t for t in expected_types if t not in bytes_map]

    if strict_mode and missing:
        raise HTTPException(
            status_code=400,
            detail=f"严格模式开启,必须提供全部 14 张图,缺少: {missing}",
        )
    if not provided:
        raise HTTPException(status_code=400, detail="未提供任何图片文件,无法创建。")

    logger.info(
        "auto_import start card_name=%s cardNo=%s card_type=%s strict_mode=%s provided=%s missing=%s",
        card_name,
        cardNo or "",
        card_type.value,
        strict_mode,
        ",".join(provided) or "none",
        ",".join(missing) or "none",
    )

    connector = aiohttp.TCPConnector(limit=20, force_close=True)
    async with aiohttp.ClientSession(timeout=IMPORT_REQUEST_TIMEOUT, connector=connector) as session:
        # 1. Run stitch inference per side (requires all six inputs of that side).
        side_score_json: Dict[str, Optional[Dict[str, Any]]] = {"front": None, "back": None}
        for side in ("front", "back"):
            side_bytes = _collect_side_bytes(side, bytes_map)
            if side_bytes is None:
                logger.warning("%s 面推理跳过,缺少 ring/gray/stripe1..4 中的某些图", side)
                continue
            rectified_side_bytes = await rectify_side_images(
                session=session,
                side=side,
                side_bytes=side_bytes,
            )
            side_score_json[side] = await call_stitch_inference(
                session=session,
                side=side,
                card_name=card_name,
                side_bytes=rectified_side_bytes,
            )

        # 2. Create the card record.
        card_id = await create_card_record(session, local_base_url, card_name, cardNo, card_type)
        logger.info("auto_import card_created card_name=%s card_id=%s", card_name, card_id)

        # 3. Upload all images sequentially so this service's own DB
        #    connection pool is not saturated by parallel requests.
        #    Note: the ORIGINAL (un-rectified) bytes are what gets stored.
        upload_count = 0
        for side in ("front", "back"):
            side_map = SIDE_IMAGE_TYPES[side]
            fusion_type = side_map["fusion"]
            fusion_data = bytes_map.get(fusion_type)
            score_json = side_score_json.get(side)

            if fusion_data is not None:
                f_bytes, f_name = fusion_data
                if score_json is not None:
                    await upload_fusion_image(
                        session,
                        local_base_url,
                        card_id,
                        fusion_type,
                        f_bytes,
                        f_name,
                        score_json,
                    )
                    upload_count += 1
                else:
                    # No inference result for this side: store the fusion
                    # image as an auxiliary image so the main table never
                    # gets a row with an empty detection_json.
                    logger.warning(
                        "%s 面缺少 stitch 推理结果,融合图 %s 退化为辅助图入库",
                        side,
                        fusion_type,
                    )
                    await upload_auxiliary_image(
                        session,
                        local_base_url,
                        card_id,
                        fusion_type,
                        f_bytes,
                        f_name,
                    )
                    upload_count += 1

            # ring / gray / stripe1..4 go to the auxiliary table.
            for field in STITCH_FORM_FIELDS:
                aux_type = side_map[field]
                aux_data = bytes_map.get(aux_type)
                if aux_data is None:
                    continue
                a_bytes, a_name = aux_data
                await upload_auxiliary_image(
                    session,
                    local_base_url,
                    card_id,
                    aux_type,
                    a_bytes,
                    a_name,
                )
                upload_count += 1

        logger.info(
            "auto_import finished card_name=%s card_id=%s upload_task_count=%s "
            "front_score=%s back_score=%s",
            card_name,
            card_id,
            upload_count,
            "yes" if side_score_json["front"] is not None else "no",
            "yes" if side_score_json["back"] is not None else "no",
        )

        return {
            "message": "导入成功",
            "card_id": card_id,
            "card_name": card_name,
            "cardNo": cardNo,
        }


# --- Public API endpoints ---


@router.post("/process_and_import", summary="自动化处理并导入卡牌数据[相机后台调用]")
async def auto_import_script_api(
    request: Request,
    card_name: str = Form(..., description="卡牌名称"),
    cardNo: Optional[str] = Form(None, description="卡牌编号"),
    card_type: CardType = Form(CardType.pokemon, description="卡牌类型"),
    is_reflect_card: bool = Form(True, description="是否是反光卡(保留参数,兼容旧调用)"),
    strict_mode: bool = Form(False, description="如果为True,必须提供所有14张图"),
    front_fusion: Union[UploadFile, str, None] = File(None, description="正面融合图"),
    back_fusion: Union[UploadFile, str, None] = File(None, description="反面融合图"),
    front_ring: Union[UploadFile, str, None] = File(None, description="正面环光图"),
    front_gray: Union[UploadFile, str, None] = File(None, description="正面灰度图"),
    front_stripe1: Union[UploadFile, str, None] = File(None, description="正面调光图1"),
    front_stripe2: Union[UploadFile, str, None] = File(None, description="正面调光图2"),
    front_stripe3: Union[UploadFile, str, None] = File(None, description="正面调光图3"),
    front_stripe4: Union[UploadFile, str, None] = File(None, description="正面调光图4"),
    back_ring: Union[UploadFile, str, None] = File(None, description="反面环光图"),
    back_gray: Union[UploadFile, str, None] = File(None, description="反面灰度图"),
    back_stripe1: Union[UploadFile, str, None] = File(None, description="反面调光图1"),
    back_stripe2: Union[UploadFile, str, None] = File(None, description="反面调光图2"),
    back_stripe3: Union[UploadFile, str, None] = File(None, description="反面调光图3"),
    back_stripe4: Union[UploadFile, str, None] = File(None, description="反面调光图4"),
):
    """
    The 14 file fields are named directly after their image_type:
        front_fusion / back_fusion
        front_ring   / back_ring
        front_gray   / back_gray
        front_stripe1..4 / back_stripe1..4

    The inference service's stitch_score_inference is called once per side
    (front/back) with ring + gray + stripe1..4; the returned JSON is stored
    on the corresponding fusion main-image record.
    """
    local_base_url = str(request.base_url).rstrip("/")

    raw_inputs: Dict[str, Union[UploadFile, str, None]] = {
        ImageType.front_fusion.value: front_fusion,
        ImageType.back_fusion.value: back_fusion,
        ImageType.front_ring.value: front_ring,
        ImageType.front_gray.value: front_gray,
        ImageType.front_stripe1.value: front_stripe1,
        ImageType.front_stripe2.value: front_stripe2,
        ImageType.front_stripe3.value: front_stripe3,
        ImageType.front_stripe4.value: front_stripe4,
        ImageType.back_ring.value: back_ring,
        ImageType.back_gray.value: back_gray,
        ImageType.back_stripe1.value: back_stripe1,
        ImageType.back_stripe2.value: back_stripe2,
        ImageType.back_stripe3.value: back_stripe3,
        ImageType.back_stripe4.value: back_stripe4,
    }
    # Keep only real uploads: objects with a non-empty filename and a
    # callable read(); plain strings and None are dropped.
    valid_uploads: Dict[str, Any] = {
        k: v
        for k, v in raw_inputs.items()
        if getattr(v, "filename", None) and callable(getattr(v, "read", None))
    }

    # Read every upload fully into memory.
    bytes_map: Dict[str, Tuple[bytes, str]] = {}
    for image_type, upload in valid_uploads.items():
        data = await upload.read()
        if not data:
            raise HTTPException(status_code=400, detail=f"图片文件 {image_type} 内容为空")
        bytes_map[image_type] = (data, upload.filename)

    logger.info(
        "auto_import_script_api received is_reflect_card=%s strict_mode=%s provided_count=%s",
        is_reflect_card,
        strict_mode,
        len(bytes_map),
    )

    try:
        return await _run_import_flow(
            local_base_url=local_base_url,
            card_name=card_name,
            cardNo=cardNo,
            card_type=card_type,
            strict_mode=strict_mode,
            bytes_map=bytes_map,
        )
    except HTTPException as e:
        logger.error(
            "auto_import_script_api failed_http card_name=%s detail=%s status_code=%s",
            card_name,
            e.detail,
            e.status_code,
        )
        raise
    except Exception as e:
        logger.exception("auto_import_script_api failed_unexpected card_name=%s", card_name)
        raise HTTPException(status_code=500, detail=f"自动化处理异常: {e}") from e


@router.post("/process_and_import_url", summary="通过URL自动化处理并导入卡牌数据")
async def auto_import_url_script_api(
    request: Request,
    card_name: str = Form(..., description="卡牌名称"),
    cardNo: Optional[str] = Form(None, description="卡牌编号"),
    card_type: CardType = Form(CardType.pokemon, description="卡牌类型"),
    is_reflect_card: bool = Form(True, description="是否是反光卡(保留参数,兼容旧调用)"),
    strict_mode: bool = Form(False, description="如果为True,必须提供所有14张图URL"),
    front_fusion: Optional[str] = Form(None, description="正面融合图URL"),
    back_fusion: Optional[str] = Form(None, description="反面融合图URL"),
    front_ring: Optional[str] = Form(None, description="正面环光图URL"),
    front_gray: Optional[str] = Form(None, description="正面灰度图URL"),
    front_stripe1: Optional[str] = Form(None, description="正面调光图1 URL"),
    front_stripe2: Optional[str] = Form(None, description="正面调光图2 URL"),
    front_stripe3: Optional[str] = Form(None, description="正面调光图3 URL"),
    front_stripe4: Optional[str] = Form(None, description="正面调光图4 URL"),
    back_ring: Optional[str] = Form(None, description="反面环光图URL"),
    back_gray: Optional[str] = Form(None, description="反面灰度图URL"),
    back_stripe1: Optional[str] = Form(None, description="反面调光图1 URL"),
    back_stripe2: Optional[str] = Form(None, description="反面调光图2 URL"),
    back_stripe3: Optional[str] = Form(None, description="反面调光图3 URL"),
    back_stripe4: Optional[str] = Form(None, description="反面调光图4 URL"),
):
    """Download the images named by the URL fields, then run the shared import flow.

    Each of the 14 form fields optionally carries an image URL. All provided
    URLs are downloaded concurrently and handed to ``_run_import_flow``.
    """
    local_base_url = str(request.base_url).rstrip("/")

    url_inputs: Dict[str, Optional[str]] = {
        ImageType.front_fusion.value: front_fusion,
        ImageType.back_fusion.value: back_fusion,
        ImageType.front_ring.value: front_ring,
        ImageType.front_gray.value: front_gray,
        ImageType.front_stripe1.value: front_stripe1,
        ImageType.front_stripe2.value: front_stripe2,
        ImageType.front_stripe3.value: front_stripe3,
        ImageType.front_stripe4.value: front_stripe4,
        ImageType.back_ring.value: back_ring,
        ImageType.back_gray.value: back_gray,
        ImageType.back_stripe1.value: back_stripe1,
        ImageType.back_stripe2.value: back_stripe2,
        ImageType.back_stripe3.value: back_stripe3,
        ImageType.back_stripe4.value: back_stripe4,
    }
    # Strip surrounding whitespace so the value that is fetched is the same
    # one that passed the emptiness check.
    valid_urls = {k: v.strip() for k, v in url_inputs.items() if v and v.strip()}
    if not valid_urls:
        raise HTTPException(status_code=400, detail="未提供任何图片URL,无法创建。")

    logger.info(
        "auto_import_url_script_api received is_reflect_card=%s strict_mode=%s provided=%s",
        is_reflect_card,
        strict_mode,
        ",".join(valid_urls.keys()),
    )

    # NOTE(review): the URLs are caller-supplied and fetched server-side, which
    # is an SSRF surface. Consider restricting hosts/schemes if this endpoint
    # is reachable by untrusted clients.
    connector = aiohttp.TCPConnector(limit=20, force_close=True)
    async with aiohttp.ClientSession(timeout=IMPORT_REQUEST_TIMEOUT, connector=connector) as session:

        async def fetch_image(image_type: str, img_url: str) -> Tuple[str, Tuple[bytes, str]]:
            try:
                async with session.get(img_url) as resp:
                    if resp.status != 200:
                        raise HTTPException(status_code=400, detail=f"下载图片失败: {image_type} -> {resp.status}")
                    file_bytes = await resp.read()
                    # Derive a filename from the last path segment, dropping
                    # any query string; fall back to "<image_type>.jpg".
                    filename = img_url.split("/")[-1].split("?")[0]
                    if not filename or "." not in filename:
                        filename = f"{image_type}.jpg"
                    return image_type, (file_bytes, filename)
            except HTTPException:
                raise
            except Exception as e:
                raise HTTPException(status_code=400, detail=f"访问图片URL异常: {image_type} -> {e}") from e

        downloaded = await asyncio.gather(*[fetch_image(k, u) for k, u in valid_urls.items()])

    bytes_map: Dict[str, Tuple[bytes, str]] = {}
    for image_type, payload in downloaded:
        file_bytes, filename = payload
        if not file_bytes:
            raise HTTPException(status_code=400, detail=f"图片文件 {filename} 内容为空")
        bytes_map[image_type] = (file_bytes, filename)

    try:
        return await _run_import_flow(
            local_base_url=local_base_url,
            card_name=card_name,
            cardNo=cardNo,
            card_type=card_type,
            strict_mode=strict_mode,
            bytes_map=bytes_map,
        )
    except HTTPException as e:
        # Log HTTP failures the same way the file-upload endpoint does.
        logger.error(
            "auto_import_url_script_api failed_http card_name=%s detail=%s status_code=%s",
            card_name,
            e.detail,
            e.status_code,
        )
        raise
    except Exception as e:
        logger.exception("auto_import_url_script_api failed_unexpected card_name=%s", card_name)
        raise HTTPException(status_code=500, detail=f"自动化处理异常: {e}") from e