| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
 |
- import numpy as np
- import json
- import torch
- import datetime
- import torch.nn as nn
- import cv2
- from pathlib import Path
- import copy
- from typing import Dict, List, Tuple, Optional
- from app.utils.card_inference.backbone import BiSeNetV2
- from app.utils.card_inference.predict_preprocess import predict_preprocess
- from app.utils.card_inference.create_predict_result import create_result_singleImg
- from app.utils.card_inference.handle_result import process_detection_result
- import logging
- logging.basicConfig(level=logging.INFO)
def fry_algo_print(level_str: str, info_str: str):
    """Emit a tagged message through the root logger at INFO level.

    Args:
        level_str: Free-form tag shown in square brackets (callers in this
            file pass labels such as "信息", "警告", "错误").
        info_str: Message text placed after the tag.

    The record is always logged with ``logging.info``; ``level_str`` is only
    part of the text and does not select the logging level.
    """
    message = "[{}] : {}".format(level_str, info_str)
    logging.info(message)
def fry_cv2_imread(filename, flags=cv2.IMREAD_COLOR):
    """Read an image from a path that may contain non-ASCII characters.

    The file is read as raw bytes with Python's own ``open`` and decoded with
    ``cv2.imdecode``, so the path never goes through OpenCV's file API.

    Args:
        filename: Path of the image file.
        flags: Decode flags forwarded to ``cv2.imdecode``.

    Returns:
        The decoded image array, or ``None`` when the file cannot be read,
        is empty, or cannot be decoded. Failures are logged, never raised.
    """
    try:
        with open(filename, 'rb') as f:
            chunk = f.read()
    except IOError as e:
        fry_algo_print("错误", f"IOError: Unable to read file: {filename}")
        fry_algo_print("错误", f"Error details: {str(e)}")
        return None

    # cv2.imdecode raises on an empty buffer instead of returning None, so a
    # zero-byte file must be rejected here to keep the "None on failure" contract.
    if not chunk:
        fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
        return None

    chunk_arr = np.frombuffer(chunk, dtype=np.uint8)
    try:
        img = cv2.imdecode(chunk_arr, flags)
    except cv2.error as e:
        fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
        fry_algo_print("错误", f"Error details: {str(e)}")
        return None

    if img is None:
        fry_algo_print("警告", f"Warning: Unable to decode image: {filename}")
    return img
def fry_cv2_imwrite(filename, img, params=None):
    """Write an image to a path that may contain non-ASCII characters.

    The image is encoded in memory with ``cv2.imencode`` (format chosen from
    the lower-cased file suffix) and the bytes are written through a file
    object opened by Python.

    Args:
        filename: Destination path; its suffix selects the encoder.
        img: Image array to encode.
        params: Encoder parameters forwarded to ``cv2.imencode``.

    Returns:
        ``True`` when the file was written, ``False`` when encoding failed or
        any exception occurred (the failure is logged, not raised).
    """
    try:
        suffix = Path(filename).suffix.lower()
        encoded_ok, buffer = cv2.imencode(suffix, img, params)
        if not encoded_ok:
            fry_algo_print("警告", f"Warning: Unable to encode image: {filename}")
            return False
        with open(filename, 'wb') as out_file:
            buffer.tofile(out_file)
        return True
    except Exception as e:
        fry_algo_print("错误", f"Error: Unable to write file: {filename}")
        fry_algo_print("错误", f"Error details: {str(e)}")
        return False
def fry_opencv_chinese_path_init():
    """Replace ``cv2.imread`` / ``cv2.imwrite`` with the wrappers in this module.

    After this call every user of the ``cv2`` module object in the process
    gets ``fry_cv2_imread`` and ``fry_cv2_imwrite`` instead of the originals.
    """
    replacements = (
        ("imread", fry_cv2_imread),
        ("imwrite", fry_cv2_imwrite),
    )
    for attr_name, wrapper in replacements:
        setattr(cv2, attr_name, wrapper)
# Install the non-ASCII-path wrappers for cv2.imread / cv2.imwrite at import time.
# NOTE(review): the flag is assigned False immediately before it is tested, so
# the branch below always runs when this module body executes; the flag only
# records afterwards that the patch has been applied.
OPENCV_IO_ALREADY_INIT = False
if not OPENCV_IO_ALREADY_INIT:
    fry_opencv_chinese_path_init()
    OPENCV_IO_ALREADY_INIT = True
class FryBisenetV2Predictor:
    """BiSeNetV2 semantic-segmentation predictor.

    Builds a ``BiSeNetV2`` network, loads a checkpoint into it (skipping the
    auxiliary heads) and exposes helpers that run inference on a decoded
    image, on an image file, or on a list of image files, optionally writing
    a JSON result and a visualization image next to it.
    """

    def __init__(self,
                 pth_path: str,
                 real_seg_class_dict: Dict[int, str],
                 imgSize_train_dict: Dict[str, int],
                 confidence: float = 0.5,
                 label_colors_dict: Optional[Dict[str, Tuple[int, int, int]]] = None,
                 input_channels: int = 3,
                 aux_mode: str = "eval"):
        """
        Create the predictor and load the model.

        Args:
            pth_path: Path of the model weight file.
            real_seg_class_dict: Foreground classes as {class id: class name}.
                Class id 0 is reserved for the background entry added here.
            imgSize_train_dict: Image size used for training, as
                {'width': ..., 'height': ...}; forwarded to preprocessing and
                result creation.
            confidence: Confidence threshold forwarded to result creation.
            label_colors_dict: Optional {class name: colour tuple}; missing
                classes get a deterministic generated colour. The dict passed
                in is not modified.
            input_channels: Number of input channels the network is built with.
            aux_mode: Auxiliary mode forwarded to the ``BiSeNetV2`` constructor.
        """
        self.pth_path = pth_path
        self.real_seg_class_dict = real_seg_class_dict
        self.imgSize_train_dict = imgSize_train_dict
        self.confidence = confidence
        self.input_channels = input_channels
        self.aux_mode = aux_mode

        # Full class table: background (id 0) plus the caller's classes.
        self.seg_class_dict = {0: '___background___'}
        self.seg_class_dict.update(real_seg_class_dict)
        self.n_classes = len(self.seg_class_dict)

        self.label_colors_dict = self._generate_label_colors(label_colors_dict)
        self.device = self._get_device()
        self.model = self._init_model()

    @staticmethod
    def _get_device():
        """Return the CUDA device when available, otherwise the CPU device."""
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def _generate_label_colors(self, label_colors_dict: Optional[Dict[str, Tuple[int, int, int]]]) -> Dict[
            str, Tuple[int, int, int]]:
        """
        Return a colour table covering every non-background class.

        Args:
            label_colors_dict: Colours supplied by the caller, or ``None``.

        Returns:
            A new dict containing the caller's entries plus a generated colour
            (each component in 50..255, so never very dark) for every class
            name that had none.
        """
        # Copy so the caller's dict is never mutated.
        colors = dict(label_colors_dict) if label_colors_dict else {}
        # A private generator seeded with 42 yields the same colours as seeding
        # the global NumPy RNG with 42, without disturbing that global state.
        rng = np.random.RandomState(42)
        for class_id, class_name in self.seg_class_dict.items():
            if class_id == 0:  # background gets no colour
                continue
            if class_name not in colors:
                colors[class_name] = tuple(rng.randint(50, 256, 3).tolist())
        return colors

    def _load_model_weights(self, model: nn.Module, modelLoadPth: str) -> nn.Module:
        """
        Load checkpoint weights into ``model``, skipping the auxiliary heads.

        Args:
            model: Network to receive the weights.
            modelLoadPth: Path of the weight file.

        Returns:
            The same ``model`` object with weights loaded (non-strict).
        """
        fry_algo_print("信息", "加载预训练参数...")
        # NOTE(security): torch.load unpickles the file; only load checkpoints
        # from a trusted source.
        weights_dict = torch.load(modelLoadPth, map_location=self.device)

        exclude_layer_list = ("aux2", "aux3", "aux4", "aux5")
        new_weights_dict = {}
        for k, v in weights_dict.items():
            if any(marker in k for marker in exclude_layer_list):
                fry_algo_print("信息", f"被排除的层:{k}")
            else:
                new_weights_dict[k] = v

        # Non-strict load: mismatching keys are reported instead of raising.
        load_result = model.load_state_dict(new_weights_dict, strict=False)
        missing_keys = list(getattr(load_result, "missing_keys", []))
        unexpected_keys = list(getattr(load_result, "unexpected_keys", []))
        if missing_keys:
            fry_algo_print("警告", f"权重文件中缺失的层:{missing_keys}")
        if unexpected_keys:
            fry_algo_print("警告", f"模型中不存在的层:{unexpected_keys}")

        # Keys the model did not accept were not actually loaded.
        ok_layer_num = len(new_weights_dict) - len(unexpected_keys)
        fry_algo_print("信息", f"成功加载模型层数:{ok_layer_num}/{len(weights_dict)}")
        return model

    def _init_model(self) -> nn.Module:
        """Build the network, move it to the device, load weights, set eval mode."""
        model = BiSeNetV2(self.n_classes, self.input_channels, self.aux_mode)
        model = model.to(self.device)
        model = self._load_model_weights(model, self.pth_path)
        model.eval()
        return model

    def _predict_tensor(self, CHW: torch.Tensor) -> Dict:
        """
        Run the network on one image tensor.

        Args:
            CHW: Image tensor; a batch axis is added in front of it here.

        Returns:
            Dict with "ans_img" (per-pixel argmax class ids), "probs"
            (softmax probabilities) and "file_name" (always "result"),
            the first two as NumPy arrays for the first batch item.
        """
        with torch.no_grad():
            # Make sure the input lives on the same device as the model.
            NCHW = CHW.unsqueeze(0).to(self.device)
            # The original author duplicates the image because batch-norm
            # failed on a single-image batch.
            # NOTE(review): the model is put in eval mode in _init_model, so
            # this duplication may no longer be needed; confirm before removing.
            NCHW2 = torch.cat([NCHW, NCHW], dim=0)
            logits, *logits_aux = self.model(NCHW2)
            probs = torch.softmax(logits, dim=1)
            preds = torch.argmax(probs, dim=1)
            probs_np = probs.detach().cpu().numpy()
            preds_np = preds.detach().cpu().numpy()
        # Both batch items are the same image; keep the first.
        return {
            "ans_img": preds_np[0],
            "probs": probs_np[0],
            "file_name": "result"
        }

    @staticmethod
    def _to_jsonable(value):
        """Recursively convert NumPy arrays/scalars inside ``value`` to plain Python."""
        if isinstance(value, np.ndarray):
            return value.tolist()
        if isinstance(value, np.generic):
            return value.item()
        if isinstance(value, dict):
            return {
                (k.item() if isinstance(k, np.generic) else k): FryBisenetV2Predictor._to_jsonable(v)
                for k, v in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [FryBisenetV2Predictor._to_jsonable(v) for v in value]
        return value

    def _save_result_json(self, result: Dict, json_path: Path):
        """
        Write a prediction result to a UTF-8 JSON file.

        Args:
            result: Prediction result; NumPy arrays and scalars at any nesting
                depth are converted to lists / Python numbers first.
            json_path: Destination file path.
        """
        json_result = FryBisenetV2Predictor._to_jsonable(result)
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(json_result, f, ensure_ascii=False, indent=2)

    @staticmethod
    def _count_channels(img: np.ndarray) -> int:
        """Return the size of the third axis, or 1 for an array without one."""
        shape = img.shape
        return shape[2] if len(shape) == 3 else 1

    def _segment(self, img_bgr: np.ndarray, image_path_name: Optional[str] = None) -> Dict:
        """Preprocess ``img_bgr``, run the network and build the result dict.

        ``image_path_name`` is forwarded to ``create_result_singleImg`` only
        when given, so callers that never passed it keep their behaviour.
        """
        height, width = img_bgr.shape[:2]
        originImgSize = {'width': width, 'height': height}
        imgTensor_CHW_norm = predict_preprocess(img_bgr, self.imgSize_train_dict)
        ansImgDict = self._predict_tensor(imgTensor_CHW_norm)
        extra_kwargs = {} if image_path_name is None else {"image_path_name": image_path_name}
        return create_result_singleImg(
            self.seg_class_dict,
            ansImgDict,
            originImgSize,
            self.imgSize_train_dict,
            confidence=self.confidence,
            **extra_kwargs
        )

    def _save_outputs(self,
                      img_bgr: np.ndarray,
                      result: Dict,
                      image_name: str,
                      save_visualization: bool,
                      save_json: bool,
                      answer_json_dir_str: Optional[str]) -> Tuple[Optional[Path], Optional[Path]]:
        """Write the JSON result and/or the visualization image.

        Returns:
            ``(json_path, visualization_path)``; an entry is ``None`` when
            that file was not written.
        """
        if answer_json_dir_str is None:
            # No output directory: nothing can be written, but inference
            # results are still returned to the caller.
            if save_visualization or save_json:
                fry_algo_print("警告", "未指定 answer_json_dir_str,跳过结果保存")
            return None, None

        json_path = None
        vis_path = None
        if save_json and answer_json_dir_str:
            json_dir = Path(answer_json_dir_str)
            json_dir.mkdir(parents=True, exist_ok=True)
            json_path = json_dir / f"{Path(image_name).stem}.json"
            self._save_result_json(result, json_path)

        if save_visualization:
            out_dir = Path(answer_json_dir_str).resolve()
            # The directory may not exist yet when save_json is False.
            out_dir.mkdir(parents=True, exist_ok=True)
            result_img = process_detection_result(img_bgr, result, self.label_colors_dict)
            candidate = out_dir / image_name
            if cv2.imwrite(str(candidate), result_img):
                vis_path = candidate
            else:
                fry_algo_print("警告", f"可视化结果保存失败:{candidate}")
        return json_path, vis_path

    def predict_single_image_np(self,
                                img_bgr: np.ndarray,
                                image_path_str: str = None,
                                save_visualization: bool = True,
                                save_json: bool = True,
                                answer_json_dir_str: Optional[str] = None,
                                input_channels=3
                                ) -> Dict:
        """
        Predict one already-decoded image and optionally save the outputs.

        Args:
            img_bgr: Decoded image array.
            image_path_str: Path the image came from; only its file name is
                used to name the outputs. When ``None`` a timestamp-based
                ``.jpg`` name is generated.
            save_visualization: Whether to save the visualization image.
            save_json: Whether to save the JSON result.
            answer_json_dir_str: Output directory for both files. When
                ``None`` nothing is written.
            input_channels: Channel count the image must have.

        Returns:
            The prediction result dict.

        Raises:
            ValueError: If the image channel count differs from ``input_channels``.
        """
        if image_path_str is None:
            # `datetime` is imported as a module, hence datetime.datetime.
            timestamp = datetime.datetime.now().strftime('%y%m%d_%H%M%S_%f')
            image_path_real_str = f"{timestamp}.jpg"
        else:
            image_path_real_str = str(image_path_str)
        img_path_real_obj = Path(image_path_real_str).resolve()

        image_channel = self._count_channels(img_bgr)
        fry_algo_print("信息", f"模型需要的通道数为:{input_channels}")
        fry_algo_print("信息", f"测试的图片实际的通道数为:{image_channel}")
        if image_channel != input_channels:
            raise ValueError(
                f"输入图片的通道数和模型不匹配:image_channel:{image_channel},input_channels:{input_channels}")

        # NOTE(review): predict_batch passes image_path_name to
        # create_result_singleImg but this method never did; kept as-is so the
        # produced result is unchanged. Confirm which is intended.
        per_img_seg_result = self._segment(img_bgr)

        if save_json and answer_json_dir_str and image_path_str is None:
            # No source file exists for an in-memory image: store it under the
            # generated name (relative to the current working directory).
            cv2.imwrite(image_path_real_str, img_bgr)

        json_path, output_path = self._save_outputs(
            img_bgr,
            per_img_seg_result,
            img_path_real_obj.name,
            save_visualization,
            save_json,
            answer_json_dir_str,
        )
        if json_path is not None:
            fry_algo_print("成功", f"JSON结果已保存到:{json_path}")
        if output_path is not None:
            fry_algo_print("成功", f"可视化结果已保存到:{output_path}")
        return per_img_seg_result

    def predict_from_image(self, img_bgr: np.ndarray) -> Dict:
        """
        Predict directly from a decoded image without writing any files.

        Args:
            img_bgr: Decoded image array.

        Returns:
            The prediction result dict.

        Raises:
            ValueError: If the image channel count differs from the
                ``input_channels`` the predictor was created with.
        """
        image_channel = self._count_channels(img_bgr)
        if image_channel != self.input_channels:
            raise ValueError(
                f"输入图片的通道数和模型不匹配:image_channel:{image_channel},input_channels:{self.input_channels}")
        return self._segment(img_bgr)

    def predict_single_image(self,
                             img_path: str,
                             save_visualization: bool = True,
                             save_json: bool = True,
                             answer_json_dir_str: Optional[str] = None,
                             input_channels=3
                             ) -> Dict:
        """
        Read one image file and predict it.

        Args:
            img_path: Path of the image file.
            save_visualization: Whether to save the visualization image.
            save_json: Whether to save the JSON result.
            answer_json_dir_str: Output directory for both files. When
                ``None`` nothing is written.
            input_channels: Channel count the image must have.

        Returns:
            The prediction result dict.

        Raises:
            ValueError: If the image cannot be read or its channel count
                differs from ``input_channels``.
        """
        img_path_obj = Path(img_path).resolve()
        img_bgr = cv2.imread(str(img_path_obj))
        if img_bgr is None:
            raise ValueError(f"无法读取图片:{img_path}")
        return self.predict_single_image_np(
            img_bgr=img_bgr,
            image_path_str=str(img_path_obj),
            save_visualization=save_visualization,
            save_json=save_json,
            answer_json_dir_str=answer_json_dir_str,
            input_channels=input_channels
        )

    def predict_batch(self,
                      img_paths: List[str],
                      save_visualization: bool = True,
                      save_json: bool = True,
                      answer_json_dir_str: Optional[str] = None,
                      input_channels=3
                      ) -> List[Dict]:
        """
        Predict a list of image files one by one.

        Unreadable images, channel mismatches and any exception raised while
        handling one image are logged and that image is skipped.

        Args:
            img_paths: Image file paths.
            save_visualization: Whether to save the visualization images.
            save_json: Whether to save the JSON results.
            answer_json_dir_str: Output directory for all files. When
                ``None`` nothing is written.
            input_channels: Channel count every image must have.

        Returns:
            Results of the images that were processed successfully, in order.
        """
        results = []
        if answer_json_dir_str is not None:
            Path(answer_json_dir_str).mkdir(parents=True, exist_ok=True)

        total = len(img_paths)
        for i, img_path in enumerate(img_paths):
            fry_algo_print("信息", f"处理图片 {i + 1}/{total}: {img_path}")
            try:
                img_bgr = cv2.imread(img_path)
                if img_bgr is None:
                    fry_algo_print("信息", f"警告:无法读取图片 {img_path}")
                    continue

                image_channel = self._count_channels(img_bgr)
                if image_channel != input_channels:
                    fry_algo_print("信息", f"模型需要的通道数为:{input_channels}")
                    fry_algo_print("信息", f"测试的图片实际的通道数为:{image_channel}")
                    fry_algo_print("错误",
                                   f"输入图片的通道数和模型不匹配:image_channel:{image_channel},input_channels:{input_channels}")
                    continue

                image_path_name = str(Path(img_path).resolve().name)
                per_img_seg_result = self._segment(img_bgr, image_path_name=image_path_name)
                self._save_outputs(
                    img_bgr,
                    per_img_seg_result,
                    Path(img_path).name,
                    save_visualization,
                    save_json,
                    answer_json_dir_str,
                )
                results.append(per_img_seg_result)
            except Exception as e:
                # Best effort: one bad image must not abort the whole batch.
                fry_algo_print("失败", f"处理图片 {img_path} 时出错:{e}")
                continue

        fry_algo_print("成功", f"批量处理完成,成功处理 {len(results)}/{total} 张图片")
        return results
def main():
    """Usage example: build a predictor and run it on a single image.

    Expects ``segmentation_bisenetv2.pth`` and the ``input_output`` folder
    relative to the current working directory.
    """
    # Configuration
    pth_path = r"segmentation_bisenetv2.pth"
    input_channels = 3
    real_seg_class_dict = {1: 'outer_box'}
    # Optional per-class colours
    label_colors_dict = {
        'outer_box': (255, 0, 0),
    }
    imgSize_train_dict = {'width': 1280, 'height': 1280}
    confidence = 0.5

    predictor = FryBisenetV2Predictor(
        pth_path=pth_path,
        real_seg_class_dict=real_seg_class_dict,
        imgSize_train_dict=imgSize_train_dict,
        confidence=confidence,
        label_colors_dict=label_colors_dict,
        input_channels=input_channels,
    )

    # Single-image prediction
    print("=== 单张图片预测 ===")
    # Build the paths with pathlib so the example is not tied to the Windows
    # backslash separator.
    io_root = Path("input_output")
    now_img_path = str(io_root / "images" / "coaxis_0008.jpg")
    answer_json_dir_str = str(io_root / "images_answer_json_dir_str")
    predictor.predict_single_image(
        img_path=now_img_path,
        save_visualization=True,
        save_json=True,
        answer_json_dir_str=answer_json_dir_str,
        input_channels=input_channels,
    )

    # Batch prediction example:
    # img_paths = [
    #     str(io_root / "images" / "coaxis_0008.jpg"),
    #     str(io_root / "images" / "coaxis_0082.jpg"),
    #     str(io_root / "images" / "ring_0001.jpg"),
    #     str(io_root / "images" / "Pokemon_back_for_Edge_0001.jpg"),
    # ]
    # results = predictor.predict_batch(
    #     img_paths=img_paths,
    #     save_visualization=True,
    #     save_json=True,
    #     answer_json_dir_str=answer_json_dir_str,
    #     input_channels=input_channels,
    # )
def _test_pokemon_inner_box():
    """Manual check: run the inner-box model on one local sample image.

    Uses hard-coded paths on the author's machine; not an automated test.
    """
    # Configuration
    pth_path = r"E:\_250807_训练好的导出的模型\_250808_1043_宝可梦内框训练效果还行\pth_and_images\segmentation_bisenetv2.pth"
    input_channels = 3
    real_seg_class_dict = {1: 'inner_box'}
    # The colour table is looked up by class name, so the key must match the
    # class declared above (it was previously keyed 'outer_box' and ignored).
    label_colors_dict = {
        'inner_box': (255, 0, 0),
    }
    imgSize_train_dict = {'width': 1280, 'height': 1280}
    confidence = 0.5

    predictor = FryBisenetV2Predictor(
        pth_path=pth_path,
        real_seg_class_dict=real_seg_class_dict,
        imgSize_train_dict=imgSize_train_dict,
        confidence=confidence,
        label_colors_dict=label_colors_dict,
        input_channels=input_channels,
    )

    # Single-image prediction
    print("=== 单张图片预测 ===")
    now_img_path = r"E:\_250807_训练好的导出的模型\_250808_1043_宝可梦内框训练效果还行\pth_and_images\images\diff_big_00065.jpg"
    answer_json_dir_str = r"E:\_250807_训练好的导出的模型\_250808_1043_宝可梦内框训练效果还行\pth_and_images\images_answer"
    predictor.predict_single_image(
        img_path=now_img_path,
        save_visualization=True,
        save_json=True,
        answer_json_dir_str=answer_json_dir_str,
        input_channels=input_channels,
    )
def _test_pokemon_back_edge():
    """Manual check: run the back-side edge/corner defect model on one local tile.

    Uses hard-coded paths on the author's machine; not an automated test.
    No colour table is supplied, so the predictor generates the class colours.
    """
    channels = 3
    # Class ids start at 1; id 0 is reserved for the background by the predictor.
    defect_names = ['wear', 'wear_and_impact', 'impact', 'damaged', 'wear_and_stain']
    predictor_kwargs = {
        "pth_path": r"E:\_250807_训练好的导出的模型\_250811_1104_宝可梦背面边角\pth_and_images\segmentation_bisenetv2.pth",
        "real_seg_class_dict": dict(enumerate(defect_names, start=1)),
        "imgSize_train_dict": {'width': 512, 'height': 512},
        "confidence": 0.5,
        "input_channels": channels,
    }
    predictor = FryBisenetV2Predictor(**predictor_kwargs)

    # Single-image prediction
    print("=== 单张图片预测 ===")
    sample_image = r"E:\_250807_训练好的导出的模型\_250811_1104_宝可梦背面边角\pth_and_images\images\split\Pokémon_back_for_Edge_0001_bottom_grid_r0_c0.jpg"
    output_dir = r"E:\_250807_训练好的导出的模型\_250811_1104_宝可梦背面边角\pth_and_images\images_answer"
    result = predictor.predict_single_image(
        img_path=sample_image,
        save_visualization=True,
        save_json=True,
        answer_json_dir_str=output_dir,
        input_channels=channels
    )
# Script entry point: run the usage example.
if __name__ == "__main__":
    main()
    # Manual checks that depend on the hard-coded local paths inside them:
    # _test_pokemon_inner_box()
    # _test_pokemon_back_edge()
|