Ver Fonte

评级报告历史

AnlaAnla há 1 semana atrás
pai
commit
7486f2fa5b
5 ficheiros alterados com 152 adições e 79 exclusões
  1. 3 2
      Test/auto_img_insert.py
  2. 1 74
      Test/test01.py
  3. 131 3
      app/api/rating_report.py
  4. 1 0
      app/core/config.py
  5. 16 0
      app/core/database_loader.py

+ 3 - 2
Test/auto_img_insert.py

@@ -45,9 +45,10 @@ def auto_import(data, target_url):
 
 
 if __name__ == '__main__':
-    target_url = f"http://192.168.77.249:7755/api/import/process_and_import"
+    # target_url = f"http://192.168.77.249:7755/api/import/process_and_import"
+    target_url = f"http://127.0.0.1:7755/api/import/process_and_import"
 
-    for i in range(2, 11):
+    for i in range(2, 4):
         request_data = {
             "card_name": f"测试卡-{i}",
             "cardNo": f"testNo-{i}",

+ 1 - 74
Test/test01.py

@@ -1,74 +1 @@
-import requests
-import os
-from datetime import datetime
-
-# --- 配置区域 ---
-# 替换为你的存储服务端实际地址和端口
-SERVER_URL = "http://127.0.0.1:7755"
-API_ENDPOINT = f"{SERVER_URL}/api/import/process_and_import"
-
-
-def import_card(
-        card_name: str,
-        cardNo: str,
-        card_type: str,
-        is_reflect_card: bool,
-        strict_mode: bool,
-        paths: dict
-):
-    """
-    请求后端的自动导入接口
-    """
-    # 构造请求的 JSON Payload
-    payload = {
-        "card_name": card_name,
-        "cardNo": cardNo,
-        "card_type": card_type,
-        "is_reflect_card": is_reflect_card,
-        "strict_mode": strict_mode,
-        "path_front_ring": paths.get("front_ring"),
-        "path_front_coaxial": paths.get("front_coaxial"),
-        "path_back_ring": paths.get("back_ring"),
-        "path_back_coaxial": paths.get("back_coaxial"),
-        "path_front_gray": paths.get("front_gray"),
-        "path_back_gray": paths.get("back_gray")
-    }
-
-    print(payload)
-
-
-if __name__ == "__main__":
-    # --- 测试配置 ---
-    BASE_PATH = r"C:\Code\ML\Image\Card\img20_test"
-    IS_REFLECT = True
-    CARD_TYPE = "pokemon"
-
-    # 模拟循环处理 (类似你原来的逻辑)
-    for img_num in range(2, 3):  # 测试 1 到 3
-        print(f"\n>>>>> 准备处理图片组: {img_num}")
-
-        # 构造卡牌信息
-        timestamp = datetime.now().strftime('%m%d_%H%M%S')
-        card_name = f"API测试卡_{img_num}_{timestamp}"
-        cardNo = f"SN-TEST-{img_num:03d}"
-
-        # 构造路径字典 (根据你的实际命名规则修改)
-        # 注意:我在 back_ring 的地方复用了你之前脚本里硬编码的路径作为示例
-        image_paths = {
-            "front_coaxial": os.path.join(BASE_PATH, f"{img_num}_front_coaxial.jpg"),
-            "front_ring": os.path.join(BASE_PATH, f"{img_num}_front_ring.jpg"),
-            "back_coaxial": os.path.join(BASE_PATH, f"{img_num}_back_coaxial.jpg"),
-            "back_ring": r"C:\Code\ML\Image\Card\b2.jpg",  # 你原脚本中的硬编码测试路径
-            "front_gray": os.path.join(BASE_PATH, f"{img_num}_front_gray.jpg"),
-            "back_gray": os.path.join(BASE_PATH, f"{img_num}_back_gray.jpg")
-        }
-
-        # 调用封装的函数发送 HTTP 请求
-        import_card(
-            card_name=card_name,
-            cardNo=cardNo,
-            card_type=CARD_TYPE,
-            is_reflect_card=IS_REFLECT,
-            strict_mode=True,  # 严格模式:如果缺主图,服务端会直接拦截报错
-            paths=image_paths
-        )
+print(1)

+ 131 - 3
app/api/rating_report.py

@@ -2,6 +2,8 @@ from fastapi import APIRouter, HTTPException, Depends, Query
 from mysql.connector.pooling import PooledMySQLConnection
 
 import io
+import json
+from datetime import datetime
 from app.core.minio_client import minio_client
 from typing import List, Dict, Any, Optional
 from PIL import Image
@@ -16,6 +18,51 @@ logger = get_logger(__name__)
 router = APIRouter()
 
 
+def _parse_json_value(raw_value: Any) -> Dict[str, Any]:
+    if raw_value is None:
+        return {}
+    if isinstance(raw_value, dict):
+        return raw_value
+    if isinstance(raw_value, str):
+        try:
+            return json.loads(raw_value)
+        except json.JSONDecodeError:
+            return {}
+    return {}
+
+
+def _format_datetime(dt: Any) -> str:
+    if isinstance(dt, datetime):
+        return dt.strftime("%Y-%m-%d %H:%M:%S")
+    return ""
+
+
+def _save_rating_report_history(
+        db_conn: PooledMySQLConnection,
+        card_id: int,
+        card_no: str,
+        report_name: str,
+        report_json: Dict[str, Any]
+) -> int:
+    try:
+        with db_conn.cursor() as cursor:
+            insert_sql = (
+                f"INSERT INTO {settings.RATING_REPORT_HISTORY_TABLE_NAME} "
+                "(card_id, cardNo, report_name, report_json) "
+                "VALUES (%s, %s, %s, %s)"
+            )
+            cursor.execute(
+                insert_sql,
+                (card_id, card_no, report_name, json.dumps(report_json, ensure_ascii=False))
+            )
+            db_conn.commit()
+            return cursor.lastrowid
+    except Exception as e:
+        db_conn.rollback()
+        logger.error(f"保存评级报告历史失败: {e}")
+        raise HTTPException(status_code=500, detail="保存评级报告历史失败。")
+
+
 def _get_active_json(image_data: Any) -> Optional[Dict]:
     """获取有效的json数据,优先 modified_json"""
     if not image_data:
@@ -117,8 +164,9 @@ def generate_rating_report(
         )
 
     card_id = row[0]
+    rating_time_now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
 
-    top_n_defects = 3
+    # top_n_defects = 3
     """
     根据 Card ID 生成评级报告 JSON
     """
@@ -237,7 +285,8 @@ def generate_rating_report(
     # 按实际面积从大到小排序
     all_defects_collected.sort(key=lambda x: x["area"], reverse=True)
 
-    top_defects = all_defects_collected[:top_n_defects]
+    # top_defects = all_defects_collected[:top_n_defects]
+    top_defects = all_defects_collected
 
     final_defect_list = []
     for idx, item in enumerate(top_defects, start=1):
@@ -249,7 +298,7 @@ def generate_rating_report(
         d_id = idx  # 1, 2, 3
 
         # 构造文件名: {card_id}_{seq_id}.jpg
-        filename = f"{card_id}_{d_id}.jpg"
+        filename = f"{card_id}_{d_id}_{rating_time_now}.jpg"
 
         # 执行切图
         min_rect = defect.get("min_rect")
@@ -285,5 +334,84 @@ def generate_rating_report(
         })
 
     response_data["defectDetailList"] = final_defect_list
+    report_name = f"{cardNo}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+    rating_id = _save_rating_report_history(db_conn, card_id, cardNo, report_name, response_data)
+    response_data["ratingId"] = rating_id
+    response_data["reportName"] = report_name
 
     return response_data
+
+
+@router.get("/history", status_code=200, summary="根据 cardNo 查询评级报告历史列表")
+def get_rating_report_history_list(
+        cardNo: str,
+        limit: int = Query(100, ge=1, le=1000),
+        db_conn: PooledMySQLConnection = Depends(get_db_connection)
+):
+    if not cardNo or not cardNo.strip():
+        raise HTTPException(status_code=400, detail="cardNo 不能为空")
+
+    try:
+        with db_conn.cursor(dictionary=True) as cursor:
+            query_sql = (
+                f"SELECT rating_id, card_id, cardNo, report_name, created_at "
+                f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} "
+                "WHERE cardNo = %s "
+                "ORDER BY rating_id DESC "
+                "LIMIT %s"
+            )
+            cursor.execute(query_sql, (cardNo, limit))
+            rows = cursor.fetchall()
+    except Exception as e:
+        logger.error(f"查询评级报告历史列表失败: {e}")
+        raise HTTPException(status_code=500, detail="查询评级报告历史列表失败。")
+
+    history_list = []
+    for row in rows:
+        history_list.append({
+            "ratingId": row.get("rating_id"),
+            "cardId": row.get("card_id"),
+            "cardNo": row.get("cardNo"),
+            "reportName": row.get("report_name"),
+            "createdAt": _format_datetime(row.get("created_at"))
+        })
+
+    return {
+        "cardNo": cardNo,
+        "data": {
+            "total": len(history_list),
+            "list": history_list
+        }
+    }
+
+
+@router.get("/history/{rating_id}", status_code=200, summary="根据 rating_id 查询单个评级报告历史")
+def get_rating_report_history_detail(
+        rating_id: int,
+        db_conn: PooledMySQLConnection = Depends(get_db_connection)
+):
+    try:
+        with db_conn.cursor(dictionary=True) as cursor:
+            query_sql = (
+                f"SELECT rating_id, card_id, cardNo, report_name, report_json, created_at, updated_at "
+                f"FROM {settings.RATING_REPORT_HISTORY_TABLE_NAME} "
+                "WHERE rating_id = %s LIMIT 1"
+            )
+            cursor.execute(query_sql, (rating_id,))
+            row = cursor.fetchone()
+    except Exception as e:
+        logger.error(f"查询评级报告历史详情失败: {e}")
+        raise HTTPException(status_code=500, detail="查询评级报告历史详情失败。")
+
+    if not row:
+        raise HTTPException(status_code=404, detail=f"未找到 rating_id={rating_id} 的历史记录")
+
+    return {
+        "ratingId": row.get("rating_id"),
+        "cardId": row.get("card_id"),
+        "cardNo": row.get("cardNo"),
+        "reportName": row.get("report_name"),
+        "createdAt": _format_datetime(row.get("created_at")),
+        "updatedAt": _format_datetime(row.get("updated_at")),
+        "reportData": _parse_json_value(row.get("report_json"))
+    }

+ 1 - 0
app/core/config.py

@@ -38,6 +38,7 @@ class Settings:
     DB_CARD_TABLE_NAME = 'cards'
     DB_IMAGE_TABLE_NAME = 'card_images'
     DB_GRAY_IMAGE_TABLE_NAME = 'card_gray_images'  # 灰度图表名
+    RATING_REPORT_HISTORY_TABLE_NAME = "rating_report_history"
 
     DATABASE_CONFIG: Dict[str, str] = {
         'user': 'root',

+ 16 - 0
app/core/database_loader.py

@@ -77,6 +77,22 @@ def init_database():
             ") ENGINE=InnoDB COMMENT='存储辅助灰度图'"
         )
         cursor.execute(card_gray_images_table)
+
+        rating_report_history_table = (
+            "CREATE TABLE IF NOT EXISTS `rating_report_history` ("
+            "  `rating_id` BIGINT AUTO_INCREMENT PRIMARY KEY,"
+            "  `card_id` INT NOT NULL,"
+            "  `cardNo` VARCHAR(100) NOT NULL,"
+            "  `report_name` VARCHAR(255) NOT NULL,"
+            "  `report_json` JSON NOT NULL,"
+            "  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+            "  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,"
+            f"  FOREIGN KEY (`card_id`) REFERENCES `{settings.DB_CARD_TABLE_NAME}`(`id`) ON DELETE CASCADE,"
+            "  INDEX `idx_rating_history_card_no_created` (`cardNo`, `created_at`),"
+            "  INDEX `idx_rating_history_card_id_created` (`card_id`, `created_at`)"
+            ") ENGINE=InnoDB COMMENT='拆卡报告历史'"
+        )
+        cursor.execute(rating_report_history_table)
         logger.info(f"数据表 '{settings.DB_GRAY_IMAGE_TABLE_NAME}' 已准备就绪。")
 
     except mysql.connector.Error as err: