目录
预处理输入数据
训练模型
测试模型
创建队列长度的自定义数据集
在自定义数据集上进行测试
下一步是什么?
- 下载源507.1 KB
在上一篇文章中,我们实现了R-CNN进行目标检测。尽管这些对象检测算法在检测人脸时效果很好,但是当目标对象不清晰时,效果不好。此外,由于它使用滑动窗口技术,因此搜索变得很详尽,并且会损害性能。在本文中,我们将学习实现深度神经网络,以使用密度映射来估计人群或生产线中的人数。
我们将使用ShanghaiTech数据集。该数据集分为A、B两部分。在本文中,我们仅使用B部分来训练人群计数模型,然后在自定义数据集上对其进行测试。您也可以选择使用另一部分,代码对两者都适用。
让我们从导入所需的库开始。
import os
import cv2
import csv
import math
import random
import numpy as np
from scipy.io import loadmat
from keras import backend as K
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from keras.callbacks import ModelCheckpoint
from keras.models import load_model, load_model, Model
from keras.layers import Conv2D, MaxPooling2D, Concatenate, Input
预处理输入数据
我们的数据集包含两个子目录:test_data和train_data。这两个目录都包含图像及其相应的基本事实。我们无法使用原始格式的数据,因此我们必须进行一些预处理。由于我们将使用按密度计算的CNN方法,因此我们也需要将真实数据作为密度图。在这里,我们将尝试根据给定的真实文件计算真实密度图。
让我们首先定义用于为输入图像生成密度图的函数。
def get_density_map(image, points):
    """Build a ground-truth density map for ``image`` from point annotations.

    Each annotated point adds a Gaussian blob (15x15 window, sigma 4) whose
    weights sum to 1, so the sum of the returned map equals the number of
    annotated points. Near the image border the window is clipped and a
    smaller, re-normalised kernel is used so the blob still sums to 1.

    Args:
        image: Array whose first two dimensions are (height, width). Only its
            shape is used, so grayscale and multi-channel images both work.
        points: ``None`` or a 2-D array with the x coordinate in column 0 and
            the y coordinate in column 1.

    Returns:
        A float64 array of shape (height, width).
    """
    frame_size = 15
    sigma = 4.0

    height, width = image.shape[:2]
    image_density = np.zeros((height, width), dtype=np.float64)
    if points is None or len(points) == 0:
        return image_density

    # NOTE: a single annotation used to be written as one pixel of value 255,
    # which made the map sum to 255 instead of 1. It now goes through the
    # same Gaussian path as every other point.

    half = frame_size // 2
    # The full-size kernel does not depend on the point, so build it once.
    kernel_1d = cv2.getGaussianKernel(frame_size, sigma)
    full_kernel = np.multiply(kernel_1d, kernel_1d.T)

    for j in range(points.shape[0]):
        # abs() mirrors slightly negative coordinates back into the image;
        # min() clamps coordinates that fall past the right/bottom edge.
        x = min(width - 1, abs(int(math.floor(points[j, 0]))))
        y = min(height - 1, abs(int(math.floor(points[j, 1]))))

        # Window around the point, clipped to the image bounds.
        x1, y1 = max(0, x - half), max(0, y - half)
        x2, y2 = min(width, x + half + 1), min(height, y + half + 1)
        window_h, window_w = y2 - y1, x2 - x1

        if window_h == frame_size and window_w == frame_size:
            kernel = full_kernel
        else:
            # Clipped window: regenerate a kernel of the clipped size so the
            # contribution of this point still sums to 1.
            kernel = np.multiply(
                cv2.getGaussianKernel(window_h, sigma),
                cv2.getGaussianKernel(window_w, sigma).T,
            )
        image_density[y1:y2, x1:x2] += kernel
    return image_density
现在,我们可以创建训练和验证数据。先指定输入图像目录、输入基本事实(ground truth)文件目录,以及用于保存训练和验证图像及其密度图的输出目录。
# Source images and ground-truth annotations of the part B training split.
input_images_path = './ShanghaiTech/part_B/train_data/images/'
ground_truth_path = './ShanghaiTech/part_B/train_data/ground-truth/'

# Output locations for the processed training / validation samples.
# The trailing slashes matter: file names are appended by plain concatenation.
output_path = './ShanghaiTech/processed_trainval/'
training_images_path = output_path + '/training_images/'
training_densities_path = output_path + '/training_densities/'
validation_images_path = output_path + '/validation_images/'
# NOTE(review): 'valalidation' looks like a typo, but the directory name is
# kept as-is because data already on disk or other code may rely on it.
validation_densities_path = output_path + '/valalidation_densities/'

# exist_ok avoids the check-then-create race and makes re-runs harmless.
for directory in (
    output_path,
    training_images_path,
    training_densities_path,
    validation_images_path,
    validation_densities_path,
):
    os.makedirs(directory, exist_ok=True)
现在,我们将遍历所有训练图像并计算其密度图。我们将使用真实文件分别计算每个图像文件的密度图,并将其保存为相应的csv文件。
# Walk over the n training images in a shuffled order and write, for each
# sample, an image (.jpg) and its density map (.csv) into either the
# validation or the training output directory.
# A fixed seed makes the shuffle, and therefore the split, reproducible.
seed = 95461354
random.seed(seed)
n = 400
# Threshold for the validation split: 10% of n, rounded up.
val_test_num = math.ceil(n*0.1)
indices = list(range(1, n+1))
random.shuffle(indices)
for idx in range(1, n+1):
# idx is the position in the shuffled order, i is the image number on disk.
i = indices[idx-1]
image_info = loadmat(''.join((ground_truth_path, 'GT_IMG_', str(i), '.mat')))['image_info']
input_image = ''.join((input_images_path, 'IMG_',str(i), '.jpg'))
# Flag 0 makes OpenCV load the image as single-channel grayscale.
img = cv2.imread(input_image, 0)
height, width = img.shape
# One eighth of the image size, then rounded down to a multiple of 8.
new_width, new_height = width / 8, height / 8
new_width, new_height = int(new_width / 8) * 8, int(new_height / 8) * 8
# presumably the -1 converts 1-based annotation coordinates to 0-based — TODO confirm
annotation_Points = image_info[0][0][0][0][0] - 1
# NOTE(review): the next line is truncated — the text between `if width` and
# `y1)[0].tolist()) &` appears to have been lost during extraction. The names
# j, x1, y1, y2, base_image, base_image_density and base_image_annPoints used
# below are not defined anywhere in the visible code; restore the missing
# statements from the original source before running this loop.
if width y1)[0].tolist()) &
set(np.where(np.squeeze(annotation_Points[:,1]) < y2)[0].tolist())
)
]
# Shift the selected annotation points by (x1, y1).
base_image_annPoints[:, 0] = base_image_annPoints[:, 0] - x1
base_image_annPoints[:, 1] = base_image_annPoints[:, 1] - y1
# Output file stem: "<image number>_<j>".
img_idx = ''.join((str(i), '_',str(j)))
# idx starts at 1, so shuffled positions 1 .. val_test_num-1 go to validation
# and all remaining positions go to training.
if idx < val_test_num:
cv2.imwrite(''.join([validation_images_path, img_idx, '.jpg']), base_image)
with open(''.join([validation_densities_path, img_idx, '.csv']), 'w', newline='') as output:
writer = csv.writer(output)
writer.writerows(base_image_density)
else:
cv2.imwrite(''.join([training_images_path, img_idx, '.jpg']), base_image)
with open(''.join([training_densities_path, img_idx, '.csv']), 'w', newline='') as output:
writer = csv.writer(output)
writer.writerows(base_image_density)
print("Successfully processed files!")
按照相同的模式,我们还需要处理测试数据。
# Convert the ground truth of every part B test image into a density-map CSV.
images_path = './ShanghaiTech/part_B/test_data/images/'
ground_truth_path = './ShanghaiTech/part_B/test_data/ground-truth/'
ground_truth_csv = './ShanghaiTech/part_B/test_data/ground-truth_csv/'

# The CSV directory is an output location; without this the first open()
# below fails when the directory does not exist yet.
os.makedirs(ground_truth_csv, exist_ok=True)

n = 316
for i in range(1, n + 1):
    image_info = loadmat(''.join((ground_truth_path, 'GT_IMG_', str(i), '.mat')))['image_info']
    input_img = ''.join((images_path, 'IMG_', str(i), '.jpg'))
    # Flag 0 loads the image as single-channel grayscale.
    img = cv2.imread(input_img, 0)
    if img is None:
        # cv2.imread reports a missing/unreadable file by returning None.
        raise FileNotFoundError('Could not read test image: ' + input_img)
    # presumably the -1 converts 1-based annotation coordinates to 0-based — TODO confirm
    annotationPoints = image_info[0][0][0][0][0] - 1
    image_density = get_density_map(img, annotationPoints)
    with open(''.join([ground_truth_csv, 'IMG_', str(i), '.csv']), 'w', newline='') as output:
        writer = csv.writer(output)
        writer.writerows(image_density)
print("Successfully processed files!")
训练模型
完成上述步骤后,我们的数据就准备好了,我们可以加载它来训练我们的模型。现在,我们将定义一个函数,该函数将根据数据加载图像和标签。
def _block_sum(density_map, factor):
    """Sum ``density_map`` over non-overlapping ``factor`` x ``factor`` tiles."""
    rows = density_map.shape[0] // factor
    cols = density_map.shape[1] // factor
    # Rows/columns that do not fill a complete tile are dropped.
    trimmed = density_map[:rows * factor, :cols * factor]
    return trimmed.reshape(rows, factor, cols, factor).sum(axis=(1, 3))


def _load_sample(image_path, label_path, factor=4):
    """Load one (normalised image, reduced density map) pair, each with a channel axis."""
    image = cv2.imread(image_path, 0)
    if image is None:
        # cv2.imread reports a missing/unreadable file by returning None.
        raise FileNotFoundError('Could not read image: ' + str(image_path))
    image = (image - 127.5) / 128
    # ndmin=2 keeps a single-row CSV two-dimensional.
    density_map = np.loadtxt(label_path, delimiter=',', ndmin=2)
    reduced = _block_sum(density_map, factor)
    return image.reshape(*image.shape, 1), reduced.reshape(*reduced.shape, 1)


def x_y_generator(images_path, labels_path, batch_size=64):
    """Yield ``(images, density_maps)`` batches forever, cycling through the data.

    Every batch holds exactly ``batch_size`` samples; when the end of the data
    is reached the generator continues from the first sample.

    Args:
        images_path: List (or ndarray) of image file paths.
        labels_path: List (or ndarray) of density-map CSV paths, index-aligned
            with ``images_path``.
        batch_size: Number of samples per yielded batch.

    Yields:
        Tuple ``(x, y)`` of arrays. ``x`` stacks grayscale images scaled as
        ``(pixel - 127.5) / 128`` with a trailing channel axis; ``y`` stacks
        the density maps summed over 4x4 tiles, also with a trailing channel
        axis.

    Raises:
        ValueError: If there are no samples or ``batch_size`` is not positive.
        FileNotFoundError: If an image cannot be read.
    """
    # ravel (rather than squeeze) keeps a one-element array a one-element list.
    if isinstance(images_path, np.ndarray):
        images_path = np.ravel(images_path).tolist()
    if isinstance(labels_path, np.ndarray):
        labels_path = np.ravel(labels_path).tolist()

    data_length = len(labels_path)
    if data_length == 0:
        raise ValueError('x_y_generator needs at least one sample')
    if batch_size < 1:
        raise ValueError('batch_size must be a positive integer')

    t = 0  # index of the next sample to load
    while True:
        x, y = [], []
        for _ in range(batch_size):
            img, quarter_den = _load_sample(images_path[t], labels_path[t])
            x.append(img)
            y.append(quarter_den)
            t = (t + 1) % data_length
        yield np.asarray(x), np.asarray(y)
我们可以使用下面的函数来读取我们的训练、验证和测试数据。
# Each split is pulled from its generator as one batch containing every sample.
# Note that train_labels / validation_labels / test_labels go in as the label
# inputs and are rebound to the loaded label arrays.

# training split
train_generator = x_y_generator(train_paths, train_labels, batch_size=len(train_paths))
training_img, train_labels = next(train_generator)

# validation split
validation_generator = x_y_generator(validation_paths, validation_labels, batch_size=len(validation_paths))
validating_img, validation_labels = next(validation_generator)

# test split
test_generator = x_y_generator(test_paths, test_labels, batch_size=len(test_paths))
testing_img, test_labels = next(test_generator)
我们的数据已经准备就绪,因此现在可以定义神经网络了。我们将实现一个多列卷积神经网络(MCNN),它包含三列卷积核大小各不相同的卷积网络。其思路是把图像输入网络,输出一张密度图,对密度图求和即可得到总人数。由于三列对应不同的卷积核大小,每一列学习到的特征能够适应图像中人的尺寸变化,因此可以方便地用于拥挤场景或队列。
def _mcnn_column(inputs, filters, kernel_sizes):
    """Build one column: conv, pool, conv, pool, conv, conv.

    Every convolution uses 'same' padding and ReLU; both poolings halve the
    spatial size.
    """
    f1, f2, f3, f4 = filters
    k1, k2, k3, k4 = kernel_sizes
    branch = Conv2D(f1, (k1, k1), padding='same', activation='relu')(inputs)
    branch = MaxPooling2D(2)(branch)
    branch = Conv2D(f2, (k2, k2), padding='same', activation='relu')(branch)
    branch = MaxPooling2D(2)(branch)
    branch = Conv2D(f3, (k3, k3), padding='same', activation='relu')(branch)
    branch = Conv2D(f4, (k4, k4), padding='same', activation='relu')(branch)
    return branch


def Multi_Column_CNN(input_shape=None):
    """Create the three-column CNN that maps an image to a density map.

    Args:
        input_shape: Shape passed to the Keras ``Input`` layer.

    Returns:
        A Keras ``Model`` whose output is a single-channel map produced by a
        1x1 convolution over the concatenated column outputs.
    """
    inputs = Input(shape=input_shape)

    # (filters per conv layer, kernel size per conv layer) for each column,
    # ordered from the largest to the smallest receptive field.
    column_specs = (
        ((16, 32, 16, 8), (9, 7, 7, 7)),
        ((20, 40, 20, 10), (7, 5, 5, 5)),
        ((24, 48, 24, 12), (5, 3, 3, 3)),
    )
    columns = [
        _mcnn_column(inputs, filters, kernel_sizes)
        for filters, kernel_sizes in column_specs
    ]

    # Stack the feature maps of all three columns along the channel axis.
    merged = Concatenate(axis=-1)(columns)
    # A 1x1 convolution collapses the merged features into the density map.
    density_map = Conv2D(1, (1, 1), padding='same')(merged)
    return Model(inputs=inputs, outputs=density_map)
有了我们的模型之后,我们还要定义度量标准以衡量模型的性能。我们将使用标准均方误差和均值绝对误差。
def mean_absolute_error(labels, predictions):
    """Return the summed absolute difference between labels and predictions.

    Despite the name, the result is the sum over all elements (divided by 1),
    not an average.
    """
    absolute_difference = K.abs(labels - predictions)
    total = K.sum(absolute_difference)
    return total / 1
def mean_square_error(labels, predictions):
    """Return the summed squared difference between labels and predictions.

    Despite the name, the result is the sum over all elements (divided by 1),
    not an average.
    """
    squared_difference = K.square(labels - predictions)
    total = K.sum(squared_difference)
    return total / 1
现在让我们训练模型。我们还将使用Keras的ModelCheckpoint来节省计算资源,并且只保存用于训练和验证的最佳模型。
# Write checkpoints into the directory the evaluation code loads
# 'mcnn_val.hdf5' from; saving to the current working directory left that
# load pointing at a file that was never written.
weights_dir = './ShanghaiTech/part_B/weights/'
os.makedirs(weights_dir, exist_ok=True)

# Keep only the model with the lowest validation loss ...
best_validation = ModelCheckpoint(
    filepath=weights_dir + 'mcnn_val.hdf5', monitor='val_loss', verbose=1, save_best_only=True, mode='min'
)
# ... and, separately, the model with the lowest training loss.
best_training = ModelCheckpoint(
    filepath=weights_dir + 'mcnn_train.hdf5', monitor='loss', verbose=1, save_best_only=True, mode='min'
)

# Height and width are left as None so images of any size are accepted;
# the single channel matches the grayscale input.
input_shape = (None, None, 1)
model = Multi_Column_CNN(input_shape)
model.compile(loss='mean_squared_error', optimizer='adam', metrics=[mean_absolute_error, mean_square_error])
history = model.fit(
    x=training_img, y=train_labels, batch_size=1, epochs=100,
    validation_data=(validating_img, validation_labels),
    callbacks=[best_validation, best_training]
)
模型训练所需的时间取决于您使用的资源。训练模型后,您可以继续进行测试。
作为基本的测试水平,我们可以根据训练数据和验证数据来绘制损耗。
# Pull the per-epoch losses out of the training history.
val_loss = history.history['val_loss']
loss = np.asarray(history.history['loss'])

# One figure per curve: training loss in blue, then validation loss in red.
for series, color, label in ((loss, 'b', 'loss'), (val_loss, 'r', 'val_loss')):
    plt.plot(series, color)
    plt.legend([label])
    plt.show()
我们训练有素的模型显示以下损耗图:
损耗图看起来不错,但让我们对图像进行预测,看看我们的模型是否可以准确地计算图像中的人数。
from keras import models

# Load the trained model. The custom metric functions must be supplied by
# name because they were compiled into the saved model.
model = models.load_model(
    './ShanghaiTech/part_B/weights/mcnn_val.hdf5',
    custom_objects={'mean_absolute_error': mean_absolute_error, 'mean_square_error': mean_square_error},
)

absolute_error = []
squared_error = []
# Upper bound on the number of test images to evaluate.
num_test = 50
for i in range(min(num_test, testing_img.shape[0])):
    # Add the batch axis expected by model.predict.
    inputs = np.reshape(testing_img[i], [1, *testing_img[i].shape[:2], 1])
    outputs = np.squeeze(model.predict(inputs))
    density_map = np.squeeze(test_labels[i])
    # The head count is the sum of a density map.
    count = np.sum(density_map)
    prediction = np.sum(outputs)

    fg, (ax0, ax1) = plt.subplots(1, 2, figsize=(16, 5))
    # Show the input next to its ground-truth density map, with both counts.
    plt.suptitle(' '.join([
        'count:', str(round(count, 2)),
        'prediction:', str(round(prediction, 2))
    ]))
    ax0.imshow(np.squeeze(inputs))
    # Stretch the map for display; a constant map (e.g. no people) has zero
    # range, so skip the scaling there instead of dividing by zero.
    value_range = np.max(density_map) - np.min(density_map)
    display_scale = 255 / value_range if value_range > 0 else 1
    ax1.imshow(density_map * display_scale)
    plt.show()

    absolute_error.append(abs(count - prediction))
    squared_error.append((count - prediction) ** 2)

# Stored under their own names so the metric functions mean_absolute_error /
# mean_square_error are not overwritten by plain numbers.
test_mae = np.mean(absolute_error)
test_mse = np.mean(squared_error)
print('mean_absolute_error:', test_mae, 'mean_square_error:', test_mse)
以下是一些(良好)预测结果:
我们的模型在此阶段运行良好,但是如何计算队列中的人数呢?没有开源数据集可用于专门针对队列长度进行训练和测试模型,因此我们需要生成自己的数据集。
创建队列长度的自定义数据集记住基础知识,我们只需要一些图像以及它们与数据集对应的基本事实即可。我们可以简单地从Google搜索中收集图像。没关系吧?但是,我们如何生成基本事实文件?有多种可用于注释图像的工具,包括基于Web的边界框注释器,头部注释器或云供应商提供的某些专用工具,例如AWS SageMaker。您可以选择要生成基本事实文件的任何一个。我将在这里坚持最基础的知识,并使用MATLAB生成基本事实。为了使用MATLAB生成真实文件,请将图像保存在名为“images”的目录中并运行以下脚本:
% Interactively create ground-truth files for the images in 'images/'.
% Each image IMG_<k>.jpg is resized to 768x1024 (and overwritten), shown on
% screen, and the clicked head positions are saved to
% 'ground-truth/GT_IMG_<k>.mat' as image_info{1,1}.location / .number.
% Images are expected to be named IMG_1.jpg ... IMG_n.jpg.
filePath = fullfile('images', '*.jpg');
ImageFiles = dir(filePath);
n = length(ImageFiles);
read_images_path = 'images/';
store_gt_path = 'ground-truth/';
% save() does not create missing folders, so make sure the target exists.
if ~exist(store_gt_path, 'dir')
    mkdir(store_gt_path);
end
t = 0; % number of files initially in training set
for i = 1:n
    image_name = [read_images_path 'IMG_' num2str(i+t) '.jpg'];
    % read image file
    img = imread(image_name);
    % resize image file and write it back in place
    img = imresize(img, [768 1024]);
    imwrite(img, image_name, 'jpg');
    figure
    % show image on screen
    imshow(img)
    % collect clicked points; pressing Enter finishes the selection
    [x, y] = getpts;
    image_info{1,1}.location = [x y];
    image_info{1,1}.number = size(x, 1);
    save([store_gt_path 'GT_IMG_' num2str(t+i) '.mat'], 'image_info')
    close
end
脚本运行时,它将遍历images目录中的所有图像并一次在屏幕上显示它们。显示图像时,单击图像中人的头部,然后按Enter键进入下一个图像。
准备好数据集后,加载经过训练的模型并进行测试。这是我经过测试获得的一些结果:
我们的模型做得很好。请注意,这里是一些“好”结果。您的结果可能会有所不同。
下一步是什么?
在本文中,我们学习了如何估计图像中的人数。您可能也会遇到一些非常糟糕的结果,但模型的微调就留给您自己来完成了。此外,此处获得的密度图还可以进一步接入全连接网络,以更准确地预测队列中的人数。
在本系列的下一篇文章中,我们将把从零开始训练我们的模型与更高级的、预先训练过的方法(如YOLO)进行比较。