import copy
import datetime
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import torch
import torch.nn as nn
import cv2

from app.utils.card_inference.backbone import BiSeNetV2
from app.utils.card_inference.predict_preprocess import predict_preprocess
from app.utils.card_inference.create_predict_result import create_result_singleImg
from app.utils.card_inference.handle_result import process_detection_result

logging.basicConfig(level=logging.INFO)


def fry_algo_print(level_str: str, info_str: str):
    """Diagnostic hook used throughout this module; currently a no-op.

    Args:
        level_str: Severity/category label of the message.
        info_str: Message text.
    """
    # Output is switched off. To re-enable it, restore:
    # logging.info(f"[{level_str}] : {info_str}")


def fry_cv2_imread(filename, flags=cv2.IMREAD_COLOR):
    """Read an image through Python file I/O so non-ASCII paths work.

    Args:
        filename: Path of the image file.
        flags: Decode flags forwarded to ``cv2.imdecode``.

    Returns:
        The decoded image, or ``None`` when the file cannot be read, is empty,
        or cannot be decoded.
    """
    try:
        with open(filename, 'rb') as f:
            raw = f.read()
    except IOError as e:
        fry_algo_print("错误", f"IOError: Unable to read file: {filename}")
        fry_algo_print("错误", f"Error details: {str(e)}")
        return None

    buffer = np.frombuffer(raw, dtype=np.uint8)
    if buffer.size == 0:
        # cv2.imdecode raises on an empty buffer; report "not decodable" instead.
        fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
        return None

    img = cv2.imdecode(buffer, flags)
    if img is None:
        fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
    return img


def fry_cv2_imwrite(filename, img, params=None):
    """Write an image through Python file I/O so non-ASCII paths work.

    The encoder is selected from the lower-cased file extension.

    Args:
        filename: Destination path; its suffix selects the image format.
        img: Image array to encode.
        params: Optional encoder parameters forwarded to ``cv2.imencode``.

    Returns:
        ``True`` when the file was written, ``False`` on any failure.
    """
    try:
        ext = Path(filename).suffix.lower()
        if params is None:
            ok, encoded_img = cv2.imencode(ext, img)
        else:
            ok, encoded_img = cv2.imencode(ext, img, params)
        if not ok:
            fry_algo_print("警告", f"Warning: Unable to encode image: {filename}")
            return False
        with open(filename, 'wb') as f:
            encoded_img.tofile(f)
        return True
    except Exception as e:
        # Best effort by design: failures are reported through the return value.
        fry_algo_print("错误", f"Error: Unable to write file: {filename}")
        fry_algo_print("错误", f"Error details: {str(e)}")
        return False


def fry_opencv_chinese_path_init():
    """Replace ``cv2.imread`` / ``cv2.imwrite`` with the wrappers above."""
    cv2.imread = fry_cv2_imread
    cv2.imwrite = fry_cv2_imwrite


# Install the path-safe OpenCV I/O wrappers when this module is imported.
OPENCV_IO_ALREADY_INIT = False
if not OPENCV_IO_ALREADY_INIT:
    fry_opencv_chinese_path_init()
    OPENCV_IO_ALREADY_INIT = True


class FryBisenetV2Predictor:
    """BiSeNetV2 semantic-segmentation predictor."""

    def __init__(self,
                 pth_path: str,
                 real_seg_class_dict: Dict[int, str],
                 imgSize_train_dict: Dict[str, int],
                 confidence: float = 0.5,
                 label_colors_dict: Optional[Dict[str, Tuple[int, int, int]]] = None,
                 input_channels: int = 3,
                 aux_mode: str = "eval"):
        """Build the predictor and load the model weights.

        Args:
            pth_path: Path of the model weight file.
            real_seg_class_dict: Foreground classes as ``{class id: class name}``.
            imgSize_train_dict: Training image size as
                ``{'width': w, 'height': h}``.
            confidence: Confidence threshold forwarded to result creation.
            label_colors_dict: Optional ``{class name: (R, G, B)}`` colours;
                missing classes get generated colours.
            input_channels: Number of input channels the model expects.
            aux_mode: Forwarded to the ``BiSeNetV2`` constructor.
        """
        self.pth_path = pth_path
        self.real_seg_class_dict = real_seg_class_dict
        self.imgSize_train_dict = imgSize_train_dict
        self.confidence = confidence
        self.input_channels = input_channels
        self.aux_mode = aux_mode

        # Full class table: id 0 is the background, then the caller's classes.
        self.seg_class_dict = {0: '___background___'}
        self.seg_class_dict.update(real_seg_class_dict)
        self.n_classes = len(self.seg_class_dict)

        self.label_colors_dict = self._generate_label_colors(label_colors_dict)
        self.device = self._get_device()
        self.model = self._init_model()

    @staticmethod
    def _get_device():
        """Return the CUDA device when available, otherwise the CPU device."""
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    @staticmethod
    def _channel_count(img: np.ndarray) -> int:
        """Return the channel count of an image array (1 for 2-D arrays)."""
        return img.shape[2] if img.ndim == 3 else 1

    def _generate_label_colors(self, label_colors_dict: Optional[Dict[str, Tuple[int, int, int]]]) -> Dict[
        str, Tuple[int, int, int]]:
        """Return a colour for every non-background class.

        Colours supplied by the caller are kept; missing ones are drawn from a
        fixed-seed generator so they are identical between runs. The caller's
        dictionary is copied, not modified, and the global NumPy random state
        is left untouched.

        Args:
            label_colors_dict: Colours supplied by the caller, or ``None``.

        Returns:
            A new ``{class name: colour tuple}`` dictionary.
        """
        colors = dict(label_colors_dict) if label_colors_dict else {}

        # RandomState(42) yields the same stream as np.random.seed(42) followed
        # by np.random.randint, without reseeding the process-wide generator.
        rng = np.random.RandomState(42)
        for class_id, class_name in self.seg_class_dict.items():
            if class_id == 0:  # background gets no colour
                continue
            if class_name not in colors:
                # Lower bound of 50 avoids very dark colours.
                colors[class_name] = tuple(rng.randint(50, 256, 3).tolist())
        return colors

    def _load_model_weights(self, model: nn.Module, modelLoadPth: str) -> nn.Module:
        """Load weights into ``model``, skipping the auxiliary-head layers.

        Args:
            model: Model instance to populate.
            modelLoadPth: Path of the weight file.

        Returns:
            The same model with the weights loaded (non-strict).
        """
        fry_algo_print("信息", "加载预训练参数...")

        # NOTE: torch.load unpickles the file; only load trusted checkpoints.
        weights_dict = torch.load(modelLoadPth, map_location=self.device)

        excluded_tags = ("aux2", 'aux3', 'aux4', 'aux5')
        kept_weights = {}
        for k, v in weights_dict.items():
            if any(tag in k for tag in excluded_tags):
                fry_algo_print("信息", f"被排除的层:{k}")
            else:
                kept_weights[k] = v

        # Non-strict: the skipped layers (and any other mismatch) are tolerated.
        model.load_state_dict(kept_weights, strict=False)
        fry_algo_print("信息", f"成功加载模型层数:{len(kept_weights)}/{len(weights_dict)}")
        return model

    def _init_model(self) -> nn.Module:
        """Create the network, move it to the device, load weights, set eval mode."""
        model = BiSeNetV2(self.n_classes, self.input_channels, self.aux_mode)
        model = model.to(self.device)
        model = self._load_model_weights(model, self.pth_path)
        model.eval()
        return model

    def _predict_tensor(self, CHW: torch.Tensor) -> Dict:
        """Run the model on one image tensor.

        Args:
            CHW: Image tensor; a batch dimension is added in front of it.

        Returns:
            A dictionary with ``"ans_img"`` (per-pixel argmax class ids),
            ``"probs"`` (softmax over dim 1) and ``"file_name"``.
        """
        with torch.no_grad():
            # Moving to the model's device is a no-op if it is already there.
            NCHW = CHW.unsqueeze(0).to(self.device)

            # Per the original author's note, single-image inference raised in
            # the batch-norm layers, so the image is duplicated into a batch of
            # two and only the first result is used.
            NCHW2 = torch.cat([NCHW, NCHW], dim=0)

            logits, *_ = self.model(NCHW2)

            probs = torch.softmax(logits, dim=1)
            preds = torch.argmax(probs, dim=1)

            probs_np = probs.detach().cpu().numpy()
            preds_np = preds.detach().cpu().numpy()

        return {
            "ans_img": preds_np[0],
            "probs": probs_np[0],
            "file_name": "result"
        }

    def _segment_image(self, img_bgr: np.ndarray, **result_kwargs) -> Dict:
        """Preprocess, predict and build the result dictionary for one image.

        Args:
            img_bgr: Decoded image array.
            **result_kwargs: Extra keyword arguments forwarded to
                ``create_result_singleImg``.

        Returns:
            Whatever ``create_result_singleImg`` returns for this image.
        """
        height, width = img_bgr.shape[:2]
        origin_img_size = {'width': width, 'height': height}

        img_tensor = predict_preprocess(img_bgr, self.imgSize_train_dict)
        prediction = self._predict_tensor(img_tensor)

        return create_result_singleImg(
            self.seg_class_dict,
            prediction,
            origin_img_size,
            self.imgSize_train_dict,
            confidence=self.confidence,
            **result_kwargs
        )

    @staticmethod
    def _to_jsonable(value: Any) -> Any:
        """Recursively convert NumPy arrays/scalars into plain Python values."""
        if isinstance(value, np.ndarray):
            return value.tolist()
        if isinstance(value, np.generic):
            return value.item()
        if isinstance(value, dict):
            return {
                (k.item() if isinstance(k, np.generic) else k):
                    FryBisenetV2Predictor._to_jsonable(v)
                for k, v in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [FryBisenetV2Predictor._to_jsonable(v) for v in value]
        return value

    def _save_result_json(self, result: Dict, json_path: Path):
        """Write a prediction result to a UTF-8 JSON file.

        NumPy arrays and scalars at any nesting depth are converted to plain
        Python values first.

        Args:
            result: Prediction result dictionary.
            json_path: Destination file path.
        """
        json_result = self._to_jsonable(result)
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(json_result, f, ensure_ascii=False, indent=2)

    def predict_single_image_np(self,
                                img_bgr: np.ndarray,
                                image_path_str: Optional[str] = None,
                                save_visualization: bool = True,
                                save_json: bool = True,
                                answer_json_dir_str: Optional[str] = None,
                                input_channels=3
                                ) -> Dict:
        """Predict one already-decoded image and optionally save the outputs.

        Args:
            img_bgr: Decoded image array.
            image_path_str: Path used to name the outputs. When ``None`` a
                timestamp-based ``.jpg`` name is generated.
            save_visualization: Whether to save the visualization image.
            save_json: Whether to save the JSON result.
            answer_json_dir_str: Output directory for the JSON and the
                visualization. When ``None`` nothing is saved.
            input_channels: Channel count the image is required to have.

        Returns:
            The prediction result dictionary.

        Raises:
            ValueError: If the image channel count differs from
                ``input_channels``.
        """
        if image_path_str is None:
            timestamp = datetime.datetime.now().strftime('%y%m%d_%H%M%S_%f')
            image_path_real_str = f"{timestamp}.jpg"
        else:
            image_path_real_str = str(image_path_str)
        img_path_real_obj = Path(image_path_real_str).resolve()

        image_channel = self._channel_count(img_bgr)
        fry_algo_print("信息", f"模型需要的通道数为:{input_channels}")
        fry_algo_print("信息", f"测试的图片实际的通道数为:{image_channel}")
        if image_channel != input_channels:
            # No channel conversion is attempted; a mismatch is an error.
            raise ValueError(
                f"输入图片的通道数和模型不匹配:image_channel:{image_channel},input_channels:{input_channels}")

        per_img_seg_result = self._segment_image(img_bgr)

        if save_json and answer_json_dir_str:
            json_dir = Path(answer_json_dir_str)
            json_dir.mkdir(parents=True, exist_ok=True)

            if image_path_str is None:
                # No source file exists for this image, so keep a copy of the
                # input under the generated name.
                fry_cv2_imwrite(image_path_real_str, img_bgr)

            json_path = json_dir / f"{img_path_real_obj.stem}.json"
            self._save_result_json(per_img_seg_result, json_path)
            fry_algo_print("成功", f"JSON结果已保存到:{json_path}")

        if save_visualization and answer_json_dir_str is not None:
            vis_dir = Path(answer_json_dir_str).resolve()
            vis_dir.mkdir(parents=True, exist_ok=True)
            result_img = process_detection_result(img_bgr, per_img_seg_result, self.label_colors_dict)
            output_path = str(vis_dir / f"{img_path_real_obj.name}")
            fry_cv2_imwrite(output_path, result_img)
            fry_algo_print("成功", f"可视化结果已保存到:{output_path}")

        return per_img_seg_result

    def predict_from_image(self, img_bgr: np.ndarray) -> Dict:
        """Predict directly from a decoded image array without saving anything.

        Args:
            img_bgr: Decoded image array.

        Returns:
            The prediction result dictionary.

        Raises:
            ValueError: If the image channel count differs from the model's
                ``input_channels``.
        """
        image_channel = self._channel_count(img_bgr)
        if image_channel != self.input_channels:
            raise ValueError(
                f"输入图片的通道数和模型不匹配:image_channel:{image_channel},input_channels:{self.input_channels}")

        return self._segment_image(img_bgr)

    def predict_single_image(self,
                             img_path: str,
                             save_visualization: bool = True,
                             save_json: bool = True,
                             answer_json_dir_str: Optional[str] = None,
                             input_channels=3
                             ) -> Dict:
        """Read one image file and predict it.

        Args:
            img_path: Path of the image file.
            save_visualization: Whether to save the visualization image.
            save_json: Whether to save the JSON result.
            answer_json_dir_str: Output directory for the JSON and the
                visualization.
            input_channels: Channel count the image is required to have.

        Returns:
            The prediction result dictionary.

        Raises:
            ValueError: If the image cannot be read, or its channel count
                differs from ``input_channels``.
        """
        img_path_obj = Path(img_path).resolve()

        img_bgr = fry_cv2_imread(str(img_path_obj))
        if img_bgr is None:
            raise ValueError(f"无法读取图片:{img_path}")

        return self.predict_single_image_np(
            img_bgr=img_bgr,
            image_path_str=str(img_path_obj),
            save_visualization=save_visualization,
            save_json=save_json,
            answer_json_dir_str=answer_json_dir_str,
            input_channels=input_channels
        )

    def predict_batch(self,
                      img_paths: List[str],
                      save_visualization: bool = True,
                      save_json: bool = True,
                      answer_json_dir_str: Optional[str] = None,
                      input_channels=3
                      ) -> List[Dict]:
        """Predict a list of image files, skipping the ones that fail.

        Unreadable images, channel mismatches and per-image exceptions are
        reported through ``fry_algo_print`` and skipped; they do not abort the
        batch.

        Args:
            img_paths: Image file paths.
            save_visualization: Whether to save the visualization images.
            save_json: Whether to save the JSON results.
            answer_json_dir_str: Output directory for the JSON files and the
                visualizations. When ``None`` nothing is saved.
            input_channels: Channel count every image is required to have.

        Returns:
            The result dictionaries of the successfully processed images.
        """
        vis_dir = None
        if answer_json_dir_str is not None:
            vis_dir = Path(answer_json_dir_str).resolve()
            Path(answer_json_dir_str).mkdir(parents=True, exist_ok=True)

        results = []
        for i, img_path in enumerate(img_paths):
            fry_algo_print("信息", f"处理图片 {i + 1}/{len(img_paths)}: {img_path}")

            try:
                img_bgr = fry_cv2_imread(img_path)
                if img_bgr is None:
                    fry_algo_print("信息", f"警告:无法读取图片 {img_path}")
                    continue

                image_channel = self._channel_count(img_bgr)
                if image_channel != input_channels:
                    fry_algo_print("信息", f"模型需要的通道数为:{input_channels}")
                    fry_algo_print("信息", f"测试的图片实际的通道数为:{image_channel}")
                    fry_algo_print("错误",
                                   f"输入图片的通道数和模型不匹配:image_channel:{image_channel},input_channels:{input_channels}")
                    continue

                image_path_name = str(Path(img_path).resolve().name)
                per_img_seg_result = self._segment_image(
                    img_bgr, image_path_name=image_path_name)

                if save_json and answer_json_dir_str:
                    json_dir = Path(answer_json_dir_str)
                    json_dir.mkdir(parents=True, exist_ok=True)
                    json_path = json_dir / f"{Path(img_path).stem}.json"
                    self._save_result_json(per_img_seg_result, json_path)

                if save_visualization and vis_dir is not None:
                    result_img = process_detection_result(img_bgr, per_img_seg_result, self.label_colors_dict)
                    output_path = vis_dir / f"{Path(img_path).name}"
                    fry_cv2_imwrite(str(output_path), result_img)

                results.append(per_img_seg_result)

            except Exception as e:
                # Best effort by design: one bad image must not stop the batch.
                fry_algo_print("失败", f"处理图片 {img_path} 时出错:{e}")
                continue

        fry_algo_print("成功", f"批量处理完成,成功处理 {len(results)}/{len(img_paths)} 张图片")
        return results


def main():
    """Usage example: single-image prediction with a one-class model."""
    # Configuration
    pth_path = r"segmentation_bisenetv2.pth"
    input_channels = 3
    real_seg_class_dict = {1: 'outer_box'}

    # Optional per-class colours
    label_colors_dict = {
        'outer_box': (255, 0, 0),
    }

    imgSize_train_dict = {'width': 1280, 'height': 1280}
    confidence = 0.5

    predictor = FryBisenetV2Predictor(
        pth_path=pth_path,
        real_seg_class_dict=real_seg_class_dict,
        imgSize_train_dict=imgSize_train_dict,
        confidence=confidence,
        label_colors_dict=label_colors_dict,
        input_channels=input_channels,
    )

    # Single-image prediction
    print("=== 单张图片预测 ===")
    now_img_path = r"input_output\images\coaxis_0008.jpg"
    answer_json_dir_str = r"input_output\images_answer_json_dir_str"

    predictor.predict_single_image(
        img_path=now_img_path,
        save_visualization=True,
        save_json=True,
        answer_json_dir_str=answer_json_dir_str,
        input_channels=input_channels,
    )

    # Batch prediction example
    # print("\n=== 批量图片预测 ===")
    # img_paths = [
    #     r"input_output\images\coaxis_0008.jpg",
    #     r"input_output\images\coaxis_0082.jpg",
    #     r"input_output\images\ring_0001.jpg",
    #     r"input_output\images\Pokemon_back_for_Edge_0001.jpg",
    # ]
    #
    # results = predictor.predict_batch(
    #     img_paths=img_paths,
    #     save_visualization=True,
    #     save_json=True,
    #     answer_json_dir_str=answer_json_dir_str,
    #     input_channels=input_channels,
    # )


def _test_pokemon_inner_box():
    """Manual check: single-image prediction with the inner-box model."""
    # Configuration
    pth_path = r"E:\_250807_训练好的导出的模型\_250808_1043_宝可梦内框训练效果还行\pth_and_images\segmentation_bisenetv2.pth"
    input_channels = 3
    real_seg_class_dict = {1: 'inner_box'}

    # Optional per-class colours
    # NOTE(review): the key is 'outer_box' while the only class is 'inner_box',
    # so 'inner_box' receives a generated colour - confirm this is intended.
    label_colors_dict = {
        'outer_box': (255, 0, 0),
    }

    imgSize_train_dict = {'width': 1280, 'height': 1280}
    confidence = 0.5

    predictor = FryBisenetV2Predictor(
        pth_path=pth_path,
        real_seg_class_dict=real_seg_class_dict,
        imgSize_train_dict=imgSize_train_dict,
        confidence=confidence,
        label_colors_dict=label_colors_dict,
        input_channels=input_channels,
    )

    # Single-image prediction
    print("=== 单张图片预测 ===")
    now_img_path = r"E:\_250807_训练好的导出的模型\_250808_1043_宝可梦内框训练效果还行\pth_and_images\images\diff_big_00065.jpg"
    answer_json_dir_str = r"E:\_250807_训练好的导出的模型\_250808_1043_宝可梦内框训练效果还行\pth_and_images\images_answer"

    predictor.predict_single_image(
        img_path=now_img_path,
        save_visualization=True,
        save_json=True,
        answer_json_dir_str=answer_json_dir_str
    )


def _test_pokemon_back_edge():
    """Manual check: single-image prediction with the five-class back-edge model."""
    # Configuration
    pth_path = r"E:\_250807_训练好的导出的模型\_250811_1104_宝可梦背面边角\pth_and_images\segmentation_bisenetv2.pth"
    input_channels = 3
    real_seg_class_dict = {
        1: 'wear',
        2: 'wear_and_impact',
        3: 'impact',
        4: 'damaged',
        5: 'wear_and_stain',
    }

    # No colours are supplied here, so every class gets a generated colour.

    imgSize_train_dict = {'width': 512, 'height': 512}
    confidence = 0.5

    predictor = FryBisenetV2Predictor(
        pth_path=pth_path,
        real_seg_class_dict=real_seg_class_dict,
        imgSize_train_dict=imgSize_train_dict,
        confidence=confidence,
        input_channels=input_channels,
    )

    # Single-image prediction
    print("=== 单张图片预测 ===")
    now_img_path = r"E:\_250807_训练好的导出的模型\_250811_1104_宝可梦背面边角\pth_and_images\images\split\Pokémon_back_for_Edge_0001_bottom_grid_r0_c0.jpg"
    answer_json_dir_str = r"E:\_250807_训练好的导出的模型\_250811_1104_宝可梦背面边角\pth_and_images\images_answer"

    predictor.predict_single_image(
        img_path=now_img_path,
        save_visualization=True,
        save_json=True,
        answer_json_dir_str=answer_json_dir_str,
        input_channels=input_channels
    )
# Script entry point: runs the usage example in main().
# The manual checks _test_pokemon_inner_box() and _test_pokemon_back_edge()
# can be called here instead.
if __name__ == "__main__": main()