您当前的位置: 首页 > 

寒冰屋

暂无认证

  • 0浏览

    0关注

    2286博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

AI队列长度检测:计算区域中的人数

寒冰屋 发布时间:2020-12-05 11:33:46 ,浏览量:0

目录

预处理输入数据

训练模型

测试模型

创建队列长度的自定义数据集

在自定义数据集上进行测试

下一步是什么?

  • 下载源507.1 KB

在上一篇文章中,我们实现了R-CNN进行目标检测。尽管这些对象检测算法在检测人脸时效果很好,但是当目标对象不清晰时,效果不好。此外,由于它使用滑动窗口技术,因此搜索变得很详尽,并且会损害性能。在本文中,我们将学习实现深度神经网络,以使用密度映射来估计人群或生产线中的人数。

我们将使用ShangaiTech数据集。数据集分为两部分。对于本文,我们将仅与B部分一起为人群训练模型,然后在自定义数据集上对其进行测试。您可以选择使用其中任何一个部分。该代码可以与任何一个正常工作。

让我们从导入所需的库开始。

import os
import cv2
import csv
import math
import random
import numpy as np
from scipy.io import loadmat
from keras import backend as K
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from keras.callbacks import ModelCheckpoint
from keras.models import load_model, load_model, Model
from keras.layers import Conv2D, MaxPooling2D, Concatenate, Input
预处理输入数据

我们的数据集包含两个子目录:test_data和train_data。这两个目录都包含图像及其相应的基本事实。我们无法使用原始格式的数据,因此我们必须进行一些预处理。由于我们将使用按密度计算的CNN方法,因此我们也需要将真实数据作为密度图。在这里,我们将尝试根据给定的真实文件计算真实密度图。

让我们首先定义用于为输入图像生成密度图的函数。

def get_density_map(image, points):
    image_density = np.zeros_like(image, dtype=np.float64)
    height, width = image_density.shape
    if points is None:
        return image_density
    if points.shape[0] == 1:
        x1 = max(0, min(width-1, round(points[0, 0])))
        y1 = max(0, min(height-1, round(points[0, 1])))
        image_density[y1, x1] = 255
        return image_density
    for j in range(points.shape[0]):
        frame_size = 15
        sigma = 4.0
        Height = np.multiply(cv2.getGaussianKernel(frame_size, sigma), (cv2.getGaussianKernel(frame_size, sigma)).T)
        x = min(width-1, max(0, abs(int(math.floor(points[j, 0])))))
        y = min(height-1, max(0, abs(int(math.floor(points[j, 1])))))
        if x >= width or y >= height:
            continue
        x1 = x - frame_size//2 + 0
        y1 = y - frame_size//2 + 0
        x2 = x + frame_size//2 + 1
        y2 = y + frame_size//2 + 1
        dfx1, dfy1, dfx2, dfy2 = 0, 0, 0, 0
        change_Height = False
        if x1 < 0:
            dfx1 = abs(x1) + 0
            x1 = 0
            change_Height = True
        if y1 < 0:
            dfy1 = abs(y1) + 0
            y1 = 0
            change_Height = True
        if x2 > width:
            dfx2 = x2 - width
            x2 = width
            change_Height = True
        if y2 > height:
            dfy2 = y2 - height
            y2 = height
            change_Height = True
        x1h, y1h, x2h, y2h = 1 + dfx1, 1 + dfy1, frame_size - dfx2, frame_size - dfy2
        if change_Height is True:
            Height = np.multiply(cv2.getGaussianKernel(y2h-y1h+1, sigma), (cv2.getGaussianKernel(x2h-x1h+1, sigma)).T)
        image_density[y1:y2, x1:x2] += Height
 
    return image_density

现在,我们可以创建测试和验证数据。指定输入图像文件,输入基本事实文件,测试和验证图像以及标签和输出路径的目录。

input_images_path = ''.join(['./ShanghaiTech/part_B/train_data/images/'])
output_path = './ShanghaiTech/processed_trainval/'
 
training_images_path = ''.join((output_path, '/training_images/'))
training_densities_path = ''.join((output_path, '/training_densities/'))
validation_images_path = ''.join((output_path, '/validation_images/'))
validation_densities_path = ''.join((output_path, '/valalidation_densities/'))
 
ground_truth_path = ''.join(['./ShanghaiTech/part_B/train_data/ground-truth/'])
 
for i in [output_path, training_images_path, training_densities_path, validation_images_path, validation_densities_path]:
	if not os.path.exists(i):
    	os.makedirs(i)

现在,我们将遍历所有训练图像并计算其密度图。我们将使用真实文件分别计算每个图像文件的密度图,并将其保存为相应的csv文件。

 

seed = 95461354
random.seed(seed)
 
n = 400
 
val_test_num = math.ceil(n*0.1)
indices = list(range(1, n+1))
random.shuffle(indices)
 
for idx in range(1, n+1):
    i = indices[idx-1]
    image_info = loadmat(''.join((ground_truth_path, 'GT_IMG_', str(i), '.mat')))['image_info']
    input_image = ''.join((input_images_path, 'IMG_',str(i), '.jpg'))
    img = cv2.imread(input_image, 0)
    height, width = img.shape
    new_width, new_height = width / 8, height / 8
    new_width, new_height = int(new_width / 8) * 8, int(new_height / 8) * 8
    annotation_Points =  image_info[0][0][0][0][0] - 1
    if width  y1)[0].tolist()) &
                set(np.where(np.squeeze(annotation_Points[:,1]) < y2)[0].tolist())
            )
        ]
 
        base_image_annPoints[:, 0] = base_image_annPoints[:, 0] - x1
        base_image_annPoints[:, 1] = base_image_annPoints[:, 1] - y1
        img_idx = ''.join((str(i), '_',str(j)))
 
        if idx < val_test_num:
            cv2.imwrite(''.join([validation_images_path, img_idx, '.jpg']), base_image)
            with open(''.join([validation_densities_path, img_idx, '.csv']), 'w', newline='') as output:
                writer = csv.writer(output)
                writer.writerows(base_image_density)
        else:
            cv2.imwrite(''.join([training_images_path, img_idx, '.jpg']), base_image)
            with open(''.join([training_densities_path, img_idx, '.csv']), 'w', newline='') as output:
                writer = csv.writer(output)
                writer.writerows(base_image_density)
print("Successfully processed files!")

 

按照相同的模式,我们还需要处理测试数据。

images_path = ''.join(['./ShanghaiTech/part_B/test_data/images/'])
ground_truth_path = ''.join(['./ShanghaiTech/part_B/test_data/ground-truth/'])
ground_truth_csv = ''.join(['./ShanghaiTech/part_B/test_data/ground-truth_csv/'])
 
n = 316
 
for i in range(1, n+1):
    image_info = loadmat(''.join((ground_truth_path, 'GT_IMG_', str(i), '.mat')))['image_info']
    input_img  = ''.join((images_path, 'IMG_', str(i), '.jpg'))
    img = cv2.imread(input_img, 0)
    annotationPoints =  image_info[0][0][0][0][0] - 1
    image_density = get_density_map(img, annotationPoints)
    with open(''.join([ground_truth_csv, 'IMG_', str(i), '.csv']), 'w', newline='') as output:
        writer = csv.writer(output)
        writer.writerows(image_density)
print("Successfully processed files!")
训练模型

完成上述步骤后,我们的数据就准备好了,我们可以加载它来训练我们的模型。现在,我们将定义一个函数,该函数将根据数据加载图像和标签。

def x_y_generator(images_path, labels_path, batch_size=64):
    break_point = 0
    t = 0
    images_path = np.squeeze(images_path).tolist() if isinstance(images_path, np.ndarray) else images_path
    labels_path = np.squeeze(labels_path).tolist() if isinstance(labels_path, np.ndarray) else labels_path
    data_length = len(labels_path)
    while True:
        if not break_point:
            x = []
            y = []
            inner_iteration = batch_size
        else:
            t = 0
            inner_iteration = batch_size - data_length % batch_size
        for i in range(inner_iteration):
            if t >= data_length:
                break_point = 1
                break
            else:
                break_point = 0
            img = (cv2.imread(images_path[t], 0) - 127.5) / 128
            density_map = np.loadtxt(labels_path[t], delimiter=',')
            std = 4
            quarter_den = np.zeros((np.asarray(density_map.shape).astype(int)//std).tolist())
            for r in range(quarter_den.shape[0]):
                for c in range(quarter_den.shape[1]):
                    quarter_den[r, c] = np.sum(density_map[r*std:(r+1)*std, c*std:(c+1)*std])
            x.append(img.reshape(*img.shape, 1))
            y.append(quarter_den.reshape(*quarter_den.shape, 1))
            t += 1
        if not break_point:
            x, y = np.asarray(x), np.asarray(y)
            yield x, y

我们可以使用下面的函数来读取我们的训练、验证和测试数据。

# read training data
train_generator = x_y_generator(train_paths, train_labels, batch_size=len(train_paths))
training_img, train_labels = train_generator.__next__()
 
# read validation data
validation_generator = x_y_generator(validation_paths, validation_labels, batch_size=len(validation_paths))
validating_img, validation_labels = validation_generator.__next__()
 
# read test data
test_generator = x_y_generator(test_paths, test_labels, batch_size=len(test_paths))
testing_img, test_labels = test_generator.__next__()

我们的数据已经准备就绪,因此我们现在可以定义神经网络了。我们将实现一个多列卷积神经网络。它包含三列具有不同过滤器大小的卷积神经网络。想法是将图像作为输入输入到我们的神经网络,并获得以总体人群计数作为输出的密度图。由于这三列对应于不同的过滤器大小,因此每个CNN列学习到的功能都可以适应人们大小的变化,并且可以在拥挤的地方或队列中轻松使用。

def Multi_Column_CNN(input_shape=None):
    inputs = Input(shape=input_shape)
 
    # first column 
    conv_1 = Conv2D(16, (9, 9), padding='same', activation='relu')(inputs)
    conv_1 = MaxPooling2D(2)(conv_1)
    conv_1 = (conv_1)
    conv_1 = Conv2D(32, (7, 7), padding='same', activation='relu')(conv_1)
    conv_1 = MaxPooling2D(2)(conv_1)
    conv_1 = Conv2D(16, (7, 7), padding='same', activation='relu')(conv_1)
    conv_1 = Conv2D(8, (7, 7), padding='same', activation='relu')(conv_1)
 
    # second column 
    conv_2 = Conv2D(20, (7, 7), padding='same', activation='relu')(inputs)
    conv_2 = MaxPooling2D(2)(conv_2)
    conv_2 = (conv_2)
    conv_2 = Conv2D(40, (5, 5), padding='same', activation='relu')(conv_2)
    conv_2 = MaxPooling2D(2)(conv_2)
    conv_2 = Conv2D(20, (5, 5), padding='same', activation='relu')(conv_2)
    conv_2 = Conv2D(10, (5, 5), padding='same', activation='relu')(conv_2)
 
    # third column 
    conv_3 = Conv2D(24, (5, 5), padding='same', activation='relu')(inputs)
    conv_3 = MaxPooling2D(2)(conv_3)
    conv_3 = (conv_3)
    conv_3 = Conv2D(48, (3, 3), padding='same', activation='relu')(conv_3)
    conv_3 = MaxPooling2D(2)(conv_3)
    conv_3 = Conv2D(24, (3, 3), padding='same', activation='relu')(conv_3)
    conv_3 = Conv2D(12, (3, 3), padding='same', activation='relu')(conv_3)
 
    # merge feature map of third column in last dimension and get density map
    conv_merge = Concatenate(axis=-1)([conv_1, conv_2, conv_3])
    # getting density map as output
    density_map = Conv2D(1, (1, 1), padding='same')(conv_merge)
 
    model = Model(inputs=inputs, outputs=density_map)
    return model

有了我们的模型之后,我们还要定义度量标准以衡量模型的性能。我们将使用标准均方误差和均值绝对误差。

def mean_absolute_error(labels, predictions):
    return K.sum(K.abs(labels - predictions)) / 1
 
def mean_square_error(labels, predictions):
    return K.sum(K.square(labels - predictions)) / 1

现在让我们训练模型。我们还将使用Keras的ModelCheckpoint来节省计算资源,并且只保存用于训练和验证的最佳模型。

best_validation = ModelCheckpoint(
    filepath= 'mcnn_val.hdf5', monitor='val_loss', verbose=1, save_best_only=True, mode='min'
)
best_training = ModelCheckpoint(
    filepath= 'mcnn_train.hdf5', monitor='loss', verbose=1, save_best_only=True, mode='min'
)
 
input_shape = (None, None, 1)
model = Multi_Column_CNN(input_shape)
model.compile(loss='mean_squared_error', optimizer='adam', metrics=[mean_absolute_error, mean_square_error])
history = model.fit(
    x=training_img, y=train_labels, batch_size=1, epochs=100,
    validation_data=(validating_img, validation_labels),
    callbacks=[best_validation, best_training]
)

模型训练所需的时间取决于您使用的资源。训练模型后,您可以继续进行测试。

测试模型

作为基本的测试水平,我们可以根据训练数据和验证数据来绘制损耗。

val_loss, loss = history.history['val_loss'], history.history['loss']
loss = np.asarray(loss)
plt.plot(loss, 'b')
plt.legend(['loss'])
plt.show()
plt.plot(val_loss, 'r')
plt.legend(['val_loss'])
plt.show()

我们训练有素的模型显示以下损耗图:

损耗图看起来不错,但让我们对图像进行预测,看看我们的模型是否可以准确地计算图像中的人数。

from keras import models
#load the trained model
model = models.load_model('./ShanghaiTech/part_B/weights/mcnn_val.hdf5', custom_objects={'mean_absolute_error': mean_absolute_error, 'mean_square_error': mean_square_error })
absolute_error = []
squared_error = []
# specifying the number of test to run
num_test = 50
for i in range(testing_img.shape[0])[:num_test]:
    inputs = np.reshape(testing_img[i], [1, *testing_img[i].shape[:2], 1])
    outputs = np.squeeze(model.predict(inputs))
    density_map = np.squeeze(test_labels[i])
    count = np.sum(density_map)
    prediction = np.sum(outputs)
    fg, (ax0, ax1) = plt.subplots(1, 2, figsize=(16, 5))
	# plotting the density maps along with predicted count
    plt.suptitle(' '.join([
        'count:', str(round(count, 2)),
        'prediction:', str(round(prediction, 2))
    ]))
    ax0.imshow(np.squeeze(inputs))
    ax1.imshow(density_map * (255 / (np.max(density_map) - np.min(density_map))))
    plt.show()
    absolute_error.append(abs(count -  prediction))
    square_error.append((count -  prediction) ** 2)
mean_absolute_error = np.mean(absolute_error)
mean_square_error = np.mean(square_error)
print('mean_absolute_error:', mean_absolute_error, 'mean_square_error:', mean_square_error)

以下是一些(良好)预测结果:

我们的模型在此阶段运行良好,但是如何计算队列中的人数呢?没有开源数据集可用于专门针对队列长度进行训练和测试模型,因此我们需要生成自己的数据集。

创建队列长度的自定义数据集

记住基础知识,我们只需要一些图像以及它们与数据集对应的基本事实即可。我们可以简单地从Google搜索中收集图像。没关系吧?但是,我们如何生成基本事实文件?有多种可用于注释图像的工具,包括基于Web的边界框注释器,头部注释器或云供应商提供的某些专用工具,例如AWS SageMaker。您可以选择要生成基本事实文件的任何一个。我将在这里坚持最基础的知识,并使用MATLAB生成基本事实。为了使用MATLAB生成真实文件,请将图像保存在名为“images”的目录中并运行以下脚本:

filePath = fullfile('images', '/*.jpg');
ImageFiles = dir(filePath);
n = length(ImageFiles)
read_images_path = 'images/';
store_gt_path = 'ground-truth/';
t = 0;                      	%number of files initially in training set
 
for i=1:n
   	# read image files
	img = imread([read_path 'IMG_' num2str(i+t) '.jpg']);
# resize image files
	img = imresize(im, [768 1024]);
	imwrite(img,[read_images_path 'IMG_' num2str(i+t) '.jpg'], 'jpg');
	figure
   	# show image on screen
	imshow(img)
	[x,y] = getpts;
	image_info{1,1}.location = [x y];
	image_info{1,1}.number = size(x,1);
	save([store_gt_path 'GT_IMG_' num2str(t+i) '.mat'], 'image_info')
	close
end

脚本运行时,它将遍历images目录中的所有图像并一次在屏幕上显示它们。显示图像时,单击图像中人的头部,然后按Enter键进入下一个图像。

在自定义数据集上进行测试

准备好数据集后,加载经过训练的模型并进行测试。这是我经过测试获得的一些结果:

 

我们的模型做得很好。请注意,这里是一些“好”结果。您的结果可能会有所不同。

下一步是什么?

在本文中,我们学习了估计图像中存在的人数。您可能还会遇到一些非常糟糕的结果,但是我将对模型进行微调。此外,此处获得的密度图可以进一步接入完全连接的网络中,以更准确地预测阵容中的人数。

在本系列的下一篇文章中,我们将把从零开始训练我们的模型与更高级的、预先训练过的方法(如YOLO)进行比较。

关注
打赏
1665926880
查看更多评论
立即登录/注册

微信扫码登录

0.0713s