| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- import os.path
- from fastapi import APIRouter, File, Body, HTTPException, status, Query
- from fastapi.responses import FileResponse, JSONResponse
- from ..core.config import settings
- import datetime
- import json
- from app.core.logger import get_logger
- logger = get_logger(__name__)
- router = APIRouter(tags=["Config"])
- def compare_json_structure(template: dict, data: dict, path: str = "") -> (bool, str):
- """
- 递归比较两个字典的结构(键),忽略值。
- 如果结构不匹配,返回 False 和错误原因。
- :param template: 模板字典(原始配置)
- :param data: 新的字典(待验证的配置)
- :param path: 用于错误报告的当前递归路径
- :return: 一个元组 (is_match: bool, reason: str)
- """
- template_keys = set(template.keys())
- data_keys = set(data.keys())
- # 检查当前层的键是否完全匹配
- if template_keys != data_keys:
- missing_keys = template_keys - data_keys
- extra_keys = data_keys - template_keys
- error_msg = f"结构不匹配。在路径 '{path or 'root'}' "
- if missing_keys:
- error_msg += f"缺少字段: {missing_keys}。"
- if extra_keys:
- error_msg += f"存在多余字段: {extra_keys}。"
- return False, error_msg
- # 递归检查所有子字典
- for key in template_keys:
- if isinstance(template[key], dict) and isinstance(data[key], dict):
- # 如果两个值都是字典,则递归深入
- is_match, reason = compare_json_structure(template[key], data[key], path=f"{path}.{key}" if path else key)
- if not is_match:
- return False, reason
- elif isinstance(template[key], dict) or isinstance(data[key], dict):
- # 如果只有一个是字典,说明结构已改变(例如,一个对象被替换成了字符串)
- current_path = f"{path}.{key}" if path else key
- return False, f"结构不匹配。字段 '{current_path}' 的类型已从字典变为非字典(或反之)。"
- return True, "结构匹配"
- def validate_rule_ranges(data: dict, path: str = "") -> (bool, str):
- """
- 递归检查配置中看起来像评分规则的列表。
- 规则:
- 1. 列表中的元素必须包含 min, max, deduction。
- 2. 第一个元素的 min 必须为 0。
- 3. 当前元素的 min 必须等于上一个元素的 max。
- """
- if isinstance(data, dict):
- for key, value in data.items():
- is_valid, error = validate_rule_ranges(value, path=f"{path}.{key}" if path else key)
- if not is_valid:
- return False, error
- elif isinstance(data, list):
- # 检查这是否是一个规则列表(通过检查第一个元素是否包含特定key)
- if len(data) > 0 and isinstance(data[0], dict) and "min" in data[0] and "max" in data[0] and "deduction" in \
- data[0]:
- current_path = path
- # 1. 检查首项 min 是否为 0
- first_min = data[0].get("min")
- if first_min != 0:
- return False, f"范围错误 [{current_path}]: 第一个区间的 min 必须为 0,当前为 {first_min}"
- # 2. 检查连续性
- prev_max = None
- for idx, rule in enumerate(data):
- current_min = rule.get("min")
- current_max = rule.get("max")
- # 处理 inf 字符串
- if current_max == "inf":
- current_max_val = float("inf")
- else:
- try:
- current_max_val = float(current_max)
- except (ValueError, TypeError):
- return False, f"数值错误 [{current_path}]: max 值 '{current_max}' 无效"
- try:
- current_min_val = float(current_min)
- except (ValueError, TypeError):
- return False, f"数值错误 [{current_path}]: min 值 '{current_min}' 无效"
- # 检查连续性 (除了第一个元素)
- if idx > 0:
- # 使用一个微小的容差处理浮点数比较,或者直接相等
- if prev_max == "inf":
- return False, f"逻辑错误 [{current_path}]: 'inf' 只能出现在最后一个区间的 max"
- prev_max_val = float(prev_max) if prev_max != "inf" else float("inf")
- if current_min_val != prev_max_val:
- return False, f"不连续错误 [{current_path}]: 第 {idx + 1} 项的 min ({current_min}) 不等于上一项的 max ({prev_max})"
- prev_max = current_max
- return True, "验证通过"
- @router.get("/scoring_config", summary="获取评分配置")
- async def get_scoring_config():
- """
- 读取并返回 scoring_config.json 文件的内容。
- """
- if not settings.SCORE_CONFIG_PATH.exists():
- logger.error(f"评分配置文件未找到: {settings.SCORE_CONFIG_PATH}")
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail="评分配置文件未找到"
- )
- try:
- with open(settings.SCORE_CONFIG_PATH, 'r', encoding='utf-8') as f:
- config_data = json.load(f)
- return config_data
- except json.JSONDecodeError:
- logger.error(f"评分配置文件格式错误,无法解析: {settings.SCORE_CONFIG_PATH}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="评分配置文件格式错误"
- )
- except Exception as e:
- logger.error(f"读取配置文件时发生未知错误: {e}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"读取配置文件时发生未知错误: {e}"
- )
- @router.put("/scoring_config", summary="更新评分配置")
- async def update_scoring_config(new_config: dict = Body(...)):
- """
- 接收新的JSON配置,验证结构一致性以及分值范围的连续性(min=prev_max)
- """
- # 1. 检查并读取当前的配置文件作为模板
- if not settings.SCORE_CONFIG_PATH.exists():
- # 如果文件不存在,可能无法进行结构对比,视情况处理,这里假设必须存在
- raise HTTPException(status_code=404, detail="原始配置文件未找到")
- try:
- with open(settings.SCORE_CONFIG_PATH, 'r', encoding='utf-8') as f:
- current_config = json.load(f)
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"读取原始配置文件失败: {e}")
- # 2. 比较新旧配置的结构 (Key的一致性)
- is_structure_valid, struct_reason = compare_json_structure(current_config, new_config)
- if not is_structure_valid:
- logger.warning(f"结构校验警告: {struct_reason}")
- # raise HTTPException(status_code=400, detail=struct_reason)
- # 3. **验证数值范围的连续性**
- is_range_valid, range_reason = validate_rule_ranges(new_config)
- if not is_range_valid:
- logger.warning(f"数值范围校验失败: {range_reason}")
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=f"配置数值逻辑错误: {range_reason}"
- )
- # 4. 写入新文件
- try:
- with open(settings.SCORE_CONFIG_PATH, 'w', encoding='utf-8') as f:
- json.dump(new_config, f, indent=2, ensure_ascii=False)
- logger.info("评分配置文件已成功更新。")
- logger.info("保存备份")
- formatted_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
- backup_name = f"scors_backup_{formatted_time}.json"
- temp_path = os.path.join(settings.SCORE_BACKUP_PATH, backup_name)
- with open(temp_path, 'w', encoding='utf-8') as f:
- json.dump(new_config, f, indent=2, ensure_ascii=False)
- logger.info("配置备份结束")
- return JSONResponse(
- status_code=status.HTTP_200_OK,
- content={"message": "配置已成功更新"}
- )
- except Exception as e:
- logger.error(f"写入新配置文件时发生错误: {e}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"保存新配置文件时发生错误: {e}"
- )
- @router.get("/severity_names", summary="获取缺陷严重程度列表")
- async def get_severity_names(defect_label: str = Query(..., description="缺陷标签,如 wear, damaged, stain")):
- """
- 根据缺陷标签返回对应的严重程度列表。
- """
- if not settings.SCORE_CONFIG_PATH.exists():
- raise HTTPException(status_code=404, detail="评分配置文件未找到")
- try:
- with open(settings.SCORE_CONFIG_PATH, 'r', encoding='utf-8') as f:
- config = json.load(f)
- severity_config = config.get("severity_level", {})
- # 1. 定义映射关系:从 缺陷label 映射到 severity_level 的 key
- # 逻辑依据:参考 score_service 中将 label 转换为 rule key 的逻辑,再简化为 severity key
- # rule key -> severity key:
- # wear_area -> wear
- # loss_area -> loss
- # scratch_length -> scratch
- # pit_area -> pit
- # stain_area -> stain
- severity_key = None
- # 磨损类
- if defect_label in ['wear', 'wear_and_impact', 'wear_and_stain']:
- severity_key = "wear"
- elif defect_label in ['damaged']:
- severity_key = "loss"
- elif defect_label in ['impact']:
- # 这是一个歧义点,取决于它是边角还是面。
- # 既然是获取列表,通常 pit 和 loss 的等级名可能是一样的(轻微/一般/严重)。
- # 这里优先映射为 pit (常见于面),或者你可以根据实际 json 结构调整
- severity_key = "pit"
- elif defect_label in ['scratch', 'scuff']:
- severity_key = "scratch"
- elif defect_label in ['pit', 'protrudent']:
- severity_key = "pit"
- elif defect_label in ['stain']:
- severity_key = "stain"
- if not severity_key:
- # 如果没匹配到,尝试直接用 label 查找 (容错)
- if defect_label in severity_config:
- severity_key = defect_label
- else:
- # 默认返回 wear 或抛出异常,这里选择返回空列表表示未找到
- return JSONResponse(content={"data": {"type": defect_label, "names": []}})
- # 2. 获取对应的配置列表
- levels = severity_config.get(severity_key, [])
- # 3. 提取 name 列表
- # 假设结构是 [{"name": "轻微", "value": 0.5}, ...]
- names = [item.get("name") for item in levels]
- if "一般" not in names:
- names.append("一般")
- return JSONResponse(content={
- "data": {
- "defect_label": severity_key,
- "names": names
- }
- })
- except Exception as e:
- logger.error(f"获取严重程度列表失败: {e}")
- raise HTTPException(status_code=500, detail=f"服务器内部错误: {e}")
|