介绍
背景
系统架构
编排引擎
REST API
后端
业务逻辑
任务
运行条件
任务类型
REST任务的任务属性(同步和异步)
转换器的任务属性
拆分器的任务属性
业务逻辑JSON的示例
使用代码
IAdministration
IOrchestration
管理API
业务流程API
兴趣点
- GitHub上的源代码 - http://github.com/WorkMaze/RESTGrid
- MySql提供程序的编排引擎docker容器http://hub.docker.com/r/workmaze/restgrid.mysqlengine.workflow/
- 用于MySql提供程序的REST API docker容器http://hub.docker.com/r/workmaze/restgrid.mysqlengine.api/
- 适用于AWS DynamoDB的业务流程引脚架容器(AWS的NoSQL服务)https://hub.docker.com/r/workmaze/restgrid.dynamodbengine.workflow/
- AWS DynamoDB提供程序的REST API docker容器http://hub.docker.com/r/workmaze/restgrid.dynamodbengine.api/
RESTGrid是.NET Core中的一个简单的Workflow \ ETL系统,它使用REST服务与外部世界进行交互。它具有以下特征:
- 系统能够使用JSON格式定义工作流。
- 工作流可以是同步的,也可以是长期运行和异步的。
- 工作流中的每个步骤都是一个REST服务的调用。
- 步骤可以是同步的,其中在执行前一步骤之后调用下一步骤。
- 步骤也可以是异步的,其中在执行步骤之后,工作流进入等待阶段并等待外部调用。
- 可以使用REST API启动,重新启动工作流。
- 使用REST API进行管理。
- 能够使用JUST定义JSON转换 ——https://blog.csdn.net/mzl87/article/details/88716658。
- 基于接口的系统,因此可以连接到不同的数据源。
- Docker容器可用于MySql提供程序。
- Docker容器可用于AWS DynamoDB(AWS的NoSQL服务)提供程序。
我对这个软件的意图是提供一个专注于基于.NET Framework和MySql的Messaging \ ETL \ Workflow系统的解决方案。这不包括丰富的Windows UI功能。但是,还有一些额外的功能:
- 源代码位于源代码中包含的.NET Core和docker文件中。
- Docker Hub提供了MySql提供程序的Docker容器。
- 虽然缺少丰富的Windows UI功能,但还包括用于管理的REST API。
- 目前,仅包含MySql提供程序。但是,可以使用提供的接口轻松开发新的提供程序。
- AWS(DynamoDB)的NoSQL服务提供程序现在也已包含在解决方案中。
关于系统的各种组件的简要描述。下面是表示系统架构的示意图:
这是系统的核心,它将任务排入队列,读取工作流的业务逻辑并运行任务。
REST API外部系统使用它来启动\重新启动工作流以及用户管理\设置工作流。
后端这是存储配置和历史数据的商店。RESTGrid是主要项目,它定义系统的接口,对象和编排逻辑。
由于这是一个基于接口的项目,只要我们实现定义的接口,我们就可以连接任何后端。
RESTGrid的核心库可作为Nuget包提供:https://www.nuget.org/packages/RESTGrid/
业务逻辑系统的业务逻辑使用标准JSON格式定义。
该root对象包含以下两个属性:
- Start (工作流程的起始任务)
- Tasks (工作流中所有后续任务的数组)
该Task JSON包含以下属性:
- Identifier——标识任务的唯一标识符
- Type——任务类型
- Next——一组任务标识符,指向可在此任务之后运行的任务
- TaskRetries——任务在失败之前可以重试的次数
- TaskProperties——包含有关如何运行任务的信息的JSON对象(可以使用JUST转换)
- RunCondition——运行任务必须满足的条件
该run条件决定哪些需要满足的任务运行的条件。它具有以下属性:
- Evaluator——需要评估的表达式(可以使用JUST转换)。
- Evaluated——要评估的表达式(可以使用JUST转换)。
- Operator——标准操作符:
- stringequals
- stringcontains
- mathequals
- mathgreaterthan
- mathlessthan
- mathgreaterthanorequalto
- mathlessthanorequalto
任务可以是以下四种类型之一:
- Sync——同步REST调用(下一步在同步调用后执行)。
- Async——异步REST调用(工作流在执行异步调用后等待外部输入)。
- Transformer——转换消息体(JUST转换)
- Splitter——根据JSON内的数组拆分消息体(JUST拆分)
REST任务的JSON任务属性包含以下属性:
- Url—— REST服务的Url
- Method—— GET,POST,PUT或者DELETE
- Headers——键值对JSON(可选)
- Body
- QueryString
- TransformerID——标识转换器JSON的整数ID(对于MySql,这是transformer表的主键)
- ArrayPath——指向数组的JSONPath。
"Start": {
"Identifier": "CreateUser",
"Next": [
"CreateRole"
],
"RunCondition": null,
"Type": "Transformer",
"TaskProperties": {
"TransformerID": "3"
}
},
"Tasks": [
{
"Identifier": "CreateRole",
"Next": [
"AddApplication",
"Notify"
],
"Type": "Splitter",
"TaskProperties": {
"ArrayPath": "$.Organization.Employee"
}
},
{
"Identifier": "AddApplication",
"Type": "Sync",
"Next": [
"Approve"
],
"RunCondition": {
"Evaluated": "CreditCard",
"Evaluator": "#valueof($.MessageBodyJson.Organization.Employee.PaymentMode)",
"Operator": "stringequals"
},
"TaskProperties": {
"Url": "http://localhost:5001/",
"Method": "POST",
"Headers": null,
"Body": "#valueof($.MessageBodyJson.Organization.Employee.Details)",
"QueryString": "api/table/user"
},
"TaskRetries": 0
},
{
"Identifier": "Notify",
"Next": [
"Approve"
],
"Type": "Async",
"RunCondition": {
"Evaluated": "Cash",
"Evaluator": "#valueof($.MessageBodyJson.Organization.Employee.PaymentMode)",
"Operator": "stringequals"
},
"TaskProperties": {
"Url": "http://localhost:5001/",
"Method": "POST",
"Headers": null,
"Body": "#valueof($.MessageBodyJson.Organization.Employee.Details)",
"QueryString": "api/table/user"
},
"TaskRetries": 0
},
{
"Identifier": "Approve",
"Type": "Sync",
"TaskProperties": {
"Url": "http://localhost:5001/",
"Method": "POST",
"Headers": null,
"Body": {
"Message": "Your payment has been approved"
},
"QueryString": "api/table/user"
},
"TaskRetries": 0
}
]
}
上面的业务逻辑JSON代表了一个工作流,它看起来像下面的示意图:
RESTGrid是一个基于接口的系统。核心库提供了可以继承的接口,以实现您自己的后端提供程序。
提供以下接口:
IAdministrationnamespace RESTGrid.Interfaces
{
public interface IAdministration
{
void CreateWorkflowType(string workflowTypeName, JObject businessLogicJson);
void CreateTransformer(JObject transformerJson);
WorkflowHistory GetHistory(string workflowID);
}
}
IOrchestration
namespace RESTGrid.Interfaces
{
public interface IOrchestration
{
void PublishWorkflowStep(string workflowTypeName, Guid workflowID,
JObject messageBodyJson, JObject customPropertiesJson, string stepIdentifier,
bool stepSucceeded, bool workflowCompleted, int retries, bool active,
string runStepIdentifier, string splitID);
void SetWorkflowActive(JObject messageBodyJson, string customPropertyName,
string customPropertyValue);
List Enqueue();
JObject GetTransformer(int transformerID);
}
}
一旦你实现了IOrchestration接口后,您可以轻松开发自己的Orchestration\ Workflow引擎。以下是MySql引擎的实现方式:
MySqlOrchestration orchestration = new MySqlOrchestration(connectionString);
OrchestrationEngine engine = new OrchestrationEngine(orchestration);
Console.WriteLine("Running orchestration engine...");
while (true)
{
engine.Run();
}
MySql提供程序的REST API具有以下API:
管理APIGET {url}/api/Administration/History{workflowID}
Returns 200 OK with a JSON containing the entire workflow history.
POST {url}/api/Administration/WorkflowType/{workflowTypeName}
Body containing the Business Logic JSON.
Returns 204 No Content.
POST {url}/api/Administration/Transformer
Returns 204 No Content.
业务流程API
PUT {url}/api/Orchestration/Workflow/{customPropertyName}/{customPropertyValue}
Body (Optional) - .
Returns 204 No Content.
POST {url}/api/Orchestration/Workflow/{workflowTypeName}
Body containing the JSON message in the following format:-
{
"MessageBody": ,
"CustomProperties": ,
}
Returns 204 No Content
兴趣点
- 由于该项目使用.NET Core和docker容器,因此可以轻松地托管在支持docker的云提供商上。
- MySql数据库既可以是独立系统,也可以托管在支持它的云提供商上。
- 系统调用的REST服务可以托管在云中,从而可以在云上托管整个系统。
原文地址:https://www.codeproject.com/Articles/1204148/RESTGrid