- 介绍
- 具体案例
- 等待线程信号——ManualResetEvent
- 等待线程信号——AutoResetEvent
- 多个线程同时写一个文件
- 串联并行任务
- 使用Parallel类执行并行操作
- 为每个线程单独分配变量值
- 保留异步上下文中的本地变量值
- 取消并行任务
- 总结
随着.net core
越来越流行,对.net core
基础知识的了解,实际应用等相关的知识也应该有所了解。所以就有了这篇文章,案例都是来自阅读的书籍,或者实际工作中感觉比较有用的应用。分享亦总结。
本文主要介绍 .net core
相关的异步和并行案例。
【导语】
抽象类 WaitHandle
规范了线程之间发送和等待事件信号的行为逻辑。
线程之间所执行的代码往往是相互独立的,在某些由特殊要求的场合,会使得代码逻辑不可控。例如,A、B 两个线程分别进行运算,但是 B 线程的运算开始之前必须保证 A 线程的运算已经完成,这种情况下,就需要线程同步了。
线程同步的一种解决方案就是发送信号与等待信号。例如上述例子,可以在线程之际共享一个事件句柄, B 线程调用 WaitOne
方法后会被阻止,然后等待 A 线程发送信号; A 线程在完成其计算后发出信号, B 线程收到信号后才会继续执行,这样就可以确保执行 A 线程的代码,再执行 B 线程的代码。
ManualResetEvent
类是事件等待句柄的一个实现版本,它的特点是————发出事件信号(调用 Set
方法)之后会一直保持有信号状态,此时所有处于等待中的线程都会继续执行。要把事件句柄切换会无信号状态,必须手动调用 Reset
方法。也就是说,ManualResetEvent
对象需要手动切换信号状态,如果调用 Set
方法之后忘记调用 Reset
方法,那么该事件句柄就会一直处于有信号状态,所有被阻止的线程都会释放并继续执行。
本实例演示了如果再新的线程上计算从 1 到 100 的累加运算,即计算 1+2+3+...+100
的总和。主线程必须等待新线程计算完毕后才能继续,虽然主线程可以调用 Sleep
方法来暂停一段时间,但是要暂停的时间是不可预估的,因此本实例使用时间等待句柄的效果较好。
【操作流程】
步骤1:新建控制台应用程序项目。
步骤2:在项目模板创建的 Program
类中声明一个 ManualResetEvent
类型的私有字段,为了可以在 Main
方法中直接访问,字段可以声明为静态字段。
static ManualResetEvent mnlEvt = new ManualResetEvent(false);
注意:ManualResetEvent
类的构造函数包含一个 bool
类型的参数,用来标识事件句柄在创建时的初始状态————有信号还是无信号。本实例中,主线程需要等待另一个线程计算完成才能继续,因此 ManualResetEvent
对象的初始化应该为无信号,否则主线程是不会等待的。将参数设置为 false
表示初始化状态为无信号。
步骤3:创建新线程。
Thread th = new Thread(() =>
{
int n = 1;
int result = 0;
while(n
{
Console.WriteLine("正在进行第一阶段……");
Thread.Sleep(2000);
Console.WriteLine("第一阶段处理完成!");
// 发送信号
evt1.Set();
});
Thread th2 = new Thread(() =>
{
// 等待第一阶段完成
evt1.WaitOne();
Console.WriteLine("正在进行第二阶段……");
Thread.Sleep(2000);
Console.WriteLine("第二阶段处理完成!");
// 发出信号
evt2.Set();
});
Thread th3 = new Thread(() =>
{
// 等待第二阶段完成
evt2.WaitOne();
Console.WriteLine("正在进行第三阶段……");
Thread.Sleep(2000);
Console.WriteLine("第三阶段处理完成!");
// 发送信号
evt3.Set();
});
步骤4:依次启动三个线程。
th1.Start();
th2.Start();
th3.Start();
步骤5:主线程等待最后一个阶段完成(即收到 evt3
发送的信号)才能继续执行。
evt3.WaitOne();
Console.WriteLine("\n已完成所有操作。");
步骤6:运行应用程序项目,结果如下。
【导语】
作为公共基础类,WaitHandle
类公开了三个比较实用的静态方法:
(1)WaitAny
:调用此方法后,当前线程将被阻止。如果指定的事件具体数组中有任意一个事件发出信号,则此方法将返回数组中发出信号的事件句柄的索引,并结束等待。
(2)WaitAll
:在指定的事件句柄数组中,必须当所有事件句柄都发出信号后,才会结束等待。
(3)SignalAndWait
:可以直接切换两个事件句柄的状态。
本实例演示了 WaitAll
方法的使用。实例的任务是把 9 个字节写入到文件中。这个过程是通过 3 个线程来完成的,并且这些线程的执行是无序的。为了保证 9 字节能按照原有的顺序写入,可以将这些序列进行“分段”,即:第一个线程写入第1、2、3字节,第二个线程写入第4、5、6字节,第三个线程写入第7、8、9字节。每个线程只负责写自己改写入的位置,就算 3 个线程是无序执行的,最终也不会破坏原来字节的顺序。各个线程对应着一个事件句柄(本实例使用 AutoResetEvent
类),只要线程完成自己该做的任务后,就通知对应的事件句柄发出信号。主线程将通过 WaitHandle
类的 WaitAll
方法等待所有线程执行完成。
【操作流程】
步骤1:新建控制台应用程序项目。
步骤2:在 Program
类中声明两个只读的字段,位了便于在 Main
方法中使用,可以声明为静态字段。这两个字段分别是要输出的文件名称和一个字节数组(包含要写入文件的 9 个字节)。
// 文件名
static readonly string FileName = "demoFile.data";
// 要写入文件的 9 个字节
static readonly byte[] orgBuffer =
{
0x0C, 0x10, 0x02,
0xE3, 0x71, 0xA2,
0x13, 0xB8, 0x06
};
步骤3:在 Program
类中声明一个静态字段———— AutoResetEvent
数组,它将包含 3 个元素,可以作为与执行线程相对应的事件句柄。
static AutoResetEvent[] writtenEvents = {
new AutoResetEvent(false),
new AutoResetEvent(false),
new AutoResetEvent(false)
};
步骤4:启动 3 个新线程,每个线程负责写 3 字节。
for (int n = 0; n < 3; n++)
{
Thread th = new Thread((p) =>
{
// 先把要写的字节复制出来
int currentCount = Convert.ToInt32(p);
int copyIndex = currentCount * 3;
byte[] tmpBuffer = new byte[3];
Array.Copy(orgBuffer, copyIndex, tmpBuffer, 0, 3);
// 打开文件流
using (FileStream fs = new FileStream(FileName, FileMode.OpenOrCreate, FileAccess.Write, FileShare.Write))
{
// 定位流的当前位置
fs.Seek(copyIndex, SeekOrigin.Begin);
// 写入数据
fs.Write(tmpBuffer, 0, tmpBuffer.Length);
}
// 发出信号
writtenEvents[currentCount].Set();
});
// 标识为后台线程
th.IsBackground = true;
// 启动线程
th.Start(n);
}
注意:由于多个线程同时写入一个文件,因此创建 FileStream
实例时,必须指定一个有效的 FileShare
枚举值,本例中应为 Write
。指定此参数的目的是允许多个线程同时写一个文件,否则会发生报错。
步骤5:在主线程中,调用 WaitAll
方法等待所有事件句柄发出的信号。传递给方法的参数就是前面声明的 AutoResetEvent
数组。
Console.WriteLine("等待所有线程完成文件写入……");
WaitHandle.WaitAll(writtenEvents);
Console.WriteLine("文件写入完成。");
步骤6:为了验证 9 字节是否正确的写入文件,在写入完成后再读出文件中的字节。
using (FileStream fsin = new FileStream(FileName, FileMode.Open))
{
byte[] buffer = new byte[fsin.Length];
fsin.Read(buffer, 0, buffer.Length);
Console.WriteLine($"从文件读出来的字节:\n{BitConverter.ToString(buffer)}");
}
步骤7:运行应用程序项目,结果如下。
可以对比两次输出的字节数组,如果相同,说明 3 个线程已经把字节序列正确的写入文件。
串联并行任务【导语】
在一些复杂的处理逻辑中,经常会执行多个并行任务,并且这些任务都需要按照一定的顺序执行,在这种情况下,把并行任务进行串联比等待事件句柄信号更简单。
Task
类公开 ContinueWith
实例方法,调用该方法后,会将当前任务与下一个要执行的任务串联,当前任务执行完成后就启动下一个任务。ContinueWith
方法返回 Task
实例,即串联执行的新任务,并且 ContinueWith
方法可以连续调用,例如以下形式。
myTask.ContinueWith(...)
.ContinueWith(...)
.ContinueWith(...)
...;
本实例将演示通过三个 Task
进行加法运算,第一个 Task
返回整数值 10,第二个 Task
在第一个 Task
返回值的基础上再加上 15 并返回,第三个 Task
在第二个 Task
所返回的结果上再加上 20 并返回计算结。这三个 Task
必须按照顺序执行,因此应该调用 ContinueWith
方法进行串联。
【操作流程】
步骤1:新建控制台应用程序项目。
步骤2:串联执行三个并行任务,最终返回给 task
变量的是最后执行的 Task
所返回的结果。
Task task = Task.Run(() => 10) //返回 10
.ContinueWith(lasttask => lasttask.Result + 15) // 返回 25
.ContinueWith(lasttask => lasttask.Result + 20); // 返回 45
步骤3:等待并非任务完成。
task.Wait();
步骤4:运行应用程序项目,得到以下结果。
计算结果:45
使用Parallel类执行并行操作【导语】
Parallel
类是一个轻量级的并行操作执行类,注意用于基于 for
或 foreach
循环的并行代码上,该类会充分调配处理器的资源来运行循环,提升性能。
本实例将使用 Parallel
类启动并行的 foreach
循环来向文件写入数据,每一轮循环负责写入一个文件。
【操作流程】
步骤1:新建控制台应用程序项目。
步骤2:创建一个字符串数组实例,包含要创建的文件名称列表。
string[] fileNames =
{
"demo_1_dx", "demo_2_dx", "demo_3_dx", "demo_4_dx",
"demo_5_dx", "demo_6_dx", "demo_7_dx", "demo_8_dx"
};
步骤3:调用 Parallel.ForEach
方法循环写入文件,文件长度以及字节序列均随机产生。
Random rand = new Random();
Parallel.ForEach(fileNames, (fn) =>
{
int len;
byte[] data;
lock (rand)
{
// 随机产生文件长度
len = rand.Next(100, 90000);
data = new byte[len];
// 生成随机字节序列
rand.NextBytes(data);
}
using(FileStream fs = new FileStream(fn, FileMode.Create))
{
fs.Write(data);
}
Console.WriteLine($"已向文件 {fn} 写入 {data.Length} 字节");
});
步骤4:运行应用程序项目,结果如下。
【导语】
在某些应用场景下,对于同一个变量,需要允许访问它的各个线程都保留独立的值,即在使用同一个变量的情况下,每个线程可以为该变量分配独立的变量值,这些值只能当前线程中有效。
要实现这样的需求,就要借助 ThreadLocal
类,该类的实例可以在多个线程间共享,并且每个线程可以通过Value
属性设置各自的值,线程与线程之间互补干扰。如果需要知道 ThreadLocal
变量被设置过哪些值,可以访问 Values
属性,要是 Values
属性可用,在调用 ThreadLocal
类的构造函数的时候,需要调用带有 trackAllValues
参数(bool
类型)的重载版本,并将 trackAllValues
参数设置为 true
。
【操作流程】
步骤1:新建控制台应用程序项目。
步骤2:在 Program
类中声明一个 ThreadLocal
类型的静态字段,并初始化。
static ThreadLocal _localvar = new ThreadLocal(true);
本实例稍后会访问 Values
属性,所有在调用 ThreadLocal
类构造函数时要将 trackAllValues
参数设置为 true
。
步骤3:创建三个线程,并在线程所执行的代码上修改 ThreadLocal
实例的 Values
属性。
Thread th1 = new Thread(() =>
{
_localvar.Value = 5000;
Console.WriteLine($"在 ID 为 {Thread.CurrentThread.ManagedThreadId} 的线程,本地线程变量的值为:{_localvar.Value}");
});
th1.Start();
Thread th2 = new Thread(() =>
{
_localvar.Value = 9000;
Console.WriteLine($"在 ID 为 {Thread.CurrentThread.ManagedThreadId} 的线程,本地线程变量的值为:{_localvar.Value}");
});
th2.Start();
Thread th3 = new Thread(() =>
{
_localvar.Value = 7500;
Console.WriteLine($"在 ID 为 {Thread.CurrentThread.ManagedThreadId} 的线程,本地线程变量的值为:{_localvar.Value}");
});
th3.Start();
步骤4:等待三个线程执行完成。
th1.Join();
th2.Join();
th3.Join();
步骤5:此时,在主线程代码中可用访问 Values
的属性,枚举出被设置过的值。
Console.WriteLine("\n设置过的所有值:");
foreach (int n in _localvar.Values)
{
Console.Write(" {0}", n);
}
步骤6:运行应用程序项目,结果如下。
【导语】
在基于 Task
的异步等待上下文中,ThreadLocal
类型的本地变量无法发挥作用,请思考以下例子。
ThreadLocal local = new ();
async Task WorkAsync()
{
local.Value = "hello";
Console.WriteLine("异步等待前:{0}",local.Value);
await Task.Delay(500);
Console.WriteLine("异步等待后:{0}",local.Value);
}
在进入异步等待前,本地变量将字符串常量赋值为“hello”,随后调用 Delay
方法,并异步等待方法返回。回到当前上下文后,本地变量的值变为默认值(字符串的默认值是 null
),也就是说,之前赋值的字符串“hello”以及读不到了。
这是因为基于并行任务的异步上下文是由内部框架自动调度的,异步等待前后,本地变量可能处于不同的线程上,即 await
语句使用前后的代码并不是在同一个线程上,所以在等待方法返回后就取不到本地变量的值了。要解决这个问题,可用 AsyncLocal
类替换 ThreadLocal
类。AsyncLocal
类能够在异步上下文之间保留原有的数据,即使异步等待前后的代码不在同一个线程上,也能够访问之前设置的值。
【操作流程】
步骤1:新建控制台应用程序项目。
步骤2:在 Program
类中声明一个静态字段,类型为 AsyncLocal
。
static AsyncLocal local = new AsyncLocal();
步骤3:定义一个异步方法,在方法内调用 Task.Delay
方法,并异步等待方法返回。进入异步等待前,对 local
变量赋值;异步等待返回后,读取 lacal
变量的值。
static async Task RunThisCodeAsync()
{
local.Value = "Follow me";
Console.WriteLine("异步等待前:{0}", local.Value);
await Task.Delay(150);
Console.WriteLine("异步等待后:{0}", local.Value);
}
步骤4:在 Main
方法中调用 RunThisCodeAsync
方法。
RunThisCodeAsync().Wait();
步骤5:运行应用程序项目,结果如下。
可以看到,等待之前所赋的值,在异步上下文返回后仍然能顺利的读取。
取消并行任务【导语】
在实际开发中,经常会遇到在后台使用 Task
执行一些比较耗时代码的情况。处于友好的用户体验考虑,在执行长时间任务的过程中应该向用户返回处理进度;此外,由于运行耗时较长,用户可能不想再继续等待,应该允许用户取消任务。
CancellationTokenSource
类提供了取消任务的处理模型,通过 Token
属性可以获得 CancellationToken
结构实例的副本。所有被复制的 CancellationToken
对象都会监听 CancellationTokenSource
实例的状态,一旦 CancellationTokenSource
实例调用了 Cancel
方法,各个 CancellationToken
副本就会收到通知,此时 CancellationToken
对象的 IsCancellationRequested
属性将返回 true
。可以通过检查 IsCancellationRequested
属性来判断并行任务是否被取消。
本实例将演示一个累加运算,计算过程用一个异步方法封装。调用方法时,可以传递一个整数值,表示参与累加运算的最大值,计算从 0 开始累加,直到最大值,例如,最大值为 5,那么就计算0+1+2+3+4+5
。在程序执行运算的过程中,用户随时可以按下 C 键取消任务。
【操作流程】
步骤1:新建控制台应用程序项目。
步骤2:定义用于执行累加计算的异步方法。
static Task RunAsync(int maxNum, CancellationToken token = default)
{
TaskCompletionSource tcl = new TaskCompletionSource();
int x = 0;
int res = 0;
while(x < maxNum)
{
if (token.IsCancellationRequested)
{
break;
}
res += x;
x += 1;
Task.Delay(500).Wait();
}
tcl.SetResult(res);
return tcl.Task;
}
taken
参数用于监听任务是否被取消。本方法中使用了 TaskCompletionSource
类,这个类可以灵活的设置 Task
的运行结果(通过 SetResult
方法设置),在访问Task
属性就能获取要返回的并行任务实例。
步骤3:在 Main
方法中实例化 CancellationTokenSource
。
CancellationTokenSource cansrc = new CancellationTokenSource();
步骤4:在调用累加计算的异步方法之前,可以开启一个并行任务,用于判断用户是否按下了 C 键,如果是,就调用 CancellationTokenSource
对象的 Cancel
方法。
Task.Run(() =>
{
Console.WriteLine("按 C 键取消任务。");
while (true)
{
var info = Console.ReadKey(true);
if (info.Key == ConsoleKey.C)
{
cansrc.Cancel();
break;
}
}
});
步骤5:调用异步方法,并等待计算完成。
int result = await RunAsync(200, cansrc.Token);
Console.WriteLine("计算结果:{0}", result);
访问 Token
属性,会复制一份 CancellationToken
实例,并能够监听 Cancel
方法的调用。
步骤6:当不再使用 CancellationTokenSource
对象是, 需要将其释放。
cansrc.Dispose();
步骤7:运行应用程序项目,累加计算开始。此过程中如果按下 C 键,任务取消,并把以及完成的部分计算结果返回。
本文到这里就结束了,下一篇将介绍网络编程的知识案例。