Ver código fonte

重构评级报告引用调用

AnlaAnla 3 dias atrás
pai
commit
e828996457
3 arquivos alterados com 152 adições e 144 exclusões
  1. 2 2
      Test/minio测试.py
  2. 9 142
      app/api/rating_report.py
  3. 141 0
      app/utils/rating_report_utils.py

+ 2 - 2
Test/minio测试.py

@@ -7,7 +7,7 @@ MINIO_ACCESS_KEY = "pZEwCGnpNN05KPnmC2Yh"
 MINIO_SECRET_KEY = "KfJRuWiv9pVxhIMcFqbkv8hZT9SnNTZ6LPx592D4"  # 替换为你的 Secret Key
 MINIO_SECURE = False  # 是否使用 https
 MINIO_BUCKET = "grading"
-MINIO_BASE_PREFIX = "raspi_img_data"
+MINIO_BASE_PREFIX = "monitor_video_data"
 
 DATA_HOST_URL = f"http://{MINIO_ENDPOINT}/{MINIO_BUCKET}/{MINIO_BASE_PREFIX}"
 
@@ -42,7 +42,7 @@ try:
         print(f"存储桶 {MINIO_BUCKET} 已存在")
 
     object_name = "img_test.jpg"
-    img_path = "./img.jpg"
+    img_path = "./31_front_coaxial_1_0.jpg"
     cloud_path = os.path.join(MINIO_BASE_PREFIX, object_name).replace("\\", "/")
 
     minio_client.fput_object(MINIO_BUCKET, cloud_path, img_path, content_type="image/jpeg")

+ 9 - 142
app/api/rating_report.py

@@ -1,152 +1,19 @@
 from fastapi import APIRouter, HTTPException, Depends, Query
 from mysql.connector.pooling import PooledMySQLConnection
 
-import io
-import json
-import cv2
-import numpy as np
 from datetime import datetime
-from app.core.minio_client import minio_client
-from typing import List, Dict, Any, Optional
-from PIL import Image
 
 from app.core.logger import get_logger
 from app.core.config import settings
 from app.core.database_loader import get_db_connection
 from app.crud import crud_card
-
-from app.utils.scheme import ImageType
+from app.utils.rating_report_utils import (get_active_json, save_rating_report_history,
+                                           format_datetime, parse_json_value, crop_defect_image)
 
 logger = get_logger(__name__)
 router = APIRouter()
 
 
-def _parse_json_value(raw_value: Any) -> Dict[str, Any]:
-    if raw_value is None:
-        return {}
-    if isinstance(raw_value, dict):
-        return raw_value
-    if isinstance(raw_value, str):
-        try:
-            return json.loads(raw_value)
-        except json.JSONDecodeError:
-            return {}
-    return {}
-
-
-def _format_datetime(dt: Any) -> str:
-    if isinstance(dt, datetime):
-        return dt.strftime("%Y-%m-%d %H:%M:%S")
-    return ""
-
-
-def _save_rating_report_history(
-        db_conn: PooledMySQLConnection,
-        card_id: int,
-        card_no: str,
-        report_name: str,
-        report_json: Dict[str, Any]
-) -> int:
-    try:
-        with db_conn.cursor() as cursor:
-            insert_sql = (
-                f"INSERT INTO {settings.RATING_REPORT_HISTORY_TABLE_NAME} "
-                "(card_id, cardNo, report_name, report_json) "
-                "VALUES (%s, %s, %s, %s)"
-            )
-            cursor.execute(
-                insert_sql,
-                (card_id, card_no, report_name, json.dumps(report_json, ensure_ascii=False))
-            )
-            db_conn.commit()
-            return cursor.lastrowid
-    except Exception as e:
-        db_conn.rollback()
-        logger.error(f"保存评级报告历史失败: {e}")
-        raise HTTPException(status_code=500, detail="保存评级报告历史失败。")
-
-
-def _get_active_json(image_data: Any) -> Optional[Dict]:
-    """获取有效的json数据,优先 modified_json"""
-    if not image_data:
-        return None
-
-    # image_data 可能是 Pydantic 对象或 字典,做兼容处理
-    if hasattr(image_data, "modified_json"):
-        mj = image_data.modified_json
-        dj = image_data.detection_json
-    else:
-        mj = image_data.get("modified_json")
-        dj = image_data.get("detection_json")
-
-    # 注意:根据 schema.py,这里读出来已经是 dict 了,不需要 json.loads
-    # 如果数据库里存的是 null,读出来是 None
-    if mj:
-        return mj
-    return dj
-
-
-def _crop_defect_image(original_image_path_str: str, min_rect: List, output_filename: str) -> str:
-    """
-    通过 MinIO 切割缺陷图片为正方形并保存
-    """
-    try:
-        # ★ 将进来的全路径 URL 剥离为相对路径 (如 /Data/xxx.jpg) 供 MinIO 读取
-        rel_path = original_image_path_str.replace(settings.DATA_HOST_URL, "")
-        rel_path = "/" + rel_path.lstrip('/\\')
-        object_name = f"{settings.MINIO_BASE_PREFIX}{rel_path}"
-
-        # 1. 从 MinIO 获取原图字节
-        try:
-            response = minio_client.get_object(settings.MINIO_BUCKET, object_name)
-            image_bytes = response.read()
-            response.close()
-            response.release_conn()
-        except Exception as e:
-            logger.warning(f"从MinIO获取原图失败: {object_name} -> {e}")
-            return ""
-
-        # 2. 在内存中用 PIL 切图
-        with Image.open(io.BytesIO(image_bytes)) as img:
-            img_w, img_h = img.size
-            (center_x, center_y), (rect_w, rect_h), angle = min_rect
-            center_x = int(center_x)
-            center_y = int(center_y)
-            rect_w = int(rect_w)
-            rect_h = int(rect_h)
-
-            resize_scale = 1.5 - abs(abs(angle % 90) - 45) / 90
-            side_length = max(max(rect_w, rect_h) * resize_scale, 100)
-            half_side = side_length / 2
-
-            left, top = max(0, center_x - half_side), max(0, center_y - half_side)
-            right, bottom = min(img_w, center_x + half_side), min(img_h, center_y + half_side)
-
-            cropped_img = img.crop((left, top, right, bottom))
-
-            # 3. 将切割后的图存入内存流,并上传到 MinIO
-            out_bytes = io.BytesIO()
-            cropped_img.save(out_bytes, format="JPEG", quality=95)
-            out_bytes.seek(0)
-
-            out_rel_path = f"/DefectImage/{output_filename}"
-            out_object_name = f"{settings.MINIO_BASE_PREFIX}{out_rel_path}"
-
-            minio_client.put_object(
-                settings.MINIO_BUCKET,
-                out_object_name,
-                out_bytes,
-                len(out_bytes.getvalue()),
-                content_type="image/jpeg"
-            )
-
-            return settings.get_full_url(out_rel_path)
-
-    except Exception as e:
-        logger.error(f"切割并上传图片失败: {e}")
-        return ""
-
-
 @router.get("/generate", status_code=200, summary="生成评级报告数据")
 def generate_rating_report(
         cardNo: str,
@@ -237,7 +104,7 @@ def generate_rating_report(
             response_data["backImageUrl"] = img.image_path
 
         # 获取有效 JSON
-        json_data = _get_active_json(img)
+        json_data = get_active_json(img)
         if not json_data or "result" not in json_data:
             continue
 
@@ -315,7 +182,7 @@ def generate_rating_report(
 
         if min_rect and len(min_rect) == 3:
             # 切图并保存
-            defect_img_url = _crop_defect_image(original_img_path, min_rect, filename)
+            defect_img_url = crop_defect_image(original_img_path, min_rect, filename)
 
             # 计算 Location (中心坐标)
             # min_rect[0] 是 [x, y]
@@ -343,7 +210,7 @@ def generate_rating_report(
 
     response_data["defectDetailList"] = final_defect_list
     report_name = f"{cardNo}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
-    rating_id = _save_rating_report_history(db_conn, card_id, cardNo, report_name, response_data)
+    rating_id = save_rating_report_history(db_conn, card_id, cardNo, report_name, response_data)
     response_data["ratingId"] = rating_id
     response_data["reportName"] = report_name
 
@@ -395,7 +262,7 @@ def get_rating_report_history_list(
             "cardId": row.get("card_id"),
             "cardNo": row.get("cardNo"),
             "reportName": row.get("report_name"),
-            "createdAt": _format_datetime(row.get("created_at"))
+            "createdAt": format_datetime(row.get("created_at"))
         })
 
     return {
@@ -433,7 +300,7 @@ def get_rating_report_history_detail(
         "cardId": row.get("card_id"),
         "cardNo": row.get("cardNo"),
         "reportName": row.get("report_name"),
-        "createdAt": _format_datetime(row.get("created_at")),
-        "updatedAt": _format_datetime(row.get("updated_at")),
-        "reportData": _parse_json_value(row.get("report_json"))
+        "createdAt": format_datetime(row.get("created_at")),
+        "updatedAt": format_datetime(row.get("updated_at")),
+        "reportData": parse_json_value(row.get("report_json"))
     }

+ 141 - 0
app/utils/rating_report_utils.py

@@ -0,0 +1,141 @@
+import io
+import json
+import cv2
+import numpy as np
+from fastapi import HTTPException
+from datetime import datetime
+from app.core.minio_client import minio_client
+from app.core.config import settings
+from typing import List, Dict, Any, Optional
+from PIL import Image
+from app.utils.scheme import ImageType
+from mysql.connector.pooling import PooledMySQLConnection
+from app.core.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+def parse_json_value(raw_value: Any) -> Dict[str, Any]:
+    if raw_value is None:
+        return {}
+    if isinstance(raw_value, dict):
+        return raw_value
+    if isinstance(raw_value, str):
+        try:
+            return json.loads(raw_value)
+        except json.JSONDecodeError:
+            return {}
+    return {}
+
+
+def format_datetime(dt: Any) -> str:
+    if isinstance(dt, datetime):
+        return dt.strftime("%Y-%m-%d %H:%M:%S")
+    return ""
+
+
+def save_rating_report_history(
+        db_conn: PooledMySQLConnection,
+        card_id: int,
+        card_no: str,
+        report_name: str,
+        report_json: Dict[str, Any]
+) -> int:
+    try:
+        with db_conn.cursor() as cursor:
+            insert_sql = (
+                f"INSERT INTO {settings.RATING_REPORT_HISTORY_TABLE_NAME} "
+                "(card_id, cardNo, report_name, report_json) "
+                "VALUES (%s, %s, %s, %s)"
+            )
+            cursor.execute(
+                insert_sql,
+                (card_id, card_no, report_name, json.dumps(report_json, ensure_ascii=False))
+            )
+            db_conn.commit()
+            return cursor.lastrowid
+    except Exception as e:
+        db_conn.rollback()
+        logger.error(f"保存评级报告历史失败: {e}")
+        raise HTTPException(status_code=500, detail="保存评级报告历史失败。")
+
+
+def get_active_json(image_data: Any) -> Optional[Dict]:
+    """获取有效的json数据,优先 modified_json"""
+    if not image_data:
+        return None
+
+    # image_data 可能是 Pydantic 对象或 字典,做兼容处理
+    if hasattr(image_data, "modified_json"):
+        mj = image_data.modified_json
+        dj = image_data.detection_json
+    else:
+        mj = image_data.get("modified_json")
+        dj = image_data.get("detection_json")
+
+    # 注意:根据 schema.py,这里读出来已经是 dict 了,不需要 json.loads
+    # 如果数据库里存的是 null,读出来是 None
+    if mj:
+        return mj
+    return dj
+
+
+def crop_defect_image(original_image_path_str: str, min_rect: List, output_filename: str) -> str:
+    """
+    通过 MinIO 切割缺陷图片为正方形并保存
+    """
+    try:
+        # ★ 将进来的全路径 URL 剥离为相对路径 (如 /Data/xxx.jpg) 供 MinIO 读取
+        rel_path = original_image_path_str.replace(settings.DATA_HOST_URL, "")
+        rel_path = "/" + rel_path.lstrip('/\\')
+        object_name = f"{settings.MINIO_BASE_PREFIX}{rel_path}"
+
+        # 1. 从 MinIO 获取原图字节
+        try:
+            response = minio_client.get_object(settings.MINIO_BUCKET, object_name)
+            image_bytes = response.read()
+            response.close()
+            response.release_conn()
+        except Exception as e:
+            logger.warning(f"从MinIO获取原图失败: {object_name} -> {e}")
+            return ""
+
+        # 2. 在内存中用 PIL 切图
+        with Image.open(io.BytesIO(image_bytes)) as img:
+            img_w, img_h = img.size
+            (center_x, center_y), (rect_w, rect_h), angle = min_rect
+            center_x = int(center_x)
+            center_y = int(center_y)
+            rect_w = int(rect_w)
+            rect_h = int(rect_h)
+
+            resize_scale = 1.5 - abs(abs(angle % 90) - 45) / 90
+            side_length = max(max(rect_w, rect_h) * resize_scale, 100)
+            half_side = side_length / 2
+
+            left, top = max(0, center_x - half_side), max(0, center_y - half_side)
+            right, bottom = min(img_w, center_x + half_side), min(img_h, center_y + half_side)
+
+            cropped_img = img.crop((left, top, right, bottom))
+
+            # 3. 将切割后的图存入内存流,并上传到 MinIO
+            out_bytes = io.BytesIO()
+            cropped_img.save(out_bytes, format="JPEG", quality=95)
+            out_bytes.seek(0)
+
+            out_rel_path = f"/DefectImage/{output_filename}"
+            out_object_name = f"{settings.MINIO_BASE_PREFIX}{out_rel_path}"
+
+            minio_client.put_object(
+                settings.MINIO_BUCKET,
+                out_object_name,
+                out_bytes,
+                len(out_bytes.getvalue()),
+                content_type="image/jpeg"
+            )
+
+            return settings.get_full_url(out_rel_path)
+
+    except Exception as e:
+        logger.error(f"切割并上传图片失败: {e}")
+        return ""