| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- import copy
- import io
- import json
- from collections import Counter
- from datetime import datetime
- from typing import Any, Dict, List, Optional, Tuple
- from fastapi import HTTPException
- from PIL import Image
- from mysql.connector.pooling import PooledMySQLConnection
- from app.core.config import settings
- from app.core.logger import get_logger
- from app.core.minio_client import minio_client
- logger = get_logger(__name__)
def parse_json_value(raw_value: Any) -> Dict[str, Any]:
    """Coerce a raw JSON value into a dict.

    Args:
        raw_value: ``None``, an already-decoded dict, or JSON text given as
            ``str``, ``bytes`` or ``bytearray``.

    Returns:
        The dict itself when *raw_value* is a dict, otherwise the decoded
        JSON object. An empty dict is returned for ``None``, for text that is
        not valid JSON, for valid JSON whose top-level value is not an object
        (for example ``"[]"`` or ``"null"``), and for any other input type.
    """
    if isinstance(raw_value, dict):
        return raw_value
    if not isinstance(raw_value, (str, bytes, bytearray)):
        return {}
    try:
        parsed = json.loads(raw_value)
    except ValueError:
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError raised for undecodable byte input.
        return {}
    # Callers use the result as a mapping (``.get``), so a list, number,
    # string or null at the top level must not leak through.
    return parsed if isinstance(parsed, dict) else {}
def format_datetime(dt: Any) -> str:
    """Render *dt* as ``YYYY-MM-DD HH:MM:SS``.

    Anything that is not a ``datetime`` instance yields an empty string.
    """
    return dt.strftime("%Y-%m-%d %H:%M:%S") if isinstance(dt, datetime) else ""
def _normalize_compare_value(value: Any) -> Any:
    """Normalise a value so that equal data compares equal.

    Floats are rounded to four decimal places, tuples and lists both become
    lists, and dict values are normalised recursively. Every other value is
    returned unchanged. This keeps float noise and container-type
    differences from being reported as changes.
    """
    if isinstance(value, dict):
        return {name: _normalize_compare_value(member) for name, member in value.items()}
    if isinstance(value, (list, tuple)):
        return [_normalize_compare_value(element) for element in value]
    if isinstance(value, float):
        return round(value, 4)
    return value
def build_defect_compare_key(defect: Dict[str, Any], side: str = "", location: str = "") -> str:
    """Build a stable comparison key for a raw defect.

    The key is ``side|location|label|actual_area``. An explicit *side* or
    *location* argument takes precedence over the same-named entry in
    *defect*; missing or falsy parts become empty strings. The area is
    normalised before it is formatted.
    """
    key_side = side or defect.get("side") or ""
    key_location = location or defect.get("location") or ""
    key_label = defect.get("label") or ""
    key_area = _normalize_compare_value(defect.get("actual_area"))
    return f"{key_side}|{key_location}|{key_label}|{key_area}"
def _build_report_defect_compare_key(defect_item: Dict[str, Any]) -> str:
    """Build the comparison key for one ``defectDetailList`` entry.

    A truthy ``compareKey`` already stored on the entry is reused as is.
    Otherwise the key is rebuilt as ``side|location|label|area``, where the
    area comes from ``actualArea`` (falling back to ``actual_area`` when that
    is ``None``) and the label comes from ``label`` (falling back to ``type``
    when that is ``None``).
    """
    stored_key = defect_item.get("compareKey")
    if stored_key:
        return stored_key

    area = defect_item.get("actualArea")
    if area is None:
        area = defect_item.get("actual_area")

    label = defect_item.get("label")
    if label is None:
        label = defect_item.get("type") or ""

    side = defect_item.get("side") or ""
    location = defect_item.get("location") or ""
    return f"{side}|{location}|{label}|{_normalize_compare_value(area)}"
def enrich_report_for_compare(report_json: Any) -> Dict[str, Any]:
    """Return a deep copy of a report with ``compareKey`` set on each defect.

    The report is parsed, deep-copied so the caller's data is never mutated,
    and every dict entry in ``defectDetailList`` receives a ``compareKey``.
    Non-dict entries are left in place untouched. When ``defectDetailList``
    is missing or not a list it is replaced by an empty list.
    """
    enriched = copy.deepcopy(parse_json_value(report_json))
    defects = enriched.get("defectDetailList")
    if isinstance(defects, list):
        for entry in defects:
            if isinstance(entry, dict):
                entry["compareKey"] = _build_report_defect_compare_key(entry)
    else:
        enriched["defectDetailList"] = []
    return enriched
def build_report_compare_snapshot(report_json: Any) -> Dict[str, Any]:
    """Build the normalised snapshot used to de-duplicate history entries.

    Each defect is reduced to ``side``, ``type``, ``location`` and
    ``compareKey``; other per-defect fields (for example ``defectImgUrl``)
    are dropped so they cannot make two otherwise identical reports differ.
    The reduced defects are sorted and the whole report is then passed
    through value normalisation.

    Args:
        report_json: A report as a dict or as JSON text.

    Returns:
        A normalised dict suitable for equality comparison with another
        snapshot produced by this function.
    """
    # enrich_report_for_compare already returns a private deep copy, and the
    # final normalisation rebuilds every dict and list, so no further copy
    # is needed before replacing the defect list.
    report_data = enrich_report_for_compare(report_json)
    normalized_defects = [
        {
            "side": defect_item.get("side") or "",
            "type": defect_item.get("type") or "",
            "location": defect_item.get("location") or "",
            "compareKey": defect_item.get("compareKey") or "",
        }
        for defect_item in report_data.get("defectDetailList", [])
        if isinstance(defect_item, dict)
    ]
    # Tie-break on the remaining fields so that entries sharing a compareKey
    # end up in the same order regardless of their order in the input.
    # str() keeps the tie-breakers comparable even if stored values are not
    # strings.
    normalized_defects.sort(
        key=lambda item: (
            item["compareKey"],
            str(item["side"]),
            str(item["location"]),
            str(item["type"]),
        )
    )
    report_data["defectDetailList"] = normalized_defects
    return _normalize_compare_value(report_data)
def _filter_common_defects(defect_list: List[Dict[str, Any]], common_counter: Counter) -> List[Dict[str, Any]]:
    """Drop the defects that are accounted for in *common_counter*.

    For each defect, in order, one unit is consumed from the counter entry
    for its comparison key; the defect is kept only when that entry is
    already exhausted. This removes exactly as many duplicates as the two
    sides share. *common_counter* is modified in place.
    """
    remaining = []
    for entry in defect_list:
        key = entry.get("compareKey") or _build_report_defect_compare_key(entry)
        if common_counter[key] <= 0:
            remaining.append(entry)
        else:
            common_counter[key] -= 1
    return remaining
def remove_common_defects(
    left_report_json: Any,
    right_report_json: Any
) -> Tuple[Dict[str, Any], Dict[str, Any], int]:
    """Strip the defects two reports have in common, keeping only differences.

    Both reports are enriched with comparison keys; the multiset
    intersection of those keys decides how many entries per key are removed
    from each side. Non-dict entries of ``defectDetailList`` are discarded.

    Returns:
        ``(left_report, right_report, common_count)`` where the reports are
        copies holding only their unmatched defects and ``common_count`` is
        the number of matched defect pairs.
    """

    def _dict_defects_with_keys(report: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], Counter]:
        # Collect the dict entries and tally their comparison keys.
        defects = [entry for entry in report.get("defectDetailList", []) if isinstance(entry, dict)]
        tally = Counter(
            entry.get("compareKey") or _build_report_defect_compare_key(entry)
            for entry in defects
        )
        return defects, tally

    left_report = enrich_report_for_compare(left_report_json)
    right_report = enrich_report_for_compare(right_report_json)
    left_defects, left_tally = _dict_defects_with_keys(left_report)
    right_defects, right_tally = _dict_defects_with_keys(right_report)

    shared = left_tally & right_tally
    # Each side consumes its own copy so both remove the same quota.
    left_report["defectDetailList"] = _filter_common_defects(left_defects, shared.copy())
    right_report["defectDetailList"] = _filter_common_defects(right_defects, shared.copy())
    return left_report, right_report, sum(shared.values())
def save_rating_report_history(
    db_conn: PooledMySQLConnection,
    card_id: int,
    card_no: str,
    report_name: str,
    report_json: Dict[str, Any]
) -> Dict[str, Any]:
    """Store a rating report as a history row unless it repeats the latest one.

    The new report's comparison snapshot is checked against the snapshot of
    the most recent history row for *card_id* (highest ``rating_id``). When
    they are equal nothing is written.

    Args:
        db_conn: Open database connection; committed on insert.
        card_id: Card whose history is being extended.
        card_no: Card number stored in the row and used in log messages.
        report_name: Name stored with the new row.
        report_json: Report content, serialised with ``json.dumps``.

    Returns:
        A dict with ``rating_id``, ``report_name`` and ``created``. When the
        report is unchanged, ``created`` is ``False`` and the id and name are
        those of the existing latest row.

    Raises:
        HTTPException: Status 500 for any failure; the original exception is
            attached as the cause.
    """
    try:
        current_snapshot = build_report_compare_snapshot(report_json)
        # The table name comes from application settings, not from request
        # data; all values are passed as bound parameters.
        table_name = settings.RATING_REPORT_HISTORY_TABLE_NAME
        with db_conn.cursor(dictionary=True) as cursor:
            # Compare only against the previous entry: an identical
            # consecutive report is not stored again.
            latest_sql = (
                f"SELECT rating_id, report_name, report_json "
                f"FROM {table_name} "
                "WHERE card_id = %s "
                "ORDER BY rating_id DESC LIMIT 1"
            )
            cursor.execute(latest_sql, (card_id,))
            latest_row = cursor.fetchone()
            if latest_row:
                latest_snapshot = build_report_compare_snapshot(latest_row.get("report_json"))
                if latest_snapshot == current_snapshot:
                    logger.info(
                        f"评级报告历史未发生变化,跳过保存: cardNo={card_no}, rating_id={latest_row.get('rating_id')}"
                    )
                    return {
                        "rating_id": latest_row.get("rating_id"),
                        "report_name": latest_row.get("report_name"),
                        "created": False
                    }
            insert_sql = (
                f"INSERT INTO {table_name} "
                "(card_id, cardNo, report_name, report_json) "
                "VALUES (%s, %s, %s, %s)"
            )
            cursor.execute(
                insert_sql,
                (card_id, card_no, report_name, json.dumps(report_json, ensure_ascii=False))
            )
            # Read the generated id while the cursor is still open.
            new_rating_id = cursor.lastrowid
            db_conn.commit()
            logger.info(f"评级报告历史保存成功: cardNo={card_no}, rating_id={new_rating_id}")
            return {
                "rating_id": new_rating_id,
                "report_name": report_name,
                "created": True
            }
    except Exception as e:
        # Log first, with traceback, so the root cause is recorded even if
        # the rollback below fails (for example on a dropped connection).
        logger.exception(f"保存评级报告历史失败: {e}")
        try:
            db_conn.rollback()
        except Exception as rollback_error:
            logger.warning(f"rollback failed after rating report history error: {rollback_error}")
        raise HTTPException(status_code=500, detail="保存评级报告历史失败") from e
def get_active_json(image_data: Any) -> Optional[Dict]:
    """Return the JSON payload in effect for an image.

    ``modified_json`` wins when it is truthy; otherwise ``detection_json``
    is returned. *image_data* may be an object exposing those attributes or
    a mapping with those keys. Falsy *image_data* yields ``None``.
    """
    if not image_data:
        return None

    # Attribute access for model-like objects, key access for mappings.
    if hasattr(image_data, "modified_json"):
        edited, detected = image_data.modified_json, image_data.detection_json
    else:
        edited, detected = image_data.get("modified_json"), image_data.get("detection_json")

    return edited if edited else detected
def crop_defect_image(original_image_path_str: str, min_rect: List, output_filename: str) -> str:
    """Crop a defect region out of an image stored in MinIO and upload it.

    The source image is fetched from MinIO, a square-ish window centred on
    the defect is cut out (clamped to the image bounds), encoded as JPEG and
    uploaded under ``/DefectImage/<output_filename>``.

    Args:
        original_image_path_str: URL or path of the source image; the
            ``settings.DATA_HOST_URL`` part is stripped to obtain the object
            path.
        min_rect: ``((center_x, center_y), (width, height), angle)``.
        output_filename: File name of the uploaded crop.

    Returns:
        The full URL of the uploaded crop, or ``""`` when fetching, cropping
        or uploading fails (the failure is logged).
    """
    try:
        # Turn the full URL into a MinIO object path, e.g. /Data/xxx.jpg.
        rel_path = original_image_path_str.replace(settings.DATA_HOST_URL, "")
        rel_path = "/" + rel_path.lstrip('/\\')
        object_name = f"{settings.MINIO_BASE_PREFIX}{rel_path}"
        try:
            response = minio_client.get_object(settings.MINIO_BUCKET, object_name)
            try:
                image_bytes = response.read()
            finally:
                # Always hand the connection back, even when read() fails.
                response.close()
                response.release_conn()
        except Exception as e:
            logger.warning(f"从 MinIO 获取原图失败: {object_name} -> {e}")
            return ""

        out_bytes = io.BytesIO()
        with Image.open(io.BytesIO(image_bytes)) as img:
            img_w, img_h = img.size
            (center_x, center_y), (rect_w, rect_h), angle = min_rect
            center_x = int(center_x)
            center_y = int(center_y)
            rect_w = int(rect_w)
            rect_h = int(rect_h)
            # Scale factor is 1.0 when the angle is a multiple of 90 and
            # grows to 1.5 at 45 degrees off-axis.
            resize_scale = 1.5 - abs(abs(angle % 90) - 45) / 90
            # The window is never smaller than 100 pixels per side.
            side_length = max(max(rect_w, rect_h) * resize_scale, 100)
            half_side = side_length / 2
            left = max(0, center_x - half_side)
            top = max(0, center_y - half_side)
            right = min(img_w, center_x + half_side)
            bottom = min(img_h, center_y + half_side)
            cropped_img = img.crop((left, top, right, bottom))
            # JPEG cannot encode modes such as RGBA, LA or P; convert those
            # instead of failing the whole operation. Modes the encoder
            # already accepts are left untouched.
            if cropped_img.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
                cropped_img = cropped_img.convert("RGB")
            cropped_img.save(out_bytes, format="JPEG", quality=95)

        payload_size = out_bytes.getbuffer().nbytes
        out_bytes.seek(0)
        out_rel_path = f"/DefectImage/{output_filename}"
        out_object_name = f"{settings.MINIO_BASE_PREFIX}{out_rel_path}"
        minio_client.put_object(
            settings.MINIO_BUCKET,
            out_object_name,
            out_bytes,
            payload_size,
            content_type="image/jpeg"
        )
        return settings.get_full_url(out_rel_path)
    except Exception as e:
        logger.error(f"裁剪并上传缺陷图失败: {e}")
        return ""
|