当您有一项需要很长时间才能完成的家务时,如果您可以划分工作量,它可能会更快。
例如,如果您自己粉刷一个房间,这项工作可能需要几个小时。但是如果有几个朋友,同样数量的工作可以在一个小时或更短的时间内完成,这取决于帮助的朋友的数量以及每个人的努力程度。
这同样适用于编程,这些朋友被称为线程。当您想要更快地处理集合时,一个常见的解决方案是将工作分配给并发运行的线程。
在 .NET 的早期,产生新线程是手工工作,需要一些知识。如果您查看线程文档,您会发现管理这些线程需要一些“编排代码”。
因为我们自己编写代码,所以这段代码也有可能包含错误。当您生成多个线程以实现最佳性能时,它甚至会变得更加复杂。
幸运的是,C# 使用 .NET Framework 4 中引入的任务并行库 (TPL)为我们隐藏了所有实现细节。可以肯定地说,与自定义实现相比,此代码中出现错误的可能性要小得多。
数据并行是指对源集合或数组中的元素同时(即并行)执行相同操作的场景。在数据并行操作中,源集合是分区的,以便多个线程可以同时对不同的段进行操作。
在本文中,我们将了解更快地处理集合的不同方法。
初始代码对于我们的案例,我们有 60.000 个项目必须从一个系统迁移到另一个系统。处理 1.000 件物品需要 30 分钟,因此处理集合中的所有物品总共需要 30 小时。
这是很多时间,特别是如果您知道迁移项目不是一项艰巨的任务。初始代码的简化版本如下所示。对列表的简单迭代,在循环内,迁移项目,我们:
- 检索项目的详细信息
- 迁移项目
- 将项目保存到系统 B
foreach (var itemId in itemsFromSystemA)
{
var item = GetItemFromSystemA(itemId);
var result = MigrateToSystemB(item);
SaveToSystemB(result);
}
当我们仔细观察我们浪费时间的地方时,很明显很多时间都花在了等待上。在检索项目时等待并等待要保存的项目。迁移MigrateToSystemB
速度很快,只需几毫秒。
通常,我建议限制 I/O 的数量以加快此过程。这可以在我们批量检索和保存项目时完成,而不是一个接一个。对于这个用例,这样做不是一种选择。
使这种迁移更快的唯一方法是在代码中添加并行性。我们希望同时迁移多个项目,而不是按顺序逐项迁移,即在迁移前一个项目之后迁移下一个项目。每个迁移都有自己的线程,这使得它更有效率。
平行向循环添加并行性的最简单方法是使用Parallel.ForEach. 在内部,该Parallel.ForEach
方法将工作划分为多个任务,一个用于集合中的每个项目。
Parallel 类为常见操作(例如 for 循环、for each 循环和一组语句的执行)提供基于库的数据并行替换。
Task可以比作一个轻量级的线程,具有更多的功能。有关两者之间的区别,请参阅C# 中的任务与线程的区别。
令我惊讶的是,重构后的代码看起来与初始实现没有太大区别。只需稍作改动即可将迭代包装在Parallel.ForEach
方法中。
Parallel.ForEach(itemsFromSystemA, itemId => {
var item = GetItemFromSystemA(itemId);
var result = MigrateToSystemB(item);
SaveToSystemB(result);
});
这样做会导致我们现在同时处理同一个列表。默认情况下,Parallel.ForEach
尝试使用机器的所有可用线程。为了降低对系统的影响,我们可以使用该MaxDegreeOfParallelism
选项。此属性限制了生成的并发任务的数量,因此我们不会影响应用程序的其他正在运行的进程。
该MaxDegreeOfParallelism
选项可以设置为静态值,或者我们可以使用该Environment.ProcessorCount
属性使其依赖于机器的资源。在下面的代码片段中,我们将MaxDegreeOfParallelism
选项配置为最多使用机器 75% 的资源。
Parallel.ForEach(
itemsFromSystemA,
new ParallelOptions {
// multiply the count because a processor has 2 cores
MaxDegreeOfParallelism = Convert.ToInt32(Math.Ceiling((Environment.ProcessorCount * 0.75) * 2.0))
},
itemId => {
var item = GetItemFromSystemA(itemId);
var result = MigrateToSystemB(item);
SaveToSystemB(result);
}
);
在我们的例子中,这个“重构”导致现在只需要 40 秒来处理 1.000 个项目。对于 60.000 个项目的整个集合,需要 40 分钟。只需几行额外代码,从 30 小时到 40 分钟!因为我们正在使用机器的处理器数量,所以与服务器相比,我的机器上花费的时间要长 20%。
但它并不止于此。
并行 LINQ (PLINQ)虽然该Parallel
解决方案适用于我的用例,但 .NET 也有Parallel LINQ (PLINQ)。
PLINQ
为著名的 LINQ API 带来并行性。这可确保代码在编写更复杂的业务逻辑时保持可读性,您需要在其中对数据进行排序、过滤或转换。
如果您已经熟悉 LINQ,那么我要告诉您一个好消息,因为您会立即有宾至如归的感觉。
PLINQ 查询在许多方面类似于非并行 LINQ to Objects 查询。PLINQ 查询与顺序 LINQ 查询一样,对任何内存中的 IEnumerable 或 IEnumerable 数据源进行操作,并且具有延迟执行,这意味着它们在查询被枚举之前不会开始执行。主要区别在于 PLINQ 尝试充分利用系统上的所有处理器。它通过将数据源划分为段,然后在多个处理器上并行执行单独工作线程上的每个段上的查询来实现此目的。在许多情况下,并行执行意味着查询运行得更快。
要以并行方式处理集合,我们可以使用AsParallel()扩展方法,然后使用任何现有的 LINQ 扩展方法。Parallel
用语法重写的示例PLINQ
如下所示,我们使用ForAll扩展方法来迭代项目。
itemsFromSystemA
.AsParallel()
.WithDegreeOfParallelism(Convert.ToInt32(Math.Ceiling((Environment.ProcessorCount * 0.75) * 2.0)))
.ForAll(itemId => {
var item = GetItemFromSystemA(itemId);
var result = MigrateToSystemB(item);
SaveToSystemB(result);
});
这也可以通过使用更多的 LINQ 语法来重写。对于这个例子,这样做并没有增加太多价值,但它只是给你一个想法。每个项目都在一个单独的线程上处理,同一项目的后续任务在同一个线程上处理。
itemsFromSystemA
.AsParallel()
.WithDegreeOfParallelism(Convert.ToInt32(Math.Ceiling((Environment.ProcessorCount * 0.75) * 2.0)))
.Select(itemId => GetItemFromSystemA(itemId))
.Select(item => MigrateToSystemB(item))
.ForAll(itemId => SaveToSystemB(result));
请注意,我们可以使用WithDegreeOfParallelism扩展方法设置并行度。
Parallel
该解决方案和该解决方案之间的性能优势PLINQ
是相同的。
Parallel和之间的区别PLINQ
虽然性能不是在这两种解决方案之间进行选择的因素(在大多数情况下),但PLINQ
和Parallel
方法之间存在细微差别。两种解决方案都有权存在并为不同的问题提供解决方案。
两者的明显优势在“何时使用 Parallel.ForEach 和何时使用 PLINQ”中得到了很好的解释。
我会记住的主要区别是:
并行度
- 与
Parallel
您一起设置最大程度,这意味着它会根据可用资源受到影响 - 与
PLINQ
您一起设置度数,这意味着这是实际使用的线程数
执行顺序
- 在
Parallel
迭代中调用任务的顺序是随机的。也就是说,Parallel
用来执行独立的任务 - 如果订单很重要,请使用
PLINQ
因为订单被保留
使用结果
Parallel
不返回结果。Parallel
is的输出ParallelLoopResult,其中包含集合的完成信息(例如,如果所有任务都已完成),仅此而已。- 当您需要已处理流的返回值时,请使用
PLINQ
. 因为任务确实同时运行,所以我们需要一种方法将所有任务的结果合并到一个结果对象中。要指定每个任务的结果必须如何合并回输出结果,请使用合并选项。
提前中断以停止处理
Parallel
ParallelLoopState.Stop()提供了一种使用和提前退出的方法ParallelLoopState.Break()。两者都阻止了更多迭代的开始,但不同之处在于Stop
,立即停止循环,同时Break
仍运行先前的迭代。- 要停止
PLINQ
迭代,使用CancellationToken但这并不能保证不会开始以下迭代。
除了Parallel
andPLINQ
方法之外,还有第三个库,称为Dataflow (Task Parallel Library)。通过解决我的性能问题,这是我第一次遇到Dataflow
.
Dataflow
可以单独写一篇文章,我对它的了解非常少。根据我过去几天阅读的内容,我将Dataflow
其视为构建处理管道(块)的库。
任务并行库 (TPL) 提供数据流组件来帮助提高支持并发的应用程序的稳健性。这些数据流组件统称为 TPL 数据流库。该数据流模型通过为粗粒度数据流和流水线任务提供进程内消息传递来促进基于参与者的编程。数据流组件建立在 TPL 的类型和调度基础架构之上,并与 C#、Visual Basic 和 F# 语言集成以支持异步编程。
老实说,我过度简化了初始代码片段,真正的用例是使用async/await
语法来获取和保存项目。遗憾的是,这种语法不能很好地与APIParallel
或PLINQ
API 配合使用,但它可以与Dataflow
.
在做了一些研究之后,我偶然发现了“Parallel Foreach async in C#”一文,其中作者迭代了该Parallel.Foreach
方法的异步变体的实现。
帖子中的最后一个实现使用了 C# 的最新功能,包含该Dataflow
库以获得最佳结果。
因为我喜欢这个实现,而且它仍然可读,所以我无耻地将方法复制粘贴到我们的代码库中。
有了这个Dataflow
变体,我可以再减少 3 分钟的时间来处理整个集合。
更新:.NET 6 引入了一个新的Parallel.ForEachAsync方法,这意味着您不需要编写自己的异步变体实现。
await itemsFromSystemA
.ParallelForEachAsync(
async item =>
{
var result = await MigrateToSystemB(item);
await Save(result);
},
Convert.ToInt32(Math.Ceiling((Environment.ProcessorCount * 0.75) * 2.0))
);
// TODO: can be removed in .NET 6 because it's built-in https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync?view=net-6.0
public static class AsyncExtensions
{
public static Task ParallelForEachAsync(this IEnumerable source, Func body, int maxDop = DataflowBlockOptions.Unbounded, TaskScheduler scheduler = null)
{
var options = new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = maxDop
};
if (scheduler != null)
options.TaskScheduler = scheduler;
var block = new ActionBlock(body, options);
foreach (var item in source)
block.Post(item);
block.Complete();
return block.Completion;
}
}
结论
启发这篇文章的用例是我真正需要使用并行编程的少数几次之一。这也是第一次产生如此大的影响。
我喜欢 .NET 使将顺序运行的代码重写为并行运行的代码变得多么容易。正因为如此,我们可以专注于交付业务价值,而不会使代码难以编写和阅读。
学习曲线并不陡峭,它会随着用例的复杂性而增长:
- 将该
Parallel.ForEach
方法用于最简单的用例,您只需对集合中的每个项目执行一个操作 PLINQ
当您需要做更多事情时使用这些方法,例如查询集合或流式传输数据DataFlow
当您想要完全控制处理管道时使用这些方法
这是否意味着我将在代码库中到处使用并行编程?不,这不是我推荐的,因为它甚至可能产生负面结果,因为它有自己的潜在陷阱。
我写这篇文章是为了更熟悉并行编程,并在未来将其用作参考点。我只是触及了表面,如果你也有兴趣(像我一样)了解更多关于这个主题的信息,我可以强烈推荐这些资源,我在写这篇文章的过程中获得了洞察力。
更多资源- 并行与并发
- 并行编程模式:使用 .NET Framework 4 理解和应用并行模式
- ParallelOptions.MaxDegreeOfParallelism 与 PLINQ 的 WithDegreeOfParallelism
- PLINQ 的潜在缺陷
- 提前退出并行循环
- 使用 MapReduce、ProducerConsumer 和 ActorModel 在 C# 中实现实用并行化
- 处理管道系列 - 概念
- C# .NET Core 中具有 TPL 数据流的数据处理管道
- 15 年并发
- 接收网络