|
@@ -0,0 +1,1383 @@
|
|
|
|
|
+import os
|
|
|
|
|
+import cv2
|
|
|
|
|
+import json
|
|
|
|
|
+import numpy as np
|
|
|
|
|
+from pathlib import Path
|
|
|
|
|
+from dataclasses import dataclass, field
|
|
|
|
|
+from typing import Dict, List, Tuple, Optional, Any
|
|
|
|
|
+from enum import Enum
|
|
|
|
|
+import matplotlib.pyplot as plt
|
|
|
|
|
+from sklearn.linear_model import RANSACRegressor
|
|
|
|
|
+
|
|
|
|
|
+# matplotlib解决中文乱码
|
|
|
|
|
+plt.rcParams["font.sans-serif"] = ["SimHei"]
|
|
|
|
|
+plt.rcParams["font.family"] = "sans-serif"
|
|
|
|
|
+plt.rcParams['axes.unicode_minus'] = False
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def fry_algo_print(level_str: str, info_str: str):
|
|
|
|
|
+ print(f"[{level_str}] : {info_str}")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def fry_cv2_imread(filename, flags=cv2.IMREAD_COLOR):
|
|
|
|
|
+ """支持中文路径的imread"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ with open(filename, 'rb') as f:
|
|
|
|
|
+ chunk = f.read()
|
|
|
|
|
+ chunk_arr = np.frombuffer(chunk, dtype=np.uint8)
|
|
|
|
|
+ img = cv2.imdecode(chunk_arr, flags)
|
|
|
|
|
+ if img is None:
|
|
|
|
|
+ fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
|
|
|
|
|
+ return img
|
|
|
|
|
+ except IOError as e:
|
|
|
|
|
+ fry_algo_print("错误", f"IOError: Unable to read file: {filename}")
|
|
|
|
|
+ fry_algo_print("错误", f"Error details: {str(e)}")
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def fry_cv2_imwrite(filename, img, params=None):
|
|
|
|
|
+ """支持中文路径的imwrite"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ ext = os.path.splitext(filename)[1].lower()
|
|
|
|
|
+ result, encoded_img = cv2.imencode(ext, img, params)
|
|
|
|
|
+
|
|
|
|
|
+ if result:
|
|
|
|
|
+ with open(filename, 'wb') as f:
|
|
|
|
|
+ encoded_img.tofile(f)
|
|
|
|
|
+ return True
|
|
|
|
|
+ else:
|
|
|
|
|
+ fry_algo_print("警告", f"Warning: Unable to encode image: {filename}")
|
|
|
|
|
+ return False
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ fry_algo_print("错误", f"Error: Unable to write file: {filename}")
|
|
|
|
|
+ fry_algo_print("错误", f"Error details: {str(e)}")
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def fry_opencv_Chinese_path_init():
|
|
|
|
|
+ """覆盖OpenCV的原始函数"""
|
|
|
|
|
+ cv2.imread = fry_cv2_imread
|
|
|
|
|
+ cv2.imwrite = fry_cv2_imwrite
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+OPENCV_IO_ALREADY_INIT = False
|
|
|
|
|
+if not OPENCV_IO_ALREADY_INIT:
|
|
|
|
|
+ fry_opencv_Chinese_path_init()
|
|
|
|
|
+ OPENCV_IO_ALREADY_INIT = True
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def to_json_serializable(obj):
|
|
|
|
|
+ """将包含自定义对象的数据结构转换为JSON可序列化的格式"""
|
|
|
|
|
+ if obj is None or isinstance(obj, (bool, int, float, str)):
|
|
|
|
|
+ return obj
|
|
|
|
|
+ elif isinstance(obj, dict):
|
|
|
|
|
+ return {key: to_json_serializable(value) for key, value in obj.items()}
|
|
|
|
|
+ elif isinstance(obj, (list, tuple)):
|
|
|
|
|
+ return [to_json_serializable(item) for item in obj]
|
|
|
|
|
+ elif isinstance(obj, set):
|
|
|
|
|
+ return [to_json_serializable(item) for item in obj]
|
|
|
|
|
+ elif isinstance(obj, bytes):
|
|
|
|
|
+ return obj.decode('utf-8', errors='ignore')
|
|
|
|
|
+ else:
|
|
|
|
|
+ if hasattr(obj, '__dict__'):
|
|
|
|
|
+ return to_json_serializable(obj.__dict__)
|
|
|
|
|
+ elif hasattr(obj, 'to_dict'):
|
|
|
|
|
+ return to_json_serializable(obj.to_dict())
|
|
|
|
|
+ elif hasattr(obj, 'to_json'):
|
|
|
|
|
+ return to_json_serializable(obj.to_json())
|
|
|
|
|
+ else:
|
|
|
|
|
+ return str(obj)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class EdgeCornerType(Enum):
|
|
|
|
|
+ """边角类型枚举"""
|
|
|
|
|
+ TOP_LEFT = "左上角"
|
|
|
|
|
+ TOP_RIGHT = "右上角"
|
|
|
|
|
+ BOTTOM_LEFT = "左下角"
|
|
|
|
|
+ BOTTOM_RIGHT = "右下角"
|
|
|
|
|
+ TOP = "上边"
|
|
|
|
|
+ BOTTOM = "下边"
|
|
|
|
|
+ LEFT = "左边"
|
|
|
|
|
+ RIGHT = "右边"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@dataclass
|
|
|
|
|
+class EdgeCornerDefect:
|
|
|
|
|
+ """边角缺陷数据结构"""
|
|
|
|
|
+ type: EdgeCornerType
|
|
|
|
|
+ protrusion_area: float = 0.0 # 突出面积(像素)
|
|
|
|
|
+ depression_area: float = 0.0 # 凹陷面积(像素)
|
|
|
|
|
+ protrusion_pixels: int = 0 # 突出像素数
|
|
|
|
|
+ depression_pixels: int = 0 # 凹陷像素数
|
|
|
|
|
+ contour_points: List[List[float]] = field(default_factory=list) # 该边角的轮廓点
|
|
|
|
|
+ fitted_points: List[List[float]] = field(default_factory=list) # 拟合的边角点
|
|
|
|
|
+ region_points: List[List[float]] = field(default_factory=list) # 区域边界点
|
|
|
|
|
+
|
|
|
|
|
+ def to_dict(self) -> Dict[str, Any]:
|
|
|
|
|
+ """转换为字典"""
|
|
|
|
|
+ return {
|
|
|
|
|
+ "type": self.type.value,
|
|
|
|
|
+ "protrusion_area": float(self.protrusion_area),
|
|
|
|
|
+ "depression_area": float(self.depression_area),
|
|
|
|
|
+ "protrusion_pixels": int(self.protrusion_pixels),
|
|
|
|
|
+ "depression_pixels": int(self.depression_pixels),
|
|
|
|
|
+ "contour_points": self.contour_points,
|
|
|
|
|
+ "fitted_points": self.fitted_points,
|
|
|
|
|
+ "region_points": self.region_points
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def load_from_dict(cls, data: Dict[str, Any]) -> 'EdgeCornerDefect':
|
|
|
|
|
+ """从字典加载"""
|
|
|
|
|
+ return cls(
|
|
|
|
|
+ type=EdgeCornerType(data["type"]),
|
|
|
|
|
+ protrusion_area=data.get("protrusion_area", 0.0),
|
|
|
|
|
+ depression_area=data.get("depression_area", 0.0),
|
|
|
|
|
+ protrusion_pixels=data.get("protrusion_pixels", 0),
|
|
|
|
|
+ depression_pixels=data.get("depression_pixels", 0),
|
|
|
|
|
+ contour_points=data.get("contour_points", []),
|
|
|
|
|
+ fitted_points=data.get("fitted_points", []),
|
|
|
|
|
+ region_points=data.get("region_points", [])
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@dataclass
|
|
|
|
|
+class FryAlgoParamsBase:
|
|
|
|
|
+ """算法参数基类"""
|
|
|
|
|
+ algo_name: Optional[str] = None
|
|
|
|
|
+ debug_level: str = "no" # no, normal, detail
|
|
|
|
|
+ save_json_params: bool = False
|
|
|
|
|
+ is_first_algo: bool = False
|
|
|
|
|
+ is_last_algo: bool = False
|
|
|
|
|
+
|
|
|
|
|
+ def to_dict(self) -> Dict[str, Any]:
|
|
|
|
|
+ """转换为字典"""
|
|
|
|
|
+ result = {}
|
|
|
|
|
+ for key, value in self.__dict__.items():
|
|
|
|
|
+ if hasattr(value, 'to_dict'):
|
|
|
|
|
+ result[key] = value.to_dict()
|
|
|
|
|
+ elif isinstance(value, Enum):
|
|
|
|
|
+ result[key] = value.value
|
|
|
|
|
+ elif isinstance(value, Path):
|
|
|
|
|
+ result[key] = str(value)
|
|
|
|
|
+ else:
|
|
|
|
|
+ result[key] = value
|
|
|
|
|
+ return result
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@dataclass
|
|
|
|
|
+class CardDefectDetectionParams(FryAlgoParamsBase):
|
|
|
|
|
+ """卡片缺陷检测算法参数"""
|
|
|
|
|
+ algo_name: str = "卡片边角缺陷检测"
|
|
|
|
|
+ label_name: str = "outer_box" # JSON中的框标签名称
|
|
|
|
|
+ long_edge_corner_length: float = 200 # 矩形长边的角的长度
|
|
|
|
|
+ short_edge_corner_length: float = 200 # 矩形短边的角的长度
|
|
|
|
|
+ edge_width: float = 50 # 边区域向内延伸的宽度
|
|
|
|
|
+ iteration_rounds: int = 5 # 步骤d和e迭代轮数
|
|
|
|
|
+ ransac_residual_threshold: float = 5.0 # RANSAC残差阈值
|
|
|
|
|
+ save_intermediate_results: bool = True # 是否保存中间结果
|
|
|
|
|
+ save_overlay_images: bool = True # 是否保存叠加图像
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class MaskProcessor:
|
|
|
|
|
+ """Mask处理工具类"""
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def polygon_to_mask(polygon_points: np.ndarray, img_shape: Tuple[int, int]) -> np.ndarray:
|
|
|
|
|
+ """将多边形转换为mask
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ polygon_points: 多边形顶点坐标
|
|
|
|
|
+ img_shape: (height, width)
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 二值mask图像
|
|
|
|
|
+ """
|
|
|
|
|
+ mask = np.zeros(img_shape[:2], dtype=np.uint8)
|
|
|
|
|
+ if len(polygon_points) > 0:
|
|
|
|
|
+ cv2.fillPoly(mask, [polygon_points.astype(np.int32)], 255)
|
|
|
|
|
+ return mask
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def calculate_inner_points(
|
|
|
|
|
+ fitted_quad: np.ndarray,
|
|
|
|
|
+ long_edge_indices: List[int],
|
|
|
|
|
+ short_edge_indices: List[int],
|
|
|
|
|
+ long_corner_length: float,
|
|
|
|
|
+ short_corner_length: float
|
|
|
|
|
+ ) -> np.ndarray:
|
|
|
|
|
+ """计算内部四个点B1-B4
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ fitted_quad: 拟合四边形的四个顶点
|
|
|
|
|
+ long_edge_indices: 长边索引
|
|
|
|
|
+ short_edge_indices: 短边索引
|
|
|
|
|
+ long_corner_length: 长边角长度
|
|
|
|
|
+ short_corner_length: 短边角长度
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 内部四个点坐标
|
|
|
|
|
+ """
|
|
|
|
|
+ inner_points = np.zeros((4, 2))
|
|
|
|
|
+
|
|
|
|
|
+ for i in range(4):
|
|
|
|
|
+ # 获取当前顶点
|
|
|
|
|
+ current_point = fitted_quad[i]
|
|
|
|
|
+
|
|
|
|
|
+ # 获取相邻两条边的向量
|
|
|
|
|
+ prev_idx = (i - 1) % 4
|
|
|
|
|
+ next_idx = (i + 1) % 4
|
|
|
|
|
+
|
|
|
|
|
+ edge_to_prev = fitted_quad[prev_idx] - current_point
|
|
|
|
|
+ edge_to_next = fitted_quad[next_idx] - current_point
|
|
|
|
|
+
|
|
|
|
|
+ # 判断边的类型并设置对应的长度
|
|
|
|
|
+ # 边索引: 0(0-1), 1(1-2), 2(2-3), 3(3-0)
|
|
|
|
|
+ edge_prev_idx = prev_idx if prev_idx < i else 3
|
|
|
|
|
+ edge_next_idx = i
|
|
|
|
|
+
|
|
|
|
|
+ # 根据边类型设置移动距离
|
|
|
|
|
+ if edge_prev_idx in long_edge_indices:
|
|
|
|
|
+ dist_prev = long_corner_length
|
|
|
|
|
+ else:
|
|
|
|
|
+ dist_prev = short_corner_length
|
|
|
|
|
+
|
|
|
|
|
+ if edge_next_idx in long_edge_indices:
|
|
|
|
|
+ dist_next = long_corner_length
|
|
|
|
|
+ else:
|
|
|
|
|
+ dist_next = short_corner_length
|
|
|
|
|
+
|
|
|
|
|
+ # 单位化向量
|
|
|
|
|
+ edge_to_prev_unit = edge_to_prev / np.linalg.norm(edge_to_prev)
|
|
|
|
|
+ edge_to_next_unit = edge_to_next / np.linalg.norm(edge_to_next)
|
|
|
|
|
+
|
|
|
|
|
+ # 计算内部点
|
|
|
|
|
+ inner_points[i] = current_point + dist_prev * edge_to_prev_unit + dist_next * edge_to_next_unit
|
|
|
|
|
+
|
|
|
|
|
+ return inner_points
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def get_corner_region_mask(
|
|
|
|
|
+ corner_idx: int,
|
|
|
|
|
+ fitted_quad: np.ndarray,
|
|
|
|
|
+ inner_points: np.ndarray,
|
|
|
|
|
+ img_shape: Tuple[int, int]
|
|
|
|
|
+ ) -> np.ndarray:
|
|
|
|
|
+ """获取角区域mask
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ corner_idx: 角索引(0-3)
|
|
|
|
|
+ fitted_quad: 拟合四边形顶点
|
|
|
|
|
+ inner_points: 内部四个点
|
|
|
|
|
+ img_shape: 图像尺寸
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 角区域mask
|
|
|
|
|
+ """
|
|
|
|
|
+ mask = np.zeros(img_shape[:2], dtype=np.uint8)
|
|
|
|
|
+
|
|
|
|
|
+ # 获取角区域的四个顶点
|
|
|
|
|
+ prev_idx = (corner_idx - 1) % 4
|
|
|
|
|
+ next_idx = (corner_idx + 1) % 4
|
|
|
|
|
+
|
|
|
|
|
+ # 计算角区域的边界点
|
|
|
|
|
+ corner_point = fitted_quad[corner_idx]
|
|
|
|
|
+ inner_point = inner_points[corner_idx]
|
|
|
|
|
+
|
|
|
|
|
+ # 从内部点向两条边做垂线得到边界点
|
|
|
|
|
+ edge_to_prev = fitted_quad[prev_idx] - corner_point
|
|
|
|
|
+ edge_to_next = fitted_quad[next_idx] - corner_point
|
|
|
|
|
+
|
|
|
|
|
+ edge_to_prev_unit = edge_to_prev / np.linalg.norm(edge_to_prev)
|
|
|
|
|
+ edge_to_next_unit = edge_to_next / np.linalg.norm(edge_to_next)
|
|
|
|
|
+
|
|
|
|
|
+ # 计算垂足
|
|
|
|
|
+ proj_prev = np.dot(inner_point - corner_point, edge_to_prev_unit)
|
|
|
|
|
+ proj_next = np.dot(inner_point - corner_point, edge_to_next_unit)
|
|
|
|
|
+
|
|
|
|
|
+ point_on_prev_edge = corner_point + proj_prev * edge_to_prev_unit
|
|
|
|
|
+ point_on_next_edge = corner_point + proj_next * edge_to_next_unit
|
|
|
|
|
+
|
|
|
|
|
+ # 创建角区域多边形
|
|
|
|
|
+ corner_polygon = np.array([
|
|
|
|
|
+ corner_point,
|
|
|
|
|
+ point_on_prev_edge,
|
|
|
|
|
+ inner_point,
|
|
|
|
|
+ point_on_next_edge
|
|
|
|
|
+ ], dtype=np.int32)
|
|
|
|
|
+
|
|
|
|
|
+ cv2.fillPoly(mask, [corner_polygon], 255)
|
|
|
|
|
+
|
|
|
|
|
+ return mask
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def get_edge_region_mask(
|
|
|
|
|
+ edge_idx: int,
|
|
|
|
|
+ fitted_quad: np.ndarray,
|
|
|
|
|
+ inner_points: np.ndarray,
|
|
|
|
|
+ edge_width: float,
|
|
|
|
|
+ img_shape: Tuple[int, int]
|
|
|
|
|
+ ) -> np.ndarray:
|
|
|
|
|
+ """获取边区域mask
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ edge_idx: 边索引(0-3)
|
|
|
|
|
+ fitted_quad: 拟合四边形顶点
|
|
|
|
|
+ inner_points: 内部四个点
|
|
|
|
|
+ edge_width: 边区域宽度
|
|
|
|
|
+ img_shape: 图像尺寸
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 边区域mask
|
|
|
|
|
+ """
|
|
|
|
|
+ mask = np.zeros(img_shape[:2], dtype=np.uint8)
|
|
|
|
|
+
|
|
|
|
|
+ # 获取边的两个端点
|
|
|
|
|
+ p1_idx = edge_idx
|
|
|
|
|
+ p2_idx = (edge_idx + 1) % 4
|
|
|
|
|
+
|
|
|
|
|
+ p1 = fitted_quad[p1_idx]
|
|
|
|
|
+ p2 = fitted_quad[p2_idx]
|
|
|
|
|
+
|
|
|
|
|
+ # 获取对应的内部点
|
|
|
|
|
+ inner_p1 = inner_points[p1_idx]
|
|
|
|
|
+ inner_p2 = inner_points[p2_idx]
|
|
|
|
|
+
|
|
|
|
|
+ # 计算边向量和法向量
|
|
|
|
|
+ edge_vec = p2 - p1
|
|
|
|
|
+ edge_unit = edge_vec / np.linalg.norm(edge_vec)
|
|
|
|
|
+
|
|
|
|
|
+ # 法向量(向内)
|
|
|
|
|
+ normal = np.array([-edge_unit[1], edge_unit[0]])
|
|
|
|
|
+
|
|
|
|
|
+ # 确保法向量指向内部
|
|
|
|
|
+ center = np.mean(fitted_quad, axis=0)
|
|
|
|
|
+ if np.dot(normal, center - (p1 + p2) / 2) < 0:
|
|
|
|
|
+ normal = -normal
|
|
|
|
|
+
|
|
|
|
|
+ # 从内部点向边做垂线
|
|
|
|
|
+ proj1 = np.dot(inner_p1 - p1, edge_unit)
|
|
|
|
|
+ proj2 = np.dot(inner_p2 - p1, edge_unit)
|
|
|
|
|
+
|
|
|
|
|
+ point_on_edge_1 = p1 + proj1 * edge_unit
|
|
|
|
|
+ point_on_edge_2 = p1 + proj2 * edge_unit
|
|
|
|
|
+
|
|
|
|
|
+ # 创建边区域梯形
|
|
|
|
|
+ inner_edge_p1 = point_on_edge_1 + edge_width * normal
|
|
|
|
|
+ inner_edge_p2 = point_on_edge_2 + edge_width * normal
|
|
|
|
|
+
|
|
|
|
|
+ edge_polygon = np.array([
|
|
|
|
|
+ point_on_edge_1,
|
|
|
|
|
+ point_on_edge_2,
|
|
|
|
|
+ inner_edge_p2,
|
|
|
|
|
+ inner_edge_p1
|
|
|
|
|
+ ], dtype=np.int32)
|
|
|
|
|
+
|
|
|
|
|
+ cv2.fillPoly(mask, [edge_polygon], 255)
|
|
|
|
|
+
|
|
|
|
|
+ return mask
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def calculate_defect_pixels(
|
|
|
|
|
+ contour_mask: np.ndarray,
|
|
|
|
|
+ fitted_mask: np.ndarray,
|
|
|
|
|
+ region_mask: np.ndarray
|
|
|
|
|
+ ) -> Tuple[int, int]:
|
|
|
|
|
+ """计算缺陷像素数
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ contour_mask: 轮廓mask
|
|
|
|
|
+ fitted_mask: 拟合mask
|
|
|
|
|
+ region_mask: 区域mask
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (突出像素数, 凹陷像素数)
|
|
|
|
|
+ """
|
|
|
|
|
+ # 限制在区域内
|
|
|
|
|
+ contour_in_region = cv2.bitwise_and(contour_mask, region_mask)
|
|
|
|
|
+ fitted_in_region = cv2.bitwise_and(fitted_mask, region_mask)
|
|
|
|
|
+
|
|
|
|
|
+ # 计算并集
|
|
|
|
|
+ union_mask = cv2.bitwise_or(contour_in_region, fitted_in_region)
|
|
|
|
|
+
|
|
|
|
|
+ # 突出:并集减去拟合区域
|
|
|
|
|
+ protrusion_mask = cv2.bitwise_and(union_mask, cv2.bitwise_not(fitted_in_region))
|
|
|
|
|
+ protrusion_pixels = np.sum(protrusion_mask > 0)
|
|
|
|
|
+
|
|
|
|
|
+ # 凹陷:并集减去轮廓区域
|
|
|
|
|
+ depression_mask = cv2.bitwise_and(union_mask, cv2.bitwise_not(contour_in_region))
|
|
|
|
|
+ depression_pixels = np.sum(depression_mask > 0)
|
|
|
|
|
+
|
|
|
|
|
+ return protrusion_pixels, depression_pixels
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class CardDefectDetectionAlgo:
|
|
|
|
|
+ """卡片边角缺陷检测算法类"""
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self):
|
|
|
|
|
+ """初始化"""
|
|
|
|
|
+ self.intermediate_results = {}
|
|
|
|
|
+ self.mask_processor = MaskProcessor()
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def load_json_data(json_path) -> Dict[str, Any]:
|
|
|
|
|
+ """加载JSON数据"""
|
|
|
|
|
+ with open(json_path, 'r', encoding='utf-8') as f:
|
|
|
|
|
+ return json.load(f)
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _create_blank_image(width: int, height: int) -> np.ndarray:
|
|
|
|
|
+ """创建空白图像"""
|
|
|
|
|
+ return np.ones((height, width, 3), dtype=np.uint8) * 255
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def load_or_create_image(image_path: str) -> np.ndarray:
|
|
|
|
|
+ """加载图像或创建空白图像"""
|
|
|
|
|
+ if os.path.exists(image_path):
|
|
|
|
|
+ img = cv2.imread(str(image_path))
|
|
|
|
|
+ if img is not None:
|
|
|
|
|
+ return img
|
|
|
|
|
+
|
|
|
|
|
+ raise ValueError("警告", f"图像不存在或无法读取: {image_path},创建空白图像")
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _get_unique_shape(shapes: List[Dict], label_name: str) -> Optional[Dict]:
|
|
|
|
|
+ """获取唯一的形状"""
|
|
|
|
|
+ target_shapes = [s for s in shapes if s.get('label') == label_name]
|
|
|
|
|
+
|
|
|
|
|
+ if not target_shapes:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ if len(target_shapes) == 1:
|
|
|
|
|
+ return target_shapes[0]
|
|
|
|
|
+
|
|
|
|
|
+ def get_shape_score(shape):
|
|
|
|
|
+ confidence = shape.get('probability', 0)
|
|
|
|
|
+ points = np.array(shape['points'])
|
|
|
|
|
+ area = cv2.contourArea(points.astype(np.float32))
|
|
|
|
|
+ return (confidence, area)
|
|
|
|
|
+
|
|
|
|
|
+ target_shapes.sort(key=get_shape_score, reverse=True)
|
|
|
|
|
+ return target_shapes[0]
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _get_min_area_rect(points: np.ndarray) -> Tuple[np.ndarray, float, float, List[int], List[int]]:
|
|
|
|
|
+ """获取最小外接矩形并识别长短边
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (矩形顶点, 长边长度, 短边长度, 长边索引列表, 短边索引列表)
|
|
|
|
|
+ """
|
|
|
|
|
+ rect = cv2.minAreaRect(points.astype(np.float32))
|
|
|
|
|
+ box = cv2.boxPoints(rect)
|
|
|
|
|
+ box = np.intp(box)
|
|
|
|
|
+
|
|
|
|
|
+ # 计算每条边的长度
|
|
|
|
|
+ edge_lengths = []
|
|
|
|
|
+ for i in range(4):
|
|
|
|
|
+ p1 = box[i]
|
|
|
|
|
+ p2 = box[(i + 1) % 4]
|
|
|
|
|
+ length = np.linalg.norm(p2 - p1)
|
|
|
|
|
+ edge_lengths.append(length)
|
|
|
|
|
+
|
|
|
|
|
+ # 识别长边和短边
|
|
|
|
|
+ sorted_indices = np.argsort(edge_lengths)
|
|
|
|
|
+ short_edge_indices = sorted_indices[:2].tolist()
|
|
|
|
|
+ long_edge_indices = sorted_indices[2:].tolist()
|
|
|
|
|
+
|
|
|
|
|
+ long_edge = np.mean([edge_lengths[i] for i in long_edge_indices])
|
|
|
|
|
+ short_edge = np.mean([edge_lengths[i] for i in short_edge_indices])
|
|
|
|
|
+
|
|
|
|
|
+ return box, long_edge, short_edge, long_edge_indices, short_edge_indices
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _classify_points_to_edges_corners(
|
|
|
|
|
+ contour_points: np.ndarray,
|
|
|
|
|
+ rect_corners: np.ndarray,
|
|
|
|
|
+ long_edge: float,
|
|
|
|
|
+ short_edge: float,
|
|
|
|
|
+ long_corner_length: float,
|
|
|
|
|
+ short_corner_length: float
|
|
|
|
|
+ ) -> Dict[EdgeCornerType, np.ndarray]:
|
|
|
|
|
+ """将轮廓点分类到边和角"""
|
|
|
|
|
+ classified_points = {edge_type: [] for edge_type in EdgeCornerType}
|
|
|
|
|
+
|
|
|
|
|
+ edges = []
|
|
|
|
|
+ for i in range(4):
|
|
|
|
|
+ p1 = rect_corners[i]
|
|
|
|
|
+ p2 = rect_corners[(i + 1) % 4]
|
|
|
|
|
+ edges.append((p1, p2))
|
|
|
|
|
+
|
|
|
|
|
+ edge_lengths = [np.linalg.norm(e[1] - e[0]) for e in edges]
|
|
|
|
|
+ long_edge_indices = np.argsort(edge_lengths)[-2:]
|
|
|
|
|
+ short_edge_indices = np.argsort(edge_lengths)[:2]
|
|
|
|
|
+
|
|
|
|
|
+ for point in contour_points:
|
|
|
|
|
+ min_dist = float('inf')
|
|
|
|
|
+ min_type = None
|
|
|
|
|
+
|
|
|
|
|
+ for i, corner in enumerate(rect_corners):
|
|
|
|
|
+ dist = np.linalg.norm(point - corner)
|
|
|
|
|
+ if dist < min_dist:
|
|
|
|
|
+ min_dist = dist
|
|
|
|
|
+ if i == 0:
|
|
|
|
|
+ min_type = EdgeCornerType.TOP_LEFT
|
|
|
|
|
+ elif i == 1:
|
|
|
|
|
+ min_type = EdgeCornerType.TOP_RIGHT
|
|
|
|
|
+ elif i == 2:
|
|
|
|
|
+ min_type = EdgeCornerType.BOTTOM_RIGHT
|
|
|
|
|
+ else:
|
|
|
|
|
+ min_type = EdgeCornerType.BOTTOM_LEFT
|
|
|
|
|
+
|
|
|
|
|
+ for i, (p1, p2) in enumerate(edges):
|
|
|
|
|
+ dist = CardDefectDetectionAlgo._point_to_segment_distance(point, p1, p2)
|
|
|
|
|
+
|
|
|
|
|
+ edge_vector = p2 - p1
|
|
|
|
|
+ edge_length = np.linalg.norm(edge_vector)
|
|
|
|
|
+ projection = np.dot(point - p1, edge_vector) / edge_length
|
|
|
|
|
+
|
|
|
|
|
+ if i in long_edge_indices:
|
|
|
|
|
+ corner_ratio = long_corner_length / edge_length
|
|
|
|
|
+ else:
|
|
|
|
|
+ corner_ratio = short_corner_length / edge_length
|
|
|
|
|
+
|
|
|
|
|
+ if corner_ratio < projection / edge_length < (1 - corner_ratio):
|
|
|
|
|
+ if dist < min_dist:
|
|
|
|
|
+ min_dist = dist
|
|
|
|
|
+ if i == 0:
|
|
|
|
|
+ min_type = EdgeCornerType.TOP
|
|
|
|
|
+ elif i == 1:
|
|
|
|
|
+ min_type = EdgeCornerType.RIGHT
|
|
|
|
|
+ elif i == 2:
|
|
|
|
|
+ min_type = EdgeCornerType.BOTTOM
|
|
|
|
|
+ else:
|
|
|
|
|
+ min_type = EdgeCornerType.LEFT
|
|
|
|
|
+
|
|
|
|
|
+ if min_type:
|
|
|
|
|
+ classified_points[min_type].append(point)
|
|
|
|
|
+
|
|
|
|
|
+ for key in classified_points:
|
|
|
|
|
+ if classified_points[key]:
|
|
|
|
|
+ classified_points[key] = np.array(classified_points[key])
|
|
|
|
|
+ else:
|
|
|
|
|
+ classified_points[key] = np.array([]).reshape(0, 2)
|
|
|
|
|
+
|
|
|
|
|
+ return classified_points
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _point_to_segment_distance(point: np.ndarray, seg_p1: np.ndarray, seg_p2: np.ndarray) -> float:
|
|
|
|
|
+ """计算点到线段的距离"""
|
|
|
|
|
+ line_vec = seg_p2 - seg_p1
|
|
|
|
|
+ point_vec = point - seg_p1
|
|
|
|
|
+ line_len = np.linalg.norm(line_vec)
|
|
|
|
|
+
|
|
|
|
|
+ if line_len == 0:
|
|
|
|
|
+ return np.linalg.norm(point - seg_p1)
|
|
|
|
|
+
|
|
|
|
|
+ line_unitvec = line_vec / line_len
|
|
|
|
|
+ proj_length = np.dot(point_vec, line_unitvec)
|
|
|
|
|
+
|
|
|
|
|
+ if proj_length < 0:
|
|
|
|
|
+ return np.linalg.norm(point - seg_p1)
|
|
|
|
|
+ elif proj_length > line_len:
|
|
|
|
|
+ return np.linalg.norm(point - seg_p2)
|
|
|
|
|
+ else:
|
|
|
|
|
+ proj_point = seg_p1 + proj_length * line_unitvec
|
|
|
|
|
+ return np.linalg.norm(point - proj_point)
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _fit_line_with_ransac(points: np.ndarray, max_iterations: int = 1000,
|
|
|
|
|
+ threshold: float = 5.0) -> Tuple[float, float, float]:
|
|
|
|
|
+ """使用RANSAC拟合直线
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ points: 点集 (N, 2)
|
|
|
|
|
+ max_iterations: 最大迭代次数
|
|
|
|
|
+ threshold: 内点判定阈值
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (a, b, c) 直线方程系数 ax + by + c = 0
|
|
|
|
|
+ """
|
|
|
|
|
+ if len(points) < 2:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ best_line = None
|
|
|
|
|
+ best_inliers = 0
|
|
|
|
|
+
|
|
|
|
|
+ for _ in range(max_iterations):
|
|
|
|
|
+ # 随机选择两个点
|
|
|
|
|
+ idx = np.random.choice(len(points), 2, replace=False)
|
|
|
|
|
+ p1, p2 = points[idx]
|
|
|
|
|
+
|
|
|
|
|
+ # 计算直线参数
|
|
|
|
|
+ if abs(p2[0] - p1[0]) < 1e-6: # 垂直线
|
|
|
|
|
+ a, b, c = 1, 0, -p1[0]
|
|
|
|
|
+ else:
|
|
|
|
|
+ # 一般直线 y = kx + m => kx - y + m = 0
|
|
|
|
|
+ k = (p2[1] - p1[1]) / (p2[0] - p1[0])
|
|
|
|
|
+ m = p1[1] - k * p1[0]
|
|
|
|
|
+ a, b, c = k, -1, m
|
|
|
|
|
+
|
|
|
|
|
+ # 归一化
|
|
|
|
|
+ norm = np.sqrt(a * a + b * b)
|
|
|
|
|
+ a, b, c = a / norm, b / norm, c / norm
|
|
|
|
|
+
|
|
|
|
|
+ # 计算内点数
|
|
|
|
|
+ distances = np.abs(a * points[:, 0] + b * points[:, 1] + c)
|
|
|
|
|
+ inliers = np.sum(distances < threshold)
|
|
|
|
|
+
|
|
|
|
|
+ if inliers > best_inliers:
|
|
|
|
|
+ best_inliers = inliers
|
|
|
|
|
+ best_line = (a, b, c)
|
|
|
|
|
+
|
|
|
|
|
+ # 使用所有内点重新拟合(精细化)
|
|
|
|
|
+ if best_line:
|
|
|
|
|
+ a, b, c = best_line
|
|
|
|
|
+ distances = np.abs(a * points[:, 0] + b * points[:, 1] + c)
|
|
|
|
|
+ inlier_points = points[distances < threshold]
|
|
|
|
|
+
|
|
|
|
|
+ if len(inlier_points) >= 2:
|
|
|
|
|
+ # 使用OpenCV的fitLine进行最终拟合
|
|
|
|
|
+ vx, vy, x0, y0 = cv2.fitLine(inlier_points, cv2.DIST_L2, 0, 0.01, 0.01)
|
|
|
|
|
+
|
|
|
|
|
+ # 转换为一般式 ax + by + c = 0
|
|
|
|
|
+ # 直线方向向量(vx, vy),点(x0, y0)在直线上
|
|
|
|
|
+ # 法向量为(-vy, vx)
|
|
|
|
|
+ a = -vy[0]
|
|
|
|
|
+ b = vx[0]
|
|
|
|
|
+ c = -(a * x0[0] + b * y0[0])
|
|
|
|
|
+
|
|
|
|
|
+ # 归一化
|
|
|
|
|
+ norm = np.sqrt(a * a + b * b)
|
|
|
|
|
+ return (a / norm, b / norm, c / norm)
|
|
|
|
|
+
|
|
|
|
|
+ return best_line
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _fit_lines_opencv(
|
|
|
|
|
+ classified_points: Dict[EdgeCornerType, np.ndarray],
|
|
|
|
|
+ threshold: float = 5.0
|
|
|
|
|
+ ) -> Dict[EdgeCornerType, Tuple[float, float, float]]:
|
|
|
|
|
+ """使用OpenCV拟合四条边的直线(只使用边点,不使用角点)
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ classified_points: 分类后的点
|
|
|
|
|
+ threshold: RANSAC阈值
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 拟合的直线字典 {边类型: (a, b, c)}
|
|
|
|
|
+ """
|
|
|
|
|
+ fitted_lines = {}
|
|
|
|
|
+
|
|
|
|
|
+ # 边类型映射
|
|
|
|
|
+ edge_only_types = {
|
|
|
|
|
+ EdgeCornerType.TOP: EdgeCornerType.TOP,
|
|
|
|
|
+ EdgeCornerType.BOTTOM: EdgeCornerType.BOTTOM,
|
|
|
|
|
+ EdgeCornerType.LEFT: EdgeCornerType.LEFT,
|
|
|
|
|
+ EdgeCornerType.RIGHT: EdgeCornerType.RIGHT
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ for edge_type in edge_only_types:
|
|
|
|
|
+ # 只使用纯边点,不包含角点
|
|
|
|
|
+ edge_points = classified_points.get(edge_type, np.array([]))
|
|
|
|
|
+
|
|
|
|
|
+ if len(edge_points) < 2:
|
|
|
|
|
+ fry_algo_print("警告", f"{edge_type.value}边点数不足,跳过拟合")
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 使用RANSAC拟合直线
|
|
|
|
|
+ line_params = CardDefectDetectionAlgo._fit_line_with_ransac(
|
|
|
|
|
+ edge_points, threshold=threshold
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ if line_params:
|
|
|
|
|
+ fitted_lines[edge_type] = line_params
|
|
|
|
|
+ fry_algo_print("信息", f"{edge_type.value}拟合成功,点数: {len(edge_points)}")
|
|
|
|
|
+ else:
|
|
|
|
|
+ fry_algo_print("警告", f"{edge_type.value}拟合失败")
|
|
|
|
|
+
|
|
|
|
|
+ return fitted_lines
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _get_line_intersection(line1: Tuple[float, float, float],
|
|
|
|
|
+ line2: Tuple[float, float, float]) -> Optional[np.ndarray]:
|
|
|
|
|
+ """计算两条直线的交点
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ line1: 直线1参数 (a1, b1, c1)
|
|
|
|
|
+ line2: 直线2参数 (a2, b2, c2)
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 交点坐标 [x, y] 或 None
|
|
|
|
|
+ """
|
|
|
|
|
+ a1, b1, c1 = line1
|
|
|
|
|
+ a2, b2, c2 = line2
|
|
|
|
|
+
|
|
|
|
|
+ det = a1 * b2 - a2 * b1
|
|
|
|
|
+ if abs(det) < 1e-10:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ x = (b1 * c2 - b2 * c1) / det
|
|
|
|
|
+ y = (a2 * c1 - a1 * c2) / det
|
|
|
|
|
+
|
|
|
|
|
+ return np.array([x, y])
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _get_quadrilateral_from_lines(fitted_lines: Dict[EdgeCornerType, Tuple[float, float, float]]) -> np.ndarray:
|
|
|
|
|
+ """从拟合的直线获取四边形的四个角点
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ fitted_lines: 拟合的直线字典
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 四个角点坐标,按左上、右上、右下、左下顺序
|
|
|
|
|
+ """
|
|
|
|
|
+ # 检查是否有四条边
|
|
|
|
|
+ required_edges = [EdgeCornerType.TOP, EdgeCornerType.RIGHT,
|
|
|
|
|
+ EdgeCornerType.BOTTOM, EdgeCornerType.LEFT]
|
|
|
|
|
+
|
|
|
|
|
+ for edge in required_edges:
|
|
|
|
|
+ if edge not in fitted_lines:
|
|
|
|
|
+ fry_algo_print("警告", f"缺少{edge.value}的拟合直线")
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ # 计算四个角点
|
|
|
|
|
+ corners = {}
|
|
|
|
|
+
|
|
|
|
|
+ # 左上角 = 左边 ∩ 上边
|
|
|
|
|
+ top_left = CardDefectDetectionAlgo._get_line_intersection(
|
|
|
|
|
+ fitted_lines[EdgeCornerType.LEFT],
|
|
|
|
|
+ fitted_lines[EdgeCornerType.TOP]
|
|
|
|
|
+ )
|
|
|
|
|
+ if top_left is not None:
|
|
|
|
|
+ corners['top_left'] = top_left
|
|
|
|
|
+
|
|
|
|
|
+ # 右上角 = 右边 ∩ 上边
|
|
|
|
|
+ top_right = CardDefectDetectionAlgo._get_line_intersection(
|
|
|
|
|
+ fitted_lines[EdgeCornerType.RIGHT],
|
|
|
|
|
+ fitted_lines[EdgeCornerType.TOP]
|
|
|
|
|
+ )
|
|
|
|
|
+ if top_right is not None:
|
|
|
|
|
+ corners['top_right'] = top_right
|
|
|
|
|
+
|
|
|
|
|
+ # 右下角 = 右边 ∩ 下边
|
|
|
|
|
+ bottom_right = CardDefectDetectionAlgo._get_line_intersection(
|
|
|
|
|
+ fitted_lines[EdgeCornerType.RIGHT],
|
|
|
|
|
+ fitted_lines[EdgeCornerType.BOTTOM]
|
|
|
|
|
+ )
|
|
|
|
|
+ if bottom_right is not None:
|
|
|
|
|
+ corners['bottom_right'] = bottom_right
|
|
|
|
|
+
|
|
|
|
|
+ # 左下角 = 左边 ∩ 下边
|
|
|
|
|
+ bottom_left = CardDefectDetectionAlgo._get_line_intersection(
|
|
|
|
|
+ fitted_lines[EdgeCornerType.LEFT],
|
|
|
|
|
+ fitted_lines[EdgeCornerType.BOTTOM]
|
|
|
|
|
+ )
|
|
|
|
|
+ if bottom_left is not None:
|
|
|
|
|
+ corners['bottom_left'] = bottom_left
|
|
|
|
|
+
|
|
|
|
|
+ # 检查是否获得了所有角点
|
|
|
|
|
+ if len(corners) != 4:
|
|
|
|
|
+ fry_algo_print("警告", f"只计算出{len(corners)}个角点")
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ # 按顺序返回
|
|
|
|
|
+ return np.array([
|
|
|
|
|
+ corners['top_left'],
|
|
|
|
|
+ corners['top_right'],
|
|
|
|
|
+ corners['bottom_right'],
|
|
|
|
|
+ corners['bottom_left']
|
|
|
|
|
+ ])
|
|
|
|
|
+
|
|
|
|
|
+ def _calculate_defects_with_mask(
|
|
|
|
|
+ self,
|
|
|
|
|
+ contour_points: np.ndarray,
|
|
|
|
|
+ fitted_quad: np.ndarray,
|
|
|
|
|
+ classified_points: Dict[EdgeCornerType, np.ndarray],
|
|
|
|
|
+ img_shape: Tuple[int, int],
|
|
|
|
|
+ long_corner_length: float,
|
|
|
|
|
+ short_corner_length: float,
|
|
|
|
|
+ edge_width: float
|
|
|
|
|
+ ) -> Tuple[Dict[EdgeCornerType, EdgeCornerDefect], np.ndarray]:
|
|
|
|
|
+ """使用mask方法计算缺陷
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (缺陷字典, 内部四个点)
|
|
|
|
|
+ """
|
|
|
|
|
+ defects = {}
|
|
|
|
|
+
|
|
|
|
|
+ # 获取拟合四边形的最小外接矩形,识别长短边
|
|
|
|
|
+ _, _, _, long_edge_indices, short_edge_indices = self._get_min_area_rect(fitted_quad)
|
|
|
|
|
+
|
|
|
|
|
+ # 计算内部四个点
|
|
|
|
|
+ inner_points = self.mask_processor.calculate_inner_points(
|
|
|
|
|
+ fitted_quad, long_edge_indices, short_edge_indices,
|
|
|
|
|
+ long_corner_length, short_corner_length
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 创建轮廓和拟合四边形的mask
|
|
|
|
|
+ contour_mask = self.mask_processor.polygon_to_mask(contour_points, img_shape)
|
|
|
|
|
+ fitted_mask = self.mask_processor.polygon_to_mask(fitted_quad, img_shape)
|
|
|
|
|
+
|
|
|
|
|
+ # 处理四个角
|
|
|
|
|
+ corner_types = [
|
|
|
|
|
+ EdgeCornerType.TOP_LEFT,
|
|
|
|
|
+ EdgeCornerType.TOP_RIGHT,
|
|
|
|
|
+ EdgeCornerType.BOTTOM_RIGHT,
|
|
|
|
|
+ EdgeCornerType.BOTTOM_LEFT
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ for i, corner_type in enumerate(corner_types):
|
|
|
|
|
+ defect = EdgeCornerDefect(type=corner_type)
|
|
|
|
|
+
|
|
|
|
|
+ # 获取角区域mask
|
|
|
|
|
+ region_mask = self.mask_processor.get_corner_region_mask(
|
|
|
|
|
+ i, fitted_quad, inner_points, img_shape
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 计算缺陷
|
|
|
|
|
+ protrusion_pixels, depression_pixels = self.mask_processor.calculate_defect_pixels(
|
|
|
|
|
+ contour_mask, fitted_mask, region_mask
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ defect.protrusion_pixels = protrusion_pixels
|
|
|
|
|
+ defect.depression_pixels = depression_pixels
|
|
|
|
|
+ defect.protrusion_area = float(protrusion_pixels)
|
|
|
|
|
+ defect.depression_area = float(depression_pixels)
|
|
|
|
|
+
|
|
|
|
|
+ # 保存该区域的点
|
|
|
|
|
+ edge_corner_points = classified_points.get(corner_type, np.array([]))
|
|
|
|
|
+ if len(edge_corner_points) > 0:
|
|
|
|
|
+ defect.contour_points = edge_corner_points.tolist()
|
|
|
|
|
+
|
|
|
|
|
+ defects[corner_type] = defect
|
|
|
|
|
+
|
|
|
|
|
+ # 处理四条边
|
|
|
|
|
+ edge_types = [
|
|
|
|
|
+ EdgeCornerType.TOP,
|
|
|
|
|
+ EdgeCornerType.RIGHT,
|
|
|
|
|
+ EdgeCornerType.BOTTOM,
|
|
|
|
|
+ EdgeCornerType.LEFT
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ for i, edge_type in enumerate(edge_types):
|
|
|
|
|
+ defect = EdgeCornerDefect(type=edge_type)
|
|
|
|
|
+
|
|
|
|
|
+ # 获取边区域mask
|
|
|
|
|
+ region_mask = self.mask_processor.get_edge_region_mask(
|
|
|
|
|
+ i, fitted_quad, inner_points, edge_width, img_shape
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 计算缺陷
|
|
|
|
|
+ protrusion_pixels, depression_pixels = self.mask_processor.calculate_defect_pixels(
|
|
|
|
|
+ contour_mask, fitted_mask, region_mask
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ defect.protrusion_pixels = protrusion_pixels
|
|
|
|
|
+ defect.depression_pixels = depression_pixels
|
|
|
|
|
+ defect.protrusion_area = float(protrusion_pixels)
|
|
|
|
|
+ defect.depression_area = float(depression_pixels)
|
|
|
|
|
+
|
|
|
|
|
+ # 保存该区域的点
|
|
|
|
|
+ edge_points = classified_points.get(edge_type, np.array([]))
|
|
|
|
|
+ if len(edge_points) > 0:
|
|
|
|
|
+ defect.contour_points = edge_points.tolist()
|
|
|
|
|
+
|
|
|
|
|
+ defects[edge_type] = defect
|
|
|
|
|
+
|
|
|
|
|
+ return defects, inner_points
|
|
|
|
|
+
|
|
|
|
|
+ def _draw_text_with_background(
|
|
|
|
|
+ self,
|
|
|
|
|
+ img: np.ndarray,
|
|
|
|
|
+ text: str,
|
|
|
|
|
+ position: Tuple[int, int],
|
|
|
|
|
+ font_scale: float = 0.6,
|
|
|
|
|
+ font_thickness: int = 1,
|
|
|
|
|
+ text_color: Tuple[int, int, int] = (255, 255, 255),
|
|
|
|
|
+ bg_color: Tuple[int, int, int] = (0, 0, 0),
|
|
|
|
|
+ bg_opacity: float = 0.7
|
|
|
|
|
+ ) -> np.ndarray:
|
|
|
|
|
+ """在图像上绘制带半透明背景的文字"""
|
|
|
|
|
+ font = cv2.FONT_HERSHEY_SIMPLEX
|
|
|
|
|
+
|
|
|
|
|
+ (text_width, text_height), baseline = cv2.getTextSize(
|
|
|
|
|
+ text, font, font_scale, font_thickness
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ x, y = position
|
|
|
|
|
+ padding = 5
|
|
|
|
|
+ bg_pt1 = (x - padding, y - text_height - padding)
|
|
|
|
|
+ bg_pt2 = (x + text_width + padding, y + baseline + padding)
|
|
|
|
|
+
|
|
|
|
|
+ overlay = img.copy()
|
|
|
|
|
+ cv2.rectangle(overlay, bg_pt1, bg_pt2, bg_color, -1)
|
|
|
|
|
+
|
|
|
|
|
+ cv2.addWeighted(overlay, bg_opacity, img, 1 - bg_opacity, 0, img)
|
|
|
|
|
+ cv2.putText(img, text, position, font, font_scale, text_color, font_thickness)
|
|
|
|
|
+
|
|
|
|
|
+ return img
|
|
|
|
|
+
|
|
|
|
|
+ def _draw_overlay_on_image(
|
|
|
|
|
+ self,
|
|
|
|
|
+ image: np.ndarray,
|
|
|
|
|
+ step_name: str,
|
|
|
|
|
+ contour_points: Optional[np.ndarray] = None,
|
|
|
|
|
+ min_rect: Optional[np.ndarray] = None,
|
|
|
|
|
+ fitted_quad: Optional[np.ndarray] = None,
|
|
|
|
|
+ inner_points: Optional[np.ndarray] = None,
|
|
|
|
|
+ classified_points: Optional[Dict[EdgeCornerType, np.ndarray]] = None,
|
|
|
|
|
+ defects: Optional[Dict[EdgeCornerType, EdgeCornerDefect]] = None
|
|
|
|
|
+ ) -> np.ndarray:
|
|
|
|
|
+ """在图像上绘制叠加可视化"""
|
|
|
|
|
+ overlay = image.copy()
|
|
|
|
|
+
|
|
|
|
|
+ # 绘制轮廓
|
|
|
|
|
+ if contour_points is not None and len(contour_points) > 0:
|
|
|
|
|
+ cv2.polylines(overlay, [contour_points.astype(np.int32)], True, (255, 0, 0), 2)
|
|
|
|
|
+
|
|
|
|
|
+ # 绘制最小外接矩形
|
|
|
|
|
+ if min_rect is not None and len(min_rect) == 4:
|
|
|
|
|
+ cv2.polylines(overlay, [min_rect.astype(np.int32)], True, (0, 255, 0), 2)
|
|
|
|
|
+
|
|
|
|
|
+ # 绘制拟合四边形
|
|
|
|
|
+ if fitted_quad is not None and len(fitted_quad) == 4:
|
|
|
|
|
+ cv2.polylines(overlay, [fitted_quad.astype(np.int32)], True, (0, 0, 255), 3)
|
|
|
|
|
+ for i, point in enumerate(fitted_quad):
|
|
|
|
|
+ cv2.circle(overlay, tuple(point.astype(int)), 8, (0, 0, 255), -1)
|
|
|
|
|
+ self._draw_text_with_background(
|
|
|
|
|
+ overlay, f"C{i}", tuple(point.astype(int)),
|
|
|
|
|
+ font_scale=0.8, text_color=(255, 255, 255)
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 绘制内部四个点
|
|
|
|
|
+ if inner_points is not None and len(inner_points) == 4:
|
|
|
|
|
+ for i, point in enumerate(inner_points):
|
|
|
|
|
+ cv2.circle(overlay, tuple(point.astype(int)), 10, (255, 0, 255), -1)
|
|
|
|
|
+ self._draw_text_with_background(
|
|
|
|
|
+ overlay, f"B{i + 1}", tuple(point.astype(int) + np.array([15, 0])),
|
|
|
|
|
+ font_scale=0.8, text_color=(255, 255, 0), bg_color=(128, 0, 128)
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 绘制连接线(可选)
|
|
|
|
|
+ if fitted_quad is not None:
|
|
|
|
|
+ cv2.line(overlay, tuple(fitted_quad[i].astype(int)),
|
|
|
|
|
+ tuple(point.astype(int)), (255, 0, 255), 1, cv2.LINE_AA)
|
|
|
|
|
+
|
|
|
|
|
+ # 绘制分类点
|
|
|
|
|
+ if classified_points:
|
|
|
|
|
+ colors = {
|
|
|
|
|
+ EdgeCornerType.TOP_LEFT: (255, 0, 0),
|
|
|
|
|
+ EdgeCornerType.TOP_RIGHT: (0, 255, 0),
|
|
|
|
|
+ EdgeCornerType.BOTTOM_LEFT: (255, 255, 0),
|
|
|
|
|
+ EdgeCornerType.BOTTOM_RIGHT: (0, 255, 255),
|
|
|
|
|
+ EdgeCornerType.TOP: (255, 0, 255),
|
|
|
|
|
+ EdgeCornerType.BOTTOM: (128, 0, 255),
|
|
|
|
|
+ EdgeCornerType.LEFT: (0, 128, 255),
|
|
|
|
|
+ EdgeCornerType.RIGHT: (255, 128, 0)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ for edge_type, points in classified_points.items():
|
|
|
|
|
+ if len(points) > 0:
|
|
|
|
|
+ color = colors.get(edge_type, (128, 128, 128))
|
|
|
|
|
+ for point in points:
|
|
|
|
|
+ cv2.circle(overlay, tuple(point.astype(int)), 3, color, -1)
|
|
|
|
|
+
|
|
|
|
|
+ # 绘制缺陷信息
|
|
|
|
|
+ if defects:
|
|
|
|
|
+ y_offset = 30
|
|
|
|
|
+ for edge_type, defect in defects.items():
|
|
|
|
|
+ if defect.protrusion_pixels >= 0 or defect.depression_pixels >= 0:
|
|
|
|
|
+ text = f"{edge_type.name}: protrusion={defect.protrusion_pixels}px, depression={defect.depression_pixels}px"
|
|
|
|
|
+ self._draw_text_with_background(
|
|
|
|
|
+ overlay, text, (10, y_offset),
|
|
|
|
|
+ font_scale=0.5, text_color=(255, 255, 255),
|
|
|
|
|
+ bg_color=(0, 0, 128), bg_opacity=0.7
|
|
|
|
|
+ )
|
|
|
|
|
+ y_offset += 25
|
|
|
|
|
+
|
|
|
|
|
+ # 添加步骤标题
|
|
|
|
|
+ self._draw_text_with_background(
|
|
|
|
|
+ overlay, step_name, (10, overlay.shape[0] - 20),
|
|
|
|
|
+ font_scale=1.0, text_color=(255, 255, 255),
|
|
|
|
|
+ bg_color=(255, 0, 0), bg_opacity=0.7
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ return overlay
|
|
|
|
|
+
|
|
|
|
|
+ def _save_intermediate_result(
|
|
|
|
|
+ self,
|
|
|
|
|
+ step_num: int,
|
|
|
|
|
+ step_name: str,
|
|
|
|
|
+ data: Any,
|
|
|
|
|
+ output_dir: Path,
|
|
|
|
|
+ image: Optional[np.ndarray] = None,
|
|
|
|
|
+ save_overlay: bool = True,
|
|
|
|
|
+ inner_points: Optional[np.ndarray] = None
|
|
|
|
|
+ ):
|
|
|
|
|
+ """保存中间结果"""
|
|
|
|
|
+ step_dir = output_dir / "intermediate_results"
|
|
|
|
|
+ step_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
+
|
|
|
|
|
+ json_path = step_dir / f"step_{step_num:02d}_{step_name}.json"
|
|
|
|
|
+ with open(json_path, 'w', encoding='utf-8') as f:
|
|
|
|
|
+ json.dump(to_json_serializable(data), f, ensure_ascii=False, indent=2)
|
|
|
|
|
+
|
|
|
|
|
+ if save_overlay and image is not None:
|
|
|
|
|
+ overlay_image = self._draw_overlay_on_image(
|
|
|
|
|
+ image=image,
|
|
|
|
|
+ step_name=f"Step {step_num}: {step_name}",
|
|
|
|
|
+ contour_points=np.array(data.get("contour", [])) if "contour" in data else None,
|
|
|
|
|
+ min_rect=np.array(data.get("min_rect", [])) if "min_rect" in data else None,
|
|
|
|
|
+ fitted_quad=np.array(data.get("fitted_quad", [])) if "fitted_quad" in data else None,
|
|
|
|
|
+ inner_points=inner_points,
|
|
|
|
|
+ classified_points={EdgeCornerType(k): np.array(v) for k, v in
|
|
|
|
|
+ data.get("classified_points", {}).items()} if "classified_points" in data else None,
|
|
|
|
|
+ defects={EdgeCornerType(k): EdgeCornerDefect.load_from_dict(v) for k, v in
|
|
|
|
|
+ data.get("defects", {}).items()} if "defects" in data else None
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ overlay_path = step_dir / f"step_{step_num:02d}_{step_name}_overlay.jpg"
|
|
|
|
|
+ cv2.imwrite(str(overlay_path), overlay_image)
|
|
|
|
|
+
|
|
|
|
|
+ def _interpolate_contour_points(self, contour_points: np.ndarray,
|
|
|
|
|
+ interpolation_method: str = "linear") -> np.ndarray:
|
|
|
|
|
+ """
|
|
|
|
|
+ 在轮廓点之间插入新点,增加点的密度
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ contour_points: 原始轮廓点
|
|
|
|
|
+ interpolation_method: 插值方法,可选 "linear" 或 "midpoint"
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 插值后的轮廓点
|
|
|
|
|
+ """
|
|
|
|
|
+ if len(contour_points) < 2:
|
|
|
|
|
+ return contour_points
|
|
|
|
|
+
|
|
|
|
|
+ interpolated_points = []
|
|
|
|
|
+
|
|
|
|
|
+ for i in range(len(contour_points)):
|
|
|
|
|
+ # 添加原始点
|
|
|
|
|
+ interpolated_points.append(contour_points[i])
|
|
|
|
|
+
|
|
|
|
|
+ # 获取下一个点(形成闭合轮廓)
|
|
|
|
|
+ next_idx = (i + 1) % len(contour_points)
|
|
|
|
|
+ next_point = contour_points[next_idx]
|
|
|
|
|
+
|
|
|
|
|
+ if interpolation_method == "midpoint":
|
|
|
|
|
+ # 简单的中点插值
|
|
|
|
|
+ mid_point = (contour_points[i] + next_point) / 2.0
|
|
|
|
|
+ interpolated_points.append(mid_point)
|
|
|
|
|
+ elif interpolation_method == "linear":
|
|
|
|
|
+ # 线性插值,可以插入多个点
|
|
|
|
|
+ distance = np.linalg.norm(next_point - contour_points[i])
|
|
|
|
|
+ # 根据距离决定插入点的数量(每50像素插入一个点)
|
|
|
|
|
+ num_insert = max(1, int(distance / 50))
|
|
|
|
|
+ for j in range(1, num_insert + 1):
|
|
|
|
|
+ t = j / (num_insert + 1)
|
|
|
|
|
+ interp_point = (1 - t) * contour_points[i] + t * next_point
|
|
|
|
|
+ interpolated_points.append(interp_point)
|
|
|
|
|
+
|
|
|
|
|
+ return np.array(interpolated_points, dtype=np.int32)
|
|
|
|
|
+
|
|
|
|
|
+ def process_single_group_entrance(
|
|
|
|
|
+ self,
|
|
|
|
|
+ data_dict: Dict,
|
|
|
|
|
+ algo_params: CardDefectDetectionParams
|
|
|
|
|
+ ) -> Dict:
|
|
|
|
|
+ """处理单个数据组的核心逻辑"""
|
|
|
|
|
+ if algo_params.debug_level == 'detail':
|
|
|
|
|
+ fry_algo_print("信息", f"开始处理数据,参数: {algo_params.to_dict()}")
|
|
|
|
|
+
|
|
|
|
|
+ image = data_dict.get("image")
|
|
|
|
|
+ json_data = data_dict.get("label", {})
|
|
|
|
|
+ output_dir = Path(data_dict.get("output_dir", "./output"))
|
|
|
|
|
+ output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
+
|
|
|
|
|
+ # 获取图像尺寸
|
|
|
|
|
+ if image is not None:
|
|
|
|
|
+ img_shape = image.shape
|
|
|
|
|
+ else:
|
|
|
|
|
+ img_shape = (json_data.get("imageHeight", 2048), json_data.get("imageWidth", 2048), 3)
|
|
|
|
|
+
|
|
|
|
|
+ # 步骤1:读取并筛选唯一形状
|
|
|
|
|
+ shapes = json_data.get("shapes", [])
|
|
|
|
|
+ unique_shape = self._get_unique_shape(shapes, algo_params.label_name)
|
|
|
|
|
+
|
|
|
|
|
+ if unique_shape is None:
|
|
|
|
|
+ fry_algo_print("错误", f"未找到标签为{algo_params.label_name}的形状")
|
|
|
|
|
+ return data_dict
|
|
|
|
|
+
|
|
|
|
|
+ contour_points = np.array(unique_shape["points"])
|
|
|
|
|
+
|
|
|
|
|
+ if algo_params.save_intermediate_results:
|
|
|
|
|
+ self._save_intermediate_result(
|
|
|
|
|
+ 1, "筛选唯一形状",
|
|
|
|
|
+ {"contour": contour_points.tolist(), "shape_info": unique_shape},
|
|
|
|
|
+ output_dir, image, algo_params.save_overlay_images
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # ========== 步骤1.5: 轮廓点插值(新增) ==========
|
|
|
|
|
+ interpolated_contour = self._interpolate_contour_points(contour_points, "linear")
|
|
|
|
|
+
|
|
|
|
|
+ if algo_params.debug_level == 'detail':
|
|
|
|
|
+ fry_algo_print("信息", f"轮廓点插值: {len(contour_points)} -> {len(interpolated_contour)} 点")
|
|
|
|
|
+
|
|
|
|
|
+ # 使用插值后的轮廓点进行后续处理
|
|
|
|
|
+ contour_points = interpolated_contour
|
|
|
|
|
+
|
|
|
|
|
+ # 步骤2:获取最小外接矩形
|
|
|
|
|
+ min_rect, long_edge, short_edge, _, _ = self._get_min_area_rect(contour_points)
|
|
|
|
|
+
|
|
|
|
|
+ if algo_params.save_intermediate_results:
|
|
|
|
|
+ self._save_intermediate_result(
|
|
|
|
|
+ 2, "最小外接矩形",
|
|
|
|
|
+ {
|
|
|
|
|
+ "contour": contour_points.tolist(),
|
|
|
|
|
+ "min_rect": min_rect.tolist(),
|
|
|
|
|
+ "long_edge": float(long_edge),
|
|
|
|
|
+ "short_edge": float(short_edge)
|
|
|
|
|
+ },
|
|
|
|
|
+ output_dir, image, algo_params.save_overlay_images
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 步骤3:初始分类
|
|
|
|
|
+ classified_points = self._classify_points_to_edges_corners(
|
|
|
|
|
+ contour_points, min_rect, long_edge, short_edge,
|
|
|
|
|
+ algo_params.long_edge_corner_length,
|
|
|
|
|
+ algo_params.short_edge_corner_length
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ if algo_params.save_intermediate_results:
|
|
|
|
|
+ classified_dict = {k.value: v.tolist() if len(v) > 0 else []
|
|
|
|
|
+ for k, v in classified_points.items()}
|
|
|
|
|
+ self._save_intermediate_result(
|
|
|
|
|
+ 3, "初始点分类",
|
|
|
|
|
+ {"classified_points": classified_dict, "min_rect": min_rect.tolist()},
|
|
|
|
|
+ output_dir, image, algo_params.save_overlay_images
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 步骤4-5:迭代拟合
|
|
|
|
|
+ fitted_quad = min_rect
|
|
|
|
|
+ for iteration in range(algo_params.iteration_rounds):
|
|
|
|
|
+ # 使用新的OpenCV拟合方法(只用边点)
|
|
|
|
|
+ fitted_lines = self._fit_lines_opencv(
|
|
|
|
|
+ classified_points,
|
|
|
|
|
+ algo_params.ransac_residual_threshold
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ new_quad = self._get_quadrilateral_from_lines(fitted_lines)
|
|
|
|
|
+
|
|
|
|
|
+ if new_quad is not None:
|
|
|
|
|
+ fitted_quad = new_quad
|
|
|
|
|
+
|
|
|
|
|
+ # 重新分类点
|
|
|
|
|
+ classified_points = self._classify_points_to_edges_corners(
|
|
|
|
|
+ contour_points, fitted_quad, long_edge, short_edge,
|
|
|
|
|
+ algo_params.long_edge_corner_length,
|
|
|
|
|
+ algo_params.short_edge_corner_length
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ if algo_params.save_intermediate_results and iteration == algo_params.iteration_rounds - 1:
|
|
|
|
|
+ classified_dict = {k.value: v.tolist() if len(v) > 0 else []
|
|
|
|
|
+ for k, v in classified_points.items()}
|
|
|
|
|
+
|
|
|
|
|
+ # 转换fitted_lines格式用于保存
|
|
|
|
|
+ fitted_lines_dict = {k.value: v for k, v in fitted_lines.items()}
|
|
|
|
|
+
|
|
|
|
|
+ self._save_intermediate_result(
|
|
|
|
|
+ 4 + iteration, f"迭代{iteration + 1}_拟合结果",
|
|
|
|
|
+ {
|
|
|
|
|
+ "classified_points": classified_dict,
|
|
|
|
|
+ "fitted_quad": fitted_quad.tolist(),
|
|
|
|
|
+ "fitted_lines": fitted_lines_dict
|
|
|
|
|
+ },
|
|
|
|
|
+ output_dir, image, algo_params.save_overlay_images
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 步骤6:计算缺陷
|
|
|
|
|
+ defects, inner_points = self._calculate_defects_with_mask(
|
|
|
|
|
+ contour_points, fitted_quad, classified_points,
|
|
|
|
|
+ img_shape,
|
|
|
|
|
+ algo_params.long_edge_corner_length,
|
|
|
|
|
+ algo_params.short_edge_corner_length,
|
|
|
|
|
+ algo_params.edge_width
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ defects_dict = {k.value: v.to_dict() for k, v in defects.items()}
|
|
|
|
|
+
|
|
|
|
|
+ if algo_params.save_intermediate_results:
|
|
|
|
|
+ self._save_intermediate_result(
|
|
|
|
|
+ 5 + algo_params.iteration_rounds, "最终缺陷结果",
|
|
|
|
|
+ {
|
|
|
|
|
+ "defects": defects_dict,
|
|
|
|
|
+ "fitted_quad": fitted_quad.tolist(),
|
|
|
|
|
+ "inner_points": inner_points.tolist(),
|
|
|
|
|
+ "contour": contour_points.tolist()
|
|
|
|
|
+ },
|
|
|
|
|
+ output_dir, image, algo_params.save_overlay_images,
|
|
|
|
|
+ inner_points
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 保存最终结果
|
|
|
|
|
+ result_path = output_dir / "defect_detection_result.json"
|
|
|
|
|
+ final_result = {
|
|
|
|
|
+ "defects": defects_dict,
|
|
|
|
|
+ "fitted_quad": fitted_quad.tolist(),
|
|
|
|
|
+ "inner_points": inner_points.tolist()
|
|
|
|
|
+ }
|
|
|
|
|
+ with open(result_path, 'w', encoding='utf-8') as f:
|
|
|
|
|
+ json.dump(final_result, f, ensure_ascii=False, indent=2)
|
|
|
|
|
+
|
|
|
|
|
+ if algo_params.debug_level in ['normal', 'detail']:
|
|
|
|
|
+ fry_algo_print("成功", f"缺陷检测完成,结果已保存到: {result_path}")
|
|
|
|
|
+
|
|
|
|
|
+ data_dict["defects"] = defects_dict
|
|
|
|
|
+ data_dict["fitted_quad"] = fitted_quad.tolist()
|
|
|
|
|
+ data_dict["inner_points"] = inner_points.tolist()
|
|
|
|
|
+
|
|
|
|
|
+ return data_dict
|
|
|
|
|
+
|
|
|
|
|
+ def process_batch_group_entrance(
|
|
|
|
|
+ self,
|
|
|
|
|
+ input_dir_str: str,
|
|
|
|
|
+ output_dir_str: str,
|
|
|
|
|
+ algo_params: CardDefectDetectionParams
|
|
|
|
|
+ ) -> None:
|
|
|
|
|
+ """批量处理多个JSON文件"""
|
|
|
|
|
+ input_dir = Path(input_dir_str)
|
|
|
|
|
+ output_dir = Path(output_dir_str)
|
|
|
|
|
+ output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
+
|
|
|
|
|
+ json_files = list(input_dir.rglob("*.json"))
|
|
|
|
|
+
|
|
|
|
|
+ if not json_files:
|
|
|
|
|
+ fry_algo_print("警告", f"未在{input_dir}中找到JSON文件")
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ fry_algo_print("信息", f"找到{len(json_files)}个JSON文件")
|
|
|
|
|
+
|
|
|
|
|
+ for json_file in json_files:
|
|
|
|
|
+ fry_algo_print("组开始", f"处理文件: {json_file.name}")
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ json_data = self.load_json_data(json_file)
|
|
|
|
|
+
|
|
|
|
|
+ image_stem = json_file.stem
|
|
|
|
|
+ image_extensions = ['.jpg', '.jpeg', '.png', '.bmp']
|
|
|
|
|
+ image_path = None
|
|
|
|
|
+
|
|
|
|
|
+ for ext in image_extensions:
|
|
|
|
|
+ potential_path = json_file.parent / f"{image_stem}{ext}"
|
|
|
|
|
+ if potential_path.exists():
|
|
|
|
|
+ image_path = potential_path
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ image_width = json_data.get("imageWidth", 2048)
|
|
|
|
|
+ image_height = json_data.get("imageHeight", 2048)
|
|
|
|
|
+
|
|
|
|
|
+ if image_path:
|
|
|
|
|
+ image = self.load_or_create_image(image_path, image_width, image_height)
|
|
|
|
|
+ else:
|
|
|
|
|
+ raise ValueError("警告", f"未找到对应的图像文件")
|
|
|
|
|
+
|
|
|
|
|
+ relative_path = json_file.relative_to(input_dir)
|
|
|
|
|
+ file_output_dir = output_dir / relative_path.parent / json_file.stem
|
|
|
|
|
+ file_output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
+
|
|
|
|
|
+ data_dict = {
|
|
|
|
|
+ "image": image,
|
|
|
|
|
+ "label": json_data,
|
|
|
|
|
+ "output_dir": str(file_output_dir),
|
|
|
|
|
+ "answer_json": {}
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ result = self.process_single_group_entrance(data_dict, algo_params)
|
|
|
|
|
+
|
|
|
|
|
+ fry_algo_print("成功", f"文件{json_file.name}处理完成")
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ fry_algo_print("错误", f"处理文件{json_file.name}时出错: {str(e)}")
|
|
|
|
|
+ import traceback
|
|
|
|
|
+ traceback.print_exc()
|
|
|
|
|
+
|
|
|
|
|
+ fry_algo_print("组结束", f"文件{json_file.name}处理结束")
|
|
|
|
|
+
|
|
|
|
|
+ fry_algo_print("成功", "批量处理完成")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _test_card_defect_detection_real():
|
|
|
|
|
+ """测试真实数据"""
|
|
|
|
|
+ temp_dir = Path("_test_real").resolve()
|
|
|
|
|
+ input_dir = temp_dir / "input"
|
|
|
|
|
+ output_dir = temp_dir / "output"
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ algo = CardDefectDetectionAlgo()
|
|
|
|
|
+
|
|
|
|
|
+ params = CardDefectDetectionParams(
|
|
|
|
|
+ # debug_level="detail",
|
|
|
|
|
+ debug_level="no",
|
|
|
|
|
+ save_intermediate_results=True,
|
|
|
|
|
+ save_overlay_images=True,
|
|
|
|
|
+ label_name="outer_box",
|
|
|
|
|
+ long_edge_corner_length=200,
|
|
|
|
|
+ short_edge_corner_length=200,
|
|
|
|
|
+ edge_width=50,
|
|
|
|
|
+ iteration_rounds=3,
|
|
|
|
|
+ ransac_residual_threshold=5.0
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ algo.process_batch_group_entrance(
|
|
|
|
|
+ str(input_dir),
|
|
|
|
|
+ str(output_dir),
|
|
|
|
|
+ params
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ fry_algo_print("成功", f"测试完成,结果保存在: {output_dir}")
|
|
|
|
|
+
|
|
|
|
|
+ result_files = list(output_dir.rglob("*"))
|
|
|
|
|
+ for result_file in result_files:
|
|
|
|
|
+ if result_file.is_file():
|
|
|
|
|
+ fry_algo_print("信息", f"生成文件: {result_file}")
|
|
|
|
|
+
|
|
|
|
|
+ finally:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _test_one_img():
|
|
|
|
|
+ algo = CardDefectDetectionAlgo()
|
|
|
|
|
+
|
|
|
|
|
+ image_path = r"C:\Code\ML\Image\Card\_250915_many_capture_img\_250915_1743_reflect_nature_defrct\1_front_coaxial_1_0.jpg"
|
|
|
|
|
+ json_path = r"C:\Code\ML\Project\卡片缺陷检测项目组\_250903_1644_边角不直脚本优化\_test_real\input\1_front_coaxial_1_0.json"
|
|
|
|
|
+
|
|
|
|
|
+ image = algo.load_or_create_image(image_path)
|
|
|
|
|
+ json_data = algo.load_json_data(json_path)
|
|
|
|
|
+ temp_out_dir = r"temp_out_dir"
|
|
|
|
|
+
|
|
|
|
|
+ data_dict = {
|
|
|
|
|
+ "image": image,
|
|
|
|
|
+ "label": json_data,
|
|
|
|
|
+ "output_dir": str(temp_out_dir),
|
|
|
|
|
+ "answer_json": {}
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ algo_params = CardDefectDetectionParams(
|
|
|
|
|
+ # debug_level="detail",
|
|
|
|
|
+ debug_level="no",
|
|
|
|
|
+ save_intermediate_results=True,
|
|
|
|
|
+ save_overlay_images=True,
|
|
|
|
|
+ label_name="outer_box",
|
|
|
|
|
+ long_edge_corner_length=200,
|
|
|
|
|
+ short_edge_corner_length=200,
|
|
|
|
|
+ edge_width=50,
|
|
|
|
|
+ iteration_rounds=3,
|
|
|
|
|
+ ransac_residual_threshold=5.0
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ result = algo.process_single_group_entrance(data_dict, algo_params)
|
|
|
|
|
+ print(result)
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
|
+ # _test_card_defect_detection_real()
|
|
|
|
|
+ _test_one_img()
|