# score_service.py
import json

import cv2
import numpy as np

from app.core.config import settings
from app.core.logger import get_logger
from app.services.card_rectify_and_center import CardRectifyAndCenter
from app.services.defect_service import DefectInferenceService
from app.utils.score_inference.CardScorer import CardScorer

logger = get_logger(__name__)


  10. class ScoreService:
  11. def __init__(self):
  12. self.scoring_config_path = settings.SCORE_CONFIG_PATH
  13. self.card_scorer = CardScorer(config_path=self.scoring_config_path)
  14. self.defect_service = DefectInferenceService()
  15. self.rectify_center_service = CardRectifyAndCenter()
  16. def score_inference(self, score_type: str, is_reflect_card: bool,
  17. img_bgr: np.ndarray) -> dict:
  18. # 解包返回值,获取转正后的外框数据
  19. img_bgr, transformed_outer_json = self.rectify_center_service.rectify_and_center(img_bgr)
  20. if img_bgr is None:
  21. raise ValueError("图像转正处理失败")
  22. imageHeight, imageWidth = img_bgr.shape[:2]
  23. logger.info("开始进行卡片分数推理, 使用变换后的外框")
  24. # 定义通用参数,传入 pre_calculated_outer_result
  25. defect_kwargs = {
  26. "img_bgr": img_bgr.copy(),
  27. "pre_calculated_outer_result": transformed_outer_json
  28. }
  29. if score_type == 'front_ring' or score_type == 'front_coaxial':
  30. center_data = self.defect_service.defect_inference("pokemon_front_card_center", **defect_kwargs)
  31. else:
  32. center_data = self.defect_service.defect_inference("pokemon_back_card_center", **defect_kwargs)
  33. if is_reflect_card:
  34. if score_type == 'front_ring':
  35. defect_data = self.defect_service.defect_inference('pokemon_front_face_reflect_ring_light_defect',
  36. **defect_kwargs)
  37. elif score_type == 'front_coaxial':
  38. defect_data = self.defect_service.defect_inference('pokemon_front_face_reflect_coaxial_light_defect',
  39. **defect_kwargs)
  40. elif score_type == 'back_ring':
  41. defect_data = self.defect_service.defect_inference('pokemon_back_face_ring_light_defect',
  42. **defect_kwargs)
  43. elif score_type == 'back_coaxial':
  44. defect_data = self.defect_service.defect_inference('pokemon_back_face_coaxial_light_defect',
  45. **defect_kwargs)
  46. else:
  47. return {}
  48. else:
  49. if score_type == 'front_ring':
  50. defect_data = self.defect_service.defect_inference('pokemon_front_face_no_reflect_ring_light_defect',
  51. **defect_kwargs)
  52. elif score_type == 'front_coaxial':
  53. defect_data = self.defect_service.defect_inference('pokemon_front_face_no_reflect_coaxial_light_defect',
  54. **defect_kwargs)
  55. elif score_type == 'back_ring':
  56. defect_data = self.defect_service.defect_inference('pokemon_back_face_ring_light_defect',
  57. **defect_kwargs)
  58. elif score_type == 'back_coaxial':
  59. defect_data = self.defect_service.defect_inference('pokemon_back_face_coaxial_light_defect',
  60. **defect_kwargs)
  61. else:
  62. return {}
  63. logger.info("模型推理结束, 开始计算分数")
  64. if score_type == 'front_ring' or score_type == 'front_coaxial':
  65. card_aspect = "front"
  66. else:
  67. card_aspect = "back"
  68. if score_type == 'front_ring' or score_type == 'back_ring':
  69. card_light_type = "ring"
  70. else:
  71. card_light_type = "coaxial"
  72. if card_light_type == 'ring':
  73. center_score_data = self.card_scorer.calculate_centering_score(card_aspect, center_data, True)
  74. # 计算角, 边, 面的分数, 会把分数写入json, 然后传入刚写好的, 继续写边的分数
  75. defect_data = self.card_scorer.calculate_defect_score('corner', card_aspect, card_light_type,
  76. defect_data, True)
  77. defect_data = self.card_scorer.calculate_defect_score('edge', card_aspect, card_light_type,
  78. defect_data, True)
  79. defect_score_data = self.card_scorer.calculate_defect_score('face', card_aspect, card_light_type,
  80. defect_data, True)
  81. elif card_light_type == 'coaxial':
  82. # 居中
  83. center_score_data = {}
  84. # 同轴光照片只计算面缺陷
  85. defect_score_data = self.card_scorer.calculate_defect_score('face', card_aspect, card_light_type,
  86. defect_data, True)
  87. else:
  88. return {}
  89. result_json = self.card_scorer.formate_one_card_result(center_score_data, defect_score_data,
  90. card_light_type=card_light_type,
  91. card_aspect=card_aspect,
  92. imageHeight=imageHeight,
  93. imageWidth=imageWidth)
  94. temp_score_json_path = settings.TEMP_WORK_DIR / f'{score_type}_score.json'
  95. with open(temp_score_json_path, 'w', encoding='utf-8') as f:
  96. json.dump(result_json, f, ensure_ascii=False, indent=2)
  97. logger.info("分数推理完成 ")
  98. return result_json
  99. @staticmethod
  100. def _has_valid_center_box(center_json: dict) -> bool:
  101. """判断 center_result 是否包含可用于居中重算的内外框 shapes。
  102. StitchFusion(同轴光)输出的 center_result 里 inner_box/outer_box 的 shapes 为空,
  103. 无法做居中重算; 此时应降级为只算面缺陷, 避免索引空列表报错。
  104. """
  105. if not center_json:
  106. return False
  107. box_result = center_json.get('box_result') or {}
  108. inner_shapes = (box_result.get('inner_box') or {}).get('shapes') or []
  109. outer_shapes = (box_result.get('outer_box') or {}).get('shapes') or []
  110. return len(inner_shapes) > 0 and len(outer_shapes) > 0
  111. def recalculate_defect_score(self, score_type: str, json_data: dict):
  112. center_json_data = json_data["result"]['center_result']
  113. defect_json_data = json_data["result"]['defect_result']
  114. imageHeight = json_data["result"].get('imageHeight', 0)
  115. imageWidth = json_data["result"].get('imageWidth', 0)
  116. # 正反面类型分类
  117. if score_type == 'front_ring' or score_type == 'front_coaxial':
  118. card_aspect = "front"
  119. else:
  120. card_aspect = "back"
  121. if score_type == 'front_ring' or score_type == 'back_ring':
  122. card_light_type = "ring"
  123. else:
  124. card_light_type = "coaxial"
  125. # 环光重算依赖 center_result 的内外框; 若缺失(如同轴光/StitchFusion 数据),
  126. # 自动降级为同轴光面缺陷重算, 避免 list index out of range。
  127. if card_light_type == 'ring' and not self._has_valid_center_box(center_json_data):
  128. logger.warning("score_type 为环光但 center_result 无有效内外框, 自动降级为同轴光(coaxial)面缺陷重算")
  129. card_light_type = "coaxial"
  130. logger.info("开始进行缺陷信息重计算")
  131. defect_data = self.defect_service.re_inference_from_json(card_light_type=card_light_type,
  132. center_json=center_json_data,
  133. defect_json=defect_json_data)
  134. logger.info("开始重新计算分数")
  135. if card_light_type == 'ring':
  136. logger.info("开始进行居中信息重计算")
  137. center_data = self.defect_service.re_inference_from_json(card_light_type="center",
  138. center_json=center_json_data,
  139. defect_json=defect_json_data)
  140. center_score_data = self.card_scorer.calculate_centering_score(card_aspect, center_data, True)
  141. # 计算角, 边, 面的分数, 会把分数写入json, 然后传入刚写好的, 继续写边的分数
  142. logger.info("开始重新计算:边角面")
  143. defect_data = self.card_scorer.calculate_defect_score('corner', card_aspect, card_light_type,
  144. defect_data, True)
  145. defect_data = self.card_scorer.calculate_defect_score('edge', card_aspect, card_light_type,
  146. defect_data, True)
  147. defect_score_data = self.card_scorer.calculate_defect_score('face', card_aspect, card_light_type,
  148. defect_data, True)
  149. elif card_light_type == 'coaxial':
  150. # 居中
  151. center_score_data = {}
  152. # 同轴光照片只计算面缺陷
  153. logger.info("开始重新计算:面")
  154. defect_score_data = self.card_scorer.calculate_defect_score('face', card_aspect, card_light_type,
  155. defect_data, True)
  156. else:
  157. return {}
  158. result_json = self.card_scorer.formate_one_card_result(center_score_data, defect_score_data,
  159. card_light_type=card_light_type,
  160. card_aspect=card_aspect,
  161. imageHeight=imageHeight,
  162. imageWidth=imageWidth)
  163. temp_score_json_path = settings.TEMP_WORK_DIR / f're_{score_type}_score.json'
  164. with open(temp_score_json_path, 'w', encoding='utf-8') as f:
  165. json.dump(result_json, f, ensure_ascii=False, indent=2)
  166. logger.info("分数推理完成 ")
  167. return result_json