您当前的位置: 首页 > 

寒冰屋

暂无认证

  • 0浏览

    0关注

    2286博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Castle动态代理拦截器可构建可重启的流

寒冰屋 发布时间:2020-12-13 21:38:46 ,浏览量:0

目录

介绍

用户故事4:在另一个AppDomain流中创建可中断且可重新启动的

实现——泛型流

实现——流引擎

实现——单元测试

总结

P.S.

本文演示了使用来自Castle Dynamic Proxy框架的代理和拦截器的另一种方法。我们实现了一个可中断且可重新启动的流,该流确实很轻量的,可调试,并且流逻辑在C#方法中。此外,我们使用Moq进行了单元测试,并证明了用于验证和调试代码的过程中,您无需花费时间来创建和运行应用程序。

  • 从Github下载完整的解决方案

您可以在下面的链接中找到有关动态代理的第一篇文章:

  • Castle动态代理拦截器可跟踪模型更改和触发规则
介绍

引用:

我必须感谢Larry Ross向Pro Coders团队指出了面向切面的编程。

如果您对我以前的博客文章(讨论模型更改跟踪和规则执行)不感到惊讶,那么我将第二次给您留下深刻的印象。假设您有一种神奇的技术可以中断.NET方法,保存其状态并稍后从保存的点(或在另一台计算机上)重新启动。也许您认为可以使用Windows休眠功能将Windows映像保存到大文件中,然后将其复制并在其他虚拟机上还原。但不是!我说的是一个大约一百字节的小状态,可以将其保存到数据库中,由另一台机器读取,然后从保存的点重新启动。

你很感兴趣吗?

然后让我们写一个用户故事。

用户故事4:在另一个AppDomain流中创建可中断且可重新启动的
  • 流步骤的逻辑和顺序应采用C#方法
  • 每个步骤都应具有中断流执行的能力
  • 中断的流和流代码的状态应足以在另一个应用程序域中重新启动流
实现——泛型流

首先,我创建了一个新的.NET Core 3.1项目DemoCatleProxy.RestartableFlow并添加了Castle.Core NuGet包。

在架构上,我以可能有许多不同的流的方式来思考此问题,并且使用了一种引擎来执行这些流中的任何一个,并在需要时从保存的点重新启动它们。如果我们从业务文档分发的角度来考虑它——每种类型的文档将具有唯一的流,并且将包含不同的步骤(例如提交,审查,批准,二次批准等)以及这些步骤的不同顺序。

现在,我想为我们将要实现的流定义一个模板。首先,我们需要一个定义流的接口,我们将在流引擎中使用该接口来引用流:

public interface IFlow
{
    object UntypedModel { get; }
    void Execute();
    internal void SetModel(object model);
}

Execute方法是流逻辑的容器,其余声明用于设置模型。

您可以将模型视为文档本身(在流通中)以及所有相关的支持文档——签名、签名日期、当前状态、下一个交付地址等。

我们执行的每个新流都应具有自己的模型。因此,我的下一个接口是带有Model参数的通用模板:

public interface IFlow : IFlow
    where M : class
{
    M Model { get; }
}

因此,每次为新文档定义新流时,您都会提供一个Model参数,例如:

public class MyFlow : IFlow

现在,为了简化将来的流定义,我们创建一个实现基本功能的基类:

public abstract class Flow : IFlow
    where M : class, new()
{
    protected M _model;
    public M Model => _model;
    public object UntypedModel => _model;

    public Flow()
    {
        _model = new M();
    }

    void IFlow.SetModel(object model)
    {
        _model = model as M;
    }

    public abstract void Execute();
}

基类允许使用类型Model,并在构造函数中创建Model的实例。

最终,我们可以尝试定义第一个流(和模型)并查看其工作方式,我希望像往常一样在单元测试中进行操作。我创建了一个新的xUnit项目DemoCatleProxy.RestartableFlow.Tests,并向主项目添加了项目引用。现在让我们添加流代码:

using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

namespace DemoCatleProxy.RestartableFlow.Tests
{
    public class Model1 
    {
        public string ReceivedMessage { get; set; }
        public string Signature { get; set; }

        public bool IsSubmitted { get; set; }
        public bool IsLoaded { get; set; }
    }

    public interface IDemoDataService
    {
        string LoadReceivedMessage();
        bool IsMessageApproved(string message);
        string GetSignature(string message);
        bool Submit(string message, string signature);
    }

    public class DemoFlow1 : Flow
    {
        private readonly IDemoDataService _dataService;

        public DemoFlow1()
        {
        }

        public DemoFlow1(IDemoDataService dataService)
        {
            _dataService = dataService;
        }

        public override void Execute()
        {
            LoadData();

            CheckIfApproved();

            AddDigitalSignature();

            SubmitData();
        }

        public virtual void LoadData()
        {
            if (Model.IsLoaded)
            {
                throw new FlowFatalTerminateException();
            }

            Model.ReceivedMessage = _dataService.LoadReceivedMessage();
            Model.IsLoaded = true;
        }

        public virtual void CheckIfApproved()
        {
            if (!_dataService.IsMessageApproved(Model.ReceivedMessage))
            {
                throw new FlowStopException();
            }
        }

        public virtual void AddDigitalSignature()
        {
            Model.Signature = _dataService.GetSignature(Model.ReceivedMessage);
        }

        public virtual void SubmitData()
        {
            if (!_dataService.Submit(Model.ReceivedMessage, Model.Signature))
            {
                throw new FlowStopException();
            }
        }
    }
}

我定义了Model1和DemoFlow1,并且还为了更好的现实性,我添加了IDemoDataService契约(接口),流将使用该契约与外界进行通信。

如果你看一下Execute方法,你会看到它是四种方法的序列,他们每个([ LoadData,CheckIfApproved,AddDigitalSignature和SubmitData)必须是虚拟的,我会解释为什么。

我们将使用代理拦截器来拦截每个使用的方法的执行。当我们拦截对代理对象的调用时,我们可以决定是否允许调用或跳过该调用。如果重新开始流,我们应该可以跳过所有调用,直到到达上一个停靠点为止。因此,我们从Execute中调用的方法必须是虚拟的。

如果您看一下LoadData,它将检查Model之前是否尚未加载,以确保我们从头开始Model。我们还有一个额外的检查:如果该Model信号已被加载,则该方法将抛出FlowFatalTerminateException(稍后将定义异常)——此流将被标记为已损坏,但是如果Model尚未加载,我们将读取消息从数据服务中设置Model IsLoaded标志。

序列中的下一步是CheckIfApproved方法。它询问数据服务该消息是否被批准,如果不被批准,它将引发另一个异常FlowStopException或继续执行。如果流已停止(通过FlowStopException]),则流引擎将返回状态IsStopped,并且此流可以稍后重新启动。

重要说明:当我谈论可重新启动的流时,是指我们一次创建并运行一次的实例可以成功完成,或者可以返回状态[IsStopped],然后我们可以将该已停止的实例保存在数据库中,然后尝试重新启动它。

我们在流中使用异常,因为这是在特定点停止执行的最有效方法,并且异常会在调用堆栈中冒泡,直到有人抓住它。

让我们将异常代码添加到主项目中:

using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public class FlowStopException : Exception
    {
    }

    public class FlowFatalTerminateException : Exception
    {
    }
}

我们将使用FlowStopException通知流引擎流已正常停止并且可以在以后重新启动,并且我们将使用FlowFatalTerminateException通知引擎流已损坏。

实现——流引擎

第一次流运行的逻辑如下(DemoFlow1作为例子):

  1. 我们创建一个新的DemoFlow1类实例(Model由继承自它的Flow基类自动创建),并将其提供给流引擎。
  2. 流引擎创建DemoFlow1实例的流代理对象和流数据——这是DemoFlow1实例内发生的所有更改的完整历史记录。
  3. 然后,该流引擎执行流代理 Execute方法,该方法一个接一个的调用LoadData,CheckIfApproved,AddDigitalSignature,SubmitData。
  4. 每次对上述方法的调用都会被拦截,并且每次调用该方法时,我们都会保存数据流,并在调用的方法之后保存Model的副本。
  5. 如果任何被调用的方法引发异常,则Execute方法将被中断,并且流引擎将返回流数据。
  6. 如果Execute方法完成而没有任何中断,则流引擎返回状态为IsFinished = true的流数据。

重新启动流的逻辑是:

  1. 我们创建一个DemoFlow1新实例,并从上一次运行中获得流数据,然后将流实例和流数据提供给流引擎。
  2. 流引擎创建DemoFlow1实例的流代理对象,并使用提供的流数据。
  3. 然后,该流引擎执行流代理 Execute方法,该方法一个接一个的调用LoadData,CheckIfApproved,AddDigitalSignature,SubmitData。
  4. 对上述方法的每次调用都会被拦截,我们将检查所调用的方法是否已记录在流数据历史中。如果是,则跳过此调用,并用流数据历史中的模型替换当前DemoFlow1实例Model ,以使其与第一次运行此方法后的模型相同。
  5. 如果在流数据历史记录中未找到该调用,我们将继续进行调用并将其Model存储并保存到历史记录中,以与第一次运行相同的方式继续执行流。

对于流引擎,我们将需要一个接口:

using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public interface IFlowEngine
    {
        FlowData RunFlow(IFlow flow);
        FlowData RestartFlow(IFlow flow, FlowData flowData);
    }
}

和流数据类:

using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public class FlowData
    {
        public bool IsFinished { get; set; }
        public List CallHistory { get; set; } = new List();
        public List ModelHistory { get; set; } = new List();
        public bool IsStopped { get; set; }
        public Exception LastException { get; set; }
    }
}

现在,我们可以介绍流引擎:

 

using Castle.DynamicProxy;
using System;
using System.Collections.Generic;
using System.Text;
using System.Xml.Schema;

namespace DemoCatleProxy.RestartableFlow
{
    public class FlowEngine : IFlowEngine, IInterceptor
    {
        private readonly IProxyGenerator _proxyGenerator;
        private FlowData _flowData;
        private IFlow _flow;
        private int _counter;

        public FlowEngine(IProxyGenerator proxyGenerator)
        {
            _proxyGenerator = proxyGenerator;
        }

        public FlowData RunFlow(IFlow flow)
        {
            _flowData = new FlowData();
            return ProcessFlow(flow);
        }

        public FlowData RestartFlow(IFlow flow, FlowData flowData)
        {
            _flowData = flowData;
            return ProcessFlow(flow);
        }

        private FlowData ProcessFlow(IFlow flow)
        {
            var options = new ProxyGenerationOptions(new FreezableProxyGenerationHook(flow));
            var flowProxy = _proxyGenerator.CreateClassProxyWithTarget(flow.GetType(), 
                            flow, options, new IInterceptor[] { this }) as IFlow;
            _flow = flow;

            try
            {
                // clear previous statuses
                _counter = 0;
                _flowData.IsStopped = false;
                _flowData.LastException = null;

                // run flow
                flowProxy.Execute();
                _flowData.IsFinished = true;
            }
            catch (FlowStopException e)
            {
                _flowData.IsStopped = true;
            }
            catch (Exception e)
            {
                _flowData.LastException = e;
            }

            return _flowData;
        }

        public void Intercept(IInvocation invocation)
        {
            var method = invocation.Method.Name;
            _counter++;
            var historyRecord = $"{_counter}:{method}";

            var index = _flowData.CallHistory.IndexOf(historyRecord);

            if (index == -1)
            {
                // new call, proceed and update histories if no exceptions thrown
                invocation.Proceed();
                _flowData.CallHistory.Add(historyRecord);

                // Clone Model to store new independednt instance
                _flowData.ModelHistory.Add(_flow.UntypedModel.CloneObject());
            }
            else
            {
                // replay in vacuum: don't proceed call and substitute model for next call
                _flow.SetModel(_flowData.ModelHistory[index]);
            }
        }
    }
}

 

如您所见,FlowEngine类在private字段中保持内部状态,因此无法将其声明为单例,每次需要时,我们将需要创建引擎的新实例,并且该实例无法在多个线程之间共享。

如果您通过引擎实现,您将看到它完成了我们上面描述的所有工作。

流引擎使用CloneObject扩展来创建Model对象的新实例并复制其所有属性。请添加Newtonsoft.Json NuGet软件包和以下代码:

using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public static class ObjectExtension
    {
        public static T CloneObject(this T source)
        {
            var jsonSerializerSettings = new JsonSerializerSettings
            {
                TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Simple,
                TypeNameHandling = TypeNameHandling.Objects
            };

            var json = JsonConvert.SerializeObject(source, jsonSerializerSettings);
            var result = JsonConvert.DeserializeObject(json, jsonSerializerSettings);
            return result;
        }
    }
}

也许这不是最有效的方法,但我不想使本文复杂化。

现在要添加的最后一位是一个钩子,需要将其提供给代理生成方法选项。当从其他方法([Execute调用LoadData])调用的所有方法都将在代理级别(而不是原始对象级别)上执行时,它将设置行为,以便我们拦截所有调用。钩子代码为:

using Castle.DynamicProxy;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public class FreezableProxyGenerationHook : IProxyGenerationHook
    {
        private IFlow _flow;

        public FreezableProxyGenerationHook(IFlow flow)
        {
            _flow = flow;
        }

        public override int GetHashCode()
        {
            return _flow.GetHashCode();
        }

        public override bool Equals(object obj)
        {
            return _flow == (obj as FreezableProxyGenerationHook)._flow;
        }

        public bool ShouldInterceptMethod(Type type, MethodInfo memberInfo)
        {
            return memberInfo.Name != "Execute" && memberInfo.Name != "SetModel";
        }

        public void NonVirtualMemberNotification(Type type, MemberInfo memberInfo)
        {
        }

        public void MethodsInspected()
        {
        }

        public void NonProxyableMemberNotification(Type type, MemberInfo memberInfo)
        {
        }
    }
}

现在,我们可以编译我们的代码并开始对其进行测试。

实现——单元测试

让我们实现一个单元测试,以验证一个简单的场景:

  1. 我们创建一个流并在第一次运行它,然后demo服务IsMessageApproved方法返回False,因此应在CheckIfApproved方法上中断该流。
  2. 在第一次运行后保留流数据,我们尝试重新启动流,但是此时IsMessageApproved返回True,因此我们希望流能够成功完成。

我将代码添加到DemoFlowTests.cs文件中:

using Castle.DynamicProxy;
using System;
using Xunit;
using Moq;

namespace DemoCatleProxy.RestartableFlow.Tests
{
    public class DemoFlowTests
    {
        [Fact]
        public void RunStopRestartFlowTest()
        {
            var flowEngine = new FlowEngine(new ProxyGenerator());

            var demoService = new Mock();
            var flow = new DemoFlow1(demoService.Object);
            int approveTimes = 0;

            demoService.Setup(s => s.LoadReceivedMessage()).Returns("Important message 1");
            demoService.Setup(s => s.GetSignature(It.IsAny())).Returns("0xAABBEFA7");
            demoService.Setup(s => s.Submit(It.IsAny(), 
                                   It.IsAny())).Returns(true);
           
            // the first time it returns false, the second time it returns true
            demoService.Setup(s => s.IsMessageApproved(It.IsAny()))
                .Returns(() => 
                {
                    approveTimes++;
                    return approveTimes == 2; 
                });

            var flowData = flowEngine.RunFlow(flow);
            Assert.True(flowData.IsStopped);
            Assert.False(flowData.IsFinished);
            Assert.Single(flowData.ModelHistory);
            Assert.True((flowData.ModelHistory[0] as Model1)?.IsLoaded);

            // assume we saved flowData to a database and rerun the flow one day after
            var clonedFlowData = flowData.CloneObject();
            var newFlow = new DemoFlow1(demoService.Object);
            clonedFlowData = flowEngine.RestartFlow(newFlow, clonedFlowData);
            Assert.False(clonedFlowData.IsStopped);
            Assert.True(clonedFlowData.IsFinished);
        }
    }
}

使用依赖注入时,可以节省大量的时间和精力在单元测试中。我使用该Moq框架(请安装Moq NuGet软件包)通过IDemoDataService接口生成演示数据服务。真正的演示数据服务尚未实现——我们不需要进行测试。

我设置了所有四个方法,其中三个返回常量值,但IsMessageApproved将在第一次返回False和下一次返回True。

我的测试第一次创建并运行一个流,并检查结果流数据,它应该具有IsStopped = True和IsFinished = False。流数据包含中断的流的状态。此状态是我们重新启动流所需的一切。

我们克隆了流数据(您还记得,我将其序列化为JSON并反序列化了它),以证明流数据是可传输的并且不依赖AppDomain。

最后,我调用RestartFlow。现在测试期望IsStopped = False和IsFinished = True。

如果在RestartFlow调用的行上放置一个断点并使用Step Into,直到使用Intercept方法,您将看到流引擎跳过逻辑的工作方式: 

 

我建议与调试器一起使用,以了解流是如何首次执行以及如何重新启动以完成的。

总结

本文演示了使用来自Castle Dynamic Proxy框架的代理和拦截器的另一种方法。我们实现了一个可中断和可重新启动的流,该流在概念上类似于Microsoft Workflow Foundation流,但是它确实很轻巧、可调试,并且流逻辑在C#方法中。此外,我们使用Moq进行单元测试,并且再次证明,对于验证和调试代码,您无需花时间创建和运行应用程序。

节省单元测试的时间,并感谢您的阅读。

P.S.

我从我的朋友那里得到了关于此博客的一些反馈,实际上,他们对为什么以这种复杂的方式实现包含四个方法序列的简单直接流感兴趣? 

事实是,此博客中的流示例很简单,可以更好地理解。在实际场景中,您的流将包含if条件,goto甚至可能是循环,并且即使在复杂的流中,该方法也将继续起作用。这是Pro Coders团队项目之一的示例流:

public override async Task Execute()
{
    await BeginAsync();
    await PopulateData();
    await FlowTask(typeof(InsuranceClaimValidationRulesTask));
            
dataEntry:
    if (TaskExecutionValidationIssues.Any())
    {
        await UserInput(typeof(InsuranceClaimEntryForm));
    }

    if (Model.Claim.ClaimAmount.Amount > 1000)
    {
        await UserInputReview(typeof(InsuranceClaimEntryForm));

        if (Model.ClaimRejected)
        {
            await SaveRejectedClaim();
            await GenerateRejectLetter();
            goto postDocuments;
        }

        if (TaskExecutionValidationIssues.Any())
        {
            goto dataEntry;
        }
    }

    if (Model.Claim.PaymentOptionCode == "EFT")
    {
        await MakeTransfer();
    }
    else if (Model.Claim.PaymentOptionCode == "CHQ")
    {
        await PrintBankCheque();
    }
    else if (Model.Claim.PaymentOptionCode == "FUT_CONTR")
    {
        await BuyOptions();
    }
    else
    {
        Fail($"Invalid Payment Option {Model.Claim.PaymentOptionCode}", _logStreamer);
    }

    await SaveClaim();
    await GenerateSuccessLetter();

postDocuments:
    await PostProducedDocuments();
    await EndAsync();
}

如您所见,该流引擎还支持异步操作async-await和用户以动态形式输入,但是原理是相同的——流可以在某个时刻停止甚至失败,然后在一天或一个月后在另一台机器上重新启动。

关注
打赏
1665926880
查看更多评论
立即登录/注册

微信扫码登录

0.0654s