"""Send four card images through an inference service and store the results.

For each card, the four input images are posted to the inference service to
obtain a rectified image and a score JSON, a new card set is created on the
storage service, and every rectified image is uploaded together with its
score JSON.
"""

import asyncio
import json
import os
from datetime import datetime
from typing import Dict, Any, Tuple, List, Optional

import aiofiles
import aiohttp

# --- Configuration (adjust as needed) ---
INFERENCE_SERVICE_URL = "http://192.168.77.78:7744"
STORAGE_SERVICE_URL = "http://192.168.77.78:7745"

# Fixed set of score types, in processing order.
SCORE_TYPES = [
    "front_corner_edge",
    "front_face",
    "back_corner_edge",
    "back_face"
]

# Score type (inference service) -> image type (storage service).
SCORE_TO_IMAGE_TYPE_MAP = {
    "front_corner_edge": "front_edge",
    "front_face": "front_face",
    "back_corner_edge": "back_edge",
    "back_face": "back_face"
}


# --- Helper functions (internal logic) ---

async def call_api_with_file(
    session: aiohttp.ClientSession,
    url: str,
    file_path: str,
    params: Optional[Dict[str, Any]] = None,
    form_fields: Optional[Dict[str, Any]] = None
) -> Tuple[int, bytes]:
    """POST a file as multipart form data and return the raw response.

    :param session: Open aiohttp client session used for the request.
    :param url: Endpoint to post to.
    :param file_path: Path of the file sent in the ``file`` form field
        (always declared as ``image/jpeg``).
    :param params: Optional query-string parameters.
    :param form_fields: Optional extra form fields; values are sent as
        ``str(value)``.
    :return: ``(status, body)``. A non-OK status is logged but still
        returned. If the connection cannot be established,
        ``(503, b"Connection Error")`` is returned instead of raising.
    """
    form_data = aiohttp.FormData()
    if form_fields:
        for key, value in form_fields.items():
            form_data.add_field(key, str(value))

    async with aiofiles.open(file_path, 'rb') as f:
        content = await f.read()

    form_data.add_field(
        'file',
        content,
        filename=os.path.basename(file_path),
        content_type='image/jpeg'
    )

    try:
        async with session.post(url, data=form_data, params=params) as response:
            response_content = await response.read()
            if not response.ok:
                print(f"错误: 调用 {url} 失败, 状态码: {response.status}")
            return response.status, response_content
    except aiohttp.ClientConnectorError as e:
        # Map a connection failure onto a synthetic 503 so callers only have
        # to inspect the status code.
        print(f"错误: 无法连接到服务 {url} - {e}")
        return 503, b"Connection Error"


async def process_single_image(
    session: aiohttp.ClientSession,
    image_path: str,
    score_type: str,
    is_reflect_card: str
) -> Dict[str, Any]:
    """Fetch the rectified image and the score JSON for one input image.

    :param session: Open aiohttp client session.
    :param image_path: Path of the image to process.
    :param score_type: Value sent as the ``score_type`` query parameter.
    :param is_reflect_card: Value sent as the ``is_reflect_card`` query
        parameter (the caller passes ``"true"`` or ``"false"``).
    :return: Dict with keys ``score_type``, ``rectified_image`` (bytes) and
        ``score_json`` (decoded JSON).
    :raises Exception: If either service call returns a status >= 300, or
        (as ``json.JSONDecodeError``) if the score response is not JSON.
    """
    print(f" 正在处理图片: {os.path.basename(image_path)} ({score_type})")

    # 1. Fetch the rectified image.
    rectify_url = f"{INFERENCE_SERVICE_URL}/api/card_inference/card_rectify_and_center"
    rectify_status, rectified_image_bytes = await call_api_with_file(
        session, url=rectify_url, file_path=image_path
    )
    if rectify_status >= 300:
        raise Exception(f"获取转正图失败: {image_path}")

    # 2. Fetch the score JSON.
    score_url = f"{INFERENCE_SERVICE_URL}/api/card_score/score_inference"
    score_params = {
        "score_type": score_type,
        "is_reflect_card": is_reflect_card
    }
    score_status, score_json_bytes = await call_api_with_file(
        session, url=score_url, file_path=image_path, params=score_params
    )
    if score_status >= 300:
        raise Exception(f"获取分数JSON失败: {image_path}")

    score_json = json.loads(score_json_bytes)

    return {
        "score_type": score_type,
        "rectified_image": rectified_image_bytes,
        "score_json": score_json
    }


async def create_card_set(session: aiohttp.ClientSession, card_name: str) -> int:
    """Create a new card set on the storage service and return its id.

    :param session: Open aiohttp client session.
    :param card_name: Name sent as the ``card_name`` query parameter.
    :return: The ``id`` field of the JSON response.
    :raises Exception: On any failure (non-OK status, missing ``id``,
        transport or decoding error); the original error is chained as
        ``__cause__``.
    """
    url = f"{STORAGE_SERVICE_URL}/api/cards/created"
    params = {'card_name': card_name}
    print(f"\n[步骤 2] 创建卡组: '{card_name}'")

    try:
        async with session.post(url, params=params) as response:
            if not response.ok:
                raise Exception(f"状态码: {response.status}")
            data = await response.json()
            card_id = data.get('id')
            if card_id is None:
                raise Exception("响应中未找到 'id' 字段")
            print(f" -> 成功创建卡组 ID: {card_id}")
            return card_id
    except Exception as e:
        # Single wrapping point so every failure carries the same prefix;
        # "from e" keeps the underlying traceback available.
        raise Exception(f"创建卡组失败: {e}") from e


async def upload_processed_data(
    session: aiohttp.ClientSession,
    card_id: int,
    processed_data: Dict[str, Any]
) -> bool:
    """Upload one rectified image together with its score JSON.

    Upload problems are logged rather than raised so that one failed upload
    does not abort the others.

    :param session: Open aiohttp client session.
    :param card_id: Id of the card set the image is attached to.
    :param processed_data: Result dict produced by ``process_single_image``.
    :return: ``True`` if the service answered 201, ``False`` on any other
        status or on an exception during the request.
    :raises KeyError: If ``processed_data['score_type']`` has no entry in
        ``SCORE_TO_IMAGE_TYPE_MAP``.
    """
    score_type = processed_data['score_type']
    image_type = SCORE_TO_IMAGE_TYPE_MAP[score_type]

    url = f"{STORAGE_SERVICE_URL}/api/images/insert/{card_id}"

    form_data = aiohttp.FormData()
    form_data.add_field('image_type', image_type)
    form_data.add_field(
        'json_data_str',
        json.dumps(processed_data['score_json'], ensure_ascii=False)
    )
    form_data.add_field(
        'image',
        processed_data['rectified_image'],
        filename='rectified.jpg',
        content_type='image/jpeg'
    )

    try:
        async with session.post(url, data=form_data) as response:
            if response.status == 201:
                print(f" -> 上传成功: {image_type}")
                return True
            print(f" -> 上传失败 ({image_type}): {response.status}")
            return False
    except Exception as e:
        print(f" -> 上传异常 ({image_type}): {e}")
        return False


# --- Core API function ---

async def process_card_images(
    is_reflect: bool,
    front_face_path: str,
    front_edge_path: str,
    back_face_path: str,
    back_edge_path: str
) -> int:
    """Run the full pipeline for one card.

    Steps: process the four images concurrently, create a card set, then
    upload the four results concurrently.

    :param is_reflect: Whether the card is reflective; forwarded to the
        scoring endpoint as ``"true"`` / ``"false"``.
    :param front_face_path: Image scored as ``front_face``.
    :param front_edge_path: Image scored as ``front_corner_edge``.
    :param back_face_path: Image scored as ``back_face``.
    :param back_edge_path: Image scored as ``back_corner_edge``.
    :return: The id of the created card set, or ``-1`` if a path does not
        exist or any step raised. Individual upload failures are logged but
        do not change the returned id.
    """
    # 0. Data preparation.
    is_reflect_str = "true" if is_reflect else "false"

    # Explicit score-type -> path mapping, so the pairing does not depend on
    # the element order of SCORE_TYPES.
    paths_by_score_type = {
        "front_corner_edge": front_edge_path,
        "front_face": front_face_path,
        "back_corner_edge": back_edge_path,
        "back_face": back_face_path,
    }

    # Verify that every input path exists before touching the network.
    for p in paths_by_score_type.values():
        if not os.path.exists(p):
            print(f"错误: 文件路径不存在 -> {p}")
            return -1

    # Generate the card name from the current local time.
    card_name = f"卡 {datetime.now().strftime('%Y-%m-%d_%H:%M:%S')}"

    async with aiohttp.ClientSession() as session:
        try:
            # Step 1: process the images concurrently (rectify + score).
            print(f"--- 开始处理: {card_name} ---")
            print("[步骤 1] 并发图像处理...")

            process_tasks = [
                process_single_image(
                    session, paths_by_score_type[s_type], s_type, is_reflect_str
                )
                for s_type in SCORE_TYPES
            ]
            processed_results = await asyncio.gather(*process_tasks)

            # Step 2: create the card set.
            card_id = await create_card_set(session, card_name)

            # Step 3: upload the results concurrently.
            print(f"\n[步骤 3] 上传数据到卡组 {card_id}...")
            upload_tasks = [
                upload_processed_data(session, card_id, result)
                for result in processed_results
            ]
            upload_results = await asyncio.gather(*upload_tasks)

            # Surface partial failures instead of reporting a clean run.
            failed_uploads = sum(1 for ok in upload_results if not ok)
            if failed_uploads:
                print(f"警告: {failed_uploads} 个上传任务失败 (卡组 {card_id})")

            print(f"--- 流程完成,卡组ID: {card_id} ---\n")
            return card_id

        except Exception as e:
            # Top-level boundary: report and convert to the -1 sentinel.
            print(f"\n流程执行中发生错误: {e}")
            return -1


# --- Synchronous wrapper (convenient for direct calls) ---

def run_card_processing_sync(
    is_reflect: bool,
    front_face_path: str,
    front_edge_path: str,
    back_face_path: str,
    back_edge_path: str
) -> int:
    """Run ``process_card_images`` to completion from synchronous code.

    Starts its own event loop via ``asyncio.run``, so it must not be called
    from inside an already running loop.

    :return: Whatever ``process_card_images`` returns (card id or ``-1``).
    """
    if os.name == 'nt':
        # NOTE(review): the Proactor loop is documented as the Windows
        # default since Python 3.8, so this may be redundant - confirm the
        # minimum supported Python version before removing.
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    return asyncio.run(process_card_images(
        is_reflect,
        front_face_path,
        front_edge_path,
        back_face_path,
        back_edge_path
    ))


# --- Usage example ---
if __name__ == "__main__":
    # Whether the cards in this batch are reflective.
    my_is_reflect = True

    base_path = r"C:\Code\ML\Image\Card\_2025_1119_many_img\reflect2"

    for img_num in range(10, 15):
        p1 = os.path.join(base_path, f"{img_num}_front_coaxial_1_0.jpg")
        p2 = os.path.join(base_path, f"{img_num}_front_ring_0_1.jpg")
        p3 = os.path.join(base_path, f"{img_num}_back_coaxial_1_0.jpg")
        p4 = os.path.join(base_path, f"{img_num}_back_ring_0_1.jpg")

        # Run the pipeline for this card.
        print("开始调用函数...")
        final_card_id = run_card_processing_sync(
            is_reflect=my_is_reflect,
            front_face_path=p1,
            front_edge_path=p2,
            back_face_path=p3,
            back_edge_path=p4
        )

        if final_card_id != -1:
            print(f"调用成功,生成的卡片ID是: {final_card_id}")
        else:
            print("调用失败")