# CardScorer.py
  1. import json
  2. from typing import List, Dict, Any, Union
  3. from app.core.logger import get_logger
  4. logger = get_logger(__name__)
  5. class CardScorer:
  6. """
  7. 它从一个JSON配置文件加载评分规则,并根据输入的卡片数据计算分数。
  8. """
  9. def __init__(self, config_path: str):
  10. try:
  11. with open(config_path, 'r', encoding='utf-8') as f:
  12. self.config = json.load(f)
  13. self.base_score = self.config.get("base_score", 10.0)
  14. except FileNotFoundError:
  15. raise ValueError(f"配置文件未找到: {config_path}")
  16. except json.JSONDecodeError:
  17. raise ValueError(f"配置文件格式错误: {config_path}")
  18. @staticmethod
  19. def _get_score_from_tiers(value: float, rules: List[Dict[str, Any]]) -> float:
  20. """
  21. 根据新的区间规则查找值对应的分数。
  22. 格式: [{"min": 0, "max": 0.5, "deduction": -1}, ...]
  23. 逻辑: min <= value < max (最后一项如果是inf,则包含)
  24. """
  25. for tier in rules:
  26. # 获取范围,处理可能的字符串
  27. min_val = tier.get("min", 0)
  28. max_val_raw = tier.get("max", "inf")
  29. deduction = tier.get("deduction", 0.0)
  30. # 处理 inf
  31. if max_val_raw == "inf":
  32. max_val = float('inf')
  33. else:
  34. max_val = float(max_val_raw)
  35. min_val = float(min_val)
  36. # 判定范围:左闭右开 [min, max)
  37. if min_val <= value < max_val:
  38. return float(deduction)
  39. # 如果数值非常大(超过了所有定义的max),或者没有匹配到
  40. if rules:
  41. return float(rules[-1].get("deduction", 0))
  42. return 0.0
  43. # 增加一个辅助方法用于查找严重程度系数
  44. def _get_severity_coefficient(self, severity_type_key: str, level_name: str) -> float:
  45. """
  46. 从配置中获取严重程度对应的系数
  47. :param severity_type_key: 对应 severity_level 下的键,如 "wear", "stain"
  48. :param level_name: 具体的等级名,如 "一般", "严重"
  49. :return: 系数 value
  50. """
  51. severity_config = self.config.get("severity_level", {})
  52. rules = severity_config.get(severity_type_key, [])
  53. # 默认系数为 1.0 (如果找不到配置)
  54. default_val = 1.0
  55. # 寻找匹配的 name
  56. for rule in rules:
  57. if rule.get("name") == level_name:
  58. return float(rule.get("value", 1.0))
  59. # 如果传入了"一般"但配置里没"一般",则返回 1.0
  60. return default_val
  61. def calculate_defect_score(self,
  62. card_defect_type: str,
  63. card_aspect: str,
  64. card_light_type: str,
  65. defect_data: Dict,
  66. is_write_score: bool = True) -> Union[float, dict]:
  67. """
  68. 一个通用的缺陷计分函数,用于计算角、边、表面的加权分数。
  69. card_defect_type: 'corner', 'edge', 'face'
  70. card_aspect: 为 front或 back
  71. is_write_score: 是否将分数写入json并返回
  72. """
  73. if card_defect_type != "corner" and card_defect_type != "edge" and card_defect_type != "face":
  74. raise TypeError("calculate_centering_score:card_type 只能为 'corner', 'edge', 'face'")
  75. if card_aspect != "front" and card_aspect != "back":
  76. raise TypeError("calculate_defect_score:card_type 只能为 front 或 back")
  77. if card_light_type != "ring" and card_light_type != "coaxial":
  78. raise TypeError("calculate_defect_score:card_type 只能为 ring 或 coaxial")
  79. aspect_config = self.config[card_defect_type]
  80. total_deduction = 0.0
  81. weighted_scores = {}
  82. # 1. 计算每种缺陷类型的总扣分
  83. for defect in defect_data['defects']:
  84. if defect['defect_type'] != card_defect_type:
  85. continue
  86. if card_defect_type == 'corner' or card_defect_type == 'edge':
  87. if defect['label'] in ['wear', 'wear_and_impact', 'wear_and_stain']:
  88. defect_type = "wear_area"
  89. elif defect['label'] in ['impact', 'damaged']:
  90. defect_type = "loss_area"
  91. elif defect['label'] in ['scratch', 'scuff']:
  92. defect_type = "scratch_length"
  93. elif defect['label'] in ['pit', 'protrudent']:
  94. defect_type = "pit_area"
  95. elif defect['label'] in ['stain']:
  96. defect_type = "stain_area"
  97. else:
  98. logger.error(f"数据缺陷类型不存在: {defect['label']}")
  99. raise TypeError(f"数据缺陷类型不存在: {defect['label']}")
  100. else:
  101. if defect['label'] in ['wear', 'wear_and_impact', 'wear_and_stain', 'damaged']:
  102. defect_type = "wear_area"
  103. elif defect['label'] in ['scratch', 'scuff']:
  104. defect_type = "scratch_length"
  105. elif defect['label'] in ['pit', 'impact', 'protrudent']:
  106. defect_type = "pit_area"
  107. elif defect['label'] in ['stain']:
  108. defect_type = "stain_area"
  109. else:
  110. logger.error(f"数据缺陷类型不存在: {defect['label']}")
  111. raise TypeError(f"数据缺陷类型不存在: {defect['label']}")
  112. # 获取规则列表
  113. rules = aspect_config['rules'].get(defect_type)
  114. if not rules:
  115. logger.error(f"计算分数过程, 未找到配置规则: {defect_type}")
  116. raise KeyError(f"计算分数过程, 未找到配置规则: {defect_type}")
  117. # 对于划痕取长度, 其他取面积
  118. if defect_type == "scratch_length":
  119. area_mm = max(defect['width'], defect['height'])
  120. defect['scratch_length'] = area_mm
  121. else:
  122. area_mm = defect['actual_area']
  123. # 累加所有同类型缺陷的扣分
  124. if defect_type not in weighted_scores.keys():
  125. weighted_scores[defect_type] = 0
  126. # --- 计算分数并应用严重程度 ---
  127. # 1. 基础分计算
  128. base_score = self._get_score_from_tiers(area_mm, rules)
  129. # 2. 获取严重程度系数
  130. # 首先将 defect_type (如 wear_area) 映射到 severity key (如 wear)
  131. if defect_type == "wear_area":
  132. severity_key = "wear"
  133. elif defect_type == "loss_area":
  134. severity_key = "loss" # 对应截图里的 loss
  135. elif defect_type == "scratch_length":
  136. severity_key = "scratch"
  137. elif defect_type == "pit_area":
  138. severity_key = "pit"
  139. elif defect_type == "stain_area":
  140. severity_key = "stain"
  141. else:
  142. severity_key = "wear" # 默认 fallback
  143. # 获取数据中的 level,默认为 "一般"
  144. severity_level_name = defect.get("severity_level", "一般")
  145. if not severity_level_name: # 防止为 None 或空字符串
  146. severity_level_name = "一般"
  147. severity_coef = self._get_severity_coefficient(severity_key, severity_level_name)
  148. # 3. 计算最终单项分数
  149. the_score = base_score * severity_coef
  150. print(f"[{card_defect_type}, {defect_type}]: 面积/长={area_mm}, 基础分={base_score}, 等级={severity_level_name}({severity_coef}), 最终分={the_score}")
  151. weighted_scores[defect_type] += the_score
  152. # 将分数写入json
  153. if is_write_score:
  154. if "score" not in defect:
  155. # 新建的时候
  156. # logger.info(f"新建分数score: {the_score}")
  157. defect['score'] = the_score
  158. defect["new_score"] = None
  159. elif defect.get("new_score") is None:
  160. # 初次修改
  161. # if defect["score"] != the_score:
  162. # logger.info(f"初次修改 -> new_score: {the_score} (原score: {defect['score']})")
  163. if defect.get("edit_type") == "edit" or defect.get("edit_type") == "add":
  164. defect["new_score"] = the_score
  165. elif defect.get("edit_type") == "del":
  166. logger.info(f"del {defect_type} 补回: {the_score}")
  167. defect["new_score"] = 0
  168. weighted_scores[defect_type] -= the_score
  169. elif "score" in defect and defect["new_score"] is not None:
  170. # 多次修改
  171. # if defect["new_score"] != the_score:
  172. # defect["score"] = defect["new_score"]
  173. if defect.get("edit_type") == "edit":
  174. defect["new_score"] = the_score
  175. elif defect.get("edit_type") == "del":
  176. logger.info(f"del {defect_type} 补回: {the_score}")
  177. defect["new_score"] = 0
  178. weighted_scores[defect_type] -= the_score
  179. else:
  180. defect['score'] = the_score
  181. defect["new_score"] = None
  182. # 2. 根据权重/系数计算最终扣分
  183. weights = aspect_config.get(f"{card_aspect}_weights") or aspect_config.get("coefficients")
  184. if not weights:
  185. raise ValueError(f"在配置中未找到 '{card_defect_type}' 的权重/系数")
  186. print(weighted_scores)
  187. for defect_type, score in weighted_scores.items():
  188. total_deduction += score * weights.get(defect_type, 1.0)
  189. final_weights = aspect_config["final_weights"][card_aspect]
  190. # 对于面的缺陷类型, 根据不同的光类型, 给予不同权重
  191. if card_defect_type == "face":
  192. light_weights = aspect_config["light_weights"][f"{card_light_type}_weight"]
  193. final_weights = final_weights * light_weights
  194. final_score = total_deduction * final_weights
  195. logger.info(f"final weights: {final_weights}, final score: {final_score}_weight")
  196. if is_write_score:
  197. defect_data[f"{card_aspect}_{card_defect_type}_deduct_score"] = final_score
  198. return defect_data
  199. else:
  200. return final_score
  201. def calculate_centering_score(self,
  202. card_aspect: str,
  203. center_data: dict,
  204. is_write_score: bool = False) -> Union[float, dict]:
  205. """
  206. 计算居中度分数。
  207. card_type 为 front或 back
  208. is_write_score: 是否将分数写入json并返回
  209. """
  210. if card_aspect != "front" and card_aspect != "back":
  211. raise TypeError("calculate_centering_score:card_type 只能为 front 或 back")
  212. centering_config = self.config['centering'][card_aspect]
  213. rules = centering_config['rules']
  214. coefficients = centering_config['coefficients']
  215. center_left = center_data['box_result']['center_inference']['center_left']
  216. center_right = center_data['box_result']['center_inference']['center_right']
  217. center_top = center_data['box_result']['center_inference']['center_top']
  218. center_bottom = center_data['box_result']['center_inference']['center_bottom']
  219. # 将比例转换为用于查找规则的单个最大值
  220. h_lookup_val = max(center_left, center_right)
  221. v_lookup_val = max(center_top, center_bottom)
  222. h_deduction = self._get_score_from_tiers(h_lookup_val, rules) * coefficients['horizontal']
  223. v_deduction = self._get_score_from_tiers(v_lookup_val, rules) * coefficients['vertical']
  224. print(h_deduction, v_deduction)
  225. final_weight = self.config['centering']["final_weights"][card_aspect]
  226. final_score = (h_deduction + v_deduction) * final_weight
  227. logger.info(f"final weight: {final_weight}, final score: {final_score}")
  228. if is_write_score:
  229. center_data['deduct_score'] = final_score
  230. return center_data
  231. else:
  232. return final_score
  233. def formate_one_card_result(self, center_result: dict,
  234. defect_result: dict,
  235. card_light_type: str,
  236. card_aspect: str,
  237. imageHeight: int,
  238. imageWidth: int):
  239. try:
  240. # 获取计算总分的权重
  241. card_config = self.config['card']['PSA']
  242. # 计算各部分的最后分数
  243. # 计算居中
  244. final_center_score = None
  245. if card_light_type == "ring":
  246. center_score = center_result['deduct_score']
  247. center_weight = card_config['center']
  248. final_center_score = center_score * center_weight
  249. corner_score = defect_result[f"{card_aspect}_corner_deduct_score"]
  250. edge_score = defect_result[f"{card_aspect}_edge_deduct_score"]
  251. face_score = defect_result[f"{card_aspect}_face_deduct_score"]
  252. corner_weight = card_config['corner']
  253. edge_weight = card_config['edge']
  254. face_weight = card_config['face']
  255. final_defect_score = corner_score * corner_weight + edge_score * edge_weight + face_score * face_weight
  256. _used_compute_deduct_score = final_center_score + final_defect_score
  257. card_score = self.base_score + final_center_score + final_defect_score
  258. else:
  259. face_score = defect_result[f"{card_aspect}_face_deduct_score"]
  260. face_weight = card_config['face']
  261. final_defect_score = face_score * face_weight
  262. _used_compute_deduct_score = final_defect_score
  263. card_score = self.base_score + final_defect_score
  264. except Exception as e:
  265. logger.error(f"formate_one_card_result 从json获取分数失败: {e}")
  266. raise e
  267. data = {
  268. "result": {
  269. "center_result": center_result,
  270. "defect_result": defect_result,
  271. "imageHeight": imageHeight,
  272. "imageWidth": imageWidth,
  273. "card_center_deduct_score": final_center_score,
  274. "card_defect_deduct_score": final_defect_score,
  275. "_used_compute_deduct_score": _used_compute_deduct_score,
  276. "card_score": card_score
  277. }
  278. }
  279. return data
  280. if __name__ == '__main__':
  281. # 1. 初始化评分器,加载规则
  282. scorer = CardScorer(r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\app\core\scoring_config.json")
  283. # rulers = scorer.config['corners']['rules']['wear_area']
  284. #
  285. # score = scorer._get_score_from_tiers(0.06, rulers)
  286. # print(score)
  287. # print()
  288. # 居中分数
  289. center_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_card_center-center_result.json"
  290. with open(center_data_path, 'r', encoding='utf-8') as f:
  291. center_data = json.load(f)
  292. center_data = scorer.calculate_centering_score("front", center_data, True)
  293. print(center_data)
  294. # 边角分数
  295. edge_corner_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_corner_no_reflect_defect-corner_result.json"
  296. with open(edge_corner_data_path, 'r', encoding='utf-8') as f:
  297. edge_corner_data = json.load(f)
  298. corner_data = scorer.calculate_defect_score("corner", 'front', edge_corner_data, True)
  299. print(corner_data)
  300. score = scorer.calculate_defect_score("edge", 'front', edge_corner_data, True)
  301. print(score)
  302. # 面分数
  303. face_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_face_no_reflect_defect-face_result.json"
  304. with open(face_data_path, 'r', encoding='utf-8') as f:
  305. face_data = json.load(f)
  306. score = scorer.calculate_defect_score("face", 'front', face_data, True)
  307. print(score)