arean_anylize_draw.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. import os
  2. import json
  3. import cv2
  4. import numpy as np
  5. import random
  6. from dataclasses import dataclass, field
  7. from typing import Dict, List, Optional, Any, Tuple
  8. from collections import defaultdict
  9. def fry_algo_print(level_str: str, info_str: str):
  10. print(f"[{level_str}] : {info_str}")
  11. def fry_cv2_imread(filename, flags=cv2.IMREAD_COLOR):
  12. try:
  13. with open(filename, 'rb') as f:
  14. chunk = f.read()
  15. chunk_arr = np.frombuffer(chunk, dtype=np.uint8)
  16. img = cv2.imdecode(chunk_arr, flags)
  17. if img is None:
  18. fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
  19. return img
  20. except IOError as e:
  21. fry_algo_print("错误", f"IOError: Unable to read file: {filename}")
  22. fry_algo_print("错误", f"Error details: {str(e)}")
  23. return None
  24. def fry_cv2_imwrite(filename, img, params=None):
  25. try:
  26. ext = os.path.splitext(filename)[1].lower()
  27. result, encoded_img = cv2.imencode(ext, img, params)
  28. if result:
  29. with open(filename, 'wb') as f:
  30. encoded_img.tofile(f)
  31. return True
  32. else:
  33. fry_algo_print("警告", f"Warning: Unable to encode image: {filename}")
  34. return False
  35. except Exception as e:
  36. fry_algo_print("错误", f"Error: Unable to write file: {filename}")
  37. fry_algo_print("错误", f"Error details: {str(e)}")
  38. return False
  39. def fry_opencv_Chinese_path_init():
  40. cv2.imread = fry_cv2_imread
  41. cv2.imwrite = fry_cv2_imwrite
  42. OPENCV_IO_ALREADY_INIT = False
  43. if not OPENCV_IO_ALREADY_INIT:
  44. fry_opencv_Chinese_path_init()
  45. OPENCV_IO_ALREADY_INIT = True
  46. def to_json_serializable(obj):
  47. if isinstance(obj, (np.ndarray,)): return obj.tolist()
  48. if isinstance(obj, (np.integer,)): return int(obj)
  49. if isinstance(obj, (np.floating,)): return float(obj)
  50. if hasattr(obj, 'to_dict'): return obj.to_dict()
  51. try:
  52. return json.dumps(obj)
  53. except TypeError:
  54. return str(obj)
  55. @dataclass
  56. class DefectInfo:
  57. """单个缺陷的详细信息"""
  58. label: str
  59. pixel_area: float
  60. actual_area: float # 平方毫米
  61. width: float # 毫米
  62. height: float # 毫米
  63. contour: List[List[int]]
  64. min_rect: Tuple[Tuple[float, float], Tuple[float, float], float] # 最小外接矩形
  65. def to_dict(self) -> Dict[str, Any]:
  66. return {
  67. "label": self.label,
  68. "pixel_area": self.pixel_area,
  69. "actual_area": self.actual_area,
  70. "width": self.width,
  71. "height": self.height,
  72. "contour": self.contour,
  73. "min_rect": self.min_rect
  74. }
  75. @dataclass
  76. class AnalysisResult:
  77. """封装单次分析的所有结果,包括缺陷列表和统计信息"""
  78. defects: List[DefectInfo] = field(default_factory=list)
  79. total_defect_count: int = 0
  80. total_pixel_area = float = 0.0
  81. total_defect_area: float = 0.0 # 所有缺陷的总面积 (mm^2)
  82. area_by_label: Dict[str, float] = field(default_factory=lambda: defaultdict(float))
  83. count_by_label: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
  84. def to_dict(self) -> Dict[str, Any]:
  85. return {
  86. "defects": [d.to_dict() for d in self.defects],
  87. "statistics": {
  88. "total_defect_count": self.total_defect_count,
  89. "total_pixel_area": self.total_pixel_area,
  90. "total_defect_area_mm2": self.total_defect_area,
  91. "area_by_label_mm2": dict(self.area_by_label),
  92. "count_by_label": dict(self.count_by_label)
  93. }
  94. }
  95. @dataclass
  96. class DrawingParams:
  97. """封装所有绘图相关的参数"""
  98. draw_min_rect: bool = True
  99. font_scale: float = 0.5
  100. font_thickness: int = 1
  101. contour_thickness: int = 1
  102. rect_thickness: int = 1
  103. info_bg_alpha: float = 0.5
  104. contour_color: Tuple[int, int, int] = (0, 255, 0)
  105. rect_color: Tuple[int, int, int] = (255, 0, 0)
  106. text_color: Tuple[int, int, int] = (255, 255, 255)
  107. bg_color: Tuple[int, int, int] = (0, 0, 0)
  108. class DefectVisualizer:
  109. """一个专门用于在图像上可视化缺陷信息的类。"""
  110. def __init__(self, params: DrawingParams):
  111. self.params = params
  112. def draw_defects_on_image(self, image: np.ndarray, defects: List[DefectInfo]) -> np.ndarray:
  113. vis_image = image.copy()
  114. for i, defect in enumerate(defects):
  115. self._draw_single_defect(vis_image, defect, i)
  116. return vis_image
  117. def _draw_single_defect(self, image: np.ndarray, defect: DefectInfo, idx: int):
  118. contour = np.array(defect.contour, dtype=np.int32)
  119. cv2.drawContours(image, [contour], -1, self.params.contour_color, self.params.contour_thickness)
  120. if self.params.draw_min_rect:
  121. box = np.intp(cv2.boxPoints(defect.min_rect))
  122. cv2.drawContours(image, [box], 0, self.params.rect_color, self.params.rect_thickness)
  123. info_text = [
  124. f"L: {defect.label}", f"A: {defect.actual_area:.3f} mm2",
  125. f"W: {defect.width:.3f} mm", f"H: {defect.height:.3f} mm"
  126. ]
  127. M = cv2.moments(contour)
  128. cx = int(M["m10"] / M["m00"]) if M["m00"] != 0 else contour[0][0][0]
  129. cy = int(M["m01"] / M["m00"]) if M["m00"] != 0 else contour[0][0][1]
  130. cx += random.randint(-30, 10);
  131. cy += random.randint(-30, 10)
  132. text_size, _ = cv2.getTextSize(info_text[0], cv2.FONT_HERSHEY_SIMPLEX, self.params.font_scale,
  133. self.params.font_thickness)
  134. cx = max(10, min(cx, image.shape[1] - text_size[0] - 10))
  135. cy = max(text_size[1] * len(info_text) + 10, min(cy, image.shape[0] - 10))
  136. y_offset = cy - (text_size[1] + 10) * (len(info_text) - 1)
  137. for text in info_text:
  138. self._draw_text_with_background(image, text, (cx, y_offset))
  139. y_offset += text_size[1] + 10
  140. def _draw_text_with_background(self, image: np.ndarray, text: str, position: Tuple[int, int]):
  141. text_size, _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, self.params.font_scale,
  142. self.params.font_thickness)
  143. x, y = position
  144. overlay = image.copy()
  145. cv2.rectangle(overlay, (x - 2, y - text_size[1] - 2), (x + text_size[0] + 2, y + 2), self.params.bg_color, -1)
  146. cv2.addWeighted(overlay, self.params.info_bg_alpha, image, 1 - self.params.info_bg_alpha, 0, image)
  147. cv2.putText(image, text, (x, y), cv2.FONT_HERSHEY_SIMPLEX, self.params.font_scale, self.params.text_color,
  148. self.params.font_thickness, cv2.LINE_AA)
  149. class DefectProcessor:
  150. """
  151. 缺陷处理器,专注于单次分析和可视化任务。
  152. 此类是无状态的,不处理批处理,使其功能更专一、更易于测试和复用。
  153. """
  154. def __init__(self, pixel_resolution: float):
  155. """
  156. 初始化处理器。
  157. Args:
  158. pixel_resolution (float): 像素分辨率,单位是 μm/pixel。
  159. """
  160. if pixel_resolution <= 0:
  161. raise ValueError("Pixel resolution must be a positive number.")
  162. self.pixel_to_mm = pixel_resolution / 1000.0
  163. @staticmethod
  164. def _calculate_metrics(contour: np.ndarray, pixel_to_mm: float) -> Tuple[float, float, float, float, Any]:
  165. """静态辅助方法,计算单个轮廓的各项指标。"""
  166. pixel_area = cv2.contourArea(contour)
  167. actual_area = pixel_area * (pixel_to_mm ** 2)
  168. min_rect = cv2.minAreaRect(contour)
  169. # 确保宽度总是较小的一边
  170. width_pixels, height_pixels = sorted(min_rect[1])
  171. width, height = width_pixels * pixel_to_mm, height_pixels * pixel_to_mm
  172. return pixel_area, actual_area, width, height, min_rect
  173. def analyze_from_json(self, json_data: Dict[str, Any]) -> AnalysisResult:
  174. """
  175. [需求 1] 仅根据JSON数据计算缺陷面积并统计,返回包含详细信息的JSON友好对象。
  176. Args:
  177. json_data (Dict[str, Any]): 从labelme JSON文件加载的字典数据。
  178. Returns:
  179. AnalysisResult: 包含所有缺陷信息和统计结果的数据对象。
  180. """
  181. if not json_data or 'shapes' not in json_data:
  182. return AnalysisResult()
  183. result = AnalysisResult()
  184. for shape in json_data['shapes']:
  185. label = shape.get('label', 'unlabeled')
  186. points = shape.get('points')
  187. if not points:
  188. continue
  189. contour = np.array(points, dtype=np.int32)
  190. pixel_area, actual_area, width, height, min_rect = self._calculate_metrics(contour, self.pixel_to_mm)
  191. defect = DefectInfo(
  192. label=label,
  193. pixel_area=pixel_area,
  194. actual_area=actual_area,
  195. width=width,
  196. height=height,
  197. contour=contour.tolist(),
  198. min_rect=min_rect
  199. )
  200. result.defects.append(defect)
  201. # 更新统计信息
  202. result.total_defect_count += 1
  203. result.total_pixel_area += pixel_area
  204. result.total_defect_area += actual_area
  205. result.count_by_label[label] += 1
  206. result.area_by_label[label] += actual_area
  207. return result
  208. def analyze_and_draw(self, image: np.ndarray, json_data: Dict[str, Any], drawing_params: DrawingParams) -> Tuple[
  209. np.ndarray, AnalysisResult]:
  210. """
  211. [需求 2] 输入图片和JSON数据,返回绘制好的图片和分析结果。
  212. Args:
  213. image (np.ndarray): OpenCV格式的BGR图像。
  214. json_data (Dict[str, Any]): 从labelme JSON文件加载的字典数据。
  215. drawing_params (DrawingParams): 控制绘图样式的参数对象。
  216. Returns:
  217. Tuple[np.ndarray, AnalysisResult]:
  218. - 绘制了缺陷信息的新图像。
  219. - 包含所有缺陷信息和统计结果的数据对象。
  220. """
  221. # 1. 首先,执行纯JSON分析以获取所有计算结果
  222. analysis_result = self.analyze_from_json(json_data)
  223. # 2. 如果没有缺陷,直接返回原图和分析结果
  224. if not analysis_result.defects:
  225. return image, analysis_result
  226. # 3. 使用DefectVisualizer进行绘图
  227. visualizer = DefectVisualizer(drawing_params)
  228. drawn_image = visualizer.draw_defects_on_image(image, analysis_result.defects)
  229. return drawn_image, analysis_result
  230. def run_json_only_analysis_example(json_path: str, output_json_path: str):
  231. """示例1: 演示如何仅使用JSON文件进行分析。"""
  232. fry_algo_print("重要", f"--- 场景1: 仅JSON分析 ---")
  233. # 1. 加载JSON数据
  234. try:
  235. with open(json_path, 'r', encoding='utf-8') as f:
  236. labelme_data = json.load(f)
  237. except Exception as e:
  238. fry_algo_print("错误", f"无法加载JSON文件 '{json_path}': {e}")
  239. return
  240. # 2. 初始化处理器并执行分析
  241. processor = DefectProcessor(pixel_resolution=24.54)
  242. analysis_result = processor.analyze_from_json(labelme_data)
  243. # 3. 打印统计结果
  244. fry_algo_print("信息", f"分析完成: {os.path.basename(json_path)}")
  245. stats = analysis_result.to_dict()["statistics"]
  246. print(json.dumps(stats, indent=2, ensure_ascii=False))
  247. # 4. 将完整结果保存为新的JSON文件
  248. with open(output_json_path, 'w', encoding='utf-8') as f:
  249. json.dump(analysis_result.to_dict(), f, ensure_ascii=False, indent=2, default=to_json_serializable)
  250. fry_algo_print("成功", f"详细分析结果已保存到: {output_json_path}")
  251. def run_image_and_json_analysis_example(image_path: str, json_path: str, output_dir: str):
  252. """示例2: 演示如何结合图像和JSON进行分析与绘图。"""
  253. fry_algo_print("重要", f"--- 场景2: 图像与JSON结合分析和绘图 ---")
  254. # 1. 加载图像和JSON数据
  255. image = cv2.imread(image_path)
  256. if image is None:
  257. fry_algo_print("错误", f"无法加载图片: {image_path}")
  258. return
  259. try:
  260. with open(json_path, 'r', encoding='utf-8') as f:
  261. labelme_data = json.load(f)
  262. except Exception as e:
  263. fry_algo_print("错误", f"无法加载JSON文件 '{json_path}': {e}")
  264. return
  265. # 2. 初始化处理器
  266. processor = DefectProcessor(pixel_resolution=24.54)
  267. # --- 2a. 测试绘制最小外接矩形 ---
  268. fry_algo_print("信息", "子场景 2a: 绘制最小外接矩形")
  269. drawing_params_with_rect = DrawingParams(draw_min_rect=True)
  270. drawn_image_rect, result_rect = processor.analyze_and_draw(image, labelme_data, drawing_params_with_rect)
  271. # 保存结果
  272. base_name = os.path.splitext(os.path.basename(image_path))[0]
  273. output_image_path_rect = os.path.join(output_dir, f"{base_name}_with_rect.jpg")
  274. output_json_path_rect = os.path.join(output_dir, f"{base_name}_with_rect_results.json")
  275. cv2.imwrite(output_image_path_rect, drawn_image_rect)
  276. with open(output_json_path_rect, 'w', encoding='utf-8') as f:
  277. json.dump(result_rect.to_dict(), f, ensure_ascii=False, indent=2, default=to_json_serializable)
  278. fry_algo_print("成功", f"带矩形的图片已保存到: {output_image_path_rect}")
  279. fry_algo_print("成功", f"对应的分析结果已保存到: {output_json_path_rect}")
  280. # --- 2b. 测试不绘制最小外接矩形 ---
  281. # fry_algo_print("信息", "子场景 2b: 不绘制最小外接矩形")
  282. # drawing_params_no_rect = DrawingParams(draw_min_rect=False)
  283. # drawn_image_no_rect, result_no_rect = processor.analyze_and_draw(image, labelme_data, drawing_params_no_rect)
  284. #
  285. # # 保存结果
  286. # output_image_path_no_rect = os.path.join(output_dir, f"{base_name}_no_rect.png")
  287. # output_json_path_no_rect = os.path.join(output_dir, f"{base_name}_no_rect_results.json")
  288. #
  289. # cv2.imwrite(output_image_path_no_rect, drawn_image_no_rect)
  290. # # 注意:分析结果 `result_no_rect` 和 `result_rect` 是一样的,因为分析和绘图是分离的
  291. # with open(output_json_path_no_rect, 'w', encoding='utf-8') as f:
  292. # json.dump(result_no_rect.to_dict(), f, ensure_ascii=False, indent=2, default=to_json_serializable)
  293. #
  294. # fry_algo_print("成功", f"不带矩形的图片已保存到: {output_image_path_no_rect}")
  295. if __name__ == "__main__":
  296. image_file_path = r"C:\Code\ML\Project\卡片缺陷检测项目组\计算边角缺陷大小\测试数据\250805_pokemon_0001.jpg"
  297. json_file_path = r"C:\Code\ML\Project\卡片缺陷检测项目组\计算边角缺陷大小\测试数据\250805_pokemon_0001.json"
  298. output_dir = r"C:\Code\ML\Project\卡片缺陷检测项目组\计算边角缺陷大小\测试数据_my"
  299. os.makedirs(output_dir, exist_ok=True)
  300. # 1. 仅JSON分析
  301. # run_json_only_analysis_example(
  302. # json_path=json_file_path,
  303. # output_json_path=os.path.join(output_dir, "json_only_analysis_result.json")
  304. # )
  305. #
  306. # print("\n" + "=" * 50 + "\n")
  307. # 2. 图像和JSON结合分析
  308. run_image_and_json_analysis_example(
  309. image_path=image_file_path,
  310. json_path=json_file_path,
  311. output_dir=output_dir
  312. )
  313. fry_algo_print("重要", "所有示例运行完毕!")