前言
FCN原理篇讲解:【语义分割专栏】:FCN原理篇 - carpell - 博客园
代码地址,下载可复现:fouen6/FCN_semantic-segmentation
本篇文章收录于语义分割专栏,如果对语义分割领域感兴趣的,可以去看看专栏,会对经典的模型以及代码进行详细的讲解哦!其中会包含可复现的代码!
上篇文章已经带大家学习过了FCN的原理,相信大家对于原理应该有了比较深的了解。本文将会带大家去手动复现属于自己的一个语义分割模型。将会深入代码进行讲解,如果有讲错的地方欢迎大家批评指正!
其实所有的深度学习模型的搭建我认为可以总结成五部分:模型的构建,数据集的处理,评价指标的设定,训练流程,测试。其实感觉有点深度学习代码八股文的那种意思。本篇同样的也会按照这样的方式进行讲解,希望大家能够深入代码去进行了解学习。
请记住:只懂原理不懂代码,你就算有了很好的想法创新点,你也难以去实现,所以希望大家能够深入去了解,最好能够参考着本文自己复现一下。
FCN全流程代码
模型搭建(model)
我们这里根据原论文一样采用VGG作为我们的特征提取网络,如果你对VGG网络还不太了解的话,可以先去看看我对VGG网络的讲解。
我们都知道VGG采用了一些重复的结构,所以我们根据maxpool出现的位置将其划分为5个stage。这样我们可以同时用不同深度的VGG的网络,VGG11到VGG19都可以使用,因为其结构是一样的。
backbone = get_backbone(backbone=backbone, pretrained=True) features = list(backbone.features.children()) pool_indices = [i + 1 for i, layer in enumerate(features) if isinstance(layer, nn.MaxPool2d)] pool_indices = [0] + pool_indices + [len(features)] # 划分阶段 self.stage1 = nn.Sequential(*features[pool_indices[0]:pool_indices[1]]) self.stage2 = nn.Sequential(*features[pool_indices[1]:pool_indices[2]]) self.stage3 = nn.Sequential(*features[pool_indices[2]:pool_indices[3]]) self.stage4 = nn.Sequential(*features[pool_indices[3]:pool_indices[4]]) self.stage5 = nn.Sequential(*features[pool_indices[4]:pool_indices[5]])
然后一个非常重要的,我们采用我们的双线性插值来初始化我们的反卷积,使用双线性插值来初始化,可以在训练初期保证模型有一个比较好的输出然后在通过训练调整。
def _make_bilinear_weights(size,num_channels): factor = (size+1)//2 if size % 2 == 1: center = factor - 1 else: center = factor - 0.5 og = torch.FloatTensor(size, size) for i in range(size): for j in range(size): og[i, j] = (1-abs((i-center)/factor)) * (1-abs((j-center)/factor)) filter = torch.zeros(num_channels,num_channels,size,size) for i in range(num_channels): filter[i,i] = og return filter
最后我们先搭建我们的FCN32s模型,首先我们加载预训练的VGG16,当然别的VGG也行,你自己选择就行了。我们只需要其全连接层前的所有层,划分不同的stage,然后构建fcn检测头,先7x7的卷积,等效于对7×7感受野做全连接,输出4096通道,然后再1x1的卷积,提取语义特征,最后在1x1卷积输出每个空间位置上的类别分布(如21类)。FCN32s是直接从最后的层进行32倍上采样,当然了这样的结果就比较粗糙了。所以效果不会太好。
这里有个细节哈,x = x[:, :, :input_size[0], :input_size[1]],我们裁剪了保证初始大小,因为上采样过程中可能会造成图像的尺度超出一点点的,比如上采样后应该是224,然后最后是225,所以裁剪保证与初始大小相同。
class FCN32s(nn.Module): def __init__(self,num_classes = 21,backbone='vgg16'): super(FCN32s, self).__init__() self.num_classes = num_classes backbone = get_backbone(backbone=backbone, pretrained=True) features = list(backbone.features.children()) pool_indices = [i + 1 for i, layer in enumerate(features) if isinstance(layer, nn.MaxPool2d)] pool_indices = [0] + pool_indices + [len(features)] # 划分阶段 self.stage1 = nn.Sequential(*features[pool_indices[0]:pool_indices[1]]) self.stage2 = nn.Sequential(*features[pool_indices[1]:pool_indices[2]]) self.stage3 = nn.Sequential(*features[pool_indices[2]:pool_indices[3]]) self.stage4 = nn.Sequential(*features[pool_indices[3]:pool_indices[4]]) self.stage5 = nn.Sequential(*features[pool_indices[4]:pool_indices[5]]) self.fcn_head = nn.Sequential( nn.Conv2d(512, 4096, kernel_size=7,padding=3), nn.ReLU(inplace=True), nn.Dropout(), nn.Conv2d(4096,4096,kernel_size=1), nn.ReLU(inplace=True), nn.Dropout(), nn.Conv2d(4096, self.num_classes, kernel_size=1), ) self.upsample32 = nn.ConvTranspose2d(self.num_classes,self.num_classes,kernel_size = 64,stride = 32,padding = 16,bias = False) for m in self.modules(): if isinstance(m, nn.ConvTranspose2d): m.weight.data.zero_() m.weight.data = _make_bilinear_weights(m.kernel_size[0], m.out_channels) def forward(self, x): input_size = x.size()[2:] x = self.stage1(x) x = self.stage2(x) x = self.stage3(x) x = self.stage4(x) x = self.stage5(x) x = self.fcn_head(x) x = self.upsample32(x) x = x[:, :, :input_size[0], :input_size[1]] return x
然后就是FCN16s了,我们通过将stage4的输出作为我们的pool4,同时我们将pool4经过卷积输出通道到变为类别分数,到时候方便跟最终的输出做跳跃连接。经过fcn_head输出的x先上采样两倍到跟pool4相同的shape,然后两者做跳跃连接相加后再上采样16倍到输入图像的shape大小。
class FCN16s(nn.Module): def __init__(self,num_classes = 21,backbone='vgg16'): super(FCN16s, self).__init__() self.num_classes = num_classes backbone = get_backbone(backbone=backbone, pretrained=True) features = list(backbone.features.children()) pool_indices = [i + 1 for i, layer in enumerate(features) if isinstance(layer, nn.MaxPool2d)] pool_indices = [0] + pool_indices + [len(features)] # 划分阶段 self.stage1 = nn.Sequential(*features[pool_indices[0]:pool_indices[1]]) self.stage2 = nn.Sequential(*features[pool_indices[1]:pool_indices[2]]) self.stage3 = nn.Sequential(*features[pool_indices[2]:pool_indices[3]]) self.stage4 = nn.Sequential(*features[pool_indices[3]:pool_indices[4]]) self.stage5 = nn.Sequential(*features[pool_indices[4]:pool_indices[5]]) self.fcn_head = nn.Sequential( nn.Conv2d(512, 4096, kernel_size=7,padding=3), nn.ReLU(inplace=True), nn.Dropout(), nn.Conv2d(4096,4096,kernel_size=1), nn.ReLU(inplace=True), nn.Dropout(), nn.Conv2d(4096, self.num_classes, kernel_size=1), ) self.pool4_score = nn.Conv2d(512,self.num_classes, kernel_size=1) self.upsample2 = nn.ConvTranspose2d(self.num_classes,self.num_classes,kernel_size = 4,stride = 2,padding = 1, bias = False) self.upsample16 = nn.ConvTranspose2d(self.num_classes, self.num_classes, kernel_size=32, stride=16, padding=8, bias=False) for m in self.modules(): if isinstance(m, nn.ConvTranspose2d): m.weight.data.zero_() m.weight.data = _make_bilinear_weights(m.kernel_size[0], m.out_channels) def forward(self, x): input_size = x.size()[2:] x = self.stage1(x) x = self.stage2(x) x = self.stage3(x) x = self.stage4(x) pool4 = x x = self.stage5(pool4) x = self.fcn_head(x) x = self.upsample2(x) pool4_score = self.pool4_score(pool4) pool4_score = pool4_score[:, :, :x.size()[2], :x.size()[3]] x = x + pool4_score x = self.upsample16(x) x = x[:, :, :input_size[0], :input_size[1]] return x
然后就是FCN8s了,我们通过将stage3和stage4的输出作为我们的pool3和pool4,同时我们将分别将pool3和pool4经过卷积输出将通道到变为类别分数,到时候方便跟最终的输出做跳跃连接。经过fcn_head输出的x先上采样两倍到跟pool4相同的shape,然后两者做跳跃连接相加后再上采样2倍到pool3的shape大小。再与pool3做跳跃连接相加,上采样8倍数到输出图像的shape大小。
class FCN8s(nn.Module): def __init__(self,num_classes = 21,backbone='vgg16'): super(FCN8s, self).__init__() self.num_classes = num_classes backbone = get_backbone(backbone=backbone,pretrained=True) features = list(backbone.features.children()) pool_indices = [i +1 for i, layer in enumerate(features) if isinstance(layer, nn.MaxPool2d)] pool_indices = [0] + pool_indices + [len(features)] # 划分阶段 self.stage1 = nn.Sequential(*features[pool_indices[0]:pool_indices[1]]) self.stage2 = nn.Sequential(*features[pool_indices[1]:pool_indices[2]]) self.stage3 = nn.Sequential(*features[pool_indices[2]:pool_indices[3]]) self.stage4 = nn.Sequential(*features[pool_indices[3]:pool_indices[4]]) self.stage5 = nn.Sequential(*features[pool_indices[4]:pool_indices[5]]) self.fcn_head = nn.Sequential( nn.Conv2d(512, 4096, kernel_size=7,padding=3), nn.ReLU(inplace=True), nn.Dropout2d(), nn.Conv2d(4096,4096,kernel_size=1), nn.ReLU(inplace=True), nn.Dropout2d(), nn.Conv2d(4096, self.num_classes, kernel_size=1), ) self.pool3_score = nn.Conv2d(256,self.num_classes, kernel_size=1) self.pool4_score = nn.Conv2d(512, self.num_classes, kernel_size=1) self.upsample2_1 = nn.ConvTranspose2d(self.num_classes,self.num_classes,kernel_size = 4,stride = 2,padding = 1, bias = False) self.upsample2_2 = nn.ConvTranspose2d(self.num_classes, self.num_classes, kernel_size=4, stride=2, padding=1, bias=False) self.upsample8 = nn.ConvTranspose2d(self.num_classes, self.num_classes, kernel_size=16, stride=8, padding=4, bias=False) for m in self.modules(): if isinstance(m, nn.ConvTranspose2d): m.weight.data.zero_() m.weight.data=_make_bilinear_weights(m.kernel_size[0],m.out_channels) def forward(self, x): input_size = x.size()[2:] x = self.stage1(x) x = self.stage2(x) pool3 = self.stage3(x) pool4 = self.stage4(pool3) x = self.stage5(pool4) x = self.fcn_head(x) x = self.upsample2_1(x) pool4_score = self.pool4_score(pool4) pool4_score = pool4_score[:, :, :x.size()[2], :x.size()[3]] x = x + pool4_score x = self.upsample2_2(x) pool3_score = self.pool3_score(pool3) pool3_score = pool3_score[:, :, :x.size()[2], :x.size()[3]] x = x + pool3_score x = self.upsample8(x) x = x[:, :, :input_size[0], :input_size[1]] return x
数据处理(dataloader)
数据集名称:VOC2012
数据集下载地址:The PASCAL Visual Object Classes Challenge 2012 (VOC2012)
在这里下载哈,2GB的那个。
这里我已经专门发了一篇博客对语义分割任务常用的数据集做了深入的介绍,已经具体讲解了其实现的处理代码。如果你对语义分割常用数据集有不了解的话,可以先去我的语义分割专栏中进行了解哦!! 我这里就直接附上代码了。
import torchimport numpy as npfrom PIL import Imagefrom torch.utils.data import Dataset, DataLoaderimport osimport randomimport torchvision.transforms as TVOC_CLASSES = [ 'background','aeroplane','bicycle','bird','boat','bottle','bus', 'car','cat','chair','cow','diningtable','dog','horse', 'motorbike','person','potted plant','sheep','sofa','train','tv/monitor']class VOCSegmentation(Dataset): def __init__(self, root, split='train', img_size=320, augment=True): super(VOCSegmentation, self).__init__() self.root = root self.split = split self.img_size = img_size self.augment = augment img_dir = os.path.join(root, 'JPEGImages') mask_dir = os.path.join(root, 'SegmentationClass') split_file = os.path.join(root, 'ImageSets', 'Segmentation', f'{split}.txt') if not os.path.exists(split_file): raise FileNotFoundError(split_file) with open(split_file, 'r') as f: file_names = [x.strip() for x in f.readlines()] self.images = [os.path.join(img_dir, x + '.jpg') for x in file_names] self.masks = [os.path.join(mask_dir, x + '.png') for x in file_names] assert len(self.images) == len(self.masks) print(f"✅ {split} set loaded: {len(self.images)} samples") self.normalize = T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) def __getitem__(self, index): img = Image.open(self.images[index]).convert('RGB') mask = Image.open(self.masks[index]) # mask为P模式(0~20的类别) # Resize img = img.resize((self.img_size, self.img_size), Image.BILINEAR) mask = mask.resize((self.img_size, self.img_size), Image.NEAREST) # 转Tensor img = T.functional.to_tensor(img) mask = torch.from_numpy(np.array(mask)).long() # 0~20 # 数据增强 if self.augment: if random.random() > 0.5: img = T.functional.hflip(img) mask = T.functional.hflip(mask) if random.random() > 0.5: img = T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2)(img) img = self.normalize(img) return img, mask def __len__(self): return len(self.images)def get_dataloader(data_path, batch_size=4, img_size=320, num_workers=4): train_dataset = VOCSegmentation(root=data_path, split='train', img_size=img_size, augment=True) val_dataset = VOCSegmentation(root=data_path, split='val', img_size=img_size, augment=False) train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True, num_workers=num_workers) val_loader = DataLoader(val_dataset, shuffle=False, batch_size=batch_size, pin_memory=True, num_workers=num_workers) return train_loader, val_loader
评价指标(metric)
我们这里语义分割采用的评价指标为:PA(像素准确率),CPA(类别像素准确率),MPA(类别平均像素准确率),IoU(交并比),mIoU(平均交并比),FWIoU(频率加权交并比),mF1(平均F1分数)。
这里我已经专门发了一篇博客对这些平均指标做了深入的介绍,已经具体讲解了其实现的代码。如果你对这些评价指标有不了解的话,可以先去我的语义分割专栏中进行了解哦!! 我这里就直接附上代码了。
import numpy as np__all__ = ['SegmentationMetric']class SegmentationMetric(object): def __init__(self, numClass): self.numClass = numClass self.confusionMatrix = np.zeros((self.numClass,) * 2) def genConfusionMatrix(self, imgPredict, imgLabel): mask = (imgLabel >= 0) & (imgLabel < self.numClass) label = self.numClass * imgLabel[mask] + imgPredict[mask] count = np.bincount(label, minlength=self.numClass ** 2) confusionMatrix = count.reshape(self.numClass, self.numClass) return confusionMatrix def addBatch(self, imgPredict, imgLabel): assert imgPredict.shape == imgLabel.shape self.confusionMatrix += self.genConfusionMatrix(imgPredict, imgLabel) return self.confusionMatrix def pixelAccuracy(self): acc = np.diag(self.confusionMatrix).sum() / self.confusionMatrix.sum() return acc def classPixelAccuracy(self): denominator = self.confusionMatrix.sum(axis=1) denominator = np.where(denominator == 0, 1e-12, denominator) classAcc = np.diag(self.confusionMatrix) / denominator return classAcc def meanPixelAccuracy(self): classAcc = self.classPixelAccuracy() meanAcc = np.nanmean(classAcc) return meanAcc def IntersectionOverUnion(self): intersection = np.diag(self.confusionMatrix) union = np.sum(self.confusionMatrix, axis=1) + np.sum(self.confusionMatrix, axis=0) - np.diag( self.confusionMatrix) union = np.where(union == 0, 1e-12, union) IoU = intersection / union return IoU def meanIntersectionOverUnion(self): mIoU = np.nanmean(self.IntersectionOverUnion()) return mIoU def Frequency_Weighted_Intersection_over_Union(self): denominator1 = np.sum(self.confusionMatrix) denominator1 = np.where(denominator1 == 0, 1e-12, denominator1) freq = np.sum(self.confusionMatrix, axis=1) / denominator1 denominator2 = np.sum(self.confusionMatrix, axis=1) + np.sum(self.confusionMatrix, axis=0) - np.diag( self.confusionMatrix) denominator2 = np.where(denominator2 == 0, 1e-12, denominator2) iu = np.diag(self.confusionMatrix) / denominator2 FWIoU = (freq[freq > 0] * iu[freq > 0]).sum() return FWIoU def classF1Score(self): tp = np.diag(self.confusionMatrix) fp = self.confusionMatrix.sum(axis=0) - tp fn = self.confusionMatrix.sum(axis=1) - tp precision = tp / (tp + fp + 1e-12) recall = tp / (tp + fn + 1e-12) f1 = 2 * precision * recall / (precision + recall + 1e-12) return f1 def meanF1Score(self): f1 = self.classF1Score() mean_f1 = np.nanmean(f1) return mean_f1 def reset(self): self.confusionMatrix = np.zeros((self.numClass, self.numClass)) def get_scores(self): scores = { 'Pixel Accuracy': self.pixelAccuracy(), 'Class Pixel Accuracy': self.classPixelAccuracy(), 'Intersection over Union': self.IntersectionOverUnion(), 'Class F1 Score': self.classF1Score(), 'Frequency Weighted Intersection over Union': self.Frequency_Weighted_Intersection_over_Union(), 'Mean Pixel Accuracy': self.meanPixelAccuracy(), 'Mean Intersection over Union(mIoU)': self.meanIntersectionOverUnion(), 'Mean F1 Score': self.meanF1Score() } return scores
训练流程(train)
到这里,所有的前期准备都已经就绪,我们就要开始训练我们的模型了。
def parse_arguments(): parser = argparse.ArgumentParser() parser.add_argument('--data_root', type=str, default='./datasets/VOC2012', help='Dataset root path') parser.add_argument('--classes_name', type=str, default='VOC', help='Dataset class names') parser.add_argument('--backbone', type=str, default='vgg16', help='Backbone model') parser.add_argument('--head', type=str, default='fcn8s', help='Segmentation head') parser.add_argument('--num_classes', type=int, default=21, help='Number of classes') parser.add_argument('--epochs', type=int, default=50, help='Epochs') parser.add_argument('--lr', type=float, default=0.005, help='Learning rate') parser.add_argument('--momentum', type=float, default=0.9, help='Momentum') parser.add_argument('--weight-decay', type=float, default=1e-4, help='Weight decay') parser.add_argument('--batch_size', type=int, default=8, help='Batch size') parser.add_argument('--checkpoint', type=str, default='./checkpoint', help='Checkpoint directory') parser.add_argument('--resume', type=str, default=None, help='Resume checkpoint path') return parser.parse_args()
首先来看看我们的一些参数的设定,一般我们都是这样放在最前面,能够让人更加快速的了解其代码的一些核心参数设置。首先就是我们的数据集位置(),然后就是我们的数据集名称(classes_name),这个暂时没什么用,因为我们目前只用了VOC数据集,然后就是特征提取网络的选择(backbone),这里我们可以选择不同深度的VGG网络,检测模型的选择(head),我们可以选择不同的fcn模型,数据集的类别数(num_classes),训练epoch数,这个你设置大一点也行,因为我们会在训练过程中保存最好结果的模型的。学习率(lr),动量(momentum),权重衰减(weight-decay),这些都属于模型超参数,大家可以尝试不同的数值,多试试,就会有个大致的了解的,批量大小(batch_size)根据自己电脑性能来设置,一般都是为2的倍数,保存权重的文件夹(checkpoint),是否继续训练(resume)。
def train(args): if not os.path.exists(args.checkpoint): os.makedirs(args.checkpoint) device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') n_gpu = torch.cuda.device_count() print(f"Device: {device}, GPUs available: {n_gpu}") # Dataloader train_loader, val_loader = get_dataloader(args.data_root, batch_size=args.batch_size) train_dataset_size = len(train_loader.dataset) val_dataset_size = len(val_loader.dataset) print(f"Train samples: {train_dataset_size}, Val samples: {val_dataset_size}") # Model model = get_model(args.head, backbone=args.backbone, num_classes=args.num_classes) model.to(device) # Loss + Optimizer + Scheduler criterion = nn.CrossEntropyLoss(ignore_index=255) #optimizer = torch.optim.Adam(model.parameters(), lr=args.lr) optimizer = torch.optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay) scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1) scaler = torch.cuda.amp.GradScaler() # Resume start_epoch = 0 best_miou = 0.0 if args.resume and os.path.isfile(args.resume): print(f"Loading checkpoint '{args.resume}'") checkpoint = torch.load(args.resume) start_epoch = checkpoint['epoch'] best_miou = checkpoint['best_miou'] model.load_state_dict(checkpoint['model_state_dict']) optimizer.load_state_dict(checkpoint['optimizer_state_dict']) scheduler.load_state_dict(checkpoint['scheduler_state_dict']) print(f"Loaded checkpoint (epoch {start_epoch})") # Training history history = { 'train_loss': [], 'val_loss': [], 'pixel_accuracy': [], 'miou': [] } print(f"🚀 Start training ({args.head})") for epoch in range(start_epoch, args.epochs): model.train() train_loss = 0.0 t0 = time.time() for images, masks in tqdm(train_loader, desc=f'Epoch {epoch+1}/{args.epochs} [Train]'): images = images.to(device) masks = masks.to(device) optimizer.zero_grad() with torch.cuda.amp.autocast(): outputs = model(images) loss = criterion(outputs, masks) scaler.scale(loss).backward() scaler.step(optimizer) scaler.update() train_loss += loss.item() * images.size(0) train_loss /= train_dataset_size history['train_loss'].append(train_loss) # Validation model.eval() val_loss = 0.0 evaluator = SegmentationMetric(args.num_classes) with torch.no_grad(): for images, masks in tqdm(val_loader, desc=f'Epoch {epoch+1}/{args.epochs} [Val]'): images = images.to(device) masks = masks.to(device) outputs = model(images) loss = criterion(outputs, masks) val_loss += loss.item() * images.size(0) predictions = torch.argmax(outputs, dim=1) if isinstance(predictions, torch.Tensor): predictions = predictions.cpu().numpy() if isinstance(masks, torch.Tensor): masks = masks.cpu().numpy() evaluator.addBatch(predictions, masks) val_loss /= val_dataset_size history['val_loss'].append(val_loss) scores = evaluator.get_scores() print(f"\n📈 Validation Epoch {epoch+1}:") for k, v in scores.items(): if isinstance(v, np.ndarray): print(f"{k}: {np.round(v, 3)}") else: print(f"{k}: {v:.4f}") history['pixel_accuracy'].append(scores['Pixel Accuracy']) history['miou'].append(scores['Mean Intersection over Union(mIoU)']) # Save best if scores['Mean Intersection over Union(mIoU)'] > best_miou: best_miou = scores['Mean Intersection over Union(mIoU)'] torch.save({ 'epoch': epoch + 1, 'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict(), 'scheduler_state_dict': scheduler.state_dict(), 'best_miou': best_miou, }, os.path.join(args.checkpoint, f'{args.head}_best.pth')) print(f"Saved best model! mIoU: {best_miou:.4f}") scheduler.step() print(f"🕒 Epoch time: {time.time() - t0:.2f}s\n") print("🎉 Training complete!")
然后就是我们的训练流程了。训练流程也是有套路的哦,我们该怎么去搭建一个更好的训练流程,可以从多方面入手的。
首先我们确定我们的代码运行设备,基本上都是要GPU的。然后就是加载我们处理好的数据,这里就是dataloader的那部分了,加载好数据之后,我们加载我们构建好的模型,这就是我们在model那部分做的工作。然后就是loss函数,Optimizer 和 Scheduler,这是我们比较重要的几个部分。loss函数的选择有很多,不同的loss函数在一定程度上会决定我们的模型收敛好坏,像语义分割任务就基本上都是用交叉熵损失函数了。Optimizer 也有很多,SGD,Adam之类的,都可以去尝试下。Scheduler就是我们的学习策略,学习率的更新,希望一开始学习率大,训练到后期学习率小,这样加速收敛,避免震荡。然后还有个scaler,这是AMP(自动混合精度训练),能够节省我们的内存,让我们的小电脑也能跑起来模型。
还有个断点重训功能,为了避免因为一些意外的情况导致训练中断,可能这是我们训练好久的结果,所以我们可以通过这个功能继续从断点进行训练。然后就是训练了,我们加载数据,通过模型的预测与mask得到损失,然后梯度误差反传,更新模型参数。当一个epoch中的数据都训练结束之后,我们就需要评估下我们的模型怎么样了,这里就是根据我们的评价指标进行评价,其中我们标记best_mIoU,当更好的时候就重新保存模型文件。
最后当训练结束后我们就会获得最好的模型参数的文件了。
完整代码:
import argparseimport osimport timefrom tqdm import tqdmimport numpy as npimport torchimport torch.nn as nnfrom datasets.VOC_dataloader import get_dataloaderfrom model import get_modelfrom metric import SegmentationMetricdef parse_arguments(): parser = argparse.ArgumentParser() parser.add_argument('--data_root', type=str, default='./datasets/VOC2012', help='Dataset root path') parser.add_argument('--classes_name', type=str, default='VOC', help='Dataset class names') parser.add_argument('--backbone', type=str, default='vgg16', help='Backbone model') parser.add_argument('--head', type=str, default='fcn8s', help='Segmentation head') parser.add_argument('--num_classes', type=int, default=21, help='Number of classes') parser.add_argument('--epochs', type=int, default=50, help='Epochs') parser.add_argument('--lr', type=float, default=0.005, help='Learning rate') parser.add_argument('--momentum', type=float, default=0.9, help='Momentum') parser.add_argument('--weight-decay', type=float, default=1e-4, help='Weight decay') parser.add_argument('--batch_size', type=int, default=8, help='Batch size') parser.add_argument('--checkpoint', type=str, default='./checkpoint', help='Checkpoint directory') parser.add_argument('--resume', type=str, default=None, help='Resume checkpoint path') return parser.parse_args()def train(args): if not os.path.exists(args.checkpoint): os.makedirs(args.checkpoint) device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') n_gpu = torch.cuda.device_count() print(f"Device: {device}, GPUs available: {n_gpu}") # Dataloader train_loader, val_loader = get_dataloader(args.data_root, batch_size=args.batch_size) train_dataset_size = len(train_loader.dataset) val_dataset_size = len(val_loader.dataset) print(f"Train samples: {train_dataset_size}, Val samples: {val_dataset_size}") # Model model = get_model(args.head, backbone=args.backbone, num_classes=args.num_classes) model.to(device) # Loss + Optimizer + Scheduler criterion = nn.CrossEntropyLoss(ignore_index=255) #optimizer = torch.optim.Adam(model.parameters(), lr=args.lr) optimizer = torch.optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay) scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1) scaler = torch.cuda.amp.GradScaler() # Resume start_epoch = 0 best_miou = 0.0 if args.resume and os.path.isfile(args.resume): print(f"Loading checkpoint '{args.resume}'") checkpoint = torch.load(args.resume) start_epoch = checkpoint['epoch'] best_miou = checkpoint['best_miou'] model.load_state_dict(checkpoint['model_state_dict']) optimizer.load_state_dict(checkpoint['optimizer_state_dict']) scheduler.load_state_dict(checkpoint['scheduler_state_dict']) print(f"Loaded checkpoint (epoch {start_epoch})") # Training history history = { 'train_loss': [], 'val_loss': [], 'pixel_accuracy': [], 'miou': [] } print(f"🚀 Start training ({args.head})") for epoch in range(start_epoch, args.epochs): model.train() train_loss = 0.0 t0 = time.time() for images, masks in tqdm(train_loader, desc=f'Epoch {epoch+1}/{args.epochs} [Train]'): images = images.to(device) masks = masks.to(device) optimizer.zero_grad() with torch.cuda.amp.autocast(): outputs = model(images) loss = criterion(outputs, masks) scaler.scale(loss).backward() scaler.step(optimizer) scaler.update() train_loss += loss.item() * images.size(0) train_loss /= train_dataset_size history['train_loss'].append(train_loss) # Validation model.eval() val_loss = 0.0 evaluator = SegmentationMetric(args.num_classes) with torch.no_grad(): for images, masks in tqdm(val_loader, desc=f'Epoch {epoch+1}/{args.epochs} [Val]'): images = images.to(device) masks = masks.to(device) outputs = model(images) loss = criterion(outputs, masks) val_loss += loss.item() * images.size(0) predictions = torch.argmax(outputs, dim=1) if isinstance(predictions, torch.Tensor): predictions = predictions.cpu().numpy() if isinstance(masks, torch.Tensor): masks = masks.cpu().numpy() evaluator.addBatch(predictions, masks) val_loss /= val_dataset_size history['val_loss'].append(val_loss) scores = evaluator.get_scores() print(f"\n📈 Validation Epoch {epoch+1}:") for k, v in scores.items(): if isinstance(v, np.ndarray): print(f"{k}: {np.round(v, 3)}") else: print(f"{k}: {v:.4f}") history['pixel_accuracy'].append(scores['Pixel Accuracy']) history['miou'].append(scores['Mean Intersection over Union(mIoU)']) # Save best if scores['Mean Intersection over Union(mIoU)'] > best_miou: best_miou = scores['Mean Intersection over Union(mIoU)'] torch.save({ 'epoch': epoch + 1, 'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict(), 'scheduler_state_dict': scheduler.state_dict(), 'best_miou': best_miou, }, os.path.join(args.checkpoint, f'{args.head}_best.pth')) print(f"Saved best model! mIoU: {best_miou:.4f}") scheduler.step() print(f"🕒 Epoch time: {time.time() - t0:.2f}s\n") print("🎉 Training complete!")if __name__ == '__main__': args = parse_arguments() train(args)
模型测试(test)
这里就到了我们的最后一步了,测试我们的模型。
def parse_arguments(): parser = argparse.ArgumentParser() parser.add_argument('--image_dir', type=str, default='./datasets/test', help='Input image or folder') parser.add_argument('--checkpoint', type=str, default='./checkpoint/fcn8s_best.pth', help='Checkpoint path') parser.add_argument('--backbone', type=str, default='vgg16', help='Backbone model') parser.add_argument('--head', type=str, default='fcn8s', help='Segmentation head') parser.add_argument('--num_classes', type=int, default=21, help='Number of classes') parser.add_argument('--save_dir', type=str, default='./predictions', help='Directory to save results') parser.add_argument('--overlay', type=bool, default=True, help='Save overlay image') return parser.parse_args()
同样的来看,我们所需要的一些参数设定哈!我们所需要进行测试的图片文件夹(image_dir),我们训练时候所保存的权重文件夹(checkpoint),我们选择的特征提取网络(backbone),我们使用的检测模型(head),还有数据集的类别数(num_classes),保持的结果的文件夹(save_dir),还要个非常重要的参数,是否将预测图覆盖在原图上(overlay),通过这个我们可以更好的看语义分割的效果怎么样。
def load_image(image_path): image = Image.open(image_path).convert('RGB') transform = T.Compose([ T.ToTensor(), T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) return transform(image).unsqueeze(0), image # tensor, PIL image#把类别mask ➔ 彩色图 (用VOC_COLORMAP)def mask_to_color(mask): color_mask = np.zeros((mask.shape[0], mask.shape[1], 3), dtype=np.uint8) for label in range(len(VOC_COLORMAP)): color_mask[mask == label] = VOC_COLORMAP[label] return color_maskdef save_mask(mask, save_path): color_mask = mask_to_color(mask) Image.fromarray(color_mask).save(save_path)def overlay_mask_on_image(raw_image, mask, alpha=0.6): mask_color = mask_to_color(mask) mask_pil = Image.fromarray(mask_color) mask_pil = mask_pil.resize(raw_image.size, resample=Image.NEAREST) blended = Image.blend(raw_image, mask_pil, alpha=alpha) return blended
然后来看测试过程中会用到的一些函数,当然测试首先肯定要加载我们的图片呐。注意看这里有个细节,加载图片的时候我们进行了标准化的,为什么这么做?因为我们在训练模型的时候,图片就进行了标准化的操作,所有测试图片,我们肯定要保持图片和训练时候的条件一样。然后为了更好的可视化,我们需要将预测的mask图转换为彩色图。根据VOC_COLORMAP的颜色进行转换即可。还有个overlay_mask_on_image函数,通过将预测的可视化图与原图进行叠加混合能够让我们更加直观。
def predict(args): device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(f"Device: {device}") # 模型 model = get_model(args.head, backbone=args.backbone, num_classes=args.num_classes) checkpoint = torch.load(args.checkpoint, map_location=device) model.load_state_dict(checkpoint['model_state_dict']) model.to(device) model.eval() os.makedirs(args.save_dir, exist_ok=True) # 预测单张 or 批量 if os.path.isdir(args.image): image_list = [os.path.join(args.image, f) for f in os.listdir(args.image) if f.lower().endswith(('jpg', 'png', 'jpeg'))] else: image_list = [args.image] print(f"🔎 Found {len(image_list)} images to predict.") for img_path in tqdm(image_list): img_tensor, raw_img = load_image(img_path) img_tensor = img_tensor.to(device) with torch.no_grad(): output = model(img_tensor) pred = torch.argmax(output.squeeze(), dim=0).cpu().numpy() # 保存 mask base_name = os.path.basename(img_path).split('.')[0] mask_save_path = os.path.join(args.save_dir, f"{base_name}_mask.png") save_mask(pred, mask_save_path) # 保存 overlay if args.overlay: overlay_img = overlay_mask_on_image(raw_img, pred) overlay_save_path = os.path.join(args.save_dir, f"{base_name}_overlay.png") overlay_img.save(overlay_save_path) print(f"Saved: {mask_save_path}") if args.overlay: print(f"Saved overlay: {overlay_save_path}") print("🎉 Prediction done!")
然后就到了预测环节,其实流程跟train的流程差不多,但是不在需要像train的时候什么梯度反传更新参数了,直接预测得出结果然后保存即可。
首先确定设备哈,一般都是GPU的,然后就是就是加载数据和模型了,最后预测保存结果即可,这些代码应该还是比较容易理解的,直接看代码即可。
完整代码:
import argparseimport osimport torchimport numpy as npfrom PIL import Imagefrom tqdm import tqdmfrom model import get_modelimport torchvision.transforms as Tfrom datasets import *def parse_arguments(): parser = argparse.ArgumentParser() parser.add_argument('--image_dir', type=str, default='./datasets/test', help='Input image or folder') parser.add_argument('--checkpoint', type=str, default='./checkpoint/fcn8s_best.pth', help='Checkpoint path') parser.add_argument('--backbone', type=str, default='vgg16', help='Backbone model') parser.add_argument('--head', type=str, default='fcn8s', help='Segmentation head') parser.add_argument('--num_classes', type=int, default=21, help='Number of classes') parser.add_argument('--save_dir', type=str, default='./predictions', help='Directory to save results') parser.add_argument('--overlay', type=bool, default=True, help='Save overlay image') return parser.parse_args()def load_image(image_path): image = Image.open(image_path).convert('RGB') transform = T.Compose([ T.ToTensor(), T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) return transform(image).unsqueeze(0), image # tensor, PIL image#把类别mask ➔ 彩色图 (用VOC_COLORMAP)def mask_to_color(mask): color_mask = np.zeros((mask.shape[0], mask.shape[1], 3), dtype=np.uint8) for label in range(len(VOC_COLORMAP)): color_mask[mask == label] = VOC_COLORMAP[label] return color_maskdef save_mask(mask, save_path): color_mask = mask_to_color(mask) Image.fromarray(color_mask).save(save_path)def overlay_mask_on_image(raw_image, mask, alpha=0.6): mask_color = mask_to_color(mask) mask_pil = Image.fromarray(mask_color) mask_pil = mask_pil.resize(raw_image.size, resample=Image.NEAREST) blended = Image.blend(raw_image, mask_pil, alpha=alpha) return blendeddef predict(args): device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(f"Device: {device}") # 模型 model = get_model(args.head, backbone=args.backbone, num_classes=args.num_classes) checkpoint = torch.load(args.checkpoint, map_location=device) model.load_state_dict(checkpoint['model_state_dict']) model.to(device) model.eval() os.makedirs(args.save_dir, exist_ok=True) # 预测单张 or 批量 if os.path.isdir(args.image): image_list = [os.path.join(args.image, f) for f in os.listdir(args.image) if f.lower().endswith(('jpg', 'png', 'jpeg'))] else: image_list = [args.image] print(f"🔎 Found {len(image_list)} images to predict.") for img_path in tqdm(image_list): img_tensor, raw_img = load_image(img_path) img_tensor = img_tensor.to(device) with torch.no_grad(): output = model(img_tensor) pred = torch.argmax(output.squeeze(), dim=0).cpu().numpy() # 保存 mask base_name = os.path.basename(img_path).split('.')[0] mask_save_path = os.path.join(args.save_dir, f"{base_name}_mask.png") save_mask(pred, mask_save_path) # 保存 overlay if args.overlay: overlay_img = overlay_mask_on_image(raw_img, pred) overlay_save_path = os.path.join(args.save_dir, f"{base_name}_overlay.png") overlay_img.save(overlay_save_path) print(f"Saved: {mask_save_path}") if args.overlay: print(f"Saved overlay: {overlay_save_path}") print("🎉 Prediction done!")if __name__ == '__main__': args = parse_arguments() predict(args)
效果图
我就训练了50个epoch,效果还行,效果图如下所示
结语
希望上列所述内容对你有所帮助,如果有错误的地方欢迎大家批评指正!
并且如果可以的话希望大家能够三连鼓励一下,谢谢大家!
如果你觉得讲的还不错想转载,可以直接转载,不过麻烦指出本文来源出处即可,谢谢!