首頁 >後端開發 >C#.Net教程 >.NET多執行緒程式設計—並發集合

.NET多執行緒程式設計—並發集合

黄舟
黄舟原創
2017-02-06 14:29:511325瀏覽

並發集合

1 為什麼要使用並發集合?

原因主要有以下幾點:

  • System.Collections和System.Collections.Generic名稱空間中所提供的經典列表、集合和數組都不是線程安全的,若無同步機制,他們不適合接受並發的指令來新增和刪除元素。

  • 在並發程式碼中使用上述經典集合需要複雜的同步管理,使用起來很不方便。

  • 使用複雜的同步機制會大幅降低效能。

  • NET Framework 4所提供的新的集合盡可能地減少需要使用鎖的次數。這些新的集合透過使用比較並交換(compare-and-swap,CAS)指令和記憶體屏障,避免使用互斥的重量級鎖。這對性能有保障。

注意:與經典集合相比,並發集合會有更大的開銷,因此在串行代碼中使用並發集合無意義,只會增加額外的開銷且運行速度比訪問經典集合慢。

2 並發集合

1)ConcurrentQueue:執行緒安全的先進先出 (FIFO) 集合

主要方法:

  • Enqueue(T item);將物件加入集合。

  • TryDequeue(out T result); 嘗試移除並傳回位於集合開始處的對象,傳回值表示操作是否成功。

  • TryPeek(out T result);嘗試傳回集合開始處的對象,但不將其移除,傳回值表示操作是否成功。

說明:

  • ConcurrentQueue是完全無鎖的,但當CAS操作失敗且面臨資源爭用時,它可能會自旋並且重試操作。

  • ConcurrentQueue是FIFO集合,某些和出入順序無關的場合,盡量不要用ConcurrentQueue。

2)ConcurrentStack:線程安全的後進先出 (LIFO) 集合

主要方法及屬性:

  • Push(T item);將物件插入集合的頂部。

  • TryPop(out T result);嘗試彈出並返回集合頂部的對象,返回值表示操作是否成功。

  • TryPeek(out T result);嘗試傳回集合開始處的對象,但不將其移除,傳回值表示操作是否成功。

  • IsEmpty { get; }指示集合是否為空。

  • PushRange(T[] items);將多個物件插入集合的頂部。

  • TryPopRange(T[] items);彈出頂部多個元素,回傳結果為彈出元素個數。

說明:

  • 與ConcurrentQueue相似地,ConcurrentStack完全無鎖的,但當CAS操作失敗且面臨資源爭用時,它可能會自旋並且重試操作。

  • 取得集合是否包含元素使用IsEmpty屬性,而不是判斷Count屬性是否大於零。呼叫Count比呼叫IsEmpty開銷大。

  • 使用PushRange(T[] items)和TryPopRange(T[] items)時注意緩衝引起的額外開銷和額外的記憶體消耗。

3) ConcurrentBag:元素可重複的無序集合

主要方法及屬性:

  • TryPeek(out T result);嘗試從集合中傳回一個對象,但不會移除該物件,傳回值表示是否成功獲得該物件。

  • TryTake(out T result);嘗試從集合傳回一個物件並移除該對象,傳回值表示是否成功取得該物件。

  • Add(T item);將物件加入集合。

  • IsEmpty { get; }解釋同ConcurrentStack

說明:

  • ConcurrentBag為每個訪問集合的線程維護了一個本地隊列,在可能的情況下,它會以無鎖定的方式為每個存取集合的線程維護本地隊列。

  • ConcurrentBag在同一個執行緒中新增和刪除元素的場合下效率非常高。

  • 因為ConcurrentBag有時會需要鎖,在生產者執行緒和消費者執行緒完全分開的場景下效率非常低。

  • ConcurrentBag呼叫IsEmpty的開銷非常大,因為這需要臨時取得這個無序組的所有鎖定。

4)BlockingCollection:實現

System.Collections.Concurrent.IProducerConsumerCollection 的線程安全集合,提供阻塞和限制功能

主要方法及屬性:

  • BlockingCollection(int boundedCapacity);boundedCapacity表示集合限制大小。

  • CompleteAdding();將BlockingCollection實例標記為不再接受任何新增。

  • IsCompleted { get; }此集合是否已標記為已完成新增且為空。

  • GetConsumingEnumerable();從集合中移除並返回移除的元素

  • Add(T item);新增元素到集合。

  • TryTake(T item, int millisecondsTimeout, CancellationToken cancellationToken);

說明:

  • 使用BlockingColldcity. .MaxValue 。

  • 限界:使用BlockingCollection(int boundedCapacity),設定boundedCapacity的值,當集合容量達到這個值得時候,向BlockingCollection新增元素的執行緒將會被阻塞,直到有元素被刪除。

限界功能可控制記憶體中集合最大大小,這對於需要處理大量元素的時候非常有用。

  • 預設情況下,BlockingCollection封裝了一個ConcurrentQueue。可以在建構函式中指定一個實作了IProducerConsumerCollection介面的並發集合,包括:ConcurrentStack、ConcurrentBag。

  • 使用此集合包含易於無限制等待的風險,所以使用TryTake更加,因為TryTake提供了超時控制,指定的時間內可以從集合中移除某個項,則為 true;否則為 false。

5)ConcurrentDictionary:可由多個執行緒同時存取的鍵值對的執行緒安全性集合。

主要方法

AddOrUpdate(TKey key, TValue addValue, Func updateValueFactory);如果指定的鍵尚不存在,則將鍵/值對加入字典;如果指定的鍵已存在,則更新字典中的鍵/值對。

  • GetOrAdd(TKey key, TValue value);如果指定的鍵尚不存在,則將鍵/值對加入字典。

  • TryRemove(TKey key, out TValue value);嘗試從字典中移除並傳回具有指定鍵的值。

  • TryUpdate(TKey key, TValue newValue, TValue comparisonValue);將指定鍵的現有值與指定值進行比較,如果相等,則用第三個值更新該鍵。

說明:

  • ConcurrentDictionary對於讀取操作是完全無鎖定的。當多個任務或執行緒向其中添加元素或修改資料的時候,ConcurrentDictionary使用細粒度的鎖定。使用細粒度的鎖只會鎖定真正需要鎖定的部分,而不是整個字典。

6)IProducerConsumerCollection:定義供生產者/消費者用來操作執行緒安全集合的方法。 此介面提供一個統一的表示(為生產者/消費者集合),從而更高層級抽像如 System.Collections.Concurrent.BlockingCollection可以使用集合作為基礎的儲存機制。

3.常用模式

1)並行的生產者-消費者模式

定義:

生成者和消費者是此模式中的兩類物件模型,消費者依賴生產者的結果,生產者產生結果的同時,消費者使用結果。

.NET多執行緒程式設計—並發集合

圖1 並行的生產者-消費者模式

說明:

  • 並發集合用在此模式下非常合適,因為並發集合支援此模式中物件的並行操作。

  • 若不使用並發集合,那麼就要加入同步機制,從而使程序變得比較複雜,難於維護和理解,同時大大降低性能。

  • 上圖為生產者消費者模式示意圖,縱軸為時間軸,生成者與消費者的並不在一條時間線上,但二者有交叉,意在表明生成者先產生結果,而後消費者才真正使用了生成者產生的數據。

2)流水線模式

定義:

管線由多個階段構成,每個階段由一系列的生產者和消費者構成。一般來講前一個階段是後一個階段的生成者;依賴相鄰兩個階段之間的緩衝區佇列,每個階段可以並發執行。

.NET多執行緒程式設計—並發集合

圖2 並行的管線模式

說明:

  • 常使用BlockingCollection作為緩衝區區域佇列。

  • 流水線的速度近似等於流水線最慢階段的速度。

  • 上圖為流水線模式示意圖,前一階段為後一階段的生成者,這裡展示了最為簡單和基本的流水線模式,更複雜的模式可以認為是每個階段都包括了對數據更多的處理過程。

4 使用方式

僅以ConcurrentBag和BlockingCollection為例,其他的並發集合則與之相似。

ConcurrentBag

List<string> list = ......
ConcurrentBag<string> bags = new ConcurrentBag<string>();
Parallel.ForEach(list, (item) => 
{
    //对list中的每个元素进行处理然后,加入bags中
    bags.Add(itemAfter);
});

BlockingCollection—生產者消費者模式

public static void Execute()
{
            //调用Invoke,使得生产者任务和消费者任务并行执行
            //Producer方法和Customer方法在Invoke中的参数顺序任意,不论何种顺序都会获得正确的结果
            Parallel.Invoke(()=>Customer(),()=>Producer());
            Console.WriteLine(string.Join(",",customerColl));
}
//生产者集合
private static BlockingCollection<int> producerColl = new BlockingCollection<int>();
 //消费者集合
private static BlockingCollection<string> customerColl = new BlockingCollection<string>();
public static void Producer()
{
            //循环将数据加入生成者集合
            for (int i = 0; i < 100; i++)
            {
                producerColl.Add(i);
            }
            //设置信号,表明不在向生产者集合中加入新数据
            //可以设置更加复杂的通知形式,比如数据量达到一定值且其中的数据满足某一条件时就设置完成添加
            producerColl.CompleteAdding();
}
public static void Customer()
{
            //调用IsCompleted方法,判断生产者集合是否在添加数据,是否还有未"消费"的数据
            //注意不要使用IsAddingCompleted,IsAddingCompleted只表明集合标记为已完成添加,而不能说明其为空
            //而IsCompleted为ture时,那么IsAddingCompleted为ture且集合为空
            while (!producerColl.IsCompleted)
            {
                //调用Take或TryTake "消费"数据,消费一个,移除一个
                //TryAdd的好处是提供超时机制
                customerColl.Add(string.Format("消费:{0}", producerColl.Take()));
            }
}


以上就是.NET多執行緒程式設計—並發集合的內容,更多相關內容請關注PHP中文網(www.php.cn)!


陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn