|
|
@@ -0,0 +1,415 @@
|
|
|
+import os
|
|
|
+import cv2
|
|
|
+import numpy as np
|
|
|
+import json
|
|
|
+from pathlib import Path
|
|
|
+from dataclasses import dataclass, field
|
|
|
+from typing import Dict, List, Tuple, Optional, Any
|
|
|
+from enum import Enum
|
|
|
+from datetime import datetime
|
|
|
+from app.core.logger import get_logger
|
|
|
+
|
|
|
+logger = get_logger(__name__)
|
|
|
+
|
|
|
+def fry_algo_print(level_str: str, info_str: str):
|
|
|
+ """通用日志打印函数"""
|
|
|
+ logger.info(f"[{level_str.upper()}] : {info_str}")
|
|
|
+
|
|
|
+
|
|
|
+def fry_cv2_imread(filename, flags=cv2.IMREAD_COLOR):
|
|
|
+ """支持中文路径的 imread"""
|
|
|
+ try:
|
|
|
+ with open(filename, 'rb') as f:
|
|
|
+ chunk = f.read()
|
|
|
+ chunk_arr = np.frombuffer(chunk, dtype=np.uint8)
|
|
|
+ img = cv2.imdecode(chunk_arr, flags)
|
|
|
+ if img is None:
|
|
|
+ fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
|
|
|
+ return img
|
|
|
+ except IOError as e:
|
|
|
+ fry_algo_print("错误", f"IOError: Unable to read file: {filename}")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def fry_cv2_imwrite(filename, img, params=None):
|
|
|
+ """支持中文路径的 imwrite"""
|
|
|
+ try:
|
|
|
+ ext = os.path.splitext(filename)[1].lower()
|
|
|
+ result, encoded_img = cv2.imencode(ext, img, params)
|
|
|
+ if result:
|
|
|
+ with open(filename, 'wb') as f:
|
|
|
+ encoded_img.tofile(f)
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ fry_algo_print("警告", f"Warning: Unable to encode image: {filename}")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ fry_algo_print("错误", f"Error: Unable to write file: {filename}")
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+# 初始化OpenCV以支持中文路径
|
|
|
+cv2.imread = fry_cv2_imread
|
|
|
+cv2.imwrite = fry_cv2_imwrite
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class FryAlgoParamsBase:
|
|
|
+ """算法参数基类"""
|
|
|
+ debug_level: str = "no" # 可选: "no", "normal", "detail"
|
|
|
+
|
|
|
+
|
|
|
+class CenterMode(Enum):
|
|
|
+ """居中模式枚举"""
|
|
|
+ CENTROID = "centroid"
|
|
|
+ BOUNDING_RECT = "bounding_rect"
|
|
|
+
|
|
|
+
|
|
|
+class FillMode(Enum):
|
|
|
+ """填充模式枚举"""
|
|
|
+ BLACK = "black"
|
|
|
+ WHITE = "white"
|
|
|
+ REPLICATE = "replicate"
|
|
|
+
|
|
|
+
|
|
|
+class EdgeType(Enum):
|
|
|
+ """边类型枚举"""
|
|
|
+ TOP = "上边"
|
|
|
+ BOTTOM = "下边"
|
|
|
+ LEFT = "左边"
|
|
|
+ RIGHT = "右边"
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class ContourInfo:
|
|
|
+ """轮廓信息数据类"""
|
|
|
+ contour: np.ndarray
|
|
|
+ area: float
|
|
|
+ centroid: Tuple[int, int]
|
|
|
+ bounding_rect: Tuple[int, int, int, int]
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class FryCardProcessParams(FryAlgoParamsBase):
|
|
|
+ """
|
|
|
+ 卡片处理(转正+居中)的统一参数类
|
|
|
+ """
|
|
|
+ # --- 转正相关参数 ---
|
|
|
+ label_name: str = "outer_box" # 在JSON中要查找的形状标签
|
|
|
+ corner_distance_ratio: float = 0.15 # 定义角点区域占边长的比例
|
|
|
+
|
|
|
+ # --- 居中相关参数 ---
|
|
|
+ center_mode: CenterMode = CenterMode.BOUNDING_RECT # 居中模式
|
|
|
+ fill_mode: FillMode = FillMode.BLACK # 图像平移后的背景填充模式
|
|
|
+ fill_color: Tuple[int, int, int] = (0, 0, 0) # 自定义填充颜色 (BGR)
|
|
|
+
|
|
|
+
|
|
|
+class FryCardProcessor:
|
|
|
+ """
|
|
|
+ 整合了卡片转正与居中功能的处理器
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def process_image_with_json(
|
|
|
+ self,
|
|
|
+ image: np.ndarray,
|
|
|
+ seg_json: Dict,
|
|
|
+ params: FryCardProcessParams
|
|
|
+ ) -> Optional[np.ndarray]:
|
|
|
+ """
|
|
|
+ (V2: 简化版,合并旋转与居中)
|
|
|
+ 使用预先计算的分割JSON对单个图像进行转正和居中处理,并保持原图大小。
|
|
|
+ """
|
|
|
+ if params.debug_level in ['normal', 'detail']:
|
|
|
+ fry_algo_print("信息", "开始处理图像...")
|
|
|
+
|
|
|
+ # ==================== 步骤 1: 从JSON提取轮廓 ====================
|
|
|
+ contour_points = self._extract_largest_contour(seg_json.get("shapes", []), params.label_name)
|
|
|
+ if contour_points is None:
|
|
|
+ fry_algo_print("错误", f"在JSON中未找到标签为 '{params.label_name}' 的轮廓。")
|
|
|
+ return None
|
|
|
+
|
|
|
+ # ==================== 步骤 2: 拟合最小外接矩形 ====================
|
|
|
+ # 注意:这里我们直接在原始轮廓点上操作,插值对于minAreaRect影响不大
|
|
|
+ rect = cv2.minAreaRect(contour_points.astype(np.float32))
|
|
|
+ fitted_quad = np.intp(cv2.boxPoints(rect))
|
|
|
+
|
|
|
+ if params.debug_level == 'detail':
|
|
|
+ fry_algo_print("信息", "成功拟合卡片的最小外接矩形。")
|
|
|
+
|
|
|
+ # ==================== 步骤 3: 图像转正与居中 (一步完成) ====================
|
|
|
+ final_image = self._rectify_and_center_image(image, fitted_quad)
|
|
|
+
|
|
|
+ if final_image is None:
|
|
|
+ fry_algo_print("错误", "图像转正与居中失败。")
|
|
|
+ return None
|
|
|
+
|
|
|
+ if params.debug_level in ['normal', 'detail']:
|
|
|
+ fry_algo_print("成功", f"处理完成。输入尺寸: {image.shape[:2]}, 输出尺寸: {final_image.shape[:2]}")
|
|
|
+
|
|
|
+ return final_image
|
|
|
+
|
|
|
+ # ------------------- 辅助方法: 轮廓处理 -------------------
|
|
|
+ @staticmethod
|
|
|
+ def _extract_largest_contour(shapes: List[Dict], label_name: str) -> Optional[np.ndarray]:
|
|
|
+ target_shapes = [s for s in shapes if s.get("label") == label_name and s.get("points")]
|
|
|
+ if not target_shapes:
|
|
|
+ return None
|
|
|
+
|
|
|
+ max_shape = max(
|
|
|
+ target_shapes,
|
|
|
+ key=lambda s: cv2.contourArea(np.array(s["points"], dtype=np.float32))
|
|
|
+ )
|
|
|
+ return np.array(max_shape["points"], dtype=np.int32)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _interpolate_contour_points(contour: np.ndarray, num_points_per_edge=50) -> np.ndarray:
|
|
|
+ if len(contour) < 2:
|
|
|
+ return contour
|
|
|
+
|
|
|
+ interpolated_points = []
|
|
|
+ for i in range(len(contour)):
|
|
|
+ p1 = contour[i]
|
|
|
+ p2 = contour[(i + 1) % len(contour)]
|
|
|
+
|
|
|
+ x_vals = np.linspace(p1[0], p2[0], num_points_per_edge)
|
|
|
+ y_vals = np.linspace(p1[1], p2[1], num_points_per_edge)
|
|
|
+
|
|
|
+ interpolated_points.extend(np.vstack((x_vals, y_vals)).T)
|
|
|
+
|
|
|
+ return np.array(interpolated_points, dtype=np.int32)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _rectify_and_center_image(image: np.ndarray, quad: np.ndarray) -> Optional[np.ndarray]:
|
|
|
+ """
|
|
|
+ 一步完成图像的旋转矫正和居中,并保持原始图像尺寸。
|
|
|
+
|
|
|
+ Args:
|
|
|
+ image (np.ndarray): 原始图像.
|
|
|
+ quad (np.ndarray): 卡片的四个角点.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Optional[np.ndarray]: 处理后的图像.
|
|
|
+ """
|
|
|
+ # 获取原始图像的尺寸,这将是我们最终输出的尺寸
|
|
|
+ h_img, w_img = image.shape[:2]
|
|
|
+
|
|
|
+ # 1. 计算最小外接矩形以获取旋转信息
|
|
|
+ rect = cv2.minAreaRect(quad.astype(np.float32))
|
|
|
+ center, (width, height), angle = rect
|
|
|
+
|
|
|
+ # 2. 标准化角度
|
|
|
+ # OpenCV的minAreaRect返回的角度在[-90, 0)之间。
|
|
|
+ # 让图像保持竖着放
|
|
|
+ if width > height:
|
|
|
+ angle -= 90
|
|
|
+
|
|
|
+ fry_algo_print("DETAIL", f"检测到旋转角度: {angle:.2f}°")
|
|
|
+
|
|
|
+ # 3. 计算旋转矩阵 (围绕卡片中心)
|
|
|
+ M = cv2.getRotationMatrix2D(center, angle, 1.0)
|
|
|
+
|
|
|
+ # 4. 将平移变换合并到旋转矩阵中
|
|
|
+ # 我们希望卡片的中心点 center = (cx, cy) 被移动到图像的中心点 (w_img/2, h_img/2)
|
|
|
+ # 因此,平移量为 (w_img/2 - cx, h_img/2 - cy)
|
|
|
+ M[0, 2] += (w_img / 2) - center[0]
|
|
|
+ M[1, 2] += (h_img / 2) - center[1]
|
|
|
+
|
|
|
+ # 5. 应用合并后的仿射变换
|
|
|
+ # 输出尺寸 dsize 设置为原始图像尺寸 (w_img, h_img)
|
|
|
+ final_image = cv2.warpAffine(
|
|
|
+ image,
|
|
|
+ M,
|
|
|
+ (w_img, h_img), # <-- 核心:使用原始图像尺寸
|
|
|
+ flags=cv2.INTER_CUBIC,
|
|
|
+ borderMode=cv2.BORDER_CONSTANT,
|
|
|
+ borderValue=(0, 0, 0) # 默认使用黑色填充
|
|
|
+ )
|
|
|
+
|
|
|
+ return final_image
|
|
|
+
|
|
|
+ # ------------------- 辅助方法: 居中 (Centering) -------------------
|
|
|
+ @staticmethod
|
|
|
+ def _analyze_contours(contours: List[np.ndarray]) -> Dict[int, ContourInfo]:
|
|
|
+ info_dict = {}
|
|
|
+ for idx, contour in enumerate(contours):
|
|
|
+ area = cv2.contourArea(contour)
|
|
|
+ if area < 100: continue
|
|
|
+
|
|
|
+ M = cv2.moments(contour)
|
|
|
+ cx = int(M["m10"] / M["m00"]) if M["m00"] != 0 else 0
|
|
|
+ cy = int(M["m01"] / M["m00"]) if M["m00"] != 0 else 0
|
|
|
+
|
|
|
+ x, y, w, h = cv2.boundingRect(contour)
|
|
|
+
|
|
|
+ info_dict[idx] = ContourInfo(contour, area, (cx, cy), (x, y, w, h))
|
|
|
+ return info_dict
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _calculate_offset(contour_info: ContourInfo, image_shape: Tuple[int, int], params: FryCardProcessParams) -> \
|
|
|
+ Tuple[int, int]:
|
|
|
+ h, w = image_shape[:2]
|
|
|
+
|
|
|
+ if params.center_mode == CenterMode.CENTROID:
|
|
|
+ cx, cy = contour_info.centroid
|
|
|
+ return w // 2 - cx, h // 2 - cy
|
|
|
+
|
|
|
+ elif params.center_mode == CenterMode.BOUNDING_RECT:
|
|
|
+ x, y, rect_w, rect_h = contour_info.bounding_rect
|
|
|
+ rect_cx = x + rect_w // 2
|
|
|
+ rect_cy = y + rect_h // 2
|
|
|
+ return w // 2 - rect_cx, h // 2 - rect_cy
|
|
|
+
|
|
|
+ return 0, 0
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _apply_translation(image: np.ndarray, offset: Tuple[int, int], params: FryCardProcessParams) -> np.ndarray:
|
|
|
+ h, w = image.shape[:2]
|
|
|
+ offset_x, offset_y = offset
|
|
|
+
|
|
|
+ M = np.float32([[1, 0, offset_x], [0, 1, offset_y]])
|
|
|
+
|
|
|
+ if params.fill_mode == FillMode.WHITE:
|
|
|
+ borderValue = (255, 255, 255)
|
|
|
+ elif params.fill_mode == FillMode.BLACK:
|
|
|
+ borderValue = (0, 0, 0)
|
|
|
+ else: # REPLICATE 或其他
|
|
|
+ borderValue = params.fill_color
|
|
|
+
|
|
|
+ borderMode = cv2.BORDER_REPLICATE if params.fill_mode == FillMode.REPLICATE else cv2.BORDER_CONSTANT
|
|
|
+
|
|
|
+ return cv2.warpAffine(image, M, (w, h), borderMode=borderMode, borderValue=borderValue)
|
|
|
+
|
|
|
+ # ------------------- RANSAC 拟合静态方法集 -------------------
|
|
|
+ @staticmethod
|
|
|
+ def _classify_contour_points(contour: np.ndarray, rect_corners: np.ndarray, corner_ratio: float) -> Dict[str, List]:
|
|
|
+ classified = {et.value: [] for et in EdgeType}
|
|
|
+
|
|
|
+ edges = [(rect_corners[i], rect_corners[(i + 1) % 4]) for i in range(4)]
|
|
|
+ edge_lengths = [np.linalg.norm(e[1] - e[0]) for e in edges]
|
|
|
+
|
|
|
+ for point in contour:
|
|
|
+ min_dist = float('inf')
|
|
|
+ min_type = None
|
|
|
+
|
|
|
+ edge_types = [EdgeType.TOP, EdgeType.RIGHT, EdgeType.BOTTOM, EdgeType.LEFT]
|
|
|
+ for i, (p1, p2) in enumerate(edges):
|
|
|
+ dist_to_seg = FryCardProcessor._point_to_segment_distance(point, p1, p2)
|
|
|
+
|
|
|
+ if dist_to_seg < min_dist:
|
|
|
+ edge_vec = p2 - p1
|
|
|
+ edge_len = edge_lengths[i]
|
|
|
+ proj_len = np.dot(point - p1, edge_vec) / edge_len if edge_len > 0 else 0
|
|
|
+
|
|
|
+ if corner_ratio * edge_len < proj_len < (1 - corner_ratio) * edge_len:
|
|
|
+ min_dist = dist_to_seg
|
|
|
+ min_type = edge_types[i].value
|
|
|
+
|
|
|
+ if min_type:
|
|
|
+ classified[min_type].append(point)
|
|
|
+ return classified
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _point_to_segment_distance(point: np.ndarray, p1: np.ndarray, p2: np.ndarray) -> float:
|
|
|
+ line_vec = p2 - p1
|
|
|
+ p_vec = point - p1
|
|
|
+ line_len_sq = np.dot(line_vec, line_vec)
|
|
|
+ if line_len_sq == 0:
|
|
|
+ return np.linalg.norm(p_vec)
|
|
|
+
|
|
|
+ t = np.dot(p_vec, line_vec) / line_len_sq
|
|
|
+ t = max(0, min(1, t))
|
|
|
+
|
|
|
+ proj_point = p1 + t * line_vec
|
|
|
+ return np.linalg.norm(point - proj_point)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _fit_lines_with_ransac(classified_points: Dict[str, List], threshold: float) -> Dict[str, Tuple]:
|
|
|
+ lines = {}
|
|
|
+ for edge_name, points in classified_points.items():
|
|
|
+ if len(points) < 2: continue
|
|
|
+
|
|
|
+ points_np = np.array(points, dtype=np.float32)
|
|
|
+ vx, vy, x0, y0 = cv2.fitLine(points_np, cv2.DIST_L2, 0, 0.01, 0.01)
|
|
|
+
|
|
|
+ a, b = -vy[0], vx[0]
|
|
|
+ c = -(a * x0[0] + b * y0[0])
|
|
|
+ norm = np.sqrt(a ** 2 + b ** 2)
|
|
|
+ lines[edge_name] = (a / norm, b / norm, c / norm)
|
|
|
+ return lines
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _get_quadrilateral_from_lines(lines: Dict[str, Tuple]) -> Optional[np.ndarray]:
|
|
|
+ def line_intersection(line1, line2):
|
|
|
+ a1, b1, c1 = line1
|
|
|
+ a2, b2, c2 = line2
|
|
|
+ det = a1 * b2 - a2 * b1
|
|
|
+ if abs(det) < 1e-10: return None
|
|
|
+ x = (b1 * c2 - b2 * c1) / det
|
|
|
+ y = (a2 * c1 - a1 * c2) / det
|
|
|
+ return [x, y]
|
|
|
+
|
|
|
+ edge_map = {
|
|
|
+ "tl": (EdgeType.LEFT.value, EdgeType.TOP.value),
|
|
|
+ "tr": (EdgeType.RIGHT.value, EdgeType.TOP.value),
|
|
|
+ "br": (EdgeType.RIGHT.value, EdgeType.BOTTOM.value),
|
|
|
+ "bl": (EdgeType.LEFT.value, EdgeType.BOTTOM.value),
|
|
|
+ }
|
|
|
+
|
|
|
+ corners = {}
|
|
|
+ for corner, (edge1_name, edge2_name) in edge_map.items():
|
|
|
+ if edge1_name not in lines or edge2_name not in lines: return None
|
|
|
+ pt = line_intersection(lines[edge1_name], lines[edge2_name])
|
|
|
+ if pt is None: return None
|
|
|
+ corners[corner] = pt
|
|
|
+
|
|
|
+ return np.array([corners["tl"], corners["tr"], corners["br"], corners["bl"]], dtype=np.float32)
|
|
|
+
|
|
|
+
|
|
|
+def main():
|
|
|
+ """主函数,用于演示如何使用 FryCardProcessor"""
|
|
|
+
|
|
|
+ # 创建输出目录
|
|
|
+ output_dir = Path("./test_output") / datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
+ output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+ fry_algo_print("信息", f"所有输出将保存在: {output_dir.resolve()}")
|
|
|
+
|
|
|
+ # 自动生成测试数据
|
|
|
+ image_path = r"C:\Code\ML\Image\Card\_250917_1157_pokemon_no flecct01\48_front_0_1.jpg.jpg"
|
|
|
+ json_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\temp\outer\48_front_0_1.jpg.json"
|
|
|
+
|
|
|
+ # 1. 设置处理参数
|
|
|
+ params = FryCardProcessParams(
|
|
|
+ debug_level="detail",
|
|
|
+ label_name="outer_box",
|
|
|
+ center_mode=CenterMode.BOUNDING_RECT,
|
|
|
+ fill_mode=FillMode.BLACK
|
|
|
+ )
|
|
|
+
|
|
|
+ # 2. 初始化处理器
|
|
|
+ processor = FryCardProcessor()
|
|
|
+
|
|
|
+ # 3. 读取图像和JSON
|
|
|
+ image = cv2.imread(str(image_path))
|
|
|
+ with open(json_path, 'r', encoding='utf-8') as f:
|
|
|
+ seg_json = json.load(f)
|
|
|
+
|
|
|
+ if image is None:
|
|
|
+ fry_algo_print("错误", "无法读取图像文件。")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 4. 执行处理
|
|
|
+ final_image = processor.process_image_with_json(image, seg_json, params)
|
|
|
+
|
|
|
+ # 5. 保存结果
|
|
|
+ if final_image is not None:
|
|
|
+ result_path = output_dir / "final_processed_card.jpg"
|
|
|
+ cv2.imwrite(str(result_path), final_image)
|
|
|
+ fry_algo_print("成功", f"处理完成!最终图像已保存至: {result_path.resolve()}")
|
|
|
+ else:
|
|
|
+ fry_algo_print("失败", "处理过程中发生错误,未生成最终图像。")
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|