ホームページ  >  記事  >  バックエンド開発  >  .NET マルチスレッド プログラミング - 同時コレクション

.NET マルチスレッド プログラミング - 同時コレクション

黄舟
黄舟オリジナル
2017-02-06 14:29:511222ブラウズ

同時コレクション

1 なぜ同時コレクションを使用するのですか?

主な理由は次のとおりです:

  • System.Collections および System.Collections.Generic 名前空間で提供される従来のリスト、コレクション、および配列は、同期メカニズムがなければ、受け入れには適していません。要素を追加および削除する同時命令。

  • 上記のクラシック コレクションを同時コードで使用するには、複雑な同期管理が必要であり、使用するのが非常に不便です。

  • 複雑な同期メカニズムを使用すると、パフォーマンスが大幅に低下します。

  • NET Framework 4 によって提供される新しいコレクションにより、ロックの使用が必要な回数が最小限に抑えられます。これらの新しいコレクションは、コンペア アンド スワップ (CAS) 命令とメモリ バリアを使用して、相互に排他的な重量ロックの使用を回避します。これによりパフォーマンスが保証されます。

注: 同時コレクションはクラシック コレクションに比べてオーバーヘッドが大きいため、シリアル コードで同時コレクションを使用しても意味がありません。追加のオーバーヘッドが追加されるだけで、クラシック コレクションにアクセスするよりも実行速度が遅くなります。

2 同時コレクション

1) ConcurrentQueue: スレッドセーフな先入れ先出し (FIFO) コレクション

Main メソッド:

  • Enqueue(T item); オブジェクトをコレクションの最後に追加します。

  • TryDequeue(out T result); コレクションの先頭でオブジェクトを削除して返します。戻り値は、操作が成功したかどうかを示します。

  • TryPeek(out T result); コレクションの先頭にあるオブジェクトを削除せずに返してみます。戻り値は、操作が成功したかどうかを示します。

説明:

  • ConcurrentQueue は完全にロックフリーですが、CAS 操作が失敗してリソースの競合に直面すると、スピンして操作を再試行することがあります。

  • ConcurrentQueue は FIFO コレクションです。開始と終了の順序に関係がない状況では、ConcurrentQueue を使用しないようにしてください。

2) ConcurrentStack: スレッドセーフな後入れ先出し (LIFO) コレクション

主なメソッドとプロパティ:

  • Push(T item); オブジェクトをコレクションの先頭に挿入します。

  • TryPop(out T result); コレクションの先頭にあるオブジェクトをポップして返します。戻り値は、操作が成功したかどうかを示します。

  • TryPeek(out T result); コレクションの先頭にあるオブジェクトを削除せずに返してみます。戻り値は、操作が成功したかどうかを示します。

  • IsEmpty { get; } コレクションが空かどうかを示します。

  • PushRange(T[] items); コレクションの先頭に複数のオブジェクトを挿入します。

  • TryPopRange(T[] items); 複数の要素を先頭にポップし、返される結果はポップされた要素の数です。

説明:

  • ConcurrentQueue と同様に、ConcurrentStack は完全にロックフリーですが、CAS 操作が失敗してリソースの競合に直面すると、スピンして操作を再試行することがあります。

  • コレクションに要素が含まれているかどうかを取得するには、Count プロパティがゼロより大きいかどうかを判断する代わりに、IsEmpty プロパティを使用します。 Count の呼び出しは、IsEmpty の呼び出しよりもコストがかかります。

  • PushRange(T[] items) および TryPopRange(T[] items) を使用する場合は、バッファリングによって生じる余分なオーバーヘッドと余分なメモリ消費に注意してください。

3) ConcurrentBag: 反復可能な要素を持つ順序付けされていないコレクション

主なメソッドと属性:

  • TryPeek(out T result); 戻り値は、オブジェクトを削除せずにコレクションからオブジェクトを返そうとします。オブジェクトは正常に取得されました。

  • TryTake(out T result); コレクションからオブジェクトを返し、そのオブジェクトを削除しようとします。戻り値は、オブジェクトが正常に取得されたかどうかを示します。

  • Add(T item); オブジェクトをコレクションに追加します。

  • IsEmpty { get; } 説明は ConcurrentStack と同じです

説明:

  • ConcurrentBag はコレクションにアクセスする各スレッドのローカル キューを維持し、可能な場合はロックでアクセスします。フリーマナーローカルキュー。

  • ConcurrentBag は、同じスレッド内で要素を追加および削除する場合に非常に効率的です。

  • ConcurrentBag はロックを必要とする場合があるため、プロデューサー スレッドとコンシューマー スレッドが完全に分離されているシナリオでは非常に非効率的です。

  • ConcurrentBag による IsEmpty の呼び出しは、この順序付けされていないグループのすべてのロックを一時的に取得する必要があるため、非常にコストがかかります。

4) BlockingCollection:

System.Collections.Concurrent.IProducerConsumerCollectionを実装するスレッドセーフなコレクションで、ブロック機能と制限機能を提供します

主なメソッドとプロパティ:

  • BlockingCollection(intboundedCapacity); boundedCapacity はコレクションの制限サイズを表します。

  • CompleteAdding(); BlockingCollection インスタンスを追加を受け付けなくなったものとしてマークします。

  • IsCompleted { get; } このコレクションが完了済みで空であるとマークされているかどうか。

  • GetConsumingEnumerable(); コレクションから削除した要素を返します

  • Add(T item); 要素をコレクションに追加します。

  • TryTake(T item, int millisecondsTimeout, CancelToken cancelToken);

説明:

  • BlockingCollection() コンストラクターを使用して BlockingCollection をインスタンス化します。これは、boundedCapacity が設定されていないことを意味し、boundedCapacity はデフォルト値です: int .MaxValue 。

  • Bound: BlockingCollection(intboundedCapacity) を使用して、boundedCapacity の値を設定します。コレクション容量がこの値に達すると、要素が削除されるまで、BlockingCollection に要素を追加するスレッドはブロックされます。

境界関数はメモリ内のコレクションの最大サイズを制御できます。これは、多数の要素を処理する必要がある場合に非常に便利です。

  • デフォルトでは、BlockingCollection は ConcurrentQueue をカプセル化します。コンストラクターで IProducerConsumerCollection インターフェイスを実装する同時コレクション (ConcurrentStack や ConcurrentBag など) を指定できます。

  • このコレクションの使用には無限に待機するリスクが含まれるため、TryTake を使用する方が便利です。TryTake は、指定された時間内に項目をコレクションから削除できる場合は true、それ以外の場合は false です。 。

5) ConcurrentDictionary: 複数のスレッドが同時にアクセスできる、キーと値のペアのスレッドセーフなコレクション。

メインメソッド

AddOrUpdate(TKey key, TValue addValue, Func updateValueFactory); 指定されたキーがまだ存在しない場合は、キーと値のペアを辞書に追加します。 、次にディクショナリ内のキーと値のペアを更新します。

  • GetOrAdd(TKey key, TValue value); 指定されたキーがまだ存在しない場合は、キーと値のペアを辞書に追加します。

  • TryRemove(TKey key, out TValue value);辞書から削除して、指定されたキーを持つ値を返してみます。

  • TryUpdate(TKey key, TValue newValue, TValue CompareValue); 指定されたキーの既存の値と指定された値を比較し、等しい場合は、キーを 3 番目の値で更新します。

注:

  • ConcurrentDictionary は、読み取り操作に対して完全にロックフリーです。 ConcurrentDictionary は、複数のタスクまたはスレッドが要素を追加したりデータを変更したりするときに、きめ細かいロックを使用します。詳細なロックを使用すると、辞書全体ではなく、本当にロックする必要がある部分のみがロックされます。

6) IProducerConsumerCollection: プロデューサー/コンシューマーがスレッドセーフなコレクションを操作するためのメソッドを定義します。 このインターフェイスは、System.Collections.Concurrent.BlockingCollection などのより高いレベルの抽象化が基礎となるストレージ メカニズムとしてコレクションを使用できるように、(プロデューサー/コンシューマー コレクション用の) 統一された表現を提供します。

3. 一般的に使用されるパターン

1) 並列プロデューサー-コンシューマー パターン

定義:

このパターンでは、プロデューサーとコンシューマーは 2 種類のオブジェクト モデルであり、コンシューマーはプロデューサーの結果に依存し、プロデューサーは結果の生成中に依存します。 、消費者は結果を消費します。

.NET マルチスレッド プログラミング - 同時コレクション

図 1 並列プロデューサー/コンシューマー パターン

説明:

  • 同時コレクションは、このパターンのオブジェクトの並列操作をサポートしているため、このパターンでの使用に非常に適しています。

  • 同時コレクションを使用しない場合は、同期メカニズムを追加する必要があります。これにより、プログラムがより複雑になり、保守と理解が難しくなり、パフォーマンスが大幅に低下します。

  • 上の図は、生産者-消費者モデルの概略図です。縦軸は、生成者と消費者が同じタイムライン上にあるわけではありませんが、重なっています。これは、生成者が生成することを示すことを目的としています。作成者だけがジェネレータによって生成されたデータを実際に使用します。

2) パイプライン パターン

定義:

パイプラインは複数のステージで構成され、各ステージは一連のプロデューサーとコンシューマーで構成されます。一般に、前のステージは後のステージのジェネレーターであり、隣接する 2 つのステージ間のバッファー キューに依存して、各ステージは同時に実行できます。

.NET マルチスレッド プログラミング - 同時コレクション

図 2 並列パイプライン モード

説明:

  • BlockingCollection はバッファ タンク キューとしてよく使用されます。

  • パイプラインの速度は、パイプラインの最も遅いステージの速度とほぼ同じです。

  • 上の図は、前のステージが後のステージのジェネレーターです。これは最も単純で基本的なパイプライン モデルであり、各ステージにはより多くのデータ処理が含まれます。加工工程。

4 使用方法

ConcurrentBag と BlockingCollection のみが例として使用されており、他の同時コレクションも同様です。

ConcurrentBag

List<string> list = ......
ConcurrentBag<string> bags = new ConcurrentBag<string>();
Parallel.ForEach(list, (item) => 
{
    //对list中的每个元素进行处理然后,加入bags中
    bags.Add(itemAfter);
});

BlockingCollection—Producer Consumer Pattern

public static void Execute()
{
            //调用Invoke,使得生产者任务和消费者任务并行执行
            //Producer方法和Customer方法在Invoke中的参数顺序任意,不论何种顺序都会获得正确的结果
            Parallel.Invoke(()=>Customer(),()=>Producer());
            Console.WriteLine(string.Join(",",customerColl));
}
//生产者集合
private static BlockingCollection<int> producerColl = new BlockingCollection<int>();
 //消费者集合
private static BlockingCollection<string> customerColl = new BlockingCollection<string>();
public static void Producer()
{
            //循环将数据加入生成者集合
            for (int i = 0; i < 100; i++)
            {
                producerColl.Add(i);
            }
            //设置信号,表明不在向生产者集合中加入新数据
            //可以设置更加复杂的通知形式,比如数据量达到一定值且其中的数据满足某一条件时就设置完成添加
            producerColl.CompleteAdding();
}
public static void Customer()
{
            //调用IsCompleted方法,判断生产者集合是否在添加数据,是否还有未"消费"的数据
            //注意不要使用IsAddingCompleted,IsAddingCompleted只表明集合标记为已完成添加,而不能说明其为空
            //而IsCompleted为ture时,那么IsAddingCompleted为ture且集合为空
            while (!producerColl.IsCompleted)
            {
                //调用Take或TryTake "消费"数据,消费一个,移除一个
                //TryAdd的好处是提供超时机制
                customerColl.Add(string.Format("消费:{0}", producerColl.Take()));
            }
}


上記は .NET マルチスレッド プログラミング - 同時収集の内容です。その他の関連コンテンツについては、PHP 中国語 Web サイト (www.php.cn) に注目してください。


声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。