目录
1. python代码
- 1. python代码
import requests
import os
import shutil
import zipfile
azkaban_url="http://192.168.8.111:8050"
azkaban_username="azkaban"
azkaban_password="azkaban"
project_name = "test_project"
flow_name = "test"
azkaban_zip_dir_base_path = "C:\\Users\\dell\\Desktop"
# 登录azkaban, 并获取azkaban_session_id
def login():
url = azkaban_url + "?action=login"
data = {
"username": azkaban_username,
"password": azkaban_password
}
response = requests.post(url=url, data=data)
azkaban_session_id = response.json()["session.id"]
return azkaban_session_id
# 删除project项目
def delete_project(azkaban_session_id):
url = azkaban_url + "/manager?delete=true&session.id=%s&project=%s" \
% (azkaban_session_id, project_name)
# no response
requests.get(url=url)
# 创建project, 如果project存在则先删除
def create_overwrite_project(azkaban_session_id):
url = azkaban_url + "/manager?action=create"
data = {
"session.id": azkaban_session_id,
"name": project_name,
"description": "测试项目"
}
response = requests.post(url=url, data=data)
# 如果创建失败直接抛异常
if response.json()["status"]=="error" and response.json()["message"] != "Project already exists.":
raise Exception(response.json())
# 如果project存在,则删除后,再创建
if "message" in dict(response.json()).keys() and response.json()["message"] == "Project already exists.":
delete_project(azkaban_session_id)
create_overwrite_project(azkaban_session_id)
# 生成azkaban的zip项目文件
def generate_zip_dir():
# 创建目录, 如果目录存在,则删除目录
azkaban_zip_dir_path = os.path.join(azkaban_zip_dir_base_path, project_name)
if os.path.exists(azkaban_zip_dir_path):
shutil.rmtree(azkaban_zip_dir_path)
os.mkdir(azkaban_zip_dir_path)
# 生成flow20.project文件
azkaban_zip_flow20_project_file_path = os.path.join(azkaban_zip_dir_path, "flow20.project")
azkaban_zip_flow20_project_file = open(azkaban_zip_flow20_project_file_path, mode = 'w')
azkaban_zip_flow20_project_file.write("azkaban-flow-version: 2.0")
azkaban_zip_flow20_project_file.close()
# 生成test.flow文件
azkaban_zip_test_flow_file_path = os.path.join(azkaban_zip_dir_path, flow_name+".flow")
azkaban_zip_test_flow_file = open(azkaban_zip_test_flow_file_path, mode = 'w')
azkaban_zip_test_flow_file.write("nodes:\n")
azkaban_zip_test_flow_file.write(" - name: job1\n")
azkaban_zip_test_flow_file.write(" type: command\n")
azkaban_zip_test_flow_file.write(" config:\n")
azkaban_zip_test_flow_file.write(' command: echo "This is an echoed text."\n')
azkaban_zip_test_flow_file.close()
# 压缩文件夹
# 如果zip文件存在,则删除zip文件
if os.path.exists(azkaban_zip_dir_path + ".zip"):
os.remove(azkaban_zip_dir_path + ".zip")
zip = zipfile.ZipFile(file = azkaban_zip_dir_path + ".zip", mode = 'w', compression = zipfile.ZIP_DEFLATED)
pre_len = len(os.path.dirname(azkaban_zip_dir_path))
for dirpath, dirnames, filenames in os.walk(azkaban_zip_dir_path):
for filename in filenames:
azkaban_zip_file_path = os.path.join(dirpath, filename)
zip.write(filename=azkaban_zip_file_path, arcname = azkaban_zip_file_path[pre_len:].strip(os.path.sep))
zip.close()
# 上传项目zip文件
def upload_project_zip(azkaban_session_id):
url = azkaban_url + "/manager?ajax=upload"
# 3-tuple:('filename', fileobj, 'content_type')
files = {'file':(project_name+ ".zip",
open(os.path.join(azkaban_zip_dir_base_path, project_name) + ".zip", 'rb'),
'application/zip')}
data = {
"session.id": azkaban_session_id,
"project": project_name,
"file": files,
'ajax':'upload'
}
response = requests.post(url=url, data=data, files= files)
# 如果上传失败直接抛异常
if "error" in dict(response.json()).keys():
raise Exception(response.json())
# 以后台的方式执行flow, 提交完立刻返回response, 所以需要根据execution_status进行处理
def execute_flow(azkaban_session_id):
url = azkaban_url + "/executor?ajax=executeFlow&session.id=%s&project=%s&flow=%s" \
% (azkaban_session_id, project_name, flow_name)
response = requests.get(url=url)
# 如果执行失败直接抛异常
if "error" in dict(response.json()).keys():
raise Exception(response.json())
execution_id = response.json()['execid']
return execution_id
# 获取flow执行的信息
def fetch_flow_execution(azkaban_session_id, execution_id):
url = azkaban_url + "/executor?ajax=fetchexecflow&session.id=%s&execid=%d" \
% (azkaban_session_id, execution_id)
response = requests.get(url=url)
execution_status=response.json()['nodes'][0]['status']
return execution_status
if __name__ == '__main__':
azkaban_session_id = login()
create_overwrite_project(azkaban_session_id)
generate_zip_dir()
upload_project_zip(azkaban_session_id)
execution_id = execute_flow(azkaban_session_id)
execution_status = fetch_flow_execution(azkaban_session_id, execution_id)
delete_project(azkaban_session_id)