import cv2 import numpy as np from ..core.model_loader import get_predictor from app.utils.defect_inference.CardDefectAggregator import CardDefectAggregator from app.utils.defect_inference.arean_anylize_draw import DefectProcessor, DrawingParams from app.utils.defect_inference.AnalyzeCenter import analyze_centering_rotated from app.utils.defect_inference.DrawCenterInfo import draw_boxes_and_center_info from app.utils.defect_inference.ClassifyEdgeCorner import ClassifyEdgeCorner from app.utils.json_data_formate import formate_center_data, formate_face_data from app.core.config import settings from app.core.logger import get_logger import json logger = get_logger(__name__) class DefectInferenceService: def defect_inference(self, inference_type: str , img_bgr: np.ndarray, is_draw_image=True) -> dict: """ 执行卡片识别推理。 Args: inference_type: 模型类型 (e.g., 'outer_box'). img_bgr: 图像。 Returns: 一个包含推理结果的字典。 """ # 面 if (inference_type == "pokemon_front_face_no_reflect_defect" or inference_type == "pokemon_front_face_reflect_defect" or inference_type == "pokemon_back_face_defect"): # 1. 获取对应的预测器实例 predictor = get_predictor(inference_type) # 3. 调用我们新加的 predict_from_image 方法进行推理 # result = predictor.predict_from_image(img_bgr) # 3. 实例化我们聚合器,传入预测器 aggregator = CardDefectAggregator( predictor=predictor, tile_size=512, overlap_ratio=0.1, # 10% 重叠 ) json_data = aggregator.process_image( image=img_bgr, mode='face' ) # merge_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-merge.json' # with open(merge_json_path, 'w', encoding='utf-8') as f: # json.dump(json_data, f, ensure_ascii=False, indent=4) # logger.info(f"合并结束") processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION) area_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-face_result.json' if is_draw_image: drawing_params_with_rect = DrawingParams(draw_min_rect=True) drawn_image, area_json = processor.analyze_and_draw(img_bgr, json_data, drawing_params_with_rect) temp_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-face_result.jpg' cv2.imwrite(temp_img_path, drawn_image) else: area_json = processor.analyze_from_json(json_data) face_json_result = formate_face_data(area_json) with open(area_json_path, 'w', encoding='utf-8') as f: json.dump(face_json_result, f, ensure_ascii=False, indent=2) logger.info("面的面积计算结束") return face_json_result # 边角 elif (inference_type == "pokemon_front_corner_no_reflect_defect" or inference_type == "pokemon_front_corner_reflect_defect" or inference_type == "pokemon_back_corner_defect"): predictor = get_predictor(inference_type) aggregator = CardDefectAggregator( predictor=predictor, tile_size=512, overlap_ratio=0.1, # 10% 重叠 ) json_data = aggregator.process_image( image=img_bgr, mode='edge' ) # merge_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-merge.json' # with open(merge_json_path, 'w', encoding='utf-8') as f: # json.dump(json_data, f, ensure_ascii=False, indent=4) # logger.info(f"合并结束") processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION) area_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-corner_result.json' if is_draw_image: drawing_params_with_rect = DrawingParams(draw_min_rect=True) drawn_image, area_json = processor.analyze_and_draw(img_bgr, json_data, drawing_params_with_rect) temp_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-corner_result.jpg' cv2.imwrite(temp_img_path, drawn_image) else: area_json: dict = processor.analyze_from_json(json_data) logger.info("边角缺陷面积计算结束") # 推理外框 predictor_outer = get_predictor("outer_box") outer_result = predictor_outer.predict_from_image(img_bgr) classifier = ClassifyEdgeCorner(settings.PIXEL_RESOLUTION, settings.CORNER_SIZE_MM) edge_corner_data = classifier.classify_defects_location(area_json, outer_result) with open(area_json_path, 'w', encoding='utf-8') as f: json.dump(edge_corner_data, f, ensure_ascii=False, indent=2) logger.info("边角面积计算结束") return edge_corner_data elif inference_type == "pokemon_front_card_center" \ or inference_type == "pokemon_back_card_center": predictor_inner = get_predictor(settings.DEFECT_TYPE[inference_type]['inner_box']) predictor_outer = get_predictor(settings.DEFECT_TYPE[inference_type]['outer_box']) inner_result = predictor_inner.predict_from_image(img_bgr) outer_result = predictor_outer.predict_from_image(img_bgr) # temp_inner_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-inner_result.json' # temp_outer_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-outer_result.json' # with open(temp_inner_json_path, 'w', encoding='utf-8') as f: # json.dump(inner_result, f, ensure_ascii=False, indent=4) # with open(temp_outer_json_path, 'w', encoding='utf-8') as f: # json.dump(outer_result, f, ensure_ascii=False, indent=4) inner_points = inner_result['shapes'][0]['points'] outer_points = outer_result['shapes'][0]['points'] center_result = analyze_centering_rotated(inner_points, outer_points) logger.info("格式化居中数据") center_result = formate_center_data(center_result, inner_result, outer_result) draw_img = draw_boxes_and_center_info(img_bgr, center_result) temp_center_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-center_result.jpg' cv2.imwrite(temp_center_img_path, draw_img) temp_center_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-center_result.json' with open(temp_center_json_path, 'w', encoding='utf-8') as f: json.dump(center_result, f, ensure_ascii=False, indent=2) return center_result else: return {} # 创建一个单例服务 # defect_service = DefectInferenceService()