소개
아마도 C# 5에서 async 및 Wait 키워드가 비동기 프로그래밍을 단순화하는 데 어떻게 도움이 되는지 읽어보셨을 것입니다. 불행하게도 VS2010을 업그레이드한 지 2년이 지났지만 VS2010 이후에는 아직 VS2012로 업그레이드할 준비가 되지 않았습니다. C# 4에서 사용할 수 있으며 "VS 2010에서 동기식으로 보이지만 비동기적으로 실행되는 메서드를 작성할 수 있다면 내 코드가 더 깔끔해질 것입니다."라고 생각할 수도 있습니다.
이 기사를 읽고 나면 그렇게 할 수 있을 것입니다. 동일한 VS2012 async 키워드를 사용하고 C#5의 기능을 활용하여 "동기적으로 보이지만 비동기적으로 실행되는 메서드"를 작성할 수 있는 작은 인프라 코드를 개발하겠습니다.async와 wait가 매우 좋은 구문 설탕이라는 점을 인정해야 하며, 우리 메서드는 이 변경 사항에 적응하기 위해 더 많은 "AsyncResultcallback" 메서드를 작성해야 합니다. 그리고 최종적으로 VS2012(또는 그 이상)로 업그레이드하면 이것이 문제가 될 것입니다. 사소한 것은 이 메서드를 C# 키워드로 바꾸는 것이 힘든 구조적 재작성보다는 간단한 구문 변경이 될 것입니다.
요약
async/await는 비동기 작업 패턴을 기반으로 하는 키워드입니다. 그러나 C#4에는 async 및 Wait가 없지만 .Net5의 이 작은 새로운 기능은 잠시 동안 사용하지 않거나 직접 구축할 수 있습니다. 주요 작업 유형 아직 사용 가능합니다.
C#5 비동기(async) 메서드에서는 작업을 기다려야 합니다. 이로 인해 스레드가 기다리게 되는 것이 아니라 메서드가 호출자에게 Task를 반환합니다. 호출자는 기다리거나(자체 비동기인 경우) 후속 부분을 연결할 수 있습니다. (작업이나 그 결과에 대해 Wait()를 호출할 수도 있지만 이는 스레드에 연결되므로 그렇게 하지 마십시오.) 대기 중인 작업이 성공적으로 완료되면 비동기 메서드는 중단된 위치에서 계속 실행됩니다.
최종목표
Stephen Covey는 목표의 우선순위를 정하라고 제안했습니다. 이것이 지금 우리가 하는 일입니다. async/await를 사용하여 SLAM(동기식 비동기 메서드)을 구현하는 방법을 보여주는 예제가 이미 많이 있습니다. 그렇다면 이러한 키워드를 사용하지 않고 이 기능을 어떻게 구현합니까? C#5 비동기 예제를 수행하고 이를 C#4에서 구현하는 방법을 살펴보겠습니다. 그런 다음 이러한 코드를 변환하는 일반적인 접근 방식에 대해 논의합니다.다음 예에서는 C#5에서 비동기 읽기 및 쓰기 메서드 Stream.CopyToAsync()를 구현하는 방법을 보여줍니다. 이 메서드는 .NET5에서 구현되지 않는다고 가정합니다.
public static async Task CopyToAsync( this Stream input, Stream output, CancellationToken cancellationToken = default(CancellationToken)) { byte[] buffer = new byte[0x1000]; // 4 KiB while (true) { cancellationToken.ThrowIfCancellationRequested(); int bytesRead = await input.ReadAsync(buffer, 0, buffer.Length); if (bytesRead == 0) break; cancellationToken.ThrowIfCancellationRequested(); await output.WriteAsync(buffer, 0, bytesRead); } }
C#4의 경우 이를 두 부분으로 나눕니다. 하나는 액세스 기능이 동일한 메서드이고, 다른 하나는 매개 변수는 동일하지만 반환 유형이 다른 전용 메서드입니다. 개인 메소드는 반복을 사용하여 동일한 프로세스를 구현하며 그 결과는 일련의 대기 작업(IEnumerable563415696acef448fd82edac97f29f1a)입니다. 시퀀스의 실제 작업은 제네릭이 아니거나 다양한 유형의 제네릭의 조합일 수 있습니다. (다행히 일반 Task8742468051c85b06f0a0af9e3e506b5c 유형은 제네릭이 아닌 Task 유형의 하위 유형입니다.)
public static /*async*/ Task CopyToAsync( this Stream input, Stream output, CancellationToken cancellationToken = default(CancellationToken)) { return CopyToAsyncTasks(input, output, cancellationToken).ToTask(); } private static IEnumerable<Task> CopyToAsyncTasks( Stream input, Stream output, CancellationToken cancellationToken) { byte[] buffer = new byte[0x1000]; // 4 KiB while (true) { cancellationToken.ThrowIfCancellationRequested(); var bytesReadTask = input.ReadAsync(buffer, 0, buffer.Length); yield return bytesReadTask; if (bytesReadTask.Result == 0) break; cancellationToken.ThrowIfCancellationRequested(); yield return output.WriteAsync(buffer, 0, bytesReadTask.Result); } }
异步方法通常以"Async"结尾命名(除非它是事件处理器如startButton_Click)。给迭代器以同样的名字后跟“Tasks”(如startButton_ClickTasks)。如果异步方法返回void值,它仍然会调用ToTask()但不会返回Task。如果异步方法返回Task057dd226e557d51bc4e4efe378b7cc42,那么它就会调用通用的ToTask057dd226e557d51bc4e4efe378b7cc42()扩展方法。对应三种返回类型,异步可替代的方法像下面这样:
public /*async*/ void DoSomethingAsync() { DoSomethingAsyncTasks().ToTask(); } public /*async*/ Task DoSomethingAsync() { return DoSomethingAsyncTasks().ToTask(); } public /*async*/ Task<String> DoSomethingAsync() { return DoSomethingAsyncTasks().ToTask<String>(); }
成对的迭代器方法不会更复杂。当异步方法等待非通用的Task时,迭代器简单的将控制权转给它。当异步方法等待task结果时,迭代器将task保存在一个变量中,转到该方法,之后再使用它的返回值。两种情况在上面的CopyToAsyncTasks()例子里都有显示。
对包含通用resultTask057dd226e557d51bc4e4efe378b7cc42的SLAM,迭代器必须将控制转交给确切的类型。ToTask057dd226e557d51bc4e4efe378b7cc42()将最终的task转换为那种类型以便提取其结果。经常的你的迭代器将计算来自中间task的结果数值,而且仅需要将其打包在Task8742468051c85b06f0a0af9e3e506b5c中。.NET 5为此提供了一个方便的静态方法。而.NET 4没有,所以我们用TaskEx.FromResult8742468051c85b06f0a0af9e3e506b5c(value)来实现它。
最后一件你需要知道的事情是如何处理中间返回的值。一个异步的方法可以从多重嵌套的块中返回;我们的迭代器简单的通过跳转到结尾来模仿它。
// C#5 public async Task<String> DoSomethingAsync() { while (…) { foreach (…) { return "Result"; } } } // C#4; DoSomethingAsync() is necessary but omitted here. private IEnumerable<Task> DoSomethingAsyncTasks() { while (…) { foreach (…) { yield return TaskEx.FromResult("Result"); goto END; } } END: ; }
现在我们知道如何在C#4中写SLAM了,但是只有实现了FromResult8742468051c85b06f0a0af9e3e506b5c()和两个 ToTask()扩展方法才能真正的做到。下面我们开始做吧。
简单的开端
我们将在类System.Threading.Tasks.TaskEx下实现3个方法, 先从简单的那2个方法开始。FromResult()方法先创建了一个TaskCompletionSource(), 然后给它的result赋值,最后返回Task。
public static Task<TResult> FromResult<TResult>(TResult resultValue) { var completionSource = new TaskCompletionSource<TResult>(); completionSource.SetResult(resultValue); return completionSource.Task; }
很显然, 这2个ToTask()方法基本相同, 唯一的区别就是是否给返回对象Task的Result属性赋值. 通常我们不会去写2段相同的代码, 所以我们会用其中的一个方法来实现另一个。 我们经常使用泛型来作为返回结果集,那样我们不用在意返回值同时也可以避免在最后进行类型转换。 接下来我们先实现那个没有用泛型的方法。
private abstract class VoidResult { } public static Task ToTask(this IEnumerable<Task> tasks) { return ToTask<VoidResult>(tasks); }
目前为止我们就剩下一个 ToTask8742468051c85b06f0a0af9e3e506b5c()方法还没有实现。
第一次天真的尝试
对于我们第一次尝试实现的方法,我们将枚举每个任务的Wait()来完成,然后将最终的任务做为结果(如果合适的话)。当然,我们不想占用当前线程,我们将另一个线程来执行循环该任务。
// BAD CODE ! public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) { var tcs = new TaskCompletionSource<TResult>(); Task.Factory.StartNew(() => { Task last = null; try { foreach (var task in tasks) { last = task; task.Wait(); } // Set the result from the last task returned, unless no result is requested. tcs.SetResult( last == null || typeof(TResult) == typeof(VoidResult) ? default(TResult) : ((Task<TResult>) last).Result); } catch (AggregateException aggrEx) { // If task.Wait() threw an exception it will be wrapped in an Aggregate; unwrap it. if (aggrEx.InnerExceptions.Count != 1) tcs.SetException(aggrEx); else if (aggrEx.InnerException is OperationCanceledException) tcs.SetCanceled(); else tcs.SetException(aggrEx.InnerException); } catch (OperationCanceledException cancEx) { tcs.SetCanceled(); } catch (Exception ex) { tcs.SetException(ex); } }); return tcs.Task; }
这里有一些好东西,事实上它真的有用,只要不触及用户界面:
它准确的返回了一个TaskCompletionSource的Task,并且通过源代码设置了完成状态。
但这里有些主要的问题。最严重的是:
是需要想点办法的时候了!
连续循环
最大的想法是直接从迭代器中获取其所产生的第一个任务。 我们创建了一个延续,使其在完成时能够检查任务的状态并且(如果成功的话)能接收下一个任务和创建另一个延续直至其结束。(如果没有,即迭代器没有需要完成的需求。)
// 很牛逼,但是我们还没有。 public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) { var taskScheduler = SynchronizationContext.Current == null ? TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext(); var tcs = new TaskCompletionSource<TResult>(); var taskEnumerator = tasks.GetEnumerator(); if (!taskEnumerator.MoveNext()) { tcs.SetResult(default(TResult)); return tcs.Task; } taskEnumerator.Current.ContinueWith( t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t), taskScheduler); return tcs.Task; } private static void ToTaskDoOneStep<TResult>( IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler, TaskCompletionSource<TResult> tcs, Task completedTask) { var status = completedTask.Status; if (status == TaskStatus.Canceled) { tcs.SetCanceled(); } else if (status == TaskStatus.Faulted) { tcs.SetException(completedTask.Exception); } else if (!taskEnumerator.MoveNext()) { // 设置最后任务返回的结果,直至无需结果为止。 tcs.SetResult( typeof(TResult) == typeof(VoidResult) ? default(TResult) : ((Task<TResult>) completedTask).Result); } else { taskEnumerator.Current.ContinueWith( t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t), taskScheduler); } }
这里有许多值得分享的:
我们的后续部分(continuations)使用涉及SynchronizationContext的TaskScheduler,如果有的话。这使得我们的迭代器在UI线程初始化以后,立刻或者在一个继续点被调用,去访问UI控件。
进程不中断的运行,因此没有线程挂起等待!顺便说一下,在ToTaskDoOneStep()中对自身的调用不是递归调用;它是在taskEnumerator.Currenttask结束后调用的匿名函数,当前活动在调用ContinueWith()几乎立刻退出,它完全独立于后续部分。
此外,我们在继续点中验证每个嵌套task的状态,不是检查一个预测值。
然而,这儿至少有一个大问题和一些小一点的问题。
如果迭代器抛出一个未处理异常,或者抛出OperationCanceledException而取消,我们没有处理它或设置主task的状态。这是我们以前曾经做过的但在此版本丢失了。
为了修复问题1,我们不得不在两个方法中调用MoveNext()的地方引入同样的异常处理机制。即使是现在,两个方法中都有一样的后续部分建立。我们违背了“不要重复你自己”的信条。
如果异步方法被期望给出一个结果,但是迭代器没有提供就退出了会怎么样呢?或者它最后的task是错误的类型呢?第一种情形下,我们默默返回默认的结果类型;第二种情形,我们抛出一个未处理的InvalidCastException,主task永远不会到达结束状态!我们的程序将永久的挂起。
最后,如果一个嵌套的task取消或者发生错误呢?我们设置主task状态,再也不会调用迭代器。可能是在一个using块,或带有finally的try块的内部,并且有一些清理要做。我们应当遵守过程在中断的时候使它结束,而不要等垃圾收集器去做这些。我们怎么做到呢?当然通过一个后续部分!
为了解决这些问题,我们从ToTask()中移走MoveNext()调用,取而代之一个对ToTaskDoOneStep()的初始化的同步调用。然后我们将在一个提防增加合适的异常处理。
最终版本
这里是ToTask8742468051c85b06f0a0af9e3e506b5c()的最终实现. 它用一个TaskCompletionSource返回主task,永远不会引起线程等待,如果有的话还会涉及SynchronizationContext,由迭代器处理异常,直接传播嵌套task的结束(而不是AggregateException),合适的时候向主task返回一个值,当期望一个结果而SLAM迭代器没有以正确的genericTask8742468051c85b06f0a0af9e3e506b5c类型结束时,用一个友好的异常报错。
public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) { var taskScheduler = SynchronizationContext.Current == null ? TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext(); var taskEnumerator = tasks.GetEnumerator(); var completionSource = new TaskCompletionSource<TResult>(); // Clean up the enumerator when the task completes. completionSource.Task.ContinueWith(t => taskEnumerator.Dispose(), taskScheduler); ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, null); return completionSource.Task; } private static void ToTaskDoOneStep<TResult>( IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler, TaskCompletionSource<TResult> completionSource, Task completedTask) { // Check status of previous nested task (if any), and stop if Canceled or Faulted. TaskStatus status; if (completedTask == null) { // This is the first task from the iterator; skip status check. } else if ((status = completedTask.Status) == TaskStatus.Canceled) { completionSource.SetCanceled(); return; } else if (status == TaskStatus.Faulted) { completionSource.SetException(completedTask.Exception); return; } // Find the next Task in the iterator; handle cancellation and other exceptions. Boolean haveMore; try { haveMore = taskEnumerator.MoveNext(); } catch (OperationCanceledException cancExc) { completionSource.SetCanceled(); return; } catch (Exception exc) { completionSource.SetException(exc); return; } if (!haveMore) { // No more tasks; set the result (if any) from the last completed task (if any). // We know it's not Canceled or Faulted because we checked at the start of this method. if (typeof(TResult) == typeof(VoidResult)) { // No result completionSource.SetResult(default(TResult)); } else if (!(completedTask is Task<TResult>)) { // Wrong result completionSource.SetException(new InvalidOperationException( "Asynchronous iterator " + taskEnumerator + " requires a final result task of type " + typeof(Task<TResult>).FullName + (completedTask == null ? ", but none was provided." : "; the actual task type was " + completedTask.GetType().FullName))); } else { completionSource.SetResult(((Task<TResult>) completedTask).Result); } } else { // When the nested task completes, continue by performing this function again. taskEnumerator.Current.ContinueWith( nextTask => ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, nextTask), taskScheduler); } }
瞧! 现在你会在Visual Studio 2010中用没有async和await的 C#4 (或 VB10)写SLAMs(看起来同步的方法,但异步执行)。
有趣的地方
直到最后那个版本,我一直在给ToTask()传递一个CancellationTokenUp,并且将它传播进后续部分的ToTaskDoOneStep()。(这与本文毫不相关,所以我去掉了它们。你可以在样例代码中看注释掉的痕迹。)这有两个原因。第一,处理OperationCanceledException时,我会检查它的CancellationToken以确认它与这个操作是匹配的。如果不是,它将用一个错误来代替取消动作。虽然技术上没错,但不幸的是取消令牌可能会混淆,在其传递给ToTask()调用和后续部分之间的无关信息使它不值得。(如果你们这些 Task专家能给我一个注释里的可确认发生的好的用例,我会重新考虑)
第二个原因是我会检查令牌是否取消,在每次MoveNext()调用迭代器之前,立即取消主task时,和退出进程的时候。这使你可以不经过迭代器检查令牌,具有取消的行为。我不认为这是要做的正确事情(因为对一个异步进程在yield return处取消是不合适的)——更可能是它完全在迭代器进程控制之下——但我想试试。它无法工作。我发现在某些情形,task会取消而却后续部分不会触发。请看样例代码;我靠继续执行来恢复按钮可用,但它没有发生因此按钮在进程结束之后仍不可用。我在样例代码中留下了注释掉的取消检测;你可以将取消令牌的方法参数放回去并测试它。(如果你们Task专家能解释为什么会是这种情形,我将很感激!)