Table of Contents
data_utils.py
email_notifications.py
task.py
Dockerfile
Next Steps
- Download source - 1.2 MB
In this series of articles, we walk you through the process of applying CI/CD to an AI task. You'll end up with a functional pipeline that meets the requirements of level 2 of the Google MLOps Maturity Model. We assume you have some familiarity with Python, deep learning, Docker, DevOps, and Flask.
In the previous article, we discussed model creation, automated tuning, and notifications in our CI/CD MLOps pipeline. In this one, we'll look at the code required to implement continuous training in the ML pipeline. The diagram below shows where we are in our project's process flow.
Keep in mind that this workflow runs on every push to the dataset repository. The script checks whether a model is available in the production or the testing registry, then retrains whichever model it finds. This is our application's file structure:
We show the code files in condensed versions. For the full versions, see the code repository.
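Before diving into the files, the retraining decision just described can be sketched as plain Python. The function names below are hypothetical stand-ins for the registry checks and training steps implemented later in data_utils.py and task.py:

```python
# A minimal sketch of the retraining decision flow, assuming hypothetical
# stand-ins for the GCS registry checks. The real pipeline prefers the
# production model, falls back to the testing model, and reports when
# neither registry contains a model.
def choose_action(prod_model_exists, test_model_exists):
    if prod_model_exists:
        return 'retrain_production_model'
    if test_model_exists:
        return 'retrain_testing_model'
    return 'notify_no_model_found'

print(choose_action(True, True))    # production model wins
print(choose_action(False, True))   # fall back to the testing model
print(choose_action(False, False))  # nothing to retrain, notify the owner
```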
data_utils.py

The data_utils.py code is almost the same as before. It loads data from the repository, transforms it, and saves the resulting model to GCS. The only difference is that the file now contains two additional functions: one checks whether a model exists in the testing registry, and the other loads that model.
Take the data_utils.py file from the previous article and add the following functions at the end:
def previous_model(bucket_name, model_type, model_filename):
    try:
        storage_client = storage.Client()  # if running on GCP
        bucket = storage_client.bucket(bucket_name)
        status = storage.Blob(bucket=bucket, name='{}/{}'.format(model_type, model_filename)).exists(storage_client)
        return status, None
    except Exception as e:
        print('Something went wrong when trying to check if previous model exists in GCS bucket. Exception: '+str(e), flush=True)
        return None, e

def load_model(bucket_name, model_type, model_filename):
    try:
        storage_client = storage.Client()  # if running on GCP
        bucket = storage_client.bucket(bucket_name)
        blob1 = bucket.blob('{}/{}'.format(model_type, model_filename))
        blob1.download_to_filename('/root/'+str(model_filename))
        return True, None
    except Exception as e:
        print('Something went wrong when trying to load previous model from GCS bucket. Exception: '+str(e), flush=True)
        return False, e
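Note that both helpers return a (result, error) tuple rather than raising, so callers can branch on the first element and forward the exception in the second. A minimal, self-contained sketch of that convention, using a hypothetical failing operation in place of the GCS calls:

```python
# Hypothetical operation following the same (result, error) convention as
# previous_model(): return (status, None) on success, (None, e) on failure.
def risky_lookup(should_fail):
    try:
        if should_fail:
            raise RuntimeError('bucket unreachable')
        return True, None
    except Exception as e:
        return None, e

status, err = risky_lookup(False)
print(status, err)                  # True None
status, err = risky_lookup(True)
print(status, type(err).__name__)   # None RuntimeError
```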
email_notifications.py

The email_notifications.py code is essentially the same as before, except that it now sends different messages.
import smtplib
import os

# Variables definition
sender = 'example@gmail.com'
receiver = ['svirahonda@gmail.com']  # replace this with the owner's email address
smtp_provider = 'smtp.gmail.com'  # replace this with your SMTP provider
smtp_port = 587
smtp_account = 'example@gmail.com'
smtp_password = 'your_password'

def training_result(result, accuracy):
    if result == 'old_evaluation_prod':
        message = "A data push has been detected. Old model from production has reached more than 0.85 of accuracy. There's no need to retrain it."
    if result == 'retrain_prod':
        message = 'A data push has been detected. Old model from production has been retrained and has reached more than 0.85 of accuracy. It has been saved into /testing.'
    if result == 'old_evaluation_test':
        message = "A data push has been detected. Old model from /testing has reached more than 0.85 of accuracy. There's no need to retrain it."
    if result == 'retrain_test':
        message = 'A data push has been detected. Old model from /testing has been retrained and reached more than 0.85 of accuracy. It has been saved into /testing.'
    if result == 'poor_metrics':
        message = 'A data push has been detected. Old models from /production and /testing have been retrained but none of them reached more than 0.85 of accuracy.'
    if result == 'not_found':
        message = 'No previous models were found at GCS.'
    message = 'Subject: {}\n\n{}'.format('An automatic training job has ended recently', message)
    try:
        server = smtplib.SMTP(smtp_provider, smtp_port)
        server.starttls()
        server.login(smtp_account, smtp_password)
        server.sendmail(sender, receiver, message)
        return
    except Exception as e:
        print('Something went wrong. Unable to send email: '+str(e), flush=True)
        return

def exception(e_message):
    try:
        message = 'Subject: {}\n\n{}'.format('An automatic training job has failed recently', e_message)
        server = smtplib.SMTP(smtp_provider, smtp_port)
        server.starttls()
        server.login(smtp_account, smtp_password)
        server.sendmail(sender, receiver, message)
        return
    except Exception as e:
        print('Something went wrong. Unable to send email.', flush=True)
        print('Exception: ', e)
        return
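One detail worth noting: smtplib's sendmail() transmits the string as-is, so the subject line is set by prefixing the body with a Subject: header followed by a blank line. A quick sketch of how that message string is assembled and what each part ends up containing:

```python
# Build a message string the way email_notifications.py does:
# a 'Subject:' header, a blank line, then the body text.
subject = 'An automatic training job has ended recently'
body = 'A data push has been detected. Old model from production has been retrained.'
message = 'Subject: {}\n\n{}'.format(subject, body)

# The blank line separates the header block from the body.
header, _, text = message.partition('\n\n')
print(header)  # Subject: An automatic training job has ended recently
print(text)    # the body only
```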
task.py

The task.py code orchestrates the execution of the files above. As before, it checks whether a GPU is present on the host, initializes it if one is found, processes the arguments passed to the execution, loads the data, and starts retraining. Once retraining ends, the code pushes the resulting model to the testing registry and notifies the product owner. Let's see what the code looks like:
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
import argparse
import data_utils, email_notifications
import sys
import os
from google.cloud import storage
import datetime

# general variables declaration
model_name = 'best_model.hdf5'

def initialize_gpu():
    if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
        tf.config.set_soft_device_placement(True)
        tf.debugging.set_log_device_placement(True)
    return

def start_training(args):
    # Loading split data
    X_train, X_test, y_train, y_test = data_utils.load_data(args)
    # Initializing GPU (if available)
    initialize_gpu()
    # Checking if there's any model saved at the testing or production folders in GCS
    model_gcs_prod = data_utils.previous_model(args.bucket_name, 'production', model_name)
    model_gcs_test = data_utils.previous_model(args.bucket_name, 'testing', model_name)
    # If a model exists at production, load it and test it on the data; if it
    # doesn't reach a good metric, retrain it and save it to the testing folder
    if model_gcs_prod[0] == True:
        train_prod_model(X_train, X_test, y_train, y_test, args)
    if model_gcs_prod[0] == False:
        if model_gcs_test[0] == True:
            train_test_model(X_train, X_test, y_train, y_test, args)
        if model_gcs_test[0] == False:
            email_notifications.training_result('not_found', ' ')
            sys.exit(1)
        if model_gcs_test[0] == None:
            email_notifications.exception('Something went wrong when trying to check if old testing model exists. Exception: '+str(model_gcs_test[1])+'. Aborting automatic training.')
            sys.exit(1)
    if model_gcs_prod[0] == None:
        email_notifications.exception('Something went wrong when trying to check if old production model exists. Exception: '+str(model_gcs_prod[1])+'. Aborting automatic training.')
        sys.exit(1)

def train_prod_model(X_train, X_test, y_train, y_test, args):
    model_gcs_prod = data_utils.load_model(args.bucket_name, 'production', model_name)
    if model_gcs_prod[0] == True:
        try:
            cnn = load_model(model_name)
            model_loss, model_acc = cnn.evaluate(X_test, y_test, verbose=2)
            if model_acc > 0.85:
                saved_ok = data_utils.save_model(args.bucket_name, model_name)
                if saved_ok[0] == True:
                    email_notifications.training_result('old_evaluation_prod', model_acc)
                    sys.exit(0)
                else:
                    email_notifications.exception(saved_ok[1])
                    sys.exit(1)
            else:
                cnn = load_model(model_name)
                checkpoint = ModelCheckpoint(model_name, monitor='val_loss', verbose=1, save_best_only=True, mode='auto', save_freq="epoch")
                cnn.fit(X_train, y_train, epochs=args.epochs, validation_data=(X_test, y_test), callbacks=[checkpoint])
                model_loss, model_acc = cnn.evaluate(X_test, y_test, verbose=2)
                if model_acc > 0.85:
                    saved_ok = data_utils.save_model(args.bucket_name, model_name)
                    if saved_ok[0] == True:
                        email_notifications.training_result('retrain_prod', model_acc)
                        sys.exit(0)
                    else:
                        email_notifications.exception(saved_ok[1])
                        sys.exit(1)
                else:
                    return
        except Exception as e:
            email_notifications.exception('Something went wrong when trying to retrain old production model. Exception: '+str(e))
            sys.exit(1)
    else:
        email_notifications.exception('Something went wrong when trying to load old production model. Exception: '+str(model_gcs_prod[1]))
        sys.exit(1)

def train_test_model(X_train, X_test, y_train, y_test, args):
    model_gcs_test = data_utils.load_model(args.bucket_name, 'testing', model_name)
    if model_gcs_test[0] == True:
        try:
            cnn = load_model(model_name)
            model_loss, model_acc = cnn.evaluate(X_test, y_test, verbose=2)
            if model_acc > 0.85:
                # Nothing to do, keep the model the way it is.
                email_notifications.training_result('old_evaluation_test', model_acc)
                sys.exit(0)
            else:
                cnn = load_model(model_name)
                checkpoint = ModelCheckpoint(model_name, monitor='val_loss', verbose=1, save_best_only=True, mode='auto', save_freq="epoch")
                cnn.fit(X_train, y_train, epochs=args.epochs, validation_data=(X_test, y_test), callbacks=[checkpoint])
                model_loss, model_acc = cnn.evaluate(X_test, y_test, verbose=2)
                if model_acc > 0.85:
                    saved_ok = data_utils.save_model(args.bucket_name, model_name)
                    if saved_ok[0] == True:
                        email_notifications.training_result('retrain_test', model_acc)
                        sys.exit(0)
                    else:
                        email_notifications.exception(saved_ok[1])
                        sys.exit(1)
                else:
                    email_notifications.training_result('poor_metrics', model_acc)
                    sys.exit(1)
        except Exception as e:
            email_notifications.exception('Something went wrong when trying to retrain old testing model. Exception: '+str(e))
            sys.exit(1)
    else:
        email_notifications.exception('Something went wrong when trying to load old testing model. Exception: '+str(model_gcs_test[1]))
        sys.exit(1)

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket-name', type=str, default='automatictrainingcicd-aiplatform', help='GCP bucket name')
    parser.add_argument('--epochs', type=int, default=2, help='Epochs number')
    args = parser.parse_args()
    return args

def main():
    args = get_args()
    start_training(args)

if __name__ == '__main__':
    main()
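The defaults in get_args() mean the script can run with no flags at all, which is convenient inside the container. Passing an explicit list to parse_args() makes this behavior easy to verify outside the container; the sketch below rebuilds the same parser in isolation (the bucket name is just the article's default):

```python
import argparse

# Rebuild the same parser as get_args() in task.py to inspect its defaults.
parser = argparse.ArgumentParser()
parser.add_argument('--bucket-name', type=str, default='automatictrainingcicd-aiplatform', help='GCP bucket name')
parser.add_argument('--epochs', type=int, default=2, help='Epochs number')

defaults = parser.parse_args([])               # no flags: fall back to defaults
custom = parser.parse_args(['--epochs', '5'])  # override a single flag

print(defaults.bucket_name, defaults.epochs)   # automatictrainingcicd-aiplatform 2
print(custom.epochs)                           # 5
```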
Dockerfile

The Dockerfile handles the Docker container build. It loads the dataset from its repository, loads the code files from theirs, and defines where the container execution should start:
FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
WORKDIR /root

RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python
RUN apt-get update; apt-get install git -y; apt-get install -y libgl1-mesa-dev

ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-Dataset.git
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-DataCommit.git

RUN mv /root/AutomaticTraining-DataCommit/task.py /root
RUN mv /root/AutomaticTraining-DataCommit/data_utils.py /root
RUN mv /root/AutomaticTraining-DataCommit/email_notifications.py /root

ENTRYPOINT ["python","task.py"]
You'll notice the ADD instructions in the code. Because each one fetches fresh random bytes, it invalidates the build cache at that point, forcing the build process to always pull the repository contents when building the container rather than reusing cached layers from the local registry.
After building and running the container locally, you should be able to retrain the model with the newly collected data. We haven't yet discussed what triggers this job. We'll cover that step later when we discuss GitHub webhooks and Jenkins, but essentially, Jenkins triggers this workflow whenever a push to the corresponding repository is detected. Pushes are detected via webhooks, which are configured in the repository itself.
At the end of the process, you should find the model file stored in the GCS testing registry.
Next Steps

In the next article, we'll build a model unit testing container. Stay tuned!
https://www.codeproject.com/Articles/5301648/Continuous-Training-in-an-MLOps-Pipeline