MobileNet is a lightweight network aimed mainly at real-time tasks such as detection; its advantage is a small parameter count and low computation, so it runs (and correspondingly trains) fast. The key design is the depthwise separable convolution, i.e. a depthwise convolution followed by a pointwise (1x1) convolution, plus a width multiplier on the channel counts and a resolution multiplier on the input, which together cut the parameters and computation dramatically.
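To get a feel for the savings, here is a small sketch (my addition, using an arbitrary 256-to-256 channel mapping) that counts the weights of a standard 3x3 convolution versus a depthwise separable one:

import torch.nn as nn

def num_params(m):
    return sum(p.numel() for p in m.parameters())

# Standard 3x3 convolution: 3*3*256*256 = 589,824 weights
standard = nn.Conv2d(256, 256, kernel_size=3, padding=1, bias=False)

# Depthwise separable: 3x3 depthwise (3*3*256) + 1x1 pointwise (256*256) = 67,840 weights
separable = nn.Sequential(
    nn.Conv2d(256, 256, kernel_size=3, padding=1, groups=256, bias=False),
    nn.Conv2d(256, 256, kernel_size=1, bias=False),
)

print(num_params(standard))   # 589824
print(num_params(separable))  # 67840, roughly 8.7x fewer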
Step one is understanding the depthwise convolution. The key is the groups argument of nn.Conv2d: it defaults to groups=1, meaning grouped computation is effectively off and you get an ordinary convolution; when you set groups = input_channels, each input channel gets its own filter, which is exactly the depthwise conv used inside the depthwise separable convolution. A quick example: the code below prints torch.Size([16, 2, 3, 3]), meaning the 4 input channels are split into two groups of 2, each group is convolved separately, and the results are concatenated along the channel dimension. With groups=4 (equal to the number of input channels here), it becomes a depthwise convolution.
import torch.nn as nn

m = nn.Conv2d(4, 16, 3, 1, 1, groups=2)   # 4 input channels split into 2 groups of 2
print(m.weight.size())                    # torch.Size([16, 2, 3, 3])
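For comparison, a minimal check (my addition, using the same 4-channel input) of how the weight shape changes with groups:

import torch.nn as nn

# groups=1: ordinary convolution, every filter sees all 4 input channels
print(nn.Conv2d(4, 16, 3, 1, 1, groups=1).weight.size())  # torch.Size([16, 4, 3, 3])

# groups=4 (= number of input channels): depthwise convolution, each filter sees one channel
print(nn.Conv2d(4, 4, 3, 1, 1, groups=4).weight.size())   # torch.Size([4, 1, 3, 3])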
So the whole Depthwise Conv module can be written as one function, matching the structure in the figure above; setting groups=in_channels here is what makes it a depthwise separable convolution.
def _conv_dw(self, in_channels, out_channels, stride):
    return nn.Sequential(
        # 3x3 depthwise convolution: groups=in_channels, one filter per channel
        nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=stride, padding=1, groups=in_channels, bias=False),
        nn.BatchNorm2d(in_channels),
        nn.ReLU(),
        # 1x1 pointwise convolution: mixes channels and changes the channel count
        nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(),
    )
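A quick shape check of this block (my addition; it re-creates the same layers standalone, since _conv_dw above is written as a class method):

import torch
import torch.nn as nn

# Same layers as _conv_dw(32, 64, stride=2), copied out of the class for a standalone test
block = nn.Sequential(
    nn.Conv2d(32, 32, kernel_size=3, stride=2, padding=1, groups=32, bias=False),
    nn.BatchNorm2d(32),
    nn.ReLU(),
    nn.Conv2d(32, 64, kernel_size=1, stride=1, padding=0, bias=False),
    nn.BatchNorm2d(64),
    nn.ReLU(),
)
x = torch.rand(1, 32, 112, 112)
print(block(x).shape)  # torch.Size([1, 64, 56, 56]): stride 2 halves H and W, the 1x1 conv maps 32 -> 64 channels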
This next one is just an ordinary convolution; write it out in the order shown in the figure.
def _conv_st(self, in_channels, out_channels, stride):
    # Standard 3x3 convolution + BN + ReLU (used only for the first layer)
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(),
    )
The full network follows the layer configuration in the figure; for the block that repeats in the middle, a small helper that loops is enough.
def _conv_x5(self, in_channel, out_channel, blocks):
    # Stack `blocks` depthwise separable convolutions with stride 1 (512 -> 512 here)
    layers = []
    for i in range(blocks):
        layers.append(self._conv_dw(in_channel, out_channel, 1))
    return nn.Sequential(*layers)
Full model code:
import torch
import torch.nn as nn

class MobleNetV1(nn.Module):
    def __init__(self, num_classes):
        super(MobleNetV1, self).__init__()
        self.conv1 = self._conv_st(3, 32, 2)            # standard conv, 224 -> 112
        self.conv_dw1 = self._conv_dw(32, 64, 1)
        self.conv_dw2 = self._conv_dw(64, 128, 2)       # 112 -> 56
        self.conv_dw3 = self._conv_dw(128, 128, 1)
        self.conv_dw4 = self._conv_dw(128, 256, 2)      # 56 -> 28
        self.conv_dw5 = self._conv_dw(256, 256, 1)
        self.conv_dw6 = self._conv_dw(256, 512, 2)      # 28 -> 14
        self.conv_dw_x5 = self._conv_x5(512, 512, 5)    # five repeated 512 -> 512 blocks
        self.conv_dw7 = self._conv_dw(512, 1024, 2)     # 14 -> 7
        self.conv_dw8 = self._conv_dw(1024, 1024, 1)
        self.avgpool = nn.AvgPool2d(kernel_size=7, stride=1)  # assumes a 224x224 input
        self.fc = nn.Linear(1024, num_classes)
    def forward(self, x):
        x = self.conv1(x)
        x = self.conv_dw1(x)
        x = self.conv_dw2(x)
        x = self.conv_dw3(x)
        x = self.conv_dw4(x)
        x = self.conv_dw5(x)
        x = self.conv_dw6(x)
        x = self.conv_dw_x5(x)
        x = self.conv_dw7(x)
        x = self.conv_dw8(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        y = torch.softmax(x, dim=1)  # torch.softmax requires an explicit dim
        return x, y
    def _conv_x5(self, in_channel, out_channel, blocks):
        layers = []
        for i in range(blocks):
            layers.append(self._conv_dw(in_channel, out_channel, 1))
        return nn.Sequential(*layers)

    def _conv_st(self, in_channels, out_channels, stride):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )

    def _conv_dw(self, in_channels, out_channels, stride):
        return nn.Sequential(
            nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=stride, padding=1, groups=in_channels, bias=False),
            nn.BatchNorm2d(in_channels),
            nn.ReLU(),
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )
net = MobleNetV1(1000)
x = torch.rand(1, 3, 224, 224)
for name, layer in net.named_children():
    if name != "fc":
        x = layer(x)
        print(name, 'output shape:', x.shape)
    else:
        x = x.view(x.size(0), -1)
        x = layer(x)
        print(name, 'output shape:', x.shape)
Training results
Accuracy is about the same as ResNet-50, but it overfits sooner. The main reason is that the dataset is small relative to a model of this size.
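One practical mitigation, which the training script below does not include, is standard CIFAR-10 augmentation on the training transform; a minimal sketch:

import torchvision.transforms as transforms

# Common CIFAR-10 training augmentation; the script below uses ToTensor() only
train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])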
Full code (ready to train):
import torch
import torch.nn as nn

class MobleNetV1(nn.Module):
    def __init__(self, num_classes):
        super(MobleNetV1, self).__init__()
        self.conv1 = self._conv_st(3, 32, 2)
        self.conv_dw1 = self._conv_dw(32, 64, 1)
        self.conv_dw2 = self._conv_dw(64, 128, 2)
        self.conv_dw3 = self._conv_dw(128, 128, 1)
        self.conv_dw4 = self._conv_dw(128, 256, 2)
        self.conv_dw5 = self._conv_dw(256, 256, 1)
        self.conv_dw6 = self._conv_dw(256, 512, 2)
        self.conv_dw_x5 = self._conv_x5(512, 512, 5)
        self.conv_dw7 = self._conv_dw(512, 1024, 2)
        self.conv_dw8 = self._conv_dw(1024, 1024, 1)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(1024, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv_dw1(x)
        x = self.conv_dw2(x)
        x = self.conv_dw3(x)
        x = self.conv_dw4(x)
        x = self.conv_dw5(x)
        x = self.conv_dw6(x)
        x = self.conv_dw_x5(x)
        x = self.conv_dw7(x)
        x = self.conv_dw8(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

    def _conv_x5(self, in_channel, out_channel, blocks):
        layers = []
        for i in range(blocks):
            layers.append(self._conv_dw(in_channel, out_channel, 1))
        return nn.Sequential(*layers)

    def _conv_st(self, in_channels, out_channels, stride):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )

    def _conv_dw(self, in_channels, out_channels, stride):
        return nn.Sequential(
            nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=stride, padding=1, groups=in_channels, bias=False),
            nn.BatchNorm2d(in_channels),
            nn.ReLU(),
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )
import torch.nn as nn
def mobleNetV1(num_classes):
    return MobleNetV1(num_classes=num_classes)
import time
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
def load_dataset(batch_size):
    train_set = torchvision.datasets.CIFAR10(
        root="data/cifar-10", train=True,
        download=True, transform=transforms.ToTensor()
    )
    test_set = torchvision.datasets.CIFAR10(
        root="data/cifar-10", train=False,
        download=True, transform=transforms.ToTensor()
    )
    train_iter = torch.utils.data.DataLoader(
        train_set, batch_size=batch_size, shuffle=True, num_workers=4
    )
    test_iter = torch.utils.data.DataLoader(
        test_set, batch_size=batch_size, shuffle=True, num_workers=4
    )
    return train_iter, test_iter
def train(net, train_iter, criterion, optimizer, num_epochs, device, num_print, lr_scheduler=None, test_iter=None):
    net.train()
    record_train = list()
    record_test = list()
    for epoch in range(num_epochs):
        print("========== epoch: [{}/{}] ==========".format(epoch + 1, num_epochs))
        total, correct, train_loss = 0, 0, 0
        start = time.time()
        for i, (X, y) in enumerate(train_iter):
            X, y = X.to(device), y.to(device)
            output = net(X)
            loss = criterion(output, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            total += y.size(0)
            correct += (output.argmax(dim=1) == y).sum().item()
            train_acc = 100.0 * correct / total
            if (i + 1) % num_print == 0:
                print("step: [{}/{}], train_loss: {:.3f} | train_acc: {:6.3f}% | lr: {:.6f}"
                      .format(i + 1, len(train_iter), train_loss / (i + 1), train_acc, get_cur_lr(optimizer)))
        if lr_scheduler is not None:
            lr_scheduler.step()
        print("--- cost time: {:.4f}s ---".format(time.time() - start))
        if test_iter is not None:
            record_test.append(test(net, test_iter, criterion, device))
        record_train.append(train_acc)
    return record_train, record_test
def test(net, test_iter, criterion, device):
    total, correct = 0, 0
    net.eval()
    with torch.no_grad():
        print("*************** test ***************")
        for X, y in test_iter:
            X, y = X.to(device), y.to(device)
            output = net(X)
            loss = criterion(output, y)
            total += y.size(0)
            correct += (output.argmax(dim=1) == y).sum().item()
        test_acc = 100.0 * correct / total
        print("test_loss: {:.3f} | test_acc: {:6.3f}%".format(loss.item(), test_acc))
        print("************************************\n")
    net.train()
    return test_acc
def get_cur_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']
def learning_curve(record_train, record_test=None):
    plt.style.use("ggplot")
    plt.plot(range(1, len(record_train) + 1), record_train, label="train acc")
    if record_test is not None:
        plt.plot(range(1, len(record_test) + 1), record_test, label="test acc")
    plt.legend(loc=4)
    plt.title("learning curve")
    plt.xticks(range(0, len(record_train) + 1, 5))
    plt.yticks(range(0, 101, 5))
    plt.xlabel("epoch")
    plt.ylabel("accuracy")
    plt.show()
import torch.optim as optim
BATCH_SIZE = 128
NUM_EPOCHS = 43
NUM_CLASSES = 10
LEARNING_RATE = 0.02
MOMENTUM = 0.9
WEIGHT_DECAY = 0.0005
NUM_PRINT = 100
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
def main():
    net = mobleNetV1(NUM_CLASSES)
    net = net.to(DEVICE)
    train_iter, test_iter = load_dataset(BATCH_SIZE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(
        net.parameters(),
        lr=LEARNING_RATE,
        momentum=MOMENTUM,
        weight_decay=WEIGHT_DECAY,
        nesterov=True
    )
    lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=15, gamma=0.1)
    record_train, record_test = train(net, train_iter, criterion, optimizer,
                                      NUM_EPOCHS, DEVICE, NUM_PRINT, lr_scheduler, test_iter)
    learning_curve(record_train, record_test)
if __name__ == '__main__':
    main()