| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- import cv2
- import numpy as np
- from ..core.model_loader import get_predictor
- from app.utils.defect_inference.CardDefectAggregator import CardDefectAggregator
- from app.utils.defect_inference.arean_anylize_draw import DefectProcessor, DrawingParams
- from app.utils.defect_inference.AnalyzeCenter import analyze_centering_rotated, formate_center_data
- from app.utils.defect_inference.DrawCenterInfo import draw_boxes_and_center_info
- from app.utils.defect_inference.ClassifyEdgeCorner import ClassifyEdgeCorner
- from app.utils.json_data_formate import formate_face_data, formate_add_edit_type
- from app.core.config import settings
- from app.core.logger import get_logger
- import json
- logger = get_logger(__name__)
- class DefectInferenceService:
- def defect_inference(self, inference_type: str, img_bgr: np.ndarray,
- is_draw_image=True) -> dict:
- """
- 执行卡片识别推理。
- Args:
- inference_type: 模型类型 (e.g., 'outer_box').
- img_bgr: 图像。
- Returns:
- 一个包含推理结果的字典。
- """
- # 面
- if (inference_type == "pokemon_front_face_no_reflect_defect"
- or inference_type == "pokemon_front_face_reflect_defect"
- or inference_type == "pokemon_back_face_defect"):
- # 1. 获取对应的预测器实例
- predictor = get_predictor(inference_type)
- # 3. 调用我们新加的 predict_from_image 方法进行推理
- # result = predictor.predict_from_image(img_bgr)
- # 3. 实例化我们聚合器,传入预测器
- aggregator = CardDefectAggregator(
- predictor=predictor,
- tile_size=512,
- overlap_ratio=0.1, # 10% 重叠
- )
- json_data = aggregator.process_image(
- image=img_bgr,
- mode='face'
- )
- # merge_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-merge.json'
- # with open(merge_json_path, 'w', encoding='utf-8') as f:
- # json.dump(json_data, f, ensure_ascii=False, indent=4)
- # logger.info(f"合并结束")
- processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION)
- area_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-face_result.json'
- if is_draw_image:
- drawing_params_with_rect = DrawingParams(draw_min_rect=True)
- drawn_image, area_json = processor.analyze_and_draw(img_bgr, json_data,
- drawing_params_with_rect)
- temp_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-face_result.jpg'
- cv2.imwrite(temp_img_path, drawn_image)
- else:
- area_json = processor.analyze_from_json(json_data)
- face_json_result = formate_face_data(area_json)
- face_json_result = formate_add_edit_type(face_json_result)
- with open(area_json_path, 'w', encoding='utf-8') as f:
- json.dump(face_json_result, f, ensure_ascii=False, indent=2)
- logger.info("面的面积计算结束")
- return face_json_result
- # 边角
- elif (inference_type == "pokemon_front_corner_no_reflect_defect"
- or inference_type == "pokemon_front_corner_reflect_defect"
- or inference_type == "pokemon_back_corner_defect"):
- predictor = get_predictor(inference_type)
- aggregator = CardDefectAggregator(
- predictor=predictor,
- tile_size=512,
- overlap_ratio=0.1, # 10% 重叠
- )
- json_data = aggregator.process_image(
- image=img_bgr,
- mode='edge'
- )
- # merge_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-merge.json'
- # with open(merge_json_path, 'w', encoding='utf-8') as f:
- # json.dump(json_data, f, ensure_ascii=False, indent=4)
- # logger.info(f"合并结束")
- processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION)
- area_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-corner_result.json'
- if is_draw_image:
- drawing_params_with_rect = DrawingParams(draw_min_rect=True)
- drawn_image, area_json = processor.analyze_and_draw(img_bgr, json_data,
- drawing_params_with_rect)
- temp_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-corner_result.jpg'
- cv2.imwrite(temp_img_path, drawn_image)
- else:
- area_json: dict = processor.analyze_from_json(json_data)
- logger.info("边角缺陷面积计算结束")
- # 推理外框
- predictor_outer = get_predictor("outer_box")
- outer_result = predictor_outer.predict_from_image(img_bgr)
- classifier = ClassifyEdgeCorner(settings.PIXEL_RESOLUTION, settings.CORNER_SIZE_MM)
- edge_corner_data = classifier.classify_defects_location(area_json, outer_result)
- edge_corner_data = formate_add_edit_type(edge_corner_data)
- with open(area_json_path, 'w', encoding='utf-8') as f:
- json.dump(edge_corner_data, f, ensure_ascii=False, indent=2)
- logger.info("边角面积计算结束")
- return edge_corner_data
- elif inference_type == "pokemon_front_card_center" \
- or inference_type == "pokemon_back_card_center":
- predictor_inner = get_predictor(settings.DEFECT_TYPE[inference_type]['inner_box'])
- predictor_outer = get_predictor(settings.DEFECT_TYPE[inference_type]['outer_box'])
- inner_result = predictor_inner.predict_from_image(img_bgr)
- outer_result = predictor_outer.predict_from_image(img_bgr)
- # temp_inner_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-inner_result.json'
- # temp_outer_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-outer_result.json'
- # with open(temp_inner_json_path, 'w', encoding='utf-8') as f:
- # json.dump(inner_result, f, ensure_ascii=False, indent=4)
- # with open(temp_outer_json_path, 'w', encoding='utf-8') as f:
- # json.dump(outer_result, f, ensure_ascii=False, indent=4)
- inner_points = inner_result['shapes'][0]['points']
- outer_points = outer_result['shapes'][0]['points']
- center_result, inner_rect_box, outer_rect_box = analyze_centering_rotated(inner_points, outer_points)
- # logger.info(f"inner_rect_box: {type(inner_rect_box)}, {inner_rect_box}")
- # logger.info(f"outer_rect_box: , {outer_rect_box}")
- logger.info("格式化居中数据")
- center_result = formate_center_data(center_result,
- inner_result, outer_result,
- inner_rect_box, outer_rect_box)
- draw_img = draw_boxes_and_center_info(img_bgr, center_result)
- temp_center_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-center_result.jpg'
- cv2.imwrite(temp_center_img_path, draw_img)
- temp_center_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-center_result.json'
- with open(temp_center_json_path, 'w', encoding='utf-8') as f:
- json.dump(center_result, f, ensure_ascii=False, indent=2)
- return center_result
- else:
- return {}
- # inference_type: center, face, corner_edge
- def re_inference_from_json(self, inference_type: str, center_json: dict, defect_json: dict) -> dict:
- inference_type_list = ["center", "face", "corner_edge"]
- if inference_type not in inference_type_list:
- logger.error(f"inference_type 只能为{inference_type_list}, 输入为{inference_type}")
- raise ValueError(f"inference_type 只能为{inference_type_list}, 输入为{inference_type}")
- processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION)
- # 对于面的图, 不计算居中相关, 这里得到的center_json 应该为 {}
- if inference_type == "face":
- area_json = processor.re_analyze_from_json(defect_json)
- face_json_result = formate_face_data(area_json)
- logger.info("面缺陷面积计算结束")
- return face_json_result
- inner_result = center_json['box_result']['inner_box']
- outer_result = center_json['box_result']['outer_box']
- if inference_type == "center":
- inner_points = inner_result['shapes'][0]['points']
- outer_points = outer_result['shapes'][0]['points']
- center_result, inner_rect_box, outer_rect_box = analyze_centering_rotated(inner_points, outer_points)
- center_result = formate_center_data(center_result,
- inner_result, outer_result,
- inner_rect_box, outer_rect_box)
- return center_result
- elif inference_type == "corner_edge":
- area_json: dict = processor.re_analyze_from_json(defect_json)
- logger.info("边角缺陷面积计算结束")
- # 根据外框区分边和角
- classifier = ClassifyEdgeCorner(settings.PIXEL_RESOLUTION, settings.CORNER_SIZE_MM)
- edge_corner_data = classifier.classify_defects_location(area_json, outer_result)
- logger.info("边角面积计算结束")
- return edge_corner_data
- else:
- return {}
- # 创建一个单例服务
- # defect_service = DefectInferenceService()
|