您当前的位置: 首页 > 

寒冰屋

暂无认证

  • 0浏览

    0关注

    2286博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

快速轻巧的CQRS和事件源解决方案

寒冰屋 发布时间:2020-07-04 16:23:29 ,浏览量:0

目录

介绍

何苦呢?

优先级

1.可读性

2.性能

3.可调试性

4.最小的依赖

5. 单独的命令和事件

6.多租户

7. Sagas/流程经理

8. 调度

9. 聚合到期

10. Async/Await是邪恶的

整洁架构

Timeline项目

样例项目

总览

入门

用法

方案A:如何创建和更新联系人

数据流

方案B:如何制作聚合快照

方案C:如何使聚合脱机

方案D:如何创建具有唯一登录名的新用户

方案E:如何调度命令

方案F:如何使用一个命令更新多个聚合

方案G:如何实现自定义事件处理程序

方案H:如何使用自定义处理程序覆盖命令

展示

应用程序

写端(Write Side)

读端(Read Side)

领域

持久性

CQRS + ES主干

命令

事件

聚合

快照

指标

本文的目的是提供使用C#编程语言和.NET Framework的CQRS + ES模式的快速、轻量级实现。此实现功能比较全面,包括对命令和事件的SQL Server序列化、调度的命令、快照、sagas(即流程管理器)以及用于多租户自定义的即插即用替代的支持。我将描述代码的结构,并说明如何与示例应用程序一起工作。

  • 下载源-3.6 MB
介绍

如果您正在阅读本文,那么您可能已经对命令查询责任隔离(CQRS)和事件源(ES)有所了解,所以我不会解释它是什么,为什么要使用它,或者为什么你想要避免它。有很多相关的文章可用参考,如下:

  • CQRS in Practice
  • CQRS by Martin Fowler
  • Event Sourcing Pattern (Microservices.io) or Event Sourcing Pattern (Microsoft.com)
  • Event Sourcing: The Good, the Bad, and the Ugly
  • What They Don't Tell You About Event Sourcing

本文的目的是提供使用C#编程语言和.NET Framework的快速,轻量级的实现。

此实现功能比较全面,包括支持SQL Server命令和事件的持久性、调度的命令、快照、sagas(即流程管理器)以及用于多租户自定义的即插即用替代。

我将描述代码的结构,并说明如何使用遵循整洁架构(Clean Architectur)模式的示例应用程序。

何苦呢?

几乎可以肯定这是您的第一个问题。

已经有很多CQRS + ES框架和解决方案,这些框架和解决方案已经得到充分开发和充分验证。如果您正在研究选择方案并评估构建vs购买决策,那么您可以选择出色的商业产品和开源替代品。

例如:

  • Event Store
  • EventFlow
  • CQRSlite
  • SimpleCQRS

为什么要实现另一种解决方案?为什么我要从头开始并发展自己的?

我研究和使用CQRS和ES模式已经有几年了。我既使用了商业解决方案,也使用了开源解决方案,并使用它们来构建和/或改进实际的生产系统。如果您的背景和经历与我的相似,那么您可能已经知道该问题的答案,但是您可能不想相信这是真的(因为我很长时间没有这样做):

如果您认真考虑采用CQRS + ES架构,那么除了自行构建之外,别无选择。

就像克里斯·基尔(Chris Kiehl)在他的文章中说的那样:“事件源很难 ”

...您可能会从头开始构建核心组件。就技术堆栈而言,该领域的框架往往是重量级,规范性强,缺乏灵活性的框架。如果您想启动并运行某事,则应该自己动手做(这是建议的方法)。

如果是这样,那么对我来说写这篇文章有什么帮助?

简单:这是另一个带有源代码的示例,因此您可以看到我如何解决CQRS + ES解决方案中出现的一些问题。如果您是从CQRS + ES项目开始的,那么您应该研究发现的所有示例。

我不希望(或建议)采用此源代码并将其合并到您开发的任何应用程序中。

相反,我的意图仅是提供另一个示例,您可以从中得出一些想法——也许(如果我做得不错的话)为您自己的项目提供一些小启发。

优先级

从驱动实现的优先级列表开始是很重要的,因为要做出的许多设计决策都需要进行重大的权衡。

CQRS + ES的纯粹主义者会反对我的某些决定,并坚决谴责其他决定。我可以忍受这一点。

我已经设计和开发软件很长时间了(比我准备在这里承认的时间还长)。我流血、流汗、流泪和染白头发不止一点——所以我敏锐地意识到,面对取舍,选择不当所带来的成本。

以下优先级有助于告知和指导这些决定。它们以重要性的高低顺序列出,但是所有都是必需的,以,给自己倒杯酒,安顿下来,因为这里的序言将是很长的...

1.可读性

该代码必须可读。

代码越可读,它就越有用、可维护和可扩展。

在我使用的某些实现中(以及我自己开发的某些实现中),除了原始作者之外,几乎所有其他人都无法理解底层CQRS + ES主干的代码。我们不能在这里允许。一小组开发人员必须能够并且相对容易地共享并使用代码,并且充分了解其工作方式以及为什么以这种方式编写代码。

尤其重要的是,用于注册命令处理程序和事件处理程序的代码必须简单明了。

许多CQRS + ES框架使用反射和依赖注入的组合来自动注册用户,以处理命令和事件。尽管这通常非常聪明——并通常会减少项目中的代码行总数——但它隐藏了命令(或事件)与其订阅者之间的关系,从而将这些关系变成了不透明的,神奇的黑框。许多控制反转(IoC)容器使此操作变得容易,因此可以理解,但我相信这是一个错误。

需要明确的是:在项目中使用IoC容器不是错误。依赖注入是一种出色的最佳实践,也是一种完善的重要技术。但是,依赖项注入模式本身并不是发布-订阅模式,并且将两者混为一谈会导致很多痛苦和灾难。在IoC容器库中使用高度专业化的功能来自动化该库的预期用途之外的功能,然后将软件体系结构中最关键的组件紧密结合在一起是一个错误(我自己做过)。当您的应用程序中的某些行为异常时,这将使故障排除和调试异常困难且耗时。

因此,作为此可读性目标的一部分,必须在代码中显式定义命令处理程序和事件处理程序的注册,而不是通过约定或自动化来隐式定义。

2.性能

代码必须是快速的。

处理命令和事件是在CQRS + ES架构上开发的任何系统的核心,因此吞吐量优化是关键的性能指标。

就并发用户和系统发出命令并观察已发布事件的影响而言,实现必须处理最大可能的数量。

在我以前的一些实现中,很多痛苦和苦难是由于将命令发送到大型聚合(例如,具有大量事件流的长期聚合)时发生的并发冲突而引起的。根本原因通常是性能不佳的代码。因此,算法优化至关重要。

快照是满足此要求所不可或缺的,因此必须是解决方案所不可或缺的。该实现必须具有对每个聚合根上的自动快照的内置支持。

内存中缓存是运行时优化的另一个重要部分,因此,它也必须是解决方案不可或缺的一部分。

3.可调试性

使用标准调试器(如Visual Studio IDE调试器)来跟踪代码并跟踪其执行必须是很容易的。

我已经看到许多CQRS + ES实现依赖于复杂的算法来动态注册、查找和调用用于处理命令和事件的方法。

同样,这些算法中的许多算法都非常聪明:它们具有强大的功能和灵活性,并且可以显着减少解决方案中的代码行数。

例如,我在过去的一些项目中使用过DynamicInvoker类。这是一段巧妙的代码——少于150行——而且效果很好。(我没有写它,所以当我这么说的时候我并不自夸。)但是,如果代码中有些杂乱无章的东西,您已经编写了调用此类的方法的代码,并且如果需要使用调试器,然后你需要特别熟练地进行思维体操,以了解所发生的事情。我不是,所以如果使用任何动态调用,那么在使用调试器时,理解代码和跟踪其执行的线程必须非常容易。

4.最小的依赖

外部依赖性必须保持在绝对的最低限度。

过多的依赖性导致代码比您在系统的任何关键组件中可能需要的速度更慢,更重且更脆弱。最小化依赖关系有助于确保您的代码更快、更轻巧、更健壮。

最重要的是,最小化依赖性有助于确保解决方案不会与任何外部程序集、服务或组件紧密耦合,除非该依赖性至关重要。

如果软件的基本体系结构依赖于某些外部第三方组件,则必须做好准备,有可能在某天对其进行更改可能会影响您的项目。有时这是可以接受的风险,而其他时候则不是。

在该特定实现方式中,对该风险的容忍度非常低。

因此,您会注意到核心的Timeline程序集(实现CQRS + ES主干)仅具有一个外部依赖项:即System.NET Framework中的名称空间。

旁白一下,因为这是一篇有趣的文章,说明了我的观点:在撰写本文时,2018年,NPM JavaScript软件包“单数”在一周内有280万以上的安装。所有这些开发人员都没有编写基本的代码来让函数在数字为奇数时返回true的情况,而是选择将is-odd程序包与他们的300多个依赖项链合并到他们的解决方案中!

5. 单独的命令和事件

许多CQRS + ES框架都以共同的基类派生的方式实现一个Command类和一个Event类。

这样做的理由很明显:将命令和事件都视为通用消息的子类型是很自然的。两者都是使用某种形式的“服务总线”“发送”的,那么为什么不在共享基类中实现通用功能,而编写一个双重用途的类来路由消息——而不是编写大量重复代码呢?

这是我过去采用的方法,对此有很好的论据。

但是,我现在认为这可能是一个错误。引用罗伯特·马丁(Robert C. Martin)的话:

软件开发人员经常陷入陷阱——陷阱取决于他们对重复的恐惧。在软件中,复制通常是一件坏事。但是有不同种类的重复。确实存在重复,其中对一个实例的每次更改都必须对该实例的每个副本进行相同的更改。然后有虚假或偶然的重复。如果两个明显重复的代码部分沿着不同的路径发展——如果它们以不同的速率变化并且由于不同的原因——那么它们就不是真正的重复...当您将用例彼此垂直分离时,就会遇到这个问题,您的诱惑是将用例耦合在一起,因为它们具有相似的用户界面,相似的算法或相似的数据库模式。小心。抵制诱惑,不要犯条件反射式消除重复的罪。确保重复是真实的。

命令和事件彼此之间有足够的不同,以保证它们可以沿着不同的路径发展并适应系统需求。

我还没有经历过通过消除A)发送/处理命令和B)发布/处理事件的“重复”代码来提高代码质量、性能或可读性的任何情况。

因此,命令和事件不得具有任何共享基类,并且用于发送/发布命令/事件的机制一定不能是共享队列。

6.多租户

多租户必须是解决方案不可或缺的组成部分,而不是事后必须附加的功能或设施。

这些天来,我专门构建和维护企业多租户系统。这意味着我只有一个应用程序的单个实例,该实例为具有多个并发用户的多个并发租户提供服务。

在此实现中将多租户作为优先级有几个原因:

  • 每个集合必须分配给一个租户。这使得数据的所有权清晰且定义明确。
  • 当需要扩大规模时,分片必须易于实现。分片是将聚合分布到多个写侧节点,而“租户”是划分聚合的最自然边界。
  • 特定于租户的自定义必须易于实现。每个应用程序对于每个命令和每个事件都有核心的默认行为,但是在为许多不同的组织和/或利益相关者服务的大型复杂应用程序中,不同的租户肯定具有各种特定的需求。有时差异很小。有时它们很重要。此处的解决方案必须允许开发人员使用针对特定租户定制的功能来覆盖命令和/或事件的默认处理。覆盖必须是明确的,因此易于识别、启用或禁用。
7. Sagas/流程经理

实现流程管理器所需的步骤数量必须相对较少,并且流程管理器的代码必须相对易于编写。

流程管理器(有时称为saga)是一个独立的组件,它以交叉聚合、最终一致的方式对域事件做出反应。流程管理器有时纯粹是反应性的,有时代表工作流。

从技术角度来看,流程管理器是一种状态机,受传入事件的驱动,这些事件可能是从多个聚合发布的。每个状态都可能有副作用(例如,发送命令,与外部Web服务通信,发送电子邮件)。

我曾使用过一些CQRS + ES框架,这些框架根本不支持流程管理器,而其他框架则支持该概念,但不以易于理解或配置的方式提供支持。

例如,在我自己过去的一种实现中,事件被附加到数据库日志之后,事件存储将立即发布事件。它不是由聚集或命令处理程序发布的。这甚至使实现最基本的工作流程也变得异常困难:我无法从事件处理程序中向聚合发送同步命令,因为事件存储的Save方法在同步锁(以维护线程安全)内执行,并且新事件不创建死锁就无法发布。

无论工作流程的状态机多么简单或复杂,要协调该流程中的事件,都需要具有副作用的代码,例如向其他聚合发送命令,向外部Web服务发送请求或发送电子邮件。因此,此处的解决方案必须具有本地的内置支持才能实现此目的。

8. 调度

命令调度必须是解决方案不可或缺的一部分。

使用计时器发送命令必须很容易,因此该命令会在计时器经过后执行。这使开发人员可以指示执行任何命令的特定日期和时间。

这对于必须依赖时间触发的命令很有用。

对于在正常执行流程之外的后台进程中必须“脱机”执行的命令,它也很有用。这种类型的完全异步操作非常适合需要较长时间才能完成的命令。

例如,假设您有一条命令要求在某些外部第三方Web服务上调用方法,并且该服务通常需要超过80万毫秒才能响应。必须安排此类命令在非高峰时间执行和/或在执行的主线程之外执行。

9. 聚合到期

该解决方案必须具有本机内置的聚合到期和清除支持。

我需要一个CQRS + ES解决方案,该解决方案可以轻松地将聚合事件流从联机结构化日志复制到脱机存储,并从事件存储中清除它。

事件源极简主义者将立即对此进行红色标记,并说绝不可更改或删除聚合事件流。他们会说事件(因此是聚集)从定义上是不可变的。

但是,在某些情况下,这是不可协商的业务需求。

  • 第一:当客户不续订对多租户应用程序的订阅时,托管该应用程序的服务提供商通常负有合同义务,要求从其系统中删除该客户的数据。
  • 第二:当项目团队进行频繁的集成测试以确认系统功能正常运行时,从定义上看,输入和输出这些测试的数据是临时的。用于测试聚合的事件流的永久存储是浪费磁盘空间,没有当前或将来的业务价值;我们需要一种删除它的机制。

因此,可以说,这里的解决方案必须提供一种简便的方法来将聚合移出操作系统并移入“冷存储”。

10. Async/Await是邪恶的

我当然在开玩笑。

但事实并非如此。

C#中的async/await模式产生非常高性能的代码。毫无疑问。在某些情况下,我已经看到它将性能提高了一个数量级或更多。

async/await模式可以在此解决方案的将来迭代中应用,但是——尽管此列表中具有第二优先级——在此解决方案中它是不允许的,因为它会导致破坏第一优先级。

在将async/await引入方法后,您将被迫转换其调用方,以便它们使用async/ await(或被迫开始将干净的代码包装在脏线程块中),然后被迫转换这些调用方的调用方,因此他们在整个代码库中使用async/await,依此类推。该async/await关键字蔓延像传染性僵尸病毒。由此产生的异步代码混乱几乎可以肯定会更快,但同时更难阅读,甚至更难调试。

可读性是这里的重中之重,因此,我一直避免async/await直到它是提高性能的唯一剩余选择(而且提高性能是不可商议的业务要求)。

整洁架构

马修· 伦兹(Matthew Renze)在整洁架构主题方面开设了出色的Pluralsight类。该解决方案的源代码包含五个程序集,并且遵循他所倡导的简洁架构模式。

Timeline项目

Timeline程序集实现了CQRS + ES主干。该程序集没有上游依赖性,因此它并不特定于任何应用程序。它可以从示例应用程序中断开,并集成到一个新的解决方案中,以开发完全不同的应用程序。

其他四个程序集(Sample。*)使用Timeline程序集在控制台应用程序中实现这些层,以演示我在CQRS + ES软件系统中执行常见任务的方法。

项目依赖关系图如下所示:

样例项目

注意,Timeline程序集没有引用任何Sample程序集。

还要注意以领域为中心的方法:领域层不依赖于Presentation,Application或Persistence层。

示例领域的实体关系图如图2所示:

在此基本数据模型中:

  • 一个Person有0..N个银行Account;
  • 一个Transfer从一个账户取钱,然后存入另一个账户;
  • 一个User可能是没有个人数据的管理员,或者是拥有多个租户拥有的个人数据的某人

请记住:每个Person,Account和Transfer都是聚合根,因此这些实体中的每个都有一个Tenant属性。

总览

图3展示了此解决方案中CQRS + ES的总体方法:

请注意,Write Side(命​​令)和Read Side(查询)已被很好地描述。

您还可以看到,事件源非常类似于Write Side的插件。尽管此解决方案中未进行演示,但您可以看到不带事件源的CQRS解决方案的外观,有时(取决于CQRS)是更好的模式,具体取决于项目的要求。

以下是该体系结构的关键特征:

  • 命令队列将命令(调度所需)保存在结构化日志中。
  • 命令订阅者在命令队列上侦听命令。
  • 命令订户负责创建聚合并在执行命令时在聚合上调用方法。
  • 命令订户将聚合(作为事件流)保存在结构化日志中。
  • 命令订户在事件队列上发布事件。
  • 已发布的事件由事件订阅者和流程管理器处理。
  • 流程管理器可以响应事件在命令队列上发送命令。
  • 事件订阅者在查询存储中创建和更新投影。
  • 查询搜索是用于读取投影的轻量级数据访问层。
入门

在编译和执行源代码之前:

  1. 执行脚本“Create Database.sql”以创建本地SQL Server数据库。
  2. 更新Web.config中的连接字符串。
  3. 为OfflineStoragePath更新Web.config中的appSetting值。
用法

我将从顶部开始并演示如何使用它,然后从应用程序堆栈一直向下浏览到CQRS + ES主干的基本细节,而不是从底部开始描述Timeline程序集的工作方式。

如果我吸引了你这么长时间,那么我应该感谢你陪我到现在……

方案A:如何创建和更新联系人

这是最简单的用法。

在这里,我们创建一个新的联系人,然后进行名称更改,模拟Alice结婚的用例:

public static void Run(ICommandQueue commander)
{
    var alice = Guid.NewGuid();
    commander.Send(new RegisterPerson(alice, "Alice", "O'Wonderland"));
    commander.Send(new RenamePerson(alice, "Alice", "Cooper"));
}

这样运行后,读端投影看起来很好,正如预期的那样:

数据流

下图说明了系统在这种情况下执行的步骤:

方案B:如何制作聚合快照

快照由“Timeline”程序集自动执行。默认情况下,每个聚合都启用了它们,因此您无需执行任何操作即可运行此功能。

在下一个测试运行中,Timeline程序集被配置为每10个事件后拍摄一次快照。我们注册一个新的联系人,然后将其重命名20次。这将在事件编号20上生成快照,这是倒数第二个重命名操作。

public static void Run(ICommandQueue commander)
{
    var henry = Guid.NewGuid();
    commander.Send(new RegisterPerson(henry, "King", "Henry I"));
    for (int i = 1; i  new Person();

    public void RegisterPerson(string firstName, string lastName, DateTimeOffset registered)
    {
        // 1. Validate command
        // Omitted for the sake of brevity.

        // 2. Validate domain.
        // Omitted for the sake of brevity.

        // 3. Apply change to aggregate state.
        var e = new PersonRegistered(firstName, lastName, registered);
        Apply(e);
    }

    public void RenamePerson(string firstName, string lastName)
    {
        var e = new PersonRenamed(firstName, lastName);
        Apply(e);
    }
}

注意,聚合状态是在与聚合根分开的类中实现的。

这使得序列化和快照更易于管理,并有助于整体可读性,因为它在命令相关功能和事件相关功能之间进行了更强的划分:

public class Person : AggregateState
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public DateTimeOffset Registered { get; set; }

    public void When(PersonRegistered @event)
    {
        FirstName = @event.FirstName;
        LastName = @event.LastName;
        Registered = @event.Registered;
    }

    public void When(PersonRenamed @event)
    {
        FirstName = @event.FirstName;
        LastName = @event.LastName;
    }
}

事件(如命令和查询)是轻量级的POCO类:

public class PersonRenamed : Event
{
    public string FirstName { get; set; }
    public string LastName { get; set; }

    public PersonRenamed(string first, string last) { FirstName = first; LastName = last; }
}
持久性

在持久层,我们开始看到对外部第三方组件的更多依赖关系。例如,在这里我们依靠:

  • Json.NET用于JSON序列化和反序列化;
  • System.Data,用于使用ADO.NET在SQL Server中记录命令、事件和快照;和
  • 用于查询投影的实体框架。

该项目中的源代码实现了标准的常规数据访问层,并且在这一层中应该没有什么是新的,特别是创新的,或任何有经验的开发人员都感到惊讶的,因此不需要进行特殊讨论。 。

CQRS + ES主干

女士们,先生们,终于(漫长)终于来到了您一直在等待的夜晚:Timeline程序集实际上实现了CQRS + ES模式,这使得上述所有事情成为可能。

有趣的是...现在我们已经到了基本要点,剩下的谜团应该很少了。

您会注意到的第一件事是Timeline程序集不依赖于外部第三方组件(显然,.NET Framework本身除外)。

命令

这里只有几件事要注意。

如您所料,Command基类包含用于聚合标识符和版本号的属性。它还包含用于租户和发送命令的用户身份的属性。

/// 
/// Defines the base class for all commands.
/// 
/// 
/// A command is a request to change the domain. It is always are named with a verb in 
/// the imperative mood, such as Confirm Order. Unlike an event, a command is not a 
/// statement of fact; it is only a request, and thus may be refused. Commands are
/// immutable because their expected usage is to be sent directly to the domain model for 
/// processing. They do not need to change during their projected lifetime.
/// 
public class Command : ICommand
{
    public Guid AggregateIdentifier { get; set; }
    public int? ExpectedVersion { get; set; }

    public Guid IdentityTenant { get; set; }
    public Guid IdentityUser { get; set; }

    public Guid CommandIdentifier { get; set; }
    public Command() { CommandIdentifier = Guid.NewGuid(); }
}

CommandQueue实现了ICommandQueue接口,该接口定义了一组用于注册订阅者和覆盖以及发送和调度命令的方法。您可以将其视为命令的服务总线。

事件

Event基类包含用于集合标识符和版本号的属性,以及用于为其发起/发布事件的租户和用户的标识的属性。这样可以确保每个事件日志条目都与特定的租户和用户相关联。

您可以将其EventQueue视为事件的服务总线。

聚合

AggregateState类只有一点点黑魔法。Apply方法使用反射来确定将事件应用于聚合状态时要调用的方法。我不是特别喜欢这种方法,但是我找不到任何避免方法。幸运的是,这些代码易于阅读和理解:

/// 
/// Represents the state (data) of an aggregate. A derived class should be a POCO
/// (DTO/Packet) that includes a When method for each event type that changes its
/// property values. Ideally, the property values for an instance of  this class 
/// should be modified only through its When methods.
/// 
public abstract class AggregateState
{
    public void Apply(IEvent @event)
    {
        var when = GetType().GetMethod("When", new[] { @event.GetType() });

        if (when == null)
            throw new MethodNotFoundException(GetType(), "When", @event.GetType());

        when.Invoke(this, new object[] { @event });
    }
}
快照

实现快照的源代码比我最初启动该项目时想象的更加整洁和简单。逻辑有些复杂,但是Snapshots命名空间中只有240行代码,因此在此不再赘述。

指标

我将以一些基本指标结束本文。(稍后再介绍。)

这是NDepend根据Timeline程序集生成的分析报告:

如您所见,源代码并不完美,但确实获得了“A”级评级,技术债务估计仅为1.3%。在我撰写本文时,该项目也非常紧凑,只有439行代码。

注意:NDepend 从程序集.pdb符号文件中每个方法的序列点数中计算代码行(LOC)。Visual Studio对LOC的计数不同;在Timeline项目上,它报告了1,916行源代码,以及277行可执行代码。

关注
打赏
1665926880
查看更多评论
立即登录/注册

微信扫码登录

0.1062s