import requests import json import copy import hashlib from enum import Enum from time import perf_counter from fastapi import APIRouter, Depends, HTTPException, Query, Body from fastapi.concurrency import run_in_threadpool from mysql.connector.pooling import PooledMySQLConnection from app.core.config import settings from app.core.logger import get_logger from app.core.database_loader import get_db_connection from app.api.users import check_card_permission, get_current_user from app.utils.scheme import ( CardDetailResponse, ImageType ) from app.crud import crud_card from app.utils.xy_process import convert_internal_to_xy_format, convert_xy_to_internal_format from app.core.minio_client import minio_client from app.utils.rating_report_utils import crop_defect_image logger = get_logger(__name__) router = APIRouter() db_dependency = Depends(get_db_connection) class QueryMode(str, Enum): current = "current" next = "next" prev = "prev" def _resolve_recalc_score_type(image_type: str): """ 将 14 类新版 image_type 归一到 score_recalculate 接口接受的 score_type。 新版 stitch 导入后,同一面的 fusion/ring/stripe 共用该面 JSON; 编辑重算时按正反面统一归到 front_ring / back_ring。 """ image_type_to_recalc_score_type = { ImageType.front_fusion.value: ImageType.front_ring.value, ImageType.front_ring.value: ImageType.front_ring.value, ImageType.front_gray.value: ImageType.front_ring.value, ImageType.front_stripe1.value: ImageType.front_ring.value, ImageType.front_stripe2.value: ImageType.front_ring.value, ImageType.front_stripe3.value: ImageType.front_ring.value, ImageType.front_stripe4.value: ImageType.front_ring.value, ImageType.back_fusion.value: ImageType.back_ring.value, ImageType.back_ring.value: ImageType.back_ring.value, ImageType.back_gray.value: ImageType.back_ring.value, ImageType.back_stripe1.value: ImageType.back_ring.value, ImageType.back_stripe2.value: ImageType.back_ring.value, ImageType.back_stripe3.value: ImageType.back_ring.value, ImageType.back_stripe4.value: ImageType.back_ring.value, # 兼容历史同轴光数据 ImageType.front_coaxial.value: ImageType.front_coaxial.value, ImageType.back_coaxial.value: ImageType.back_coaxial.value, } return image_type_to_recalc_score_type.get(image_type) def _is_center_box_shapes_empty(center_result: dict) -> bool: """ 判断 center_result 中 inner/outer box 的 shapes 是否都为空。 前端某些场景会传空 shapes,直接下发给算分服务可能触发其内部越界。 """ if not isinstance(center_result, dict): return True box_result = center_result.get("box_result", {}) if not isinstance(box_result, dict): return True inner_shapes = box_result.get("inner_box", {}).get("shapes", []) outer_shapes = box_result.get("outer_box", {}).get("shapes", []) return not inner_shapes and not outer_shapes def _normalize_center_result(center_result: dict) -> dict: """ 兜底补齐算分服务依赖的 center_result 结构,避免 KeyError: 'box_result'。 """ normalized = center_result if isinstance(center_result, dict) else {} box_result = normalized.get("box_result") if not isinstance(box_result, dict): box_result = {} normalized["box_result"] = box_result inner_box = box_result.get("inner_box") if not isinstance(inner_box, dict): inner_box = {} box_result["inner_box"] = inner_box if not isinstance(inner_box.get("shapes"), list): inner_box["shapes"] = [] outer_box = box_result.get("outer_box") if not isinstance(outer_box, dict): outer_box = {} box_result["outer_box"] = outer_box if not isinstance(outer_box.get("shapes"), list): outer_box["shapes"] = [] return normalized def _prepare_recalculate_payload(edited_json: dict, source_json: dict) -> dict: """ 以数据库里的原始 JSON 为底稿,合并前端编辑结果,得到更稳定的重算入参。 当前仅明确覆盖 defects;center_result 只有在前端传了非空 shapes 时才覆盖。 对前端标记为删除(edit_type=del)的缺陷,在这里直接过滤,避免重算后再次写回 modified_json。 """ base = copy.deepcopy(source_json) if isinstance(source_json, dict) else {} incoming = edited_json if isinstance(edited_json, dict) else {} if "id" in incoming: base["id"] = incoming["id"] if "imageWidth" in incoming: base["imageWidth"] = incoming["imageWidth"] if "imageHeight" in incoming: base["imageHeight"] = incoming["imageHeight"] base.setdefault("result", {}) incoming_result = incoming.get("result", {}) if not isinstance(incoming_result, dict): incoming_result = {} # defects 使用前端编辑结果覆盖;前端标记删除的项不参与重算,也不写回 modified_json incoming_defects = ( incoming_result.get("defect_result", {}).get("defects", []) if isinstance(incoming_result.get("defect_result", {}), dict) else [] ) base["result"].setdefault("defect_result", {}) filtered_defects = [] if isinstance(incoming_defects, list): filtered_defects = [ defect for defect in incoming_defects if not (isinstance(defect, dict) and defect.get("edit_type") == "del") ] base["result"]["defect_result"]["defects"] = filtered_defects # center_result 仅在前端有有效 shapes 时覆盖;否则沿用底稿 incoming_center = incoming_result.get("center_result") if isinstance(incoming_center, dict) and not _is_center_box_shapes_empty(incoming_center): base["result"]["center_result"] = incoming_center elif "center_result" not in base["result"]: base["result"]["center_result"] = incoming_center if isinstance(incoming_center, dict) else {} base["result"]["center_result"] = _normalize_center_result(base["result"].get("center_result")) return base _GRAY_IMAGE_TYPES = frozenset({ ImageType.front_gray.value, ImageType.back_gray.value, }) # 以融合图 JSON 中的缺陷为准,在同面下列类型原图上裁图(不含灰度图) _FRONT_DEFECT_URL_TARGET_TYPES = [ ImageType.front_fusion.value, ImageType.front_ring.value, ImageType.front_stripe1.value, ImageType.front_stripe2.value, ImageType.front_stripe3.value, ImageType.front_stripe4.value, ImageType.front_coaxial.value, # 兼容历史同轴光 ] _BACK_DEFECT_URL_TARGET_TYPES = [ ImageType.back_fusion.value, ImageType.back_ring.value, ImageType.back_stripe1.value, ImageType.back_stripe2.value, ImageType.back_stripe3.value, ImageType.back_stripe4.value, ImageType.back_coaxial.value, ] _DEFECT_URL_TARGET_TYPES_BY_SIDE = { "front": _FRONT_DEFECT_URL_TARGET_TYPES, "back": _BACK_DEFECT_URL_TARGET_TYPES, } # ring / stripe 等可能落在 card_gray_images,裁图时需与主表合并 _ALL_DEFECT_URL_TARGET_TYPES = frozenset( _FRONT_DEFECT_URL_TARGET_TYPES + _BACK_DEFECT_URL_TARGET_TYPES ) def _is_gray_image_type(image_type: str) -> bool: return image_type in _GRAY_IMAGE_TYPES def _side_key_from_image_type(image_type: str) -> str: if image_type.startswith("front_"): return "front" if image_type.startswith("back_"): return "back" return "" def _defect_rect_hash(min_rect) -> str: if not min_rect or len(min_rect) != 3: return "" rect_str = str(min_rect) return hashlib.md5(rect_str.encode("utf-8")).hexdigest()[:8] def _resolve_fusion_images_by_side(all_images: list) -> dict: """每面仅以融合图 JSON 作为缺陷与裁图坐标来源。""" type_to_img = {getattr(img, "image_type", ""): img for img in all_images} return { "front": type_to_img.get(ImageType.front_fusion.value), "back": type_to_img.get(ImageType.back_fusion.value), } class _DefectCropImageRef: """裁图用的轻量图片引用(主表 Pydantic 或灰度辅助表行均可)。""" __slots__ = ("id", "image_type", "image_path") def __init__(self, image_id: int, image_type: str, image_path: str): self.id = image_id self.image_type = image_type self.image_path = image_path def _build_defect_crop_pool_by_type( card_id: int, all_images: list, db_conn: PooledMySQLConnection = None, ) -> dict: """ 合并主表 card_images 与辅助表 card_gray_images 中的裁图目标。 同类型主表优先;ring/stripe 导入在灰度表时也能被 defectImgUrls 用到。 """ pool: dict = {} for img in all_images or []: image_type = getattr(img, "image_type", "") if image_type not in _ALL_DEFECT_URL_TARGET_TYPES: continue pool[image_type] = img if db_conn is not None: cursor = None try: cursor = db_conn.cursor(dictionary=True) cursor.execute( f"SELECT id, image_type, image_path FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} " f"WHERE card_id = %s", (card_id,), ) for row in cursor.fetchall(): image_type = row.get("image_type") or "" if image_type not in _ALL_DEFECT_URL_TARGET_TYPES: continue if image_type in pool: continue pool[image_type] = _DefectCropImageRef( row["id"], image_type, settings.get_full_url(row.get("image_path")), ) finally: if cursor: cursor.close() return pool def _defect_url_target_type_list(crop_pool_by_type: dict, side_key: str) -> list: """同面裁图类型列表;若已有 stripe 则不再使用历史 coaxial。""" target_types = list(_DEFECT_URL_TARGET_TYPES_BY_SIDE.get(side_key, [])) has_stripe = any( t.startswith(f"{side_key}_stripe") and t in crop_pool_by_type for t in target_types ) if has_stripe: coaxial = ImageType.front_coaxial.value if side_key == "front" else ImageType.back_coaxial.value target_types = [t for t in target_types if t != coaxial] return target_types def _defect_url_target_images(crop_pool_by_type: dict, side_key: str) -> list: """按固定类型顺序返回同面需生成 defectImgUrls 的图片(灰度图除外)。""" targets = [] for image_type in _defect_url_target_type_list(crop_pool_by_type, side_key): img = crop_pool_by_type.get(image_type) if img is not None: targets.append(img) return targets def _sanitize_defects_for_recalculate(defects: list): """ 清理前端展示/编辑辅助字段,减少算分服务解析失败概率。 """ if not isinstance(defects, list): return for d in defects: if not isinstance(d, dict): continue if d.get("label") == "slight_scratch": d["label"] = "scratch" d.pop("defectImgUrl", None) d.pop("defectImgUrls", None) d.pop("gray_id", None) d.pop("fusion_id", None) d.pop("edit_type", None) d.pop("severity_level", None) d.pop("new_score", None) def _generate_defect_img_urls_for_json( card_id: int, fusion_img_id: int, json_data: dict, side_key: str, crop_pool_by_type: dict, generate_related_images: bool = True, ) -> dict: """ 以融合图 JSON 中的缺陷 min_rect 为准,在同面各目标类型原图上裁图, 生成 defectImgUrls 并返回 rect_hash -> urls 缓存,供同面其它图复用。 """ start_time = perf_counter() url_cache_by_rect = {} if not json_data or "result" not in json_data: logger.info( "耗时埋点 _generate_defect_img_urls: card_id=%s fusion_image_id=%s side=%s defects=0 elapsed_ms=%.2f", card_id, fusion_img_id, side_key, (perf_counter() - start_time) * 1000, ) return url_cache_by_rect defect_result = json_data["result"].get("defect_result", {}) defects = defect_result.get("defects", []) crop_target_images = _defect_url_target_images(crop_pool_by_type, side_key) target_types = _defect_url_target_type_list(crop_pool_by_type, side_key) missing_types = [t for t in target_types if t not in crop_pool_by_type] if missing_types: logger.info( "defectImgUrls 裁图目标缺失: card_id=%s side=%s missing=%s", card_id, side_key, ",".join(missing_types), ) for idx, defect in enumerate(defects, start=1): min_rect = defect.get("min_rect") defect_img_url_list = [] rect_hash = _defect_rect_hash(min_rect) if min_rect and len(min_rect) == 3 and generate_related_images and crop_target_images: for s_img in crop_target_images: s_img_type = getattr(s_img, "image_type", "") s_img_path = getattr(s_img, "image_path", "") s_img_id = getattr(s_img, "id", 0) s_filename = f"xy_{card_id}_{s_img_id}_{idx}_{rect_hash}.jpg" s_out_rel_path = f"/DefectImage/{s_filename}" s_out_object_name = f"{settings.MINIO_BASE_PREFIX}{s_out_rel_path}" s_url = "" try: minio_client.stat_object(settings.MINIO_BUCKET, s_out_object_name) s_url = settings.get_full_url(s_out_rel_path) except Exception: if s_img_path: s_url = crop_defect_image(s_img_path, min_rect, s_filename) if s_url: defect_img_url_list.append({ "image_type": s_img_type, "url": s_url, }) defect["defectImgUrls"] = defect_img_url_list if rect_hash: url_cache_by_rect[rect_hash] = copy.deepcopy(defect_img_url_list) logger.info( "耗时埋点 _generate_defect_img_urls: card_id=%s fusion_image_id=%s side=%s " "defects=%s target_types=%s elapsed_ms=%.2f", card_id, fusion_img_id, side_key, len(defects), len(crop_target_images), (perf_counter() - start_time) * 1000, ) return url_cache_by_rect def _apply_defect_img_urls_from_cache(json_data: dict, url_cache_by_rect: dict): """同面非 canonical 图:按 min_rect 哈希复用已生成的 defectImgUrls,不再访问 MinIO。""" if not json_data or "result" not in json_data or not url_cache_by_rect: return defects = json_data["result"].get("defect_result", {}).get("defects", []) for defect in defects: if not isinstance(defect, dict): continue rect_hash = _defect_rect_hash(defect.get("min_rect")) defect["defectImgUrls"] = copy.deepcopy(url_cache_by_rect.get(rect_hash, [])) def _clear_defect_img_urls(json_data: dict): """灰度图不生成关联缺陷图,清空展示用字段。""" if not json_data or "result" not in json_data: return defects = json_data["result"].get("defect_result", {}).get("defects", []) for defect in defects: if isinstance(defect, dict): defect["defectImgUrls"] = [] def _process_images_to_xy_format( card_data: dict, generate_related_images: bool = True, db_conn: PooledMySQLConnection = None, ): """ 内部辅助函数:遍历卡牌数据中的图片,将 JSON 格式转换为前端需要的 XY 格式。 每面仅以融合图 JSON 生成一次 defectImgUrls(按固定 14 类中的同面非灰类型裁图), 再拷贝到同面 ring / stripe 等;灰度图不生成 defectImgUrls。 直接修改传入的 card_data 字典。 """ start_time = perf_counter() card_id = card_data.get("id") all_images = card_data.get("images", []) fusion_by_side = _resolve_fusion_images_by_side(all_images) if all_images else {} crop_pool_by_type = _build_defect_crop_pool_by_type( card_id, all_images, db_conn=db_conn, ) detection_url_cache_by_side = {} modified_url_cache_by_side = {} if all_images: parsed_json_by_img_id = {} for img in all_images: d_internal = img.detection_json if isinstance(d_internal, str): d_internal = json.loads(d_internal) if d_internal else None m_internal = img.modified_json if isinstance(m_internal, str): m_internal = json.loads(m_internal) if m_internal else None parsed_json_by_img_id[img.id] = { "detection": d_internal, "modified": m_internal, } if generate_related_images: for side_key, fusion_img in fusion_by_side.items(): if not fusion_img: continue parsed = parsed_json_by_img_id.get(fusion_img.id, {}) d_internal = parsed.get("detection") if d_internal: detection_url_cache_by_side[side_key] = _generate_defect_img_urls_for_json( card_id, fusion_img.id, d_internal, side_key, crop_pool_by_type, generate_related_images=True, ) m_internal = parsed.get("modified") if m_internal: modified_url_cache_by_side[side_key] = _generate_defect_img_urls_for_json( card_id, fusion_img.id, m_internal, side_key, crop_pool_by_type, generate_related_images=True, ) for img in all_images: image_type = img.image_type is_gray = _is_gray_image_type(image_type) side_key = _side_key_from_image_type(image_type) parsed = parsed_json_by_img_id.get(img.id, {}) d_internal = parsed.get("detection") m_internal = parsed.get("modified") if d_internal: if is_gray: _clear_defect_img_urls(d_internal) elif side_key and detection_url_cache_by_side.get(side_key): _apply_defect_img_urls_from_cache( d_internal, detection_url_cache_by_side[side_key], ) img.detection_json = convert_internal_to_xy_format(d_internal) else: img.detection_json = convert_internal_to_xy_format({}) if m_internal: if is_gray: _clear_defect_img_urls(m_internal) elif side_key and modified_url_cache_by_side.get(side_key): _apply_defect_img_urls_from_cache( m_internal, modified_url_cache_by_side[side_key], ) img.modified_json = convert_internal_to_xy_format(m_internal) else: m_fallback = copy.deepcopy(d_internal) if d_internal else {} img.modified_json = convert_internal_to_xy_format(m_fallback) logger.info( "耗时埋点 _process_images_to_xy_format: card_id=%s image_count=%s elapsed_ms=%.2f", card_id, len(all_images), (perf_counter() - start_time) * 1000 ) return card_data def pregenerate_defect_images_for_card(db_conn: PooledMySQLConnection, card_id: int) -> int: """ 导入阶段预生成缺陷裁图:以融合图 JSON 为准,按面在同面各类型原图上裁图并写入 MinIO。 查询接口 get_card_details 命中已存在的裁图后即可直接拼 URL,无需再实时裁图。 返回本次涉及裁图的缺陷数量(仅用于日志/统计)。 """ start_time = perf_counter() card_data = crud_card.get_card_with_details(db_conn, card_id) if not card_data: logger.warning("预生成缺陷裁图跳过:card_id=%s 未找到卡牌", card_id) return 0 all_images = card_data.get("images", []) if not all_images: return 0 fusion_by_side = _resolve_fusion_images_by_side(all_images) crop_pool_by_type = _build_defect_crop_pool_by_type(card_id, all_images, db_conn=db_conn) total_defects = 0 for side_key, fusion_img in fusion_by_side.items(): if not fusion_img: continue for json_field in ("detection_json", "modified_json"): raw_json = getattr(fusion_img, json_field, None) if isinstance(raw_json, str): raw_json = json.loads(raw_json) if raw_json else None if not raw_json: continue # 复制一份,避免污染 Pydantic 对象;只关心裁图副作用(写入 MinIO) cache = _generate_defect_img_urls_for_json( card_id, fusion_img.id, copy.deepcopy(raw_json), side_key, crop_pool_by_type, generate_related_images=True, ) total_defects += len(cache) logger.info( "预生成缺陷裁图完成: card_id=%s defects=%s elapsed_ms=%.2f", card_id, total_defects, (perf_counter() - start_time) * 1000, ) return total_defects @router.get("/query", response_model=CardDetailResponse, summary="获取卡牌详细信息(格式化xy), 支持前后翻页 [用户调用]") def get_card_details( card_id: int = Query(..., description="基准卡牌ID"), mode: QueryMode = Query(QueryMode.current, description="查询模式: current(当前), next(下一个), prev(上一个)"), db_conn: PooledMySQLConnection = db_dependency ): """ 获取卡牌元数据以及所有与之关联的图片信息,并将坐标转换为 xy 格式。 同时返回上一张和下一张卡牌的ID。 - **current**: 查询 card_id 对应的卡牌。 - **next**: 查询 ID 比 card_id 大的第一张卡牌。 - **prev**: 查询 ID 比 card_id 小的第一张卡牌。 """ target_id = card_id cursor = None start_time = perf_counter() try: cursor = db_conn.cursor(dictionary=True) # 1. 如果是查询上一个或下一个,先计算目标ID if mode != QueryMode.current: if mode == QueryMode.next: query_target = (f"SELECT id FROM {settings.DB_CARD_TABLE_NAME} " f"WHERE id > %s ORDER BY id ASC LIMIT 1") else: # mode == QueryMode.prev query_target = (f"SELECT id FROM {settings.DB_CARD_TABLE_NAME} " f"WHERE id < %s ORDER BY id DESC LIMIT 1") cursor.execute(query_target, (card_id,)) row = cursor.fetchone() if not row: msg = "没有下一张" if mode == QueryMode.next else "没有上一张" # 边界场景返回 404,避免与 response_model=CardDetailResponse 冲突 raise HTTPException(status_code=404, detail=msg) target_id = row['id'] # 2. 获取目标卡牌的详细数据 (Dict 格式) card_data = crud_card.get_card_with_details(db_conn, target_id) if not card_data: raise HTTPException(status_code=404, detail=f"ID为 {target_id} 的卡牌未找到。") # 3. 补充当前目标卡牌的 id_prev 和 id_next # 注意:这里需要重新获取 cursor,或者使用 cursor (非 dict 模式可能更方便取值,但 dict 模式也行) # 这里为了简单直接用 raw SQL # 查询上一个ID sql_prev = f"SELECT id FROM {settings.DB_CARD_TABLE_NAME} WHERE id < %s ORDER BY id DESC LIMIT 1" cursor.execute(sql_prev, (target_id,)) row_prev = cursor.fetchone() card_data['id_prev'] = row_prev['id'] if row_prev else None # 查询下一个ID sql_next = f"SELECT id FROM {settings.DB_CARD_TABLE_NAME} WHERE id > %s ORDER BY id ASC LIMIT 1" cursor.execute(sql_next, (target_id,)) row_next = cursor.fetchone() card_data['id_next'] = row_next['id'] if row_next else None # 4. 遍历图片,转换格式 (使用抽取出的辅助函数) _process_images_to_xy_format( card_data, generate_related_images=True, db_conn=db_conn, ) # 5. 将 images 从 Pydantic 对象转为 dict,避免 model_validate 重复验证导致类型异常 if "images" in card_data: card_data["images"] = [ img.model_dump() if hasattr(img, 'model_dump') else img for img in card_data["images"] ] # 6. 验证并返回 return CardDetailResponse.model_validate(card_data) except HTTPException: raise except Exception as e: logger.error(f"查询卡牌详情失败 (Mode: {mode}, BaseID: {card_id}): {e}") raise HTTPException(status_code=500, detail="数据库查询失败") finally: logger.info( "耗时埋点 get_card_details: base_card_id=%s target_card_id=%s mode=%s elapsed_ms=%.2f", card_id, target_id, mode, (perf_counter() - start_time) * 1000 ) if cursor: cursor.close() @router.put("/update/json/{id}", status_code=200, summary="接收xy格式, 还原后重计算分数并保存 [用户调用]") async def update_image_modified_json( id: int, new_json_data: dict = Body(..., description="前端传来的包含xy对象格式的JSON"), current_user: dict = Depends(get_current_user), db_conn: PooledMySQLConnection = db_dependency ): """ 接收前端传来的特殊格式 JSON (points 为对象列表)。 1. 将格式还原为后端标准格式 (points 为 [[x,y]])。 2. 根据 id 获取 image_type。 3. 调用外部接口重新计算分数。 4. 更新 modified_json。 """ card_id_to_update = None cursor = None # *** 1. 格式还原 *** # 将前端的 xy dict 格式转回 [[x,y]] internal_json_payload = convert_xy_to_internal_format(new_json_data) try: cursor = db_conn.cursor(dictionary=True) # 2. 获取图片信息 cursor.execute( f"SELECT image_type, card_id, detection_json, modified_json " f"FROM {settings.DB_IMAGE_TABLE_NAME} WHERE id = %s", (id,) ) row = cursor.fetchone() if not row: raise HTTPException(status_code=404, detail=f"ID为 {id} 的图片未找到。") card_id_to_update = row["card_id"] check_card_permission(db_conn, current_user, card_id_to_update) image_type = row["image_type"] # score_recalculate 接口只接受 coaxial / ring 类型的 score_type, # 融合图/灰度图/调光图都按正反面归到对应 ring。 score_type = _resolve_recalc_score_type(image_type) if not score_type: raise HTTPException(status_code=400, detail=f"未知的 image_type: {image_type}") # 3. 准备重算 payload:以库内原始 JSON 为底稿,仅覆盖编辑后的 defects # 对 fusion 图,score_type 会映射到 ring;此时底稿也应优先使用 ring 图 JSON, # 否则 fusion 图常见的空框结构会导致算分服务在 ring 逻辑下越界。 source_json_str = row["modified_json"] if row["modified_json"] else row["detection_json"] if image_type in (ImageType.front_fusion.value, ImageType.back_fusion.value): target_ring_type = ( ImageType.front_ring.value if image_type == ImageType.front_fusion.value else ImageType.back_ring.value ) cursor.execute( f"SELECT detection_json, modified_json FROM {settings.DB_IMAGE_TABLE_NAME} " f"WHERE card_id = %s AND image_type = %s LIMIT 1", (card_id_to_update, target_ring_type) ) ring_row = cursor.fetchone() if ring_row: source_json_str = ring_row["modified_json"] if ring_row["modified_json"] else ring_row["detection_json"] else: logger.warning( "fusion图重算未找到对应ring底稿,回退到当前图: image_id=%s card_id=%s image_type=%s target_ring_type=%s", id, card_id_to_update, image_type, target_ring_type ) if isinstance(source_json_str, str): source_json_data = json.loads(source_json_str) else: source_json_data = source_json_str if isinstance(source_json_str, dict) else {} payload_for_recalculate = _prepare_recalculate_payload(internal_json_payload, source_json_data) _defects = payload_for_recalculate.get("result", {}).get("defect_result", {}).get("defects", []) _sanitize_defects_for_recalculate(_defects) logger.info(f"开始计算分数 (ID: {id}, Type: {score_type})") # 4. 调用远程计算接口 try: response = await run_in_threadpool( lambda: requests.post( settings.SCORE_RECALCULATE_ENDPOINT, params={"score_type": score_type}, json=payload_for_recalculate, timeout=20 ) ) except Exception as e: logger.error( "调用分数计算服务失败(update/json): image_id=%s card_id=%s score_type=%s endpoint=%s error=%s", id, card_id_to_update, score_type, settings.SCORE_RECALCULATE_ENDPOINT, e, exc_info=True ) raise HTTPException(status_code=500, detail=f"调用分数计算服务失败: {e}") if response.status_code != 200: logger.error( "分数计算接口返回错误(update/json): image_id=%s card_id=%s score_type=%s endpoint=%s status=%s body=%s", id, card_id_to_update, score_type, settings.SCORE_RECALCULATE_ENDPOINT, response.status_code, response.text ) raise HTTPException(status_code=response.status_code, detail=f"分数计算接口返回错误: {response.text}") logger.info("分数计算完成") # 获取计算服务返回的结果(这个结果通常已经是标准的 internal 格式,带有分数和面积) final_json_data = response.json() # 5. 保存结果到数据库 recalculated_json_str = json.dumps(final_json_data, ensure_ascii=False) update_query = (f"UPDATE {settings.DB_IMAGE_TABLE_NAME} " f"SET modified_json = %s, is_edited = TRUE " f"WHERE id = %s") cursor.execute(update_query, (recalculated_json_str, id)) db_conn.commit() logger.info(f"图片ID {id} 的 modified_json 已更新并重新计算。") # 更新对应的 cards 的分数状态 try: crud_card.update_card_scores_and_status(db_conn, card_id_to_update) logger.info(f"卡牌 {card_id_to_update} 的分数和状态已更新。") except Exception as score_update_e: logger.error(f"更新卡牌 {card_id_to_update} 分数失败: {score_update_e}") # 更新卡牌审核状态 try: with db_conn.cursor() as cursor: review_state = 2 # 更新指定 card_id 的 review_state 字段 # 注意:MySQL 在“值未变化”时 rowcount 可能为 0,这不代表记录不存在。 query_update = f"UPDATE {settings.DB_CARD_TABLE_NAME} SET review_state = %s WHERE id = %s" cursor.execute(query_update, (review_state, card_id_to_update)) if cursor.rowcount == 0: cursor.execute(f"SELECT 1 FROM {settings.DB_CARD_TABLE_NAME} WHERE id = %s LIMIT 1", (card_id_to_update,)) if not cursor.fetchone(): raise HTTPException(status_code=404, detail=f"ID为 {card_id_to_update} 的卡牌未找到。") db_conn.commit() logger.info(f"卡牌 ID {card_id_to_update} 的审核状态已成功修改为 {review_state}。") except Exception as e: db_conn.rollback() logger.error(f"修改卡牌 {id} 审核状态失败: {e}") if isinstance(e, HTTPException): raise e raise HTTPException(status_code=500, detail="修改审核状态失败,数据库操作错误。") return { "detail": f"成功更新图片ID {id} 的JSON数据", "image_type": image_type, "score_type": score_type } except HTTPException: db_conn.rollback() raise except Exception as e: db_conn.rollback() logger.error(f"更新JSON失败 ({id}): {e}") raise HTTPException(status_code=500, detail=f"更新JSON数据失败: {e}") finally: if cursor: cursor.close() # 处理灰度如 @router.put("/update/json_gray/{id}", status_code=200, summary="[灰度] 接收xy格式, 合并至Ring图重计算并保存 [用户调用]") async def update_gray_image_json( id: int, new_json_data: dict = Body(..., description="前端传来的灰度图编辑后的JSON(xy格式)"), current_user: dict = Depends(get_current_user), db_conn: PooledMySQLConnection = db_dependency ): """ 针对灰度图 (front_gray/back_gray) 的保存逻辑。 """ cursor = None # 1. 格式还原 internal_gray_json = convert_xy_to_internal_format(new_json_data) gray_defects = internal_gray_json.get("result", {}).get("defect_result", {}).get("defects", []) # 丢弃前端展示用的辅助字段,防止传给算分服务导致报错 for d in gray_defects: if d.get("label") == "slight_scratch": d["label"] = "scratch" d.pop("defectImgUrl", None) d.pop("defectImgUrls", None) try: cursor = db_conn.cursor(dictionary=True) # 2. 获取辅助图(灰度图/融合图)信息 # 以前只查 card_gray_images,现在融合图是在 card_images 表里 # 先查 card_gray_images cursor.execute(f"SELECT card_id, image_type FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE id = %s", (id,)) gray_row = cursor.fetchone() if not gray_row: # 如果灰度表没找到,去主表找找看是不是融合图 cursor.execute(f"SELECT card_id, image_type FROM {settings.DB_IMAGE_TABLE_NAME} WHERE id = %s AND image_type IN ('front_fusion', 'back_fusion')", (id,)) gray_row = cursor.fetchone() if not gray_row: raise HTTPException(status_code=404, detail=f"ID为 {id} 的辅助图未找到。") card_id = gray_row['card_id'] check_card_permission(db_conn, current_user, card_id) gray_image_type = gray_row['image_type'] # 3. 确定目标 Ring 图类型 target_ring_type = None if gray_image_type in (ImageType.front_gray.value, ImageType.front_fusion.value): target_ring_type = ImageType.front_ring.value elif gray_image_type in (ImageType.back_gray.value, ImageType.back_fusion.value): target_ring_type = ImageType.back_ring.value else: raise HTTPException(status_code=400, detail=f"不支持的辅助图类型: {gray_image_type}") # 4. 获取目标 Ring 图数据 (Card Images 表) cursor.execute( f"SELECT id, detection_json, modified_json FROM {settings.DB_IMAGE_TABLE_NAME} " f"WHERE card_id = %s AND image_type = %s", (card_id, target_ring_type) ) ring_row = cursor.fetchone() if not ring_row: raise HTTPException(status_code=404, detail=f"未找到对应的 Ring 图 ({target_ring_type}),无法应用修改。") ring_image_id = ring_row['id'] # 优先使用 modified_json,如果没有则使用 detection_json source_json_str = ring_row['modified_json'] if ring_row['modified_json'] else ring_row['detection_json'] if isinstance(source_json_str, str): ring_json_data = json.loads(source_json_str) else: ring_json_data = source_json_str # 5. 合并逻辑 (Merge Logic) # 确保路径存在 if "result" not in ring_json_data: ring_json_data["result"] = {} if "defect_result" not in ring_json_data["result"]: ring_json_data["result"]["defect_result"] = {} if "defects" not in ring_json_data["result"]["defect_result"]: ring_json_data["result"]["defect_result"][ "defects"] = [] ring_defects = ring_json_data["result"]["defect_result"]["defects"] # 遍历灰度图传来的新缺陷列表 for new_defect in gray_defects: is_fusion = gray_image_type in (ImageType.front_fusion.value, ImageType.back_fusion.value) key_to_check = "fusion_id" if is_fusion else "gray_id" identifier = new_defect.get(key_to_check) # 只有带有对应标识的才进行特殊合并处理 # 如果没有,视作普通新缺陷直接添加 if not identifier: ring_defects.append(new_defect) continue # 在 Ring 图现有的缺陷中寻找匹配的标识 match_index = -1 for i, old_defect in enumerate(ring_defects): if old_defect.get(key_to_check) == identifier: match_index = i break if match_index != -1: # 存在:替换 (Replace) ring_defects[match_index] = new_defect else: # 不存在:添加 (Append) ring_defects.append(new_defect) # 6. 调用计算服务 (对 Ring 图数据进行重算) # score_recalculate 接口接受 ring 类型,直接用目标 ring 类型即可 score_type = _resolve_recalc_score_type(target_ring_type) logger.info(f"开始重计算 Ring 图分数 (GrayID: {id} -> RingID: {ring_image_id}, Type: {score_type})") try: response = await run_in_threadpool( lambda: requests.post( settings.SCORE_RECALCULATE_ENDPOINT, params={"score_type": score_type}, json=ring_json_data, # 发送合并后的 Ring 数据 timeout=20 ) ) except Exception as e: logger.error( "调用分数计算服务失败(update/json_gray): gray_id=%s ring_id=%s card_id=%s score_type=%s endpoint=%s error=%s", id, ring_image_id, card_id, score_type, settings.SCORE_RECALCULATE_ENDPOINT, e, exc_info=True ) raise HTTPException(status_code=500, detail=f"调用分数计算服务失败: {e}") if response.status_code != 200: logger.error( "分数计算接口返回错误(update/json_gray): gray_id=%s ring_id=%s card_id=%s score_type=%s endpoint=%s status=%s body=%s", id, ring_image_id, card_id, score_type, settings.SCORE_RECALCULATE_ENDPOINT, response.status_code, response.text ) raise HTTPException(status_code=response.status_code, detail=f"分数计算接口返回错误: {response.text}") final_ring_json = response.json() # 7. 保存结果到数据库 (保存到 Ring 图记录) final_json_str = json.dumps(final_ring_json, ensure_ascii=False) update_query = ( f"UPDATE {settings.DB_IMAGE_TABLE_NAME} " f"SET modified_json = %s, is_edited = TRUE " f"WHERE id = %s" ) cursor.execute(update_query, (final_json_str, ring_image_id)) db_conn.commit() logger.info(f"Ring 图 {ring_image_id} 已根据灰度图 {id} 的修改进行了更新。") # 8. 更新卡牌总分状态 try: crud_card.update_card_scores_and_status(db_conn, card_id) except Exception as e: logger.error(f"更新卡牌 {card_id} 分数状态失败: {e}") # 更新卡牌审核状态 try: with db_conn.cursor() as cursor: review_state = 2 # 更新指定 card_id 的 review_state 字段 # 注意:MySQL 在 “值未变化” 的情况下 rowcount 可能为 0,但这不代表记录不存在。 query_update = f"UPDATE {settings.DB_CARD_TABLE_NAME} SET review_state = %s WHERE id = %s" cursor.execute(query_update, (review_state, card_id)) if cursor.rowcount == 0: cursor.execute(f"SELECT 1 FROM {settings.DB_CARD_TABLE_NAME} WHERE id = %s LIMIT 1", (card_id,)) if not cursor.fetchone(): raise HTTPException(status_code=404, detail=f"ID为 {card_id} 的卡牌未找到。") db_conn.commit() logger.info(f"卡牌 ID {card_id} 的审核状态已成功修改为 {review_state}。") except Exception as e: db_conn.rollback() logger.error(f"修改卡牌 {id} 审核状态失败: {e}") if isinstance(e, HTTPException): raise e raise HTTPException(status_code=500, detail="修改审核状态失败,数据库操作错误。") return { "detail": f"成功应用灰度图修改到 {target_ring_type}", "target_ring_id": ring_image_id, "gray_id": id } except HTTPException: db_conn.rollback() raise except Exception as e: db_conn.rollback() logger.error(f"灰度图更新失败 ({id}): {e}") raise HTTPException(status_code=500, detail=f"系统内部错误: {e}") finally: if cursor: cursor.close()