defect_service.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import cv2
  2. import numpy as np
  3. from ..core.model_loader import get_predictor
  4. from app.utils.defect_inference.CardDefectAggregator import CardDefectAggregator
  5. from app.utils.defect_inference.arean_anylize_draw import DefectProcessor, DrawingParams
  6. from app.utils.defect_inference.AnalyzeCenter import analyze_centering_rotated
  7. from app.utils.defect_inference.DrawCenterInfo import draw_boxes_and_center_info
  8. from app.utils.defect_inference.ClassifyEdgeCorner import ClassifyEdgeCorner
  9. from app.utils.json_data_formate import formate_center_data, formate_face_data
  10. from app.core.config import settings
  11. from app.core.logger import get_logger
  12. import json
  13. logger = get_logger(__name__)
  14. class DefectInferenceService:
  15. def defect_inference(self, inference_type: str , image_bytes: bytes,
  16. is_draw_image=False) -> dict:
  17. """
  18. 执行卡片识别推理。
  19. Args:
  20. inference_type: 模型类型 (e.g., 'outer_box').
  21. image_bytes: 从API请求中获得的原始图像字节。
  22. Returns:
  23. 一个包含推理结果的字典。
  24. """
  25. # 2. 将字节流解码为OpenCV图像
  26. # 将字节数据转换为numpy数组
  27. np_arr = np.frombuffer(image_bytes, np.uint8)
  28. # 从numpy数组中解码图像
  29. img_bgr = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
  30. if img_bgr is None:
  31. logger.error("无法解码图像,请确保上传的是有效的图片格式 (JPG, PNG, etc.)")
  32. return {}
  33. # 面
  34. if (inference_type == "pokemon_front_face_no_reflect_defect"
  35. or inference_type == "pokemon_front_face_reflect_defect"
  36. or inference_type == "pokemon_back_face_defect"):
  37. # 1. 获取对应的预测器实例
  38. predictor = get_predictor(inference_type)
  39. # 3. 调用我们新加的 predict_from_image 方法进行推理
  40. # result = predictor.predict_from_image(img_bgr)
  41. # 3. 实例化我们聚合器,传入预测器
  42. aggregator = CardDefectAggregator(
  43. predictor=predictor,
  44. tile_size=512,
  45. overlap_ratio=0.1, # 10% 重叠
  46. )
  47. json_data = aggregator.process_image(
  48. image=img_bgr,
  49. mode='face'
  50. )
  51. # merge_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-merge.json'
  52. # with open(merge_json_path, 'w', encoding='utf-8') as f:
  53. # json.dump(json_data, f, ensure_ascii=False, indent=4)
  54. # logger.info(f"合并结束")
  55. processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION)
  56. area_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-face_result.json'
  57. if is_draw_image:
  58. drawing_params_with_rect = DrawingParams(draw_min_rect=True)
  59. drawn_image, area_json = processor.analyze_and_draw(img_bgr, json_data,
  60. drawing_params_with_rect)
  61. temp_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-face_result.jpg'
  62. cv2.imwrite(temp_img_path, drawn_image)
  63. else:
  64. area_json = processor.analyze_from_json(json_data)
  65. face_json_result = formate_face_data(area_json)
  66. with open(area_json_path, 'w', encoding='utf-8') as f:
  67. json.dump(face_json_result, f, ensure_ascii=False, indent=2)
  68. logger.info("面的面积计算结束")
  69. return face_json_result
  70. # 边角
  71. elif (inference_type == "pokemon_front_corner_no_reflect_defect"
  72. or inference_type == "pokemon_front_corner_reflect_defect"
  73. or inference_type == "pokemon_back_corner_defect"):
  74. predictor = get_predictor(inference_type)
  75. aggregator = CardDefectAggregator(
  76. predictor=predictor,
  77. tile_size=512,
  78. overlap_ratio=0.1, # 10% 重叠
  79. )
  80. json_data = aggregator.process_image(
  81. image=img_bgr,
  82. mode='edge'
  83. )
  84. # merge_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-merge.json'
  85. # with open(merge_json_path, 'w', encoding='utf-8') as f:
  86. # json.dump(json_data, f, ensure_ascii=False, indent=4)
  87. # logger.info(f"合并结束")
  88. processor = DefectProcessor(pixel_resolution=settings.PIXEL_RESOLUTION)
  89. area_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-corner_result.json'
  90. if is_draw_image:
  91. drawing_params_with_rect = DrawingParams(draw_min_rect=True)
  92. drawn_image, area_json = processor.analyze_and_draw(img_bgr, json_data,
  93. drawing_params_with_rect)
  94. temp_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-corner_result.jpg'
  95. cv2.imwrite(temp_img_path, drawn_image)
  96. else:
  97. area_json: dict = processor.analyze_from_json(json_data)
  98. logger.info("边角缺陷面积计算结束")
  99. # 推理外框
  100. predictor_outer = get_predictor("outer_box")
  101. outer_result = predictor_outer.predict_from_image(img_bgr)
  102. classifier = ClassifyEdgeCorner(settings.PIXEL_RESOLUTION, settings.CORNER_SIZE_MM)
  103. edge_corner_data = classifier.classify_defects_location(area_json, outer_result)
  104. with open(area_json_path, 'w', encoding='utf-8') as f:
  105. json.dump(edge_corner_data, f, ensure_ascii=False, indent=2)
  106. logger.info("边角面积计算结束")
  107. return edge_corner_data
  108. elif inference_type == "pokemon_front_card_center" \
  109. or inference_type == "pokemon_back_card_center":
  110. predictor_inner = get_predictor(settings.DEFECT_TYPE[inference_type]['inner_box'])
  111. predictor_outer = get_predictor(settings.DEFECT_TYPE[inference_type]['outer_box'])
  112. inner_result = predictor_inner.predict_from_image(img_bgr)
  113. outer_result = predictor_outer.predict_from_image(img_bgr)
  114. # temp_inner_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-inner_result.json'
  115. # temp_outer_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-outer_result.json'
  116. # with open(temp_inner_json_path, 'w', encoding='utf-8') as f:
  117. # json.dump(inner_result, f, ensure_ascii=False, indent=4)
  118. # with open(temp_outer_json_path, 'w', encoding='utf-8') as f:
  119. # json.dump(outer_result, f, ensure_ascii=False, indent=4)
  120. inner_points = inner_result['shapes'][0]['points']
  121. outer_points = outer_result['shapes'][0]['points']
  122. center_result = analyze_centering_rotated(inner_points, outer_points)
  123. logger.info("格式化居中数据")
  124. center_result = formate_center_data(center_result, inner_result, outer_result)
  125. draw_img = draw_boxes_and_center_info(img_bgr, center_result)
  126. temp_center_img_path = settings.TEMP_WORK_DIR / f'{inference_type}-center_result.jpg'
  127. cv2.imwrite(temp_center_img_path, draw_img)
  128. temp_center_json_path = settings.TEMP_WORK_DIR / f'{inference_type}-center_result.json'
  129. with open(temp_center_json_path, 'w', encoding='utf-8') as f:
  130. json.dump(center_result, f, ensure_ascii=False, indent=2)
  131. return center_result
  132. else:
  133. return {}
  134. # 创建一个单例服务
  135. # defect_service = DefectInferenceService()