from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import torch.nn as nn
import torch
import torch.nn.functional as F
from torch.autograd import Function
# ********************* Binary (+1/-1) ***********************
# A (activations)
class Binary_a(Function):
    @staticmethod
    def forward(ctx, input):
        ctx.save_for_backward(input)
        output = torch.sign(input)
        return output
    @staticmethod
    def backward(ctx, grad_output):
        input, = ctx.saved_tensors
        # ******************* STE *********************
        grad_input = grad_output.clone()
        # **************** saturating STE: zero the gradient where |input| >= 1 ***************
        grad_input[input.ge(1)] = 0
        grad_input[input.le(-1)] = 0
        return grad_input
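# A minimal sanity check for Binary_a (an illustrative sketch, not part of the original
# script; the helper is only defined here and never called automatically):
def _check_binary_a():
    x = torch.tensor([-1.5, -0.2, 0.0, 0.3, 2.0], requires_grad=True)
    y = Binary_a.apply(x)
    y.sum().backward()
    print(y)       # tensor([-1., -1.,  0.,  1.,  1.])  (torch.sign(0) == 0)
    print(x.grad)  # tensor([0., 1., 1., 1., 0.])  saturating STE zeroes the gradient where |x| >= 1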
# W (weights)
class Binary_w(Function):
    @staticmethod
    def forward(ctx, input):
        output = torch.sign(input)
        return output
    @staticmethod
    def backward(ctx, grad_output):
        # ******************* STE *********************
        grad_input = grad_output.clone()
        return grad_input
# ********************* Ternary (+1, 0, -1) ***********************
class Ternary(Function):
    @staticmethod
    def forward(ctx, input):
        # **************** per-output-channel E(|W|) ****************
        E = torch.mean(torch.abs(input), (3, 2, 1), keepdim=True)
        # **************** threshold ****************
        threshold = E * 0.7
        # ************** W -> +1, 0, -1 **************
        output = torch.sign(torch.add(torch.sign(torch.add(input, threshold)), torch.sign(torch.add(input, -threshold))))
        return output, threshold
    @staticmethod
    def backward(ctx, grad_output, grad_threshold):
        # ******************* STE *********************
        grad_input = grad_output.clone()
        return grad_input
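# Illustrative sketch of the ternary mapping (assumes a 4D conv-style weight tensor, as
# used below): sign(sign(w + t) + sign(w - t)) is +1 for w > t, -1 for w < -t and 0 in
# between, with a per-output-channel threshold t = 0.7 * E(|W|). Defined only, never called.
def _check_ternary():
    w = torch.randn(4, 3, 3, 3)       # (out_channels, in_channels, kH, kW)
    q, threshold = Ternary.apply(w)
    print(q.unique())                 # a subset of {-1., 0., 1.}
    print(threshold.shape)            # torch.Size([4, 1, 1, 1]): one threshold per output channel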
# ********************* A (activation) quantization: binary ***********************
class activation_bin(nn.Module):
    def __init__(self, A):
        super().__init__()
        self.A = A
        self.relu = nn.ReLU(inplace=True)
    def binary(self, input):
        output = Binary_a.apply(input)
        return output
    def forward(self, input):
        if self.A == 2:
            output = self.binary(input)
            # ******************** optional: map A to {1, 0} instead of {+1, -1} *********************
            #a = torch.clamp(a, min=0)
        else:
            output = self.relu(input)
        return output
# ********************* W (weight) quantization: ternary/binary ***********************
def meancenter_clampConvParams(w):
    mean = w.data.mean(1, keepdim=True)
    w.data.sub_(mean)         # center W along the channel (C) dimension (in-place)
    w.data.clamp_(-1.0, 1.0)  # clamp W to [-1, 1] (in-place)
    return w
class weight_tnn_bin(nn.Module):
    def __init__(self, W):
        super().__init__()
        self.W = W
    def binary(self, input):
        output = Binary_w.apply(input)
        return output
    def ternary(self, input):
        output = Ternary.apply(input)
        return output
    def forward(self, input):
        if self.W == 2 or self.W == 3:
            # **************************************** binary W *****************************************
            if self.W == 2:
                output = meancenter_clampConvParams(input)  # center and clamp W
                # **************** per-output-channel E(|W|) ****************
                E = torch.mean(torch.abs(output), (3, 2, 1), keepdim=True)
                # **************** alpha (scaling factor) ****************
                alpha = E
                # ************** W -> +1, -1 **************
                output = self.binary(output)
                # ************** W * alpha **************
                output = output * alpha  # comment this line out if the scaling factor alpha is not wanted
            # **************************************** ternary W *****************************************
            elif self.W == 3:
                output_fp = input.clone()
                # ************** W -> +1, 0, -1 **************
                output, threshold = self.ternary(input)
                # **************** alpha (scaling factor) ****************
                output_abs = torch.abs(output_fp)
                mask_le = output_abs.le(threshold)
                mask_gt = output_abs.gt(threshold)
                output_abs[mask_le] = 0
                output_abs_th = output_abs.clone()
                output_abs_th_sum = torch.sum(output_abs_th, (3, 2, 1), keepdim=True)
                mask_gt_sum = torch.sum(mask_gt, (3, 2, 1), keepdim=True).float()
                alpha = output_abs_th_sum / mask_gt_sum  # alpha = mean of |W| over the weights kept nonzero
                # *************** W * alpha ****************
                output = output * alpha  # comment this line out if the scaling factor alpha is not wanted
        else:
            output = input
        return output
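# Illustrative check of the weight quantizer (a sketch, not part of the training script).
# For W=2 every output channel ends up with a single magnitude alpha = E(|W'|), where W'
# is the centered/clamped weight; for W=3 each channel holds values in {-alpha, 0, +alpha},
# with alpha averaging |W| over the entries kept nonzero.
def _check_weight_quantizer():
    w = torch.randn(8, 4, 3, 3)
    q2 = weight_tnn_bin(W=2)(w.clone())
    q3 = weight_tnn_bin(W=3)(w.clone())
    print(q2.shape, q3.shape)    # both keep the original weight shape
    print(q2[0].abs().unique())  # a single value: alpha of output channel 0
    print(q3[0].unique())        # at most three values: -alpha, 0, +alpha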
# ********************* Quantized convolution (quantizes A and W, then convolves) ***********************
class Conv2d_Q(nn.Conv2d):
def __init__(
self,
in_channels,
out_channels,
kernel_size,
stride=1,
padding=0,
dilation=1,
groups=1,
bias=True,
A=2,
W=2
):
super().__init__(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
groups=groups,
bias=bias
)
        # instantiate the A and W quantizers
self.activation_quantizer = activation_bin(A=A)
self.weight_quantizer = weight_tnn_bin(W=W)
def forward(self, input):
        # quantize A and W
bin_input = self.activation_quantizer(input)
tnn_bin_weight = self.weight_quantizer(self.weight)
#print(bin_input)
#print(tnn_bin_weight)
        # convolve with the quantized A and W
output = F.conv2d(
input=bin_input,
weight=tnn_bin_weight,
bias=self.bias,
stride=self.stride,
padding=self.padding,
dilation=self.dilation,
groups=self.groups)
return output
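# Quick usage sketch for Conv2d_Q (illustrative only; output shapes follow nn.Conv2d):
def _check_conv2d_q():
    conv = Conv2d_Q(in_channels=3, out_channels=16, kernel_size=3, padding=1, A=2, W=2)
    x = torch.randn(1, 3, 32, 32)
    y = conv(x)
    print(y.shape)  # torch.Size([1, 16, 32, 32])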
# ********************* Quantized (ternary/binary) convolution block *********************
class Tnn_Bin_Conv2d(nn.Module):
    # last_relu: apply ReLU after this block (set for the block that feeds the final conv layer)
def __init__(self, input_channels, output_channels,
kernel_size=-1, stride=-1, padding=-1, groups=1, last_relu=0, A=2, W=2):
super(Tnn_Bin_Conv2d, self).__init__()
self.A = A
self.W = W
self.last_relu = last_relu
        # ********************* quantized (ternary/binary) convolution *********************
self.tnn_bin_conv = Conv2d_Q(input_channels, output_channels,
kernel_size=kernel_size, stride=stride, padding=padding, groups=groups, A=A, W=W)
self.bn = nn.BatchNorm2d(output_channels)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
x = self.tnn_bin_conv(x)
x = self.bn(x)
if self.last_relu:
x = self.relu(x)
return x
class Net(nn.Module):
def __init__(self, cfg = None, A=2, W=2):
super(Net, self).__init__()
        # model architecture
if cfg is None:
cfg = [192, 160, 96, 192, 192, 192, 192, 192]
self.tnn_bin = nn.Sequential(
nn.Conv2d(3, cfg[0], kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(cfg[0]),
Tnn_Bin_Conv2d(cfg[0], cfg[1], kernel_size=1, stride=1, padding=0, A=A, W=W),
Tnn_Bin_Conv2d(cfg[1], cfg[2], kernel_size=1, stride=1, padding=0, A=A, W=W),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
Tnn_Bin_Conv2d(cfg[2], cfg[3], kernel_size=5, stride=1, padding=2, A=A, W=W),
Tnn_Bin_Conv2d(cfg[3], cfg[4], kernel_size=1, stride=1, padding=0, A=A, W=W),
Tnn_Bin_Conv2d(cfg[4], cfg[5], kernel_size=1, stride=1, padding=0, A=A, W=W),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
Tnn_Bin_Conv2d(cfg[5], cfg[6], kernel_size=3, stride=1, padding=1, A=A, W=W),
Tnn_Bin_Conv2d(cfg[6], cfg[7], kernel_size=1, stride=1, padding=0, last_relu=1, A=A, W=W),
nn.Conv2d(cfg[7], 10, kernel_size=1, stride=1, padding=0),
nn.BatchNorm2d(10),
nn.ReLU(inplace=True),
nn.AvgPool2d(kernel_size=8, stride=1, padding=0),
)
def forward(self, x):
x = self.tnn_bin(x)
x = x.view(x.size(0), -1)
return x
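# Shape check for Net on a CIFAR-10-sized input (illustrative, not part of the script):
# 32x32 -> two stride-2 max-pools -> 8x8 -> 8x8 average pool -> 1x1, flattened to 10 logits.
def _check_net_shape():
    net = Net(A=2, W=2)
    logits = net(torch.randn(2, 3, 32, 32))
    print(logits.shape)  # torch.Size([2, 10])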
import sys
import math
import numpy as np
import torch.optim as optim
from torch.autograd import Variable
import torchvision
import torchvision.transforms as transforms
import os
device = torch.device('cuda:0')
# random seed for reproducible training
def setup_seed(seed):
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
torch.backends.cudnn.deterministic = True
# learning-rate schedule
def adjust_learning_rate(optimizer, epoch):
update_list = [10,20,30,40,50]
if epoch in update_list:
for param_group in optimizer.param_groups:
param_group['lr'] = param_group['lr'] * 0.5
return
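# The same schedule could also be written with PyTorch's built-in scheduler (an
# alternative sketch; the script keeps the manual adjust_learning_rate above):
#   scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[10, 20, 30, 40, 50], gamma=0.5)
#   # ...then call scheduler.step() once per epoch instead of adjust_learning_rate(optimizer, epoch).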
# training
def train(epoch):
model.train()
for batch_idx, (data, target) in enumerate(trainloader):
        # forward pass
data, target = data.cuda(), target.cuda()
data, target = Variable(data), Variable(target)
output = model(data)
loss = criterion(output, target)
        # backward pass
optimizer.zero_grad()
        loss.backward()   # compute gradients
        optimizer.step()  # update the parameters
        # print the training loss every 100 batches
if batch_idx % 100 == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tLR: {}'.format(
epoch, batch_idx * len(data), len(trainloader.dataset),
100. * batch_idx / len(trainloader), loss.data.item(),
optimizer.param_groups[0]['lr']))
return
# evaluation
def test():
    global best_acc
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in testloader:
            data, target = data.cuda(), target.cuda()
            data, target = Variable(data), Variable(target)
            # forward pass
            output = model(data)
            test_loss += criterion(output, target).data.item()
            pred = output.data.max(1, keepdim=True)[1]
            correct += pred.eq(target.data.view_as(pred)).cpu().sum()
    # test accuracy
    acc = 100. * float(correct) / len(testloader.dataset)
    best_acc = max(best_acc, acc)
    print('Test set: average loss: {:.4f}, accuracy: {:.2f}% (best: {:.2f}%)'.format(
        test_loss / len(testloader), acc, best_acc))
if __name__=='__main__':
    setup_seed(1)  # random seed for reproducible training
    # training set: random crop + horizontal flip + normalization
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))])
    # test set: normalization only
transform_test = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))])
    # data loading
    trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=128, shuffle=True, num_workers=2)   # training data
    testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)
    testloader = torch.utils.data.DataLoader(testset, batch_size=256, shuffle=False, num_workers=2)    # test data
    # CIFAR-10 class names
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
print('******Initializing model******')
    # ******************** quantize both A (activations) and W (weights) in the model's quantized conv layers ************************
model = Net(A=2, W=2)
best_acc = 0
for m in model.modules():
if isinstance(m, nn.Conv2d):
nn.init.xavier_uniform_(m.weight.data)
m.bias.data.zero_()
elif isinstance(m, nn.Linear):
m.weight.data.normal_(0, 0.01)
m.bias.data.zero_()
    # move the model to the device (CPU/GPU)
model.to(device)
    # print the model structure
print(model)
    # per-parameter groups (hyperparameters)
param_dict = dict(model.named_parameters())
params = []
for key, value in param_dict.items():
params += [{'params':[value], 'lr': 0.01, 'weight_decay':0.0}]
    # loss function
criterion = nn.CrossEntropyLoss()
    # optimizer
optimizer = optim.Adam(params, lr=0.01, weight_decay=0.0)
    # train the model
for epoch in range(1, 300):
adjust_learning_rate(optimizer, epoch)
train(epoch)
test()