您当前的位置: 首页 > 

寒冰屋

暂无认证

  • 0浏览

    0关注

    2286博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

异步编程模型(C#5.0系列)

寒冰屋 发布时间:2019-12-03 21:20:09 ,浏览量:0

 

关于C#5.0的新功能——async和await关键字。它们是语法糖,可以简化异步操作代码的构造。当C#编译器看到一个await表达式时,它会生成代码,该代码自动异步地调用该表达式,然后立即将控制流返回给调用方,以便调用方代码可以继续执行而不会阻塞。异步操作完成后,控制流将转到await表达式下方的代码,并按顺序执行代码,直到达到退出条件为止(退出条件可以是:方法或循环的迭代的结尾等)。我强调await关键字只是语法糖,因此它是编译器生成等效代码而不是您手动编写的替代方法。在了解C#5.0为您为async和await关键字做什么之前,您应该首先了解Microsoft .NET Framework如何提供异步编程模型(APM)。

在.NET Framework中,有许多方法可以实现异步操作:通过使用线程、线程池、BeginXxx和EndXxx方法,基于事件的APM或基于任务的APM。第一种方法,使用的线程并不建议,因为创建线程非常昂贵*,并且它需要许多手动控件才能正常工作,因此我将跳过此讨论。第二种方法是使用线程池,这是最简单,最常用的方法;以指定类型声明的BeginXxx和EndXxx方法提供了执行异步操作的标准方法;基于事件的异步编程模型不如BeginXxx和EndXxx方法,.NET Framework仅提供了很少的一组类型,这些类型支持基于事件的APM。最后一个,.NET Framework 4中引入了基于任务的APM,它是任务并行库(TPL)的一部分,它基于任务调度程序调度异步操作,它还提供了许多功能来扩展任务并行性。默认任务计划程序是通过使用线程池实现的。.NETFramework还提供了由“同步上下文(Synchronization Contexts)”实现的任务计划程序,此外,您还可以实现自己的任务计划程序并使用它来处理任务。

*创建一个线程需要大约1.5 MB的内存空间,Windows还将创建许多其他数据结构来使用该线程,例如线程环境块(TEB),用户模式堆栈和内核模式堆栈。引入新线程可能还需要线程上下文切换,这也会影响性能。因此,请尽可能避免创建其他线程。

在本文中,我将介绍执行异步操作的不同方法,并显示示例以指导您同时使用这两种方法。

线程池APM

当您要执行异步操作时,可以很容易地使用线程池来实现这一点,方法是调用System.Threading.ThreadPool的QueueUserWorkItem static方法,传递WaitCallback委托的实例,还可以选择传递一个Object实例,该实例表示与WaitCallback实例相关联的附加参数。下面的示例演示如何使用线程池对异步操作进行排队。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
 
namespace ApmDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            // Define a WaitCallback instance.
            WaitCallback writeCallback = state => Console.WriteLine(state);
 
            // Queue user work items with ThreadPool.
            ThreadPool.QueueUserWorkItem(writeCallback, "This is the first line");
            ThreadPool.QueueUserWorkItem(writeCallback, "This is the second line");
            ThreadPool.QueueUserWorkItem(writeCallback, "This is the third line");
 
            Console.ReadKey();
        }
    }
}

在上面的示例中,我通过将lambda表达式分配为委托主体来初始化WaitCallback实例的实例,然后调用ThreadPool的静态方法QueueUserWorkItem,将此实例作为第一个参数,将string传递为其第二个参数。当调用此方法时,线程池在池中寻找一个空闲线程,将WaitCallback委托的实例与该线程关联,然后分派该线程以在某个时间执行委托。如果池中没有空闲线程,则线程池将创建一个新线程,关联该委托实例,然后在某个时间分派执行。通过调用QueueUserWorkItem方法三遍,我将三个用户工作项排队到线程池中。

当我尝试运行该程序时,我可能会得到以下输出:

This is the first line
This is the second line
This is the third line

但有时,我也会得到以下输出:

This is the second line
This is the first line
This is the third line

请注意,排队的用户工作项的执行顺序是不可预测的,因为无法知道线程池中的线程何时按计划执行代码。如上所述,工作项可以按顺序完成,并且工作项也可以按相反的顺序完成。因此,请勿编写依赖于执行顺序的异步代码。

我强烈建议您尽可能多地使用线程池APM,原因如下:

  1. 线程池由CLR自动管理。当您将用户项排队到线程池中时,您无需关心它将与哪个线程相关联以及何时执行。CLR为您处理所有事情——这种模式使您可以编写易于阅读,简单明了且无故障的代码。
  2. 线程池明智地管理线程。当执行异步操作时,CLR需要附加线程来执行此操作,因此该操作可以在不阻塞当前线程的情况下进行,但是创建新线程的成本很高,每次为用户工作项提供服务时都会引入一个新线程,而且很繁琐。浪费资源。线程池最初管理一组线程,当用户工作项排队时,线程池将该工作项添加到全局工作项列表中,然后CLR线程将检查该全局工作项列表,如果不为空,则线程拾取一个工作项,并将其专用于池中的空闲线程;如果没有可用线程,则线程池将创建一个新线程,并将其专用于此新创建的线程。线程池始终选择使用尽可能少的线程来服务所有排队的用户工作项。因此,通过使用线程池,CLR使用更少的系统资源,使异步操作调度变得高效。
  3. 线程池具有更好的性能。线程池机制确保它可以使用最大或已配置的CPU资源来处理服务器用户工作项。如果在多核CPU环境中运行程序,则线程池最初会创建线程,该线程的数量等于该环境中已安装的CPU的数量。在计划用户工作项时,线程池会自动平衡线程,并确保使用每个逻辑CPU内核来提供工作项。这带来了分配CPU资源的灵活性,还有助于提高整个系统的性能。

尽管使用线程池有很多好处,但也有一些局限性:

  1. 线程池将用户工作项排队,并在不确定的时间执行它,当完成对用户工作项的处理时,调用者代码无法知道何时完成,因此在此工作项已完成之后编写连续代码非常困难。特别地是,某些操作(例如从文件流中读取多个字节)必须在异步完成该操作时获得通知,然后调用方代码才能确定从文件流中读取了多少字节,并使用这些字节执行其他操作东西。
  2. ThreadPool的QueueUserWorkItem方法仅需要接收一个参数,如果您的代码被设计为处理一个以上的参数,则不可能直接传递的所有参数到这种方法的委托; 相反,您可以创建其他数据结构来包装这些参数,然后将包装类型实例传递给方法。这会降低代码的可读性和可维护性。

要解决这些问题,您可以使用以下标准方法执行异步操作。

标准APM

框架类库(FCL)附带了具有各种类型的BeginXxx和EndXxx方法,这些方法旨在执行异步操作。例如,System.IO.FileStream类型定义Read,BeginRead而EndRead方法,Read方法是同步方法,它从文件流中同步读取多个字节;换句话说,它直到read文件流中的操作完成才返回。BeginRead和EndRead方法一对,当调用BeginRead方法时,CLR将此操作排队到硬件设备(在这种情况下为硬盘),然后立即将控制流返回到下一行代码,然后继续执行;当硬件设备完成异步读取操作时,设备将通知Windows内核该操作已完成,然后Windows内核通知CLR执行通过调用BeginRead方法指定为参数的委托,该委托中的代码必须调用EndRead方法,以便CLR可以将从缓冲区读取的字节数传输到调用委托,然后代码可以访问从文件流读取的字节。

这是Read,BeginRead和EndRead方法签名的定义。

public override IAsyncResult BeginRead(byte[] array, int offset, 
    int numBytes, AsyncCallback userCallback, object stateObject);

 

public override int EndRead(IAsyncResult asyncResult);

 

 

public override int Read(byte[] array, int offset, int count);

 

通常,BeginXxx方法具有与Xxx方法相同的参数以及两个附加参数:userCallback和stateObject。userCallback是AsyncCallback类型,它接受一个IAsyncResult类型的参数,该参数为该异步操作带来附加信息; stateObject参数是要传递给userCallback委托的实例,此委托的asyncResult参数上定义的AsyncState 属性可以访问该实例。

以下代码演示了如何使用BeginXxx和EndXxx方法执行异步操作。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.IO;
 
namespace ApmDemo
{
    internal class Program
    {
        private const string FilePath = @"c:\demo.dat";
 
        private static void Main(string[] args)
        {
            // Test async write bytes to the file stream.
            Program.TestWrite();
 
            // Wait operations to complete.
            Thread.Sleep(60000);
        }
 
        private static void TestWrite()
        { 
            // Must specify FileOptions.Asynchronous otherwise the BeginXxx/EndXxx methods are
            // handled synchronously.
            FileStream fs = new FileStream(Program.FilePath, FileMode.OpenOrCreate,
                FileAccess.Write, FileShare.None, 8, FileOptions.Asynchronous);
 
            string content = "A quick brown fox jumps over the lazy dog";
            byte[] data = Encoding.Unicode.GetBytes(content);
 
            // Begins to write content to the file stream.
            Console.WriteLine("Begin to write");
            fs.BeginWrite(data, 0, data.Length, Program.OnWriteCompleted, fs);
            Console.WriteLine("Write queued");
        }
 
        private static void OnWriteCompleted(IAsyncResult asyncResult)
        { 
            // End the async operation.
            FileStream fs = (FileStream)asyncResult.AsyncState;
            fs.EndWrite(asyncResult);
 
            // Close the file stream.
            fs.Close();
            Console.WriteLine("Write completed");
 
            // Test async read bytes from the file stream.
            Program.TestRead();
        }
 
        private static void TestRead()
        {
            // Must specify FileOptions.Asynchronous otherwise the BeginXxx/EndXxx methods are
            // handled synchronously.
            FileStream fs = new FileStream(Program.FilePath, FileMode.OpenOrCreate,
                FileAccess.Read, FileShare.None, 8, FileOptions.Asynchronous);
 
            byte[] data = new byte[1024];
 
            // Begins to read content to the file stream.
            Console.WriteLine("Begin to read");
            // Pass both Fs and data as async state object.
            fs.BeginRead(data, 0, data.Length, Program.OnReadCompleted, 
                         new { Stream = fs, Data = data });
            Console.WriteLine("Read queued");
        }
 
        private static void OnReadCompleted(IAsyncResult asyncResult)
        {
            dynamic state = asyncResult.AsyncState;
 
            // End read.
            int bytesRead = state.Stream.EndRead(asyncResult);
 
            // Get content.
            byte[] data = state.Data;
            string content = Encoding.Unicode.GetString(data, 0, bytesRead);
            
            // Display content and close stream.
            Console.WriteLine("Read completed. Content is: {0}", content);
            state.Stream.Close();
            Console.ReadKey();
        }
    }
}

此程序使用在System.IO.FileStream类中定义的BeginRead、EndRead、BeginWrite和EndWrite方法检测从指定文件流到指定文件流的异步读/写操作。当我允许程序时,我得到以下输出内容:

现在您可能已经知道如何通过调用BeginXxx和EndXxx方法使用标准方法执行异步操作。实际上,这种标准方式支持我在这里演示的更多功能,例如取消功能(cancellation),我将在以后的文章中进行讨论,并且支持取消功能确实是这种模式的一大优势。通过使用这种模式,可以解决我为线程池APM列出的一些问题,并且还可以享受我在下面总结的其他优点。

  1. 支持继续。异步操作完成后,将调用userCallback委托,因此调用者代码可以基于此异步操作的结果执行其他操作。
  2. 支持基于I / O的异步操作。标准APM与内核对象一起执行基于I / O的异步操作。通过调用该BeginXxx方法请求基于I / O的异步操作时,CLR不会引入新的线程池线程来专用于此任务,而是使用Windows内核对象来等待硬件I / O设备返回(通过其驱动程序软件)完成任务时。CLR仅使用硬件设备驱动程序和内核对象来执行基于I / O的异步操作,不再使用托管资源来处理这种情况。因此,它实际上通过释放CPU时间片和线程使用率来提高系统性能。
  3. 支持取消功能。触发异步操作时,用户可以通过调用System.Threading.CancellationTokenSource的Cancel方法取消此操作,我将在以后的文章中介绍此类。

但是,通过使用标准的APM,您的代码变得更加复杂。这是因为所有任务继续都发生在调用上下文之外,例如,在上面的读/写文件流示例中,OnReadCompleted和和OnWriteCompleted是独立的方法,并由与当前调用线程不同的线程调用,这种行为可能会使开发人员感到困惑,因此使您的代码逻辑不清楚。

注意:async方法和await表达式为异步编程带来了清晰、逻辑和有组织的代码结构。

基于事件的APM

框架类库(FCL)还附带了一些支持基于事件的APM的类型。例如,System.Net.WebClient类定义一个DownloadDataAsync方法和一个DownloadDataCompleted事件,通过调用DownloadDataAsync方法,CLR开始异步操作,以从指定的URL下载数据,完成后将触发DownloadDataCompleted事件,类型System.Net.DownloadDataCompletedEventArgs的e参数包含结果和其他此操作的信息。这里的代码演示了如何使用基于事件的APM执行异步操作。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.IO;
using System.Net;
 
namespace ApmDemo
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            WebClient wc = new WebClient();
            wc.DownloadDataAsync(new Uri("http://www.markzhou.com"));
            wc.DownloadDataCompleted += (s, e) => Console.WriteLine
                                                  (Encoding.UTF8.GetString(e.Result));
 
            Console.ReadKey();
        }
    }
}

实际上,它的作用与使用BeginXxx和EndXxx方法相同,不同之处在于基于事件的APM更接近对象模型层,您可以使用设计器和属性窗口将组件拖放到用户界面,然后通过属性窗口设置事件处理程序,相反,标准APM不提供要订阅的事件,这有助于提高系统性能,因为实现事件可能需要额外的系统资源。

FCL支持基于事件的APM的类型很少,我个人建议不要使用这种模式,基于事件的APM可能适合应用程序开发人员,因为它们是组件使用者,而不是组件设计者,对于组件设计人员(库开发人员)来说,设计人员的可支持性不是强制性的。

基于任务的APM

Microsoft .NET Framework 4.0引入了新的任务并行库(TPL),用于并行计算和异步编程。在System.Threading.Tasks命名空间中定义的主要使用的Task类表示要完成的用户工作项,要使用基于任务的APM,您必须创建Task的新实例或Task类,并将Action或Action委托的实例作为Task或Task构造函数的第一个参数,然后调用Task的实例方法Start,通知任务调度程序尽快安排此任务。

以下代码显示了如何使用基于APM任务的执行绑定计算的异步操作。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
 
namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Task based APM demo");
            Task t = new Task(() =>
            {
                Console.WriteLine("This test is output asynchronously");
            });
            t.Start();
            Console.WriteLine("Task started");
            
            // Wait task(s) to complete.
            Task.WaitAll(t);
        }
    }
}

如果运行此程序,将得到以下输出:

或者,如果任务委托返回一个值,则可以使用Task而不是Task,任务完成后,可以按Task的Result属性查询结果。下面的代码演示如何使用Task计算到2的n次指数(n仅为正)。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
 
namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Task based APM demo");
            Func calc = (n) => { return 2 
            {
                return calc(10);
            });
 
            t.Start();
            Console.WriteLine("Task started");
            
            // Wait task(s) to complete.
            // After t is complete, get the result.
            Task.WaitAll(t);
            Console.WriteLine(t.Result);
        }
    }
}

运行此程序时,将得到以下输出:

Task的静态方法WaitAll等待参数数组同步在指定的所有任务,这意味着当前线程将被阻塞,直到所有指定的任务完成。如果您不想阻塞当前线程,并且打算在完成某项任务之后执行某些操作,则可以使用Task的实例方法ContinueWith,以下代码显示了如何执行延续任务:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
 
namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Task based APM demo");
            Func calc = (n) => { return 2 
            {
                return calc(10);
            });
            
            // Set a continuation operation.
            t.ContinueWith(task => { Console.WriteLine(task.Result); return task.Result; });
            t.Start();
            Console.WriteLine("Task started");
            
            // Wait for a user input to exit the program.
            Console.ReadKey();
        }
    }
}

基于任务的APM具有许多特征,下面列出一些重要特征:

  1. 您可以在创建任务时指定TaskCreationOptions,以指示任务计划程序将如何计划任务。
  2. 您可以在创建任务时指定CancellationTokenSource,以指示用于取消任务的关联取消令牌。
  3. 您可以使用ContinueWith或ContinueWith方法执行延续任务。
  4. 您可以通过调用Task的静态WaitAll方法等待所有指定的任务同步完成,也可以通过调用同步完成Task的静态WaitAny方法等待任何的任务同步完成。
  5. 如果您要使用相同的创建/继续设置创建一堆任务,则可以使用TaskFactory的实例StartNew方法。
  6. 基于任务的APM需要任务计划程序才能工作,默认任务计划程序在线程池的顶部实现,但是,您可以将与任务关联的任务计划程序更改为同步上下文任务计划程序或自定义任务计划程序。
  7. 您可以通过调用TaskFactory的实例FromAsync或FromAsync方法,轻松地将BeginXxx和EndXxx模式异步操作转换为基于任务的APM 。
任务、异步方法和等待表达式

我想指出的是,C#5.0中的async方法和await表达式/语句是在基于任务的APM的基础上在编译器级别实现的。一个async方法必须有返回类型void,或返回类型Task或Task,这种限制是显而易见的,因为如果在async的方法中没有await表达式,该方法将被同步调用; 因此可以将此方法视为常规方法,使void返回值清晰可见;否则,如果async方法至少包含一个await表达式,则由于await基于任务的APM,必须从此方法返回Task或Task实例,才能在此方法上执行另一个await表达式。

为了清楚起见,我修改了代码,使用async和await将第n个指数计算为2,如下内容:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Task based APM demo");
            
            // Call Exponent() asynchronously.
            // And immediately return the control flow.
            // If I don't put a Task here, the program will sometimes
            // terminate immediately.
            Task t = new Task(async () =>
            {
                int result = await Program.Exponent(10);
 
                // After the operation is completed, the control flow will go here.
                Console.WriteLine(result);
            });
 
            t.Start();
            Console.ReadKey();
        }
 
        static async Task Exponent(int n)
        {
            Console.WriteLine("Task started");
            return await TaskEx.Run(() => 2             
关注
打赏
1665926880
查看更多评论
0.0458s