目录
异步流
Cancellation
使用Linq
使用异步流
使用工作线程的示例
使用通道的示例
使用DataFlow块的示例
三种方法的基准测试结果
结论
- 下载演示 - 8.5 KB
C#8中引入了异步流。它们提供了在枚举中异步等待数据流中的每个项变为可用的能力。这在使用从网络套接字发出的那种零星数据流时特别有利。异步流的返回类型是IAsyncEnumerable类型。enumerable有一个async枚举器,它异步等待下一项可用。下面是一个async可枚举的例子:
public async IAsyncEnumerable GetLines(int count,
[EnumeratorCancellation]CancellationToken token = default)
{
bool isContinue = true;
int itemCount = 0;
while (isContinue)
{
string jsonResponse = await GetWebpageAsync(token);
string[] items = JsonConvert.DeserializeObject(jsonResponse);
foreach (var item in items)
{
yield return item;
itemCount++;
if (itemCount == count)
{
isContinue = false;
break;
}
}
}
}
Enumerable使用ValueTasks减少内存分配和提高效率。ValueTask是一个结构体,可以存储在堆上。它也可以重复使用。在示例中,异步await使用ValueTask引用Task的实例。但是同步foreach循环使用ValueTasks不需要包含对Task的引用,因为数据项已经可用。这里有一篇关于ValueTasks.。处理ValueTasks时需要小心,因为它们不能直接替代Tasks。最安全的选择是通过等待它们一次来简单地提取它们的有效负载,然后将其保留。该方法是这样使用的:
await foreach (string line in GetLines(42))
{
Console.WriteLine(line);
}
Cancellation可以通过CancellationToken直接传入方法来实现。但也可以取消枚举,而不必引用父方法。
IAsyncEnumerable lines= GetLines (42);
await foreach(var line in lines.WithCancellation(cancellationToken))
{
..
}
GetLines方法必须用[EnumeratorCancellation]属性修饰才能启用此功能。默认情况下不处理取消。该方法需要监视令牌的状态并在需要时采取一些措施。处理此问题的一种简单方法是调用cancellationToken.ThrowIfCancellationRequested();
使用LinqIAsyncEnumerables NuGet包中为System.Linq.Async提供了Linq扩展方法。所有熟悉的扩展方法都存在。返回可枚举的查询返回一个IAsyncEnumerable和返回单个值的方法返回一个ValueTask并具有后缀Async,AnyAsync和CountAsync是两个示例。可以将异步lambda表达式传递给某些方法,以便可以等待它们,这些扩展已Await附加到它们,如WhereAwait。所有这些方法都已添加到System.Linq命名空间中。
IAsyncEnumerable lines= GetLines (42);
IAsyncEnumerable query = lines.Where(l => l.Substring(1, 1) == "1");
var verySlowQuery = lines.WhereAwait(async l=> { await Task.Delay(500); return true; }) ;
int count =await lines.CountAsync();//CountAsync returns ValueTask
使用异步流的主要原因之一是防止用户界面(UI)消息循环阻塞。如果循环阻塞,控件将不会更新,界面将变得无响应。因此,异步处理枚举产生的每个项目是有意义的。在下面的示例中,该项目是一行文本,需要先对其进行处理,然后才能在UI中显示。这有两个部分,第一个是处理行,第二个是显示结果。第一部分可以在自己的线程上运行,但显示必须在UI线程上更新。通过调用以下无意义的方法来模拟处理工作。
private string WorkersFunction (string line)
{
string vowels = "aeiou";
Thread.Sleep(Constants.WorkerThreadSleep);//simulate a busy thread
//remove all the vowels to show something has been done
return string.Concat(line.Where(c => !vowels.Contains(c)));
}
这里的想法是使用多个工作线程来处理这些行。工人是通过调用Task.Factory.StartNew来启动的WorkersFunction。StartNew有一个ContinuationTask附加到它,将在WorkersFunction完成后更新UI。由于将同时有多个工作线程处于活动状态,因此需要有一个独占的任务调度程序,一次只允许一个任务写入UI。这是通过使用ConcurrentExclusiveSchedulerPair的ExclusiveScheduler成员来实现的。
private readonly TaskScheduler uiTaskScheduler;
public AsyncStreamTest()
{
//call the constructor from the UI thread
var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(
TaskScheduler.FromCurrentSynchronizationContext());
uiTaskScheduler = taskSchedulerPair.ExclusiveScheduler;
}
另一个要求是控制活动工作线程的数量,以减少线程池、内存和处理器的压力。这是通过使用SemaphoreSlim。SemaphoreSlim记录成功调用进入信号量的次数减去离开信号量的调用次数。如果差值等于在信号量的构造函数中设置的数字,则在收到离开的调用之前,不会接受进一步的进入调用。离开的调用是通过调用SemahoreSlim.Release()来的实现。仅当从SemaphoreSlim.WaitAsync()返回的任务完成时,对enter的调用才会成功。确保在工作线程完成时始终调用release是很重要的,确保这一点的最佳方法是在finally块内调用SemaphoreSlim.Release()。
public async Task UsingWorkerThreadsAsync(int workersCount, CancellationToken token = default)
{
var lines = GetLines(Constants.LinesRequired);
var semaphoreSlim = new SemaphoreSlim(workersCount);
List tasks = new();
try
{
await foreach (var line in lines.WithCancellation(token))
{
await semaphoreSlim.WaitAsync(token);
var task = Task.Factory.StartNew(() => WorkersFunction(line, semaphoreSlim),
token,
TaskCreationOptions.None,
TaskScheduler.Default).ContinueWith(
(t) => RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(t.Result)),
token, TaskContinuationOptions.AttachedToParent, uiTaskScheduler);
tasks.Add(task);
}
}
catch (OperationCanceledException)
{ }
await Task.WhenAll(tasks);
}
此处所需的功能是父任务仅在延续任务完成后才能完成,这是通过设置ContinuationOptions.AttachedToParent请求标志来实现的。如果不进行设置,则继续任务将不被观察,并且无法知道它是否曾经完成以及它完成的状态。Task.Factory.StartNew()优先于Task.Run()的原因是Task.Run设置了它的DenyChildAttach标志并导致ContinuationOptions.AttachedToParent请求被否认。
这是示例应用程序的屏幕截图,显示了此方法的输出:
测试行有它们的索引号,很明显,它们的完成顺序与它们开始的顺序不同。为了确保保持原始顺序,有必要调用一个实现某种先进先出队列的方法。
使用通道的示例System.Threading.Channel.Channel是允许两个不同线程交换数据的有效方式。Stephen Toub为这个类写了一篇精彩的介绍,非常值得一读。该Channel类本质上是一个受管理的先进先出队列。它旨在异步使用,因为所有API都是异步的,这使其成为流行的BlockingCollection的绝佳替代品。在这个例子中,有一个Channel.Writer接收由通过重复调用Task.Run创建的任务并将它们写入缓冲区。还有一个Channel.Reader实例从缓冲区中读取Tasks,等待它们完成,然后更新UI线程。
public async Task ChannelExample(int workersCount, CancellationToken token = default)
{
var lines = GetLines(Constants.LinesRequired);
var semaphoreSlim = new SemaphoreSlim(workersCount);
var channel = Channel.CreateBounded(
new BoundedChannelOptions(Constants.BufferSize){ SingleWriter = true });
var readerTask = ReadFromSingleChannelAsync(channel, token);
try
{
await foreach (var line in lines.WithCancellation(token))
{ //Cancelling the semaphore directly can be problematical
await semaphoreSlim.WaitAsync(CancellationToken.None);
var workerTask = Task.Run(() =>; WorkersFunction(line, semaphoreSlim));
await channel.Writer.WriteAsync(workerTask, token);
}
}
catch (OperationCanceledException) { }
channel.Writer.Complete();
await readerTask;
}
与前面的示例一样,该方法使用SemaphoreSlim来限制工作任务的数量。它创建一个实例Channel并设置其数据缓冲区的大小。然后启动以下阅读器任务,但在此阶段不等待。
private async Task ReadFromChannelAsync(Channel channel,
CancellationToken token = default)
{
while (await channel.Reader.WaitToReadAsync(token))
{
var readTask = await channel.Reader.ReadAsync(token);
var result = await readTask;
RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(result));
}
}
该channel.Reader.WaitToReadAsync方法在数据可供读取时返回true,并且在通道写入器关闭通道时返回false。由于该方法是异步的,因此可以从UI线程调用,并且由于只有一个reader,因此无需同步即可更新UI线程。当枚举结束时,Channel.Writer.Complete被调用,这反过来又导致readerTask完成。
使用DataFlow块的示例DataFlow块是TPL DataFlow库中的类,它们链接在一起形成数据处理管道。有许多不同的类型,但在此示例中,仅使用了两个块,一个以与Channel类似的方式运行的TransformBlock,如前所述,一个简单地将输出从TransformBlock写入UI的ActionBlock。需要编写的用户代码非常少,这只是设置各种选项并将两个块“连接”在一起的情况。该方法已被注释,因为实际上每一行都需要一些解释,这可能是该DataFlow库没有应有的受欢迎的原因。
public async Task UsingDataFlowAsync(int workersCount, CancellationToken token = default)
{
var lines = GetLines(Constants.LinesRequired);
//set the TransformerBlock options
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = workersCount, //number of active worker threads
SingleProducerConstrained = true, //this saves having to gate the input
BoundedCapacity = Constants.BufferCapacity,
CancellationToken = token
};
//The Transform block takes a string as its input. It passes it to the WorkersFunction
//and outputs the value returned from that function.
var transformBlock = new TransformBlock(
(message) => WorkersFunction(message), options);
//The ActionBlock takes the output string from the TransformBlock and
//raises the ItemsUpdateEvent on the UI thread, passing the output string to the
//EventArgs of that event
var uiUpdaterBlock = new ActionBlock(msg => RaiseItemsUpdatedEvent(
new ItemsUpdatedEventArgs(msg)),
new ExecutionDataflowBlockOptions {
TaskScheduler = uiTaskScheduler, CancellationToken = token });
//Setting the DataFlowLinkOption PropagateCompletion flag means that,
//if the TransformBlock receives completion request that request will be passed
//on to the ActionBlock
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
//Couple the TransformerBlock to the ActionBlock
transformBlock.LinkTo(uiUpdaterBlock, linkOptions);
await foreach (var line in lines.WithCancellation(token))
{ //Send the line to the TransformerBlock and await for it to be accepted
_ = await transformBlock.SendAsync(line);
}
//Complete the TransformBlock and await for the ActionBlock to complete
transformBlock.Complete();
await uiUpdaterBlock.Completion;
}
DataFlow管道有一种结构化且稳健的关闭或取消方式。所有输入都被拒绝,然后数据缓冲区沿管道按顺序刷新。将TransformBlock数据划分为大小等于工作线程数的组。当一个分区完成后,它作为一批数据输出。这种技术是有效的,但它会导致第一个项目出现在UI中之前的延迟。
三种方法的基准测试结果以下测试是使用最高级的BenchmarkDotNet NuGet包运行的。它在没有消息循环的控制台应用程序上单独运行测试。因此,测试方法对更新UI的调用必然是虚幻的。运行测试时需要捕获默认同步上下文而不是当前上下文。如果捕获当前上下文,则测试时间将是无限的。显示的平均时间仅用于三个测试之间的比较目的。它们并不表示单个调用方法所花费的时间。它们只能通过运行大约10次预热迭代,然后再运行100次定时迭代,并在去除任何异常值后取定时运行的平均值来实现。测试使用10,000行文本、3000页大小和最大35个活动工作线程。
Method
Mean
StdDev
Rank
Gen 0
Gen 1
Allocated
DataFlow
4.510 s
0.0015 s
1
1000.0000
5MB
WorkerTasks
4.645 s
0.0326 s
2
2000.0000
1000.0000
8MB
Channel
4.666 s
0.0251 s
2
1000.0000
6MB
在上面的结果表中,Gen 0和Gen 1列中的数字与垃圾收集器活动有关。它们是对被测方法的每1000次调用的集合数。DataFow和Channel测试产生了一个轻量级的第0代集合。但是该WorkerTasks测试会导致两个第0代收集运行,此外,它还会触发对更密集的第1代收集的调用。该方法具有无序输出和最大的占用空间。它似乎需要维护和翻阅10,000个任务的列表,以检查它们是否仍然“热”。使用的测试DataFlow块执行良好并且具有相对较小的内存占用。但它在第一个数据项发送到UI之前的延迟最长。该Channel方法执行和扩展良好。就其效率和快速更新UI的能力而言,它可能是最佳选择。
结论我希望这里提供的观察和示例有助于理解最近添加到C#语言和.NET框架中的许多异步数据管理模式中的一些模式。
https://www.codeproject.com/Articles/5319506/Getting-Started-with-Asynchronous-Streams