import asyncio import aiohttp import aiofiles import json import os from typing import Dict, Any, Tuple from datetime import datetime # --- 配置区域 --- # 1. 服务 URL INFERENCE_SERVICE_URL = "http://127.0.0.1:7744" STORAGE_SERVICE_URL = "http://127.0.0.1:7745" # INFERENCE_SERVICE_URL = "http://192.168.31.243:7744" # STORAGE_SERVICE_URL = "http://192.168.31.243:7745" # 2. 要处理的卡片信息 formate_time = datetime.now().strftime("%Y-%m-%d_%H:%M") CARD_NAME = f"卡 {formate_time}" # 3. 四张卡片图片的本地路径 front_face_img_path = r"C:\Code\ML\Image\Card\_250915_many_capture_img\_250919_1500_no_reflect_nature_defect\15_front_coaxial_1_0.jpg" front_edge_img_path = r"C:\Code\ML\Image\Card\_250915_many_capture_img\_250919_1500_no_reflect_nature_defect\15_front_ring_0_1.jpg" back_face_img_path = r"C:\Code\ML\Image\Card\_250915_many_capture_img\_250919_1500_no_reflect_nature_defect\15_back_coaxial_1_0.jpg" back_edge_img_path = r"C:\Code\ML\Image\Card\_250915_many_capture_img\_250919_1500_no_reflect_nature_defect\15_back_ring_0_1.jpg" IMAGE_PATHS = [ front_edge_img_path, front_face_img_path, back_edge_img_path, back_face_img_path ] # 4. 推理服务需要的 score_type 参数 SCORE_TYPES = [ "front_corner_edge", "front_face", "back_corner_edge", "back_face" ] SCORE_TO_IMAGE_TYPE_MAP = { "front_corner_edge": "front_edge", "front_face": "front_face", "back_corner_edge": "back_edge", "back_face": "back_face" } # --- 脚本主逻辑 --- async def call_api_with_file( session: aiohttp.ClientSession, url: str, file_path: str, params: Dict[str, Any] = None, form_fields: Dict[str, Any] = None ) -> Tuple[int, bytes]: """通用的文件上传API调用函数 (从文件路径读取)""" form_data = aiohttp.FormData() if form_fields: for key, value in form_fields.items(): form_data.add_field(key, str(value)) async with aiofiles.open(file_path, 'rb') as f: content = await f.read() form_data.add_field( 'file', content, filename=os.path.basename(file_path), content_type='image/jpeg' ) try: async with session.post(url, data=form_data, params=params) as response: response_content = await response.read() if not response.ok: print(f"错误: 调用 {url} 失败, 状态码: {response.status}") print(f" 错误详情: {response_content.decode(errors='ignore')}") return response.status, response_content except aiohttp.ClientConnectorError as e: print(f"错误: 无法连接到服务 {url} - {e}") return 503, b"Connection Error" async def process_single_image( session: aiohttp.ClientSession, image_path: str, score_type: str ) -> Dict[str, Any]: """处理单张图片:获取转正图和分数JSON""" print(f" 正在处理图片: {image_path} (类型: {score_type})") # 1. 获取转正后的图片 rectify_url = f"{INFERENCE_SERVICE_URL}/api/card_inference/card_rectify_and_center" rectify_status, rectified_image_bytes = await call_api_with_file( session, url=rectify_url, file_path=image_path ) if rectify_status >= 300: raise Exception(f"获取转正图失败: {image_path}") print(f" -> 已成功获取转正图") # 2. 获取分数JSON score_url = f"{INFERENCE_SERVICE_URL}/api/card_score/score_inference" score_params = { "score_type": score_type, "is_reflect_card": "false" } score_status, score_json_bytes = await call_api_with_file( session, url=score_url, file_path=image_path, params=score_params ) if score_status >= 300: raise Exception(f"获取分数JSON失败: {image_path}") score_json = json.loads(score_json_bytes) print(f" -> 已成功获取分数JSON") return { "score_type": score_type, "rectified_image": rectified_image_bytes, "score_json": score_json } async def create_card_set(session: aiohttp.ClientSession, card_name: str) -> int: """创建一个新的卡组并返回其ID""" url = f"{STORAGE_SERVICE_URL}/api/cards/created" params = {'card_name': card_name} print(f"\n[步骤 2] 正在创建卡组,名称: '{card_name}'...") try: async with session.post(url, params=params) as response: if response.ok: data = await response.json() card_id = data.get('id') if card_id is not None: print(f" -> 成功创建卡组, ID: {card_id}") return card_id else: raise Exception("创建卡组API的响应中未找到'id'字段") else: error_text = await response.text() raise Exception(f"创建卡组失败, 状态码: {response.status}, 详情: {error_text}") except aiohttp.ClientConnectorError as e: raise Exception(f"无法连接到存储服务 {url} - {e}") # 【修改点】: 修正此函数 async def upload_processed_data( session: aiohttp.ClientSession, card_id: int, processed_data: Dict[str, Any] ): """上传单张转正图和对应的JSON到存储服务""" score_type = processed_data['score_type'] image_type_for_storage = SCORE_TO_IMAGE_TYPE_MAP[score_type] print(f" 正在上传图片, 类型: {image_type_for_storage}...") url = f"{STORAGE_SERVICE_URL}/api/images/insert/{card_id}" # 直接构建FormData,因为图片数据已经在内存中 (processed_data['rectified_image']) form_data = aiohttp.FormData() form_data.add_field('image_type', image_type_for_storage) form_data.add_field('json_data_str', json.dumps(processed_data['score_json'], ensure_ascii=False)) form_data.add_field( 'image', processed_data['rectified_image'], filename='rectified.jpg', content_type='image/jpeg' ) try: async with session.post(url, data=form_data) as response: if response.status == 201: print(f" -> 成功上传并关联图片: {image_type_for_storage}") else: error_text = await response.text() print( f" -> 错误: 上传失败! 类型: {image_type_for_storage}, 状态码: {response.status}, 详情: {error_text}") except aiohttp.ClientConnectorError as e: print(f" -> 错误: 无法连接到存储服务 {url} - {e}") async def main(): """主执行函数""" async with aiohttp.ClientSession() as session: # 步骤 1: 并发处理所有图片, 获取转正图和分数 print("[步骤 1] 开始并发处理所有图片...") process_tasks = [] for path, s_type in zip(IMAGE_PATHS, SCORE_TYPES): if not os.path.exists(path): print(f"错误:文件不存在,请检查路径配置: {path}") return task = asyncio.create_task(process_single_image(session, path, s_type)) process_tasks.append(task) try: processed_results = await asyncio.gather(*process_tasks) print(" -> 所有图片处理完成!") except Exception as e: print(f"\n在处理图片过程中发生错误: {e}") return # 步骤 2: 创建卡组 try: card_id = await create_card_set(session, CARD_NAME) except Exception as e: print(f"\n创建卡组时发生严重错误: {e}") return # 步骤 3: 并发上传所有处理好的数据 print(f"\n[步骤 3] 开始为卡组ID {card_id} 并发上传图片和数据...") upload_tasks = [] for result in processed_results: task = asyncio.create_task(upload_processed_data(session, card_id, result)) upload_tasks.append(task) await asyncio.gather(*upload_tasks) print(" -> 所有数据上传完成!") print("\n====================") print("所有流程执行完毕!") print("====================") if __name__ == "__main__": if len(IMAGE_PATHS) != 4 or len(SCORE_TYPES) != 4: print("错误: IMAGE_PATHS 和 SCORE_TYPES 列表的长度必须为4,请检查配置。") else: # 在 Windows 上使用 ProactorEventLoop 可能会更稳定 if os.name == 'nt': asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) asyncio.run(main())