# CardScorer.py
  1. import json
  2. from typing import List, Dict, Any, Union
  3. from app.core.logger import get_logger
  4. logger = get_logger(__name__)
  5. class CardScorer:
  6. """
  7. 它从一个JSON配置文件加载评分规则,并根据输入的卡片数据计算分数。
  8. """
  9. def __init__(self, config_path: str):
  10. try:
  11. with open(config_path, 'r', encoding='utf-8') as f:
  12. self.config = json.load(f)
  13. self.base_score = self.config.get("base_score", 10.0)
  14. except FileNotFoundError:
  15. raise ValueError(f"配置文件未找到: {config_path}")
  16. except json.JSONDecodeError:
  17. raise ValueError(f"配置文件格式错误: {config_path}")
  18. @staticmethod
  19. def _get_score_from_tiers(value: float, rules: List[Dict[str, Any]]) -> float:
  20. """
  21. 根据新的区间规则查找值对应的分数。
  22. 格式: [{"min": 0, "max": 0.5, "deduction": -1}, ...]
  23. 逻辑: min <= value < max (最后一项如果是inf,则包含)
  24. """
  25. for tier in rules:
  26. # 获取范围,处理可能的字符串
  27. min_val = tier.get("min", 0)
  28. max_val_raw = tier.get("max", "inf")
  29. deduction = tier.get("deduction", 0.0)
  30. # 处理 inf
  31. if max_val_raw == "inf":
  32. max_val = float('inf')
  33. else:
  34. max_val = float(max_val_raw)
  35. min_val = float(min_val)
  36. # 判定范围:左闭右开 [min, max)
  37. if min_val <= value < max_val:
  38. return float(deduction)
  39. # 如果数值非常大(超过了所有定义的max),或者没有匹配到
  40. if rules:
  41. return float(rules[-1].get("deduction", 0))
  42. return 0.0
  43. # 增加一个辅助方法用于查找严重程度系数
  44. def _get_severity_coefficient(self, severity_type_key: str, level_name: str) -> float:
  45. """
  46. 从配置中获取严重程度对应的系数
  47. :param severity_type_key: 对应 severity_level 下的键,如 "wear", "stain"
  48. :param level_name: 具体的等级名,如 "一般", "严重"
  49. :return: 系数 value
  50. """
  51. severity_config = self.config.get("severity_level", {})
  52. rules = severity_config.get(severity_type_key, [])
  53. # 默认系数为 1.0 (如果找不到配置)
  54. default_val = 1.0
  55. # 寻找匹配的 name
  56. for rule in rules:
  57. if rule.get("name") == level_name:
  58. return float(rule.get("value", 1.0))
  59. # 如果传入了"一般"但配置里没"一般",则返回 1.0
  60. return default_val
  61. def calculate_defect_score(self,
  62. card_defect_type: str,
  63. card_aspect: str,
  64. card_light_type: str,
  65. defect_data: Dict,
  66. is_write_score: bool = True) -> Union[float, dict]:
  67. """
  68. 一个通用的缺陷计分函数,用于计算角、边、表面的加权分数。
  69. card_defect_type: 'corner', 'edge', 'face'
  70. card_aspect: 为 front或 back
  71. is_write_score: 是否将分数写入json并返回
  72. """
  73. if card_defect_type != "corner" and card_defect_type != "edge" and card_defect_type != "face":
  74. raise TypeError("calculate_centering_score:card_type 只能为 'corner', 'edge', 'face'")
  75. if card_aspect != "front" and card_aspect != "back":
  76. raise TypeError("calculate_defect_score:card_type 只能为 front 或 back")
  77. if card_light_type != "ring" and card_light_type != "coaxial":
  78. raise TypeError("calculate_defect_score:card_type 只能为 ring 或 coaxial")
  79. aspect_config = self.config[card_defect_type]
  80. total_deduction = 0.0
  81. weighted_scores = {}
  82. # 1. 计算每种缺陷类型的总扣分
  83. for defect in defect_data['defects']:
  84. if defect['defect_type'] != card_defect_type:
  85. continue
  86. if card_defect_type == 'corner' or card_defect_type == 'edge':
  87. if defect['label'] in ['wear', 'wear_and_impact', 'wear_and_stain']:
  88. defect_type = "wear_area"
  89. elif defect['label'] in ['impact', 'damaged']:
  90. defect_type = "loss_area"
  91. elif defect['label'] in ['scratch', 'scuff']:
  92. defect_type = "scratch_length"
  93. elif defect['label'] in ['pit', 'protrudent']:
  94. defect_type = "pit_area"
  95. elif defect['label'] in ['stain']:
  96. defect_type = "stain_area"
  97. else:
  98. logger.error(f"数据缺陷类型不存在: {defect['label']}")
  99. raise TypeError(f"数据缺陷类型不存在: {defect['label']}")
  100. else:
  101. if defect['label'] in ['wear', 'wear_and_impact', 'wear_and_stain', 'damaged']:
  102. defect_type = "wear_area"
  103. elif defect['label'] in ['scratch', 'scuff']:
  104. defect_type = "scratch_length"
  105. elif defect['label'] in ['pit', 'impact', 'protrudent']:
  106. defect_type = "pit_area"
  107. elif defect['label'] in ['stain']:
  108. defect_type = "stain_area"
  109. else:
  110. logger.error(f"数据缺陷类型不存在: {defect['label']}")
  111. raise TypeError(f"数据缺陷类型不存在: {defect['label']}")
  112. # 获取规则列表
  113. rules = aspect_config['rules'].get(defect_type)
  114. if not rules:
  115. logger.error(f"计算分数过程, 未找到配置规则: {defect_type}")
  116. raise KeyError(f"计算分数过程, 未找到配置规则: {defect_type}")
  117. # 对于划痕取长度, 其他取面积
  118. if defect_type == "scratch_length":
  119. area_mm = max(defect['width'], defect['height'])
  120. defect['scratch_length'] = area_mm
  121. else:
  122. area_mm = defect['actual_area']
  123. # 累加所有同类型缺陷的扣分
  124. if defect_type not in weighted_scores.keys():
  125. weighted_scores[defect_type] = 0
  126. # --- 计算分数并应用严重程度 ---
  127. # 1. 基础分计算
  128. base_score = self._get_score_from_tiers(area_mm, rules)
  129. # 2. 获取严重程度系数
  130. # 首先将 defect_type (如 wear_area) 映射到 severity key (如 wear)
  131. if defect_type == "wear_area":
  132. severity_key = "wear"
  133. elif defect_type == "loss_area":
  134. severity_key = "loss" # 对应截图里的 loss
  135. elif defect_type == "scratch_length":
  136. severity_key = "scratch"
  137. elif defect_type == "pit_area":
  138. severity_key = "pit"
  139. elif defect_type == "stain_area":
  140. severity_key = "stain"
  141. else:
  142. severity_key = "wear" # 默认 fallback
  143. # 获取数据中的 level,默认为 "一般"
  144. severity_level_name = defect.get("severity_level", "一般")
  145. if not severity_level_name: # 防止为 None 或空字符串
  146. severity_level_name = "一般"
  147. severity_coef = self._get_severity_coefficient(severity_key, severity_level_name)
  148. # 3. 计算最终单项分数
  149. the_score = base_score * severity_coef
  150. print(f"[{card_defect_type}, {defect_type}]: 面积/长={area_mm}, 基础分={base_score}, 等级={severity_level_name}({severity_coef}), 最终分={the_score}")
  151. weighted_scores[defect_type] += the_score
  152. # 将分数写入json
  153. if is_write_score:
  154. if "score" not in defect:
  155. logger.info("新写入")
  156. # 新建的时候
  157. # logger.info(f"新建分数score: {the_score}")
  158. defect['score'] = the_score
  159. defect["new_score"] = None
  160. elif defect.get("new_score") is None:
  161. logger.info("初次修改")
  162. # 初次修改
  163. # if defect["score"] != the_score:
  164. # logger.info(f"初次修改 -> new_score: {the_score} (原score: {defect['score']})")
  165. if defect.get("edit_type") == "edit" or defect.get("edit_type") == "add":
  166. defect["new_score"] = the_score
  167. elif defect.get("edit_type") == "del":
  168. logger.info(f"del {defect_type} 补回: {the_score}")
  169. defect["new_score"] = 0
  170. weighted_scores[defect_type] -= the_score
  171. else:
  172. defect['score'] = the_score
  173. elif "score" in defect and defect["new_score"] is not None:
  174. logger.info("多次修改")
  175. # 多次修改
  176. # if defect["new_score"] != the_score:
  177. # defect["score"] = defect["new_score"]
  178. if defect.get("edit_type") == "edit" or defect.get("edit_type") == "add":
  179. defect["new_score"] = the_score
  180. elif defect.get("edit_type") == "del":
  181. logger.info(f"del {defect_type} 补回: {the_score}")
  182. defect["new_score"] = 0
  183. weighted_scores[defect_type] -= the_score
  184. else:
  185. logger.info("这里直接赋值分数")
  186. defect['score'] = the_score
  187. defect["new_score"] = None
  188. # 2. 根据权重/系数计算最终扣分
  189. weights = aspect_config.get(f"{card_aspect}_weights") or aspect_config.get("coefficients")
  190. if not weights:
  191. raise ValueError(f"在配置中未找到 '{card_defect_type}' 的权重/系数")
  192. print(weighted_scores)
  193. for defect_type, score in weighted_scores.items():
  194. total_deduction += score * weights.get(defect_type, 1.0)
  195. final_weights = aspect_config["final_weights"][card_aspect]
  196. # 对于面的缺陷类型, 根据不同的光类型, 给予不同权重
  197. if card_defect_type == "face":
  198. light_weights = aspect_config["light_weights"][f"{card_light_type}_weight"]
  199. final_weights = final_weights * light_weights
  200. final_score = total_deduction * final_weights
  201. logger.info(f"final weights: {final_weights}, final score: {final_score}_weight")
  202. if is_write_score:
  203. defect_data[f"{card_aspect}_{card_defect_type}_deduct_score"] = final_score
  204. return defect_data
  205. else:
  206. return final_score
  207. def calculate_centering_score(self,
  208. card_aspect: str,
  209. center_data: dict,
  210. is_write_score: bool = False) -> Union[float, dict]:
  211. """
  212. 计算居中度分数。
  213. card_type 为 front或 back
  214. is_write_score: 是否将分数写入json并返回
  215. """
  216. if card_aspect != "front" and card_aspect != "back":
  217. raise TypeError("calculate_centering_score:card_type 只能为 front 或 back")
  218. centering_config = self.config['centering'][card_aspect]
  219. rules = centering_config['rules']
  220. coefficients = centering_config['coefficients']
  221. center_left = center_data['box_result']['center_inference']['center_left']
  222. center_right = center_data['box_result']['center_inference']['center_right']
  223. center_top = center_data['box_result']['center_inference']['center_top']
  224. center_bottom = center_data['box_result']['center_inference']['center_bottom']
  225. # 将比例转换为用于查找规则的单个最大值
  226. h_lookup_val = max(center_left, center_right)
  227. v_lookup_val = max(center_top, center_bottom)
  228. h_deduction = self._get_score_from_tiers(h_lookup_val, rules) * coefficients['horizontal']
  229. v_deduction = self._get_score_from_tiers(v_lookup_val, rules) * coefficients['vertical']
  230. print(h_deduction, v_deduction)
  231. final_weight = self.config['centering']["final_weights"][card_aspect]
  232. final_score = (h_deduction + v_deduction) * final_weight
  233. logger.info(f"final weight: {final_weight}, final score: {final_score}")
  234. if is_write_score:
  235. center_data['deduct_score'] = final_score
  236. return center_data
  237. else:
  238. return final_score
  239. def formate_one_card_result(self, center_result: dict,
  240. defect_result: dict,
  241. card_light_type: str,
  242. card_aspect: str,
  243. imageHeight: int,
  244. imageWidth: int):
  245. try:
  246. # 获取计算总分的权重
  247. card_config = self.config['card']['PSA']
  248. # 计算各部分的最后分数
  249. # 计算居中
  250. final_center_score = None
  251. if card_light_type == "ring":
  252. center_score = center_result['deduct_score']
  253. center_weight = card_config['center']
  254. final_center_score = center_score * center_weight
  255. corner_score = defect_result[f"{card_aspect}_corner_deduct_score"]
  256. edge_score = defect_result[f"{card_aspect}_edge_deduct_score"]
  257. face_score = defect_result[f"{card_aspect}_face_deduct_score"]
  258. corner_weight = card_config['corner']
  259. edge_weight = card_config['edge']
  260. face_weight = card_config['face']
  261. final_defect_score = corner_score * corner_weight + edge_score * edge_weight + face_score * face_weight
  262. _used_compute_deduct_score = final_center_score + final_defect_score
  263. card_score = self.base_score + final_center_score + final_defect_score
  264. else:
  265. face_score = defect_result[f"{card_aspect}_face_deduct_score"]
  266. face_weight = card_config['face']
  267. final_defect_score = face_score * face_weight
  268. _used_compute_deduct_score = final_defect_score
  269. card_score = self.base_score + final_defect_score
  270. except Exception as e:
  271. logger.error(f"formate_one_card_result 从json获取分数失败: {e}")
  272. raise e
  273. data = {
  274. "result": {
  275. "center_result": center_result,
  276. "defect_result": defect_result,
  277. "imageHeight": imageHeight,
  278. "imageWidth": imageWidth,
  279. "card_center_deduct_score": final_center_score,
  280. "card_defect_deduct_score": final_defect_score,
  281. "_used_compute_deduct_score": _used_compute_deduct_score,
  282. "card_score": card_score
  283. }
  284. }
  285. return data
  286. if __name__ == '__main__':
  287. # 1. 初始化评分器,加载规则
  288. scorer = CardScorer(r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\app\core\scoring_config.json")
  289. # rulers = scorer.config['corners']['rules']['wear_area']
  290. #
  291. # score = scorer._get_score_from_tiers(0.06, rulers)
  292. # print(score)
  293. # print()
  294. # 居中分数
  295. center_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_card_center-center_result.json"
  296. with open(center_data_path, 'r', encoding='utf-8') as f:
  297. center_data = json.load(f)
  298. center_data = scorer.calculate_centering_score("front", center_data, True)
  299. print(center_data)
  300. # 边角分数
  301. edge_corner_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_corner_no_reflect_defect-corner_result.json"
  302. with open(edge_corner_data_path, 'r', encoding='utf-8') as f:
  303. edge_corner_data = json.load(f)
  304. corner_data = scorer.calculate_defect_score("corner", 'front', edge_corner_data, True)
  305. print(corner_data)
  306. score = scorer.calculate_defect_score("edge", 'front', edge_corner_data, True)
  307. print(score)
  308. # 面分数
  309. face_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_face_no_reflect_defect-face_result.json"
  310. with open(face_data_path, 'r', encoding='utf-8') as f:
  311. face_data = json.load(f)
  312. score = scorer.calculate_defect_score("face", 'front', face_data, True)
  313. print(score)