ホームページ  >  記事  >  ウェブフロントエンド  >  C# でイテレーターを使用して待機中のタスクを処理する_基礎知識

C# でイテレーターを使用して待機中のタスクを処理する_基礎知識

WBOY
WBOYオリジナル
2016-05-16 15:50:31955ブラウズ

はじめに

C# 5 について、async キーワードと await キーワード、およびそれらが非同期プログラミングを簡素化する方法について読んだことがあるかもしれません。残念ながら、VS2010 のアップグレードからわずか 2 年が経過しても、VS2010 以降はまだ VS2012 にアップグレードする準備ができていません。これは C# 4 で利用可能であり、「同期的に見えて非同期で実行されるメソッドを VS 2010 で記述できたら、コードはもっときれいになるだろう

」と考えているかもしれません。

この記事を読めば、それができるようになります。同じ VS2012 async キーワードを使用して、C#5 の機能を活用しながら、「同期的に見えるが、非同期で実行されるメソッド」を作成できる小さなインフラストラクチャ コードを開発します。

async と await は非常に優れた構文であることを認めなければなりません。この変更に適応するには、メソッドでさらに多くの「AsyncResultcallback」メソッドを記述する必要があります。そして、最終的に VS2012 (またはそれ以降) にアップグレードするときに、これが問題になります。些細なことですが、このメソッドを C# キーワードに置き換えるのは、骨の折れる構造の書き換えではなく、単純な構文の変更だけです。

概要

async/await は、非同期タスク パターンに基づくキーワードです。 こちら にはすでに完全なドキュメントがあるため、ここでは説明しません。しかし、TAPは非常にハンサムであることを指摘しなければなりません。これを使用すると、将来のある時点で完了する小さな作業単位 (タスク) を多数作成できます。タスクは他の (ネストされた) タスクを開始したり、後でのみ開始されるフォローアップ タスクを作成したりできます。前のタスクの完了。先行タスクと後続タスクは、1 対多または多対 1 の関係でリンクできます。埋め込みタスクが完了すると、親タスクをスレッド (重量のあるリソース) に結び付ける必要はありません。タスクを実行するときにスレッドのタイミングを心配する必要はなくなり、いくつかの小さなプロンプトを作成するだけで、フレームワークがこれらの処理を自動的に実行します。プログラムが実行を開始すると、すべてのタスクは海に流れ込む小川のようにそれぞれの目的地に到達し、パチンコの小さな鉄球のように跳ね返って相互作用します。

ただし、C#4 には async と await がありませんが、欠けているのは .Net5 のこの小さな新機能だけであり、これらの新機能をしばらく回避するか、自分で構築することができます。キー タスクの種類 まだ利用可能です。


C#5 非同期 (async) メソッドでは、タスクを待つ必要があります。これによってスレッドは待機することはなく、むしろメソッドが呼び出し元にタスクを返し、呼び出し元は待機したり (それ自体が非同期の場合)、フォローアップ パーツを追加したりできます。 (タスクまたはその結果に対して Wait() を呼び出すこともできますが、これはスレッドに結合されるため、これは避けてください。) 待機中のタスクが正常に完了すると、非同期メソッドは中断したところから実行を継続します。

おそらく、C#5 コンパイラーがその非同期メソッドを、ステート マシンを実装する生成された入れ子クラスに書き換えることをご存知かと思います。 C# には、(2.0 以降では) イテレーター (yield return メソッド) というもう 1 つの機能があります。ここでのアプローチは、プロセス全体で待機ステップを持つ一連のタスクを返すイテレーター メソッドを使用して、C# 4 でステート マシンを構築することです。イテレータから返されたタスクの列挙を受け取り、シーケンス全体の完了を表し、最終結果 (存在する場合) を提供するオーバーロードされたタスクを返すメソッドを作成できます。

最終目標

スティーブン・コヴィーは、目標を優先することを提案しました。これが私たちが今行っていることです。 async/await を使用して SLAM (同期のように見える非同期メソッド) を実装する方法を示す例がすでにたくさんあります。では、これらのキーワードを使用せずにこの関数を実装するにはどうすればよいでしょうか。 C#5 の非同期の例を実行し、それを C#4 で実装する方法を見てみましょう。次に、これらのコードを変換する一般的なアプローチについて説明します。

次の例は、C#5 で非同期読み取りおよび書き込みメソッド Stream.CopyToAsync() を実装する方法を示しています。このメソッドは .NET5 では実装されていないと仮定します。

public static async Task CopyToAsync(
  this Stream input, Stream output,
  CancellationToken cancellationToken = default(CancellationToken))
{
  byte[] buffer = new byte[0x1000];  // 4 KiB
  while (true) {
    cancellationToken.ThrowIfCancellationRequested();
    int bytesRead = await input.ReadAsync(buffer, 0, buffer.Length);
    if (bytesRead == 0) break;
 
    cancellationToken.ThrowIfCancellationRequested();
    await output.WriteAsync(buffer, 0, bytesRead);
  }
}


C#4 の場合、これを 2 つの部分に分割します。1 つは同じアクセス機能を持つメソッドで、もう 1 つは同じパラメーターを持つが戻り値の型が異なるプライベート メソッドです。プライベート メソッドは反復を使用して同じプロセスを実装し、その結果は一連の待機タスク (IEnumerable) になります。シーケンス内の実際のタスクは、非ジェネリック、またはさまざまなタイプのジェネリックの任意の組み合わせにすることができます。 (幸いなことに、汎用 Task8742468051c85b06f0a0af9e3e506b5c タイプは、非汎用 Task タイプのサブタイプです)

同じアクセス機能 (パブリック) メソッドは、対応する非同期メソッドと同じ型 (void、Task、または汎用 Task8742468051c85b06f0a0af9e3e506b5c) を返します。拡張メソッドを使用してプライベート イテレータを呼び出し、それを Task または Task8742468051c85b06f0a0af9e3e506b5c に変換します。

public static /*async*/ Task CopyToAsync(
  this Stream input, Stream output,
  CancellationToken cancellationToken = default(CancellationToken))
{
  return CopyToAsyncTasks(input, output, cancellationToken).ToTask();
}
private static IEnumerable<Task> CopyToAsyncTasks(
  Stream input, Stream output,
  CancellationToken cancellationToken)
{
  byte[] buffer = new byte[0x1000];  // 4 KiB
  while (true) {
    cancellationToken.ThrowIfCancellationRequested();
    var bytesReadTask = input.ReadAsync(buffer, 0, buffer.Length);
    yield return bytesReadTask;
    if (bytesReadTask.Result == 0) break;
 
    cancellationToken.ThrowIfCancellationRequested();
    yield return output.WriteAsync(buffer, 0, bytesReadTask.Result);
  }
}

异步方法通常以"Async"结尾命名(除非它是事件处理器如startButton_Click)。给迭代器以同样的名字后跟“Tasks”(如startButton_ClickTasks)。如果异步方法返回void值,它仍然会调用ToTask()但不会返回Task。如果异步方法返回Task057dd226e557d51bc4e4efe378b7cc42,那么它就会调用通用的ToTask057dd226e557d51bc4e4efe378b7cc42()扩展方法。对应三种返回类型,异步可替代的方法像下面这样:
 

public /*async*/ void DoSomethingAsync() {
  DoSomethingAsyncTasks().ToTask();
}
public /*async*/ Task DoSomethingAsync() {
  return DoSomethingAsyncTasks().ToTask();
}
public /*async*/ Task<String> DoSomethingAsync() {
  return DoSomethingAsyncTasks().ToTask<String>();
}

成对的迭代器方法不会更复杂。当异步方法等待非通用的Task时,迭代器简单的将控制权转给它。当异步方法等待task结果时,迭代器将task保存在一个变量中,转到该方法,之后再使用它的返回值。两种情况在上面的CopyToAsyncTasks()例子里都有显示。

对包含通用resultTask057dd226e557d51bc4e4efe378b7cc42的SLAM,迭代器必须将控制转交给确切的类型。ToTask057dd226e557d51bc4e4efe378b7cc42()将最终的task转换为那种类型以便提取其结果。经常的你的迭代器将计算来自中间task的结果数值,而且仅需要将其打包在Task8742468051c85b06f0a0af9e3e506b5c中。.NET 5为此提供了一个方便的静态方法。而.NET 4没有,所以我们用TaskEx.FromResult8742468051c85b06f0a0af9e3e506b5c(value)来实现它。

最后一件你需要知道的事情是如何处理中间返回的值。一个异步的方法可以从多重嵌套的块中返回;我们的迭代器简单的通过跳转到结尾来模仿它。

 

// C#5
public async Task<String> DoSomethingAsync() {
  while (…) {
    foreach (…) {
      return "Result";
    }
  }
}
 
// C#4; DoSomethingAsync() is necessary but omitted here.
private IEnumerable<Task> DoSomethingAsyncTasks() {
  while (…) {
    foreach (…) {
      yield return TaskEx.FromResult("Result");
      goto END;
    }
  }
END: ;
}

现在我们知道如何在C#4中写SLAM了,但是只有实现了FromResult8742468051c85b06f0a0af9e3e506b5c()和两个 ToTask()扩展方法才能真正的做到。下面我们开始做吧。

简单的开端

我们将在类System.Threading.Tasks.TaskEx下实现3个方法, 先从简单的那2个方法开始。FromResult()方法先创建了一个TaskCompletionSource(), 然后给它的result赋值,最后返回Task。
 

public static Task<TResult> FromResult<TResult>(TResult resultValue) {
  var completionSource = new TaskCompletionSource<TResult>();
  completionSource.SetResult(resultValue);
  return completionSource.Task;
}

很显然, 这2个ToTask()方法基本相同, 唯一的区别就是是否给返回对象Task的Result属性赋值. 通常我们不会去写2段相同的代码, 所以我们会用其中的一个方法来实现另一个。 我们经常使用泛型来作为返回结果集,那样我们不用在意返回值同时也可以避免在最后进行类型转换。 接下来我们先实现那个没有用泛型的方法。
 

private abstract class VoidResult { }
 
public static Task ToTask(this IEnumerable<Task> tasks) {
  return ToTask<VoidResult>(tasks);
}

目前为止我们就剩下一个 ToTask8742468051c85b06f0a0af9e3e506b5c()方法还没有实现。

第一次天真的尝试

对于我们第一次尝试实现的方法,我们将枚举每个任务的Wait()来完成,然后将最终的任务做为结果(如果合适的话)。当然,我们不想占用当前线程,我们将另一个线程来执行循环该任务。
 

// BAD CODE !
public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks)
{
  var tcs = new TaskCompletionSource<TResult>();
  Task.Factory.StartNew(() => {
    Task last = null;
    try {
      foreach (var task in tasks) {
        last = task;
        task.Wait();
      }
 
      // Set the result from the last task returned, unless no result is requested.
      tcs.SetResult(
        last == null || typeof(TResult) == typeof(VoidResult)
          &#63; default(TResult) : ((Task<TResult>) last).Result);
 
    } catch (AggregateException aggrEx) {
      // If task.Wait() threw an exception it will be wrapped in an Aggregate; unwrap it.
      if (aggrEx.InnerExceptions.Count != 1) tcs.SetException(aggrEx);
      else if (aggrEx.InnerException is OperationCanceledException) tcs.SetCanceled();
      else tcs.SetException(aggrEx.InnerException);
    } catch (OperationCanceledException cancEx) {
      tcs.SetCanceled();
    } catch (Exception ex) {
      tcs.SetException(ex);
    }
  });
  return tcs.Task;
}


这里有一些好东西,事实上它真的有用,只要不触及用户界面:
它准确的返回了一个TaskCompletionSource的Task,并且通过源代码设置了完成状态。

  •     它显示了我们怎么通过迭代器的最后一个任务设置task的最终Result,同时避免可能没有结果的情况。
  •     它从迭代器中捕获异常并设置Canceled或Faulted状态. 它也传播枚举的task状态 (这里是通过Wait(),该方法可能抛出一个包装了cancellation或fault的异常的集合).

但这里有些主要的问题。最严重的是:

  •     由于迭代器需要实现“异步态的”的诺言,当它从一个UI线程初始化以后,迭代器的方法将能访问UI控件。你能发现这里的foreach循环都是运行在后台;从那个时刻开始不要触摸UI!这种方法没有顾及SynchronizationContext。
  •      在UI之外我们也有麻烦。我们可能想制造大量大量的由SLAM实现的并行运行的Tasks。但是看看循环中的Wait()!当等待一个嵌套task时,可能远程需要一个很长的时间完成,我们会挂起一个线程。我们面临线程池的线程资源枯竭的情况。
  •     这种解包Aggregate异常的方法是不太自然的。我们需要捕获并传播它的完成状态而不抛出异常。
  •     有时SLAM可以立刻决定它的完成状态。那种情形下,C#5的async可以异步并且有效的操作。这里我们总是计划了一个后台task,因此失去了那种可能。

是需要想点办法的时候了!

连续循环

最大的想法是直接从迭代器中获取其所产生的第一个任务。 我们创建了一个延续,使其在完成时能够检查任务的状态并且(如果成功的话)能接收下一个任务和创建另一个延续直至其结束。(如果没有,即迭代器没有需要完成的需求。)

 

// 很牛逼,但是我们还没有。
public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks)
{
  var taskScheduler =
    SynchronizationContext.Current == null
      &#63; TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext();
  var tcs = new TaskCompletionSource<TResult>();
  var taskEnumerator = tasks.GetEnumerator();
  if (!taskEnumerator.MoveNext()) {
    tcs.SetResult(default(TResult));
    return tcs.Task;
  }
 
  taskEnumerator.Current.ContinueWith(
    t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t),
    taskScheduler);
  return tcs.Task;
}
private static void ToTaskDoOneStep<TResult>(
  IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler,
  TaskCompletionSource<TResult> tcs, Task completedTask)
{
  var status = completedTask.Status;
  if (status == TaskStatus.Canceled) {
    tcs.SetCanceled();
 
  } else if (status == TaskStatus.Faulted) {
    tcs.SetException(completedTask.Exception);
 
  } else if (!taskEnumerator.MoveNext()) {
    // 设置最后任务返回的结果,直至无需结果为止。
    tcs.SetResult(
      typeof(TResult) == typeof(VoidResult)
        &#63; default(TResult) : ((Task<TResult>) completedTask).Result);
 
  } else {
    taskEnumerator.Current.ContinueWith(
      t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t),
      taskScheduler);
  }
}

这里有许多值得分享的:


    我们的后续部分(continuations)使用涉及SynchronizationContext的TaskScheduler,如果有的话。这使得我们的迭代器在UI线程初始化以后,立刻或者在一个继续点被调用,去访问UI控件。
    进程不中断的运行,因此没有线程挂起等待!顺便说一下,在ToTaskDoOneStep()中对自身的调用不是递归调用;它是在taskEnumerator.Currenttask结束后调用的匿名函数,当前活动在调用ContinueWith()几乎立刻退出,它完全独立于后续部分。
    此外,我们在继续点中验证每个嵌套task的状态,不是检查一个预测值。


然而,这儿至少有一个大问题和一些小一点的问题。

    如果迭代器抛出一个未处理异常,或者抛出OperationCanceledException而取消,我们没有处理它或设置主task的状态。这是我们以前曾经做过的但在此版本丢失了。
    为了修复问题1,我们不得不在两个方法中调用MoveNext()的地方引入同样的异常处理机制。即使是现在,两个方法中都有一样的后续部分建立。我们违背了“不要重复你自己”的信条。

    如果异步方法被期望给出一个结果,但是迭代器没有提供就退出了会怎么样呢?或者它最后的task是错误的类型呢?第一种情形下,我们默默返回默认的结果类型;第二种情形,我们抛出一个未处理的InvalidCastException,主task永远不会到达结束状态!我们的程序将永久的挂起。

    最后,如果一个嵌套的task取消或者发生错误呢?我们设置主task状态,再也不会调用迭代器。可能是在一个using块,或带有finally的try块的内部,并且有一些清理要做。我们应当遵守过程在中断的时候使它结束,而不要等垃圾收集器去做这些。我们怎么做到呢?当然通过一个后续部分!

为了解决这些问题,我们从ToTask()中移走MoveNext()调用,取而代之一个对ToTaskDoOneStep()的初始化的同步调用。然后我们将在一个提防增加合适的异常处理。

最终版本

这里是ToTask8742468051c85b06f0a0af9e3e506b5c()的最终实现. 它用一个TaskCompletionSource返回主task,永远不会引起线程等待,如果有的话还会涉及SynchronizationContext,由迭代器处理异常,直接传播嵌套task的结束(而不是AggregateException),合适的时候向主task返回一个值,当期望一个结果而SLAM迭代器没有以正确的genericTask8742468051c85b06f0a0af9e3e506b5c类型结束时,用一个友好的异常报错。
 

public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) {
  var taskScheduler =
    SynchronizationContext.Current == null
      &#63; TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext();
  var taskEnumerator = tasks.GetEnumerator();
  var completionSource = new TaskCompletionSource<TResult>();
 
  // Clean up the enumerator when the task completes.
  completionSource.Task.ContinueWith(t => taskEnumerator.Dispose(), taskScheduler);
 
  ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, null);
  return completionSource.Task;
}
 
private static void ToTaskDoOneStep<TResult>(
  IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler,
  TaskCompletionSource<TResult> completionSource, Task completedTask)
{
  // Check status of previous nested task (if any), and stop if Canceled or Faulted.
  TaskStatus status;
  if (completedTask == null) {
    // This is the first task from the iterator; skip status check.
  } else if ((status = completedTask.Status) == TaskStatus.Canceled) {
    completionSource.SetCanceled();
    return;
  } else if (status == TaskStatus.Faulted) {
    completionSource.SetException(completedTask.Exception);
    return;
  }
 
  // Find the next Task in the iterator; handle cancellation and other exceptions.
  Boolean haveMore;
  try {
    haveMore = taskEnumerator.MoveNext();
 
  } catch (OperationCanceledException cancExc) {
    completionSource.SetCanceled();
    return;
  } catch (Exception exc) {
    completionSource.SetException(exc);
    return;
  }
 
  if (!haveMore) {
    // No more tasks; set the result (if any) from the last completed task (if any).
    // We know it's not Canceled or Faulted because we checked at the start of this method.
    if (typeof(TResult) == typeof(VoidResult)) {    // No result
      completionSource.SetResult(default(TResult));
 
    } else if (!(completedTask is Task<TResult>)) {   // Wrong result
      completionSource.SetException(new InvalidOperationException(
        "Asynchronous iterator " + taskEnumerator +
          " requires a final result task of type " + typeof(Task<TResult>).FullName +
          (completedTask == null &#63; ", but none was provided." :
            "; the actual task type was " + completedTask.GetType().FullName)));
 
    } else {
      completionSource.SetResult(((Task<TResult>) completedTask).Result);
    }
 
  } else {
    // When the nested task completes, continue by performing this function again.
    taskEnumerator.Current.ContinueWith(
      nextTask => ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, nextTask),
      taskScheduler);
  }
}

瞧! 现在你会在Visual Studio 2010中用没有async和await的 C#4 (或 VB10)写SLAMs(看起来同步的方法,但异步执行)。

有趣的地方

直到最后那个版本,我一直在给ToTask()传递一个CancellationTokenUp,并且将它传播进后续部分的ToTaskDoOneStep()。(这与本文毫不相关,所以我去掉了它们。你可以在样例代码中看注释掉的痕迹。)这有两个原因。第一,处理OperationCanceledException时,我会检查它的CancellationToken以确认它与这个操作是匹配的。如果不是,它将用一个错误来代替取消动作。虽然技术上没错,但不幸的是取消令牌可能会混淆,在其传递给ToTask()调用和后续部分之间的无关信息使它不值得。(如果你们这些 Task专家能给我一个注释里的可确认发生的好的用例,我会重新考虑)

第二个原因是我会检查令牌是否取消,在每次MoveNext()调用迭代器之前,立即取消主task时,和退出进程的时候。这使你可以不经过迭代器检查令牌,具有取消的行为。我不认为这是要做的正确事情(因为对一个异步进程在yield return处取消是不合适的)——更可能是它完全在迭代器进程控制之下——但我想试试。它无法工作。我发现在某些情形,task会取消而却后续部分不会触发。请看样例代码;我靠继续执行来恢复按钮可用,但它没有发生因此按钮在进程结束之后仍不可用。我在样例代码中留下了注释掉的取消检测;你可以将取消令牌的方法参数放回去并测试它。(如果你们Task专家能解释为什么会是这种情形,我将很感激!)

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。