import os
import cv2
import numpy as np
import json
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional
from enum import Enum
from datetime import datetime

from app.core.logger import get_logger

logger = get_logger(__name__)


def fry_algo_print(level_str: str, info_str: str):
    """Generic log helper."""
    logger.info(f"[{level_str.upper()}] : {info_str}")


def fry_cv2_imread(filename, flags=cv2.IMREAD_COLOR):
    """cv2.imread replacement that supports non-ASCII (e.g. Chinese) paths."""
    try:
        with open(filename, 'rb') as f:
            chunk = f.read()
        chunk_arr = np.frombuffer(chunk, dtype=np.uint8)
        img = cv2.imdecode(chunk_arr, flags)
        if img is None:
            fry_algo_print("WARNING", f"Warning: Unable to decode image: {filename}")
        return img
    except IOError as e:
        fry_algo_print("ERROR", f"IOError: Unable to read file: {filename} ({e})")
        return None


def fry_cv2_imwrite(filename, img, params=None):
    """cv2.imwrite replacement that supports non-ASCII (e.g. Chinese) paths."""
    try:
        ext = os.path.splitext(filename)[1].lower()
        result, encoded_img = cv2.imencode(ext, img, params if params is not None else [])
        if result:
            with open(filename, 'wb') as f:
                encoded_img.tofile(f)
            return True
        fry_algo_print("WARNING", f"Warning: Unable to encode image: {filename}")
        return False
    except Exception as e:
        fry_algo_print("ERROR", f"Error: Unable to write file: {filename} ({e})")
        return False


# Patch OpenCV so that imread/imwrite handle non-ASCII paths.
cv2.imread = fry_cv2_imread
cv2.imwrite = fry_cv2_imwrite


@dataclass
class FryAlgoParamsBase:
    """Base class for algorithm parameters."""
    debug_level: str = "no"  # one of: "no", "normal", "detail"


class CenterMode(Enum):
    """Centering mode."""
    CENTROID = "centroid"
    BOUNDING_RECT = "bounding_rect"


class FillMode(Enum):
    """Background fill mode."""
    BLACK = "black"
    WHITE = "white"
    REPLICATE = "replicate"


class EdgeType(Enum):
    """Edge type."""
    TOP = "top"
    BOTTOM = "bottom"
    LEFT = "left"
    RIGHT = "right"


@dataclass
class ContourInfo:
    """Container for per-contour statistics."""
    contour: np.ndarray
    area: float
    centroid: Tuple[int, int]
    bounding_rect: Tuple[int, int, int, int]


@dataclass
class FryCardProcessParams(FryAlgoParamsBase):
    """Unified parameter class for card processing (rectification + centering)."""
    # --- Rectification parameters ---
    label_name: str = "outer_box"        # shape label to look up in the JSON
    corner_distance_ratio: float = 0.15  # fraction of each edge treated as the corner zone

    # --- Centering parameters ---
    center_mode: CenterMode = CenterMode.BOUNDING_RECT  # centering strategy
    fill_mode: FillMode = FillMode.BLACK                 # background fill after translation
    fill_color: Tuple[int, int, int] = (0, 0, 0)         # custom fill color (BGR)


class FryCardProcessor:
    """Processor that combines card rectification and centering."""

    def __init__(self):
        pass

    def process_image_with_json(
        self,
        image: np.ndarray,
        seg_json: Dict,
        params: FryCardProcessParams
    ) -> Optional[np.ndarray]:
        """
        (V2: simplified; rotation and centering merged.)
        Rectify and center a single image using a precomputed segmentation JSON,
        keeping the original image size.
        """
        if params.debug_level in ['normal', 'detail']:
            fry_algo_print("INFO", "Start processing image...")

        # ==================== Step 1: extract the contour from the JSON ====================
        contour_points = self._extract_largest_contour(seg_json.get("shapes", []), params.label_name)
        if contour_points is None:
            fry_algo_print("ERROR", f"No contour with label '{params.label_name}' found in the JSON.")
            return None

        # ==================== Step 2: fit the minimum-area bounding rectangle ====================
        # Note: we operate on the raw contour points; interpolation has little effect on minAreaRect.
        rect = cv2.minAreaRect(contour_points.astype(np.float32))
        fitted_quad = np.intp(cv2.boxPoints(rect))
        if params.debug_level == 'detail':
            fry_algo_print("INFO", "Fitted the card's minimum-area bounding rectangle.")

        # ==================== Step 3: rectify and center in a single warp ====================
        final_image = self._rectify_and_center_image(image, fitted_quad)
        if final_image is None:
            fry_algo_print("ERROR", "Rectification and centering failed.")
            return None

        if params.debug_level in ['normal', 'detail']:
            fry_algo_print("SUCCESS",
                           f"Done. Input size: {image.shape[:2]}, output size: {final_image.shape[:2]}")
        return final_image

    # ------------------- Helper methods: contour handling -------------------
    @staticmethod
    def _extract_largest_contour(shapes: List[Dict], label_name: str) -> Optional[np.ndarray]:
        target_shapes = [s for s in shapes if s.get("label") == label_name and s.get("points")]
        if not target_shapes:
            return None
        max_shape = max(
            target_shapes,
            key=lambda s: cv2.contourArea(np.array(s["points"], dtype=np.float32))
        )
        return np.array(max_shape["points"], dtype=np.int32)

    @staticmethod
    def _interpolate_contour_points(contour: np.ndarray, num_points_per_edge=50) -> np.ndarray:
        if len(contour) < 2:
            return contour
        interpolated_points = []
        for i in range(len(contour)):
            p1 = contour[i]
            p2 = contour[(i + 1) % len(contour)]
            x_vals = np.linspace(p1[0], p2[0], num_points_per_edge)
            y_vals = np.linspace(p1[1], p2[1], num_points_per_edge)
            interpolated_points.extend(np.vstack((x_vals, y_vals)).T)
        return np.array(interpolated_points, dtype=np.int32)

    @staticmethod
    def _rectify_and_center_image(image: np.ndarray, quad: np.ndarray) -> Optional[np.ndarray]:
        """
        Rotate-correct and center the card in a single warp while keeping the original image size.

        Args:
            image (np.ndarray): original image.
            quad (np.ndarray): the four corner points of the card.

        Returns:
            Optional[np.ndarray]: the processed image.
        """
        # The original image size is also the output size.
        h_img, w_img = image.shape[:2]

        # 1. Fit the minimum-area rectangle to obtain the rotation information.
        rect = cv2.minAreaRect(quad.astype(np.float32))
        center, (width, height), angle = rect

        # 2. Normalize the angle.
        # cv2.minAreaRect returns an angle in [-90, 0); keep the card in portrait orientation.
        if width > height:
            angle -= 90

        fry_algo_print("DETAIL", f"Detected rotation angle: {angle:.2f}°")

        # 3. Build the rotation matrix (around the card center).
        M = cv2.getRotationMatrix2D(center, angle, 1.0)

        # 4. Fold the translation into the rotation matrix.
        # The card center (cx, cy) should end up at the image center (w_img/2, h_img/2),
        # so the translation is (w_img/2 - cx, h_img/2 - cy).
        M[0, 2] += (w_img / 2) - center[0]
        M[1, 2] += (h_img / 2) - center[1]

        # 5. Apply the combined affine transform.
        # dsize is set to the original image size (w_img, h_img).
        final_image = cv2.warpAffine(
            image, M,
            (w_img, h_img),                 # <-- key point: keep the original image size
            flags=cv2.INTER_CUBIC,
            borderMode=cv2.BORDER_CONSTANT,
            borderValue=(0, 0, 0)           # black fill by default
        )
        return final_image

    # ------------------- Helper methods: centering -------------------

    @staticmethod
    def _analyze_contours(contours: List[np.ndarray]) -> Dict[int, ContourInfo]:
        info_dict = {}
        for idx, contour in enumerate(contours):
            area = cv2.contourArea(contour)
            if area < 100:
                continue
            M = cv2.moments(contour)
            cx = int(M["m10"] / M["m00"]) if M["m00"] != 0 else 0
            cy = int(M["m01"] / M["m00"]) if M["m00"] != 0 else 0
            x, y, w, h = cv2.boundingRect(contour)
            info_dict[idx] = ContourInfo(contour, area, (cx, cy), (x, y, w, h))
        return info_dict

    @staticmethod
    def _calculate_offset(contour_info: ContourInfo, image_shape: Tuple[int, int],
                          params: FryCardProcessParams) -> Tuple[int, int]:
        h, w = image_shape[:2]
        if params.center_mode == CenterMode.CENTROID:
            cx, cy = contour_info.centroid
            return w // 2 - cx, h // 2 - cy
        elif params.center_mode == CenterMode.BOUNDING_RECT:
            x, y, rect_w, rect_h = contour_info.bounding_rect
            rect_cx = x + rect_w // 2
            rect_cy = y + rect_h // 2
            return w // 2 - rect_cx, h // 2 - rect_cy
        return 0, 0

    @staticmethod
    def _apply_translation(image: np.ndarray, offset: Tuple[int, int],
                           params: FryCardProcessParams) -> np.ndarray:
        h, w = image.shape[:2]
        offset_x, offset_y = offset
        M = np.float32([[1, 0, offset_x], [0, 1, offset_y]])
        if params.fill_mode == FillMode.WHITE:
            borderValue = (255, 255, 255)
        elif params.fill_mode == FillMode.BLACK:
            borderValue = (0, 0, 0)
        else:  # REPLICATE or custom color
            borderValue = params.fill_color
        borderMode = cv2.BORDER_REPLICATE if params.fill_mode == FillMode.REPLICATE else cv2.BORDER_CONSTANT
        return cv2.warpAffine(image, M, (w, h), borderMode=borderMode, borderValue=borderValue)

    # ------------------- Static methods: RANSAC-style edge fitting -------------------
    @staticmethod
    def _classify_contour_points(contour: np.ndarray, rect_corners: np.ndarray,
                                 corner_ratio: float) -> Dict[str, List]:
        classified = {et.value: [] for et in EdgeType}
        edges = [(rect_corners[i], rect_corners[(i + 1) % 4]) for i in range(4)]
        edge_lengths = [np.linalg.norm(e[1] - e[0]) for e in edges]
        for point in contour:
            min_dist = float('inf')
            min_type = None
            edge_types = [EdgeType.TOP, EdgeType.RIGHT, EdgeType.BOTTOM, EdgeType.LEFT]
            for i, (p1, p2) in enumerate(edges):
                dist_to_seg = FryCardProcessor._point_to_segment_distance(point, p1, p2)
                if dist_to_seg < min_dist:
                    edge_vec = p2 - p1
                    edge_len = edge_lengths[i]
                    proj_len = np.dot(point - p1, edge_vec) / edge_len if edge_len > 0 else 0
                    # Only accept points outside the corner zones of the edge.
                    if corner_ratio * edge_len < proj_len < (1 - corner_ratio) * edge_len:
                        min_dist = dist_to_seg
                        min_type = edge_types[i].value
            if min_type:
                classified[min_type].append(point)
        return classified

    @staticmethod
    def _point_to_segment_distance(point: np.ndarray, p1: np.ndarray, p2: np.ndarray) -> float:
        line_vec = p2 - p1
        p_vec = point - p1
        line_len_sq = np.dot(line_vec, line_vec)
        if line_len_sq == 0:
            return np.linalg.norm(p_vec)
        t = np.dot(p_vec, line_vec) / line_len_sq
        t = max(0, min(1, t))
        proj_point = p1 + t * line_vec
        return np.linalg.norm(point - proj_point)

    @staticmethod
    def _fit_lines_with_ransac(classified_points: Dict[str, List], threshold: float) -> Dict[str, Tuple]:
        # Note: despite the name, this uses cv2.fitLine with DIST_L2 (least squares);
        # the `threshold` parameter is currently unused.
        lines = {}
        for edge_name, points in classified_points.items():
            if len(points) < 2:
                continue
            points_np = np.array(points, dtype=np.float32)
            vx, vy, x0, y0 = cv2.fitLine(points_np, cv2.DIST_L2, 0, 0.01, 0.01)
            # Convert to the normalized line equation a*x + b*y + c = 0.
            a, b = -vy[0], vx[0]
            c = -(a * x0[0] + b * y0[0])
            norm = np.sqrt(a ** 2 + b ** 2)
            lines[edge_name] = (a / norm, b / norm, c / norm)
        return lines

    @staticmethod
    def _get_quadrilateral_from_lines(lines: Dict[str, Tuple]) -> Optional[np.ndarray]:
        def line_intersection(line1, line2):
            a1, b1, c1 = line1
            a2, b2, c2 = line2
            det = a1 * b2 - a2 * b1
            if abs(det) < 1e-10:
                return None
            x = (b1 * c2 - b2 * c1) / det
            y = (a2 * c1 - a1 * c2) / det
            return [x, y]

        edge_map = {
            "tl": (EdgeType.LEFT.value, EdgeType.TOP.value),
            "tr": (EdgeType.RIGHT.value, EdgeType.TOP.value),
            "br": (EdgeType.RIGHT.value, EdgeType.BOTTOM.value),
            "bl": (EdgeType.LEFT.value, EdgeType.BOTTOM.value),
        }
        corners = {}
        for corner, (edge1_name, edge2_name) in edge_map.items():
            if edge1_name not in lines or edge2_name not in lines:
                return None
            pt = line_intersection(lines[edge1_name], lines[edge2_name])
            if pt is None:
                return None
            corners[corner] = pt
        return np.array([corners["tl"], corners["tr"], corners["br"], corners["bl"]], dtype=np.float32)
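

# ------------------- Illustrative examples (not part of the original module) -------------------
# A minimal, hypothetical end-to-end sketch on synthetic data, with the segmentation dict
# shaped the way _extract_largest_contour reads it ("shapes" -> "label"/"points").
# The function name `example_synthetic_demo` and the drawn card are assumptions for illustration.
def example_synthetic_demo() -> Optional[np.ndarray]:
    """Illustrative sketch: run FryCardProcessor on a synthetic image, no files required."""
    # Draw a rotated white "card" on a black canvas.
    canvas = np.zeros((400, 600, 3), dtype=np.uint8)
    box = cv2.boxPoints(((300.0, 200.0), (180.0, 260.0), 20.0)).astype(np.int32)
    cv2.fillPoly(canvas, [box], (255, 255, 255))
    # Minimal segmentation dict matching what process_image_with_json expects.
    seg_json = {"shapes": [{"label": "outer_box", "points": box.tolist()}]}
    params = FryCardProcessParams(debug_level="normal")
    return FryCardProcessor().process_image_with_json(canvas, seg_json, params)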
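

# The edge-fitting helpers (_classify_contour_points, _fit_lines_with_ransac,
# _get_quadrilateral_from_lines) are not called by the V2 flow, which uses minAreaRect
# directly. The sketch below is a hypothetical example of how they could be chained to
# refine the card quadrilateral from a contour; `example_fit_quad_from_contour` is not
# part of the original module.
def example_fit_quad_from_contour(contour_points: np.ndarray,
                                  params: FryCardProcessParams) -> Optional[np.ndarray]:
    """Illustrative sketch: fit a refined quadrilateral via per-edge line fitting."""
    # Densify the polygon so each edge has enough points for line fitting.
    dense = FryCardProcessor._interpolate_contour_points(contour_points)
    # Use the minimum-area rectangle corners to assign points to the four edges,
    # skipping the corner zones defined by corner_distance_ratio.
    rect = cv2.minAreaRect(contour_points.astype(np.float32))
    rect_corners = cv2.boxPoints(rect)
    classified = FryCardProcessor._classify_contour_points(
        dense, rect_corners, params.corner_distance_ratio)
    # Fit one line per edge and intersect them to get the corners (threshold is unused).
    lines = FryCardProcessor._fit_lines_with_ransac(classified, threshold=2.0)
    return FryCardProcessor._get_quadrilateral_from_lines(lines)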
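

# The centering helpers (_analyze_contours, _calculate_offset, _apply_translation) are
# likewise unused by the V2 flow, which folds centering into the warp inside
# _rectify_and_center_image. The hypothetical sketch below shows how they could be combined
# to re-center the largest foreground contour of an already-rectified image; the function
# name and the Otsu-based mask are assumptions, and the two-value cv2.findContours
# unpacking assumes OpenCV >= 4.
def example_center_largest_contour(image: np.ndarray,
                                   params: FryCardProcessParams) -> np.ndarray:
    """Illustrative sketch: translate the largest foreground contour to the image center."""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _, mask = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    info = FryCardProcessor._analyze_contours(list(contours))
    if not info:
        return image  # nothing to center
    largest = max(info.values(), key=lambda ci: ci.area)
    offset = FryCardProcessor._calculate_offset(largest, image.shape, params)
    return FryCardProcessor._apply_translation(image, offset, params)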


def main():
    """Entry point demonstrating how to use FryCardProcessor."""
    # Create the output directory.
    output_dir = Path("./test_output") / datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    fry_algo_print("INFO", f"All outputs will be saved to: {output_dir.resolve()}")

    # Test data paths (hard-coded for this demo).
    image_path = r"C:\Code\ML\Image\Card\_250917_1157_pokemon_no flecct01\48_front_0_1.jpg.jpg"
    json_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\temp\outer\48_front_0_1.jpg.json"

    # 1. Configure the processing parameters.
    params = FryCardProcessParams(
        debug_level="detail",
        label_name="outer_box",
        center_mode=CenterMode.BOUNDING_RECT,
        fill_mode=FillMode.BLACK
    )

    # 2. Create the processor.
    processor = FryCardProcessor()

    # 3. Load the image and the segmentation JSON.
    image = cv2.imread(str(image_path))
    with open(json_path, 'r', encoding='utf-8') as f:
        seg_json = json.load(f)
    if image is None:
        fry_algo_print("ERROR", "Unable to read the image file.")
        return

    # 4. Run the processing pipeline.
    final_image = processor.process_image_with_json(image, seg_json, params)

    # 5. Save the result.
    if final_image is not None:
        result_path = output_dir / "final_processed_card.jpg"
        cv2.imwrite(str(result_path), final_image)
        fry_algo_print("SUCCESS", f"Done! Final image saved to: {result_path.resolve()}")
    else:
        fry_algo_print("FAILURE", "An error occurred during processing; no final image was produced.")


if __name__ == "__main__":
    main()