目录
task.py
data_utils.py
Dockerfile
下一步
- 下载源 - 1.2 MB
在本系列文章中,我们将引导您完成将CI/CD应用于AI任务的过程。您最终会得到一个满足Google MLOps 成熟度模型2级要求的功能管道。我们假设您对Python、深度学习、Docker、DevOps和Flask有一定的了解。
在上一篇文章中,我们讨论了ML CI/CD管道中的单元测试步骤。在本节中,我们将构建模型API以支持预测服务。
下图显示了我们在项目过程中的位置。
代码文件的结构如下:
本文中的大部分代码与上一篇几乎相同,因此我们只看不同之处。
在此存储库中查找完整代码,因为下面显示的代码段是精简版本。
task.py协调容器内程序执行的task.py文件如下所示:
import tensorflow as tf
from tensorflow.keras.models import load_model
import jsonpickle
import data_utils, email_notifications
import sys
import os
from google.cloud import storage
import datetime
import numpy as np
import jsonpickle
import cv2
from flask import flash,Flask,Response,request,jsonify
import threading
import requests
import time
# IMPORTANT
# If you're running this container locally and you want to access the API via local browser, use http://172.17.0.2:5000/
# Starting flask app
app = Flask(__name__)
# general variables declaration
model_name = 'best_model.hdf5'
bucket_name = 'automatictrainingcicd-aiplatform'
global model
@app.before_first_request
def before_first_request():
def initialize_job():
if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
tf.config.set_soft_device_placement(True)
tf.debugging.set_log_device_placement(True)
global model
# Checking if there's any model saved at testing on GCS
model_gcs = data_utils.previous_model(bucket_name,model_name)
# If any model exists at prod, load it, test it on data and use it on the API
if model_gcs[0] == True:
model_gcs = data_utils.load_model(bucket_name,model_name)
if model_gcs[0] == True:
try:
model = load_model(model_name)
except Exception as e:
email_notifications.exception('Something went wrong trying to production model. Exception: '+str(e))
sys.exit(1)
else:
email_notifications.exception('Something went wrong when trying to load production model. Exception: '+str(model_gcs[1]))
sys.exit(1)
if model_gcs[0] == False:
email_notifications.send_update('There are no artifacts at model registry. Check GCP for more information.')
sys.exit(1)
if model_gcs[0] == None:
email_notifications.exception('Something went wrong when trying to check if production model exists. Exception: '+model_gcs[1]+'. Aborting execution.')
sys.exit(1)
thread = threading.Thread(target=initialize_job)
thread.start()
@app.route('/init', methods=['GET','POST'])
def init():
message = {'message': 'API initialized.'}
response = jsonpickle.encode(message)
return Response(response=response, status=200, mimetype="application/json")
@app.route('/', methods=['POST'])
def index():
if request.method=='POST':
try:
#Converting string that contains image to uint8
image = np.fromstring(request.data,np.uint8)
image = image.reshape((128,128,3))
image = [image]
image = np.array(image)
image = image.astype(np.float16)
result = model.predict(image)
result = np.argmax(result)
message = {'message': '{}'.format(str(result))}
json_response = jsonify(message)
return json_response
except Exception as e:
message = {'message': 'Error'}
json_response = jsonify(message)
email_notifications.exception('Something went wrong when trying to make prediction via Production API. Exception: '+str(e)+'. Aborting execution.')
return json_response
else:
message = {'message': 'Error. Please use this API in a proper manner.'}
json_response = jsonify(message)
return json_response
def self_initialize():
def initialization():
global started
started = False
while started == False:
try:
server_response = requests.get('http://127.0.0.1:5000/init')
if server_response.status_code == 200:
print('API has started successfully, quitting initialization job.')
started = True
except:
print('API has not started. Still attempting to initialize it.')
time.sleep(3)
thread = threading.Thread(target=initialization)
thread.start()
if __name__ == '__main__':
self_initialize()
app.run(host='0.0.0.0',debug=True,threaded=True)
该data_utils.py文件不同于其以前的版本仅在加载从生产注册表模型中的一部分。区别在于:
- status = storage.Blob(bucket=bucket, name='{}/{}'.format('testing',model_filename)).exists(storage_client) 经过 status = storage.Blob(bucket=bucket, name='{}/{}'.format('production',model_filename)).exists(storage_client)
- blob1 = bucket.blob('{}/{}'.format('testing',model_filename)) by blob1 = bucket.blob('{}/{}'.format('production',model_filename))
在我们的Dockerfile 中,替换
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting.git
为
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-PredictionAPI.git
在本地构建并运行容器后,您应该可以通过POST请求在http://172.17.0.2:5000/获得一个功能齐全的预测服务。
下一步在接下来的系列文章中,我们将看到如何在Kubernetes、Jenkins和Google Cloud Platform的帮助下将各个容器链接到一个实际的管道中。敬请关注!
https://www.codeproject.com/Articles/5301650/Building-an-MLOps-Model-API