# defect_service.py
import json
from typing import Optional

import cv2
import numpy as np

from ..core.model_loader import get_predictor
from app.utils.defect_inference.CardDefectAggregator import CardDefectAggregator
from app.utils.defect_inference.arean_anylize_draw import DefectProcessor, DrawingParams
from app.utils.defect_inference.AnalyzeCenter import (
    analyze_centering_rotated, analyze_centering_rect, formate_center_data)
from app.utils.defect_inference.DrawCenterInfo import draw_boxes_and_center_info
from app.utils.defect_inference.ClassifyEdgeCorner import ClassifyEdgeCorner
from app.utils.json_data_formate import formate_face_data, formate_add_edit_type
from app.utils.defect_inference.FilterOutsideDefects import FilterOutsideDefects
from app.utils.simplify_points import SimplifyPoints
from app.core.config import settings
from app.core.logger import get_logger
# Module-level logger obtained from the project's logger factory, keyed by this module's name.
logger = get_logger(__name__)
  17. class DefectInferenceService:
  18. def _filter_max_prob_shape(self, json_data: dict, tag: str = "unknown") -> dict:
  19. """
  20. 通用过滤函数:只保留 shapes 中 probability 最大的那一个。
  21. """
  22. if not json_data or 'shapes' not in json_data or not json_data['shapes']:
  23. return json_data
  24. shapes = json_data['shapes']
  25. if len(shapes) <= 1:
  26. return json_data
  27. # 按 probability 降序排列,取第一个
  28. best_shape = max(shapes, key=lambda x: x.get('probability', 0))
  29. logger.info(f"[{tag}] 过滤多余框: 原有 {len(shapes)} 个, 保留最大置信度 {best_shape.get('probability'):.4f}")
  30. json_data['shapes'] = [best_shape]
  31. return json_data
  32. def defect_inference(self, inference_type: str, img_bgr: np.ndarray,
  33. is_draw_image=True,
  34. pre_calculated_outer_result: dict = None) -> dict:
  35. """
  36. 执行卡片识别推理。
  37. Args:
  38. inference_type: 模型类型 (e.g., 'outer_box').
  39. img_bgr: 图像。
  40. pre_calculated_outer_result: 如果提供了转正后的外框数据,则不再重新推理外框
  41. Returns:
  42. 一个包含推理结果的字典。
  43. """
  44. simplifyPoints = SimplifyPoints()
  45. outside_filter = FilterOutsideDefects(expansion_pixel=30)
  46. # 辅助函数:获取外框结果(优先使用传入的,否则现场推理)
  47. def get_outer_result(tag=""):
  48. if pre_calculated_outer_result:
  49. logger.info(f"[{tag}] 使用预计算的转正外框数据")
  50. return pre_calculated_outer_result
  51. else:
  52. logger.info(f"[{tag}] 未提供外框数据,开始推理外框...")
  53. p_outer = get_predictor("outer_box")
  54. res = p_outer.predict_from_image(img_bgr)
  55. # 如果是现场推理的,需要过滤一下
  56. if inference_type in ["pokemon_front_card_center", "pokemon_back_card_center"]:
  57. res = self._filter_max_prob_shape(res, tag=f"{tag}-过滤")
  58. return res
  59. # 面
  60. if (inference_type == "pokemon_back_face_coaxial_light_defect"
  61. or inference_type == "pokemon_front_face_reflect_coaxial_light_defect"
  62. or inference_type == "pokemon_front_face_no_reflect_coaxial_light_defect"):
  63. # 1. 获取对应的预测器实例
  64. predictor = get_predictor(inference_type)
  65. # 3. 调用我们新加的 predict_from_image 方法进行推理
  66. # result = predictor.predict_from_image(img_bgr)
  67. # 3. 实例化我们聚合器,传入预测器
  68. aggregator = CardDefectAggregator(
  69. predictor=predictor,
  70. tile_size=512,
  71. overlap_ratio=0.1, # 10% 重叠
  72. )
  73. json_data = aggregator.process_image(
  74. image=img_bgr,
  75. mode='face'
  76. )
  77. # 简化点数
  78. logger.info("开始简化点数")
  79. for shapes in json_data["shapes"]:
  80. shapes["points"] = simplifyPoints.simplify_points(shapes["points"])
  81. # num1 = len(points)
  82. # logger.info(f"num: {num1}, new_num1: {new_num1}")
  83. logger.info("开始执行外框过滤...")
  84. try:
  85. # 获取外框
  86. outer_result = get_outer_result(tag="面流程")
  87. # 2. 执行过滤
  88. json_data = outside_filter.execute(json_data, outer_result)
  89. except Exception as e:
  90. logger.error(f"面流程外框过滤失败: {e}")
  91. merge_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-merge.json'
  92. with open(merge_json_path, 'w', encoding='utf-8') as f:
  93. json.dump(json_data, f, ensure_ascii=False, indent=4)
  94. logger.info(f"合并结束")
  95. processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION)
  96. area_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-face_result.json'
  97. if is_draw_image:
  98. drawing_params_with_rect = DrawingParams(draw_min_rect=True)
  99. drawn_image, area_json = processor.analyze_and_draw(img_bgr.copy(), json_data,
  100. drawing_params_with_rect)
  101. temp_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-face_result.jpg'
  102. cv2.imwrite(temp_img_path, drawn_image)
  103. else:
  104. area_json = processor.analyze_from_json(json_data)
  105. face_json_result = formate_face_data(area_json)
  106. face_json_result = formate_add_edit_type(face_json_result)
  107. with open(area_json_path, 'w', encoding='utf-8') as f:
  108. json.dump(face_json_result, f, ensure_ascii=False, indent=2)
  109. logger.info("面的面积计算结束")
  110. return face_json_result
  111. # 边角
  112. elif (inference_type == "pokemon_back_face_ring_light_defect"
  113. or inference_type == "pokemon_front_face_reflect_ring_light_defect"
  114. or inference_type == "pokemon_front_face_no_reflect_ring_light_defect"):
  115. predictor = get_predictor(inference_type)
  116. aggregator = CardDefectAggregator(
  117. predictor=predictor,
  118. tile_size=512,
  119. overlap_ratio=0.1, # 10% 重叠
  120. )
  121. json_data = aggregator.process_image(
  122. image=img_bgr,
  123. mode='face'
  124. )
  125. # 简化点数
  126. logger.info("开始进行点数简化")
  127. for shapes in json_data["shapes"]:
  128. shapes["points"] = simplifyPoints.simplify_points(shapes["points"])
  129. # merge_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-merge.json'
  130. # with open(merge_json_path, 'w', encoding='utf-8') as f:
  131. # json.dump(json_data, f, ensure_ascii=False, indent=4)
  132. # logger.info(f"合并结束")
  133. logger.info("开始执行外框过滤...")
  134. # 外框推理
  135. outer_result = get_outer_result(tag="Corner流程")
  136. # 过滤外框,只留一个
  137. if not pre_calculated_outer_result:
  138. outer_result = self._filter_max_prob_shape(outer_result, tag="Corner流程-外框")
  139. # 2. 执行过滤, 去掉外框外的缺陷
  140. json_data = outside_filter.execute(json_data, outer_result)
  141. processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION)
  142. area_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-corner_result.json'
  143. if is_draw_image:
  144. drawing_params_with_rect = DrawingParams(draw_min_rect=True)
  145. drawn_image, area_json = processor.analyze_and_draw(img_bgr.copy(), json_data,
  146. drawing_params_with_rect)
  147. temp_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-corner_result.jpg'
  148. cv2.imwrite(temp_img_path, drawn_image)
  149. else:
  150. area_json: dict = processor.analyze_from_json(json_data)
  151. logger.info("边角缺陷面积计算结束")
  152. classifier = ClassifyEdgeCorner(settings.PIXEL_RESOLUTION,
  153. settings.CORNER_SIZE_MM,
  154. settings.EDGE_SIZE_MM)
  155. edge_corner_data = classifier.classify_defects_location(area_json, outer_result)
  156. edge_corner_data = formate_add_edit_type(edge_corner_data)
  157. with open(area_json_path, 'w', encoding='utf-8') as f:
  158. json.dump(edge_corner_data, f, ensure_ascii=False, indent=2)
  159. logger.info("边角面积计算结束")
  160. return edge_corner_data
  161. # 居中
  162. elif inference_type == "pokemon_front_card_center" \
  163. or inference_type == "pokemon_back_card_center":
  164. predictor_inner = get_predictor(settings.DEFECT_TYPE[inference_type]['inner_box'])
  165. # predictor_outer = get_predictor(settings.DEFECT_TYPE[inference_type]['outer_box'])
  166. inner_result = predictor_inner.predict_from_image(img_bgr)
  167. outer_result = get_outer_result(tag="Center流程")
  168. # 过滤内框和外框,只留最大置信度的框
  169. inner_result = self._filter_max_prob_shape(inner_result, tag="Center流程-内框")
  170. # 如果是传入的预计算结果,它已经过滤过了;如果是现算的,需要过滤
  171. if not pre_calculated_outer_result:
  172. outer_result = self._filter_max_prob_shape(outer_result, tag="Center流程-外框")
  173. inner_points = inner_result['shapes'][0]['points']
  174. outer_points = outer_result['shapes'][0]['points']
  175. center_result, inner_rect_box, outer_rect_box = analyze_centering_rotated(inner_points, outer_points)
  176. # logger.info(f"inner_rect_box: {type(inner_rect_box)}, {inner_rect_box}")
  177. # logger.info(f"outer_rect_box: , {outer_rect_box}")
  178. logger.info("格式化居中数据")
  179. center_result = formate_center_data(center_result,
  180. inner_result, outer_result,
  181. inner_rect_box, outer_rect_box)
  182. draw_img = draw_boxes_and_center_info(img_bgr.copy(), center_result)
  183. temp_center_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-center_result.jpg'
  184. cv2.imwrite(temp_center_img_path, draw_img)
  185. temp_center_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-center_result.json'
  186. with open(temp_center_json_path, 'w', encoding='utf-8') as f:
  187. json.dump(center_result, f, ensure_ascii=False, indent=2)
  188. return center_result
  189. else:
  190. return {}
  191. # inference_type: center, face, corner_edge
  192. def re_inference_from_json(self, card_light_type: str, center_json: dict, defect_json: dict) -> dict:
  193. light_type_list = ["center", "coaxial", "ring"]
  194. if card_light_type not in light_type_list:
  195. logger.error(f"inference_type 只能为{light_type_list}, 输入为{card_light_type}")
  196. raise ValueError(f"inference_type 只能为{light_type_list}, 输入为{card_light_type}")
  197. processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION)
  198. # 对于面的图, 不计算居中相关, 这里得到的center_json 应该为 {}
  199. if card_light_type == "coaxial":
  200. area_json = processor.re_analyze_from_json(defect_json)
  201. face_json_result = formate_face_data(area_json)
  202. logger.info("面缺陷面积计算结束")
  203. return face_json_result
  204. inner_result = center_json['box_result']['inner_box']
  205. outer_result = center_json['box_result']['outer_box']
  206. if card_light_type == "center":
  207. logger.info("居中重计算")
  208. inner_rect = inner_result['shapes'][0]['rect_box']
  209. outer_rect = outer_result['shapes'][0]['rect_box']
  210. center_result, inner_rect_box, outer_rect_box = analyze_centering_rect(inner_rect, outer_rect)
  211. center_result = formate_center_data(center_result,
  212. inner_result, outer_result,
  213. inner_rect_box, outer_rect_box)
  214. return center_result
  215. elif card_light_type == "ring":
  216. area_json: dict = processor.re_analyze_from_json(defect_json)
  217. logger.info("边角缺陷面积计算结束")
  218. # 根据外框区分边和角
  219. classifier = ClassifyEdgeCorner(settings.PIXEL_RESOLUTION,
  220. settings.CORNER_SIZE_MM,
  221. settings.EDGE_SIZE_MM)
  222. edge_corner_data = classifier.classify_defects_location(area_json, outer_result)
  223. logger.info("边角面积计算结束")
  224. return edge_corner_data
  225. else:
  226. return {}
# Create a singleton service instance (currently disabled).
# defect_service = DefectInferenceService()