| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- import os
- import json
- import cv2
- import numpy as np
- import random
- from dataclasses import dataclass, field
- from typing import Dict, List, Optional, Any, Tuple, Union
- from collections import defaultdict
- import logging
- logger = logging.getLogger('ClassifyEdgeCorner')
- def fry_algo_print(level_str: str, info_str: str):
- logger.info(f"[{level_str}] : {info_str}")
- def fry_cv2_imread(filename, flags=cv2.IMREAD_COLOR):
- try:
- with open(filename, 'rb') as f:
- chunk = f.read()
- chunk_arr = np.frombuffer(chunk, dtype=np.uint8)
- img = cv2.imdecode(chunk_arr, flags)
- if img is None:
- fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
- return img
- except IOError as e:
- fry_algo_print("错误", f"IOError: Unable to read file: {filename}")
- fry_algo_print("错误", f"Error details: {str(e)}")
- return None
- def fry_cv2_imwrite(filename, img, params=None):
- try:
- ext = os.path.splitext(filename)[1].lower()
- result, encoded_img = cv2.imencode(ext, img, params)
- if result:
- with open(filename, 'wb') as f:
- encoded_img.tofile(f)
- return True
- else:
- fry_algo_print("警告", f"Warning: Unable to encode image: {filename}")
- return False
- except Exception as e:
- fry_algo_print("错误", f"Error: Unable to write file: {filename}")
- fry_algo_print("错误", f"Error details: {str(e)}")
- return False
- def fry_opencv_Chinese_path_init():
- cv2.imread = fry_cv2_imread
- cv2.imwrite = fry_cv2_imwrite
- OPENCV_IO_ALREADY_INIT = False
- if not OPENCV_IO_ALREADY_INIT:
- fry_opencv_Chinese_path_init()
- OPENCV_IO_ALREADY_INIT = True
- def to_json_serializable(obj):
- if isinstance(obj, (np.ndarray,)): return obj.tolist()
- if isinstance(obj, (np.integer,)): return int(obj)
- if isinstance(obj, (np.floating,)): return float(obj)
- if hasattr(obj, 'to_dict'): return obj.to_dict()
- try:
- return json.dumps(obj, indent=2)
- except TypeError:
- return str(obj)
- @dataclass
- class DefectInfo:
- """单个缺陷的详细信息"""
- label: str
- pixel_area: float
- actual_area: float # 平方毫米
- width: float # 毫米
- height: float # 毫米
- contour: List[List[int]]
- min_rect: Tuple[Tuple[float, float], Tuple[float, float], float] # 最小外接矩形
- def to_dict(self) -> Dict[str, Any]:
- return {
- "label": self.label,
- "pixel_area": self.pixel_area,
- "actual_area": self.actual_area,
- "width": self.width,
- "height": self.height,
- "points": self.contour,
- "min_rect": self.min_rect
- }
- @dataclass
- class AnalysisResult:
- """封装单次分析的所有结果,包括缺陷列表和统计信息"""
- defects: List[DefectInfo] = field(default_factory=list)
- total_defect_count: int = 0
- total_pixel_area: float = 0.0
- total_defect_area: float = 0.0 # 所有缺陷的总面积 (mm^2)
- area_by_label: Dict[str, float] = field(default_factory=lambda: defaultdict(float))
- count_by_label: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
- def to_dict(self) -> Dict[str, Any]:
- return {
- "defects": [d.to_dict() for d in self.defects],
- "statistics": {
- "total_defect_count": self.total_defect_count,
- "total_pixel_area": self.total_pixel_area,
- "total_defect_area_mm2": self.total_defect_area,
- "area_by_label_mm2": dict(self.area_by_label),
- "count_by_label": dict(self.count_by_label)
- }
- }
- @dataclass
- class DrawingParams:
- """封装所有绘图相关的参数"""
- draw_min_rect: bool = True
- font_scale: float = 0.5
- font_thickness: int = 1
- contour_thickness: int = 1
- rect_thickness: int = 1
- info_bg_alpha: float = 0.5
- contour_color: Tuple[int, int, int] = (0, 255, 0)
- rect_color: Tuple[int, int, int] = (255, 0, 0)
- text_color: Tuple[int, int, int] = (255, 255, 255)
- bg_color: Tuple[int, int, int] = (0, 0, 0)
- class DefectVisualizer:
- """一个专门用于在图像上可视化缺陷信息的类。"""
- def __init__(self, params: DrawingParams):
- self.params = params
- def draw_defects_on_image(self, image: np.ndarray, defects: List[DefectInfo]) -> np.ndarray:
- vis_image = image.copy()
- for i, defect in enumerate(defects):
- self._draw_single_defect(vis_image, defect, i)
- return vis_image
- def _draw_single_defect(self, image: np.ndarray, defect: DefectInfo, idx: int):
- contour = np.array(defect.contour, dtype=np.int32)
- cv2.drawContours(image, [contour], -1, self.params.contour_color, self.params.contour_thickness)
- if self.params.draw_min_rect:
- box = np.intp(cv2.boxPoints(defect.min_rect))
- cv2.drawContours(image, [box], 0, self.params.rect_color, self.params.rect_thickness)
- info_text = [
- f"L: {defect.label}",
- f"A: {defect.actual_area:.3f} mm2",
- f"W: {defect.width:.3f} mm", f"H: {defect.height:.3f} mm"
- ]
- M = cv2.moments(contour)
- cx = int(M["m10"] / M["m00"]) if M["m00"] != 0 else contour[0][0][0]
- cy = int(M["m01"] / M["m00"]) if M["m00"] != 0 else contour[0][0][1]
- cx += random.randint(-30, 10);
- cy += random.randint(-30, 10)
- text_size, _ = cv2.getTextSize(info_text[0], cv2.FONT_HERSHEY_SIMPLEX, self.params.font_scale,
- self.params.font_thickness)
- cx = max(10, min(cx, image.shape[1] - text_size[0] - 10))
- cy = max(text_size[1] * len(info_text) + 10, min(cy, image.shape[0] - 10))
- y_offset = cy - (text_size[1] + 10) * (len(info_text) - 1)
- for text in info_text:
- self._draw_text_with_background(image, text, (cx, y_offset))
- y_offset += text_size[1] + 10
- def _draw_text_with_background(self, image: np.ndarray, text: str, position: Tuple[int, int]):
- text_size, _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, self.params.font_scale,
- self.params.font_thickness)
- x, y = position
- overlay = image.copy()
- cv2.rectangle(overlay, (x - 2, y - text_size[1] - 2), (x + text_size[0] + 2, y + 2), self.params.bg_color, -1)
- cv2.addWeighted(overlay, self.params.info_bg_alpha, image, 1 - self.params.info_bg_alpha, 0, image)
- cv2.putText(image, text, (x, y), cv2.FONT_HERSHEY_SIMPLEX, self.params.font_scale, self.params.text_color,
- self.params.font_thickness, cv2.LINE_AA)
- class DefectProcessor:
- """
- 缺陷处理器,专注于单次分析和可视化任务。
- 此类是无状态的,不处理批处理,使其功能更专一、更易于测试和复用。
- """
- def __init__(self, pixel_resolution: float):
- """
- 初始化处理器。
- Args:
- pixel_resolution (float): 像素分辨率,单位是 μm/pixel。
- """
- if pixel_resolution <= 0:
- raise ValueError("Pixel resolution must be a positive number.")
- self.pixel_to_mm = pixel_resolution / 1000.0
- @staticmethod
- def _calculate_metrics(contour: np.ndarray, pixel_to_mm: float) -> Tuple[float, float, float, float, Any]:
- """静态辅助方法,计算单个轮廓的各项指标。"""
- pixel_area = cv2.contourArea(contour)
- actual_area = pixel_area * (pixel_to_mm ** 2)
- min_rect = cv2.minAreaRect(contour)
- # 确保宽度总是较小的一边
- width_pixels, height_pixels = sorted(min_rect[1])
- width, height = width_pixels * pixel_to_mm, height_pixels * pixel_to_mm
- return pixel_area, actual_area, width, height, min_rect
- def analyze_from_json(self, json_data: Dict[str, Any],
- is_return_obj=False) -> Union[dict, AnalysisResult]:
- """
- [需求 1] 仅根据JSON数据计算缺陷面积并统计,返回包含详细信息的JSON友好对象。
- Args:
- json_data (Dict[str, Any]): 从labelme JSON文件加载的字典数据。
- Returns:
- AnalysisResult: 包含所有缺陷信息和统计结果的数据对象。
- """
- result = AnalysisResult()
- if not json_data or 'shapes' not in json_data:
- if is_return_obj:
- return result
- result_json = to_json_serializable(result.to_dict())
- result_json = json.loads(result_json)
- return result_json
- for shape in json_data['shapes']:
- label = shape.get('label', 'unlabeled')
- points = shape.get('points')
- if not points:
- continue
- contour = np.array(points, dtype=np.int32)
- pixel_area, actual_area, width, height, min_rect = self._calculate_metrics(contour, self.pixel_to_mm)
- defect = DefectInfo(
- label=label,
- pixel_area=pixel_area,
- actual_area=actual_area,
- width=width,
- height=height,
- contour=contour.tolist(),
- min_rect=min_rect
- )
- result.defects.append(defect)
- # 更新统计信息
- result.total_defect_count += 1
- result.total_pixel_area += pixel_area
- result.total_defect_area += actual_area
- result.count_by_label[label] += 1
- result.area_by_label[label] += actual_area
- if is_return_obj:
- return result
- result_json = to_json_serializable(result.to_dict())
- result_json = json.loads(result_json)
- return result_json
- def analyze_and_draw(self, image: np.ndarray, json_data: Dict[str, Any], drawing_params: DrawingParams) -> Tuple[
- np.ndarray, dict]:
- """
- [需求 2] 输入图片和JSON数据,返回绘制好的图片和分析结果。
- Args:
- image (np.ndarray): OpenCV格式的BGR图像。
- json_data (Dict[str, Any]): 从labelme JSON文件加载的字典数据。
- drawing_params (DrawingParams): 控制绘图样式的参数对象。
- Returns:
- Tuple[np.ndarray, AnalysisResult]:
- - 绘制了缺陷信息的新图像。
- - 包含所有缺陷信息和统计结果的数据对象。
- """
- # 1. 首先,执行纯JSON分析以获取所有计算结果
- analysis_result = self.analyze_from_json(json_data, is_return_obj=True)
- # 2. 如果没有缺陷,直接返回原图和分析结果
- if not analysis_result.defects:
- result_json = to_json_serializable(analysis_result.to_dict())
- result_json = json.loads(result_json)
- return image, result_json
- # 3. 使用DefectVisualizer进行绘图
- visualizer = DefectVisualizer(drawing_params)
- drawn_image = visualizer.draw_defects_on_image(image, analysis_result.defects)
- result_json = to_json_serializable(analysis_result.to_dict())
- result_json = json.loads(result_json)
- return drawn_image, result_json
- def run_json_only_analysis_example(json_path: str, output_json_path: str):
- """示例1: 演示如何仅使用JSON文件进行分析。"""
- fry_algo_print("重要", f"--- 场景1: 仅JSON分析 ---")
- # 1. 加载JSON数据
- try:
- with open(json_path, 'r', encoding='utf-8') as f:
- labelme_data = json.load(f)
- except Exception as e:
- fry_algo_print("错误", f"无法加载JSON文件 '{json_path}': {e}")
- return
- # 2. 初始化处理器并执行分析
- processor = DefectProcessor(pixel_resolution=24.54)
- analysis_result = processor.analyze_from_json(labelme_data)
- # 3. 打印统计结果
- fry_algo_print("信息", f"分析完成: {os.path.basename(json_path)}")
- stats = analysis_result["statistics"]
- print(json.dumps(stats, indent=2, ensure_ascii=False))
- # 4. 将完整结果保存为新的JSON文件
- with open(output_json_path, 'w', encoding='utf-8') as f:
- json.dump(analysis_result, f, ensure_ascii=False, indent=2, default=to_json_serializable)
- fry_algo_print("成功", f"详细分析结果已保存到: {output_json_path}")
- def run_image_and_json_analysis_example(image_path: str, json_path: str, output_dir: str):
- """示例2: 演示如何结合图像和JSON进行分析与绘图。"""
- fry_algo_print("重要", f"--- 场景2: 图像与JSON结合分析和绘图 ---")
- # 1. 加载图像和JSON数据
- image = cv2.imread(image_path)
- if image is None:
- fry_algo_print("错误", f"无法加载图片: {image_path}")
- return
- try:
- with open(json_path, 'r', encoding='utf-8') as f:
- labelme_data = json.load(f)
- except Exception as e:
- fry_algo_print("错误", f"无法加载JSON文件 '{json_path}': {e}")
- return
- # 2. 初始化处理器
- processor = DefectProcessor(pixel_resolution=24.54)
- # --- 2a. 测试绘制最小外接矩形 ---
- fry_algo_print("信息", "子场景 2a: 绘制最小外接矩形")
- drawing_params_with_rect = DrawingParams(draw_min_rect=True)
- drawn_image_rect, result_rect = processor.analyze_and_draw(image, labelme_data, drawing_params_with_rect)
- # 保存结果
- base_name = os.path.splitext(os.path.basename(image_path))[0]
- output_image_path_rect = os.path.join(output_dir, f"{base_name}_with_rect.jpg")
- output_json_path_rect = os.path.join(output_dir, f"{base_name}_with_rect_results.json")
- cv2.imwrite(output_image_path_rect, drawn_image_rect)
- with open(output_json_path_rect, 'w', encoding='utf-8') as f:
- json.dump(result_rect.to_dict(), f, ensure_ascii=False, indent=2, default=to_json_serializable)
- fry_algo_print("成功", f"带矩形的图片已保存到: {output_image_path_rect}")
- fry_algo_print("成功", f"对应的分析结果已保存到: {output_json_path_rect}")
- # --- 2b. 测试不绘制最小外接矩形 ---
- # fry_algo_print("信息", "子场景 2b: 不绘制最小外接矩形")
- # drawing_params_no_rect = DrawingParams(draw_min_rect=False)
- # drawn_image_no_rect, result_no_rect = processor.analyze_and_draw(image, labelme_data, drawing_params_no_rect)
- #
- # # 保存结果
- # output_image_path_no_rect = os.path.join(output_dir, f"{base_name}_no_rect.png")
- # output_json_path_no_rect = os.path.join(output_dir, f"{base_name}_no_rect_results.json")
- #
- # cv2.imwrite(output_image_path_no_rect, drawn_image_no_rect)
- # # 注意:分析结果 `result_no_rect` 和 `result_rect` 是一样的,因为分析和绘图是分离的
- # with open(output_json_path_no_rect, 'w', encoding='utf-8') as f:
- # json.dump(result_no_rect.to_dict(), f, ensure_ascii=False, indent=2, default=to_json_serializable)
- #
- # fry_algo_print("成功", f"不带矩形的图片已保存到: {output_image_path_no_rect}")
- if __name__ == "__main__":
- image_file_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\temp\250805_pokemon_0001.jpg"
- json_file_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_corner_no_reflect_defect-merge.json"
- output_dir = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\temp\测试数据_my"
- os.makedirs(output_dir, exist_ok=True)
- # 1. 仅JSON分析
- # run_json_only_analysis_example(
- # json_path=json_file_path,
- # output_json_path=os.path.join(output_dir, "json_only_analysis_result.json")
- # )
- #
- # print("\n" + "=" * 50 + "\n")
- # 2. 图像和JSON结合分析
- run_image_and_json_analysis_example(
- image_path=image_file_path,
- json_path=json_file_path,
- output_dir=output_dir
- )
- fry_algo_print("重要", "所有示例运行完毕!")
|