| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- import json
- from typing import List, Dict, Any, Union
- from app.core.logger import get_logger
- logger = get_logger(__name__)
- class CardScorer:
- """
- 它从一个JSON配置文件加载评分规则,并根据输入的卡片数据计算分数。
- """
- def __init__(self, config_path: str):
- try:
- with open(config_path, 'r', encoding='utf-8') as f:
- self.config = json.load(f)
- self.base_score = self.config.get("base_score", 10.0)
- except FileNotFoundError:
- raise ValueError(f"配置文件未找到: {config_path}")
- except json.JSONDecodeError:
- raise ValueError(f"配置文件格式错误: {config_path}")
- @staticmethod
- def _get_score_from_tiers(value: float, rules: List[Dict[str, Any]]) -> float:
- """
- 根据新的区间规则查找值对应的分数。
- 格式: [{"min": 0, "max": 0.5, "deduction": -1}, ...]
- 逻辑: min <= value < max (最后一项如果是inf,则包含)
- """
- for tier in rules:
- # 获取范围,处理可能的字符串
- min_val = tier.get("min", 0)
- max_val_raw = tier.get("max", "inf")
- deduction = tier.get("deduction", 0.0)
- # 处理 inf
- if max_val_raw == "inf":
- max_val = float('inf')
- else:
- max_val = float(max_val_raw)
- min_val = float(min_val)
- # 判定范围:左闭右开 [min, max)
- if min_val <= value < max_val:
- return float(deduction)
- # 如果数值非常大(超过了所有定义的max),或者没有匹配到
- if rules:
- return float(rules[-1].get("deduction", 0))
- return 0.0
- def calculate_defect_score(self,
- card_defect_type: str,
- card_aspect: str,
- defect_data: Dict,
- is_write_score: bool = True) -> Union[float, dict]:
- """
- 一个通用的缺陷计分函数,用于计算角、边、表面的加权分数。
- card_defect_type: 'corner', 'edge', 'face'
- card_aspect: 为 front或 back
- is_write_score: 是否将分数写入json并返回
- """
- if card_defect_type != "corner" and card_defect_type != "edge" and card_defect_type != "face":
- raise TypeError("calculate_centering_score:card_type 只能为 'corner', 'edge', 'face'")
- if card_aspect != "front" and card_aspect != "back":
- raise TypeError("calculate_defect_score:card_type 只能为 front 或 back")
- aspect_config = self.config[card_defect_type]
- total_deduction = 0.0
- weighted_scores = {}
- # 1. 计算每种缺陷类型的总扣分
- for defect in defect_data['defects']:
- if defect['defect_type'] != card_defect_type:
- continue
- if card_defect_type == 'corner' or card_defect_type == 'edge':
- if defect['label'] in ['wear', 'wear_and_impact', 'wear_and_stain']:
- defect_type = "wear_area"
- elif defect['label'] in ['impact', 'damaged']:
- defect_type = "loss_area"
- else:
- logger.error(f"数据缺陷类型不存在: {defect['label']}")
- raise TypeError(f"数据缺陷类型不存在: {defect['label']}")
- else:
- if defect['label'] in ['wear', 'wear_and_impact', 'damaged']:
- defect_type = "wear_area"
- elif defect['label'] in ['scratch', 'scuff']:
- defect_type = "scratch_length"
- elif defect['label'] in ['pit', 'impact']:
- defect_type = "pit_area"
- elif defect['label'] in ['stain']:
- defect_type = "stain_area"
- else:
- logger.error(f"数据缺陷类型不存在: {defect['label']}")
- raise TypeError(f"数据缺陷类型不存在: {defect['label']}")
- # 获取规则列表
- rules = aspect_config['rules'].get(defect_type)
- if not rules:
- logger.error(f"计算分数过程, 未找到配置规则: {defect_type}")
- raise KeyError(f"计算分数过程, 未找到配置规则: {defect_type}")
- # 对于划痕取长度, 其他取面积
- if defect_type == "scratch_length":
- area_mm = max(defect['width'], defect['height'])
- else:
- area_mm = defect['actual_area']
- # 累加所有同类型缺陷的扣分
- if defect_type not in weighted_scores.keys():
- weighted_scores[defect_type] = 0
- # 计算单个缺陷扣分
- the_score = self._get_score_from_tiers(area_mm, rules)
- print(f"[{card_defect_type}, {defect_type}]: {area_mm}, {the_score}")
- weighted_scores[defect_type] += the_score
- # 将分数写入json
- if is_write_score:
- if "score" not in defect:
- # 新建的时候
- logger.info(f"新建分数score: {the_score}")
- defect['score'] = the_score
- defect["new_score"] = None
- elif defect.get("new_score") is None:
- # 初次修改
- # if defect["score"] != the_score:
- # logger.info(f"初次修改 -> new_score: {the_score} (原score: {defect['score']})")
- if defect.get("edit_type") == "edit" or defect.get("edit_type") == "add":
- defect["new_score"] = the_score
- elif "score" in defect and defect["new_score"] is not None:
- # 多次修改
- # if defect["new_score"] != the_score:
- # defect["score"] = defect["new_score"]
- if defect.get("edit_type") == "edit":
- defect["new_score"] = the_score
- else:
- defect['score'] = the_score
- defect["new_score"] = None
- # 2. 根据权重/系数计算最终扣分
- weights = aspect_config.get(f"{card_aspect}_weights") or aspect_config.get("coefficients")
- if not weights:
- raise ValueError(f"在配置中未找到 '{card_defect_type}' 的权重/系数")
- print(weighted_scores)
- for defect_type, score in weighted_scores.items():
- total_deduction += score * weights.get(defect_type, 1.0)
- final_weights = aspect_config["final_weights"][card_aspect]
- final_score = total_deduction * final_weights
- logger.info(f"final weights: {final_weights}, final score: {final_score}")
- if is_write_score:
- defect_data[f"{card_aspect}_{card_defect_type}_deduct_score"] = final_score
- return defect_data
- else:
- return final_score
- def calculate_centering_score(self,
- card_aspect: str,
- center_data: dict,
- is_write_score: bool = False) -> Union[float, dict]:
- """
- 计算居中度分数。
- card_type 为 front或 back
- is_write_score: 是否将分数写入json并返回
- """
- if card_aspect != "front" and card_aspect != "back":
- raise TypeError("calculate_centering_score:card_type 只能为 front 或 back")
- centering_config = self.config['centering'][card_aspect]
- rules = centering_config['rules']
- coefficients = centering_config['coefficients']
- center_left = center_data['box_result']['center_inference']['center_left']
- center_right = center_data['box_result']['center_inference']['center_right']
- center_top = center_data['box_result']['center_inference']['center_top']
- center_bottom = center_data['box_result']['center_inference']['center_bottom']
- # 将比例转换为用于查找规则的单个最大值
- h_lookup_val = max(center_left, center_right)
- v_lookup_val = max(center_top, center_bottom)
- h_deduction = self._get_score_from_tiers(h_lookup_val, rules) * coefficients['horizontal']
- v_deduction = self._get_score_from_tiers(v_lookup_val, rules) * coefficients['vertical']
- print(h_deduction, v_deduction)
- final_weight = self.config['centering']["final_weights"][card_aspect]
- final_score = (h_deduction + v_deduction) * final_weight
- logger.info(f"final weight: {final_weight}, final score: {final_score}")
- if is_write_score:
- center_data['deduct_score'] = final_score
- return center_data
- else:
- return final_score
- def formate_one_card_result(self, center_result: dict,
- defect_result: dict,
- card_defect_type: str,
- card_aspect: str):
- try:
- # 获取计算总分的权重
- card_config = self.config['card']['PSA']
- # 计算各部分的最后分数
- # 计算居中
- final_center_score = None
- if card_defect_type == "corner_edge":
- center_score = center_result['deduct_score']
- center_weight = card_config['center']
- final_center_score = center_score * center_weight
- corner_score = defect_result[f"{card_aspect}_corner_deduct_score"]
- edge_score = defect_result[f"{card_aspect}_edge_deduct_score"]
- corner_weight = card_config['corner']
- edge_weight = card_config['edge']
- final_defect_score = corner_score * corner_weight + edge_score * edge_weight
- _used_compute_deduct_score = final_center_score + final_defect_score
- card_score = self.base_score + final_center_score + final_defect_score
- else:
- face_score = defect_result[f"{card_aspect}_face_deduct_score"]
- face_weight = card_config['face']
- final_defect_score = face_score * face_weight
- _used_compute_deduct_score = final_defect_score
- card_score = self.base_score + final_defect_score
- except Exception as e:
- logger.error(f"formate_one_card_result 从json获取分数失败: {e}")
- raise e
- data = {
- "result": {
- "center_result": center_result,
- "defect_result": defect_result,
- "card_center_deduct_score": final_center_score,
- "card_defect_deduct_score": final_defect_score,
- "_used_compute_deduct_score": _used_compute_deduct_score,
- "card_score": card_score
- }
- }
- return data
- if __name__ == '__main__':
- # 1. 初始化评分器,加载规则
- scorer = CardScorer(r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\app\core\scoring_config.json")
- # rulers = scorer.config['corners']['rules']['wear_area']
- #
- # score = scorer._get_score_from_tiers(0.06, rulers)
- # print(score)
- # print()
- # 居中分数
- center_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_card_center-center_result.json"
- with open(center_data_path, 'r', encoding='utf-8') as f:
- center_data = json.load(f)
- center_data = scorer.calculate_centering_score("front", center_data, True)
- print(center_data)
- # 边角分数
- edge_corner_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_corner_no_reflect_defect-corner_result.json"
- with open(edge_corner_data_path, 'r', encoding='utf-8') as f:
- edge_corner_data = json.load(f)
- corner_data = scorer.calculate_defect_score("corner", 'front', edge_corner_data, True)
- print(corner_data)
- score = scorer.calculate_defect_score("edge", 'front', edge_corner_data, True)
- print(score)
- # 面分数
- face_data_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_face_no_reflect_defect-face_result.json"
- with open(face_data_path, 'r', encoding='utf-8') as f:
- face_data = json.load(f)
- score = scorer.calculate_defect_score("face", 'front', face_data, True)
- print(score)
|