首頁  >  文章  >  後端開發  >  .NET編程之線程池內幕

.NET編程之線程池內幕

黄舟
黄舟原創
2017-02-06 14:19:531284瀏覽

本文透過.NET4.5的ThreadPool原始碼的分析講解揭示.NET執行緒池的內幕,並總結ThreadPool設計的好與不足。


線程池的作用


線程池,顧名思義,線程物件池。 Task和TPL都有用到線程池,所以了解線程池的內幕有助於你寫出更好的程式。由於篇幅有限,這裡我只講解以下核心

概念:

  • 執行緒池的大小

  • 如何呼叫執行緒池新增任務

  • 如何呼叫執行緒池新增任務
  • IOCP的線程,但在這裡我們不研究它,涉及task和TPL的會在其各自的博客中做詳解。

線程池的大小

不管什麼池,總是有尺寸,ThreadPool也不例外。 ThreadPool提供了4個方法來調整執行緒池的大小:

    SetMaxThreads
  • GetMaxThreads
  • reads指定線程池最多可以有多少個線程,而GetMaxThreads自然就是要取得這個值。 SetMinThreads指定在執行緒池中最少存活的執行緒的數量,而GetMinThreads就是取得這個值。
  • 為何要設定一個最大數量和有一個最小數量呢?原來執行緒池的大小取決於若干因素,如虛擬位址空間的大小等。例如你的電腦是4g內存,而一個線程的初始堆疊大小為1m,那麼你最多能創建4g/1m的線程(忽略作業系統本身以及其他進程內存分配);正因為線程有內存開銷,所以如果線程池的線程過多而又沒有被完全使用,那麼這就是對內存的一種浪費,所以限制線程池的最大數是很make sense的。

那麼最小數又是為啥?執行緒池就是執行緒的物件池,物件池的最大的用處是重複使用物件。為啥要重複使用線程,因為線程的創建與銷毀都要佔用大量的cpu時間。所以在高並發狀態下,線程池由於無需創建銷毀線程節約了大量時間,提高了系統的響應能力和吞吐量。最小數可以讓你調整最小的存活線程數量來應付不同的高並發場景。


如何呼叫執行緒池新增任務


執行緒池主要提供了2個方法來呼叫:QueueUserWorkItem和UnsafeQueueUserWorkItem。


兩個方法的程式碼基本上一致,除了attribute不同,QueueUserWorkItem可以被partial trust的程式碼調用,而UnsafeQueueUserWorkItem只能被full tr​​ust的程式碼調用。

public static bool QueueUserWorkItem(WaitCallback callBack)
{
StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
return ThreadPool.QueueUserWorkItemHelper(callBack, (object) null, ref stackMark, true);
}

QueueUserWorkItemHelper首先呼叫ThreadPool.EnsureVMInitialized()來確保CLR虛擬機初始化(VM是一個統稱,不是單指java虛擬機,也可以指CLR的execution engine),緊接著實例化ThreadPoolWorkQuquePoolQuquejue方法並傳入callback和true。

SecurityCritical]
public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
{
ThreadPoolWorkQueueThreadLocals queueThreadLocals = (ThreadPoolWorkQueueThreadLocals) null;
if (!forceGlobal)
queueThreadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals;
if (this.loggingEnabled)
FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject((object) callback);
if (queueThreadLocals != null)
{
queueThreadLocals.workStealingQueue.LocalPush(callback);
}
else
{
ThreadPoolWorkQueue.QueueSegment comparand = this.queueHead;
while (!comparand.TryEnqueue(callback))
{
Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref comparand.Next, new ThreadPoolWorkQueue.QueueSegment(), (ThreadPoolWorkQueue.QueueSegment) null);
for (; comparand.Next != null; comparand = this.queueHead)
Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueHead, comparand.Next, comparand);
}
}
this.EnsureThreadRequested();
}

ThreadPoolWorkQueue主要包含2個「queue」(實際上是陣列),一個為QueueSegment(global work queue),另一個是WorkStealingQueue(local work queue)。兩者俱體的差異會在Task/TPL裡講解,這裡暫不解釋。


由於forceGlobal是true,所以執行了comparand.TryEnqueue(callback),也就是QueueSegment.TryEnqueue。 comparand先從隊列的頭(queueHead)開始enqueue,如果不行就繼續往下enqueue,成功後再賦值給queueHead。

讓我們來看看QueueSegment的源代碼:

public QueueSegment()
{
this.nodes = new IThreadPoolWorkItem[256];
}

public bool TryEnqueue(IThreadPoolWorkItem node)
{
int upper;
int lower;
this.GetIndexes(out upper, out lower);
while (upper != this.nodes.Length)
{
if (this.CompareExchangeIndexes(ref upper, upper + 1, ref lower, lower))
{
Volatile.Write<IThreadPoolWorkItem>(ref this.nodes[upper], node);
return true;
}
}
return false;
}

這個所謂的global work queue其實是一個IThreadPoolWorkItem的數組,而且限死256,這是為啥?難道是因為和IIS線程池(也只有256個線程)對齊?使用interlock和記憶體寫入屏障volatile.write來確保nodes的正確性,比起同步鎖定效能有很大的提升。


最後呼叫EnsureThreadRequested,EnsureThreadRequested會呼叫QCall把請求傳送至CLR,由CLR調度ThreadPool。

執行緒池如何執行任務

執行緒被調度後透過ThreadPoolWorkQueue的Dispatch方法來執行callback。

internal static bool Dispatch()
{
ThreadPoolWorkQueue threadPoolWorkQueue = ThreadPoolGlobals.workQueue;
int tickCount = Environment.TickCount;
threadPoolWorkQueue.MarkThreadRequestSatisfied();
threadPoolWorkQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, (EventKeywords) 18);
bool flag1 = true;
IThreadPoolWorkItem callback = (IThreadPoolWorkItem) null;
try
{
ThreadPoolWorkQueueThreadLocals tl = threadPoolWorkQueue.EnsureCurrentThreadHasQueue();
while ((long) (Environment.TickCount - tickCount) < (long) ThreadPoolGlobals.tpQuantum)
{
try
{
}
finally
{
bool missedSteal = false;
threadPoolWorkQueue.Dequeue(tl, out callback, out missedSteal);
if (callback == null)
flag1 = missedSteal;
else
threadPoolWorkQueue.EnsureThreadRequested();
}
if (callback == null)
return true;
if (threadPoolWorkQueue.loggingEnabled)
FrameworkEventSource.Log.ThreadPoolDequeueWorkObject((object) callback);
if (ThreadPoolGlobals.enableWorkerTracking)
{
bool flag2 = false;
try
{
try
{
}
finally
{
ThreadPool.ReportThreadStatus(true);
flag2 = true;
}
callback.ExecuteWorkItem();
callback = (IThreadPoolWorkItem) null;
}
finally
{
if (flag2)
ThreadPool.ReportThreadStatus(false);
}
}
else
{
callback.ExecuteWorkItem();
callback = (IThreadPoolWorkItem) null;
}
if (!ThreadPool.NotifyWorkItemComplete())
return false;
}
return true;
}
catch (ThreadAbortException ex)
{
if (callback != null)
callback.MarkAborted(ex);
flag1 = false;
}
finally
{
if (flag1)
threadPoolWorkQueue.EnsureThreadRequested();
}
return true;
}

while語句判斷如果執行時間少於30ms會不斷繼續執行下一個callback。這是因為大多數機器線程切換大概在30ms,如果該線程只執行了不到30ms就在等待中斷線程切換那就太浪費CPU了,浪費可恥啊!

Dequeue負責找到需要執行的callback:

public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem callback, out bool missedSteal)
{
callback = (IThreadPoolWorkItem) null;
missedSteal = false;
ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue1 = tl.workStealingQueue;
workStealingQueue1.LocalPop(out callback);
if (callback == null)
{
for (ThreadPoolWorkQueue.QueueSegment comparand = this.queueTail; !comparand.TryDequeue(out callback) && comparand.Next != null && comparand.IsUsedUp(); 
comparand = this.queueTail)
Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueTail, comparand.Next, comparand);
}
if (callback != null)
return;
ThreadPoolWorkQueue.WorkStealingQueue[] current = ThreadPoolWorkQueue.allThreadQueues.Current;
int num = tl.random.Next(current.Length);
for (int length = current.Length; length > 0; --length)
{
ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue2 = Volatile.Read<ThreadPoolWorkQueue.WorkStealingQueue>(ref current[num % current.Length]);
if (workStealingQueue2 != null && workStealingQueue2 != workStealingQueue1 && workStealingQueue2.TrySteal(out callback, ref missedSteal))
break;
++num;
}
}

因為我們把callback加到了global work queue,所以local work queue(workStealingQueue.LocalPop(out callback))) 找不到裡講解。接著又去global work queue查找,先從global work queue的起始位置查找直至尾部,因此global work quque裡的callback是FIFO的執行順序。

public bool TryDequeue(out IThreadPoolWorkItem node)
{
int upper;
int lower;
this.GetIndexes(out upper, out lower);
while (lower != upper)
{
// ISSUE: explicit reference operation
// ISSUE: variable of a reference type
int& prevUpper = @upper;
// ISSUE: explicit reference operation
int newUpper = ^prevUpper;
// ISSUE: explicit reference operation
// ISSUE: variable of a reference type
int& prevLower = @lower;
// ISSUE: explicit reference operation
int newLower = ^prevLower + 1;
if (this.CompareExchangeIndexes(prevUpper, newUpper, prevLower, newLower))
{
SpinWait spinWait = new SpinWait();
while ((node = Volatile.Read<IThreadPoolWorkItem>(ref this.nodes[lower])) == null)
spinWait.SpinOnce();
this.nodes[lower] = (IThreadPoolWorkItem) null;
return true;
}
}
node = (IThreadPoolWorkItem) null;
return false;
}

使用自旋锁和内存读屏障来避免内核态和用户态的切换,提高了获取callback的性能。如果还是没有callback,那么就从所有的local work queue里随机选取一个,然后在该local work queue里“偷取”一个任务(callback)。


拿到callback后执行callback.ExecuteWorkItem(),通知完成。

总结

ThreadPool提供了方法调整线程池最少活跃的线程来应对不同的并发场景。ThreadPool带有2个work queue,一个golbal一个local。

执行时先从local找任务,接着去global,最后才会去随机选取一个local偷一个任务,其中global是FIFO的执行顺序。

Work queue实际上是数组,使用了大量的自旋锁和内存屏障来提高性能。但是在偷取任务上,是否可以考虑得更多,随机选择一个local太随意。

首先要考虑偷取的队列上必须有可执行任务;其次可以选取一个不在调度中的线程的local work queue,这样降低了自旋锁的可能性,加快了偷取的速度;最后,偷取的时候可以考虑像golang一样偷取别人queue里一半的任务,因为执行完偷到的这一个任务之后,下次该线程再次被调度到还是可能没任务可执行,还得去偷取别人的任务,这样既浪费CPU时间,又让任务在线程上分布不均匀,降低了系统吞吐量!


另外,如果禁用log和ETW trace,可以使ThreadPool的性能更进一步。

以上就是.NET编程之线程池内幕的内容,更多相关内容请关注PHP中文网(www.php.cn)!


陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn