arean_anylize_draw.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. import os
  2. import json
  3. import cv2
  4. import numpy as np
  5. import random
  6. from dataclasses import dataclass, field
  7. from typing import Dict, List, Optional, Any, Tuple, Union
  8. from collections import defaultdict
  9. import logging
  10. logger = logging.getLogger('ClassifyEdgeCorner')
  11. def fry_algo_print(level_str: str, info_str: str):
  12. logger.info(f"[{level_str}] : {info_str}")
  13. def fry_cv2_imread(filename, flags=cv2.IMREAD_COLOR):
  14. try:
  15. with open(filename, 'rb') as f:
  16. chunk = f.read()
  17. chunk_arr = np.frombuffer(chunk, dtype=np.uint8)
  18. img = cv2.imdecode(chunk_arr, flags)
  19. if img is None:
  20. fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
  21. return img
  22. except IOError as e:
  23. fry_algo_print("错误", f"IOError: Unable to read file: {filename}")
  24. fry_algo_print("错误", f"Error details: {str(e)}")
  25. return None
  26. def fry_cv2_imwrite(filename, img, params=None):
  27. try:
  28. ext = os.path.splitext(filename)[1].lower()
  29. result, encoded_img = cv2.imencode(ext, img, params)
  30. if result:
  31. with open(filename, 'wb') as f:
  32. encoded_img.tofile(f)
  33. return True
  34. else:
  35. fry_algo_print("警告", f"Warning: Unable to encode image: {filename}")
  36. return False
  37. except Exception as e:
  38. fry_algo_print("错误", f"Error: Unable to write file: {filename}")
  39. fry_algo_print("错误", f"Error details: {str(e)}")
  40. return False
  41. def fry_opencv_Chinese_path_init():
  42. cv2.imread = fry_cv2_imread
  43. cv2.imwrite = fry_cv2_imwrite
  44. OPENCV_IO_ALREADY_INIT = False
  45. if not OPENCV_IO_ALREADY_INIT:
  46. fry_opencv_Chinese_path_init()
  47. OPENCV_IO_ALREADY_INIT = True
  48. def to_json_serializable(obj):
  49. if isinstance(obj, (np.ndarray,)): return obj.tolist()
  50. if isinstance(obj, (np.integer,)): return int(obj)
  51. if isinstance(obj, (np.floating,)): return float(obj)
  52. if hasattr(obj, 'to_dict'): return obj.to_dict()
  53. try:
  54. return json.dumps(obj, indent=2)
  55. except TypeError:
  56. return str(obj)
  57. @dataclass
  58. class DefectInfo:
  59. """单个缺陷的详细信息"""
  60. label: str
  61. pixel_area: float
  62. actual_area: float # 平方毫米
  63. width: float # 毫米
  64. height: float # 毫米
  65. contour: List[List[int]]
  66. min_rect: Tuple[Tuple[float, float], Tuple[float, float], float] # 最小外接矩形
  67. def to_dict(self) -> Dict[str, Any]:
  68. return {
  69. "label": self.label,
  70. "pixel_area": self.pixel_area,
  71. "actual_area": self.actual_area,
  72. "width": self.width,
  73. "height": self.height,
  74. "points": self.contour,
  75. "min_rect": self.min_rect
  76. }
  77. @dataclass
  78. class AnalysisResult:
  79. """封装单次分析的所有结果,包括缺陷列表和统计信息"""
  80. defects: List[DefectInfo] = field(default_factory=list)
  81. total_defect_count: int = 0
  82. total_pixel_area: float = 0.0
  83. total_defect_area: float = 0.0 # 所有缺陷的总面积 (mm^2)
  84. area_by_label: Dict[str, float] = field(default_factory=lambda: defaultdict(float))
  85. count_by_label: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
  86. def to_dict(self) -> Dict[str, Any]:
  87. return {
  88. "defects": [d.to_dict() for d in self.defects],
  89. "statistics": {
  90. "total_defect_count": self.total_defect_count,
  91. "total_pixel_area": self.total_pixel_area,
  92. "total_defect_area_mm2": self.total_defect_area,
  93. "area_by_label_mm2": dict(self.area_by_label),
  94. "count_by_label": dict(self.count_by_label)
  95. }
  96. }
  97. @dataclass
  98. class DrawingParams:
  99. """封装所有绘图相关的参数"""
  100. draw_min_rect: bool = True
  101. font_scale: float = 0.5
  102. font_thickness: int = 1
  103. contour_thickness: int = 1
  104. rect_thickness: int = 1
  105. info_bg_alpha: float = 0.5
  106. contour_color: Tuple[int, int, int] = (0, 255, 0)
  107. rect_color: Tuple[int, int, int] = (255, 0, 0)
  108. text_color: Tuple[int, int, int] = (255, 255, 255)
  109. bg_color: Tuple[int, int, int] = (0, 0, 0)
  110. class DefectVisualizer:
  111. """一个专门用于在图像上可视化缺陷信息的类。"""
  112. def __init__(self, params: DrawingParams):
  113. self.params = params
  114. def draw_defects_on_image(self, image: np.ndarray, defects: List[DefectInfo]) -> np.ndarray:
  115. vis_image = image.copy()
  116. for i, defect in enumerate(defects):
  117. self._draw_single_defect(vis_image, defect, i)
  118. return vis_image
  119. def _draw_single_defect(self, image: np.ndarray, defect: DefectInfo, idx: int):
  120. contour = np.array(defect.contour, dtype=np.int32)
  121. cv2.drawContours(image, [contour], -1, self.params.contour_color, self.params.contour_thickness)
  122. if self.params.draw_min_rect:
  123. box = np.intp(cv2.boxPoints(defect.min_rect))
  124. cv2.drawContours(image, [box], 0, self.params.rect_color, self.params.rect_thickness)
  125. info_text = [
  126. f"L: {defect.label}",
  127. f"A: {defect.actual_area:.3f} mm2",
  128. f"W: {defect.width:.3f} mm", f"H: {defect.height:.3f} mm"
  129. ]
  130. M = cv2.moments(contour)
  131. cx = int(M["m10"] / M["m00"]) if M["m00"] != 0 else contour[0][0][0]
  132. cy = int(M["m01"] / M["m00"]) if M["m00"] != 0 else contour[0][0][1]
  133. cx += random.randint(-30, 10);
  134. cy += random.randint(-30, 10)
  135. text_size, _ = cv2.getTextSize(info_text[0], cv2.FONT_HERSHEY_SIMPLEX, self.params.font_scale,
  136. self.params.font_thickness)
  137. cx = max(10, min(cx, image.shape[1] - text_size[0] - 10))
  138. cy = max(text_size[1] * len(info_text) + 10, min(cy, image.shape[0] - 10))
  139. y_offset = cy - (text_size[1] + 10) * (len(info_text) - 1)
  140. for text in info_text:
  141. self._draw_text_with_background(image, text, (cx, y_offset))
  142. y_offset += text_size[1] + 10
  143. def _draw_text_with_background(self, image: np.ndarray, text: str, position: Tuple[int, int]):
  144. text_size, _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, self.params.font_scale,
  145. self.params.font_thickness)
  146. x, y = position
  147. overlay = image.copy()
  148. cv2.rectangle(overlay, (x - 2, y - text_size[1] - 2), (x + text_size[0] + 2, y + 2), self.params.bg_color, -1)
  149. cv2.addWeighted(overlay, self.params.info_bg_alpha, image, 1 - self.params.info_bg_alpha, 0, image)
  150. cv2.putText(image, text, (x, y), cv2.FONT_HERSHEY_SIMPLEX, self.params.font_scale, self.params.text_color,
  151. self.params.font_thickness, cv2.LINE_AA)
  152. class DefectProcessor:
  153. """
  154. 缺陷处理器,专注于单次分析和可视化任务。
  155. 此类是无状态的,不处理批处理,使其功能更专一、更易于测试和复用。
  156. """
  157. def __init__(self, pixel_resolution: float):
  158. """
  159. 初始化处理器。
  160. Args:
  161. pixel_resolution (float): 像素分辨率,单位是 μm/pixel。
  162. """
  163. if pixel_resolution <= 0:
  164. raise ValueError("Pixel resolution must be a positive number.")
  165. self.pixel_to_mm = pixel_resolution / 1000.0
  166. @staticmethod
  167. def _calculate_metrics(contour: np.ndarray, pixel_to_mm: float) -> Tuple[float, float, float, float, Any]:
  168. """静态辅助方法,计算单个轮廓的各项指标。"""
  169. pixel_area = cv2.contourArea(contour)
  170. actual_area = pixel_area * (pixel_to_mm ** 2)
  171. min_rect = cv2.minAreaRect(contour)
  172. # 确保宽度总是较小的一边
  173. width_pixels, height_pixels = sorted(min_rect[1])
  174. width, height = width_pixels * pixel_to_mm, height_pixels * pixel_to_mm
  175. return pixel_area, actual_area, width, height, min_rect
  176. def analyze_from_json(self, json_data: Dict[str, Any],
  177. is_return_obj=False) -> Union[dict, AnalysisResult]:
  178. """
  179. [需求 1] 仅根据JSON数据计算缺陷面积并统计,返回包含详细信息的JSON友好对象。
  180. Args:
  181. json_data (Dict[str, Any]): 从labelme JSON文件加载的字典数据。
  182. Returns:
  183. AnalysisResult: 包含所有缺陷信息和统计结果的数据对象。
  184. """
  185. result = AnalysisResult()
  186. if not json_data or 'shapes' not in json_data:
  187. if is_return_obj:
  188. return result
  189. result_json = to_json_serializable(result.to_dict())
  190. result_json = json.loads(result_json)
  191. return result_json
  192. for shape in json_data['shapes']:
  193. label = shape.get('label', 'unlabeled')
  194. points = shape.get('points')
  195. if not points:
  196. continue
  197. contour = np.array(points, dtype=np.int32)
  198. pixel_area, actual_area, width, height, min_rect = self._calculate_metrics(contour, self.pixel_to_mm)
  199. defect = DefectInfo(
  200. label=label,
  201. pixel_area=pixel_area,
  202. actual_area=actual_area,
  203. width=width,
  204. height=height,
  205. contour=contour.tolist(),
  206. min_rect=min_rect
  207. )
  208. result.defects.append(defect)
  209. # 更新统计信息
  210. result.total_defect_count += 1
  211. result.total_pixel_area += pixel_area
  212. result.total_defect_area += actual_area
  213. result.count_by_label[label] += 1
  214. result.area_by_label[label] += actual_area
  215. if is_return_obj:
  216. return result
  217. result_json = to_json_serializable(result.to_dict())
  218. result_json = json.loads(result_json)
  219. return result_json
  220. def analyze_and_draw(self, image: np.ndarray, json_data: Dict[str, Any], drawing_params: DrawingParams) -> Tuple[
  221. np.ndarray, dict]:
  222. """
  223. [需求 2] 输入图片和JSON数据,返回绘制好的图片和分析结果。
  224. Args:
  225. image (np.ndarray): OpenCV格式的BGR图像。
  226. json_data (Dict[str, Any]): 从labelme JSON文件加载的字典数据。
  227. drawing_params (DrawingParams): 控制绘图样式的参数对象。
  228. Returns:
  229. Tuple[np.ndarray, AnalysisResult]:
  230. - 绘制了缺陷信息的新图像。
  231. - 包含所有缺陷信息和统计结果的数据对象。
  232. """
  233. # 1. 首先,执行纯JSON分析以获取所有计算结果
  234. analysis_result = self.analyze_from_json(json_data, is_return_obj=True)
  235. # 2. 如果没有缺陷,直接返回原图和分析结果
  236. if not analysis_result.defects:
  237. result_json = to_json_serializable(analysis_result.to_dict())
  238. result_json = json.loads(result_json)
  239. return image, result_json
  240. # 3. 使用DefectVisualizer进行绘图
  241. visualizer = DefectVisualizer(drawing_params)
  242. drawn_image = visualizer.draw_defects_on_image(image, analysis_result.defects)
  243. result_json = to_json_serializable(analysis_result.to_dict())
  244. result_json = json.loads(result_json)
  245. return drawn_image, result_json
  246. def run_json_only_analysis_example(json_path: str, output_json_path: str):
  247. """示例1: 演示如何仅使用JSON文件进行分析。"""
  248. fry_algo_print("重要", f"--- 场景1: 仅JSON分析 ---")
  249. # 1. 加载JSON数据
  250. try:
  251. with open(json_path, 'r', encoding='utf-8') as f:
  252. labelme_data = json.load(f)
  253. except Exception as e:
  254. fry_algo_print("错误", f"无法加载JSON文件 '{json_path}': {e}")
  255. return
  256. # 2. 初始化处理器并执行分析
  257. processor = DefectProcessor(pixel_resolution=24.54)
  258. analysis_result = processor.analyze_from_json(labelme_data)
  259. # 3. 打印统计结果
  260. fry_algo_print("信息", f"分析完成: {os.path.basename(json_path)}")
  261. stats = analysis_result["statistics"]
  262. print(json.dumps(stats, indent=2, ensure_ascii=False))
  263. # 4. 将完整结果保存为新的JSON文件
  264. with open(output_json_path, 'w', encoding='utf-8') as f:
  265. json.dump(analysis_result, f, ensure_ascii=False, indent=2, default=to_json_serializable)
  266. fry_algo_print("成功", f"详细分析结果已保存到: {output_json_path}")
  267. def run_image_and_json_analysis_example(image_path: str, json_path: str, output_dir: str):
  268. """示例2: 演示如何结合图像和JSON进行分析与绘图。"""
  269. fry_algo_print("重要", f"--- 场景2: 图像与JSON结合分析和绘图 ---")
  270. # 1. 加载图像和JSON数据
  271. image = cv2.imread(image_path)
  272. if image is None:
  273. fry_algo_print("错误", f"无法加载图片: {image_path}")
  274. return
  275. try:
  276. with open(json_path, 'r', encoding='utf-8') as f:
  277. labelme_data = json.load(f)
  278. except Exception as e:
  279. fry_algo_print("错误", f"无法加载JSON文件 '{json_path}': {e}")
  280. return
  281. # 2. 初始化处理器
  282. processor = DefectProcessor(pixel_resolution=24.54)
  283. # --- 2a. 测试绘制最小外接矩形 ---
  284. fry_algo_print("信息", "子场景 2a: 绘制最小外接矩形")
  285. drawing_params_with_rect = DrawingParams(draw_min_rect=True)
  286. drawn_image_rect, result_rect = processor.analyze_and_draw(image, labelme_data, drawing_params_with_rect)
  287. # 保存结果
  288. base_name = os.path.splitext(os.path.basename(image_path))[0]
  289. output_image_path_rect = os.path.join(output_dir, f"{base_name}_with_rect.jpg")
  290. output_json_path_rect = os.path.join(output_dir, f"{base_name}_with_rect_results.json")
  291. cv2.imwrite(output_image_path_rect, drawn_image_rect)
  292. with open(output_json_path_rect, 'w', encoding='utf-8') as f:
  293. json.dump(result_rect.to_dict(), f, ensure_ascii=False, indent=2, default=to_json_serializable)
  294. fry_algo_print("成功", f"带矩形的图片已保存到: {output_image_path_rect}")
  295. fry_algo_print("成功", f"对应的分析结果已保存到: {output_json_path_rect}")
  296. # --- 2b. 测试不绘制最小外接矩形 ---
  297. # fry_algo_print("信息", "子场景 2b: 不绘制最小外接矩形")
  298. # drawing_params_no_rect = DrawingParams(draw_min_rect=False)
  299. # drawn_image_no_rect, result_no_rect = processor.analyze_and_draw(image, labelme_data, drawing_params_no_rect)
  300. #
  301. # # 保存结果
  302. # output_image_path_no_rect = os.path.join(output_dir, f"{base_name}_no_rect.png")
  303. # output_json_path_no_rect = os.path.join(output_dir, f"{base_name}_no_rect_results.json")
  304. #
  305. # cv2.imwrite(output_image_path_no_rect, drawn_image_no_rect)
  306. # # 注意:分析结果 `result_no_rect` 和 `result_rect` 是一样的,因为分析和绘图是分离的
  307. # with open(output_json_path_no_rect, 'w', encoding='utf-8') as f:
  308. # json.dump(result_no_rect.to_dict(), f, ensure_ascii=False, indent=2, default=to_json_serializable)
  309. #
  310. # fry_algo_print("成功", f"不带矩形的图片已保存到: {output_image_path_no_rect}")
  311. if __name__ == "__main__":
  312. image_file_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\temp\250805_pokemon_0001.jpg"
  313. json_file_path = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\_temp_work\pokemon_front_corner_no_reflect_defect-merge.json"
  314. output_dir = r"C:\Code\ML\Project\CheckCardBoxAndDefectServer\temp\测试数据_my"
  315. os.makedirs(output_dir, exist_ok=True)
  316. # 1. 仅JSON分析
  317. # run_json_only_analysis_example(
  318. # json_path=json_file_path,
  319. # output_json_path=os.path.join(output_dir, "json_only_analysis_result.json")
  320. # )
  321. #
  322. # print("\n" + "=" * 50 + "\n")
  323. # 2. 图像和JSON结合分析
  324. run_image_and_json_analysis_example(
  325. image_path=image_file_path,
  326. json_path=json_file_path,
  327. output_dir=output_dir
  328. )
  329. fry_algo_print("重要", "所有示例运行完毕!")