CardScorer.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. import json
  2. from typing import List, Dict, Any, Union
  3. from app.core.logger import get_logger
  4. logger = get_logger(__name__)
  5. class CardScorer:
  6. """
  7. 它从一个JSON配置文件加载评分规则,并根据输入的卡片数据计算分数。
  8. """
  9. def __init__(self, config_path: str):
  10. try:
  11. with open(config_path, 'r', encoding='utf-8') as f:
  12. self.config = json.load(f)
  13. self.base_score = self.config.get("base_score", 10.0)
  14. except FileNotFoundError:
  15. raise ValueError(f"配置文件未找到: {config_path}")
  16. except json.JSONDecodeError:
  17. raise ValueError(f"配置文件格式错误: {config_path}")
  18. @staticmethod
  19. def _get_score_from_tiers(value: float, rules: List[Dict[str, Any]]) -> float:
  20. """
  21. 根据新的区间规则查找值对应的分数。
  22. 格式: [{"min": 0, "max": 0.5, "deduction": -1}, ...]
  23. 逻辑: min <= value < max (最后一项如果是inf,则包含)
  24. """
  25. for tier in rules:
  26. # 获取范围,处理可能的字符串
  27. min_val = tier.get("min", 0)
  28. max_val_raw = tier.get("max", "inf")
  29. deduction = tier.get("deduction", 0.0)
  30. # 处理 inf
  31. if max_val_raw == "inf":
  32. max_val = float('inf')
  33. else:
  34. max_val = float(max_val_raw)
  35. min_val = float(min_val)
  36. # 判定范围:左闭右开 [min, max)
  37. if min_val <= value < max_val:
  38. return float(deduction)
  39. # 如果数值非常大(超过了所有定义的max),或者没有匹配到
  40. if rules:
  41. return float(rules[-1].get("deduction", 0))
  42. return 0.0
  43. def calculate_defect_score(self,
  44. card_defect_type: str,
  45. card_aspect: str,
  46. defect_data: Dict,
  47. is_write_score: bool = True) -> Union[float, dict]:
  48. """
  49. 一个通用的缺陷计分函数,用于计算角、边、表面的加权分数。
  50. card_defect_type: 'corner', 'edge', 'face'
  51. card_aspect: 为 front或 back
  52. is_write_score: 是否将分数写入json并返回
  53. """
  54. if card_defect_type != "corner" and card_defect_type != "edge" and card_defect_type != "face":
  55. raise TypeError("calculate_centering_score:card_type 只能为 'corner', 'edge', 'face'")
  56. if card_aspect != "front" and card_aspect != "back":
  57. raise TypeError("calculate_defect_score:card_type 只能为 front 或 back")
  58. aspect_config = self.config[card_defect_type]
  59. total_deduction = 0.0
  60. weighted_scores = {}
  61. # 1. 计算每种缺陷类型的总扣分
  62. for defect in defect_data['defects']:
  63. if defect['defect_type'] != card_defect_type:
  64. continue
  65. if card_defect_type == 'corner' or card_defect_type == 'edge':
  66. if defect['label'] in ['wear', 'wear_and_impact', 'wear_and_stain']:
  67. defect_type = "wear_area"
  68. elif defect['label'] in ['impact', 'damaged']:
  69. defect_type = "loss_area"
  70. else:
  71. logger.error(f"数据缺陷类型不存在: {defect['label']}")
  72. raise TypeError(f"数据缺陷类型不存在: {defect['label']}")
  73. else:
  74. if defect['label'] in ['wear', 'wear_and_impact', 'damaged']:
  75. defect_type = "wear_area"
  76. elif defect['label'] in ['scratch', 'scuff']:
  77. defect_type = "scratch_length"
  78. elif defect['label'] in ['pit', 'impact', 'protrudent']:
  79. defect_type = "pit_area"
  80. elif defect['label'] in ['stain']:
  81. defect_type = "stain_area"
  82. else:
  83. logger.error(f"数据缺陷类型不存在: {defect['label']}")
  84. raise TypeError(f"数据缺陷类型不存在: {defect['label']}")
  85. # 获取规则列表
  86. rules = aspect_config['rules'].get(defect_type)
  87. if not rules:
  88. logger.error(f"计算分数过程, 未找到配置规则: {defect_type}")
  89. raise KeyError(f"计算分数过程, 未找到配置规则: {defect_type}")
  90. # 对于划痕取长度, 其他取面积
  91. if defect_type == "scratch_length":
  92. area_mm = max(defect['width'], defect['height'])
  93. defect['scratch_length'] = area_mm
  94. else:
  95. area_mm = defect['actual_area']
  96. # 累加所有同类型缺陷的扣分
  97. if defect_type not in weighted_scores.keys():
  98. weighted_scores[defect_type] = 0
  99. # 计算单个缺陷扣分
  100. the_score = self._get_score_from_tiers(area_mm, rules)
  101. print(f"[{card_defect_type}, {defect_type}]: {area_mm}, {the_score}")
  102. weighted_scores[defect_type] += the_score
  103. # 将分数写入json
  104. if is_write_score:
  105. if "score" not in defect:
  106. # 新建的时候
  107. logger.info(f"新建分数score: {the_score}")
  108. defect['score'] = the_score
  109. defect["new_score"] = None
  110. elif defect.get("new_score") is None:
  111. # 初次修改
  112. # if defect["score"] != the_score:
  113. # logger.info(f"初次修改 -> new_score: {the_score} (原score: {defect['score']})")
  114. if defect.get("edit_type") == "edit" or defect.get("edit_type") == "add":
  115. defect["new_score"] = the_score
  116. elif defect.get("edit_type") == "del":
  117. logger.info(f"del {defect_type} 补回: {the_score}")
  118. defect["new_score"] = 0
  119. weighted_scores[defect_type] -= the_score
  120. elif "score" in defect and defect["new_score"] is not None:
  121. # 多次修改
  122. # if defect["new_score"] != the_score:
  123. # defect["score"] = defect["new_score"]
  124. if defect.get("edit_type") == "edit":
  125. defect["new_score"] = the_score
  126. elif defect.get("edit_type") == "del":
  127. logger.info(f"del {defect_type} 补回: {the_score}")
  128. defect["new_score"] = 0
  129. weighted_scores[defect_type] -= the_score
  130. else:
  131. defect['score'] = the_score
  132. defect["new_score"] = None
  133. # 2. 根据权重/系数计算最终扣分
  134. weights = aspect_config.get(f"{card_aspect}_weights") or aspect_config.get("coefficients")
  135. if not weights:
  136. raise ValueError(f"在配置中未找到 '{card_defect_type}' 的权重/系数")
  137. print(weighted_scores)
  138. for defect_type, score in weighted_scores.items():
  139. total_deduction += score * weights.get(defect_type, 1.0)
  140. final_weights = aspect_config["final_weights"][card_aspect]
  141. final_score = total_deduction * final_weights
  142. logger.info(f"final weights: {final_weights}, final score: {final_score}")
  143. if is_write_score:
  144. defect_data[f"{card_aspect}_{card_defect_type}_deduct_score"] = final_score
  145. return defect_data
  146. else:
  147. return final_score
  148. def calculate_centering_score(self,
  149. card_aspect: str,
  150. center_data: dict,
  151. is_write_score: bool = False) -> Union[float, dict]:
  152. """
  153. 计算居中度分数。
  154. card_type 为 front或 back
  155. is_write_score: 是否将分数写入json并返回
  156. """
  157. if card_aspect != "front" and card_aspect != "back":
  158. raise TypeError("calculate_centering_score:card_type 只能为 front 或 back")
  159. centering_config = self.config['centering'][card_aspect]
  160. rules = centering_config['rules']
  161. coefficients = centering_config['coefficients']
  162. center_left = center_data['box_result']['center_inference']['center_left']
  163. center_right = center_data['box_result']['center_inference']['center_right']
  164. center_top = center_data['box_result']['center_inference']['center_top']
  165. center_bottom = center_data['box_result']['center_inference']['center_bottom']
  166. # 将比例转换为用于查找规则的单个最大值
  167. h_lookup_val = max(center_left, center_right)
  168. v_lookup_val = max(center_top, center_bottom)
  169. h_deduction = self._get_score_from_tiers(h_lookup_val, rules) * coefficients['horizontal']
  170. v_deduction = self._get_score_from_tiers(v_lookup_val, rules) * coefficients['vertical']
  171. print(h_deduction, v_deduction)
  172. final_weight = self.config['centering']["final_weights"][card_aspect]
  173. final_score = (h_deduction + v_deduction) * final_weight
  174. logger.info(f"final weight: {final_weight}, final score: {final_score}")
  175. if is_write_score:
  176. center_data['deduct_score'] = final_score
  177. return center_data
  178. else:
  179. return final_score
  180. def formate_one_card_result(self, center_result: dict,
  181. defect_result: dict,
  182. card_defect_type: str,
  183. card_aspect: str,
  184. imageHeight: int,
  185. imageWidth: int):
  186. try:
  187. # 获取计算总分的权重
  188. card_config = self.config['card']['PSA']
  189. # 计算各部分的最后分数
  190. # 计算居中
  191. final_center_score = None
  192. if card_defect_type == "corner_edge":
  193. center_score = center_result['deduct_score']
  194. center_weight = card_config['center']
  195. final_center_score = center_score * center_weight
  196. corner_score = defect_result[f"{card_aspect}_corner_deduct_score"]
  197. edge_score = defect_result[f"{card_aspect}_edge_deduct_score"]
  198. corner_weight = card_config['corner']
  199. edge_weight = card_config['edge']
  200. final_defect_score = corner_score * corner_weight + edge_score * edge_weight
  201. _used_compute_deduct_score = final_center_score + final_defect_score
  202. card_score = self.base_score + final_center_score + final_defect_score
  203. else:
  204. face_score = defect_result[f"{card_aspect}_face_deduct_score"]
  205. face_weight = card_config['face']
  206. final_defect_score = face_score * face_weight
  207. _used_compute_deduct_score = final_defect_score
  208. card_score = self.base_score + final_defect_score
  209. except Exception as e:
  210. logger.error(f"formate_one_card_result 从json获取分数失败: {e}")
  211. raise e
  212. data = {
  213. "result": {
  214. "center_result": center_result,
  215. "defect_result": defect_result,
  216. "imageHeight": imageHeight,
  217. "imageWidth": imageWidth,
  218. "card_center_deduct_score": final_center_score,
  219. "card_defect_deduct_score": final_defect_score,
  220. "_used_compute_deduct_score": _used_compute_deduct_score,
  221. "card_score": card_score
  222. }
  223. }
  224. return data
  225. if __name__ == '__main__':
  226. # 1. 初始化评分器,加载规则
  227. scorer = CardScorer(r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\app\core\scoring_config.json")
  228. # rulers = scorer.config['corners']['rules']['wear_area']
  229. #
  230. # score = scorer._get_score_from_tiers(0.06, rulers)
  231. # print(score)
  232. # print()
  233. # 居中分数
  234. center_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_card_center-center_result.json"
  235. with open(center_data_path, 'r', encoding='utf-8') as f:
  236. center_data = json.load(f)
  237. center_data = scorer.calculate_centering_score("front", center_data, True)
  238. print(center_data)
  239. # 边角分数
  240. edge_corner_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_corner_no_reflect_defect-corner_result.json"
  241. with open(edge_corner_data_path, 'r', encoding='utf-8') as f:
  242. edge_corner_data = json.load(f)
  243. corner_data = scorer.calculate_defect_score("corner", 'front', edge_corner_data, True)
  244. print(corner_data)
  245. score = scorer.calculate_defect_score("edge", 'front', edge_corner_data, True)
  246. print(score)
  247. # 面分数
  248. face_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_face_no_reflect_defect-face_result.json"
  249. with open(face_data_path, 'r', encoding='utf-8') as f:
  250. face_data = json.load(f)
  251. score = scorer.calculate_defect_score("face", 'front', face_data, True)
  252. print(score)