||
- from typing import Optional, List, Dict, Any
- from datetime import date
- from mysql.connector.pooling import PooledMySQLConnection
- import json
- from datetime import datetime
- import copy
- from app.core.config import settings
- from app.utils.scheme import CardImageResponse, CardType, SortBy, SortOrder, ImageType
- from app.utils.card_score_calculate import calculate_scores_from_images
- from app.core.logger import get_logger
- logger = get_logger(__name__)
# Fixed detection_json skeleton used for grayscale images: an empty centering
# result and an empty defect list. Code in this module deep-copies it before
# filling anything in, so the template itself is never mutated.
EMPTY_DETECTION_JSON = {
    "result": {
        "center_result": {},
        "defect_result": {"defects": []},
    },
}
def update_card_scores_and_status(db_conn: PooledMySQLConnection, card_id: int):
    """Recompute a card's scores and edited flag, then persist them.

    Every row of the main image table belonging to ``card_id`` is validated
    into a ``CardImageResponse`` and handed to ``calculate_scores_from_images``.
    The resulting ``detection_score``, ``modified_score`` and ``is_edited``
    values are written back to the card row together with a fresh
    ``updated_at`` timestamp, and the transaction is committed.

    Only the main image table is consulted; the gray image table is not read.

    Args:
        db_conn: Pooled MySQL connection used for both the read and the write.
        card_id: Primary key of the card to refresh.
    """
    with db_conn.cursor(dictionary=True) as cur:
        # Step 1: load all main images attached to the card.
        cur.execute(
            f"SELECT * FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id = %s",
            (card_id,),
        )
        card_images = []
        for record in cur.fetchall():
            card_images.append(CardImageResponse.model_validate(record))

        # Step 2: derive the scores and the edited flag from those images.
        scores = calculate_scores_from_images(card_images)

        # Step 3: store the result on the card row.
        update_sql = (
            f"UPDATE {settings.DB_CARD_TABLE_NAME} SET "
            "detection_score = %s, modified_score = %s, is_edited = %s, updated_at = %s "
            "WHERE id = %s"
        )
        score_values = [
            scores[column]
            for column in ("detection_score", "modified_score", "is_edited")
        ]
        cur.execute(update_sql, (*score_values, datetime.now(), card_id))
        db_conn.commit()
def _construct_gray_image_json(
    gray_type: ImageType,
    ring_image_data: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """Build the ``modified_json`` to display for a grayscale image.

    A grayscale image has no annotations of its own; it shows the defects of
    the matching ring image that carry a ``"gray_id"`` key.

    Args:
        gray_type: Type of the grayscale image (front_gray or back_gray).
            Accepted for interface symmetry; the value is not used here.
        ring_image_data: Raw database row (dict) of the matching ring image,
            expected to hold ``modified_json`` and/or ``detection_json``.
            Each may be an already-decoded dict or a JSON ``str``/``bytes``.

    Returns:
        A deep copy of ``EMPTY_DETECTION_JSON`` whose defect list holds the
        ring defects tagged with ``"gray_id"``, plus the ring image's
        ``imageHeight``/``imageWidth`` (0 when absent). ``None`` when there is
        no ring row, the JSON is missing, unparsable or not an object, or no
        defect carries a ``"gray_id"``.
    """
    if not ring_image_data:
        return None

    # Prefer the edited JSON; fall back to the raw detection output.
    source_json = ring_image_data.get("modified_json") or ring_image_data.get("detection_json")

    # JSON columns may arrive still serialized; json.loads accepts str and bytes.
    if isinstance(source_json, (str, bytes, bytearray)):
        try:
            source_json = json.loads(source_json)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses: treat unreadable JSON as "nothing to show".
            return None

    # Anything other than a non-empty JSON object cannot contain defects.
    if not isinstance(source_json, dict) or not source_json:
        return None

    # Walk result -> defect_result -> defects, tolerating keys that are
    # missing or explicitly null at any level.
    result = source_json.get("result")
    if not isinstance(result, dict):
        result = {}
    defect_result = result.get("defect_result")
    defects = defect_result.get("defects") if isinstance(defect_result, dict) else None
    if not isinstance(defects, (list, tuple)):
        defects = []

    # Keep only the defects that are linked to a grayscale image.
    filtered_defects = [
        defect for defect in defects
        if isinstance(defect, dict) and "gray_id" in defect
    ]
    if not filtered_defects:
        # Provisional rule: no related defects means no modified_json at all.
        return None

    gray_modified_json = copy.deepcopy(EMPTY_DETECTION_JSON)
    gray_result = gray_modified_json["result"]
    gray_result["defect_result"]["defects"] = filtered_defects
    # Carry the ring image's dimensions over so the frontend does not error.
    gray_result["imageHeight"] = result.get("imageHeight", 0)
    gray_result["imageWidth"] = result.get("imageWidth", 0)
    return gray_modified_json
def get_card_with_details(db_conn: PooledMySQLConnection, card_id: int) -> Optional[Dict[str, Any]]:
    """Fetch a single card with its main images and grayscale images.

    Grayscale rows are turned into ``CardImageResponse`` objects with
    synthetic JSON fields: ``detection_json`` is always a copy of
    ``EMPTY_DETECTION_JSON`` and ``modified_json`` is derived from the
    matching ring image through ``_construct_gray_image_json``.

    Args:
        db_conn: Pooled MySQL connection to query.
        card_id: Primary key of the card to load.

    Returns:
        The card row (dict) extended with ``images`` (all images in display
        order), ``detection_score_detail`` and ``modified_score_detail``;
        ``None`` when no card has this id.
    """
    with db_conn.cursor(dictionary=True) as cur:
        # 1. The card row itself.
        cur.execute(f"SELECT * FROM {settings.DB_CARD_TABLE_NAME} WHERE id = %s", (card_id,))
        card = cur.fetchone()
        if not card:
            return None

        # 2. Main images.
        cur.execute(f"SELECT * FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id = %s", (card_id,))
        main_rows = cur.fetchall()

        # 3. Grayscale images.
        cur.execute(f"SELECT * FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE card_id = %s", (card_id,))
        gray_rows = cur.fetchall()

        # 4. Index the main rows by image type so each grayscale image can
        #    find the ring image it mirrors.
        main_row_by_type = {}
        for record in main_rows:
            main_row_by_type[record['image_type']] = record

        gray_types = (ImageType.front_gray.value, ImageType.back_gray.value)
        ring_for_gray = dict(zip(
            gray_types,
            (ImageType.front_ring.value, ImageType.back_ring.value),
        ))

        # Main images are validated as they come from the database.
        all_images = [CardImageResponse.model_validate(record) for record in main_rows]

        # Grayscale rows need a synthetic payload to satisfy the model.
        for record in gray_rows:
            gray_type = record['image_type']
            ring_type = ring_for_gray.get(gray_type)

            # detection_json is fixed; modified_json mirrors the ring image.
            detection_payload = copy.deepcopy(EMPTY_DETECTION_JSON)
            if ring_type:
                modified_payload = _construct_gray_image_json(
                    gray_type, main_row_by_type.get(ring_type)
                )
            else:
                modified_payload = None

            gray_image = {
                "id": record['id'],
                "card_id": record['card_id'],
                "image_type": record['image_type'],
                "image_path": record['image_path'],
                "created_at": record['created_at'],
                "updated_at": record['updated_at'],
            }
            # Fields the gray image table does not have are filled in here.
            gray_image.update({
                "detection_json": detection_payload,
                "modified_json": modified_payload,
                "image_name": None,
                "detection_image_path": None,
                "modified_image_path": None,
                # A grayscale image never counts as edited itself; it only
                # displays the edits made on the ring image.
                "is_edited": False,
            })
            all_images.append(CardImageResponse.model_validate(gray_image))

        # 5. Score details are computed from non-grayscale images only, so the
        #    synthetic grayscale entries cannot disturb the scoring logic.
        scoring_images = []
        for image in all_images:
            if image.image_type in gray_types:
                continue
            scoring_images.append(image)
        score_details = calculate_scores_from_images(scoring_images)

        # 6. Display order: front_gray, back_gray, front_ring, back_ring,
        #    front_coaxial, back_coaxial; unknown types go last.
        display_order = (
            ImageType.front_gray.value,
            ImageType.back_gray.value,
            ImageType.front_ring.value,
            ImageType.back_ring.value,
            ImageType.front_coaxial.value,
            ImageType.back_coaxial.value,
        )
        rank_of = {image_type: position for position, image_type in enumerate(display_order)}
        all_images.sort(key=lambda image: rank_of.get(image.image_type, 999))

        card["images"] = all_images
        card["detection_score_detail"] = score_details["detection_score_detail"]
        card["modified_score_detail"] = score_details["modified_score_detail"]
        return card
def get_card_list_with_images(
        db_conn: PooledMySQLConnection,
        card_id: Optional[int],
        card_name: Optional[str],
        card_type: Optional[CardType],
        is_edited: Optional[bool],
        min_detect_score: Optional[float],
        max_detect_score: Optional[float],
        min_mod_score: Optional[float],
        max_mod_score: Optional[float],
        created_start: Optional[date],
        created_end: Optional[date],
        updated_start: Optional[date],
        updated_end: Optional[date],
        sort_by: SortBy,
        sort_order: SortOrder,
        skip: int,
        limit: int
) -> List[Dict[str, Any]]:
    """Return one page of cards, each annotated with its image paths.

    This is a list view: no JSON payloads are built, only the paths of the
    main and grayscale images are attached to every card.

    Args:
        db_conn: Pooled MySQL connection to query.
        card_id: Exact card id to match; ignored when ``None``.
        card_name: Substring matched with ``LIKE``; ignored when empty.
        card_type: Card type to match; ignored when falsy.
        is_edited: Edited flag to match; ignored when ``None``.
        min_detect_score / max_detect_score: Inclusive bounds on
            ``detection_score``; each ignored when ``None``.
        min_mod_score / max_mod_score: Inclusive bounds on
            ``modified_score``; each ignored when ``None``.
        created_start / created_end: Inclusive bounds on the date part of
            ``created_at``; each ignored when not given.
        updated_start / updated_end: Inclusive bounds on the date part of
            ``updated_at``; each ignored when not given.
        sort_by: Column to order by (``id DESC`` breaks ties).
        sort_order: Direction applied to ``sort_by``.
        skip: Number of rows to skip (SQL ``OFFSET``).
        limit: Maximum number of rows to return (SQL ``LIMIT``).

    Returns:
        The card rows, each with ``image_path_list``,
        ``detection_image_path_list`` and ``modified_image_path_list`` dicts
        keyed by image type. An empty list when nothing matches.
    """
    # One entry per optional filter: (is active, SQL fragment, bound value).
    filter_specs = (
        (card_id is not None, "id = %s", card_id),
        (card_name, "card_name LIKE %s", f"%{card_name}%"),
        (card_type, "card_type = %s", card_type.value if card_type else None),
        (is_edited is not None, "is_edited = %s", is_edited),
        (min_detect_score is not None, "detection_score >= %s", min_detect_score),
        (max_detect_score is not None, "detection_score <= %s", max_detect_score),
        (min_mod_score is not None, "modified_score >= %s", min_mod_score),
        (max_mod_score is not None, "modified_score <= %s", max_mod_score),
        (created_start, "DATE(created_at) >= %s", created_start),
        (created_end, "DATE(created_at) <= %s", created_end),
        (updated_start, "DATE(updated_at) >= %s", updated_start),
        (updated_end, "DATE(updated_at) <= %s", updated_end),
    )
    active_filters = [(fragment, value) for enabled, fragment, value in filter_specs if enabled]
    where_parts = [fragment for fragment, _ in active_filters]
    bound_values = [value for _, value in active_filters]

    with db_conn.cursor(dictionary=True) as cur:
        card_sql = f"SELECT * FROM {settings.DB_CARD_TABLE_NAME}"
        if where_parts:
            card_sql += " WHERE " + " AND ".join(where_parts)
        card_sql += f" ORDER BY {sort_by.value} {sort_order.value}, id DESC"
        card_sql += " LIMIT %s OFFSET %s"
        cur.execute(card_sql, tuple(bound_values + [limit, skip]))
        page = cur.fetchall()
        if not page:
            return []

        page_ids = tuple(row['id'] for row in page)
        placeholders = ','.join('%s' for _ in page_ids)

        # 1. Main images of every card on the page.
        cur.execute(
            f"SELECT id, card_id, image_type, image_path, detection_image_path, modified_image_path "
            f"FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id IN ({placeholders})",
            page_ids,
        )
        main_rows = cur.fetchall()

        # 2. Grayscale images of every card on the page.
        cur.execute(
            f"SELECT id, card_id, image_type, image_path "
            f"FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE card_id IN ({placeholders})",
            page_ids,
        )
        gray_rows = cur.fetchall()

        # Group both kinds of image by owning card, main images first.
        rows_by_card = {}
        for main_row in main_rows:
            rows_by_card.setdefault(main_row['card_id'], []).append(main_row)
        for gray_row in gray_rows:
            # Pad the missing columns so the frontend sees a uniform shape.
            gray_row['detection_image_path'] = None
            gray_row['modified_image_path'] = None
            rows_by_card.setdefault(gray_row['card_id'], []).append(gray_row)

        # Attach the per-type path maps to each card.
        for row in page:
            originals, detections, modifications = {}, {}, {}
            for image_row in rows_by_card.get(row['id'], []):
                kind = image_row['image_type']
                if not kind:
                    continue
                originals[kind] = image_row.get('image_path')
                detections[kind] = image_row.get('detection_image_path')
                modifications[kind] = image_row.get('modified_image_path')
            row['image_path_list'] = originals
            row['detection_image_path_list'] = detections
            row['modified_image_path_list'] = modifications
        return page
def get_card_list_and_count(
        db_conn: PooledMySQLConnection,
        card_id: Optional[int],
        card_name: Optional[str],
        card_type: Optional[CardType],
        is_edited: Optional[bool],
        min_detect_score: Optional[float],
        max_detect_score: Optional[float],
        min_mod_score: Optional[float],
        max_mod_score: Optional[float],
        created_start: Optional[date],
        created_end: Optional[date],
        updated_start: Optional[date],
        updated_end: Optional[date],
        sort_by: SortBy,
        sort_order: SortOrder,
        skip: int,
        limit: int
) -> Dict[str, Any]:
    """Return one page of cards plus the total number of matching cards.

    The same optional filters drive both the ``COUNT(*)`` query and the paged
    list query. Cards on the page are annotated with the paths of their main
    and grayscale images.

    Args:
        db_conn: Pooled MySQL connection to query.
        card_id: Exact card id to match; ignored when ``None``.
        card_name: Substring matched with ``LIKE``; ignored when empty.
        card_type: Card type to match; ignored when falsy.
        is_edited: Edited flag to match; ignored when ``None``.
        min_detect_score / max_detect_score: Inclusive bounds on
            ``detection_score``; each ignored when ``None``.
        min_mod_score / max_mod_score: Inclusive bounds on
            ``modified_score``; each ignored when ``None``.
        created_start / created_end: Inclusive bounds on the date part of
            ``created_at``; each ignored when not given.
        updated_start / updated_end: Inclusive bounds on the date part of
            ``updated_at``; each ignored when not given.
        sort_by: Column to order by (``id DESC`` breaks ties).
        sort_order: Direction applied to ``sort_by``.
        skip: Number of rows to skip (SQL ``OFFSET``).
        limit: Maximum number of rows to return (SQL ``LIMIT``).

    Returns:
        ``{"total": <count of all matching cards>, "list": <card rows>}``,
        where each card row carries ``image_path_list``,
        ``detection_image_path_list`` and ``modified_image_path_list`` dicts
        keyed by image type.
    """
    clauses = []
    bound = []

    def require(clause, value):
        # Register one WHERE fragment together with its bound parameter.
        clauses.append(clause)
        bound.append(value)

    if card_id is not None:
        require("id = %s", card_id)
    if card_name:
        require("card_name LIKE %s", f"%{card_name}%")
    if card_type:
        require("card_type = %s", card_type.value)
    if is_edited is not None:
        require("is_edited = %s", is_edited)
    if min_detect_score is not None:
        require("detection_score >= %s", min_detect_score)
    if max_detect_score is not None:
        require("detection_score <= %s", max_detect_score)
    if min_mod_score is not None:
        require("modified_score >= %s", min_mod_score)
    if max_mod_score is not None:
        require("modified_score <= %s", max_mod_score)
    if created_start:
        require("DATE(created_at) >= %s", created_start)
    if created_end:
        require("DATE(created_at) <= %s", created_end)
    if updated_start:
        require("DATE(updated_at) >= %s", updated_start)
    if updated_end:
        require("DATE(updated_at) <= %s", updated_end)

    where_sql = (" WHERE " + " AND ".join(clauses)) if clauses else ""

    with db_conn.cursor(dictionary=True) as cur:
        # Total number of cards matching the filters (ignores paging).
        cur.execute(
            f"SELECT COUNT(*) as total FROM {settings.DB_CARD_TABLE_NAME}" + where_sql,
            tuple(bound),
        )
        total = cur.fetchone()['total']

        # The requested page of cards.
        page_sql = (
            f"SELECT * FROM {settings.DB_CARD_TABLE_NAME}" + where_sql
            + f" ORDER BY {sort_by.value} {sort_order.value}, id DESC"
            + " LIMIT %s OFFSET %s"
        )
        cur.execute(page_sql, tuple([*bound, limit, skip]))
        page = cur.fetchall()

        if page:
            page_ids = tuple(entry['id'] for entry in page)
            placeholders = ','.join('%s' for _ in page_ids)

            # Main images.
            cur.execute(
                f"SELECT id, card_id, image_type, image_path, detection_image_path, modified_image_path "
                f"FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id IN ({placeholders})",
                page_ids,
            )
            main_rows = cur.fetchall()

            # Grayscale images.
            cur.execute(
                f"SELECT id, card_id, image_type, image_path "
                f"FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE card_id IN ({placeholders})",
                page_ids,
            )
            gray_rows = cur.fetchall()

            # Group by owning card: main images first, grayscale mixed in after.
            grouped = {}
            for main_row in main_rows:
                grouped.setdefault(main_row['card_id'], []).append(main_row)
            for gray_row in gray_rows:
                # Grayscale rows lack these columns; pad them with None.
                gray_row['detection_image_path'] = None
                gray_row['modified_image_path'] = None
                grouped.setdefault(gray_row['card_id'], []).append(gray_row)

            for entry in page:
                originals, detections, modifications = {}, {}, {}
                for image_row in grouped.get(entry['id'], []):
                    kind = image_row['image_type']
                    if not kind:
                        continue
                    originals[kind] = image_row.get('image_path')
                    detections[kind] = image_row.get('detection_image_path')
                    modifications[kind] = image_row.get('modified_image_path')
                entry['image_path_list'] = originals
                entry['detection_image_path_list'] = detections
                entry['modified_image_path_list'] = modifications

        return {
            "total": total,
            "list": page,
        }
|