"""Rule-based deduction scoring for card centering and defect results."""
import json
import logging
from typing import Any, Dict, List, Union

try:
    from app.core.logger import get_logger
except ModuleNotFoundError as _import_err:
    # Fall back to the stdlib logger only when the application package itself
    # is absent (e.g. this file is executed directly as a script). Any other
    # import failure inside the project logger is a real error and must surface.
    if _import_err.name not in {"app", "app.core", "app.core.logger"}:
        raise
    get_logger = logging.getLogger

logger = get_logger(__name__)


class CardScorer:
    """Load scoring rules from a JSON config file and score card data.

    The loaded configuration is kept in ``self.config``; ``self.base_score``
    is the config's ``"base_score"`` entry (10.0 when absent).
    """

    _VALID_DEFECT_TYPES = ("corner", "edge", "face")
    _VALID_ASPECTS = ("front", "back")
    _VALID_LIGHT_TYPES = ("ring", "coaxial")

    # Defect label -> rule key, for corner and edge defects.
    _CORNER_EDGE_LABEL_RULES = {
        "wear": "wear_area",
        "wear_and_impact": "wear_area",
        "wear_and_stain": "wear_area",
        "impact": "loss_area",
        "damaged": "loss_area",
        "scratch": "scratch_length",
        "scuff": "scratch_length",
        "pit": "pit_area",
        "protrudent": "pit_area",
        "stain": "stain_area",
    }

    # Defect label -> rule key, for face defects (no "loss_area" here:
    # "damaged" counts as wear and "impact" counts as a pit).
    _FACE_LABEL_RULES = {
        "wear": "wear_area",
        "wear_and_impact": "wear_area",
        "wear_and_stain": "wear_area",
        "damaged": "wear_area",
        "scratch": "scratch_length",
        "scuff": "scratch_length",
        "pit": "pit_area",
        "impact": "pit_area",
        "protrudent": "pit_area",
        "stain": "stain_area",
    }

    # Rule key -> key under the config's "severity_level" section.
    _SEVERITY_KEYS = {
        "wear_area": "wear",
        "loss_area": "loss",
        "scratch_length": "scratch",
        "pit_area": "pit",
        "stain_area": "stain",
    }

    def __init__(self, config_path: str):
        """Read the JSON scoring configuration at ``config_path``.

        Raises:
            ValueError: if the file does not exist or is not valid JSON.
        """
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                self.config = json.load(f)
        except FileNotFoundError as err:
            raise ValueError(f"配置文件未找到: {config_path}") from err
        except json.JSONDecodeError as err:
            raise ValueError(f"配置文件格式错误: {config_path}") from err
        self.base_score = self.config.get("base_score", 10.0)

    @staticmethod
    def _get_score_from_tiers(value: float, rules: List[Dict[str, Any]]) -> float:
        """Return the deduction of the tier whose range contains ``value``.

        ``rules`` looks like ``[{"min": 0, "max": 0.5, "deduction": -1}, ...]``.
        Each tier is half-open, ``min <= value < max``; ``"max"`` may be the
        string ``"inf"`` (also the default) and ``"min"`` defaults to 0.

        When no tier matches, the last tier's deduction is returned; an empty
        rule list yields 0.0.
        """
        for tier in rules:
            # float() parses the string "inf" directly, so no special case.
            upper = float(tier.get("max", "inf"))
            lower = float(tier.get("min", 0))
            if lower <= value < upper:
                return float(tier.get("deduction", 0.0))

        if rules:
            return float(rules[-1].get("deduction", 0))
        return 0.0

    def _get_severity_coefficient(self, severity_type_key: str, level_name: str) -> float:
        """Look up the multiplier configured for a severity level.

        :param severity_type_key: key under the config's ``severity_level``
            section, e.g. "wear" or "stain"
        :param level_name: level name to match against each rule's ``name``,
            e.g. "一般" or "严重"
        :return: the matching rule's ``value``; 1.0 when the section, the key
            or the level name is not configured
        """
        rules = self.config.get("severity_level", {}).get(severity_type_key, [])
        for rule in rules:
            if rule.get("name") == level_name:
                return float(rule.get("value", 1.0))
        return 1.0

    @staticmethod
    def _write_defect_score(defect: Dict, defect_type: str, score: float) -> bool:
        """Store ``score`` on ``defect`` according to its edit state.

        * no ``score`` key yet: write ``score`` and reset ``new_score`` to None;
        * ``edit_type`` is "edit" or "add": write ``new_score``;
        * ``edit_type`` is "del": set ``new_score`` to 0;
        * otherwise: overwrite ``score`` (and clear a previously set
          ``new_score``).

        :return: False when the defect is marked as deleted and must not count
            towards the total, True otherwise
        """
        if "score" not in defect:
            logger.info("新写入")
            defect['score'] = score
            defect["new_score"] = None
            return True

        had_new_score = defect.get("new_score") is not None
        logger.info("多次修改" if had_new_score else "初次修改")

        edit_type = defect.get("edit_type")
        if edit_type in ("edit", "add"):
            defect["new_score"] = score
            return True
        if edit_type == "del":
            logger.info(f"del {defect_type} 补回: {score}")
            defect["new_score"] = 0
            return False

        if had_new_score:
            logger.info("这里直接赋值分数")
        defect['score'] = score
        if had_new_score:
            defect["new_score"] = None
        return True

    def calculate_defect_score(self,
                               card_defect_type: str,
                               card_aspect: str,
                               card_light_type: str,
                               defect_data: Dict,
                               is_write_score: bool = True) -> Union[float, dict]:
        """Compute the weighted deduction for corner, edge or face defects.

        Only entries of ``defect_data['defects']`` whose ``defect_type`` equals
        ``card_defect_type`` are scored. Scratches are measured by
        ``max(width, height)`` (also stored on the defect as
        ``scratch_length``); every other defect by ``actual_area``.

        :param card_defect_type: 'corner', 'edge' or 'face'
        :param card_aspect: 'front' or 'back'
        :param card_light_type: 'ring' or 'coaxial'; only affects the result
            for 'face', where the matching light weight is applied
        :param defect_data: dict holding the ``defects`` list
        :param is_write_score: when True, per-defect scores and the final
            ``{aspect}_{defect_type}_deduct_score`` are written into
            ``defect_data`` and ``defect_data`` is returned; when False only
            the final score is returned
        :raises TypeError: on an invalid argument value or unknown defect label
        :raises KeyError: when no rules are configured for a defect's rule key
        :raises ValueError: when no weights/coefficients are configured

        NOTE(review): defects marked ``edit_type == "del"`` are excluded from
        the total only when ``is_write_score`` is True, as in the original
        implementation — confirm this is intended.
        """
        if card_defect_type not in self._VALID_DEFECT_TYPES:
            raise TypeError("calculate_defect_score:card_defect_type 只能为 'corner', 'edge', 'face'")
        if card_aspect not in self._VALID_ASPECTS:
            raise TypeError("calculate_defect_score:card_aspect 只能为 front 或 back")
        if card_light_type not in self._VALID_LIGHT_TYPES:
            raise TypeError("calculate_defect_score:card_light_type 只能为 ring 或 coaxial")

        aspect_config = self.config[card_defect_type]
        if card_defect_type == "face":
            label_rules = self._FACE_LABEL_RULES
        else:
            label_rules = self._CORNER_EDGE_LABEL_RULES

        # 1. Accumulate the deduction per rule key.
        weighted_scores: Dict[str, float] = {}
        for defect in defect_data['defects']:
            if defect['defect_type'] != card_defect_type:
                continue

            defect_type = label_rules.get(defect['label'])
            if defect_type is None:
                logger.error(f"数据缺陷类型不存在: {defect['label']}")
                raise TypeError(f"数据缺陷类型不存在: {defect['label']}")

            rules = aspect_config['rules'].get(defect_type)
            if not rules:
                logger.error(f"计算分数过程, 未找到配置规则: {defect_type}")
                raise KeyError(f"计算分数过程, 未找到配置规则: {defect_type}")

            # Scratches are scored by length, everything else by area.
            if defect_type == "scratch_length":
                measure = max(defect['width'], defect['height'])
                defect['scratch_length'] = measure
            else:
                measure = defect['actual_area']

            base_score = self._get_score_from_tiers(measure, rules)

            severity_key = self._SEVERITY_KEYS.get(defect_type, "wear")
            # A missing, None or empty level falls back to "一般".
            severity_level_name = defect.get("severity_level") or "一般"
            severity_coef = self._get_severity_coefficient(severity_key, severity_level_name)

            the_score = base_score * severity_coef
            logger.debug(f"[{card_defect_type}, {defect_type}]: 面积/长={measure}, 基础分={base_score}, 等级={severity_level_name}({severity_coef}), 最终分={the_score}")

            weighted_scores.setdefault(defect_type, 0.0)
            counted = True
            if is_write_score:
                counted = self._write_defect_score(defect, defect_type, the_score)
            if counted:
                weighted_scores[defect_type] += the_score

        # 2. Apply the per-rule weights, then the final (and light) weights.
        weights = aspect_config.get(f"{card_aspect}_weights") or aspect_config.get("coefficients")
        if not weights:
            raise ValueError(f"在配置中未找到 '{card_defect_type}' 的权重/系数")

        logger.debug(f"{weighted_scores}")
        total_deduction = 0.0
        for defect_type, score in weighted_scores.items():
            total_deduction += score * weights.get(defect_type, 1.0)

        final_weights = aspect_config["final_weights"][card_aspect]
        if card_defect_type == "face":
            # Face defects are additionally weighted by the light type.
            light_weights = aspect_config["light_weights"][f"{card_light_type}_weight"]
            final_weights = final_weights * light_weights

        final_score = total_deduction * final_weights
        logger.info(f"final weights: {final_weights}, final score: {final_score}")

        if is_write_score:
            defect_data[f"{card_aspect}_{card_defect_type}_deduct_score"] = final_score
            return defect_data
        return final_score

    def calculate_centering_score(self,
                                  card_aspect: str,
                                  center_data: dict,
                                  is_write_score: bool = False) -> Union[float, dict]:
        """Compute the centering deduction for one card side.

        The larger of ``center_left``/``center_right`` and the larger of
        ``center_top``/``center_bottom`` (read from
        ``center_data['box_result']['center_inference']``) are looked up in
        the tier rules and scaled by the horizontal/vertical coefficients.

        :param card_aspect: 'front' or 'back'
        :param center_data: centering inference result
        :param is_write_score: when True, the score is stored in
            ``center_data['deduct_score']`` and ``center_data`` is returned;
            otherwise the score itself is returned
        :raises TypeError: when ``card_aspect`` is not 'front' or 'back'
        """
        if card_aspect not in self._VALID_ASPECTS:
            raise TypeError("calculate_centering_score:card_type 只能为 front 或 back")

        centering_config = self.config['centering'][card_aspect]
        rules = centering_config['rules']
        coefficients = centering_config['coefficients']

        inference = center_data['box_result']['center_inference']
        h_lookup_val = max(inference['center_left'], inference['center_right'])
        v_lookup_val = max(inference['center_top'], inference['center_bottom'])

        h_deduction = self._get_score_from_tiers(h_lookup_val, rules) * coefficients['horizontal']
        v_deduction = self._get_score_from_tiers(v_lookup_val, rules) * coefficients['vertical']
        logger.debug(f"{h_deduction} {v_deduction}")

        final_weight = self.config['centering']["final_weights"][card_aspect]
        final_score = (h_deduction + v_deduction) * final_weight
        logger.info(f"final weight: {final_weight}, final score: {final_score}")

        if is_write_score:
            center_data['deduct_score'] = final_score
            return center_data
        return final_score

    def formate_one_card_result(self,
                                center_result: dict,
                                defect_result: dict,
                                card_light_type: str,
                                card_aspect: str,
                                imageHeight: int,
                                imageWidth: int):
        """Combine centering and defect deductions into one card result.

        For the "ring" light type the centering, corner, edge and face
        deductions are all weighted (``config['card']['PSA']``) and added to
        the base score. For any other light type only the face deduction is
        used and ``card_center_deduct_score`` is None.

        :return: ``{"result": {...}}`` holding the inputs, the weighted
            deductions and the final ``card_score``
        :raises Exception: any lookup/arithmetic failure is logged and
            re-raised unchanged
        """
        try:
            card_config = self.config['card']['PSA']

            final_center_score = None
            if card_light_type == "ring":
                final_center_score = center_result['deduct_score'] * card_config['center']

                corner_score = defect_result[f"{card_aspect}_corner_deduct_score"]
                edge_score = defect_result[f"{card_aspect}_edge_deduct_score"]
                face_score = defect_result[f"{card_aspect}_face_deduct_score"]

                final_defect_score = (corner_score * card_config['corner']
                                      + edge_score * card_config['edge']
                                      + face_score * card_config['face'])

                _used_compute_deduct_score = final_center_score + final_defect_score
                card_score = self.base_score + final_center_score + final_defect_score
            else:
                face_score = defect_result[f"{card_aspect}_face_deduct_score"]
                final_defect_score = face_score * card_config['face']

                _used_compute_deduct_score = final_defect_score
                card_score = self.base_score + final_defect_score
        except Exception as e:
            logger.error(f"formate_one_card_result 从json获取分数失败: {e}")
            raise  # bare raise keeps the original traceback

        return {
            "result": {
                "center_result": center_result,
                "defect_result": defect_result,
                "imageHeight": imageHeight,
                "imageWidth": imageWidth,
                "card_center_deduct_score": final_center_score,
                "card_defect_deduct_score": final_defect_score,
                "_used_compute_deduct_score": _used_compute_deduct_score,
                "card_score": card_score
            }
        }


def _run_demo() -> None:
    """Score the sample result files from the developer's local checkout."""
    # 1. Create the scorer and load the rules.
    scorer = CardScorer(r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\app\core\scoring_config.json")

    # NOTE(review): the sample files are named "no_reflect", presumably ring
    # light captures — confirm. The light type is a required argument.
    light_type = "ring"

    # Centering score
    center_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_card_center-center_result.json"
    with open(center_data_path, 'r', encoding='utf-8') as f:
        center_data = json.load(f)
    center_data = scorer.calculate_centering_score("front", center_data, True)
    print(center_data)

    # Corner and edge scores
    edge_corner_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_corner_no_reflect_defect-corner_result.json"
    with open(edge_corner_data_path, 'r', encoding='utf-8') as f:
        edge_corner_data = json.load(f)
    corner_data = scorer.calculate_defect_score("corner", 'front', light_type, edge_corner_data, True)
    print(corner_data)
    score = scorer.calculate_defect_score("edge", 'front', light_type, edge_corner_data, True)
    print(score)

    # Face score
    face_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_face_no_reflect_defect-face_result.json"
    with open(face_data_path, 'r', encoding='utf-8') as f:
        face_data = json.load(f)
    score = scorer.calculate_defect_score("face", 'front', light_type, face_data, True)
    print(score)


if __name__ == '__main__':
    _run_demo()