import copy
import json
from collections import defaultdict
from datetime import date
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple

from mysql.connector.pooling import PooledMySQLConnection

from app.core.config import settings
from app.utils.scheme import CardImageResponse, CardType, SortBy, SortOrder, ImageType
from app.utils.card_score_calculate import calculate_scores_from_images
from app.core.logger import get_logger

logger = get_logger(__name__)

# Fixed detection_json structure used for gray images.
EMPTY_DETECTION_JSON = {
    "result": {
        "center_result": {},
        "defect_result": {
            "defects": []
        }
    }
}

# Maps each gray image type to the ring image type whose defects it displays.
_GRAY_TO_RING_TYPE = {
    ImageType.front_gray.value: ImageType.front_ring.value,
    ImageType.back_gray.value: ImageType.back_ring.value,
}

# Display order of images in the card detail response:
# [front_gray, back_gray, front_ring, back_ring, front_coaxial, back_coaxial]
_IMAGE_SORT_PRIORITY = {
    ImageType.front_gray.value: 0,
    ImageType.back_gray.value: 1,
    ImageType.front_ring.value: 2,
    ImageType.back_ring.value: 3,
    ImageType.front_coaxial.value: 4,
    ImageType.back_coaxial.value: 5,
}


def update_card_scores_and_status(db_conn: PooledMySQLConnection, card_id: int):
    """
    Recompute and persist the scores and edited flag of a card.

    Only rows from the main image table (settings.DB_IMAGE_TABLE_NAME) are
    used for the calculation; the gray image table is not read here.

    Args:
        db_conn: Open database connection; the update is committed on it.
        card_id: Primary key of the card to refresh.
    """
    with db_conn.cursor(dictionary=True) as cursor:
        # 1. Load every image of the card from the main image table.
        query_images = f"SELECT * FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id = %s"
        cursor.execute(query_images, (card_id,))
        image_records = cursor.fetchall()
        images = [CardImageResponse.model_validate(row) for row in image_records]

        # 2. Compute scores and edited status.
        scores_data = calculate_scores_from_images(images)

        # 3. Write the result back to the cards table.
        query_update_card = (
            f"UPDATE {settings.DB_CARD_TABLE_NAME} SET "
            "detection_score = %s, modified_score = %s, is_edited = %s, updated_at = %s "
            "WHERE id = %s"
        )
        params = (
            scores_data["detection_score"],
            scores_data["modified_score"],
            scores_data["is_edited"],
            datetime.now(),
            card_id,
        )
        cursor.execute(query_update_card, params)
        db_conn.commit()


def _construct_gray_image_json(
    gray_type: ImageType,
    ring_image_data: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """
    Build the modified_json of a gray image from its matching ring image row.

    The ring image's modified_json is preferred; detection_json is used when
    modified_json is empty. Only defects carrying a "gray_id" key are kept.

    Args:
        gray_type: Gray image type (front_gray or back_gray). Currently not
            used by the computation; kept for interface compatibility.
        ring_image_data: Raw database row (dict) of the matching ring image,
            expected to contain detection_json / modified_json.

    Returns:
        A dict shaped like EMPTY_DETECTION_JSON with the filtered defects and
        the ring image's imageHeight / imageWidth, or None when there is no
        ring row, no usable JSON, or no defect with a "gray_id".
    """
    if not ring_image_data:
        return None

    # Latest JSON of the ring image: modified first, detection as fallback.
    source_json = ring_image_data.get('modified_json') or ring_image_data.get('detection_json')

    # The column may arrive as serialized JSON text; json.loads also accepts
    # bytes/bytearray, so those are parsed as well instead of crashing later.
    if isinstance(source_json, (str, bytes, bytearray)):
        try:
            source_json = json.loads(source_json)
        except (ValueError, RecursionError):
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            logger.warning("Ring image JSON could not be parsed; gray image modified_json set to None")
            return None

    # A non-dict payload (list, number, ...) cannot hold a "result" section.
    if not source_json or not isinstance(source_json, dict):
        return None

    result = source_json.get("result")
    if not isinstance(result, dict):
        result = {}
    defect_result = result.get("defect_result")
    defects = defect_result.get("defects") if isinstance(defect_result, dict) else None
    if not isinstance(defects, list):
        defects = []

    # Keep only the defects that carry a "gray_id" field.
    filtered_defects = [
        defect for defect in defects
        if isinstance(defect, dict) and "gray_id" in defect
    ]
    if not filtered_defects:
        # For now, no related defects means modified_json is None.
        return None

    gray_modified_json = copy.deepcopy(EMPTY_DETECTION_JSON)
    gray_modified_json["result"]["defect_result"]["defects"] = filtered_defects
    # Carry over the ring image's dimensions so the frontend does not fail.
    gray_modified_json["result"]["imageHeight"] = result.get("imageHeight", 0)
    gray_modified_json["result"]["imageWidth"] = result.get("imageWidth", 0)
    return gray_modified_json


def get_card_with_details(db_conn: PooledMySQLConnection, card_id: int) -> Optional[Dict[str, Any]]:
    """
    Fetch one card with all of its images (main and gray) and score details.

    Args:
        db_conn: Open database connection.
        card_id: Primary key of the card.

    Returns:
        The card row as a dict extended with "images",
        "detection_score_detail" and "modified_score_detail", or None when
        the card does not exist.
    """
    with db_conn.cursor(dictionary=True) as cursor:
        # 1. Card row.
        query_card = f"SELECT * FROM {settings.DB_CARD_TABLE_NAME} WHERE id = %s"
        cursor.execute(query_card, (card_id,))
        card_data = cursor.fetchone()
        if not card_data:
            return None

        # 2. Main images.
        query_images = f"SELECT * FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id = %s"
        cursor.execute(query_images, (card_id,))
        main_image_records = cursor.fetchall()

        # 3. Gray images.
        query_gray = f"SELECT * FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE card_id = %s"
        cursor.execute(query_gray, (card_id,))
        gray_image_records = cursor.fetchall()

        # 4. Index main rows by image type so each gray image can find its ring row.
        main_images_map = {rec['image_type']: rec for rec in main_image_records}

        final_images_list = [CardImageResponse.model_validate(row) for row in main_image_records]

        for row in gray_image_records:
            g_type = row['image_type']
            target_ring_type = _GRAY_TO_RING_TYPE.get(g_type)

            # modified_json is derived from the ring image; it stays None when
            # the gray type has no ring counterpart.
            virtual_modified_json = None
            if target_ring_type:
                ring_data = main_images_map.get(target_ring_type)
                virtual_modified_json = _construct_gray_image_json(g_type, ring_data)

            # Shape the row like a main image so it validates against the
            # response model; columns the gray table lacks are filled with None.
            gray_image_dict = {
                "id": row['id'],
                "card_id": row['card_id'],
                "image_type": row['image_type'],
                "image_path": row['image_path'],
                "created_at": row['created_at'],
                "updated_at": row['updated_at'],
                # Virtual fields: detection_json is always the fixed empty structure.
                "detection_json": copy.deepcopy(EMPTY_DETECTION_JSON),
                "modified_json": virtual_modified_json,
                "image_name": None,
                "detection_image_path": None,
                "modified_image_path": None,
                # A gray image is never edited itself; it only shows the ring's edits.
                "is_edited": False,
            }
            final_images_list.append(CardImageResponse.model_validate(gray_image_dict))

        # 5. Score details are computed from non-gray images only.
        gray_types = (ImageType.front_gray.value, ImageType.back_gray.value)
        main_images_objs = [img for img in final_images_list if img.image_type not in gray_types]
        score_details = calculate_scores_from_images(main_images_objs)

        # 6. Custom display order; unknown types go last.
        final_images_list.sort(key=lambda x: _IMAGE_SORT_PRIORITY.get(x.image_type, 999))

        card_data.update({
            "images": final_images_list,  # all images, gray included
            "detection_score_detail": score_details["detection_score_detail"],
            "modified_score_detail": score_details["modified_score_detail"]
        })
        return card_data


def _build_card_filters(
    card_id: Optional[int],
    card_name: Optional[str],
    card_type: Optional[CardType],
    is_edited: Optional[bool],
    min_detect_score: Optional[float],
    max_detect_score: Optional[float],
    min_mod_score: Optional[float],
    max_mod_score: Optional[float],
    created_start: Optional[date],
    created_end: Optional[date],
    updated_start: Optional[date],
    updated_end: Optional[date],
) -> Tuple[str, List[Any]]:
    """
    Build the WHERE clause and bound parameters for card list queries.

    Returns:
        (where_clause, params): where_clause is "" when no filter is active,
        otherwise " WHERE cond AND cond ..."; params holds the values in the
        same order as the placeholders.
    """
    # (is_active, SQL condition, bound value). Numeric/boolean filters are
    # active when not None (0 and False are valid values); text, enum and
    # date filters are active when truthy.
    candidates = [
        (card_id is not None, "id = %s", card_id),
        (bool(card_name), "card_name LIKE %s", f"%{card_name}%"),
        (bool(card_type), "card_type = %s", card_type.value if card_type else None),
        (is_edited is not None, "is_edited = %s", is_edited),
        (min_detect_score is not None, "detection_score >= %s", min_detect_score),
        (max_detect_score is not None, "detection_score <= %s", max_detect_score),
        (min_mod_score is not None, "modified_score >= %s", min_mod_score),
        (max_mod_score is not None, "modified_score <= %s", max_mod_score),
        (bool(created_start), "DATE(created_at) >= %s", created_start),
        (bool(created_end), "DATE(created_at) <= %s", created_end),
        (bool(updated_start), "DATE(updated_at) >= %s", updated_start),
        (bool(updated_end), "DATE(updated_at) <= %s", updated_end),
    ]
    conditions = [condition for active, condition, _ in candidates if active]
    params = [value for active, _, value in candidates if active]

    where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
    return where_clause, params


def _attach_image_paths(cursor, cards: List[Dict[str, Any]]) -> None:
    """
    Add image path dictionaries to each card row, in place.

    Every card receives "image_path_list", "detection_image_path_list" and
    "modified_image_path_list", each keyed by image type. Main images are
    applied first, then gray images; gray images have no detection/modified
    path, so those entries are None.

    Args:
        cursor: Dictionary cursor used to run the image queries.
        cards: Card rows (dicts with an "id" key); nothing is queried when empty.
    """
    if not cards:
        # Avoid emitting an invalid "IN ()" clause.
        return

    card_ids = tuple(card['id'] for card in cards)
    format_strings = ','.join(['%s'] * len(card_ids))

    # 1. Main images.
    image_query = (
        f"SELECT id, card_id, image_type, image_path, detection_image_path, modified_image_path "
        f"FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id IN ({format_strings})"
    )
    cursor.execute(image_query, card_ids)
    images = cursor.fetchall()

    # 2. Gray images.
    gray_query = (
        f"SELECT id, card_id, image_type, image_path "
        f"FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE card_id IN ({format_strings})"
    )
    cursor.execute(gray_query, card_ids)
    gray_images = cursor.fetchall()

    # Group by card id, main images before gray images.
    images_by_card_id = defaultdict(list)
    for img in images:
        images_by_card_id[img['card_id']].append(img)
    for g_img in gray_images:
        # Pad the missing columns so both kinds of rows share one structure.
        g_img['detection_image_path'] = None
        g_img['modified_image_path'] = None
        images_by_card_id[g_img['card_id']].append(g_img)

    for card in cards:
        card['image_path_list'] = {}
        card['detection_image_path_list'] = {}
        card['modified_image_path_list'] = {}
        for image_data in images_by_card_id.get(card['id'], []):
            image_type = image_data['image_type']
            if image_type:
                card['image_path_list'][image_type] = image_data.get('image_path')
                card['detection_image_path_list'][image_type] = image_data.get('detection_image_path')
                card['modified_image_path_list'][image_type] = image_data.get('modified_image_path')


def get_card_list_with_images(
        db_conn: PooledMySQLConnection,
        card_id: Optional[int],
        card_name: Optional[str],
        card_type: Optional[CardType],
        is_edited: Optional[bool],
        min_detect_score: Optional[float],
        max_detect_score: Optional[float],
        min_mod_score: Optional[float],
        max_mod_score: Optional[float],
        created_start: Optional[date],
        created_end: Optional[date],
        updated_start: Optional[date],
        updated_end: Optional[date],
        sort_by: SortBy,
        sort_order: SortOrder,
        skip: int,
        limit: int
) -> List[Dict[str, Any]]:
    """
    Return one page of cards matching the filters, each with its image paths.

    This is a list view: no JSON is constructed, only the image paths
    (gray images included) are attached to each card.

    Args:
        db_conn: Open database connection.
        card_id .. updated_end: Optional filters; an inactive filter is skipped.
        sort_by, sort_order: Enum members whose values form the ORDER BY
            clause; ties are broken by id DESC.
        skip, limit: Pagination offset and page size.

    Returns:
        List of card rows (dicts) extended with "image_path_list",
        "detection_image_path_list" and "modified_image_path_list";
        an empty list when nothing matches.
    """
    where_clause, params = _build_card_filters(
        card_id, card_name, card_type, is_edited,
        min_detect_score, max_detect_score, min_mod_score, max_mod_score,
        created_start, created_end, updated_start, updated_end,
    )

    with db_conn.cursor(dictionary=True) as cursor:
        query = f"SELECT * FROM {settings.DB_CARD_TABLE_NAME}" + where_clause
        query += f" ORDER BY {sort_by.value} {sort_order.value}, id DESC"
        query += " LIMIT %s OFFSET %s"
        params.extend([limit, skip])

        cursor.execute(query, tuple(params))
        cards = cursor.fetchall()
        if not cards:
            return []

        _attach_image_paths(cursor, cards)
        return cards


def get_card_list_and_count(
        db_conn: PooledMySQLConnection,
        card_id: Optional[int],
        card_name: Optional[str],
        card_type: Optional[CardType],
        is_edited: Optional[bool],
        min_detect_score: Optional[float],
        max_detect_score: Optional[float],
        min_mod_score: Optional[float],
        max_mod_score: Optional[float],
        created_start: Optional[date],
        created_end: Optional[date],
        updated_start: Optional[date],
        updated_end: Optional[date],
        sort_by: SortBy,
        sort_order: SortOrder,
        skip: int,
        limit: int
) -> Dict[str, Any]:
    """
    Return one page of cards matching the filters plus the total match count.

    Args:
        db_conn: Open database connection.
        card_id .. updated_end: Optional filters; an inactive filter is skipped.
        sort_by, sort_order: Enum members whose values form the ORDER BY
            clause; ties are broken by id DESC.
        skip, limit: Pagination offset and page size.

    Returns:
        {"total": <number of cards matching the filters>,
         "list": <card rows of the requested page, with image path dicts>}.
    """
    where_clause, params = _build_card_filters(
        card_id, card_name, card_type, is_edited,
        min_detect_score, max_detect_score, min_mod_score, max_mod_score,
        created_start, created_end, updated_start, updated_end,
    )

    with db_conn.cursor(dictionary=True) as cursor:
        # Count: same filters, no pagination.
        count_query = f"SELECT COUNT(*) as total FROM {settings.DB_CARD_TABLE_NAME}" + where_clause
        cursor.execute(count_query, tuple(params))
        total_count = cursor.fetchone()['total']

        # List: same filters plus ordering and pagination.
        data_query = f"SELECT * FROM {settings.DB_CARD_TABLE_NAME}" + where_clause
        data_query += f" ORDER BY {sort_by.value} {sort_order.value}, id DESC"
        data_query += " LIMIT %s OFFSET %s"
        data_params = params + [limit, skip]

        cursor.execute(data_query, tuple(data_params))
        cards = cursor.fetchall()

        _attach_image_paths(cursor, cards)

        return {
            "total": total_count,
            "list": cards
        }