| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- import os
- import cv2
- import numpy as np
- import json
- from pathlib import Path
- from dataclasses import dataclass, field
- from typing import Dict, List, Tuple, Optional, Any
- from enum import Enum
- from datetime import datetime
- from app.core.logger import get_logger
logger = get_logger(__name__)


def fry_algo_print(level_str: str, info_str: str):
    """Write a message to the module logger with an upper-cased level tag.

    Args:
        level_str: Free-form level label (for example "信息" or "错误"). It is
            upper-cased and placed in square brackets in front of the message.
        info_str: The message text.

    Note:
        The record is always emitted through ``logger.info``; ``level_str``
        only changes the text of the prefix, not the logging severity.
    """
    tag = level_str.upper()
    message = f"[{tag}] : {info_str}"
    logger.info(message)
def fry_cv2_imread(filename, flags=cv2.IMREAD_COLOR):
    """Read an image from a path that may contain non-ASCII (e.g. Chinese) characters.

    The file is read as raw bytes with Python's own ``open`` and then decoded
    in memory with ``cv2.imdecode``.

    Args:
        filename: Path of the image file.
        flags: ``cv2.IMREAD_*`` flag forwarded to ``cv2.imdecode``.

    Returns:
        The decoded image, or ``None`` when the file cannot be read, is empty,
        or cannot be decoded. Each failure is reported through
        ``fry_algo_print``.
    """
    # Keep the try body limited to the file access: that is the only part
    # that can raise the IOError handled here.
    try:
        with open(filename, 'rb') as f:
            raw_bytes = f.read()
    except IOError:
        fry_algo_print("错误", f"IOError: Unable to read file: {filename}")
        return None

    # A zero-byte file produces an empty buffer, which cv2.imdecode rejects
    # with an exception instead of returning None; treat it as undecodable.
    if not raw_bytes:
        fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
        return None

    img = cv2.imdecode(np.frombuffer(raw_bytes, dtype=np.uint8), flags)
    if img is None:
        fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
    return img
def fry_cv2_imwrite(filename, img, params=None):
    """Write an image to a path that may contain non-ASCII (e.g. Chinese) characters.

    The image is encoded in memory with ``cv2.imencode`` (the format is chosen
    from the lower-cased file extension) and the encoded bytes are written
    with Python's own ``open``.

    Args:
        filename: Destination path; its extension selects the encoder.
        img: Image to encode.
        params: Optional encoder parameters forwarded to ``cv2.imencode``.

    Returns:
        ``True`` when the file was written, ``False`` when encoding failed or
        any exception occurred (both cases are logged).
    """
    try:
        _, suffix = os.path.splitext(filename)
        ok, buffer = cv2.imencode(suffix.lower(), img, params)
        if not ok:
            fry_algo_print("警告", f"Warning: Unable to encode image: {filename}")
            return False
        with open(filename, 'wb') as out_file:
            buffer.tofile(out_file)
        return True
    except Exception:
        fry_algo_print("错误", f"Error: Unable to write file: {filename}")
        return False
# Replace OpenCV's reader/writer with the wrappers above so that cv2.imread and
# cv2.imwrite accept paths containing Chinese characters. This rebinds the
# attributes on the shared cv2 module object at import time.
cv2.imread, cv2.imwrite = fry_cv2_imread, fry_cv2_imwrite
@dataclass
class FryAlgoParamsBase:
    """Base class for algorithm parameter sets."""

    # Logging verbosity; allowed values: "no", "normal", "detail".
    debug_level: str = "no"
class CenterMode(Enum):
    """Reference point used when centring a contour."""

    # Centre on the contour's centroid.
    CENTROID = "centroid"
    # Centre on the middle of the contour's bounding rectangle.
    BOUNDING_RECT = "bounding_rect"
class FillMode(Enum):
    """Background fill strategy for areas exposed by moving the image."""

    BLACK = "black"
    WHITE = "white"
    REPLICATE = "replicate"
class EdgeType(Enum):
    """The four edges of a card; each value is the edge's Chinese label."""

    TOP = "上边"      # top edge
    BOTTOM = "下边"   # bottom edge
    LEFT = "左边"     # left edge
    RIGHT = "右边"    # right edge
@dataclass
class ContourInfo:
    """Measurements describing a single contour."""

    # The contour points themselves.
    contour: np.ndarray
    # Contour area.
    area: float
    # Centroid as (cx, cy).
    centroid: Tuple[int, int]
    # Bounding rectangle as (x, y, w, h).
    bounding_rect: Tuple[int, int, int, int]
@dataclass
class FryCardProcessParams(FryAlgoParamsBase):
    """Unified parameter set for card processing (rectification + centring)."""

    # --- Rectification parameters ---
    # Label of the shape to look up in the segmentation JSON.
    label_name: str = "outer_box"
    # Fraction of an edge's length that is treated as corner region.
    corner_distance_ratio: float = 0.15

    # --- Centring parameters ---
    # Which reference point is used for centring.
    center_mode: CenterMode = CenterMode.BOUNDING_RECT
    # Background fill mode for areas exposed after the image is shifted.
    fill_mode: FillMode = FillMode.BLACK
    # Custom fill colour (BGR).
    fill_color: Tuple[int, int, int] = (0, 0, 0)
class FryCardProcessor:
    """Rectify (deskew) and centre a card inside its image.

    ``process_image_with_json`` is the entry point; every other method is a
    static helper. Within this class the entry point only uses
    ``_extract_largest_contour``, ``_resolve_border`` and
    ``_rectify_and_center_image``; the remaining helpers (contour analysis,
    pure translation, edge classification, line fitting) are kept available
    but are not called by it.
    """

    def __init__(self):
        pass

    def process_image_with_json(
        self,
        image: np.ndarray,
        seg_json: Dict,
        params: FryCardProcessParams
    ) -> Optional[np.ndarray]:
        """Rectify and centre one image using a precomputed segmentation JSON.

        The output keeps the input image's width and height.

        Args:
            image: Source image.
            seg_json: Segmentation result; its ``"shapes"`` entry is a list of
                dicts that carry ``"label"`` and ``"points"``.
            params: Processing parameters. ``label_name`` selects the shape,
                ``fill_mode`` / ``fill_color`` select the background used for
                uncovered areas, ``debug_level`` controls logging.
                ``center_mode`` and ``corner_distance_ratio`` are not used
                here: the card is always centred on the centre of its minimum
                area rectangle.

        Returns:
            The processed image, or ``None`` when the image is missing/empty,
            no matching contour exists, or the transform fails.
        """
        verbose = params.debug_level in ['normal', 'detail']
        detailed = params.debug_level == 'detail'

        if verbose:
            fry_algo_print("信息", "开始处理图像...")

        # Fail with a logged error instead of an AttributeError further down.
        if image is None or image.size == 0:
            fry_algo_print("错误", "输入图像为空,无法处理。")
            return None

        # ---------- Step 1: extract the contour from the JSON ----------
        # "shapes" may be absent or explicitly null; both mean "no shapes".
        shapes = (seg_json or {}).get("shapes") or []
        contour_points = self._extract_largest_contour(shapes, params.label_name)
        if contour_points is None:
            fry_algo_print("错误", f"在JSON中未找到标签为 '{params.label_name}' 的轮廓。")
            return None

        # ---------- Step 2: fit the minimum-area rectangle ----------
        # The raw contour points are used directly (no interpolation).
        rect = cv2.minAreaRect(contour_points.astype(np.float32))
        fitted_quad = np.intp(cv2.boxPoints(rect))
        if detailed:
            fry_algo_print("信息", "成功拟合卡片的最小外接矩形。")

        # ---------- Step 3: rectify and centre in a single warp ----------
        border_mode, border_value = self._resolve_border(params)
        final_image = self._rectify_and_center_image(
            image,
            fitted_quad,
            border_mode=border_mode,
            border_value=border_value,
            log_angle=detailed,
        )
        if final_image is None:
            fry_algo_print("错误", "图像转正与居中失败。")
            return None

        if verbose:
            fry_algo_print("成功", f"处理完成。输入尺寸: {image.shape[:2]}, 输出尺寸: {final_image.shape[:2]}")
        return final_image

    # ------------------- Helpers: border handling -------------------
    @staticmethod
    def _resolve_border(params: FryCardProcessParams) -> Tuple[int, Tuple[int, int, int]]:
        """Translate ``params.fill_mode`` into ``(borderMode, borderValue)`` for ``cv2.warpAffine``.

        WHITE and BLACK give a constant border in that colour, REPLICATE gives
        ``cv2.BORDER_REPLICATE``, and any other mode gives a constant border
        in ``params.fill_color``.
        """
        if params.fill_mode == FillMode.WHITE:
            return cv2.BORDER_CONSTANT, (255, 255, 255)
        if params.fill_mode == FillMode.BLACK:
            return cv2.BORDER_CONSTANT, (0, 0, 0)
        if params.fill_mode == FillMode.REPLICATE:
            return cv2.BORDER_REPLICATE, params.fill_color
        return cv2.BORDER_CONSTANT, params.fill_color

    # ------------------- Helpers: contour handling -------------------
    @staticmethod
    def _extract_largest_contour(shapes: List[Dict], label_name: str) -> Optional[np.ndarray]:
        """Return the points of the largest shape labelled ``label_name``.

        Shapes without points are ignored; "largest" is measured with
        ``cv2.contourArea``.

        Returns:
            The points as an ``int32`` array, or ``None`` when nothing matches.
        """
        candidates = [s for s in shapes if s.get("label") == label_name and s.get("points")]
        if not candidates:
            return None

        def shape_area(shape: Dict) -> float:
            return cv2.contourArea(np.array(shape["points"], dtype=np.float32))

        largest = max(candidates, key=shape_area)
        return np.array(largest["points"], dtype=np.int32)

    @staticmethod
    def _interpolate_contour_points(contour: np.ndarray, num_points_per_edge=50) -> np.ndarray:
        """Densify a closed contour by sampling points along each edge.

        Every edge (including the closing edge from the last point back to the
        first) is sampled with ``num_points_per_edge`` evenly spaced points,
        both end points included. The result is cast to ``int32``.

        Args:
            contour: Contour points; indexed as ``point[0]`` = x and
                ``point[1]`` = y.
            num_points_per_edge: Number of samples per edge.

        Returns:
            The sampled points, or ``contour`` unchanged when it has fewer
            than two points.
        """
        if len(contour) < 2:
            return contour
        sampled = []
        # np.roll pairs every point with its successor and wraps the last one
        # around to the first, which closes the contour.
        for start, end in zip(contour, np.roll(contour, -1, axis=0)):
            xs = np.linspace(start[0], end[0], num_points_per_edge)
            ys = np.linspace(start[1], end[1], num_points_per_edge)
            sampled.extend(np.column_stack((xs, ys)))
        return np.array(sampled, dtype=np.int32)

    @staticmethod
    def _rectify_and_center_image(
        image: np.ndarray,
        quad: np.ndarray,
        border_mode: Optional[int] = None,
        border_value: Tuple[int, int, int] = (0, 0, 0),
        log_angle: bool = True,
    ) -> Optional[np.ndarray]:
        """Rotate the card upright and move it to the image centre in one warp.

        The output has the same width and height as ``image``.

        Args:
            image: Source image.
            quad: The card's four corner points.
            border_mode: OpenCV border mode for uncovered pixels; ``None``
                means ``cv2.BORDER_CONSTANT``.
            border_value: Colour used with a constant border (black by default).
            log_angle: Whether to log the detected rotation angle.

        Returns:
            The transformed image.
        """
        # The output size equals the input size.
        h_img, w_img = image.shape[:2]

        # 1. Minimum-area rectangle gives centre, side lengths and rotation.
        center, (width, height), angle = cv2.minAreaRect(quad.astype(np.float32))

        # 2. Keep the card in portrait orientation: when the side reported as
        #    "width" is the longer one, turn by a further quarter rotation.
        #    NOTE(review): the original comment assumed minAreaRect angles in
        #    [-90, 0); the reported range may differ between OpenCV versions
        #    - confirm against the installed version.
        if width > height:
            angle -= 90
        if log_angle:
            fry_algo_print("DETAIL", f"检测到旋转角度: {angle:.2f}°")

        # 3. Rotation about the card centre (the centre itself stays fixed).
        M = cv2.getRotationMatrix2D(center, angle, 1.0)

        # 4. Fold the translation into the same matrix: the card centre
        #    (cx, cy) must land on the image centre (w_img / 2, h_img / 2).
        M[0, 2] += (w_img / 2) - center[0]
        M[1, 2] += (h_img / 2) - center[1]

        # 5. Apply the combined affine transform at the original size.
        return cv2.warpAffine(
            image,
            M,
            (w_img, h_img),
            flags=cv2.INTER_CUBIC,
            borderMode=cv2.BORDER_CONSTANT if border_mode is None else border_mode,
            borderValue=border_value,
        )

    # ------------------- Helpers: centring -------------------
    @staticmethod
    def _analyze_contours(contours: List[np.ndarray], min_area: float = 100) -> Dict[int, ContourInfo]:
        """Compute area, centroid and bounding rectangle for each contour.

        Args:
            contours: Contours to analyse.
            min_area: Contours whose area is below this value are skipped.

        Returns:
            Mapping from the contour's index in ``contours`` to its
            ``ContourInfo``. The centroid falls back to ``(0, 0)`` when the
            zeroth moment is zero.
        """
        info_dict = {}
        for idx, contour in enumerate(contours):
            area = cv2.contourArea(contour)
            if area < min_area:
                continue
            moments = cv2.moments(contour)
            m00 = moments["m00"]
            cx = int(moments["m10"] / m00) if m00 != 0 else 0
            cy = int(moments["m01"] / m00) if m00 != 0 else 0
            info_dict[idx] = ContourInfo(contour, area, (cx, cy), cv2.boundingRect(contour))
        return info_dict

    @staticmethod
    def _calculate_offset(
        contour_info: ContourInfo,
        image_shape: Tuple[int, int],
        params: FryCardProcessParams
    ) -> Tuple[int, int]:
        """Return the ``(dx, dy)`` shift that moves the contour to the image centre.

        The reference point is the centroid or the middle of the bounding
        rectangle, depending on ``params.center_mode``; any other mode yields
        ``(0, 0)``.
        """
        h, w = image_shape[:2]
        if params.center_mode == CenterMode.CENTROID:
            ref_x, ref_y = contour_info.centroid
        elif params.center_mode == CenterMode.BOUNDING_RECT:
            x, y, rect_w, rect_h = contour_info.bounding_rect
            ref_x = x + rect_w // 2
            ref_y = y + rect_h // 2
        else:
            return 0, 0
        return w // 2 - ref_x, h // 2 - ref_y

    @staticmethod
    def _apply_translation(image: np.ndarray, offset: Tuple[int, int], params: FryCardProcessParams) -> np.ndarray:
        """Shift ``image`` by ``offset`` = ``(dx, dy)`` keeping its size.

        Uncovered areas are filled according to ``params.fill_mode``.
        """
        h, w = image.shape[:2]
        offset_x, offset_y = offset
        M = np.float32([[1, 0, offset_x], [0, 1, offset_y]])
        border_mode, border_value = FryCardProcessor._resolve_border(params)
        return cv2.warpAffine(image, M, (w, h), borderMode=border_mode, borderValue=border_value)

    # ------------------- Static helpers: edge line fitting -------------------
    @staticmethod
    def _classify_contour_points(contour: np.ndarray, rect_corners: np.ndarray, corner_ratio: float) -> Dict[str, List]:
        """Assign contour points to the rectangle edge they are closest to.

        A point is only assigned to an edge when its projection onto that edge
        lies strictly inside the middle part of the edge, i.e. outside the
        ``corner_ratio`` fraction at either end. Points that qualify for no
        edge are dropped.

        Args:
            contour: Contour points.
            rect_corners: Four rectangle corners. Edge ``i`` runs from corner
                ``i`` to corner ``(i + 1) % 4`` and is labelled TOP, RIGHT,
                BOTTOM, LEFT in that order - presumably the corners are given
                as top-left, top-right, bottom-right, bottom-left; verify
                against the caller.
            corner_ratio: Fraction of each edge's length excluded at both ends.

        Returns:
            Mapping from edge label (``EdgeType`` value) to its list of points.
        """
        classified = {et.value: [] for et in EdgeType}
        edges = [(rect_corners[i], rect_corners[(i + 1) % 4]) for i in range(4)]
        edge_lengths = [np.linalg.norm(e[1] - e[0]) for e in edges]
        # Loop-invariant: label of edge i.
        edge_types = [EdgeType.TOP, EdgeType.RIGHT, EdgeType.BOTTOM, EdgeType.LEFT]

        for point in contour:
            min_dist = float('inf')
            min_type = None
            for i, (p1, p2) in enumerate(edges):
                dist_to_seg = FryCardProcessor._point_to_segment_distance(point, p1, p2)
                if dist_to_seg >= min_dist:
                    continue
                edge_len = edge_lengths[i]
                proj_len = np.dot(point - p1, p2 - p1) / edge_len if edge_len > 0 else 0
                # Accept the edge only when the point is away from both corners.
                if corner_ratio * edge_len < proj_len < (1 - corner_ratio) * edge_len:
                    min_dist = dist_to_seg
                    min_type = edge_types[i].value
            if min_type:
                classified[min_type].append(point)
        return classified

    @staticmethod
    def _point_to_segment_distance(point: np.ndarray, p1: np.ndarray, p2: np.ndarray) -> float:
        """Return the Euclidean distance from ``point`` to the segment ``p1``-``p2``.

        A zero-length segment is treated as the single point ``p1``.
        """
        line_vec = p2 - p1
        p_vec = point - p1
        line_len_sq = np.dot(line_vec, line_vec)
        if line_len_sq == 0:
            return np.linalg.norm(p_vec)
        # Clamp the projection parameter so the nearest point stays on the segment.
        t = max(0, min(1, np.dot(p_vec, line_vec) / line_len_sq))
        return np.linalg.norm(point - (p1 + t * line_vec))

    @staticmethod
    def _fit_lines_with_ransac(classified_points: Dict[str, List], threshold: float) -> Dict[str, Tuple]:
        """Fit one line per edge and return it as normalised ``(a, b, c)``.

        Each line satisfies ``a*x + b*y + c = 0``. Edges with fewer than two
        points are skipped.

        Note:
            Despite the name, the fit is ``cv2.fitLine`` with ``cv2.DIST_L2``
            (a least-squares fit), and ``threshold`` is currently unused.
        """
        lines = {}
        for edge_name, points in classified_points.items():
            if len(points) < 2:
                continue
            points_np = np.array(points, dtype=np.float32)
            vx, vy, x0, y0 = cv2.fitLine(points_np, cv2.DIST_L2, 0, 0.01, 0.01)
            # (vx, vy) is the line direction, so (-vy, vx) is its normal.
            a, b = -vy[0], vx[0]
            c = -(a * x0[0] + b * y0[0])
            norm = np.sqrt(a ** 2 + b ** 2)
            lines[edge_name] = (a / norm, b / norm, c / norm)
        return lines

    @staticmethod
    def _get_quadrilateral_from_lines(lines: Dict[str, Tuple]) -> Optional[np.ndarray]:
        """Intersect the four edge lines to obtain the card's corners.

        Args:
            lines: Mapping from edge label (``EdgeType`` value) to ``(a, b, c)``
                with ``a*x + b*y + c = 0``.

        Returns:
            A ``float32`` array of corners ordered top-left, top-right,
            bottom-right, bottom-left, or ``None`` when an edge is missing or
            two adjacent lines are (numerically) parallel.
        """
        def line_intersection(line1, line2):
            a1, b1, c1 = line1
            a2, b2, c2 = line2
            det = a1 * b2 - a2 * b1
            if abs(det) < 1e-10:
                return None
            x = (b1 * c2 - b2 * c1) / det
            y = (a2 * c1 - a1 * c2) / det
            return [x, y]

        # Each corner is the intersection of the two edges that meet there.
        edge_map = {
            "tl": (EdgeType.LEFT.value, EdgeType.TOP.value),
            "tr": (EdgeType.RIGHT.value, EdgeType.TOP.value),
            "br": (EdgeType.RIGHT.value, EdgeType.BOTTOM.value),
            "bl": (EdgeType.LEFT.value, EdgeType.BOTTOM.value),
        }
        corners = {}
        for corner, (edge1_name, edge2_name) in edge_map.items():
            if edge1_name not in lines or edge2_name not in lines:
                return None
            pt = line_intersection(lines[edge1_name], lines[edge2_name])
            if pt is None:
                return None
            corners[corner] = pt
        return np.array([corners["tl"], corners["tr"], corners["br"], corners["bl"]], dtype=np.float32)
def main():
    """Demo entry point showing how to use ``FryCardProcessor``.

    Reads one image and its segmentation JSON from hard-coded, machine-specific
    paths, processes the image and writes the result into a timestamped
    directory under ``./test_output``. Input problems are logged and end the
    run without creating the output directory.
    """
    # Sample inputs (hard-coded local paths).
    image_path = r"C:\Code\ML\Image\Card\_250917_1157_pokemon_no flecct01\48_front_0_1.jpg.jpg"
    json_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\temp\outer\48_front_0_1.jpg.json"

    # 1. Processing parameters.
    params = FryCardProcessParams(
        debug_level="detail",
        label_name="outer_box",
        center_mode=CenterMode.BOUNDING_RECT,
        fill_mode=FillMode.BLACK
    )

    # 2. Processor.
    processor = FryCardProcessor()

    # 3. Load the inputs; validate the image before touching the JSON.
    image = cv2.imread(str(image_path))
    if image is None:
        fry_algo_print("错误", "无法读取图像文件。")
        return
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            seg_json = json.load(f)
    except (OSError, json.JSONDecodeError) as exc:
        fry_algo_print("错误", f"无法读取JSON文件: {exc}")
        return

    # Create the output directory only once the inputs are known to be usable.
    output_dir = Path("./test_output") / datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    fry_algo_print("信息", f"所有输出将保存在: {output_dir.resolve()}")

    # 4. Run the processing.
    final_image = processor.process_image_with_json(image, seg_json, params)

    # 5. Save the result.
    if final_image is not None:
        result_path = output_dir / "final_processed_card.jpg"
        cv2.imwrite(str(result_path), final_image)
        fry_algo_print("成功", f"处理完成!最终图像已保存至: {result_path.resolve()}")
    else:
        fry_algo_print("失败", "处理过程中发生错误,未生成最终图像。")


if __name__ == "__main__":
    main()
|