CardScorer.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. import json
  2. from typing import List, Dict, Any, Union
  3. from app.core.logger import get_logger
  4. logger = get_logger(__name__)
  5. class CardScorer:
  6. """
  7. 它从一个JSON配置文件加载评分规则,并根据输入的卡片数据计算分数。
  8. """
  9. def __init__(self, config_path: str):
  10. try:
  11. with open(config_path, 'r', encoding='utf-8') as f:
  12. self.config = json.load(f)
  13. self.base_score = self.config.get("base_score", 10.0)
  14. except FileNotFoundError:
  15. raise ValueError(f"配置文件未找到: {config_path}")
  16. except json.JSONDecodeError:
  17. raise ValueError(f"配置文件格式错误: {config_path}")
  18. @staticmethod
  19. def _get_score_from_tiers(value: float, rules: List[Dict[str, Any]]) -> float:
  20. """
  21. 根据新的区间规则查找值对应的分数。
  22. 格式: [{"min": 0, "max": 0.5, "deduction": -1}, ...]
  23. 逻辑: min <= value < max (最后一项如果是inf,则包含)
  24. """
  25. for tier in rules:
  26. # 获取范围,处理可能的字符串
  27. min_val = tier.get("min", 0)
  28. max_val_raw = tier.get("max", "inf")
  29. deduction = tier.get("deduction", 0.0)
  30. # 处理 inf
  31. if max_val_raw == "inf":
  32. max_val = float('inf')
  33. else:
  34. max_val = float(max_val_raw)
  35. min_val = float(min_val)
  36. # 判定范围:左闭右开 [min, max)
  37. if min_val <= value < max_val:
  38. return float(deduction)
  39. # 如果数值非常大(超过了所有定义的max),或者没有匹配到
  40. if rules:
  41. return float(rules[-1].get("deduction", 0))
  42. return 0.0
  43. def calculate_defect_score(self,
  44. card_defect_type: str,
  45. card_aspect: str,
  46. card_light_type: str,
  47. defect_data: Dict,
  48. is_write_score: bool = True) -> Union[float, dict]:
  49. """
  50. 一个通用的缺陷计分函数,用于计算角、边、表面的加权分数。
  51. card_defect_type: 'corner', 'edge', 'face'
  52. card_aspect: 为 front或 back
  53. is_write_score: 是否将分数写入json并返回
  54. """
  55. if card_defect_type != "corner" and card_defect_type != "edge" and card_defect_type != "face":
  56. raise TypeError("calculate_centering_score:card_type 只能为 'corner', 'edge', 'face'")
  57. if card_aspect != "front" and card_aspect != "back":
  58. raise TypeError("calculate_defect_score:card_type 只能为 front 或 back")
  59. if card_light_type != "ring" and card_light_type != "coaxial":
  60. raise TypeError("calculate_defect_score:card_type 只能为 ring 或 coaxial")
  61. aspect_config = self.config[card_defect_type]
  62. total_deduction = 0.0
  63. weighted_scores = {}
  64. # 1. 计算每种缺陷类型的总扣分
  65. for defect in defect_data['defects']:
  66. if defect['defect_type'] != card_defect_type:
  67. continue
  68. if card_defect_type == 'corner' or card_defect_type == 'edge':
  69. if defect['label'] in ['wear', 'wear_and_impact', 'wear_and_stain']:
  70. defect_type = "wear_area"
  71. elif defect['label'] in ['impact', 'damaged']:
  72. defect_type = "loss_area"
  73. elif defect['label'] in ['scratch', 'scuff']:
  74. defect_type = "scratch_length"
  75. elif defect['label'] in ['pit', 'protrudent']:
  76. defect_type = "pit_area"
  77. elif defect['label'] in ['stain']:
  78. defect_type = "stain_area"
  79. else:
  80. logger.error(f"数据缺陷类型不存在: {defect['label']}")
  81. raise TypeError(f"数据缺陷类型不存在: {defect['label']}")
  82. else:
  83. if defect['label'] in ['wear', 'wear_and_impact', 'wear_and_stain', 'damaged']:
  84. defect_type = "wear_area"
  85. elif defect['label'] in ['scratch', 'scuff']:
  86. defect_type = "scratch_length"
  87. elif defect['label'] in ['pit', 'impact', 'protrudent']:
  88. defect_type = "pit_area"
  89. elif defect['label'] in ['stain']:
  90. defect_type = "stain_area"
  91. else:
  92. logger.error(f"数据缺陷类型不存在: {defect['label']}")
  93. raise TypeError(f"数据缺陷类型不存在: {defect['label']}")
  94. # 获取规则列表
  95. rules = aspect_config['rules'].get(defect_type)
  96. if not rules:
  97. logger.error(f"计算分数过程, 未找到配置规则: {defect_type}")
  98. raise KeyError(f"计算分数过程, 未找到配置规则: {defect_type}")
  99. # 对于划痕取长度, 其他取面积
  100. if defect_type == "scratch_length":
  101. area_mm = max(defect['width'], defect['height'])
  102. defect['scratch_length'] = area_mm
  103. else:
  104. area_mm = defect['actual_area']
  105. # 累加所有同类型缺陷的扣分
  106. if defect_type not in weighted_scores.keys():
  107. weighted_scores[defect_type] = 0
  108. # 计算单个缺陷扣分
  109. the_score = self._get_score_from_tiers(area_mm, rules)
  110. print(f"[{card_defect_type}, {defect_type}]: {area_mm}, {the_score}")
  111. weighted_scores[defect_type] += the_score
  112. # 将分数写入json
  113. if is_write_score:
  114. if "score" not in defect:
  115. # 新建的时候
  116. # logger.info(f"新建分数score: {the_score}")
  117. defect['score'] = the_score
  118. defect["new_score"] = None
  119. elif defect.get("new_score") is None:
  120. # 初次修改
  121. # if defect["score"] != the_score:
  122. # logger.info(f"初次修改 -> new_score: {the_score} (原score: {defect['score']})")
  123. if defect.get("edit_type") == "edit" or defect.get("edit_type") == "add":
  124. defect["new_score"] = the_score
  125. elif defect.get("edit_type") == "del":
  126. logger.info(f"del {defect_type} 补回: {the_score}")
  127. defect["new_score"] = 0
  128. weighted_scores[defect_type] -= the_score
  129. elif "score" in defect and defect["new_score"] is not None:
  130. # 多次修改
  131. # if defect["new_score"] != the_score:
  132. # defect["score"] = defect["new_score"]
  133. if defect.get("edit_type") == "edit":
  134. defect["new_score"] = the_score
  135. elif defect.get("edit_type") == "del":
  136. logger.info(f"del {defect_type} 补回: {the_score}")
  137. defect["new_score"] = 0
  138. weighted_scores[defect_type] -= the_score
  139. else:
  140. defect['score'] = the_score
  141. defect["new_score"] = None
  142. # 2. 根据权重/系数计算最终扣分
  143. weights = aspect_config.get(f"{card_aspect}_weights") or aspect_config.get("coefficients")
  144. if not weights:
  145. raise ValueError(f"在配置中未找到 '{card_defect_type}' 的权重/系数")
  146. print(weighted_scores)
  147. for defect_type, score in weighted_scores.items():
  148. total_deduction += score * weights.get(defect_type, 1.0)
  149. final_weights = aspect_config["final_weights"][card_aspect]
  150. # 对于面的缺陷类型, 根据不同的光类型, 给予不同权重
  151. if card_defect_type == "face":
  152. light_weights = aspect_config["light_weights"][f"{card_light_type}_weight"]
  153. final_weights = final_weights * light_weights
  154. final_score = total_deduction * final_weights
  155. logger.info(f"final weights: {final_weights}, final score: {final_score}_weight")
  156. if is_write_score:
  157. defect_data[f"{card_aspect}_{card_defect_type}_deduct_score"] = final_score
  158. return defect_data
  159. else:
  160. return final_score
  161. def calculate_centering_score(self,
  162. card_aspect: str,
  163. center_data: dict,
  164. is_write_score: bool = False) -> Union[float, dict]:
  165. """
  166. 计算居中度分数。
  167. card_type 为 front或 back
  168. is_write_score: 是否将分数写入json并返回
  169. """
  170. if card_aspect != "front" and card_aspect != "back":
  171. raise TypeError("calculate_centering_score:card_type 只能为 front 或 back")
  172. centering_config = self.config['centering'][card_aspect]
  173. rules = centering_config['rules']
  174. coefficients = centering_config['coefficients']
  175. center_left = center_data['box_result']['center_inference']['center_left']
  176. center_right = center_data['box_result']['center_inference']['center_right']
  177. center_top = center_data['box_result']['center_inference']['center_top']
  178. center_bottom = center_data['box_result']['center_inference']['center_bottom']
  179. # 将比例转换为用于查找规则的单个最大值
  180. h_lookup_val = max(center_left, center_right)
  181. v_lookup_val = max(center_top, center_bottom)
  182. h_deduction = self._get_score_from_tiers(h_lookup_val, rules) * coefficients['horizontal']
  183. v_deduction = self._get_score_from_tiers(v_lookup_val, rules) * coefficients['vertical']
  184. print(h_deduction, v_deduction)
  185. final_weight = self.config['centering']["final_weights"][card_aspect]
  186. final_score = (h_deduction + v_deduction) * final_weight
  187. logger.info(f"final weight: {final_weight}, final score: {final_score}")
  188. if is_write_score:
  189. center_data['deduct_score'] = final_score
  190. return center_data
  191. else:
  192. return final_score
  193. def formate_one_card_result(self, center_result: dict,
  194. defect_result: dict,
  195. card_light_type: str,
  196. card_aspect: str,
  197. imageHeight: int,
  198. imageWidth: int):
  199. try:
  200. # 获取计算总分的权重
  201. card_config = self.config['card']['PSA']
  202. # 计算各部分的最后分数
  203. # 计算居中
  204. final_center_score = None
  205. if card_light_type == "ring":
  206. center_score = center_result['deduct_score']
  207. center_weight = card_config['center']
  208. final_center_score = center_score * center_weight
  209. corner_score = defect_result[f"{card_aspect}_corner_deduct_score"]
  210. edge_score = defect_result[f"{card_aspect}_edge_deduct_score"]
  211. face_score = defect_result[f"{card_aspect}_face_deduct_score"]
  212. corner_weight = card_config['corner']
  213. edge_weight = card_config['edge']
  214. face_weight = card_config['face']
  215. final_defect_score = corner_score * corner_weight + edge_score * edge_weight + face_score * face_weight
  216. _used_compute_deduct_score = final_center_score + final_defect_score
  217. card_score = self.base_score + final_center_score + final_defect_score
  218. else:
  219. face_score = defect_result[f"{card_aspect}_face_deduct_score"]
  220. face_weight = card_config['face']
  221. final_defect_score = face_score * face_weight
  222. _used_compute_deduct_score = final_defect_score
  223. card_score = self.base_score + final_defect_score
  224. except Exception as e:
  225. logger.error(f"formate_one_card_result 从json获取分数失败: {e}")
  226. raise e
  227. data = {
  228. "result": {
  229. "center_result": center_result,
  230. "defect_result": defect_result,
  231. "imageHeight": imageHeight,
  232. "imageWidth": imageWidth,
  233. "card_center_deduct_score": final_center_score,
  234. "card_defect_deduct_score": final_defect_score,
  235. "_used_compute_deduct_score": _used_compute_deduct_score,
  236. "card_score": card_score
  237. }
  238. }
  239. return data
  240. if __name__ == '__main__':
  241. # 1. 初始化评分器,加载规则
  242. scorer = CardScorer(r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\app\core\scoring_config.json")
  243. # rulers = scorer.config['corners']['rules']['wear_area']
  244. #
  245. # score = scorer._get_score_from_tiers(0.06, rulers)
  246. # print(score)
  247. # print()
  248. # 居中分数
  249. center_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_card_center-center_result.json"
  250. with open(center_data_path, 'r', encoding='utf-8') as f:
  251. center_data = json.load(f)
  252. center_data = scorer.calculate_centering_score("front", center_data, True)
  253. print(center_data)
  254. # 边角分数
  255. edge_corner_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_corner_no_reflect_defect-corner_result.json"
  256. with open(edge_corner_data_path, 'r', encoding='utf-8') as f:
  257. edge_corner_data = json.load(f)
  258. corner_data = scorer.calculate_defect_score("corner", 'front', edge_corner_data, True)
  259. print(corner_data)
  260. score = scorer.calculate_defect_score("edge", 'front', edge_corner_data, True)
  261. print(score)
  262. # 面分数
  263. face_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_face_no_reflect_defect-face_result.json"
  264. with open(face_data_path, 'r', encoding='utf-8') as f:
  265. face_data = json.load(f)
  266. score = scorer.calculate_defect_score("face", 'front', face_data, True)
  267. print(score)