Preface
In a previous post I walked through the theory behind ResNet and the research context it emerged from. That background really matters: once you know the state of the field at the time, ResNet's appearance no longer feels abrupt, and it prompts plenty of useful reflection. If you haven't read it yet, start with the theory post: 一文带你看透什么是ResNet - carpell - 博客园
This post walks through the ResNet code by hand. Even if you are a beginner, you can follow the logic of how the code runs, and the logic is what matters most: only once you understand it can you reproduce the code yourself. We'll use flower image classification as the running example and look at the details of a ResNet implementation, which closely follows the official PyTorch code. If you can, do try to reproduce it yourself; writing it out deepens your understanding.
The complete code lives at fouen6/image_classification_ResNet: image classification with ResNet (PyTorch). Download it and it runs as-is (you only need a working PyTorch environment).
model
BasicBlock and Bottleneck
Let's start with ResNet's classic residual structure. It comes in two forms, BasicBlock and Bottleneck: BasicBlock is used in the shallower networks, Bottleneck in the deeper ones. I won't repeat the theory here; let's look at the implementation details.
First, the BasicBlock. Each BasicBlock contains two convolutional layers, both 3x3, each followed by a BN layer (which helps against vanishing and exploding gradients) and a ReLU activation (which adds the non-linearity needed for feature extraction). So each BasicBlock is simply (conv + BN + ReLU) x 2, with the input added back to the output (out += identity) to form the residual connection and preserve the identity mapping. One detail worth noting: the second ReLU is applied after out += identity. There is also a parameter called expansion, which controls how the channel count changes inside the block. As the figure shows, a BasicBlock's input and output both have 64 channels, so expansion is 1; in the Bottleneck below, expansion is 4, i.e. the block's output has 4 times its base channel count.
```python
import torch.nn as nn


def conv3x3(in_planes, planes, stride=1):
    # 3x3 convolution with padding; bias is omitted because BN follows
    return nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)


def conv1x1(in_planes, planes, stride=1):
    # 1x1 convolution, used for channel projection
    return nn.Conv2d(in_planes, planes, kernel_size=1, stride=stride, bias=False)


class BasicBlock(nn.Module):
    expansion = 1  # output channels = planes * expansion

    def __init__(self, in_planes, planes, stride=1, downsample=None, norm_layer=nn.BatchNorm2d):
        super(BasicBlock, self).__init__()
        self.conv1 = conv3x3(in_planes, planes, stride)
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = norm_layer(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample is not None:
            identity = self.downsample(x)  # match identity's shape to out
        out += identity        # residual connection
        out = self.relu(out)   # second ReLU comes AFTER the addition
        return out
```
Now the Bottleneck. Its composition and logic mirror the BasicBlock, so I'll focus on the differences. The input first passes through a 1x1 convolution that reduces the channel count, then a 3x3 convolution that extracts features, and finally another 1x1 convolution that expands the channels back. The point of reducing and then restoring the dimensionality is to cut the parameter count. With that logic clear, the Bottleneck is assembled exactly as described.
```python
class Bottleneck(nn.Module):
    expansion = 4  # output channels = planes * expansion

    def __init__(self, in_planes, planes, stride=1, downsample=None, norm_layer=nn.BatchNorm2d):
        super(Bottleneck, self).__init__()
        self.conv1 = conv1x1(in_planes, planes)       # 1x1: reduce channels
        self.bn1 = norm_layer(planes)
        # the stride is applied only here; applying it to conv1 as well
        # would shrink the feature map twice
        self.conv2 = conv3x3(planes, planes, stride)  # 3x3: extract features
        self.bn2 = norm_layer(planes)
        self.conv3 = conv1x1(planes, planes * self.expansion)  # 1x1: expand channels
        self.bn3 = norm_layer(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv3(out)
        out = self.bn3(out)
        if self.downsample is not None:
            identity = self.downsample(x)
        out += identity
        out = self.relu(out)
        return out
```
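The parameter saving from "reduce then expand" is easy to verify with plain arithmetic. This is just a sanity check, not part of the repo; the numbers assume a 256-channel stage like the later layers of ResNet-50:

```python
def conv_params(c_in, c_out, k):
    # weight count of a k x k convolution (bias omitted, since BN follows)
    return c_in * c_out * k * k

# two plain 3x3 convolutions at 256 channels, BasicBlock-style
plain = 2 * conv_params(256, 256, 3)

# Bottleneck at the same width: 256 -> 64 -> 64 -> 256
bottleneck = (conv_params(256, 64, 1)    # 1x1 reduce
              + conv_params(64, 64, 3)   # 3x3 extract
              + conv_params(64, 256, 1)) # 1x1 expand

print(plain, bottleneck)  # 1179648 vs 69632: roughly 17x fewer weights
```

So the bottleneck design lets very deep networks stay affordable in parameters and FLOPs.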
One parameter we haven't discussed yet is downsample; both block types take it, so let's cover it here. Its job is downsampling: whenever identity and out have different shapes, it brings them into agreement. When is it needed? In the architecture figure, the dashed shortcuts mark where downsampling happens. Remember that ResNet is built from modules: there are four layer stages, each containing several BasicBlocks or Bottlenecks, and the feature-map size differs between stages, shrinking as you go deeper. For example, after the stem the first stage works at 56x56, and the later stages at 28, 14 and 7. The spatial size shrinks while the channel count grows from stage to stage. So how can the identity be added across a shape change? That is what the downsample parameter is for, and it is only used in the first BasicBlock or Bottleneck of each stage.
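We can replay _make_layer's condition (stride != 1 or in_planes != planes * expansion) with plain numbers to see exactly which first blocks get a downsample. This is a sketch of the condition, not repo code:

```python
def first_block_downsample_flags(expansion):
    # replay `stride != 1 or in_planes != planes * block.expansion`
    # for the first block of each of the four stages
    in_planes = 64
    flags = []
    for i, planes in enumerate([64, 128, 256, 512]):
        stride = 1 if i == 0 else 2       # only layer1 keeps stride 1
        flags.append(stride != 1 or in_planes != planes * expansion)
        in_planes = planes * expansion    # channels after the stage
    return flags

print(first_block_downsample_flags(1))  # BasicBlock:  [False, True, True, True]
print(first_block_downsample_flags(4))  # Bottleneck:  [True, True, True, True]
```

Note the subtlety: with Bottleneck even layer1's first block needs a downsample, because 64 input channels must be projected to 256, even though the stride there is 1.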
ResNet
This part is comparatively simple. A ResNet starts with an initial feature-encoding stem: a 7x7 convolution, a BN layer, an activation, and a max-pooling layer. Then come the four layer stages, and finally global average pooling and a fully connected layer. That is the overall architecture; now for a few details. The network's parameters need initialization: convolution weights use Kaiming initialization, and BN layers are initialized with weight 1 and bias 0. Another detail is the zero_init_residual option: it initializes the weight of the last BN layer in every BasicBlock or Bottleneck to 0, which has been found to improve performance (each residual branch then starts at zero, so every block begins as an identity mapping). Finally, the _make_layer function: we said only the first block of each stage gets a downsample. How is that controlled? The downsample module is created when stride is not 1 (or the channel counts mismatch), which only happens for the first block.
```python
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=1000, zero_init_residual=False, norm_layer=nn.BatchNorm2d):
        super(ResNet, self).__init__()
        self.in_planes = 64
        # stem: 7x7 conv with stride 2 (224 -> 112), BN, ReLU, 3x3 max-pool (112 -> 56)
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = norm_layer(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0], norm_layer=norm_layer)
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, norm_layer=norm_layer)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, norm_layer=norm_layer)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, norm_layer=norm_layer)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        # parameter initialization: Kaiming for convs, weight=1 / bias=0 for BN
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # zero-init the last BN in each block so every residual branch starts at zero
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)

    def _make_layer(self, block, planes, num_blocks, stride=1, norm_layer=nn.BatchNorm2d):
        downsample = None
        if stride != 1 or self.in_planes != planes * block.expansion:
            # project the identity so its shape matches the block's output
            downsample = nn.Sequential(
                conv1x1(self.in_planes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )
        layers = []
        layers.append(block(self.in_planes, planes, stride, downsample, norm_layer))
        self.in_planes = planes * block.expansion
        for _ in range(1, num_blocks):
            layers.append(block(self.in_planes, planes, norm_layer=norm_layer))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x
```
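With the usual 224x224 input, the spatial sizes through the model above can be traced with the standard output-size formula; this is a quick pen-and-paper check, independent of torch:

```python
def conv_out(size, kernel, stride, padding):
    # standard output-size formula for a convolution or pooling layer
    return (size + 2 * padding - kernel) // stride + 1

size = 224
size = conv_out(size, 7, 2, 3)   # stem 7x7 conv, stride 2 -> 112
size = conv_out(size, 3, 2, 1)   # 3x3 max-pool, stride 2  -> 56
sizes = [size]                   # layer1 keeps 56
for _ in range(3):               # layer2..layer4 each halve via a stride-2 block
    size = conv_out(size, 3, 2, 1)
    sizes.append(size)
print(sizes)  # [56, 28, 14, 7]; adaptive average pooling then reduces 7x7 to 1x1
```

This matches the stage sizes discussed earlier: the spatial extent halves stage by stage while the channel count grows.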
ResNet18/34/50/101/152
The different depths of ResNet are obtained simply by choosing BasicBlock or Bottleneck and setting how many blocks each stage contains, so the same ResNet class yields models of several depths.
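The depth numbers in the names can be checked by counting weighted layers: one stem conv, plus the convs in every block, plus the final fully connected layer:

```python
def resnet_depth(convs_per_block, blocks_per_stage):
    # 1 stem conv + all block convs + 1 fully connected layer
    return 1 + convs_per_block * sum(blocks_per_stage) + 1

print(resnet_depth(2, [2, 2, 2, 2]))    # 18  - BasicBlock has 2 convs
print(resnet_depth(2, [3, 4, 6, 3]))    # 34
print(resnet_depth(3, [3, 4, 6, 3]))    # 50  - Bottleneck has 3 convs
print(resnet_depth(3, [3, 4, 23, 3]))   # 101
print(resnet_depth(3, [3, 8, 36, 3]))   # 152
```

(Downsample 1x1 convs are conventionally not counted.)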
```python
import torch.utils.model_zoo as model_zoo

# URLs of the torchvision pretrained weights
model_urls = {
    'resnet18': 'https://download.pytorch.org/models/resnet18-5c106cde.pth',
    'resnet34': 'https://download.pytorch.org/models/resnet34-333f7ec4.pth',
    'resnet50': 'https://download.pytorch.org/models/resnet50-19c8e357.pth',
    'resnet101': 'https://download.pytorch.org/models/resnet101-5d3b4d8f.pth',
    'resnet152': 'https://download.pytorch.org/models/resnet152-b121ed2d.pth',
}


def resnet18(pretrained=False, **kwargs):
    model = ResNet(BasicBlock, [2, 2, 2, 2], **kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['resnet18']))
    return model


def resnet34(pretrained=False, **kwargs):
    model = ResNet(BasicBlock, [3, 4, 6, 3], **kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['resnet34']))
    return model


def resnet50(pretrained=False, **kwargs):
    # 50 layers and up use Bottleneck instead of BasicBlock
    model = ResNet(Bottleneck, [3, 4, 6, 3], **kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['resnet50']))
    return model


def resnet101(pretrained=False, **kwargs):
    model = ResNet(Bottleneck, [3, 4, 23, 3], **kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['resnet101']))
    return model


def resnet152(pretrained=False, **kwargs):
    model = ResNet(Bottleneck, [3, 8, 36, 3], **kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['resnet152']))
    return model
```
data
File: split.py
Purpose: dataset splitting script. Splits the raw flower_photos dataset into train and test sets, and resizes every image to 224x224.
Dataset download: http://download.tensorflow.org/example_images/flower_photos.tgz
Dataset location: project root \ dataset \ flower_photos
First, settle the data-processing logic. We need to decide the train/test split ratio, and we resize images to 224 so they are convenient for ResNet to process (this step is optional; the training transforms could also do it later). Then the code simply implements that logic: fix the split at train:test = 8:2, read the class folders, resize each image, and assign it to the training or test set.
```python
import os
import glob
import random
import cv2
import numpy as np

if __name__ == '__main__':
    split_rate = 0.2        # fraction of each class held out as the test set
    resize_image = 224      # all images are letterboxed to this size
    file_path = './flower_photos'   # path to the raw dataset

    # each sub-directory of flower_photos is one class
    dirs = glob.glob(os.path.join(file_path, '*'))
    dirs = [d for d in dirs if os.path.isdir(d)]
    print("Totally {} classes: {}".format(len(dirs), dirs))

    for path in dirs:                    # process each class separately
        path = path.split('\\')[-1]      # keep only the class-folder name
        # create train/<class> and test/<class> in the project root
        os.makedirs("train\\{}".format(path), exist_ok=True)
        os.makedirs("test\\{}".format(path), exist_ok=True)

        # collect all images of this class
        files = glob.glob(os.path.join(file_path, path, '*jpg'))
        files += glob.glob(os.path.join(file_path, path, '*jpeg'))
        files += glob.glob(os.path.join(file_path, path, '*png'))
        random.shuffle(files)            # shuffle before splitting

        split_boundary = int(len(files) * split_rate)  # boundary between test and train

        for i, file in enumerate(files):
            img = cv2.imread(file)

            # scale the longest side to resize_image; the short side scales by the
            # same ratio, so the original aspect ratio is preserved
            old_size = img.shape[:2]     # (height, width)
            ratio = float(resize_image) / max(old_size)
            new_size = tuple([int(x * ratio) for x in old_size])
            im = cv2.resize(img, (new_size[1], new_size[0]))

            # paste the resized image, centered, onto a black square canvas
            new_im = np.zeros((resize_image, resize_image, 3), dtype=np.uint8)
            x_start = (resize_image - new_size[1]) // 2
            y_start = (resize_image - new_size[0]) // 2
            new_im[y_start:y_start + new_size[0], x_start:x_start + new_size[1]] = im

            print("Processing file {} of {}: {}".format(i + 1, len(files), file))

            # the first split_rate of the shuffled files go to test, the rest to
            # train; every image is saved with a .jpg suffix
            out_dir = "test\\{}".format(path) if i < split_boundary else "train\\{}".format(path)
            save_name = os.path.splitext(os.path.basename(file))[0] + '.jpg'
            cv2.imwrite(os.path.join(out_dir, save_name), new_im)
```
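The resize-and-pad geometry in the script can be sanity-checked with plain numbers. The helper name below is my own, not from the repo; it only reproduces the arithmetic:

```python
def letterbox_geometry(h, w, target=224):
    # scale the longest side to `target`, then center on a target x target canvas
    ratio = target / max(h, w)
    new_h, new_w = int(h * ratio), int(w * ratio)
    x_start = (target - new_w) // 2
    y_start = (target - new_h) // 2
    return (new_h, new_w), (x_start, y_start)

# a 480x640 photo scales to 168x224 and is padded with 28 black rows top and bottom
print(letterbox_geometry(480, 640))
```

Letterboxing this way keeps the aspect ratio, whereas a plain resize to 224x224 would stretch the flowers.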
train
Before writing the train script, settle the overall logic. First, select the device (GPU if available). Then load and preprocess the data. Then build the model; one detail here is that we load pretrained weights, but the pretrained head does not match our task, since it outputs 1000 classes and we only have 5, so we load the pretrained model first and then replace the fully connected layer with one sized for our own classes. Next, choose the optimizer and loss function. Then comes the per-epoch training loop: read a batch of training data, run the model, compute the loss, backpropagate and update the weights. After each training epoch, evaluate the current model: read the test data, predict, and compute the accuracy. Save the parameters of the model with the highest accuracy.
Below are the implementation details. With the overall logic in mind, the code is much easier to follow. This is the general design of a train script; nearly all of them follow this structure, and while individual details can be optimized further, the skeleton stays the same. Read it alongside the code.
```python
import os
import argparse
import sys
import json
import time
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from tqdm import tqdm
from model import *


def get_argparse():
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', type=int, default=10, help='number of epochs')
    parser.add_argument('--batch_size', type=int, default=8, help='batch size')
    parser.add_argument('--data_path', type=str, default='./dataset/', help='path to dataset')
    parser.add_argument('--model', type=str, default='resnet18', help='model name')
    parser.add_argument('--lr', type=float, default=0.001, help='learning rate')
    parser.add_argument('--save_dir', type=str, default='./checkpoint/', help='save .pth')
    parser.add_argument('--num_classes', type=int, default=5, help='number of classes')
    return parser


def train(args):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print('Use device:', device)

    # training augmentation: random crop + horizontal flip, then ImageNet normalization
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    # validation: deterministic resize + center crop
    val_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    train_dataset = datasets.ImageFolder(root=os.path.join(args.data_path, 'train'), transform=train_transform)
    train_num = len(train_dataset)
    val_dataset = datasets.ImageFolder(root=os.path.join(args.data_path, 'test'), transform=val_transform)
    val_num = len(val_dataset)

    # save the index-to-class-name mapping for later inference
    flower_list = train_dataset.class_to_idx
    class_dict = dict((val, key) for key, val in flower_list.items())
    json_str = json.dumps(class_dict, indent=4)
    with open('class_indices.json', 'w') as json_file:
        json_file.write(json_str)

    num_workers = min([os.cpu_count(), args.batch_size if args.batch_size > 1 else 0, 8])
    print("Using {} dataloader workers per process.".format(num_workers))

    train_loader = Data.DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True, num_workers=num_workers)
    val_loader = Data.DataLoader(val_dataset, batch_size=args.batch_size, shuffle=False, num_workers=num_workers)
    print('Number of training images: {}, number of validation images: {}'.format(train_num, val_num))

    # load the backbone, then replace the 1000-way ImageNet head with our own
    model = get_model(args.model)
    num_ftrs = model.fc.in_features                    # input features of the fc layer
    model.fc = nn.Linear(num_ftrs, len(flower_list))   # one logit per flower class
    model = model.to(device)

    loss_function = nn.CrossEntropyLoss()
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.Adam(params, args.lr)

    batch_num = len(train_loader)
    total_time = 0
    best_acc = 0
    os.makedirs(args.save_dir, exist_ok=True)

    for epoch in range(args.epochs):
        start_time = time.perf_counter()

        # --- training phase ---
        model.train()
        train_loss = 0
        train_bar = tqdm(train_loader, file=sys.stdout)
        for step, data in enumerate(train_bar):
            train_images, train_labels = data
            train_images = train_images.to(device)
            train_labels = train_labels.to(device)
            optimizer.zero_grad()
            outputs = model(train_images)
            loss = loss_function(outputs, train_labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_bar.desc = "train epoch[{}/{}] loss: {:.3f}".format(epoch + 1, args.epochs, loss)

        # --- validation phase ---
        model.eval()
        val_acc = 0
        val_bar = tqdm(val_loader, file=sys.stdout)
        with torch.no_grad():
            for val_data in val_bar:
                val_images, val_labels = val_data
                val_images = val_images.to(device)
                val_labels = val_labels.to(device)
                val_y = model(val_images)
                pred_y = torch.max(val_y, 1)[1]
                val_acc += torch.eq(pred_y, val_labels).sum().item()
                val_bar.desc = "val epoch[{}/{}]".format(epoch + 1, args.epochs)

        val_accurate = val_acc / val_num
        print("[epoch {:.0f}] train_loss: {:.3f} val_accuracy: {:.3f}".format(
            epoch + 1, train_loss / batch_num, val_accurate))
        epoch_time = time.perf_counter() - start_time
        print("epoch_time: {}".format(epoch_time))
        total_time += epoch_time
        print()

        # keep only the checkpoint with the best validation accuracy
        if val_accurate > best_acc:
            best_acc = val_accurate
            torch.save(model.state_dict(), os.path.join(args.save_dir, args.model + '_best.pth'))

    m, s = divmod(total_time, 60)
    h, m = divmod(m, 60)
    print("total time: {:.0f}:{:.0f}:{:.0f}".format(h, m, s))
    print("Finished Training!")


if __name__ == '__main__':
    args = get_argparse().parse_args()
    train(args)
```
test
The test script checks the model's predictions. Its logic mirrors the evaluation phase of the train script, with one addition: visualization of the results. The details are in the code below.
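The softmax-then-argmax step at the heart of inference can be illustrated without torch; the logits below are made-up scores for the 5 flower classes, purely for illustration:

```python
import math

def softmax(logits):
    # subtract the max for numerical stability, then normalize the exponentials
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 0.5, 0.1, -1.0, 0.3]   # hypothetical raw scores from the fc layer
probs = softmax(logits)               # probabilities summing to 1
pred = max(range(len(probs)), key=probs.__getitem__)  # index of the top class
print(pred, round(probs[pred], 3))    # class 0 wins
```

This is exactly what torch.softmax followed by torch.argmax computes, just spelled out on a single example.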
```python
import argparse
import json
import os
import cv2
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from matplotlib import pyplot as plt
from model import *


def get_argparse():
    parser = argparse.ArgumentParser()
    parser.add_argument('--data_path', type=str, default='./dataset/show', help='path to dataset')
    parser.add_argument('--model', type=str, default='resnet18', help='model name')
    parser.add_argument('--checkpoint', type=str, default='./checkpoint/resnet18_best.pth', help='checkpoint path')
    parser.add_argument('--num_classes', type=int, default=5, help='number of classes')
    return parser


def main():
    args = get_argparse().parse_args()
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

    data_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # take up to 10 .jpg images from the show directory
    image_paths = [os.path.join(args.data_path, f) for f in os.listdir(args.data_path) if f.endswith('.jpg')]
    image_paths = image_paths[:10]

    class_indict = json.load(open("./class_indices.json"))

    # rebuild the network with our 5-class head, then load the trained weights
    model = get_model(args.model)
    num_ftrs = model.fc.in_features   # input features of the fully connected layer
    model.fc = torch.nn.Linear(num_ftrs, args.num_classes)
    model.load_state_dict(torch.load(args.checkpoint, map_location=device))
    model.to(device)
    model.eval()

    fig, axes = plt.subplots(2, 5, figsize=(15, 6))
    axes = axes.flatten()

    mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1).to(device)
    std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1).to(device)

    for idx, image_path in enumerate(image_paths):
        img = cv2.imread(image_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR; the model expects RGB
        img = data_transform(img).unsqueeze(0)
        img = img.to(device)

        with torch.no_grad():
            output = model(img)
            predict = torch.softmax(output, dim=1)
            pred = torch.argmax(predict, dim=1).cpu().numpy()

        class_name = class_indict[str(pred[0])]   # predicted class name
        prob = predict[0, pred[0]].item()         # predicted probability
        print_res = "class: {} prob: {:.3f}".format(class_name, prob)

        # undo the normalization so the image displays with natural colors
        img_for_display = img.squeeze().mul(std).add(mean).permute(1, 2, 0).cpu().numpy()
        axes[idx].imshow(img_for_display)
        axes[idx].set_title(print_res)
        axes[idx].axis('off')

    # hide any unused subplots
    for idx in range(len(image_paths), len(axes)):
        axes[idx].axis('off')

    plt.tight_layout()
    plt.show()


if __name__ == '__main__':
    main()
```
Running the code and test results
If you've read carefully to this point, congratulations: you now have a fairly detailed picture of how to use a ResNet model for image classification. Below are the results of running and testing my code.
Complete runnable code (works as downloaded): fouen6/image_classification_ResNet: image classification with ResNet (PyTorch)
The overall workflow is: first run split.py to split the dataset, then run train.py to train the network, and finally run test.py to evaluate the model. Following this order reproduces everything described above. If anything here is unclear or incorrect, corrections and suggestions are welcome.