config_api.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. import os.path
  2. from fastapi import APIRouter, File, Body, HTTPException, status, Query
  3. from fastapi.responses import FileResponse, JSONResponse
  4. from ..core.config import settings
  5. import datetime
  6. import json
  7. from app.core.logger import get_logger
  8. logger = get_logger(__name__)
  9. router = APIRouter(tags=["Config"])
  10. def compare_json_structure(template: dict, data: dict, path: str = "") -> (bool, str):
  11. """
  12. 递归比较两个字典的结构(键),忽略值。
  13. 如果结构不匹配,返回 False 和错误原因。
  14. :param template: 模板字典(原始配置)
  15. :param data: 新的字典(待验证的配置)
  16. :param path: 用于错误报告的当前递归路径
  17. :return: 一个元组 (is_match: bool, reason: str)
  18. """
  19. template_keys = set(template.keys())
  20. data_keys = set(data.keys())
  21. # 检查当前层的键是否完全匹配
  22. if template_keys != data_keys:
  23. missing_keys = template_keys - data_keys
  24. extra_keys = data_keys - template_keys
  25. error_msg = f"结构不匹配。在路径 '{path or 'root'}' "
  26. if missing_keys:
  27. error_msg += f"缺少字段: {missing_keys}。"
  28. if extra_keys:
  29. error_msg += f"存在多余字段: {extra_keys}。"
  30. return False, error_msg
  31. # 递归检查所有子字典
  32. for key in template_keys:
  33. if isinstance(template[key], dict) and isinstance(data[key], dict):
  34. # 如果两个值都是字典,则递归深入
  35. is_match, reason = compare_json_structure(template[key], data[key], path=f"{path}.{key}" if path else key)
  36. if not is_match:
  37. return False, reason
  38. elif isinstance(template[key], dict) or isinstance(data[key], dict):
  39. # 如果只有一个是字典,说明结构已改变(例如,一个对象被替换成了字符串)
  40. current_path = f"{path}.{key}" if path else key
  41. return False, f"结构不匹配。字段 '{current_path}' 的类型已从字典变为非字典(或反之)。"
  42. return True, "结构匹配"
  43. def validate_rule_ranges(data: dict, path: str = "") -> (bool, str):
  44. """
  45. 递归检查配置中看起来像评分规则的列表。
  46. 规则:
  47. 1. 列表中的元素必须包含 min, max, deduction。
  48. 2. 第一个元素的 min 必须为 0。
  49. 3. 当前元素的 min 必须等于上一个元素的 max。
  50. """
  51. if isinstance(data, dict):
  52. for key, value in data.items():
  53. is_valid, error = validate_rule_ranges(value, path=f"{path}.{key}" if path else key)
  54. if not is_valid:
  55. return False, error
  56. elif isinstance(data, list):
  57. # 检查这是否是一个规则列表(通过检查第一个元素是否包含特定key)
  58. if len(data) > 0 and isinstance(data[0], dict) and "min" in data[0] and "max" in data[0] and "deduction" in \
  59. data[0]:
  60. current_path = path
  61. # 1. 检查首项 min 是否为 0
  62. first_min = data[0].get("min")
  63. if first_min != 0:
  64. return False, f"范围错误 [{current_path}]: 第一个区间的 min 必须为 0,当前为 {first_min}"
  65. # 2. 检查连续性
  66. prev_max = None
  67. for idx, rule in enumerate(data):
  68. current_min = rule.get("min")
  69. current_max = rule.get("max")
  70. # 处理 inf 字符串
  71. if current_max == "inf":
  72. current_max_val = float("inf")
  73. else:
  74. try:
  75. current_max_val = float(current_max)
  76. except (ValueError, TypeError):
  77. return False, f"数值错误 [{current_path}]: max 值 '{current_max}' 无效"
  78. try:
  79. current_min_val = float(current_min)
  80. except (ValueError, TypeError):
  81. return False, f"数值错误 [{current_path}]: min 值 '{current_min}' 无效"
  82. # 检查连续性 (除了第一个元素)
  83. if idx > 0:
  84. # 使用一个微小的容差处理浮点数比较,或者直接相等
  85. if prev_max == "inf":
  86. return False, f"逻辑错误 [{current_path}]: 'inf' 只能出现在最后一个区间的 max"
  87. prev_max_val = float(prev_max) if prev_max != "inf" else float("inf")
  88. if current_min_val != prev_max_val:
  89. return False, f"不连续错误 [{current_path}]: 第 {idx + 1} 项的 min ({current_min}) 不等于上一项的 max ({prev_max})"
  90. prev_max = current_max
  91. return True, "验证通过"
  92. @router.get("/scoring_config", summary="获取评分配置")
  93. async def get_scoring_config():
  94. """
  95. 读取并返回 scoring_config.json 文件的内容。
  96. """
  97. if not settings.SCORE_CONFIG_PATH.exists():
  98. logger.error(f"评分配置文件未找到: {settings.SCORE_CONFIG_PATH}")
  99. raise HTTPException(
  100. status_code=status.HTTP_404_NOT_FOUND,
  101. detail="评分配置文件未找到"
  102. )
  103. try:
  104. with open(settings.SCORE_CONFIG_PATH, 'r', encoding='utf-8') as f:
  105. config_data = json.load(f)
  106. return config_data
  107. except json.JSONDecodeError:
  108. logger.error(f"评分配置文件格式错误,无法解析: {settings.SCORE_CONFIG_PATH}")
  109. raise HTTPException(
  110. status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
  111. detail="评分配置文件格式错误"
  112. )
  113. except Exception as e:
  114. logger.error(f"读取配置文件时发生未知错误: {e}")
  115. raise HTTPException(
  116. status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
  117. detail=f"读取配置文件时发生未知错误: {e}"
  118. )
  119. @router.put("/scoring_config", summary="更新评分配置")
  120. async def update_scoring_config(new_config: dict = Body(...)):
  121. """
  122. 接收新的JSON配置,验证结构一致性以及分值范围的连续性(min=prev_max)
  123. """
  124. # 1. 检查并读取当前的配置文件作为模板
  125. if not settings.SCORE_CONFIG_PATH.exists():
  126. # 如果文件不存在,可能无法进行结构对比,视情况处理,这里假设必须存在
  127. raise HTTPException(status_code=404, detail="原始配置文件未找到")
  128. try:
  129. with open(settings.SCORE_CONFIG_PATH, 'r', encoding='utf-8') as f:
  130. current_config = json.load(f)
  131. except Exception as e:
  132. raise HTTPException(status_code=500, detail=f"读取原始配置文件失败: {e}")
  133. # 2. 比较新旧配置的结构 (Key的一致性)
  134. is_structure_valid, struct_reason = compare_json_structure(current_config, new_config)
  135. if not is_structure_valid:
  136. logger.warning(f"结构校验警告: {struct_reason}")
  137. # raise HTTPException(status_code=400, detail=struct_reason)
  138. # 3. **验证数值范围的连续性**
  139. is_range_valid, range_reason = validate_rule_ranges(new_config)
  140. if not is_range_valid:
  141. logger.warning(f"数值范围校验失败: {range_reason}")
  142. raise HTTPException(
  143. status_code=status.HTTP_400_BAD_REQUEST,
  144. detail=f"配置数值逻辑错误: {range_reason}"
  145. )
  146. # 4. 写入新文件
  147. try:
  148. with open(settings.SCORE_CONFIG_PATH, 'w', encoding='utf-8') as f:
  149. json.dump(new_config, f, indent=2, ensure_ascii=False)
  150. logger.info("评分配置文件已成功更新。")
  151. logger.info("保存备份")
  152. formatted_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
  153. backup_name = f"scors_backup_{formatted_time}.json"
  154. temp_path = os.path.join(settings.SCORE_BACKUP_PATH, backup_name)
  155. with open(temp_path, 'w', encoding='utf-8') as f:
  156. json.dump(new_config, f, indent=2, ensure_ascii=False)
  157. logger.info("配置备份结束")
  158. return JSONResponse(
  159. status_code=status.HTTP_200_OK,
  160. content={"message": "配置已成功更新"}
  161. )
  162. except Exception as e:
  163. logger.error(f"写入新配置文件时发生错误: {e}")
  164. raise HTTPException(
  165. status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
  166. detail=f"保存新配置文件时发生错误: {e}"
  167. )
  168. @router.get("/severity_names", summary="获取缺陷严重程度列表")
  169. async def get_severity_names(defect_label: str = Query(..., description="缺陷标签,如 wear, damaged, stain")):
  170. """
  171. 根据缺陷标签返回对应的严重程度列表。
  172. """
  173. if not settings.SCORE_CONFIG_PATH.exists():
  174. raise HTTPException(status_code=404, detail="评分配置文件未找到")
  175. try:
  176. with open(settings.SCORE_CONFIG_PATH, 'r', encoding='utf-8') as f:
  177. config = json.load(f)
  178. severity_config = config.get("severity_level", {})
  179. # 1. 定义映射关系:从 缺陷label 映射到 severity_level 的 key
  180. # 逻辑依据:参考 score_service 中将 label 转换为 rule key 的逻辑,再简化为 severity key
  181. # rule key -> severity key:
  182. # wear_area -> wear
  183. # loss_area -> loss
  184. # scratch_length -> scratch
  185. # pit_area -> pit
  186. # stain_area -> stain
  187. severity_key = None
  188. # 磨损类
  189. if defect_label in ['wear', 'wear_and_impact', 'wear_and_stain']:
  190. severity_key = "wear"
  191. elif defect_label in ['damaged']:
  192. severity_key = "loss"
  193. elif defect_label in ['impact']:
  194. # 这是一个歧义点,取决于它是边角还是面。
  195. # 既然是获取列表,通常 pit 和 loss 的等级名可能是一样的(轻微/一般/严重)。
  196. # 这里优先映射为 pit (常见于面),或者你可以根据实际 json 结构调整
  197. severity_key = "pit"
  198. elif defect_label in ['scratch', 'scuff']:
  199. severity_key = "scratch"
  200. elif defect_label in ['pit', 'protrudent']:
  201. severity_key = "pit"
  202. elif defect_label in ['stain']:
  203. severity_key = "stain"
  204. if not severity_key:
  205. # 如果没匹配到,尝试直接用 label 查找 (容错)
  206. if defect_label in severity_config:
  207. severity_key = defect_label
  208. else:
  209. # 默认返回 wear 或抛出异常,这里选择返回空列表表示未找到
  210. return JSONResponse(content={"data": {"type": defect_label, "names": []}})
  211. # 2. 获取对应的配置列表
  212. levels = severity_config.get(severity_key, [])
  213. # 3. 提取 name 列表
  214. # 假设结构是 [{"name": "轻微", "value": 0.5}, ...]
  215. names = [item.get("name") for item in levels]
  216. return JSONResponse(content={
  217. "data": {
  218. "defect_label": severity_key,
  219. "names": names
  220. }
  221. })
  222. except Exception as e:
  223. logger.error(f"获取严重程度列表失败: {e}")
  224. raise HTTPException(status_code=500, detail=f"服务器内部错误: {e}")