目录
data_utils.py
model_assembly.py
email_notifications.py
task.py
Dockerfile
下一步
- 下载源代码 - 1.2 MB
在本系列文章中,我们将引导您完成将CI/CD应用于AI任务的过程。您最终会得到一个满足Google MLOps 成熟度模型2级要求的功能管道。我们假设您对Python、Deep Learning、Docker、DevOps和Flask有一定的了解。
在上一篇文章中,我们为这个项目搭建了一个云环境。在这一部分中,我们将带您了解持续集成、模型自动训练、自动调整和持续交付所需的代码。下图显示了我们在项目过程中的位置。
我们将展示代码的精简版本。有关完整版本,请参阅此存储库。我们将在这个项目中使用GCR Docker镜像(由TensorFlow提供支持)——但也可以随意使用替代镜像。
首先,我们将讨论在本地运行这些解决方案的代码。稍后,我们将看到如何为云部署做好准备。
下图显示了我们项目的文件结构。
data_utils.py文件负责数据加载、数据转换以及将模型保存到GCS。此文件可能因项目而异。本质上,它在模型训练之前执行所有数据处理任务。让我们来看看代码:
import datetime
from google.cloud import storage
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import gc
from sklearn import preprocessing
import os
import zipfile
import cv2
import sys
def dataset_transformation(path):
    """Load every PNG found under *path* and resize it to 128x128.

    The directory tree is walked recursively with ``os.walk``. Files whose
    name does not end in ``.png`` are ignored, and files that OpenCV cannot
    decode are skipped instead of aborting the whole run.

    Args:
        path: Root directory to search for ``.png`` files.

    Returns:
        A list holding one resized image (the value returned by
        ``cv2.resize``) per readable PNG file. The list is empty when no
        PNG file is found.
    """
    images = []
    for dirname, _, filenames in os.walk(path):
        for filename in filenames:
            if not filename.endswith('.png'):
                continue
            image = cv2.imread(os.path.join(dirname, filename))
            if image is None:
                # cv2.imread reports an unreadable or corrupt file by
                # returning None; passing that to cv2.resize would raise.
                continue
            images.append(cv2.resize(image, (128, 128)))
    return images
def load_data(args):
    """Extract the COVID_RX archives, build the labelled dataset and split it.

    Each class archive is unpacked into the dataset directory, its images are
    loaded with ``dataset_transformation`` and every image receives the label
    of its class: 0 = normal, 1 = covid, 2 = viral.

    Images and labels are accumulated together, class by class, so they can
    never get out of step. The previous version concatenated the images as
    normal + viral + covid while generating the labels in
    normal / covid / viral order, which mislabelled the covid and viral
    samples.

    Args:
        args: Parsed command-line arguments. Not used here; kept so that
            existing callers keep working.

    Returns:
        The tuple ``(X_train, X_test, y_train, y_test)`` produced by
        ``train_test_split`` with ``random_state=0`` and shuffling enabled.
    """
    dataset_dir = '/root/AutomaticTraining-Dataset/COVID_RX/'
    # The position in this list is the numeric label of the class.
    class_names = ['normal_images', 'covid_images', 'viral_images']

    for class_name in class_names:
        archive = os.path.join(dataset_dir, class_name + '.zip')
        with zipfile.ZipFile(archive, 'r') as zip_ref:
            zip_ref.extractall(dataset_dir)

    images = []
    labels = []
    for label, class_name in enumerate(class_names):
        class_images = dataset_transformation(os.path.join(dataset_dir, class_name))
        images.extend(class_images)
        labels.extend([label] * len(class_images))

    # Transforming from list to numpy array.
    X = np.array(images)
    y = np.array(labels)

    # Dataset splitting
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0, shuffle=True)
    return X_train, X_test, y_train, y_test
def save_model(bucket_name, best_model):
    """Upload a local model file to the ``testing/`` prefix of a GCS bucket.

    Args:
        bucket_name: Name of the destination bucket.
        best_model: Path of the local file to upload; it is also used as the
            blob name below ``testing/``.

    Returns:
        ``(True, None)`` when the upload succeeds, otherwise ``(False, exc)``
        where ``exc`` is the exception that was raised.
    """
    try:
        client = storage.Client()  # if running on GCP
        destination = '{}/{}'.format('testing', best_model)
        client.bucket(bucket_name).blob(destination).upload_from_filename(best_model)
    except Exception as error:
        return False, error
    else:
        return True, None
model_assembly.py文件包含创建模型和自动调整模型的代码。我们希望从一个非常基本的模型开始——训练它并评估它。如果初始模型没有达到预期的性能,我们将逐步引入改进,直到达到我们的目标。我们来看一下代码:
from tensorflow.keras.models import load_model
from tensorflow.keras import layers
import tensorflow as tf
import numpy as np
def get_base_model():
    """Create the network input and its first convolutional layer.

    Returns:
        A tuple ``(input_img, x)``: the Keras input tensor for 128x128x3
        images and the output of a 64-filter, 3x3, ReLU convolution applied
        to it.
    """
    input_img = layers.Input(shape=(128, 128, 3))
    first_conv = layers.Conv2D(64, (3, 3), activation='relu')
    return input_img, first_conv(input_img)
def get_additional_layer(filters, x):
    """Append a 2x2 max-pooling step followed by a 3x3 ReLU convolution.

    Args:
        filters: Number of filters of the new convolution.
        x: Tensor the new layers are attached to.

    Returns:
        The output tensor of the new convolution.
    """
    pooled = layers.MaxPooling2D((2, 2))(x)
    return layers.Conv2D(filters, (3, 3), activation='relu')(pooled)
def get_final_layers(neurons, x):
    """Attach the classification head to the convolutional stack.

    The head is spatial dropout (rate 0.2), flatten, a dense layer with
    ``neurons`` units and a final dense layer with 3 units. Neither dense
    layer is given an activation.

    Args:
        neurons: Number of units of the first dense layer.
        x: Tensor the head is attached to.

    Returns:
        The output tensor of the final 3-unit dense layer.
    """
    head = [
        layers.SpatialDropout2D(0.2),
        layers.Flatten(),
        layers.Dense(neurons),
        layers.Dense(3),
    ]
    for layer in head:
        x = layer(x)
    return x
这些函数将在循环中被调用。在第一次迭代中,我们将获取base_model和final_layers,并将它们堆叠起来构建一个非常简单的模型。如果训练后发现模型表现不够好,我们将再次获取base_model,添加additional_layers,再堆叠final_layers,然后重新训练并评估。如果仍然无法达到良好的性能,则在循环中重复上述过程,每次添加更多的additional_layers,直到达到预先定义的性能指标。
email_notifications.py:该文件负责通过本地SMTP服务器向产品所有者发送电子邮件。这些电子邮件会告诉所有者一切是否正常;如果不正常,则说明出了什么问题。
import smtplib
import os
# Email variables definition
sender = 'example@gmail.com'
receiver = ['svirahonda@gmail.com']  # replace this by the owner's email address
smtp_provider = 'smtp.gmail.com'  # replace this by your SMTP provider
smtp_port = 587
# The credentials can be overridden through the SMTP_ACCOUNT / SMTP_PASSWORD
# environment variables so that real secrets do not have to be committed;
# the placeholder values are used when the variables are not set.
smtp_account = os.environ.get('SMTP_ACCOUNT', 'example@gmail.com')
smtp_password = os.environ.get('SMTP_PASSWORD', 'your_password')
def training_result(result, model_acc):
    """Email the owner the outcome of an automatic training job.

    Sending is best-effort: any error raised while talking to the SMTP
    server is printed and swallowed, never propagated to the caller.

    Args:
        result: ``'ok'`` to report that a model was saved to GCS, or
            ``'failed'`` to report that no model reached an acceptable
            accuracy. Any other value produces a generic message.
        model_acc: Accuracy included in the ``'ok'`` message; ignored for the
            other outcomes.
    """
    if result == 'ok':
        message = 'The model reached '+str(model_acc)+', It has been saved to GCS.'
    elif result == 'failed':
        message = 'None of the models reached an acceptable accuracy, training execution had to be forcefully ended.'
    else:
        # An unknown value used to leave `message` unbound and crash below.
        message = 'The training job ended with an unrecognised result: '+str(result)
    message = 'Subject: {}\n\n{}'.format('An automatic training job has ended recently', message)
    try:
        # The context manager sends QUIT and closes the connection even when
        # one of the steps below fails.
        with smtplib.SMTP(smtp_provider, smtp_port) as server:
            server.starttls()
            server.login(smtp_account, smtp_password)
            server.sendmail(sender, receiver, message)
    except Exception as e:
        print('Something went wrong. Unable to send email: '+str(e), flush=True)
def exception(e_message):
    """Email the owner that an automatic training job has failed.

    Sending is best-effort: any error raised while building or sending the
    message is printed and swallowed, never propagated to the caller.

    Args:
        e_message: Description of the failure placed in the mail body. Any
            object is accepted; it is rendered with ``str.format``.
    """
    try:
        message = 'Subject: {}\n\n{}'.format('An automatic training job has failed.', e_message)
        # The context manager sends QUIT and closes the connection even when
        # one of the steps below fails.
        with smtplib.SMTP(smtp_provider, smtp_port) as server:
            server.starttls()
            server.login(smtp_account, smtp_password)
            server.sendmail(sender, receiver, message)
    except Exception as e:
        print('Something went wrong. Unable to send email: '+str(e), flush=True)
task.py文件负责编排整个程序。它会初始化GPU(如果有可用的话)、启动模型训练,并在需要时调整模型。它还接收传递给应用程序的参数。这是代码:
import tensorflow as tf
from tensorflow.keras import Model, layers, optimizers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import Model
from tensorflow.keras.models import load_model
import argparse
import model_assembly, data_utils, email_notifications
import sys
import os
import gc
from google.cloud import storage
import datetime
import math
# File name of the best model; train_model hands it to data_utils.save_model.
model_name = "best_model.hdf5"
def initialize_gpu():
    """Turn on soft device placement and placement logging if a GPU exists.

    Does nothing when TensorFlow reports no physical GPU device.
    """
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if not gpus:
        return
    tf.config.set_soft_device_placement(True)
    tf.debugging.set_log_device_placement(True)
def start_training(args):
    """Load the dataset, prepare the GPU and run the training loop.

    Args:
        args: Parsed command-line arguments, forwarded to
            ``data_utils.load_data`` and ``train_model``.
    """
    # Loading splitted data: (X_train, X_test, y_train, y_test)
    splits = data_utils.load_data(args)
    # Initializing GPU if available
    initialize_gpu()
    train_model(*splits, args)
def train_model(X_train, X_test, y_train, y_test, args):
    """Train progressively deeper CNNs until one exceeds 0.85 test accuracy.

    Each iteration builds a model from ``model_assembly`` (base model,
    ``counter`` additional layers, final layers), trains it, and evaluates
    the best checkpoint on the test split. The process always terminates the
    interpreter through ``sys.exit``:

    * exit code 0 - a model exceeded 0.85 accuracy and was uploaded to GCS;
    * exit code 1 - the upload failed, no model was good enough after the
      allowed number of attempts, or an exception occurred while training.

    The owner is notified by email in every case.

    Args:
        X_train: Training images.
        X_test: Test images, also used as validation data.
        y_train: Training labels.
        y_test: Test labels.
        args: Parsed command-line arguments; ``args.epochs`` and
            ``args.bucket_name`` are read.
    """
    try:
        model_loss, model_acc = [0, 0]
        counter = 0
        while model_acc <= 0.85:
            # NOTE(review): the statements from the loop condition down to
            # the accuracy check were missing from this file (the text
            # between a '<' and a '>' had been stripped). They are
            # reconstructed from the described behaviour and the imports of
            # this module - confirm against the upstream repository.
            # NOTE(review): with a 128x128 input and unpadded 3x3
            # convolutions, a fifth additional layer appears to shrink the
            # feature map below the kernel size - confirm the counter limit.
            input_img, x = model_assembly.get_base_model()
            filters = int(64 * math.pow(2, counter))
            for _ in range(counter):
                x = model_assembly.get_additional_layer(filters, x)
            x = model_assembly.get_final_layers(filters, x)
            cnn = Model(input_img, x, name='CNN_COVID_' + str(counter))
            cnn.summary()
            # The final Dense(3) layer has no activation, so the loss is
            # computed from logits.
            cnn.compile(
                optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['accuracy'],
            )
            checkpoint = ModelCheckpoint(
                model_name,
                monitor='val_loss',
                verbose=1,
                save_best_only=True,
                mode='auto',
                save_freq='epoch',
            )
            cnn.fit(
                X_train,
                y_train,
                epochs=args.epochs,
                validation_data=(X_test, y_test),
                callbacks=[checkpoint],
            )
            # Evaluate the best checkpoint rather than the last epoch.
            cnn = load_model(model_name)
            model_loss, model_acc = cnn.evaluate(X_test, y_test, verbose=2)
            if model_acc > 0.85:
                saved, error = data_utils.save_model(args.bucket_name, model_name)
                if saved:
                    email_notifications.training_result('ok', model_acc)
                    sys.exit(0)
                else:
                    email_notifications.exception(error)
                    sys.exit(1)
            if counter >= 5:
                email_notifications.training_result('failed', None)
                sys.exit(1)
            counter += 1
    except Exception as e:
        # sys.exit raises SystemExit, which is not an Exception subclass, so
        # the exits above are not intercepted here.
        email_notifications.exception('An exception when training the model has occurred: '+str(e))
        sys.exit(1)
def get_args(argv=None):
    """Parse the command-line options of the training task.

    Args:
        argv: Optional list of argument strings to parse. With the default
            ``None`` argparse reads ``sys.argv[1:]``, which is what callers
            that pass no argument get.

    Returns:
        An ``argparse.Namespace`` with ``bucket_name`` (str, default
        ``'automatictrainingcicd-aiplatform'``) and ``epochs`` (int,
        default 3).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket-name', type=str, default='automatictrainingcicd-aiplatform', help='GCP bucket name')
    parser.add_argument('--epochs', type=int, default=3, help='Epochs number')
    return parser.parse_args(argv)
def main():
    """Entry point: parse the command-line arguments and start training."""
    start_training(get_args())


if __name__ == '__main__':
    main()
我们的Dockerfile负责将指令传递给Docker守护程序以构建正确的容器。这是它的样子:
FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
WORKDIR /root
RUN pip install --no-cache-dir pandas numpy google-cloud-storage scikit-learn opencv-python
# Chain with '&&' so the build stops when a step fails; with ';' a failed
# 'apt-get update' was silently ignored.
RUN apt-get update && apt-get install -y git libgl1-mesa-dev && rm -rf /var/lib/apt/lists/*
# Fetching random bytes gives this layer new content on every build, so the
# repositories below are cloned again instead of being taken from the cache.
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-Dataset.git
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-CodeCommit.git
# Move the application modules next to the entry point in a single layer.
RUN mv /root/AutomaticTraining-CodeCommit/model_assembly.py \
       /root/AutomaticTraining-CodeCommit/task.py \
       /root/AutomaticTraining-CodeCommit/data_utils.py \
       /root/AutomaticTraining-CodeCommit/email_notifications.py \
       /root/
ENTRYPOINT ["python","task.py"]
上述文件使用gcr.io/deeplearning-platform-release/tf2-cpu.2-0镜像、安装依赖项、克隆所需的存储库、将文件移动到主目录并设置容器执行的入口点。
下一步:在本系列的下一篇文章中,我们将深入研究持续训练代码。敬请关注!
https://www.codeproject.com/Articles/5301647/Model-Auto-Adjustment-in-an-MLOps-Pipeline