
任务概述
线程(Thread)是创建并发的底层工具,因此有一定的局限性(不易得到返回值(必须通过创建共享域);异常的捕获和处理也麻烦;同时线程执行完毕后无法再次开启该线程),这些局限性会降低性能同时影响并发性的实现(不容易组合较小的并发操作实现较大的并发操作,会增加手工同步处理(加锁,发送信号)的依赖,容易出现问题)。
线程池的(ThreadPool)的QueueUserWorkItem方法很容发起一次异步的计算限制操作。但这个技术同样有着许多限制,最大的问题是没有内建的机制让你知道操作在什么时候完成,也没有机制在操作完成时获得返回值。
而Task类可以解决上述所有的问题。
任务(Task)表示一个通过或不通过线程实现的并发操作,任务是可组合的,使用延续(continuation)可将它们串联在一起,它们可以使用线程池减少启动延迟,可使用回调方法避免多个线程同时等待I/O密集操作。
然而,在今天这篇博客中,我们要知道的是,QueueUserWorkItem这个技术存在许多限制。其中最大的问题是没有一个内建的机制让你知道操作在什么时候完成,也没有一个机制在操作完成是获得一个返回值,这些问题使得我们都不敢启用这个技术。
1 System.Threading.Tasks.Task简介
基础任务(Task)
微软在.NET 4.0 引入任务(Task)的概念。通过System.Threading.Tasks命名空间使用任务。它是在ThreadPool的基础上进行封装的。Task默认都是使用池化线程,它们都是后台线程,这意味着主线程结束时其它任务也会随之停止。
启动一个任务有多种方式,如以下示例:
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 Console.WriteLine("主线程Id:{0}", Thread.CurrentThread.ManagedThreadId);
6 int workerThreadsCount, completionPortThreadsCount;
7 ThreadPool.GetAvailableThreads(out workerThreadsCount, out completionPortThreadsCount);
8 Console.WriteLine("剩余工作线程数:{0},剩余IO线程数{1}", workerThreadsCount, completionPortThreadsCount);
9 //第一种:实例化方式Start启动
10 {
11 Task task = new Task(() =>
12 {
13 Test("one-ok");
14 });
15 task.Start();
16 }
17 //第二种:通过Task类静态方法Run方式进行启动
18 {
19 Task.Run(() =>
20 {
21 Test("two-ok");
22 });
23 }
24 //第三种:通过TaskFactory的StartNew方法启动
25 {
26 TaskFactory taskFactory = new TaskFactory();
27 taskFactory.StartNew(() =>
28 {
29 Test("three-ok");
30 });
31 }
32 //第四种:.通过Task.Factory进行启动
33 {
34 Task taskStarNew = Task.Factory.StartNew(() =>
35 {
36 Test("four-ok");
37 });
38 }
39 //第五种:通过Task对象的RunSynchronously方法启动(同步,由主线程执行,会卡主线程)
40 {
41 Task taskRunSync = new Task(() =>
42 {
43 Console.WriteLine("线程Id:{0},执行方法:five-ok", Thread.CurrentThread.ManagedThreadId);
44 });
45 taskRunSync.RunSynchronously();
46 }
47 Thread.Sleep(1000);
48 ThreadPool.GetAvailableThreads(out workerThreadsCount, out completionPortThreadsCount);
49 Console.WriteLine("剩余工作线程数:{0},剩余IO线程数{1}", workerThreadsCount, completionPortThreadsCount);
50 Console.ReadKey();
51 }
52 static void Test(string o)
53 {
54 Thread.Sleep(2000);
55 Console.WriteLine("线程Id:{0},执行方法:{1}", Thread.CurrentThread.ManagedThreadId, o);
56 }
57 /*
58 * 作者:Jonins
59 * 出处:http://www.cnblogs.com/jonins/
60 */
61 }
执行结果:
上面示例中除去使用RunSynchronously方法启动的是同步任务(由启用的线程执行任务)外,其它几种方式内部都由线程池内的工作者线程处理。
说明:
1.事实上Task.Factory类型本身就是TaskFactory(任务工厂),而Task.Run(在.NET4.5引入,4.0版本调用的是后者)是Task.Factory.StartNew的简写法,是后者的重载版本,更灵活简单些。
2.调用静态Run方法会自动创建Task对象并立即调用Start
3.如Task.Run等方式启动任务并没有调用Start,因为它创建的是“热”任务,相反“冷”任务的创建是通过Task构造函数。
Microsoft为了克服这些限制(同时解决其他一些问题),引入了任务(tasks)的概念。顺带说一下我们得通过System.Threading.Tasks命名空间来使用它们。
一个Task表示一个异步操作,Task的创建和执行是独立的。
返回值(Task<TResult>)&状态(Status)
Task有一个泛型子类Task<TResult>,它允许任务返回一个值。调用Task.Run,传入一个Func<Tresult>代理或兼容的Lambda表达式,然后查询Result属性获得结果。如果任务没有完成,那么访问Result属性会阻塞当前线程,直至任务完成。
1 public static Task<TResult> Run<TResult>(Func<TResult> function);
而任务的Status属性可用于跟踪任务的执行状态,如下所示:
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 Task<int> task = Task.Run(() =>
6 {
7 int total = 0;
8 for (int i = 0; i <= 100; i++)
9 {
10 total += i;
11 }
12 Thread.Sleep(2000);
13 return total;
14 });
15 Console.WriteLine("任务状态:{0}",task.Status);
16 Thread.Sleep(1000);
17 Console.WriteLine("任务状态:{0}", task.Status);
18 int totalCount = task.Result;//如果任务没有完成,则阻塞
19 Console.WriteLine("任务状态:{0}", task.Status);
20 Console.WriteLine("总数为:{0}",totalCount);
21 Console.ReadKey();
22 }
23 }
执行如下:
Reulst属性内部会调用Wait(等待);
任务的Status属性是一个TaskStatus枚举类型:
1 public TaskStatus Status { get; }
说明如下:
枚举值 | 说明 |
Canceled |
任务已通过对其自身的 CancellationToken 引发 OperationCanceledException 对取消进行了确认,此时该标记处于已发送信号状态; 或者在该任务开始执行之前,已向该任务的 CancellationToken 发出了信号。 |
Created | 该任务已初始化,但尚未被计划。 |
Faulted | 由于未处理异常的原因而完成的任务。 |
RanToCompletion | 已完成执行的任务。 |
Running | 任务正在运行,尚未完成。 |
WaitingForActivation | 该任务正在等待 .NET Framework 基础结构在内部将其激活并进行计划。 |
WaitingForChildrenToComplete | 该任务已完成执行,正在隐式等待附加的子任务完成。 |
WaitingToRun | 该任务已被计划执行,但尚未开始执行。 |
现在我要说的是,用线程池不是调用ThreadPool的QueueUserWorkItem方法,而是用任务来做相同的事:
只读属性:
任务集合返回值(WhenAll&WhenAny)
Task中有非常方便的对并行运行的任务集合获取返回值的方式,比如WhenAll和WhenAny。
复制代码 1 static void Main(string[] args)
返回值 |
名称 |
说明 |
object |
AsyncState |
表示在创建任务时传递给该任务的状态数据 |
TaskCreationOptions |
CreationOptions |
获取用于创建此任务的 TaskCreationOptions |
|
CurrentId |
当前正在执行 Task 的 ID |
AggregateException |
Exception |
获取导致 AggregateException 提前结束的 Task。如果 Task 成功完成或尚未引发任何异常,则返回 null |
TaskFactory |
Factory |
提供对用于创建 Task 和 Task<TResult> 的工厂方法的访问 |
int |
Id |
获取此 Task 实例的 ID |
bool |
IsCanceled |
指明此 Task 实例是否由于被取消的原因而已完成执行 |
bool |
IsCompleted |
指明此 Task 是否已完成 |
bool |
IsFaulted |
指明Task 是否由于未经处理异常的原因而完成 |
TaskStatus |
Status |
获取此任务的 TaskStatus |
1.WhenAll
WhenAll:等待提供的所有 Task 对象完成执行过程(所有任务全部完成)。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 List<Task<int>> taskList = new List<Task<int>>();//声明一个任务集合
6 TaskFactory taskFactory = new TaskFactory();
7 for (int i = 0; i < 5; i++)
8 {
9 int total = i;
10 Task<int> task = taskFactory.StartNew(() => Test(total));
11 taskList.Add(task);//将任务放进集合中
12 }
13 Console.WriteLine("主线程Id:{0},继续执行A.....", Thread.CurrentThread.ManagedThreadId);
14 Task<int[]> taskReulstList = Task.WhenAll(taskList);//创建一个任务,该任务将集合中的所有 Task 对象都完成时完成
15 for (int i = 0; i < taskReulstList.Result.Length; i++)//这里调用了Result,所以会阻塞线程,等待集合内所有任务全部完成
16 {
17 Console.WriteLine("返回值:{0}", taskReulstList.Result[i]);//遍历任务集合内Task返回的值
18 }
19 Console.WriteLine("主线程Id:{0},继续执行B.....", Thread.CurrentThread.ManagedThreadId);
20 Console.ReadKey();
21 }
22 private static int Test(int o)
23 {
24 Console.WriteLine("线程Id:{0},Task执行成功,参数为:{1}", Thread.CurrentThread.ManagedThreadId, o);
25 Thread.Sleep(500 * o);
26 return o;
27 }
28 }
执行结果:
2 {
2.WhenAny
WhenAny:等待提供的任一 Task 对象完成执行过程(只要有一个任务完成)。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 List<Task<int>> taskList = new List<Task<int>>();//声明一个任务集合
6 TaskFactory taskFactory = new TaskFactory();
7 for (int i = 0; i < 5; i++)
8 {
9 int total = i;
10 Task<int> task = taskFactory.StartNew(() => Test(total));
11 taskList.Add(task);//将任务放进集合中
12 }
13 Console.WriteLine("主线程Id:{0},继续执行A.....", Thread.CurrentThread.ManagedThreadId);
14 Task<Task<int>> taskReulstList = Task.WhenAny(taskList);//创建一个任务,该任务将在集合中的任意 Task 对象完成时完成
15 Console.WriteLine("返回值:{0}", taskReulstList.Result.Result);//得到任务集合内最先完成的任务的返回值
16 Console.WriteLine("主线程Id:{0},继续执行B.....", Thread.CurrentThread.ManagedThreadId);
17 Console.ReadKey();
18 }
19 private static int Test(int o)
20 {
21 Console.WriteLine("线程Id:{0},Task执行成功,参数为:{1}", Thread.CurrentThread.ManagedThreadId, o);
22 Thread.Sleep(500 * o);
23 return o;
24 }
25 }
执行结果(这里返回值肯定会是0,因为休眠最短):
3 Console.WriteLine(“主线程启动”);
2 Task状态和生命周期
等待(Wait)&执行方式(TaskCreationOptions)
4 //ThreadPool.QueueUserWorkItem(StartCode,5);
一个Task实例只会完成其生命周期一次,当Task达到它的3种可能的最终状态之一时,它就再也回不去之前的状态了。任务的生命周期从TaskStatus.Created状态真正开始。
1.任务等待(Wait)
调用任务的Wait方法可以阻塞任务直至任务完成,类似于线程的join。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 Task task = Task.Run(() =>
6 {
7 Console.WriteLine("线程执行Begin");
8 Thread.Sleep(2000);
9 Console.WriteLine("线程执行End");
10 });
11 Console.WriteLine("任务是否完成:{0}", task.IsCompleted);
12 task.Wait();//阻塞,直至任务完成
13 Console.WriteLine("任务是否完成:{0}", task.IsCompleted);
14 Console.ReadKey();
15 }
16 }
执行如下:
注意:
线程调用Wait方法时,系统检测线程要等待的Task是否已经开始执行。如果是线程则会阻塞直到Task运行结束为止。但如果Task还没有开始执行任务,系统可能(取决于TaskScheduler)使用调用Wait的线程来执行Task,这种情况下调用Wait的线程不会阻塞,它会执行Task并立即返回。好处在于没有线程会被阻塞,所以减少了资源占用。不好的地方在于加入线程在调用Wait前已经获得了一个线程同步锁,而Task试图获取同一个锁,就会造成死锁的线程。
5 new Task(StartCode, 5).Start();
1) 初始状态:
2.任务执行方式(TaskCreationOptions)
我们知道为了创建一个Task,需要调用构造函数并传递一个Action或Action<object>委托,如果传递的是期待一个Object的方法,还必须向Task的构造函数穿都要传给操作的实参。还可以选择向构造器传递一些TaskCreationOptions标记来控制Task的执行方式。
TaskCreationOptions为枚举类型
枚举值 | 说明 |
None | 默认。 |
PreferFairness | 尽可能公平的方式安排任务,即先进先执行。 |
LongRunning | 指定任务将是长时间运行的,会新建线程执行,不会使用池化线程。 |
AttachedToParent | 指定将任务附加到任务层次结构中的某个父级 |
DenyChildAttach | 任务试图和这个父任务连接将抛出一个InvalidOperationException |
HideScheduler | 强迫子任务使用默认调度而非父级任务调度 |
在默认情况下,Task内部是运行在池化线程上,这种线程会非常适合执行短计算密集作业。如果要执行长阻塞操作,则要避免使用池化线程。
在池化线程上运行一个长任务问题不大,但是如果要同时运行多个长任务(特别是会阻塞的任务),则会对性能产生影响。最好使用:TaskCreationOptions.LongRunning。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 int workerThreadsCount, completionPortThreadsCount;
6 ThreadPool.GetAvailableThreads(out workerThreadsCount, out completionPortThreadsCount);
7 Console.WriteLine("剩余工作线程数:{0},剩余IO线程数{1},主线程Id:{2}", workerThreadsCount, completionPortThreadsCount, Thread.CurrentThread.ManagedThreadId);
8 Task task = Task.Factory.StartNew(() =>
9 {
10 Console.WriteLine("长任务执行,线程Id:{0}", Thread.CurrentThread.ManagedThreadId);
11 Thread.Sleep(2000);
12 }, TaskCreationOptions.LongRunning);
13 Thread.Sleep(1000);
14 ThreadPool.GetAvailableThreads(out workerThreadsCount, out completionPortThreadsCount);
15 Console.WriteLine("剩余工作线程数:{0},剩余IO线程数{1},主线程Id:{2}", workerThreadsCount, completionPortThreadsCount, Thread.CurrentThread.ManagedThreadId);
16 Console.ReadKey();
17 }
18 }
执行结果如下:
注意:
如果使运行I/O密集任务,则可以使用TaskCompletionSource和异步函数(asynchronous
functions),通过回调(延续)实现并发性,而是不通过线程实现。
如果使运行计算密集性任务,则可以使用一个生产者/消费者队列,控制这些任务的并发数量,避免出现线程和进程阻塞的问题。
6 Console.WriteLine(“主线程运行到此!”);
Task实例有三种可能的初始状态
延续(continuation)&延续选项(TaskContinuationOptions)
延续(continuation)会告诉任务在完成后继续执行下面的操作。延续通常由一个回调方法实现,它会在操作完成之后执行一次。给一个任务附加延续的方法有两种
7 Thread.Sleep(1000);
值 |
说明 |
TaskStatus.Created |
该任务已初始化,但尚未被计划。使用Task构造函数创建Task实例时的初始状态。 |
TaskStatus.WaitingForActivation |
该任务正在等待 .NET Framework 基础结构在内部将其激活并进行计划。一个任务的初始状态,这个任务只有当其依赖的任务完成之后才会被调度。 |
TaskStatus.WaitingToRun |
该任务已被计划执行,但尚未开始执行。使用TaskFactory.StartNew创建的任务的初始状态。 |
1.GetAwaiter
任务的方法GetAwaiter是Framework
4.5新增加的,而C#
5.0的异步功能使用了这种方法,因此它非常重要。给一个任务附加延续如下:
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 Task<int> task = Task.Run(() =>
6 {
7 int total = 0;
8 for (int i = 0; i <= 100; i++)
9 {
10 total += i;
11 }
12 Thread.Sleep(2000);
13 return total;
14 });
15 var awaiter = task.GetAwaiter();
16 awaiter.OnCompleted(() =>
17 {
18 int result = awaiter.GetResult();//在延续中获取Task的执行结果
19 Console.WriteLine(result);
20 });
21 Console.ReadKey();
22 }
23 }
执行结果控制台会打印:5050。
调用GetAwaiter会返回一个等待者(awaiter)对象,它会让先导(antecedent)任务在任务完成(或出错)之后执行一个代理。已经完成的任务也可以附加一个延续,这事延续会马上执行。
注意:
1.等待者(awaiter)可以是任意对象,但必须包含特定的两个方法和一个Boolean类型属性。
1 public struct TaskAwaiter<TResult> : ICriticalNotifyCompletion, INotifyCompletion
2 {
3 public bool IsCompleted { get; }
4 public TResult GetResult();
5 public void OnCompleted(Action continuation);
6 }
2.先导任务出现错误,那么当延续代码调用awaiter.GetResult()时就会重新抛出异常。我们可以需要调用GetResult,而是直接访问先导任务的Result属性(task.Result)。
GetResult的好处是,当先导任务出现错误时,异常可以直接抛出而不封装在AggregateException中。
3.如果出现同步上下文,那么会自动捕捉它,然后延续提交到这个上下文中。在无需同步上下文的情况下通常不采用这种方法,使用ConfigureAwait代替它。它通常会使延续运行在先导任务所在的线程上,从而避免不必要的过载。
1 var awaiter = task.ConfigureAwait(false).GetAwaiter();
8 }
2)中间状态:
2.ContinueWith
另一种附加延续的方法是调用任务的ContinueWith方法:
1 static void Main(string[] args)
2 {
3 Task<int> task = Task.Run(() =>
4 {
5 int total = 0;
6 for (int i = 0; i <= 100; i++)
7 {
8 total += i;
9 }
10 Thread.Sleep(2000);
11 return total;
12 });
13 task.ContinueWith(continuationAction =>
14 {
15 int result = continuationAction.Result;
16 Console.WriteLine(result);
17 });
18 Console.ReadKey();
19 }
ContinueWith本身会返回一个Task,它非常适用于添加更多的延续。然后如果任务出现错误,我们必须直接处理AggregateException。
如果想让延续运行在统一个线程上,必须指定 TaskContinuationOptions.ExecuteSynchronously;否则它会弹回线程池。ContinueWith特别适用于并行编程场景。
9 10 private static void StartCode(object i)
Task实例有两种可能的中间状态
3.延续选项(TaskContinuationOptions)
在使用ContinueWith时可以指定任务的延续选项即TaskContinuationOptions,它的前六个枚举类型与之前说的TaskCreationOptions枚举提供的标志完全一样,补充后续几个枚举值:
枚举值 | 说明 |
LazyCancellation | 除非先导任务完成,否则禁止延续任务完成(取消)。 |
NotOnRanToCompletion | 指定不应在延续任务前面的任务已完成运行的情况下安排延续任务。 |
NotOnFaulted | 指定不应在延续任务前面的任务引发了未处理异常的情况下安排延续任务。 |
NotOnCanceled | 指定不应在延续任务前面的任务已取消的情况下安排延续任务。 |
OnlyOnCanceled | 指定只应在延续前面的任务已取消的情况下安排延续任务。 |
OnlyOnFaulted | 指定只有在延续任务前面的任务引发了未处理异常的情况下才应安排延续任务。 |
OnlyOnRanToCompletion | 指定只有在延续任务前面的任务引发了未处理异常的情况下才应安排延续任务。 |
ExecuteSynchronously | 指定希望由先导任务的线程执行,先导任务完成后线程继续执行延续任务。 |
ExecuteSynchronously是指同步执行,两个任务都在同一个=线程一前一后的执行。
ContinueWith结合TaskContinuationOptions使用的示例:
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 Task<int> task = Task.Run(() =>
6 {
7 int total = 0;
8 for (int i = 0; i <= 100; i++)
9 {
10 total += i;
11 }
12 if (total == 5050)
13 {
14 throw new Exception("错误");//这段代码可以注释或开启,用于测试
15 }
16 return total;
17 });
18 //指定先导任务无报错的延续任务
19 task.ContinueWith(continuationAction =>
20 {
21 int result = continuationAction.Result;
22 Console.WriteLine(result);
23 }, TaskContinuationOptions.NotOnFaulted);
24 //指定先导任务报错时的延续任务
25 task.ContinueWith(continuationAction =>
26 {
27 foreach (Exception ex in continuationAction.Exception.InnerExceptions)//有关AggregateException异常处理后续讨论
28 {
29 Console.WriteLine(ex.Message);
30 }
31 }, TaskContinuationOptions.OnlyOnFaulted);
32 Console.ReadKey();
33 }
34 }
执行结果会打印:报错,如果注释掉抛出异常的代码则会打印5050。
11 {
值 |
说明 |
TaskStatus.Running |
该任务正在运行,但尚未完成 |
TaskStatus.WaitingForChildrenToComplete |
该任务已完成执行,正在隐式等待附加的子任务完成 |
TaskCompletionSource
另一种创建任务的方法是使用TaskCompletionSource。它允许创建一个任务,并可以任务分发给使用者,并且这些使用者可以使用该任务的任何成员。它的实现原理是通过一个可以手动操作的“附属”任务,用于指示操作完成或出错的时间。
TaskCompletionSource的真正作用是创建一个不绑定线程的任务(手动控制任务工作流,可以使你把创建任务和完成任务分开)。
这种方法非常适合I/O密集作业:可以利用所有任务的优点(它们能够生成返回值、异常和延续),但不会在操作执行期间阻塞线程。
例如,假设一个任务需要等待2秒,然后返回10,我们的方法会返回在一个2秒后完成的任务,通过给任务附加一个延续就可以在不阻塞任何线程的前提下打印这个结果,如下:
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var awaiter = Demo(2000).GetAwaiter();//得到任务通过延续输出返回值
6 awaiter.OnCompleted(() =>
7 {
8 Console.WriteLine(awaiter.GetResult());
9 });
10 Console.WriteLine("主线程继续执行....");
11 Console.ReadKey();
12 }
13 static Task<int> Demo(int millis)
14 {
15 //创建一个任务完成源
16 TaskCompletionSource<int> taskCompletionSource = new TaskCompletionSource<int>();
17 var timer = new System.Timers.Timer(millis) { AutoReset = false };
18 timer.Elapsed += delegate
19 {
20 timer.Dispose(); taskCompletionSource.SetResult(10);//写入返回值
21 };
22 timer.Start();
23 return taskCompletionSource.Task;//返回任务
24 }
25 }
执行结果:
注意:如果多次调用SetResult、SetException或SetCanceled,它们会抛出异常,而TryXXX会返回false。
12 Console.WriteLine(“开始执行子线程…{0}”,i);
任务取消(CancellationTokenSource)
一些情况下,后台任务可能运行很长时间,取消任务就非常有用了。.NET提供了一种标准的任务取消机制可用于基于任务的异步模式。
取消基于CancellationTokenSource类,该类可用于发送取消请求。请求发送给引用CancellationToken类的任务,其中CancellationToken类与CancellationTokenSource类相关联。
使用示例如下:
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //构造函数 指定延迟2秒后自动取消任务
6 CancellationTokenSource source = new CancellationTokenSource(2000);
7 //注册一个任务取消后执行的委托
8 source.Token.Register(() =>
9 {
10 Console.WriteLine("线程Id:{0} 任务被取消后的业务逻辑正在运行", Thread.CurrentThread.ManagedThreadId);
11 });
12 //启动任务,将取消标记源带入参数
13 Task.Run(() =>
14 {
15 while (!source.IsCancellationRequested)//IsCancellationRequested为True时取消任务
16 {
17 Thread.Sleep(100);
18 Console.WriteLine("线程Id:{0} 任务正在运行", Thread.CurrentThread.ManagedThreadId);
19 }
20 }, source.Token);
21 //主线程挂起2秒后手动取消任务
22 {
23 //Thread.Sleep(2000);
24 //source.Cancel();//手动取消任务
25 }
26 //主线程不阻塞,2秒后自动取消任务
27 {
28 source.CancelAfter(2000);
29 }
30 Console.ReadKey();
31 }
32 }
执行结果:
根据Register方法绑定任务取消后的委托
1 public CancellationTokenRegistration Register(Action callback);
2 public CancellationTokenRegistration Register(Action callback, bool useSynchronizationContext);
3 public CancellationTokenRegistration Register(Action<object> callback, object state);
4 public CancellationTokenRegistration Register(Action<object> callback, object state, bool useSynchronizationContext);
手动取消任务:Cancel方法
自动取消任务:
1.CancelAfter方法后面可以带入参数指定延迟多少后时间取消任务。
1 public void CancelAfter(TimeSpan delay);
2 public void CancelAfter(int millisecondsDelay);
2.CancellationTokenSource构造函数可以带入参数指定延迟多少时间后取消任务。
1 public CancellationTokenSource(TimeSpan delay);
2 public CancellationTokenSource(int millisecondsDelay);
任务绑定CancellationTokenSource对象,在Task源码中可以带入CancellationToken对象的启动任务方式都可以绑定CancellationTokenSource。
13 Thread.Sleep(1000);//模拟代码操作
3) 最终状态:
异步等待 (Task.Delay)
异步等待非常实用,因此它成为Task类的一个静态方法
常用的使用方式有2种,如下:
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //第1种
6 {
7 Task.Delay(2000).ContinueWith((o) =>
8 {
9 Console.WriteLine("线程Id:{0},异步等待2秒后执行的逻辑", Thread.CurrentThread.ManagedThreadId);
10 });
11 }
12 //第2种
13 {
14 Task.Delay(3000).GetAwaiter().OnCompleted(() =>
15 {
16 Console.WriteLine("线程Id:{0},异步等待3秒后执行的逻辑", Thread.CurrentThread.ManagedThreadId);
17 });
18 }
19 Console.WriteLine("主线程Id:{0},继续执行", Thread.CurrentThread.ManagedThreadId);
20 Console.ReadKey();
21 }
22 }
执行结果如下:
Task.Delay是Thread.Sleep的异步版本。而它们的区别如下(引自 禅道 ):
1.Thread.Sleep 是同步延迟,Task.Delay异步延迟。
2.Thread.Sleep 会阻塞线程,Task.Delay不会。
3.Thread.Sleep不能取消,Task.Delay可以。
4. Task.Delay() 比 Thread.Sleep()
消耗更多的资源,但是Task.Delay()可用于为方法返回Task类型;或者根据CancellationToken取消标记动态取消等待。5. Task.Delay() 实质创建一个运行给定时间的任务, Thread.Sleep()
使当前线程休眠给定时间。
14 }
Task实例有三种可能的最终状态
异常(AggregateException)
与线程不同,任务可以随时抛出异常。所以,如果任务中的代码抛出一个未处理异常,那么这个异常会自动传递到调用Wait()或Task<TResult>的Result属性的代码上。
任务的异常将会自动捕获并抛给调用者。为确保报告所有的异常,CLR会将异常封装在AggregateException容器中,该容器公开的InnerExceptions属性中包含所有捕获的异常,从而更适合并行编程。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 try
6 {
7 Task.Run(() =>
8 {
9 throw new Exception("错误");
10 }).Wait();
11 }
12 catch (AggregateException axe)
13 {
14 foreach (var item in axe.InnerExceptions)
15 {
16 Console.WriteLine(item.Message);
17 }
18 }
19 Console.ReadKey();
20 }
21 }
上述示例控制台会显示:错误
注意:
使用Task的IsFaulted和IsCanceled属性,就可以不重新抛出异常而检测出错的任务。
1.IsFaulted和IsCanceled都返回False,表示没有错误发生。
2.IsCanceled为True,则任务抛出了OperationCanceledOperation(取消线程正在执行的操作时在线程中抛出的异常)。
3.IsFaulted为True,则任务抛出另一种异常,而Exception属性包含了该错误。
15 }
值 |
说明 |
TaskStatus.Canceled |
该任务已通过对其自身的 CancellationToken 引发 OperationCanceledException 对取消进行了确认,此时该标记处于已发送信号状态;或者在该任务开始执行之前,已向该任务的 CancellationToken 发出了信号。Task属性IsFaulted被设置为true |
TaskStatus.Faulted |
由于未处理异常的原因而完成的任务。Task属性IsCanceled被设置为true |
TaskStatus.RunToCompletion |
已成功完成执行的任务。Task属性IsCompleted被设置为true,IsFaulted和IsCanceled被设置为false |
1.Flatten
当子任务抛出异常时,通过调用Flatten方法,可以消除任意层次的嵌套以简化异常处理。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var parent = Task.Factory.StartNew(() =>
6 {
7 int[] numbers = { 0 };
8 var childFactory = new TaskFactory(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
9 childFactory.StartNew(() => 10 / numbers[0]);//除零
10 childFactory.StartNew(() => numbers[1]);//超出索引范围
11 childFactory.StartNew(() => throw null);//空引用
12 });
13 try
14 {
15 parent.Wait();
16 }
17 catch (AggregateException axe)
18 {
19 foreach (var item in axe.Flatten().InnerExceptions)
20 {
21 Console.WriteLine(item.Message);
22 }
23 }
24 Console.ReadKey();
25 }
26 }
嘿,你会发现结果是一样的。再来看看这个是什么:TaskCreationOptions这个类型是一个枚举类型,传递一些标志来控制Task的执行方式。TaskCreationOptions定义如下:慢点,注释很详细,看看这些有好处,TaskScheduler(任务调度器)不懂没关系,请继续往下看,我会介绍的,但请注意,这些标识都只是一些提议而已,在调度一个Task时,可能会、也可能不会采纳这些提议,不过有一条要注意:AttachedToParent标志,它总会得到Task采纳,因为它和TaskScheduler本身无关。
2.Handle
如果需要只捕获特定类型异常,并重抛其它类型的异常,Handle方法为此提供了一种快捷方式。
Handle接受一个predicate(异常断言),并在每个内部异常上运行此断言。
1 public void Handle(Func<Exception, bool> predicate);
如果断言返回True,它认为该异常是“已处理”,当所有异常过滤之后:
1.如果所有异常是已处理的,异常不会抛出。
2.如果存在异常未处理,就会构造一个新的AggregateException对象来包含这些异常并抛出。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var parent = Task.Factory.StartNew(() =>
6 {
7 int[] numbers = { 0 };
8 var childFactory = new TaskFactory(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
9 childFactory.StartNew(() => 10 / numbers[0]);//除零
10 childFactory.StartNew(() => numbers[1]);//超出索引范围
11 childFactory.StartNew(() => throw null);//空引用
12 });
13 try
14 {
15 try
16 {
17 parent.Wait();
18 }
19 catch (AggregateException axe)
20 {
21 axe.Flatten().Handle(ex =>
22 {
23 if (ex is DivideByZeroException)
24 {
25 Console.WriteLine("除零-错误处理完毕");
26 return true;
27 }
28 if (ex is IndexOutOfRangeException)
29 {
30 Console.WriteLine("超出索引范围-错误处理完毕");
31 return true;
32 }
33 return false;//所有其它 异常重新抛出
34 });
35
36 }
37 }
38 catch (AggregateException axe)
39 {
40 foreach (var item in axe.InnerExceptions)//捕获重新抛出的异常
41 {
42 Console.WriteLine(item.Message);
43 }
44 }
45 Console.ReadKey();
46 }
47 }
执行结果:
来看下这段代码:
3 创建并执行任务
结语
1.async和await这两个关键字下篇记录。
2.任务调度器(TaskScheduler)是Task之所以如此灵活的本质,我们常说Task是在ThreadPool上更升级化的封装,其实很大程度上归功于这个对象,考虑下篇要不要说一下,但其实我看的都头疼…
3.Task类包含很多的重载,最好F12跳到Task内熟悉下结构。
1 static void Main(string[] args)
1)public Task StartNew(Action action)
参考文献
CLR via C#(第4版) Jeffrey Richter
C#高级编程(第10版) C# 6 & .NET Core 1.0 Christian Nagel
果壳中的C# C#5.0权威指南 Joseph Albahari
C#并发编程 经典实例 Stephen Cleary
…
2 {
参数:
3
action:要异步执行的操作委托
4 //1000000000这个数字会抛出System.AggregateException
返回值:
5
已启动的
System.Threading.Tasks.Task
6 Taskt = new Task(n => Sum((Int32)n), 1000000000);
异常:
7
System.ArgumentNullException:当
action
参数为 null 时引发的异常。
8 //可以现在开始,也可以以后开始
2)public static Task Run(Action action)
9 10 t.Start();
参数:
11 12 //Wait显式的等待一个线程完成
action:表示在线程池执行的队列的任务
13 14 t.Wait();
返回值:
15 16 Console.WriteLine(“The Sum is:”+t.Result);
已启动的
System.Threading.Tasks.Task
17 }
异常:
18 19 private static Int32 Sum(Int32 i)
System.ArgumentNullException:当
action
参数为 null 时引发的异常。
20 {
3)public void Start()
21 Int32 sum = 0;
启动
System.Threading.Tasks.Task,并将它安排到当前的
System.Threading.Tasks.TaskScheduler中执行。
22 for (; i > 0; i–)
异常:
23 checked { sum += i; }
System.ObjectDisposedException:已释放 System.Threading.Tasks.Task
实例。
24 return sum;
System.InvalidOperationException:System.Threading.Tasks.Task
未处于有效状态,无法启动。
它可能已启动、已执行或已取消,或者可能已经不支持以直接计划的方式创建。
25 }
注意:
26 }
仅使用Task的构造器来创建Task的实例并不能启动任务,还要使用Start才能启动任务。
这段代码大家应该猜得出是什么意思吧,人人都会写。 但是,我的结果为什么是t.Result而不直接是返回的Sum呢?
有没有多此一举的感觉?下面我来说说这段代码我想表达的意思: 在一个线程调用Wait方法时,系统会检查线程要等待的Task是否已经开始执行,如果任务正在执行,那么这个Wait方法会使线程阻塞,知道Task运行结束为止。 就说上面的程序执行,因为累加数字太大,它抛出算术运算溢出错误,在一个计算限制任务抛出一个未处理的异常时,这个异常会被“包含”不并存储到一个集合中,而线程池线程是允许返回到线程池中的,在调用Wait方法或者Result属性时,这个成员会抛出一个System.AggregateException对象。 现在你会问,为什么要调用Wait或者Result?或者一直不查询Task的Exception属性?你的代码就永远注意不到这个异常的发生,如果不能捕捉到这个异常,垃圾回收时,抛出AggregateException,进程就会立即终止,这就是“牵一发动全身”,莫名其妙程序就自己关掉了,谁也不知道这是什么情况。所以,必须调用前面提到的某个成员,确保代码注意到异常,并从异常中恢复。悄悄告诉你,其实在用Result的时候,内部会调用Wait。 怎么恢复? 为了帮助你检测没有注意到的异常,可以向TaskScheduler的静态UnobservedTaskException时间等级一个回调方法,当Task被垃圾回收时,如果出现一个没有被注意到的异常,CLR终结器会引发这个事件。一旦引发,就会向你的时间处理器方法传递一个UnobservedTaskExceptionEvenArgs对象,其中包含了你没有注意的AggregateException。然后再调用UnobservedTasExceptionEvenArgs的SetObserved方法来指出你的异常已经处理好了,从而阻止CLR终止进程。这是个图省事的做法,要少做这些,宁愿终止进程,也不要呆着已经损坏的状态而继续运行。做人也一样,病了宁肯休息,也不要带病坚持上班,你没那么伟大,公司也不需要你的这一点伟大,命是自己的。(─.─|||扯远了。 除了单个等待任务,Task
还提供了两个静态方法:WaitAny和WaitAll,他们允许线程等待一个Task对象数组。 WaitAny方法会阻塞调用线程,知道数组中的任何一个Task对象完成,这个方法会返回一个索引值,指明完成的是哪一个Task对象。如果发生超时,方法将返回-1。它可以通过一个CancellationToken取消,会抛出一个OperationCanceledException。 WaitAll方法也会阻塞调用线程,知道数组中的所有Task对象都完成,如果全部完成就返回true,如果超时就返回false。当然它也能取消,同样会抛出OperationCanceledException。 说了这么两个取消任务的方法,现在来试试这个方法,加深下印象,修改先前例子代码,完整代码如下:
4)Task.Factory.StartNew与Task.Run
1 static void Main(string[] args)
Task.Factory.StartNew重载方法提供更多的参数,可以控制如何计划执行任务以及如何向调试器公开计划任务的机制和控制任务的创建和执行的可选行为。
2 {
而Task.Run提供的方法则不具有上述控制机制。
3 CancellationTokenSource cts = new
CancellationTokenSource();
4
4 等待任务完成
5
1)public void Wait()
6 7 Taskt = new Task(() => Sum(cts.Token,10000),
cts.Token);
等待 System.Threading.Tasks.Task
完成执行过程
8 9 //可以现在开始,也可以以后开始
异常:
10 11 t.Start();
ObjectDisposedException:Task 对象已被释放。
12 13 //在之后的某个时间,取消CancellationTokenSource
以取消Task
AggregateException:System.Threading.Tasks.Task 已取消或在 System.Threading.Tasks.Task
的执行期间引发了异常。如果任务已被取消,System.AggregateException将包含其
System.AggregateException.InnerExceptions 集合中的 System.OperationCanceledException。
14 15
cts.Cancel();//这是个异步请求,Task可能已经完成了。我是双核机器,Task没有完成过
2)public static void
WaitAll(params Task[] tasks)
16 17 18 //注释这个为了测试抛出的异常
参数:
19 //Console.WriteLine(“This sum is:” + t.Result);
tasks:要等待的 Task 实例的数组
20 try
异常:
21 {
ObjectDisposedException:一个或多个 Task 中的对象 tasks 已被释放。
22 //如果任务已经取消了,Result会抛出AggregateException
ArgumentNullException:tasks
参数为 null或tasks 参数包含 null 元素。
23 24 Console.WriteLine(“This sum is:” + t.Result);
AggregateException:在至少一个 Task 实例已取消。如果任务已被取消, AggregateException 异常包含
OperationCanceledException 中的异常其
AggregateException.InnerExceptions 集合。或在至少一个执行期间引发了异常
Task 实例。
25 }
说明:
26 catch (AggregateException x)
主线程会等待作为参数传入的任务tasks执行结束才会执行下一条语句。
27 {
3)public static int WaitAny(params Task[] tasks)
28 //将任何OperationCanceledException对象都视为已处理。
参数:
29 //其他任何异常都造成抛出一个AggregateException,其中
tasks:要等待的 Task 实例的数组
30 //只包含未处理的异常
异常:
31 32 x.Handle(e => e is OperationCanceledException);
System.ObjectDisposedException:System.Threading.Tasks.Task
已被释放。
33 Console.WriteLine(“Sum was Canceled”);
System.ArgumentNullException:tasks 参数为 null。
34 }
System.ArgumentException:tasks 参数包含 null 元素。
35 36 }
37 38 private static Int32 Sum(CancellationToken ct ,Int32 i)
5 取消任务
39 {
使用System.Threading.CancellationToken和System.Threading.CancellationTokenSource中断Task的执行。
40 Int32 sum = 0;
1)System.Threading.CancellationToken
41 for (; i > 0; i–)
传播有关应取消操作的通知
42 {
属性:
43 //在取消标志引用的CancellationTokenSource上如果调用
public bool IsCancellationRequested { get; }
44 //Cancel,下面这一行就会抛出OperationCanceledException
方法:
45 46 ct.ThrowIfCancellationRequested();
public void ThrowIfCancellationRequested();
47 48 checked { sum += i; }
如果已请求取消此标记,则引发
System.OperationCanceledException。
49 }
异常: