| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654 |
- import json
- import asyncio
- import aiohttp
- from typing import Dict, Any, Tuple, Optional, Union, List
- from fastapi import APIRouter, HTTPException, Request, UploadFile, File, Form
- from app.core.config import settings
- from app.core.logger import get_logger
- from app.utils.scheme import CardType, ImageType
# Module-level logger and the router the endpoints in this file register on.
logger = get_logger(__name__)
router = APIRouter()

# Timeout budget (aiohttp expresses these in seconds) applied to every client
# session opened by the import endpoints.
IMPORT_REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=3600, connect=30, sock_connect=30, sock_read=1800)
# Multipart field names expected by the inference service's stitch endpoint,
# in submission order: ring, gray, then stripe1..stripe4.
STITCH_FORM_FIELDS = ["ring", "gray", "stripe1", "stripe2", "stripe3", "stripe4"]

# Per side (front/back): short field name -> image_type value, taken from the
# ImageType member named "<side>_<field>". Field order is fusion first, then
# the stitch fields.
SIDE_IMAGE_TYPES: Dict[str, Dict[str, str]] = {
    side: {
        field: getattr(ImageType, f"{side}_{field}").value
        for field in ("fusion", *STITCH_FORM_FIELDS)
    }
    for side in ("front", "back")
}

# Images persisted to the main table (with JSON).
MAIN_IMAGE_TYPES = {ImageType.front_fusion.value, ImageType.back_fusion.value}
def _flat_image_types() -> List[str]:
    """Return all 14 image_type values as one flat list.

    The front side comes first, then the back side; within a side the order is
    fusion, ring, gray, stripe1, stripe2, stripe3, stripe4.
    """
    field_order = ("fusion", "ring", "gray", "stripe1", "stripe2", "stripe3", "stripe4")
    return [
        SIDE_IMAGE_TYPES[side][field]
        for side in ("front", "back")
        for field in field_order
    ]
def _resolve_internal_base_url(request: Request) -> str:
    """Pick the base URL used when the import flow calls this service itself.

    ``settings.INTERNAL_API_BASE_URL`` wins when it is set to a non-blank
    value; otherwise the base URL of the incoming request is used. Either way
    the result carries no trailing slash. The original author's note: going
    through the in-cluster address avoids a second authentication pass at the
    ingress, which would answer 401.
    """
    internal = (settings.INTERNAL_API_BASE_URL or "").strip().rstrip("/")
    return internal if internal else str(request.base_url).rstrip("/")
# --- Internal helper functions ---
async def _post_form(
    session: aiohttp.ClientSession,
    url: str,
    files: List[Union[Tuple[str, bytes, str], Tuple[str, bytes, str, str]]],
    params: Optional[Dict[str, Any]] = None,
    form_fields: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
) -> Tuple[int, bytes]:
    """POST a multipart form and return ``(status, raw response body)``.

    Args:
        session: aiohttp session used to send the request.
        url: Target URL.
        files: File parts, each ``(field_name, file_bytes, filename)`` or
            ``(field_name, file_bytes, filename, content_type)``. Three-item
            entries are sent as ``image/jpeg``; a falsy filename is replaced
            by ``"<field_name>.jpg"``.
        params: Optional query-string parameters.
        form_fields: Optional plain form fields; values are sent as ``str``.
        headers: Optional request headers.

    Returns:
        The HTTP status and the full response body. Non-2xx responses are
        logged but still returned, so the caller decides how to react.

    Raises:
        Exception: whatever the request raised, re-raised after logging.
    """
    form_data = aiohttp.FormData()

    # Plain fields are added before the file parts.
    if form_fields:
        for key, value in form_fields.items():
            form_data.add_field(key, str(value))

    for file_item in files:
        if len(file_item) == 4:
            field_name, file_bytes, filename, content_type = file_item
        else:
            field_name, file_bytes, filename = file_item
            content_type = "image/jpeg"
        form_data.add_field(
            field_name,
            file_bytes,
            filename=filename or f"{field_name}.jpg",
            content_type=content_type,
        )

    try:
        async with session.post(url, data=form_data, params=params, headers=headers) as response:
            response_content = await response.read()
            if not response.ok:
                # Only the first 200 characters of the body are logged.
                logger.error(
                    "[API Error] %s -> Status: %s, Msg: %s",
                    url,
                    response.status,
                    response_content.decode("utf-8", errors="ignore")[:200],
                )
            return response.status, response_content
    except Exception as e:
        # Lazy %-formatting; the rendered message is unchanged.
        logger.error("[Conn Error] %s -> %s", url, e)
        raise
async def call_stitch_inference(
    session: aiohttp.ClientSession,
    side: str,
    card_name: str,
    side_bytes: Dict[str, Tuple[bytes, str]],
) -> Dict[str, Any]:
    """Call the ``stitch_score_inference`` endpoint for one card side.

    Args:
        session: aiohttp session used for the request.
        side: Side identifier; sent as the ``score_type`` query parameter.
        card_name: Sent as the ``card_name`` query parameter.
        side_bytes: Maps each name in ``STITCH_FORM_FIELDS`` to
            ``(file_bytes, filename)``.

    Returns:
        The response body parsed as JSON.

    Raises:
        HTTPException: 400 if a required field is absent or its bytes are
            empty; 500 if the service answers with status >= 300 or the body
            cannot be decoded as JSON.
    """
    missing = [k for k in STITCH_FORM_FIELDS if k not in side_bytes]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"{side} 面缺少推理所需图片字段: {missing}",
        )

    # Parts are submitted in STITCH_FORM_FIELDS order.
    files: List[Tuple[str, bytes, str]] = []
    for field in STITCH_FORM_FIELDS:
        f_bytes, f_name = side_bytes[field]
        if not f_bytes:
            raise HTTPException(status_code=400, detail=f"{side} 面 {field} 文件内容为空: {f_name}")
        files.append((field, f_bytes, f_name))

    inference_base_url = settings.SCORE_UPDATE_SERVER_URL
    url = f"{inference_base_url}/api/card_score/stitch_score_inference"
    params = {"score_type": side, "card_name": card_name}
    logger.info(
        "调用 stitch_score_inference: side=%s card_name=%s fields=%s",
        side, card_name, ",".join(STITCH_FORM_FIELDS),
    )

    status, body = await _post_form(session, url=url, files=files, params=params)
    if status >= 300:
        raise HTTPException(status_code=500, detail=f"{side} 面 stitch 推理失败 status={status}")

    try:
        return json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # json.loads decodes bytes itself, so a body that is not valid text
        # raises UnicodeDecodeError rather than JSONDecodeError.
        raise HTTPException(status_code=500, detail=f"{side} 面推理结果解析失败: {e}") from e
async def call_rectify_and_center(
    session: aiohttp.ClientSession,
    image_type: str,
    file_bytes: bytes,
    filename: str,
) -> Tuple[bytes, str]:
    """Send one image to the ``card_rectify_and_center`` endpoint.

    Args:
        session: aiohttp session used for the request.
        image_type: Used in log/error messages and as the filename fallback.
        file_bytes: Raw image bytes to upload as the ``file`` part.
        filename: Original filename; may be empty.

    Returns:
        ``(response_bytes, "<stem>_rectified.jpg")`` where ``<stem>`` is the
        filename without its last extension, or ``image_type`` when no usable
        stem exists.

    Raises:
        HTTPException: 400 if ``file_bytes`` is empty; 500 if the service
            answers with status >= 300 or returns an empty body.
    """
    if not file_bytes:
        raise HTTPException(status_code=400, detail=f"图片文件 {image_type} 内容为空: {filename}")

    inference_base_url = settings.SCORE_UPDATE_SERVER_URL
    url = f"{inference_base_url}/api/card_inference/card_rectify_and_center"
    logger.info("调用 card_rectify_and_center: image_type=%s filename=%s", image_type, filename)

    status, body = await _post_form(
        session,
        url=url,
        files=[("file", file_bytes, filename or f"{image_type}.jpg")],
    )
    if status >= 300:
        raise HTTPException(status_code=500, detail=f"图片转正居中失败: {image_type} status={status}")
    if not body:
        # An empty success body would otherwise be reported later as a
        # client-side "file is empty" 400, hiding the real (server-side) cause.
        raise HTTPException(status_code=500, detail=f"图片转正居中失败: {image_type} 返回内容为空")

    # A name such as ".jpg" has an empty stem; fall back to image_type then too.
    name_root = (filename.rsplit(".", 1)[0] if filename else "") or image_type
    return body, f"{name_root}_rectified.jpg"
async def rectify_side_images(
    session: aiohttp.ClientSession,
    side: str,
    side_bytes: Dict[str, Tuple[bytes, str]],
) -> Dict[str, Tuple[bytes, str]]:
    """Rectify and center every stitch input image of one side concurrently.

    One ``call_rectify_and_center`` call is issued per name in
    ``STITCH_FORM_FIELDS``; the calls are awaited together with
    ``asyncio.gather``. The result maps each field name to the
    ``(bytes, filename)`` pair returned for it.
    """
    type_by_field = SIDE_IMAGE_TYPES[side]

    async def _rectify(field: str) -> Tuple[bytes, str]:
        raw, original_name = side_bytes[field]
        return await call_rectify_and_center(
            session=session,
            image_type=type_by_field[field],
            file_bytes=raw,
            filename=original_name,
        )

    logger.info("%s 面开始转正居中: fields=%s", side, ",".join(STITCH_FORM_FIELDS))
    outputs = await asyncio.gather(*(_rectify(field) for field in STITCH_FORM_FIELDS))
    logger.info("%s 面转正居中完成", side)

    # gather preserves argument order, so results line up with the field list.
    return {field: output for field, output in zip(STITCH_FORM_FIELDS, outputs)}
async def create_card_record(
    session: aiohttp.ClientSession,
    base_url: str,
    card_name: str,
    cardNo: Optional[str],
    card_type: CardType,
    forward_headers: Optional[Dict[str, str]] = None,
) -> int:
    """Create a card record by POSTing to this service's ``/cards/created``.

    Args:
        session: aiohttp session used for the request.
        base_url: Base URL of this service (no trailing slash).
        card_name: Sent as the ``card_name`` query parameter.
        cardNo: Sent as the ``cardNo`` query parameter when truthy.
        card_type: Its ``.value`` is sent as the ``card_type`` query parameter.
        forward_headers: Optional headers forwarded with the request.

    Returns:
        The ``id`` field of the JSON response.

    Raises:
        HTTPException: with the upstream status if the response is not 201;
            500 if a 201 response carries no ``id``.
    """
    url = f"{base_url}{settings.API_PREFIX}/cards/created"
    params = {"card_name": card_name, "card_type": card_type.value}
    if cardNo:
        params["cardNo"] = cardNo

    async with session.post(url, params=params, headers=forward_headers) as response:
        if response.status != 201:
            text = await response.text()
            raise HTTPException(status_code=response.status, detail=f"创建卡牌记录失败: {text}")
        data = await response.json()

    # Without an id every later upload would target ".../None"; fail here.
    card_id = data.get("id") if isinstance(data, dict) else None
    if card_id is None:
        raise HTTPException(status_code=500, detail=f"创建卡牌记录失败: 响应缺少 id: {data}")
    return card_id
async def upload_main_image(
    session: aiohttp.ClientSession,
    base_url: str,
    card_id: int,
    image_type: str,
    file_bytes: bytes,
    filename: str,
    score_json: Dict[str, Any],
    forward_headers: Optional[Dict[str, str]] = None,
):
    """Upload an image together with its JSON to ``/images/insert/{card_id}``.

    The request carries an ``image`` file part, a ``json_data_file`` part
    holding ``score_json`` serialized as UTF-8 JSON, and an ``image_type``
    form field.

    Raises:
        HTTPException: 500 when the endpoint answers anything other than 201.
    """
    endpoint = f"{base_url}{settings.API_PREFIX}/images/insert/{card_id}"
    serialized_score = json.dumps(score_json, ensure_ascii=False).encode("utf-8")
    parts = [
        ("image", file_bytes, filename or f"{image_type}.jpg"),
        ("json_data_file", serialized_score, f"{image_type}_score.json", "application/json"),
    ]

    status, body = await _post_form(
        session,
        url=endpoint,
        files=parts,
        form_fields={"image_type": image_type},
        headers=forward_headers,
    )
    if status == 201:
        return

    logger.error(
        "[主表图上传失败] image_type=%s status=%s msg=%s",
        image_type, status, body.decode("utf-8", errors="ignore")[:200],
    )
    raise HTTPException(status_code=500, detail=f"主表图保存失败: {image_type}")
async def upload_auxiliary_image(
    session: aiohttp.ClientSession,
    base_url: str,
    card_id: int,
    image_type: str,
    file_bytes: bytes,
    filename: str,
    forward_headers: Optional[Dict[str, str]] = None,
):
    """Upload an image without JSON to ``/images/insert/gray/{card_id}``.

    The request carries a single ``image`` file part plus an ``image_type``
    form field.

    Raises:
        HTTPException: 500 when the endpoint answers anything other than 201.
    """
    endpoint = f"{base_url}{settings.API_PREFIX}/images/insert/gray/{card_id}"
    image_part = ("image", file_bytes, filename or f"{image_type}.jpg")

    status, body = await _post_form(
        session,
        url=endpoint,
        files=[image_part],
        form_fields={"image_type": image_type},
        headers=forward_headers,
    )
    if status == 201:
        return

    logger.error(
        "[辅助图上传失败] image_type=%s status=%s msg=%s",
        image_type, status, body.decode("utf-8", errors="ignore")[:200],
    )
    raise HTTPException(status_code=500, detail=f"辅助图保存失败: {image_type}")
def _collect_side_bytes(
    side: str,
    bytes_map: Dict[str, Tuple[bytes, str]],
) -> Optional[Dict[str, Tuple[bytes, str]]]:
    """Gather one side's stitch inputs from ``bytes_map``.

    ``bytes_map`` is keyed by image_type. The result is keyed by the names in
    ``STITCH_FORM_FIELDS``. ``None`` is returned as soon as any required
    image_type is absent, meaning the side cannot be inferred.
    """
    type_by_field = SIDE_IMAGE_TYPES[side]
    required_types = [type_by_field[field] for field in STITCH_FORM_FIELDS]

    if any(image_type not in bytes_map for image_type in required_types):
        return None

    return {
        field: bytes_map[image_type]
        for field, image_type in zip(STITCH_FORM_FIELDS, required_types)
    }
async def _run_import_flow(
    local_base_url: str,
    card_name: str,
    cardNo: Optional[str],
    card_type: CardType,
    strict_mode: bool,
    bytes_map: Dict[str, Tuple[bytes, str]],
    non_gray_to_main: bool = False,
    forward_headers: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Shared import pipeline: infer per side, create the card, upload images.

    ``bytes_map`` maps image_type -> ``(file_bytes, filename)``.

    Routing of each provided image:
      * a gray image always goes through ``upload_auxiliary_image``;
      * a non-gray image goes through ``upload_main_image`` (carrying its
        side's stitch JSON) only when that side produced a stitch result and
        either ``non_gray_to_main`` is true or the image is the fusion image;
      * every other image goes through ``upload_auxiliary_image``.

    Returns:
        A dict with ``message``, ``card_id``, ``card_name`` and ``cardNo``.

    Raises:
        HTTPException: 400 when ``strict_mode`` is set and any of the 14
            image types is missing, or when none of them is present. Errors
            raised by the helper calls propagate unchanged.
    """
    present: List[str] = []
    absent: List[str] = []
    for image_type in _flat_image_types():
        (present if image_type in bytes_map else absent).append(image_type)

    if absent and strict_mode:
        raise HTTPException(
            status_code=400,
            detail=f"严格模式开启,必须提供全部 14 张图,缺少: {absent}",
        )
    if not present:
        raise HTTPException(status_code=400, detail="未提供任何图片文件,无法创建。")

    logger.info(
        "auto_import start card_name=%s cardNo=%s card_type=%s strict_mode=%s provided=%s missing=%s",
        card_name,
        cardNo or "",
        card_type.value,
        strict_mode,
        ",".join(present) or "none",
        ",".join(absent) or "none",
    )

    tcp_connector = aiohttp.TCPConnector(limit=20, force_close=True)
    async with aiohttp.ClientSession(timeout=IMPORT_REQUEST_TIMEOUT, connector=tcp_connector) as session:
        # Step 1: stitch inference per side; a side is skipped unless all of
        # its stitch input images were provided.
        scores: Dict[str, Optional[Dict[str, Any]]] = {"front": None, "back": None}
        for side in ("front", "back"):
            stitch_inputs = _collect_side_bytes(side, bytes_map)
            if stitch_inputs is not None:
                rectified_inputs = await rectify_side_images(
                    session=session,
                    side=side,
                    side_bytes=stitch_inputs,
                )
                scores[side] = await call_stitch_inference(
                    session=session,
                    side=side,
                    card_name=card_name,
                    side_bytes=rectified_inputs,
                )
            else:
                logger.warning("%s 面推理跳过,缺少 ring/gray/stripe1..4 中的某些图", side)

        # Step 2: create the card record.
        card_id = await create_card_record(
            session,
            local_base_url,
            card_name,
            cardNo,
            card_type,
            forward_headers=forward_headers,
        )
        logger.info("auto_import card_created card_name=%s card_id=%s", card_name, card_id)

        # Step 3: upload one image at a time; the original author's note says
        # this avoids exhausting this service's database connection pool.
        uploaded = 0
        for side in ("front", "back"):
            type_by_field = SIDE_IMAGE_TYPES[side]
            side_score = scores.get(side)

            # Order within a side: fusion, then the stitch fields.
            for field in ("fusion", *STITCH_FORM_FIELDS):
                image_type = type_by_field[field]
                entry = bytes_map.get(image_type)
                if entry is None:
                    continue
                payload, payload_name = entry

                eligible_for_main = field != "gray" and (non_gray_to_main or field == "fusion")
                if eligible_for_main and side_score is not None:
                    await upload_main_image(
                        session, local_base_url, card_id,
                        image_type, payload, payload_name, side_score,
                        forward_headers=forward_headers,
                    )
                    uploaded += 1
                    continue

                if field == "fusion" and side_score is None:
                    logger.warning(
                        "%s 面缺少 stitch 推理结果,融合图 %s 退化为辅助图入库",
                        side, image_type,
                    )
                await upload_auxiliary_image(
                    session, local_base_url, card_id,
                    image_type, payload, payload_name,
                    forward_headers=forward_headers,
                )
                uploaded += 1

        logger.info(
            "auto_import finished card_name=%s card_id=%s upload_task_count=%s "
            "front_score=%s back_score=%s",
            card_name, card_id, uploaded,
            "no" if scores["front"] is None else "yes",
            "no" if scores["back"] is None else "yes",
        )
        return {
            "message": "导入成功",
            "card_id": card_id,
            "card_name": card_name,
            "cardNo": cardNo,
        }
# --- Public API endpoints ---
@router.post("/process_and_import", summary="自动化处理并导入卡牌数据[相机后台调用]")
async def auto_import_script_api(
    request: Request,
    card_name: str = Form(..., description="卡牌名称"),
    cardNo: Optional[str] = Form(None, description="卡牌编号"),
    card_type: CardType = Form(CardType.pokemon, description="卡牌类型"),
    is_reflect_card: bool = Form(True, description="是否是反光卡(保留参数,兼容旧调用)"),
    strict_mode: bool = Form(False, description="如果为True,必须提供所有14张图"),
    front_fusion: Union[UploadFile, str, None] = File(None, description="正面融合图"),
    back_fusion: Union[UploadFile, str, None] = File(None, description="反面融合图"),
    front_ring: Union[UploadFile, str, None] = File(None, description="正面环光图"),
    front_gray: Union[UploadFile, str, None] = File(None, description="正面灰度图"),
    front_stripe1: Union[UploadFile, str, None] = File(None, description="正面调光图1"),
    front_stripe2: Union[UploadFile, str, None] = File(None, description="正面调光图2"),
    front_stripe3: Union[UploadFile, str, None] = File(None, description="正面调光图3"),
    front_stripe4: Union[UploadFile, str, None] = File(None, description="正面调光图4"),
    back_ring: Union[UploadFile, str, None] = File(None, description="反面环光图"),
    back_gray: Union[UploadFile, str, None] = File(None, description="反面灰度图"),
    back_stripe1: Union[UploadFile, str, None] = File(None, description="反面调光图1"),
    back_stripe2: Union[UploadFile, str, None] = File(None, description="反面调光图2"),
    back_stripe3: Union[UploadFile, str, None] = File(None, description="反面调光图3"),
    back_stripe4: Union[UploadFile, str, None] = File(None, description="反面调光图4"),
):
    """
    Import a card from uploaded files; the 14 file fields are named directly
    after their image_type:
        front_fusion / back_fusion
        front_ring / back_ring
        front_gray / back_gray
        front_stripe1..4 / back_stripe1..4
    Inputs that are not real uploads (no filename or no ``read``) are ignored.
    The uploaded bytes are handed to ``_run_import_flow`` with
    ``non_gray_to_main=True``. ``is_reflect_card`` is only logged.
    """
    local_base_url = _resolve_internal_base_url(request)

    # Forward the caller's Authorization header to the internal calls, if any.
    authorization = request.headers.get("authorization")
    forward_headers: Dict[str, str] = {"Authorization": authorization} if authorization else {}

    candidates: Dict[str, Union[UploadFile, str, None]] = {
        ImageType.front_fusion.value: front_fusion,
        ImageType.back_fusion.value: back_fusion,
        ImageType.front_ring.value: front_ring,
        ImageType.front_gray.value: front_gray,
        ImageType.front_stripe1.value: front_stripe1,
        ImageType.front_stripe2.value: front_stripe2,
        ImageType.front_stripe3.value: front_stripe3,
        ImageType.front_stripe4.value: front_stripe4,
        ImageType.back_ring.value: back_ring,
        ImageType.back_gray.value: back_gray,
        ImageType.back_stripe1.value: back_stripe1,
        ImageType.back_stripe2.value: back_stripe2,
        ImageType.back_stripe3.value: back_stripe3,
        ImageType.back_stripe4.value: back_stripe4,
    }

    # Read every real upload into memory, rejecting empty files.
    bytes_map: Dict[str, Tuple[bytes, str]] = {}
    for image_type, candidate in candidates.items():
        looks_like_upload = getattr(candidate, "filename", None) and callable(
            getattr(candidate, "read", None)
        )
        if not looks_like_upload:
            continue
        content = await candidate.read()
        if not content:
            raise HTTPException(status_code=400, detail=f"图片文件 {image_type} 内容为空")
        bytes_map[image_type] = (content, candidate.filename)

    logger.info(
        "auto_import_script_api received is_reflect_card=%s strict_mode=%s provided_count=%s",
        is_reflect_card,
        strict_mode,
        len(bytes_map),
    )

    try:
        result = await _run_import_flow(
            local_base_url=local_base_url,
            card_name=card_name,
            cardNo=cardNo,
            card_type=card_type,
            strict_mode=strict_mode,
            bytes_map=bytes_map,
            non_gray_to_main=True,
            forward_headers=forward_headers or None,
        )
    except HTTPException as http_error:
        logger.error(
            "auto_import_script_api failed_http card_name=%s detail=%s status_code=%s",
            card_name, http_error.detail, http_error.status_code,
        )
        raise
    except Exception as e:
        logger.exception("auto_import_script_api failed_unexpected card_name=%s", card_name)
        raise HTTPException(status_code=500, detail=f"自动化处理异常: {e}")
    return result
def _filename_from_url(img_url: str, image_type: str) -> str:
    """Derive a filename from the path component of ``img_url``.

    Falls back to ``"<image_type>.jpg"`` when the last path segment is empty,
    has no dot, or the URL cannot be parsed.
    """
    from urllib.parse import urlsplit

    try:
        # Use only the path so a "/" inside the query string or fragment
        # cannot be mistaken for a path separator.
        candidate = urlsplit(img_url).path.rsplit("/", 1)[-1]
    except ValueError:
        candidate = ""
    if not candidate or "." not in candidate:
        return f"{image_type}.jpg"
    return candidate


@router.post("/process_and_import_url", summary="通过URL自动化处理并导入卡牌数据")
async def auto_import_url_script_api(
    request: Request,
    card_name: str = Form(..., description="卡牌名称"),
    cardNo: Optional[str] = Form(None, description="卡牌编号"),
    card_type: CardType = Form(CardType.pokemon, description="卡牌类型"),
    is_reflect_card: bool = Form(True, description="是否是反光卡(保留参数,兼容旧调用)"),
    strict_mode: bool = Form(False, description="如果为True,必须提供所有14张图URL"),
    front_fusion: Optional[str] = Form(None, description="正面融合图URL"),
    back_fusion: Optional[str] = Form(None, description="反面融合图URL"),
    front_ring: Optional[str] = Form(None, description="正面环光图URL"),
    front_gray: Optional[str] = Form(None, description="正面灰度图URL"),
    front_stripe1: Optional[str] = Form(None, description="正面调光图1 URL"),
    front_stripe2: Optional[str] = Form(None, description="正面调光图2 URL"),
    front_stripe3: Optional[str] = Form(None, description="正面调光图3 URL"),
    front_stripe4: Optional[str] = Form(None, description="正面调光图4 URL"),
    back_ring: Optional[str] = Form(None, description="反面环光图URL"),
    back_gray: Optional[str] = Form(None, description="反面灰度图URL"),
    back_stripe1: Optional[str] = Form(None, description="反面调光图1 URL"),
    back_stripe2: Optional[str] = Form(None, description="反面调光图2 URL"),
    back_stripe3: Optional[str] = Form(None, description="反面调光图3 URL"),
    back_stripe4: Optional[str] = Form(None, description="反面调光图4 URL"),
):
    """
    Import a card from image URLs; each form field is named after its
    image_type and holds the URL to download. All non-blank URLs are fetched
    concurrently, then the bytes are handed to ``_run_import_flow`` with
    ``non_gray_to_main=True``. ``is_reflect_card`` is only logged.

    Responds 400 when no URL is given, a download does not return 200, a
    download raises, or a downloaded file is empty; 500 on unexpected errors.
    """
    local_base_url = _resolve_internal_base_url(request)

    # Forward the caller's Authorization header to the internal calls, if any.
    forward_headers: Dict[str, str] = {}
    auth = request.headers.get("authorization")
    if auth:
        forward_headers["Authorization"] = auth

    url_inputs: Dict[str, Optional[str]] = {
        ImageType.front_fusion.value: front_fusion,
        ImageType.back_fusion.value: back_fusion,
        ImageType.front_ring.value: front_ring,
        ImageType.front_gray.value: front_gray,
        ImageType.front_stripe1.value: front_stripe1,
        ImageType.front_stripe2.value: front_stripe2,
        ImageType.front_stripe3.value: front_stripe3,
        ImageType.front_stripe4.value: front_stripe4,
        ImageType.back_ring.value: back_ring,
        ImageType.back_gray.value: back_gray,
        ImageType.back_stripe1.value: back_stripe1,
        ImageType.back_stripe2.value: back_stripe2,
        ImageType.back_stripe3.value: back_stripe3,
        ImageType.back_stripe4.value: back_stripe4,
    }
    # Keep the stripped value so the URL that passed the blank check is the
    # one actually requested.
    valid_urls = {k: v.strip() for k, v in url_inputs.items() if v and v.strip()}
    if not valid_urls:
        raise HTTPException(status_code=400, detail="未提供任何图片URL,无法创建。")

    logger.info(
        "auto_import_url_script_api received is_reflect_card=%s strict_mode=%s provided=%s",
        is_reflect_card,
        strict_mode,
        ",".join(valid_urls.keys()),
    )

    # NOTE(review): these URLs are client-supplied and fetched from inside the
    # service with no host allow-list (SSRF exposure) — confirm this endpoint
    # is only reachable by trusted callers, or add URL validation.
    connector = aiohttp.TCPConnector(limit=20, force_close=True)
    async with aiohttp.ClientSession(timeout=IMPORT_REQUEST_TIMEOUT, connector=connector) as session:

        async def fetch_image(image_type: str, img_url: str) -> Tuple[str, Tuple[bytes, str]]:
            try:
                async with session.get(img_url) as resp:
                    if resp.status != 200:
                        raise HTTPException(
                            status_code=400,
                            detail=f"下载图片失败: {image_type} -> {resp.status}",
                        )
                    file_bytes = await resp.read()
            except HTTPException:
                raise
            except Exception as e:
                raise HTTPException(
                    status_code=400,
                    detail=f"访问图片URL异常: {image_type} -> {e}",
                ) from e
            return image_type, (file_bytes, _filename_from_url(img_url, image_type))

        downloaded = await asyncio.gather(*[fetch_image(k, u) for k, u in valid_urls.items()])

    bytes_map: Dict[str, Tuple[bytes, str]] = {}
    for image_type, payload in downloaded:
        file_bytes, filename = payload
        if not file_bytes:
            raise HTTPException(status_code=400, detail=f"图片文件 {filename} 内容为空")
        bytes_map[image_type] = (file_bytes, filename)

    try:
        return await _run_import_flow(
            local_base_url=local_base_url,
            card_name=card_name,
            cardNo=cardNo,
            card_type=card_type,
            strict_mode=strict_mode,
            bytes_map=bytes_map,
            non_gray_to_main=True,
            forward_headers=forward_headers or None,
        )
    except HTTPException as e:
        # Logged the same way as the file-upload endpoint does.
        logger.error(
            "auto_import_url_script_api failed_http card_name=%s detail=%s status_code=%s",
            card_name, e.detail, e.status_code,
        )
        raise
    except Exception as e:
        logger.exception("auto_import_url_script_api failed_unexpected card_name=%s", card_name)
        raise HTTPException(status_code=500, detail=f"自动化处理异常: {e}") from e
|