import asyncio import aiohttp import aiofiles import json import os from typing import Dict, Any, Tuple, Optional, List from datetime import datetime # --- 配置区域 --- # INFERENCE_SERVICE_URL = "http://127.0.0.1:7754" # STORAGE_SERVICE_URL = "http://127.0.0.1:7755" INFERENCE_SERVICE_URL = "http://192.168.77.249:7754" STORAGE_SERVICE_URL = "http://192.168.77.249:7755" # 映射关系:后端 score_type 和 image_type 现在是一致的 # 灰度图不需要映射,直接透传 MAIN_IMAGE_TYPES = [ "front_ring", "front_coaxial", "back_ring", "back_coaxial" ] GRAY_IMAGE_TYPES = [ "front_gray", "back_gray" ] # --- 基础工具函数 --- async def call_api_with_file( session: aiohttp.ClientSession, url: str, file_path: str, params: Dict[str, Any] = None, form_fields: Dict[str, Any] = None, file_field_name: str = 'file' ) -> Tuple[int, bytes]: """通用的文件上传API调用函数""" form_data = aiohttp.FormData() if form_fields: for key, value in form_fields.items(): form_data.add_field(key, str(value)) # 异步读取文件 if not os.path.exists(file_path): return 404, b"File not found" async with aiofiles.open(file_path, 'rb') as f: content = await f.read() form_data.add_field( file_field_name, content, filename=os.path.basename(file_path), content_type='image/jpeg' ) try: async with session.post(url, data=form_data, params=params) as response: response_content = await response.read() if not response.ok: print( f" [API Error] {url} -> Status: {response.status}, Msg: {response_content.decode('utf-8')[:100]}") return response.status, response_content except Exception as e: print(f" [Conn Error] {url} -> {e}") return 503, str(e).encode() # --- 业务逻辑函数 --- async def process_main_image( session: aiohttp.ClientSession, image_path: str, score_type: str, is_reflect_card: str ) -> Dict[str, Any]: """ 处理【主图片】 (Ring/Coaxial): 1. 调用转正接口 2. 调用分数推理接口 """ print(f" [处理中] 主图: {score_type} -> {os.path.basename(image_path)}") # 1. 获取转正后的图片 rectify_url = f"{INFERENCE_SERVICE_URL}/api/card_inference/card_rectify_and_center" rectify_status, rectified_image_bytes = await call_api_with_file( session, url=rectify_url, file_path=image_path ) if rectify_status >= 300: raise Exception(f"转正失败: {score_type}") # 2. 获取分数JSON score_url = f"{INFERENCE_SERVICE_URL}/api/card_score/score_inference" score_params = { "score_type": score_type, "is_reflect_card": is_reflect_card } score_status, score_json_bytes = await call_api_with_file( session, url=score_url, file_path=image_path, params=score_params ) if score_status >= 300: raise Exception(f"推理分数失败: {score_type}") score_json = json.loads(score_json_bytes) return { "type": "main", "image_type": score_type, # 现在 score_type 等于 image_type "rectified_image": rectified_image_bytes, "score_json": score_json } async def create_card_record(session: aiohttp.ClientSession, card_name: str) -> int: """创建新的卡牌记录""" url = f"{STORAGE_SERVICE_URL}/api/cards/created" params = {'card_name': card_name} print(f"\n[Step 2] 创建卡组记录: '{card_name}'") async with session.post(url, params=params) as response: if response.status == 201: data = await response.json() card_id = data.get('id') print(f" -> 卡组创建成功 ID: {card_id}") return card_id else: text = await response.text() raise Exception(f"创建卡组失败: {response.status} - {text}") async def upload_main_image( session: aiohttp.ClientSession, card_id: int, processed_data: Dict[str, Any] ): """上传【主图片】的处理结果 (Rectified Image + JSON)""" image_type = processed_data['image_type'] url = f"{STORAGE_SERVICE_URL}/api/images/insert/{card_id}" form_data = aiohttp.FormData() form_data.add_field('image_type', image_type) form_data.add_field('json_data_str', json.dumps(processed_data['score_json'], ensure_ascii=False)) form_data.add_field( 'image', processed_data['rectified_image'], filename=f'{image_type}_rectified.jpg', content_type='image/jpeg' ) async with session.post(url, data=form_data) as response: if response.status == 201: print(f" -> [主图上传成功] {image_type}") else: print(f" -> [主图上传失败] {image_type} code={response.status}") async def upload_gray_image( session: aiohttp.ClientSession, card_id: int, image_type: str, file_path: str ): """上传【灰度图片】 (Raw Image only)""" print(f" [上传中] 灰度图: {image_type} -> {os.path.basename(file_path)}") url = f"{STORAGE_SERVICE_URL}/api/images/insert/gray/{card_id}" # 灰度图接口只需要 image_type 和 file form_fields = {'image_type': image_type} status, _ = await call_api_with_file( session, url=url, file_path=file_path, form_fields=form_fields, file_field_name='image' # 注意接口接收的字段名是 image ) if status == 201: print(f" -> [灰度图上传成功] {image_type}") else: print(f" -> [灰度图上传失败] {image_type} code={status}") # --- 核心控制流程 --- async def process_card_images( is_reflect: bool, # 四张主图 (Optional) path_front_ring: Optional[str] = None, path_front_coaxial: Optional[str] = None, path_back_ring: Optional[str] = None, path_back_coaxial: Optional[str] = None, # 两张灰度图 (Optional) path_front_gray: Optional[str] = None, path_back_gray: Optional[str] = None, # 模式控制 strict_mode: bool = True, img_index: int = None ) -> int: """ 核心处理函数 :param strict_mode: 如果为 True, 则必须提供所有4张主图,否则报错。 如果为 False, 允许主图缺失。灰度图始终是可选的。 """ # 1. 组装输入字典,过滤 None main_inputs = { "front_ring": path_front_ring, "front_coaxial": path_front_coaxial, "back_ring": path_back_ring, "back_coaxial": path_back_coaxial } gray_inputs = { "front_gray": path_front_gray, "back_gray": path_back_gray } # 2. 严格模式检查 provided_main_count = sum(1 for p in main_inputs.values() if p is not None) if strict_mode: if provided_main_count != 4: print(f"\n[错误] 严格模式开启,必须提供所有4张主图。当前提供了 {provided_main_count} 张。") return -1 else: if provided_main_count == 0 and not any(gray_inputs.values()): print(f"\n[错误] 未提供任何图片,无法创建。") return -1 # 3. 文件存在性检查 all_paths = [p for p in main_inputs.values() if p] + [p for p in gray_inputs.values() if p] for p in all_paths: if not os.path.exists(p): print(f"[错误] 文件不存在: {p}") return -1 if img_index is not None: card_name = f"{img_index} 外框修正测试 {datetime.now().strftime('%m%d_%H%M%S')}" else: card_name = f"测试图片 {datetime.now().strftime('%m%d_%H%M%S')}" is_reflect_str = "true" if is_reflect else "false" async with aiohttp.ClientSession() as session: try: # --- Step 1: 主图并发处理 (推理+转正) --- # 我们先处理主图,获得结果后再创建卡片,这样如果推理全挂了就不创建空卡片了 print(f"--- 开始任务: {card_name} (Strict: {strict_mode}) ---") processing_tasks = [] for img_type, path in main_inputs.items(): if path: task = process_main_image(session, path, img_type, is_reflect_str) processing_tasks.append(task) processed_results = [] if processing_tasks: print("[Step 1] 正在推理主图片...") # return_exceptions=False 表示如果有错直接抛出,中断流程 processed_results = await asyncio.gather(*processing_tasks) # --- Step 2: 创建卡片 --- card_id = await create_card_record(session, card_name) # --- Step 3: 并发上传 (主图结果 + 灰度图文件) --- print(f"\n[Step 3] 正在上传所有数据...") upload_tasks = [] # 3.1 添加主图上传任务 for res in processed_results: task = upload_main_image(session, card_id, res) upload_tasks.append(task) # 3.2 添加灰度图上传任务 (直接读取文件上传) for img_type, path in gray_inputs.items(): if path: task = upload_gray_image(session, card_id, img_type, path) upload_tasks.append(task) if upload_tasks: await asyncio.gather(*upload_tasks) print(f"--- 流程结束, Card ID: {card_id} ---\n") return card_id except Exception as e: print(f"\n[流程终止] 发生异常: {e}") return -1 # --- 同步调用封装 --- def run_sync( is_reflect: bool, front_ring: str = None, front_coaxial: str = None, back_ring: str = None, back_coaxial: str = None, front_gray: str = None, back_gray: str = None, strict_mode: bool = True, img_index: int = None ): if os.name == 'nt': asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) return asyncio.run(process_card_images( is_reflect=is_reflect, path_front_ring=front_ring, path_front_coaxial=front_coaxial, path_back_ring=back_ring, path_back_coaxial=back_coaxial, path_front_gray=front_gray, path_back_gray=back_gray, strict_mode=strict_mode, img_index=img_index )) if __name__ == "__main__": # --- 测试配置 --- IS_REFLECT = True BASE_PATH = r"C:\Code\ML\Image\Card\img20_test" # 模拟循环处理 for img_num in range(1, 11): print(f">>>>> 处理图片组: {img_num}") # 构造路径 (假设文件名格式如下,可根据实际修改) p_front_coaxial = os.path.join(BASE_PATH, f"{img_num}_front_coaxial.jpg") p_front_ring = os.path.join(BASE_PATH, f"{img_num}_front_ring.jpg") p_back_coaxial = os.path.join(BASE_PATH, f"{img_num}_back_coaxial.jpg") # p_back_ring = os.path.join(BASE_PATH, f"{img_num}_back_ring_0_1.jpg") p_back_ring = r"C:\Code\ML\Image\Card\b2.jpg" # 假设有灰度图 (如果没有文件,设置为 None) # p_front_gray = os.path.join(BASE_PATH, f"{img_num}_front_gray.jpg") # if not os.path.exists(p_front_gray): p_front_gray = None # 演示用 # # p_back_gray = None # 演示不传背面灰度图 p_front_gray = os.path.join(BASE_PATH, f"{img_num}_front_gray.jpg") p_back_gray = os.path.join(BASE_PATH, f"{img_num}_back_gray.jpg") # 调用 (严格模式) cid = run_sync( is_reflect=IS_REFLECT, front_coaxial=p_front_coaxial, front_ring=p_front_ring, back_coaxial=p_back_coaxial, back_ring=p_back_ring, front_gray=p_front_gray, back_gray=p_back_gray, strict_mode=True, # 严格模式:缺主图会报错 img_index=img_num ) # 调用 (非严格模式:可以缺图) # cid = run_sync( # is_reflect=IS_REFLECT, # front_coaxial=p_front_coaxial, # # front_ring=None, # 故意不传 # back_coaxial=p_back_coaxial, # # back_ring=None, # 故意不传 # front_gray=p_front_gray, # 传灰度图 # strict_mode=False # ) if cid != -1: print(f"成功生成 ID: {cid}") else: print("生成失败")