import os import cv2 import json import numpy as np from pathlib import Path from dataclasses import dataclass, field from typing import Dict, List, Tuple, Optional, Any from enum import Enum import matplotlib.pyplot as plt from sklearn.linear_model import RANSACRegressor # matplotlib解决中文乱码 plt.rcParams["font.sans-serif"] = ["SimHei"] plt.rcParams["font.family"] = "sans-serif" plt.rcParams['axes.unicode_minus'] = False def fry_algo_print(level_str: str, info_str: str): print(f"[{level_str}] : {info_str}") def fry_cv2_imread(filename, flags=cv2.IMREAD_COLOR): """支持中文路径的imread""" try: with open(filename, 'rb') as f: chunk = f.read() chunk_arr = np.frombuffer(chunk, dtype=np.uint8) img = cv2.imdecode(chunk_arr, flags) if img is None: fry_algo_print("警告", f"Warning: Unable to decode image: {filename}") return img except IOError as e: fry_algo_print("错误", f"IOError: Unable to read file: {filename}") fry_algo_print("错误", f"Error details: {str(e)}") return None def fry_cv2_imwrite(filename, img, params=None): """支持中文路径的imwrite""" try: ext = os.path.splitext(filename)[1].lower() result, encoded_img = cv2.imencode(ext, img, params) if result: with open(filename, 'wb') as f: encoded_img.tofile(f) return True else: fry_algo_print("警告", f"Warning: Unable to encode image: {filename}") return False except Exception as e: fry_algo_print("错误", f"Error: Unable to write file: {filename}") fry_algo_print("错误", f"Error details: {str(e)}") return False def fry_opencv_Chinese_path_init(): """覆盖OpenCV的原始函数""" cv2.imread = fry_cv2_imread cv2.imwrite = fry_cv2_imwrite OPENCV_IO_ALREADY_INIT = False if not OPENCV_IO_ALREADY_INIT: fry_opencv_Chinese_path_init() OPENCV_IO_ALREADY_INIT = True def to_json_serializable(obj): """将包含自定义对象的数据结构转换为JSON可序列化的格式""" if obj is None or isinstance(obj, (bool, int, float, str)): return obj elif isinstance(obj, dict): return {key: to_json_serializable(value) for key, value in obj.items()} elif isinstance(obj, (list, tuple)): return [to_json_serializable(item) for item in obj] elif isinstance(obj, set): return [to_json_serializable(item) for item in obj] elif isinstance(obj, bytes): return obj.decode('utf-8', errors='ignore') else: if hasattr(obj, '__dict__'): return to_json_serializable(obj.__dict__) elif hasattr(obj, 'to_dict'): return to_json_serializable(obj.to_dict()) elif hasattr(obj, 'to_json'): return to_json_serializable(obj.to_json()) else: return str(obj) class EdgeCornerType(Enum): """边角类型枚举""" TOP_LEFT = "左上角" TOP_RIGHT = "右上角" BOTTOM_LEFT = "左下角" BOTTOM_RIGHT = "右下角" TOP = "上边" BOTTOM = "下边" LEFT = "左边" RIGHT = "右边" @dataclass class EdgeCornerDefect: """边角缺陷数据结构""" type: EdgeCornerType protrusion_area: float = 0.0 # 突出面积(像素) depression_area: float = 0.0 # 凹陷面积(像素) protrusion_pixels: int = 0 # 突出像素数 depression_pixels: int = 0 # 凹陷像素数 contour_points: List[List[float]] = field(default_factory=list) # 该边角的轮廓点 fitted_points: List[List[float]] = field(default_factory=list) # 拟合的边角点 region_points: List[List[float]] = field(default_factory=list) # 区域边界点 def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "type": self.type.value, "protrusion_area": float(self.protrusion_area), "depression_area": float(self.depression_area), "protrusion_pixels": int(self.protrusion_pixels), "depression_pixels": int(self.depression_pixels), "contour_points": self.contour_points, "fitted_points": self.fitted_points, "region_points": self.region_points } @classmethod def load_from_dict(cls, data: Dict[str, Any]) -> 'EdgeCornerDefect': """从字典加载""" return cls( type=EdgeCornerType(data["type"]), protrusion_area=data.get("protrusion_area", 0.0), depression_area=data.get("depression_area", 0.0), protrusion_pixels=data.get("protrusion_pixels", 0), depression_pixels=data.get("depression_pixels", 0), contour_points=data.get("contour_points", []), fitted_points=data.get("fitted_points", []), region_points=data.get("region_points", []) ) @dataclass class FryAlgoParamsBase: """算法参数基类""" algo_name: Optional[str] = None debug_level: str = "no" # no, normal, detail save_json_params: bool = False is_first_algo: bool = False is_last_algo: bool = False def to_dict(self) -> Dict[str, Any]: """转换为字典""" result = {} for key, value in self.__dict__.items(): if hasattr(value, 'to_dict'): result[key] = value.to_dict() elif isinstance(value, Enum): result[key] = value.value elif isinstance(value, Path): result[key] = str(value) else: result[key] = value return result @dataclass class CardDefectDetectionParams(FryAlgoParamsBase): """卡片缺陷检测算法参数""" algo_name: str = "卡片边角缺陷检测" label_name: str = "outer_box" # JSON中的框标签名称 long_edge_corner_length: float = 200 # 矩形长边的角的长度 short_edge_corner_length: float = 200 # 矩形短边的角的长度 edge_width: float = 50 # 边区域向内延伸的宽度 iteration_rounds: int = 5 # 步骤d和e迭代轮数 ransac_residual_threshold: float = 5.0 # RANSAC残差阈值 save_intermediate_results: bool = True # 是否保存中间结果 save_overlay_images: bool = True # 是否保存叠加图像 class MaskProcessor: """Mask处理工具类""" @staticmethod def polygon_to_mask(polygon_points: np.ndarray, img_shape: Tuple[int, int]) -> np.ndarray: """将多边形转换为mask Args: polygon_points: 多边形顶点坐标 img_shape: (height, width) Returns: 二值mask图像 """ mask = np.zeros(img_shape[:2], dtype=np.uint8) if len(polygon_points) > 0: cv2.fillPoly(mask, [polygon_points.astype(np.int32)], 255) return mask @staticmethod def calculate_inner_points( fitted_quad: np.ndarray, long_edge_indices: List[int], short_edge_indices: List[int], long_corner_length: float, short_corner_length: float ) -> np.ndarray: """计算内部四个点B1-B4 Args: fitted_quad: 拟合四边形的四个顶点 long_edge_indices: 长边索引 short_edge_indices: 短边索引 long_corner_length: 长边角长度 short_corner_length: 短边角长度 Returns: 内部四个点坐标 """ inner_points = np.zeros((4, 2)) for i in range(4): # 获取当前顶点 current_point = fitted_quad[i] # 获取相邻两条边的向量 prev_idx = (i - 1) % 4 next_idx = (i + 1) % 4 edge_to_prev = fitted_quad[prev_idx] - current_point edge_to_next = fitted_quad[next_idx] - current_point # 判断边的类型并设置对应的长度 # 边索引: 0(0-1), 1(1-2), 2(2-3), 3(3-0) edge_prev_idx = prev_idx if prev_idx < i else 3 edge_next_idx = i # 根据边类型设置移动距离 if edge_prev_idx in long_edge_indices: dist_prev = long_corner_length else: dist_prev = short_corner_length if edge_next_idx in long_edge_indices: dist_next = long_corner_length else: dist_next = short_corner_length # 单位化向量 edge_to_prev_unit = edge_to_prev / np.linalg.norm(edge_to_prev) edge_to_next_unit = edge_to_next / np.linalg.norm(edge_to_next) # 计算内部点 inner_points[i] = current_point + dist_prev * edge_to_prev_unit + dist_next * edge_to_next_unit return inner_points @staticmethod def get_corner_region_mask( corner_idx: int, fitted_quad: np.ndarray, inner_points: np.ndarray, img_shape: Tuple[int, int] ) -> np.ndarray: """获取角区域mask Args: corner_idx: 角索引(0-3) fitted_quad: 拟合四边形顶点 inner_points: 内部四个点 img_shape: 图像尺寸 Returns: 角区域mask """ mask = np.zeros(img_shape[:2], dtype=np.uint8) # 获取角区域的四个顶点 prev_idx = (corner_idx - 1) % 4 next_idx = (corner_idx + 1) % 4 # 计算角区域的边界点 corner_point = fitted_quad[corner_idx] inner_point = inner_points[corner_idx] # 从内部点向两条边做垂线得到边界点 edge_to_prev = fitted_quad[prev_idx] - corner_point edge_to_next = fitted_quad[next_idx] - corner_point edge_to_prev_unit = edge_to_prev / np.linalg.norm(edge_to_prev) edge_to_next_unit = edge_to_next / np.linalg.norm(edge_to_next) # 计算垂足 proj_prev = np.dot(inner_point - corner_point, edge_to_prev_unit) proj_next = np.dot(inner_point - corner_point, edge_to_next_unit) point_on_prev_edge = corner_point + proj_prev * edge_to_prev_unit point_on_next_edge = corner_point + proj_next * edge_to_next_unit # 创建角区域多边形 corner_polygon = np.array([ corner_point, point_on_prev_edge, inner_point, point_on_next_edge ], dtype=np.int32) cv2.fillPoly(mask, [corner_polygon], 255) return mask @staticmethod def get_edge_region_mask( edge_idx: int, fitted_quad: np.ndarray, inner_points: np.ndarray, edge_width: float, img_shape: Tuple[int, int] ) -> np.ndarray: """获取边区域mask Args: edge_idx: 边索引(0-3) fitted_quad: 拟合四边形顶点 inner_points: 内部四个点 edge_width: 边区域宽度 img_shape: 图像尺寸 Returns: 边区域mask """ mask = np.zeros(img_shape[:2], dtype=np.uint8) # 获取边的两个端点 p1_idx = edge_idx p2_idx = (edge_idx + 1) % 4 p1 = fitted_quad[p1_idx] p2 = fitted_quad[p2_idx] # 获取对应的内部点 inner_p1 = inner_points[p1_idx] inner_p2 = inner_points[p2_idx] # 计算边向量和法向量 edge_vec = p2 - p1 edge_unit = edge_vec / np.linalg.norm(edge_vec) # 法向量(向内) normal = np.array([-edge_unit[1], edge_unit[0]]) # 确保法向量指向内部 center = np.mean(fitted_quad, axis=0) if np.dot(normal, center - (p1 + p2) / 2) < 0: normal = -normal # 从内部点向边做垂线 proj1 = np.dot(inner_p1 - p1, edge_unit) proj2 = np.dot(inner_p2 - p1, edge_unit) point_on_edge_1 = p1 + proj1 * edge_unit point_on_edge_2 = p1 + proj2 * edge_unit # 创建边区域梯形 inner_edge_p1 = point_on_edge_1 + edge_width * normal inner_edge_p2 = point_on_edge_2 + edge_width * normal edge_polygon = np.array([ point_on_edge_1, point_on_edge_2, inner_edge_p2, inner_edge_p1 ], dtype=np.int32) cv2.fillPoly(mask, [edge_polygon], 255) return mask @staticmethod def calculate_defect_pixels( contour_mask: np.ndarray, fitted_mask: np.ndarray, region_mask: np.ndarray ) -> Tuple[int, int]: """计算缺陷像素数 Args: contour_mask: 轮廓mask fitted_mask: 拟合mask region_mask: 区域mask Returns: (突出像素数, 凹陷像素数) """ # 限制在区域内 contour_in_region = cv2.bitwise_and(contour_mask, region_mask) fitted_in_region = cv2.bitwise_and(fitted_mask, region_mask) # 计算并集 union_mask = cv2.bitwise_or(contour_in_region, fitted_in_region) # 突出:并集减去拟合区域 protrusion_mask = cv2.bitwise_and(union_mask, cv2.bitwise_not(fitted_in_region)) protrusion_pixels = np.sum(protrusion_mask > 0) # 凹陷:并集减去轮廓区域 depression_mask = cv2.bitwise_and(union_mask, cv2.bitwise_not(contour_in_region)) depression_pixels = np.sum(depression_mask > 0) return protrusion_pixels, depression_pixels class CardDefectDetectionAlgo: """卡片边角缺陷检测算法类""" def __init__(self): """初始化""" self.intermediate_results = {} self.mask_processor = MaskProcessor() @staticmethod def load_json_data(json_path) -> Dict[str, Any]: """加载JSON数据""" with open(json_path, 'r', encoding='utf-8') as f: return json.load(f) @staticmethod def _create_blank_image(width: int, height: int) -> np.ndarray: """创建空白图像""" return np.ones((height, width, 3), dtype=np.uint8) * 255 @staticmethod def load_or_create_image(image_path: str) -> np.ndarray: """加载图像或创建空白图像""" if os.path.exists(image_path): img = cv2.imread(str(image_path)) if img is not None: return img raise ValueError("警告", f"图像不存在或无法读取: {image_path},创建空白图像") @staticmethod def _get_unique_shape(shapes: List[Dict], label_name: str) -> Optional[Dict]: """获取唯一的形状""" target_shapes = [s for s in shapes if s.get('label') == label_name] if not target_shapes: return None if len(target_shapes) == 1: return target_shapes[0] def get_shape_score(shape): confidence = shape.get('probability', 0) points = np.array(shape['points']) area = cv2.contourArea(points.astype(np.float32)) return (confidence, area) target_shapes.sort(key=get_shape_score, reverse=True) return target_shapes[0] @staticmethod def _get_min_area_rect(points: np.ndarray) -> Tuple[np.ndarray, float, float, List[int], List[int]]: """获取最小外接矩形并识别长短边 Returns: (矩形顶点, 长边长度, 短边长度, 长边索引列表, 短边索引列表) """ rect = cv2.minAreaRect(points.astype(np.float32)) box = cv2.boxPoints(rect) box = np.intp(box) # 计算每条边的长度 edge_lengths = [] for i in range(4): p1 = box[i] p2 = box[(i + 1) % 4] length = np.linalg.norm(p2 - p1) edge_lengths.append(length) # 识别长边和短边 sorted_indices = np.argsort(edge_lengths) short_edge_indices = sorted_indices[:2].tolist() long_edge_indices = sorted_indices[2:].tolist() long_edge = np.mean([edge_lengths[i] for i in long_edge_indices]) short_edge = np.mean([edge_lengths[i] for i in short_edge_indices]) return box, long_edge, short_edge, long_edge_indices, short_edge_indices @staticmethod def _classify_points_to_edges_corners( contour_points: np.ndarray, rect_corners: np.ndarray, long_edge: float, short_edge: float, long_corner_length: float, short_corner_length: float ) -> Dict[EdgeCornerType, np.ndarray]: """将轮廓点分类到边和角""" classified_points = {edge_type: [] for edge_type in EdgeCornerType} edges = [] for i in range(4): p1 = rect_corners[i] p2 = rect_corners[(i + 1) % 4] edges.append((p1, p2)) edge_lengths = [np.linalg.norm(e[1] - e[0]) for e in edges] long_edge_indices = np.argsort(edge_lengths)[-2:] short_edge_indices = np.argsort(edge_lengths)[:2] for point in contour_points: min_dist = float('inf') min_type = None for i, corner in enumerate(rect_corners): dist = np.linalg.norm(point - corner) if dist < min_dist: min_dist = dist if i == 0: min_type = EdgeCornerType.TOP_LEFT elif i == 1: min_type = EdgeCornerType.TOP_RIGHT elif i == 2: min_type = EdgeCornerType.BOTTOM_RIGHT else: min_type = EdgeCornerType.BOTTOM_LEFT for i, (p1, p2) in enumerate(edges): dist = CardDefectDetectionAlgo._point_to_segment_distance(point, p1, p2) edge_vector = p2 - p1 edge_length = np.linalg.norm(edge_vector) projection = np.dot(point - p1, edge_vector) / edge_length if i in long_edge_indices: corner_ratio = long_corner_length / edge_length else: corner_ratio = short_corner_length / edge_length if corner_ratio < projection / edge_length < (1 - corner_ratio): if dist < min_dist: min_dist = dist if i == 0: min_type = EdgeCornerType.TOP elif i == 1: min_type = EdgeCornerType.RIGHT elif i == 2: min_type = EdgeCornerType.BOTTOM else: min_type = EdgeCornerType.LEFT if min_type: classified_points[min_type].append(point) for key in classified_points: if classified_points[key]: classified_points[key] = np.array(classified_points[key]) else: classified_points[key] = np.array([]).reshape(0, 2) return classified_points @staticmethod def _point_to_segment_distance(point: np.ndarray, seg_p1: np.ndarray, seg_p2: np.ndarray) -> float: """计算点到线段的距离""" line_vec = seg_p2 - seg_p1 point_vec = point - seg_p1 line_len = np.linalg.norm(line_vec) if line_len == 0: return np.linalg.norm(point - seg_p1) line_unitvec = line_vec / line_len proj_length = np.dot(point_vec, line_unitvec) if proj_length < 0: return np.linalg.norm(point - seg_p1) elif proj_length > line_len: return np.linalg.norm(point - seg_p2) else: proj_point = seg_p1 + proj_length * line_unitvec return np.linalg.norm(point - proj_point) @staticmethod def _fit_line_with_ransac(points: np.ndarray, max_iterations: int = 1000, threshold: float = 5.0) -> Tuple[float, float, float]: """使用RANSAC拟合直线 Args: points: 点集 (N, 2) max_iterations: 最大迭代次数 threshold: 内点判定阈值 Returns: (a, b, c) 直线方程系数 ax + by + c = 0 """ if len(points) < 2: return None best_line = None best_inliers = 0 for _ in range(max_iterations): # 随机选择两个点 idx = np.random.choice(len(points), 2, replace=False) p1, p2 = points[idx] # 计算直线参数 if abs(p2[0] - p1[0]) < 1e-6: # 垂直线 a, b, c = 1, 0, -p1[0] else: # 一般直线 y = kx + m => kx - y + m = 0 k = (p2[1] - p1[1]) / (p2[0] - p1[0]) m = p1[1] - k * p1[0] a, b, c = k, -1, m # 归一化 norm = np.sqrt(a * a + b * b) a, b, c = a / norm, b / norm, c / norm # 计算内点数 distances = np.abs(a * points[:, 0] + b * points[:, 1] + c) inliers = np.sum(distances < threshold) if inliers > best_inliers: best_inliers = inliers best_line = (a, b, c) # 使用所有内点重新拟合(精细化) if best_line: a, b, c = best_line distances = np.abs(a * points[:, 0] + b * points[:, 1] + c) inlier_points = points[distances < threshold] if len(inlier_points) >= 2: # 使用OpenCV的fitLine进行最终拟合 vx, vy, x0, y0 = cv2.fitLine(inlier_points, cv2.DIST_L2, 0, 0.01, 0.01) # 转换为一般式 ax + by + c = 0 # 直线方向向量(vx, vy),点(x0, y0)在直线上 # 法向量为(-vy, vx) a = -vy[0] b = vx[0] c = -(a * x0[0] + b * y0[0]) # 归一化 norm = np.sqrt(a * a + b * b) return (a / norm, b / norm, c / norm) return best_line @staticmethod def _fit_lines_opencv( classified_points: Dict[EdgeCornerType, np.ndarray], threshold: float = 5.0 ) -> Dict[EdgeCornerType, Tuple[float, float, float]]: """使用OpenCV拟合四条边的直线(只使用边点,不使用角点) Args: classified_points: 分类后的点 threshold: RANSAC阈值 Returns: 拟合的直线字典 {边类型: (a, b, c)} """ fitted_lines = {} # 边类型映射 edge_only_types = { EdgeCornerType.TOP: EdgeCornerType.TOP, EdgeCornerType.BOTTOM: EdgeCornerType.BOTTOM, EdgeCornerType.LEFT: EdgeCornerType.LEFT, EdgeCornerType.RIGHT: EdgeCornerType.RIGHT } for edge_type in edge_only_types: # 只使用纯边点,不包含角点 edge_points = classified_points.get(edge_type, np.array([])) if len(edge_points) < 2: fry_algo_print("警告", f"{edge_type.value}边点数不足,跳过拟合") continue # 使用RANSAC拟合直线 line_params = CardDefectDetectionAlgo._fit_line_with_ransac( edge_points, threshold=threshold ) if line_params: fitted_lines[edge_type] = line_params fry_algo_print("信息", f"{edge_type.value}拟合成功,点数: {len(edge_points)}") else: fry_algo_print("警告", f"{edge_type.value}拟合失败") return fitted_lines @staticmethod def _get_line_intersection(line1: Tuple[float, float, float], line2: Tuple[float, float, float]) -> Optional[np.ndarray]: """计算两条直线的交点 Args: line1: 直线1参数 (a1, b1, c1) line2: 直线2参数 (a2, b2, c2) Returns: 交点坐标 [x, y] 或 None """ a1, b1, c1 = line1 a2, b2, c2 = line2 det = a1 * b2 - a2 * b1 if abs(det) < 1e-10: return None x = (b1 * c2 - b2 * c1) / det y = (a2 * c1 - a1 * c2) / det return np.array([x, y]) @staticmethod def _get_quadrilateral_from_lines(fitted_lines: Dict[EdgeCornerType, Tuple[float, float, float]]) -> np.ndarray: """从拟合的直线获取四边形的四个角点 Args: fitted_lines: 拟合的直线字典 Returns: 四个角点坐标,按左上、右上、右下、左下顺序 """ # 检查是否有四条边 required_edges = [EdgeCornerType.TOP, EdgeCornerType.RIGHT, EdgeCornerType.BOTTOM, EdgeCornerType.LEFT] for edge in required_edges: if edge not in fitted_lines: fry_algo_print("警告", f"缺少{edge.value}的拟合直线") return None # 计算四个角点 corners = {} # 左上角 = 左边 ∩ 上边 top_left = CardDefectDetectionAlgo._get_line_intersection( fitted_lines[EdgeCornerType.LEFT], fitted_lines[EdgeCornerType.TOP] ) if top_left is not None: corners['top_left'] = top_left # 右上角 = 右边 ∩ 上边 top_right = CardDefectDetectionAlgo._get_line_intersection( fitted_lines[EdgeCornerType.RIGHT], fitted_lines[EdgeCornerType.TOP] ) if top_right is not None: corners['top_right'] = top_right # 右下角 = 右边 ∩ 下边 bottom_right = CardDefectDetectionAlgo._get_line_intersection( fitted_lines[EdgeCornerType.RIGHT], fitted_lines[EdgeCornerType.BOTTOM] ) if bottom_right is not None: corners['bottom_right'] = bottom_right # 左下角 = 左边 ∩ 下边 bottom_left = CardDefectDetectionAlgo._get_line_intersection( fitted_lines[EdgeCornerType.LEFT], fitted_lines[EdgeCornerType.BOTTOM] ) if bottom_left is not None: corners['bottom_left'] = bottom_left # 检查是否获得了所有角点 if len(corners) != 4: fry_algo_print("警告", f"只计算出{len(corners)}个角点") return None # 按顺序返回 return np.array([ corners['top_left'], corners['top_right'], corners['bottom_right'], corners['bottom_left'] ]) def _calculate_defects_with_mask( self, contour_points: np.ndarray, fitted_quad: np.ndarray, classified_points: Dict[EdgeCornerType, np.ndarray], img_shape: Tuple[int, int], long_corner_length: float, short_corner_length: float, edge_width: float ) -> Tuple[Dict[EdgeCornerType, EdgeCornerDefect], np.ndarray]: """使用mask方法计算缺陷 Returns: (缺陷字典, 内部四个点) """ defects = {} # 获取拟合四边形的最小外接矩形,识别长短边 _, _, _, long_edge_indices, short_edge_indices = self._get_min_area_rect(fitted_quad) # 计算内部四个点 inner_points = self.mask_processor.calculate_inner_points( fitted_quad, long_edge_indices, short_edge_indices, long_corner_length, short_corner_length ) # 创建轮廓和拟合四边形的mask contour_mask = self.mask_processor.polygon_to_mask(contour_points, img_shape) fitted_mask = self.mask_processor.polygon_to_mask(fitted_quad, img_shape) # 处理四个角 corner_types = [ EdgeCornerType.TOP_LEFT, EdgeCornerType.TOP_RIGHT, EdgeCornerType.BOTTOM_RIGHT, EdgeCornerType.BOTTOM_LEFT ] for i, corner_type in enumerate(corner_types): defect = EdgeCornerDefect(type=corner_type) # 获取角区域mask region_mask = self.mask_processor.get_corner_region_mask( i, fitted_quad, inner_points, img_shape ) # 计算缺陷 protrusion_pixels, depression_pixels = self.mask_processor.calculate_defect_pixels( contour_mask, fitted_mask, region_mask ) defect.protrusion_pixels = protrusion_pixels defect.depression_pixels = depression_pixels defect.protrusion_area = float(protrusion_pixels) defect.depression_area = float(depression_pixels) # 保存该区域的点 edge_corner_points = classified_points.get(corner_type, np.array([])) if len(edge_corner_points) > 0: defect.contour_points = edge_corner_points.tolist() defects[corner_type] = defect # 处理四条边 edge_types = [ EdgeCornerType.TOP, EdgeCornerType.RIGHT, EdgeCornerType.BOTTOM, EdgeCornerType.LEFT ] for i, edge_type in enumerate(edge_types): defect = EdgeCornerDefect(type=edge_type) # 获取边区域mask region_mask = self.mask_processor.get_edge_region_mask( i, fitted_quad, inner_points, edge_width, img_shape ) # 计算缺陷 protrusion_pixels, depression_pixels = self.mask_processor.calculate_defect_pixels( contour_mask, fitted_mask, region_mask ) defect.protrusion_pixels = protrusion_pixels defect.depression_pixels = depression_pixels defect.protrusion_area = float(protrusion_pixels) defect.depression_area = float(depression_pixels) # 保存该区域的点 edge_points = classified_points.get(edge_type, np.array([])) if len(edge_points) > 0: defect.contour_points = edge_points.tolist() defects[edge_type] = defect return defects, inner_points def _draw_text_with_background( self, img: np.ndarray, text: str, position: Tuple[int, int], font_scale: float = 0.6, font_thickness: int = 1, text_color: Tuple[int, int, int] = (255, 255, 255), bg_color: Tuple[int, int, int] = (0, 0, 0), bg_opacity: float = 0.7 ) -> np.ndarray: """在图像上绘制带半透明背景的文字""" font = cv2.FONT_HERSHEY_SIMPLEX (text_width, text_height), baseline = cv2.getTextSize( text, font, font_scale, font_thickness ) x, y = position padding = 5 bg_pt1 = (x - padding, y - text_height - padding) bg_pt2 = (x + text_width + padding, y + baseline + padding) overlay = img.copy() cv2.rectangle(overlay, bg_pt1, bg_pt2, bg_color, -1) cv2.addWeighted(overlay, bg_opacity, img, 1 - bg_opacity, 0, img) cv2.putText(img, text, position, font, font_scale, text_color, font_thickness) return img def _draw_overlay_on_image( self, image: np.ndarray, step_name: str, contour_points: Optional[np.ndarray] = None, min_rect: Optional[np.ndarray] = None, fitted_quad: Optional[np.ndarray] = None, inner_points: Optional[np.ndarray] = None, classified_points: Optional[Dict[EdgeCornerType, np.ndarray]] = None, defects: Optional[Dict[EdgeCornerType, EdgeCornerDefect]] = None ) -> np.ndarray: """在图像上绘制叠加可视化""" overlay = image.copy() # 绘制轮廓 if contour_points is not None and len(contour_points) > 0: cv2.polylines(overlay, [contour_points.astype(np.int32)], True, (255, 0, 0), 2) # 绘制最小外接矩形 if min_rect is not None and len(min_rect) == 4: cv2.polylines(overlay, [min_rect.astype(np.int32)], True, (0, 255, 0), 2) # 绘制拟合四边形 if fitted_quad is not None and len(fitted_quad) == 4: cv2.polylines(overlay, [fitted_quad.astype(np.int32)], True, (0, 0, 255), 3) for i, point in enumerate(fitted_quad): cv2.circle(overlay, tuple(point.astype(int)), 8, (0, 0, 255), -1) self._draw_text_with_background( overlay, f"C{i}", tuple(point.astype(int)), font_scale=0.8, text_color=(255, 255, 255) ) # 绘制内部四个点 if inner_points is not None and len(inner_points) == 4: for i, point in enumerate(inner_points): cv2.circle(overlay, tuple(point.astype(int)), 10, (255, 0, 255), -1) self._draw_text_with_background( overlay, f"B{i + 1}", tuple(point.astype(int) + np.array([15, 0])), font_scale=0.8, text_color=(255, 255, 0), bg_color=(128, 0, 128) ) # 绘制连接线(可选) if fitted_quad is not None: cv2.line(overlay, tuple(fitted_quad[i].astype(int)), tuple(point.astype(int)), (255, 0, 255), 1, cv2.LINE_AA) # 绘制分类点 if classified_points: colors = { EdgeCornerType.TOP_LEFT: (255, 0, 0), EdgeCornerType.TOP_RIGHT: (0, 255, 0), EdgeCornerType.BOTTOM_LEFT: (255, 255, 0), EdgeCornerType.BOTTOM_RIGHT: (0, 255, 255), EdgeCornerType.TOP: (255, 0, 255), EdgeCornerType.BOTTOM: (128, 0, 255), EdgeCornerType.LEFT: (0, 128, 255), EdgeCornerType.RIGHT: (255, 128, 0) } for edge_type, points in classified_points.items(): if len(points) > 0: color = colors.get(edge_type, (128, 128, 128)) for point in points: cv2.circle(overlay, tuple(point.astype(int)), 3, color, -1) # 绘制缺陷信息 if defects: y_offset = 30 for edge_type, defect in defects.items(): if defect.protrusion_pixels >= 0 or defect.depression_pixels >= 0: text = f"{edge_type.name}: protrusion={defect.protrusion_pixels}px, depression={defect.depression_pixels}px" self._draw_text_with_background( overlay, text, (10, y_offset), font_scale=0.5, text_color=(255, 255, 255), bg_color=(0, 0, 128), bg_opacity=0.7 ) y_offset += 25 # 添加步骤标题 self._draw_text_with_background( overlay, step_name, (10, overlay.shape[0] - 20), font_scale=1.0, text_color=(255, 255, 255), bg_color=(255, 0, 0), bg_opacity=0.7 ) return overlay def _save_intermediate_result( self, step_num: int, step_name: str, data: Any, output_dir: Path, image: Optional[np.ndarray] = None, save_overlay: bool = True, inner_points: Optional[np.ndarray] = None ): """保存中间结果""" step_dir = output_dir / "intermediate_results" step_dir.mkdir(parents=True, exist_ok=True) json_path = step_dir / f"step_{step_num:02d}_{step_name}.json" with open(json_path, 'w', encoding='utf-8') as f: json.dump(to_json_serializable(data), f, ensure_ascii=False, indent=2) if save_overlay and image is not None: overlay_image = self._draw_overlay_on_image( image=image, step_name=f"Step {step_num}: {step_name}", contour_points=np.array(data.get("contour", [])) if "contour" in data else None, min_rect=np.array(data.get("min_rect", [])) if "min_rect" in data else None, fitted_quad=np.array(data.get("fitted_quad", [])) if "fitted_quad" in data else None, inner_points=inner_points, classified_points={EdgeCornerType(k): np.array(v) for k, v in data.get("classified_points", {}).items()} if "classified_points" in data else None, defects={EdgeCornerType(k): EdgeCornerDefect.load_from_dict(v) for k, v in data.get("defects", {}).items()} if "defects" in data else None ) overlay_path = step_dir / f"step_{step_num:02d}_{step_name}_overlay.jpg" cv2.imwrite(str(overlay_path), overlay_image) def _interpolate_contour_points(self, contour_points: np.ndarray, interpolation_method: str = "linear") -> np.ndarray: """ 在轮廓点之间插入新点,增加点的密度 Args: contour_points: 原始轮廓点 interpolation_method: 插值方法,可选 "linear" 或 "midpoint" Returns: 插值后的轮廓点 """ if len(contour_points) < 2: return contour_points interpolated_points = [] for i in range(len(contour_points)): # 添加原始点 interpolated_points.append(contour_points[i]) # 获取下一个点(形成闭合轮廓) next_idx = (i + 1) % len(contour_points) next_point = contour_points[next_idx] if interpolation_method == "midpoint": # 简单的中点插值 mid_point = (contour_points[i] + next_point) / 2.0 interpolated_points.append(mid_point) elif interpolation_method == "linear": # 线性插值,可以插入多个点 distance = np.linalg.norm(next_point - contour_points[i]) # 根据距离决定插入点的数量(每50像素插入一个点) num_insert = max(1, int(distance / 50)) for j in range(1, num_insert + 1): t = j / (num_insert + 1) interp_point = (1 - t) * contour_points[i] + t * next_point interpolated_points.append(interp_point) return np.array(interpolated_points, dtype=np.int32) def process_single_group_entrance( self, data_dict: Dict, algo_params: CardDefectDetectionParams ) -> Dict: """处理单个数据组的核心逻辑""" if algo_params.debug_level == 'detail': fry_algo_print("信息", f"开始处理数据,参数: {algo_params.to_dict()}") image = data_dict.get("image") json_data = data_dict.get("label", {}) output_dir = Path(data_dict.get("output_dir", "./output")) output_dir.mkdir(parents=True, exist_ok=True) # 获取图像尺寸 if image is not None: img_shape = image.shape else: img_shape = (json_data.get("imageHeight", 2048), json_data.get("imageWidth", 2048), 3) # 步骤1:读取并筛选唯一形状 shapes = json_data.get("shapes", []) unique_shape = self._get_unique_shape(shapes, algo_params.label_name) if unique_shape is None: fry_algo_print("错误", f"未找到标签为{algo_params.label_name}的形状") return data_dict contour_points = np.array(unique_shape["points"]) if algo_params.save_intermediate_results: self._save_intermediate_result( 1, "筛选唯一形状", {"contour": contour_points.tolist(), "shape_info": unique_shape}, output_dir, image, algo_params.save_overlay_images ) # ========== 步骤1.5: 轮廓点插值(新增) ========== interpolated_contour = self._interpolate_contour_points(contour_points, "linear") if algo_params.debug_level == 'detail': fry_algo_print("信息", f"轮廓点插值: {len(contour_points)} -> {len(interpolated_contour)} 点") # 使用插值后的轮廓点进行后续处理 contour_points = interpolated_contour # 步骤2:获取最小外接矩形 min_rect, long_edge, short_edge, _, _ = self._get_min_area_rect(contour_points) if algo_params.save_intermediate_results: self._save_intermediate_result( 2, "最小外接矩形", { "contour": contour_points.tolist(), "min_rect": min_rect.tolist(), "long_edge": float(long_edge), "short_edge": float(short_edge) }, output_dir, image, algo_params.save_overlay_images ) # 步骤3:初始分类 classified_points = self._classify_points_to_edges_corners( contour_points, min_rect, long_edge, short_edge, algo_params.long_edge_corner_length, algo_params.short_edge_corner_length ) if algo_params.save_intermediate_results: classified_dict = {k.value: v.tolist() if len(v) > 0 else [] for k, v in classified_points.items()} self._save_intermediate_result( 3, "初始点分类", {"classified_points": classified_dict, "min_rect": min_rect.tolist()}, output_dir, image, algo_params.save_overlay_images ) # 步骤4-5:迭代拟合 fitted_quad = min_rect for iteration in range(algo_params.iteration_rounds): # 使用新的OpenCV拟合方法(只用边点) fitted_lines = self._fit_lines_opencv( classified_points, algo_params.ransac_residual_threshold ) new_quad = self._get_quadrilateral_from_lines(fitted_lines) if new_quad is not None: fitted_quad = new_quad # 重新分类点 classified_points = self._classify_points_to_edges_corners( contour_points, fitted_quad, long_edge, short_edge, algo_params.long_edge_corner_length, algo_params.short_edge_corner_length ) if algo_params.save_intermediate_results and iteration == algo_params.iteration_rounds - 1: classified_dict = {k.value: v.tolist() if len(v) > 0 else [] for k, v in classified_points.items()} # 转换fitted_lines格式用于保存 fitted_lines_dict = {k.value: v for k, v in fitted_lines.items()} self._save_intermediate_result( 4 + iteration, f"迭代{iteration + 1}_拟合结果", { "classified_points": classified_dict, "fitted_quad": fitted_quad.tolist(), "fitted_lines": fitted_lines_dict }, output_dir, image, algo_params.save_overlay_images ) # 步骤6:计算缺陷 defects, inner_points = self._calculate_defects_with_mask( contour_points, fitted_quad, classified_points, img_shape, algo_params.long_edge_corner_length, algo_params.short_edge_corner_length, algo_params.edge_width ) defects_dict = {k.value: v.to_dict() for k, v in defects.items()} if algo_params.save_intermediate_results: self._save_intermediate_result( 5 + algo_params.iteration_rounds, "最终缺陷结果", { "defects": defects_dict, "fitted_quad": fitted_quad.tolist(), "inner_points": inner_points.tolist(), "contour": contour_points.tolist() }, output_dir, image, algo_params.save_overlay_images, inner_points ) # 保存最终结果 result_path = output_dir / "defect_detection_result.json" final_result = { "defects": defects_dict, "fitted_quad": fitted_quad.tolist(), "inner_points": inner_points.tolist() } with open(result_path, 'w', encoding='utf-8') as f: json.dump(final_result, f, ensure_ascii=False, indent=2) if algo_params.debug_level in ['normal', 'detail']: fry_algo_print("成功", f"缺陷检测完成,结果已保存到: {result_path}") data_dict["defects"] = defects_dict data_dict["fitted_quad"] = fitted_quad.tolist() data_dict["inner_points"] = inner_points.tolist() return data_dict def process_batch_group_entrance( self, input_dir_str: str, output_dir_str: str, algo_params: CardDefectDetectionParams ) -> None: """批量处理多个JSON文件""" input_dir = Path(input_dir_str) output_dir = Path(output_dir_str) output_dir.mkdir(parents=True, exist_ok=True) json_files = list(input_dir.rglob("*.json")) if not json_files: fry_algo_print("警告", f"未在{input_dir}中找到JSON文件") return fry_algo_print("信息", f"找到{len(json_files)}个JSON文件") for json_file in json_files: fry_algo_print("组开始", f"处理文件: {json_file.name}") try: json_data = self.load_json_data(json_file) image_stem = json_file.stem image_extensions = ['.jpg', '.jpeg', '.png', '.bmp'] image_path = None for ext in image_extensions: potential_path = json_file.parent / f"{image_stem}{ext}" if potential_path.exists(): image_path = potential_path break image_width = json_data.get("imageWidth", 2048) image_height = json_data.get("imageHeight", 2048) if image_path: image = self.load_or_create_image(image_path, image_width, image_height) else: raise ValueError("警告", f"未找到对应的图像文件") relative_path = json_file.relative_to(input_dir) file_output_dir = output_dir / relative_path.parent / json_file.stem file_output_dir.mkdir(parents=True, exist_ok=True) data_dict = { "image": image, "label": json_data, "output_dir": str(file_output_dir), "answer_json": {} } result = self.process_single_group_entrance(data_dict, algo_params) fry_algo_print("成功", f"文件{json_file.name}处理完成") except Exception as e: fry_algo_print("错误", f"处理文件{json_file.name}时出错: {str(e)}") import traceback traceback.print_exc() fry_algo_print("组结束", f"文件{json_file.name}处理结束") fry_algo_print("成功", "批量处理完成") def _test_card_defect_detection_real(): """测试真实数据""" temp_dir = Path("_test_real").resolve() input_dir = temp_dir / "input" output_dir = temp_dir / "output" try: algo = CardDefectDetectionAlgo() params = CardDefectDetectionParams( # debug_level="detail", debug_level="no", save_intermediate_results=True, save_overlay_images=True, label_name="outer_box", long_edge_corner_length=200, short_edge_corner_length=200, edge_width=50, iteration_rounds=3, ransac_residual_threshold=5.0 ) algo.process_batch_group_entrance( str(input_dir), str(output_dir), params ) fry_algo_print("成功", f"测试完成,结果保存在: {output_dir}") result_files = list(output_dir.rglob("*")) for result_file in result_files: if result_file.is_file(): fry_algo_print("信息", f"生成文件: {result_file}") finally: pass def _test_one_img(): algo = CardDefectDetectionAlgo() image_path = r"C:\Code\ML\Image\Card\_250915_many_capture_img\_250915_1743_reflect_nature_defrct\1_front_coaxial_1_0.jpg" json_path = r"C:\Code\ML\Project\卡片缺陷检测项目组\_250903_1644_边角不直脚本优化\_test_real\input\1_front_coaxial_1_0.json" image = algo.load_or_create_image(image_path) json_data = algo.load_json_data(json_path) temp_out_dir = r"temp_out_dir" data_dict = { "image": image, "label": json_data, "output_dir": str(temp_out_dir), "answer_json": {} } algo_params = CardDefectDetectionParams( # debug_level="detail", debug_level="no", save_intermediate_results=True, save_overlay_images=True, label_name="outer_box", long_edge_corner_length=200, short_edge_corner_length=200, edge_width=50, iteration_rounds=3, ransac_residual_threshold=5.0 ) result = algo.process_single_group_entrance(data_dict, algo_params) print(result) if __name__ == "__main__": # _test_card_defect_detection_real() _test_one_img()