| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- import asyncio
- import aiohttp
- import aiofiles
- import json
- import os
- from typing import Dict, Any, Tuple, Optional, List
- from datetime import datetime
- # --- 配置区域 ---
- # INFERENCE_SERVICE_URL = "http://127.0.0.1:7754"
- # STORAGE_SERVICE_URL = "http://127.0.0.1:7755"
- INFERENCE_SERVICE_URL = "http://192.168.77.249:7754"
- STORAGE_SERVICE_URL = "http://192.168.77.249:7755"
- # 映射关系:后端 score_type 和 image_type 现在是一致的
- # 灰度图不需要映射,直接透传
- MAIN_IMAGE_TYPES = [
- "front_ring",
- "front_coaxial",
- "back_ring",
- "back_coaxial"
- ]
- GRAY_IMAGE_TYPES = [
- "front_gray",
- "back_gray"
- ]
- # --- 基础工具函数 ---
- async def call_api_with_file(
- session: aiohttp.ClientSession,
- url: str,
- file_path: str,
- params: Dict[str, Any] = None,
- form_fields: Dict[str, Any] = None,
- file_field_name: str = 'file'
- ) -> Tuple[int, bytes]:
- """通用的文件上传API调用函数"""
- form_data = aiohttp.FormData()
- if form_fields:
- for key, value in form_fields.items():
- form_data.add_field(key, str(value))
- # 异步读取文件
- if not os.path.exists(file_path):
- return 404, b"File not found"
- async with aiofiles.open(file_path, 'rb') as f:
- content = await f.read()
- form_data.add_field(
- file_field_name,
- content,
- filename=os.path.basename(file_path),
- content_type='image/jpeg'
- )
- try:
- async with session.post(url, data=form_data, params=params) as response:
- response_content = await response.read()
- if not response.ok:
- print(
- f" [API Error] {url} -> Status: {response.status}, Msg: {response_content.decode('utf-8')[:100]}")
- return response.status, response_content
- except Exception as e:
- print(f" [Conn Error] {url} -> {e}")
- return 503, str(e).encode()
- # --- 业务逻辑函数 ---
- async def process_main_image(
- session: aiohttp.ClientSession,
- image_path: str,
- score_type: str,
- is_reflect_card: str
- ) -> Dict[str, Any]:
- """
- 处理【主图片】 (Ring/Coaxial):
- 1. 调用转正接口
- 2. 调用分数推理接口
- """
- print(f" [处理中] 主图: {score_type} -> {os.path.basename(image_path)}")
- # 1. 获取转正后的图片
- rectify_url = f"{INFERENCE_SERVICE_URL}/api/card_inference/card_rectify_and_center"
- rectify_status, rectified_image_bytes = await call_api_with_file(
- session, url=rectify_url, file_path=image_path
- )
- if rectify_status >= 300:
- raise Exception(f"转正失败: {score_type}")
- # 2. 获取分数JSON
- score_url = f"{INFERENCE_SERVICE_URL}/api/card_score/score_inference"
- score_params = {
- "score_type": score_type,
- "is_reflect_card": is_reflect_card
- }
- score_status, score_json_bytes = await call_api_with_file(
- session,
- url=score_url,
- file_path=image_path,
- params=score_params
- )
- if score_status >= 300:
- raise Exception(f"推理分数失败: {score_type}")
- score_json = json.loads(score_json_bytes)
- return {
- "type": "main",
- "image_type": score_type, # 现在 score_type 等于 image_type
- "rectified_image": rectified_image_bytes,
- "score_json": score_json
- }
- async def create_card_record(session: aiohttp.ClientSession, card_name: str) -> int:
- """创建新的卡牌记录"""
- url = f"{STORAGE_SERVICE_URL}/api/cards/created"
- params = {'card_name': card_name}
- print(f"\n[Step 2] 创建卡组记录: '{card_name}'")
- async with session.post(url, params=params) as response:
- if response.status == 201:
- data = await response.json()
- card_id = data.get('id')
- print(f" -> 卡组创建成功 ID: {card_id}")
- return card_id
- else:
- text = await response.text()
- raise Exception(f"创建卡组失败: {response.status} - {text}")
- async def upload_main_image(
- session: aiohttp.ClientSession,
- card_id: int,
- processed_data: Dict[str, Any]
- ):
- """上传【主图片】的处理结果 (Rectified Image + JSON)"""
- image_type = processed_data['image_type']
- url = f"{STORAGE_SERVICE_URL}/api/images/insert/{card_id}"
- form_data = aiohttp.FormData()
- form_data.add_field('image_type', image_type)
- form_data.add_field('json_data_str', json.dumps(processed_data['score_json'], ensure_ascii=False))
- form_data.add_field(
- 'image',
- processed_data['rectified_image'],
- filename=f'{image_type}_rectified.jpg',
- content_type='image/jpeg'
- )
- async with session.post(url, data=form_data) as response:
- if response.status == 201:
- print(f" -> [主图上传成功] {image_type}")
- else:
- print(f" -> [主图上传失败] {image_type} code={response.status}")
- async def upload_gray_image(
- session: aiohttp.ClientSession,
- card_id: int,
- image_type: str,
- file_path: str
- ):
- """上传【灰度图片】 (Raw Image only)"""
- print(f" [上传中] 灰度图: {image_type} -> {os.path.basename(file_path)}")
- url = f"{STORAGE_SERVICE_URL}/api/images/insert/gray/{card_id}"
- # 灰度图接口只需要 image_type 和 file
- form_fields = {'image_type': image_type}
- status, _ = await call_api_with_file(
- session,
- url=url,
- file_path=file_path,
- form_fields=form_fields,
- file_field_name='image' # 注意接口接收的字段名是 image
- )
- if status == 201:
- print(f" -> [灰度图上传成功] {image_type}")
- else:
- print(f" -> [灰度图上传失败] {image_type} code={status}")
- # --- 核心控制流程 ---
- async def process_card_images(
- is_reflect: bool,
- # 四张主图 (Optional)
- path_front_ring: Optional[str] = None,
- path_front_coaxial: Optional[str] = None,
- path_back_ring: Optional[str] = None,
- path_back_coaxial: Optional[str] = None,
- # 两张灰度图 (Optional)
- path_front_gray: Optional[str] = None,
- path_back_gray: Optional[str] = None,
- # 模式控制
- strict_mode: bool = True,
- img_index: int = None
- ) -> int:
- """
- 核心处理函数
- :param strict_mode: 如果为 True, 则必须提供所有4张主图,否则报错。
- 如果为 False, 允许主图缺失。灰度图始终是可选的。
- """
- # 1. 组装输入字典,过滤 None
- main_inputs = {
- "front_ring": path_front_ring,
- "front_coaxial": path_front_coaxial,
- "back_ring": path_back_ring,
- "back_coaxial": path_back_coaxial
- }
- gray_inputs = {
- "front_gray": path_front_gray,
- "back_gray": path_back_gray
- }
- # 2. 严格模式检查
- provided_main_count = sum(1 for p in main_inputs.values() if p is not None)
- if strict_mode:
- if provided_main_count != 4:
- print(f"\n[错误] 严格模式开启,必须提供所有4张主图。当前提供了 {provided_main_count} 张。")
- return -1
- else:
- if provided_main_count == 0 and not any(gray_inputs.values()):
- print(f"\n[错误] 未提供任何图片,无法创建。")
- return -1
- # 3. 文件存在性检查
- all_paths = [p for p in main_inputs.values() if p] + [p for p in gray_inputs.values() if p]
- for p in all_paths:
- if not os.path.exists(p):
- print(f"[错误] 文件不存在: {p}")
- return -1
- if img_index is not None:
- card_name = f"{img_index} 外框修正测试 {datetime.now().strftime('%m%d_%H%M%S')}"
- else:
- card_name = f"测试图片 {datetime.now().strftime('%m%d_%H%M%S')}"
- is_reflect_str = "true" if is_reflect else "false"
- async with aiohttp.ClientSession() as session:
- try:
- # --- Step 1: 主图并发处理 (推理+转正) ---
- # 我们先处理主图,获得结果后再创建卡片,这样如果推理全挂了就不创建空卡片了
- print(f"--- 开始任务: {card_name} (Strict: {strict_mode}) ---")
- processing_tasks = []
- for img_type, path in main_inputs.items():
- if path:
- task = process_main_image(session, path, img_type, is_reflect_str)
- processing_tasks.append(task)
- processed_results = []
- if processing_tasks:
- print("[Step 1] 正在推理主图片...")
- # return_exceptions=False 表示如果有错直接抛出,中断流程
- processed_results = await asyncio.gather(*processing_tasks)
- # --- Step 2: 创建卡片 ---
- card_id = await create_card_record(session, card_name)
- # --- Step 3: 并发上传 (主图结果 + 灰度图文件) ---
- print(f"\n[Step 3] 正在上传所有数据...")
- upload_tasks = []
- # 3.1 添加主图上传任务
- for res in processed_results:
- task = upload_main_image(session, card_id, res)
- upload_tasks.append(task)
- # 3.2 添加灰度图上传任务 (直接读取文件上传)
- for img_type, path in gray_inputs.items():
- if path:
- task = upload_gray_image(session, card_id, img_type, path)
- upload_tasks.append(task)
- if upload_tasks:
- await asyncio.gather(*upload_tasks)
- print(f"--- 流程结束, Card ID: {card_id} ---\n")
- return card_id
- except Exception as e:
- print(f"\n[流程终止] 发生异常: {e}")
- return -1
- # --- 同步调用封装 ---
- def run_sync(
- is_reflect: bool,
- front_ring: str = None,
- front_coaxial: str = None,
- back_ring: str = None,
- back_coaxial: str = None,
- front_gray: str = None,
- back_gray: str = None,
- strict_mode: bool = True,
- img_index: int = None
- ):
- if os.name == 'nt':
- asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
- return asyncio.run(process_card_images(
- is_reflect=is_reflect,
- path_front_ring=front_ring,
- path_front_coaxial=front_coaxial,
- path_back_ring=back_ring,
- path_back_coaxial=back_coaxial,
- path_front_gray=front_gray,
- path_back_gray=back_gray,
- strict_mode=strict_mode,
- img_index=img_index
- ))
- if __name__ == "__main__":
- # --- 测试配置 ---
- IS_REFLECT = True
- BASE_PATH = r"C:\Code\ML\Image\Card\img20_test"
- # 模拟循环处理
- for img_num in range(1, 11):
- print(f">>>>> 处理图片组: {img_num}")
- # 构造路径 (假设文件名格式如下,可根据实际修改)
- p_front_coaxial = os.path.join(BASE_PATH, f"{img_num}_front_coaxial.jpg")
- p_front_ring = os.path.join(BASE_PATH, f"{img_num}_front_ring.jpg")
- p_back_coaxial = os.path.join(BASE_PATH, f"{img_num}_back_coaxial.jpg")
- # p_back_ring = os.path.join(BASE_PATH, f"{img_num}_back_ring_0_1.jpg")
- p_back_ring = r"C:\Code\ML\Image\Card\b2.jpg"
- # 假设有灰度图 (如果没有文件,设置为 None)
- # p_front_gray = os.path.join(BASE_PATH, f"{img_num}_front_gray.jpg")
- # if not os.path.exists(p_front_gray): p_front_gray = None # 演示用
- #
- # p_back_gray = None # 演示不传背面灰度图
- p_front_gray = os.path.join(BASE_PATH, f"{img_num}_front_gray.jpg")
- p_back_gray = os.path.join(BASE_PATH, f"{img_num}_back_gray.jpg")
- # 调用 (严格模式)
- cid = run_sync(
- is_reflect=IS_REFLECT,
- front_coaxial=p_front_coaxial,
- front_ring=p_front_ring,
- back_coaxial=p_back_coaxial,
- back_ring=p_back_ring,
- front_gray=p_front_gray,
- back_gray=p_back_gray,
- strict_mode=True, # 严格模式:缺主图会报错
- img_index=img_num
- )
- # 调用 (非严格模式:可以缺图)
- # cid = run_sync(
- # is_reflect=IS_REFLECT,
- # front_coaxial=p_front_coaxial,
- # # front_ring=None, # 故意不传
- # back_coaxial=p_back_coaxial,
- # # back_ring=None, # 故意不传
- # front_gray=p_front_gray, # 传灰度图
- # strict_mode=False
- # )
- if cid != -1:
- print(f"成功生成 ID: {cid}")
- else:
- print("生成失败")
|