import json

import cv2
import numpy as np

from ..core.model_loader import get_predictor
from app.utils.defect_inference.CardDefectAggregator import CardDefectAggregator
from app.utils.defect_inference.arean_anylize_draw import DefectProcessor, DrawingParams
from app.utils.defect_inference.AnalyzeCenter import analyze_centering_rotated
from app.utils.defect_inference.ClassifyEdgeCorner import ClassifyEdgeCorner
from app.utils.json_data_formate import formate_center_data, formate_face_data
from app.core.config import settings
from app.core.logger import logger


class DefectInferenceService:
    def defect_inference(self, inference_type: str, image_bytes: bytes, is_draw_image: bool = False) -> dict:
        """
        Run card defect inference.

        Args:
            inference_type: Model type (e.g. 'outer_box').
            image_bytes: Raw image bytes received from the API request.
            is_draw_image: If True, also render the analysis result onto the image
                and save it to the temporary work directory.

        Returns:
            A dict containing the inference result.
        """
        # Decode the byte stream into an OpenCV image:
        # first convert the raw bytes into a numpy array,
        np_arr = np.frombuffer(image_bytes, np.uint8)
        # then decode the image from that array.
        img_bgr = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
        if img_bgr is None:
            logger.error("Failed to decode image; please upload a valid image format (JPG, PNG, etc.)")
            return {}

        # Face (surface) defects
        if inference_type in (
            "pokemon_front_face_no_reflect_defect",
            "pokemon_front_face_reflect_defect",
            "pokemon_back_face_defect",
        ):
            # Get the predictor instance for this inference type
            predictor = get_predictor(inference_type)
            # Single-pass alternative (unused): run the predictor on the whole image at once.
            # result = predictor.predict_from_image(img_bgr)
            # Wrap the predictor in the tile-based aggregator
            aggregator = CardDefectAggregator(
                predictor=predictor,
                tile_size=512,
                overlap_ratio=0.1,  # 10% overlap between tiles
            )
            json_data = aggregator.process_image(
                image=img_bgr,
                mode='face'
            )
            # merge_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-merge.json'
            # with open(merge_json_path, 'w', encoding='utf-8') as f:
            #     json.dump(json_data, f, ensure_ascii=False, indent=4)
            # logger.info("Tile merge finished")
            processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION)
            area_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-face_result.json'
            if is_draw_image:
                drawing_params_with_rect = DrawingParams(draw_min_rect=True)
                drawn_image, area_json = processor.analyze_and_draw(img_bgr, json_data, drawing_params_with_rect)
                temp_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-face_result.jpg'
                cv2.imwrite(str(temp_img_path), drawn_image)
            else:
                area_json = processor.analyze_from_json(json_data)
            face_json_result = formate_face_data(area_json)
            with open(area_json_path, 'w', encoding='utf-8') as f:
                json.dump(face_json_result, f, ensure_ascii=False, indent=2)
            logger.info("Face defect area calculation finished")
            return face_json_result

        # Edge / corner defects
        elif inference_type in (
            "pokemon_front_corner_no_reflect_defect",
            "pokemon_front_corner_reflect_defect",
            "pokemon_back_corner_defect",
        ):
            predictor = get_predictor(inference_type)
            aggregator = CardDefectAggregator(
                predictor=predictor,
                tile_size=512,
                overlap_ratio=0.1,  # 10% overlap between tiles
            )
            json_data = aggregator.process_image(
                image=img_bgr,
                mode='edge'
            )
            # merge_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-merge.json'
            # with open(merge_json_path, 'w', encoding='utf-8') as f:
            #     json.dump(json_data, f, ensure_ascii=False, indent=4)
            # logger.info("Tile merge finished")
            processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION)
            area_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-corner_result.json'
            if is_draw_image:
                drawing_params_with_rect = DrawingParams(draw_min_rect=True)
                drawn_image, area_json = processor.analyze_and_draw(img_bgr, json_data, drawing_params_with_rect)
                temp_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-corner_result.jpg'
                cv2.imwrite(str(temp_img_path), drawn_image)
            else:
                area_json = processor.analyze_from_json(json_data)
            # Run outer-box inference to locate the card border
            predictor_outer = get_predictor("pokemon_outer_box")
            outer_result = predictor_outer.predict_from_image(img_bgr)
            classifier = ClassifyEdgeCorner(settings.PIXEL_RESOLUTION, settings.CORNER_SIZE_MM)
            edge_corner_data = classifier.classify_defects_location(area_json, outer_result)
            with open(area_json_path, 'w', encoding='utf-8') as f:
                json.dump(edge_corner_data, f, ensure_ascii=False, indent=2)
            logger.info("Edge/corner defect area calculation finished")
            return edge_corner_data

        # Centering analysis
        elif inference_type == "pokemon_card_center":
            predictor_inner = get_predictor(settings.DEFECT_TYPE[inference_type]['inner_box'])
            predictor_outer = get_predictor(settings.DEFECT_TYPE[inference_type]['outer_box'])
            inner_result = predictor_inner.predict_from_image(img_bgr)
            outer_result = predictor_outer.predict_from_image(img_bgr)
            # temp_inner_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-inner_result.json'
            # temp_outer_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-outer_result.json'
            # with open(temp_inner_json_path, 'w', encoding='utf-8') as f:
            #     json.dump(inner_result, f, ensure_ascii=False, indent=4)
            # with open(temp_outer_json_path, 'w', encoding='utf-8') as f:
            #     json.dump(outer_result, f, ensure_ascii=False, indent=4)
            inner_points = inner_result['shapes'][0]['points']
            outer_points = outer_result['shapes'][0]['points']
            center_result = analyze_centering_rotated(inner_points, outer_points)
            logger.info("Formatting centering data")
            center_result = formate_center_data(center_result, inner_result, outer_result)
            temp_center_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-center_result.json'
            with open(temp_center_json_path, 'w', encoding='utf-8') as f:
                json.dump(center_result, f, ensure_ascii=False, indent=2)
            return center_result
        else:
            return {}


# Create a singleton service instance
# defect_service = DefectInferenceService()
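
# --- Usage sketch (not part of the service) ---
# A minimal, hedged example of how DefectInferenceService might be exercised.
# The file name "sample_card.jpg" and the chosen inference_type are illustrative
# assumptions only. Because this module uses a relative import, run it as part of
# its package (e.g. via `python -m ...`) rather than as a standalone script.
if __name__ == "__main__":
    from pathlib import Path

    sample_path = Path("sample_card.jpg")  # hypothetical test image
    if sample_path.exists():
        service = DefectInferenceService()
        result = service.defect_inference(
            inference_type="pokemon_front_face_no_reflect_defect",
            image_bytes=sample_path.read_bytes(),
            is_draw_image=True,  # also writes an annotated JPG into TEMP_WORK_DIR
        )
        logger.info(f"Inference finished, result keys: {list(result.keys())}")
    else:
        logger.warning("sample_card.jpg not found; skipping demo run")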