.NET Programming: Inside the Thread Pool
This article looks inside the .NET thread pool by analyzing and explaining the ThreadPool source code of .NET 4.5, and summarizes the strengths and weaknesses of the ThreadPool design.
The role of thread pool
A thread pool, as the name suggests, is an object pool of threads. Both Task and the TPL run on the thread pool, so understanding how it works internally helps you write better programs. Due to limited space, I will only explain the following core concepts here:
The size of the thread pool
How to add tasks to the thread pool
How the thread pool performs tasks
ThreadPool also supports IOCP (I/O completion port) threads, but we will not study them here. Task and the TPL will be explained in detail in their own posts.
Thread pool size
No matter what kind of pool, there is always a size, and ThreadPool is no exception. ThreadPool provides 4 methods to adjust the size of the thread pool:
SetMaxThreads
GetMaxThreads
SetMinThreads
GetMinThreads
SetMaxThreads specifies the maximum number of threads the thread pool can have, and GetMaxThreads naturally obtains this value. SetMinThreads specifies the minimum number of surviving threads in the thread pool, and GetMinThreads obtains this value.
Why do we need both a maximum and a minimum? The size of the thread pool depends on several factors, such as the size of the virtual address space. For example, if a process has 4 GB of address space and each thread reserves a 1 MB stack, it can create at most 4 GB / 1 MB = 4096 threads (ignoring the space taken by the operating system and other allocations). Precisely because every thread has a memory cost, a pool that holds many threads that are not fully used wastes memory, so it makes sense to cap the number of threads in the pool.
So what is the minimum for? The thread pool is an object pool of threads, and the main point of an object pool is to reuse objects. Why reuse threads? Because creating and destroying threads takes a lot of CPU time. Under high concurrency the thread pool therefore saves a lot of time, since it does not have to create and destroy threads, which improves the system's responsiveness and throughput. The minimum lets you tune how many threads are kept alive so that the pool can cope with different high-concurrency scenarios.
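The four sizing methods described above can be exercised with a small sketch like the following. The class and method names (PoolSizeDemo, GetMinWorkers, GetMaxWorkers, TrySetMinWorkers) are hypothetical helpers invented for this illustration; only the ThreadPool calls are real API. It assumes a compiler that supports inline out variables (C# 7 or later).

```csharp
using System;
using System.Threading;

// Hypothetical helper class for illustration; only the ThreadPool calls are framework API.
public static class PoolSizeDemo
{
    // Returns the current minimum number of worker threads.
    public static int GetMinWorkers()
    {
        ThreadPool.GetMinThreads(out int minWorker, out int minIo);
        return minWorker;
    }

    // Returns the current maximum number of worker threads.
    public static int GetMaxWorkers()
    {
        ThreadPool.GetMaxThreads(out int maxWorker, out int maxIo);
        return maxWorker;
    }

    // Tries to change the worker minimum while leaving the I/O minimum untouched.
    // SetMinThreads returns false and changes nothing when the request is invalid,
    // for example a negative value or a value above the current maximum.
    public static bool TrySetMinWorkers(int workers)
    {
        ThreadPool.GetMinThreads(out int minWorker, out int minIo);
        return ThreadPool.SetMinThreads(workers, minIo);
    }
}
```

A caller expecting a burst of work might call PoolSizeDemo.TrySetMinWorkers with a larger value before the burst and check the return value, since an out-of-range request is rejected rather than clamped.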
How to call the thread pool to add a task
The thread pool mainly provides 2 methods for adding work: QueueUserWorkItem and UnsafeQueueUserWorkItem.
The code of the two methods is basically the same; they differ only in their attributes. QueueUserWorkItem can be called by partial trust code, while UnsafeQueueUserWorkItem can only be called by full trust code.
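As a usage illustration of QueueUserWorkItem, here is a minimal sketch that queues a number of callbacks and waits for all of them. QueueDemo and RunWorkItems are hypothetical names made up for this example; CountdownEvent is used only so the caller can block until every callback has run.

```csharp
using System;
using System.Threading;

// Hypothetical demo class; ThreadPool.QueueUserWorkItem is the API under discussion.
public static class QueueDemo
{
    // Queues `count` work items, blocks until all of them have run,
    // and returns how many callbacks were executed.
    public static int RunWorkItems(int count)
    {
        int completed = 0;
        using (var done = new CountdownEvent(count))
        {
            for (int i = 0; i < count; i++)
            {
                ThreadPool.QueueUserWorkItem(state =>
                {
                    // Runs on a thread pool worker thread.
                    Interlocked.Increment(ref completed);
                    done.Signal();
                });
            }
            // Wait until every queued callback has signalled.
            done.Wait();
        }
        return completed;
    }
}
```

The callbacks may run on different worker threads and in any interleaving, which is why the counter is updated with Interlocked.Increment.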
    public static bool QueueUserWorkItem(WaitCallback callBack)
    {
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return ThreadPool.QueueUserWorkItemHelper(callBack, (object) null, ref stackMark, true);
    }
QueueUserWorkItemHelper first calls ThreadPool.EnsureVMInitialized() to make sure the CLR virtual machine is initialized (VM is a general term here: it covers the CLR execution engine, not just the Java virtual machine), then instantiates the ThreadPoolWorkQueue, and finally calls the Enqueue method of ThreadPoolWorkQueue, passing in the callback and true.
    [SecurityCritical]
    public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
    {
        ThreadPoolWorkQueueThreadLocals queueThreadLocals = (ThreadPoolWorkQueueThreadLocals) null;
        if (!forceGlobal)
            queueThreadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals;
        if (this.loggingEnabled)
            FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject((object) callback);
        if (queueThreadLocals != null)
        {
            // Not forced to the global queue: push onto the current thread's local queue.
            queueThreadLocals.workStealingQueue.LocalPush(callback);
        }
        else
        {
            ThreadPoolWorkQueue.QueueSegment comparand = this.queueHead;
            while (!comparand.TryEnqueue(callback))
            {
                // The head segment is full: link a new segment, then advance queueHead.
                Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref comparand.Next, new ThreadPoolWorkQueue.QueueSegment(), (ThreadPoolWorkQueue.QueueSegment) null);
                for (; comparand.Next != null; comparand = this.queueHead)
                    Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueHead, comparand.Next, comparand);
            }
        }
        this.EnsureThreadRequested();
    }
ThreadPoolWorkQueue mainly contains 2 "queues" (actually arrays), one is QueueSegment (global work queue), and the other is WorkStealingQueue (local work queue). The specific differences between the two will be explained in Task/TPL and will not be explained here.
Since forceGlobal is true, comparand.TryEnqueue(callback) is executed, which is QueueSegment.TryEnqueue. comparand starts at the head segment of the queue (queueHead). If the enqueue fails because that segment is full, a new segment is linked in through Next, queueHead is advanced to it, and the enqueue is retried until it succeeds.
Let's take a look at the source code of QueueSegment:
    public QueueSegment()
    {
        this.nodes = new IThreadPoolWorkItem[256];
    }

    public bool TryEnqueue(IThreadPoolWorkItem node)
    {
        int upper;
        int lower;
        this.GetIndexes(out upper, out lower);
        while (upper != this.nodes.Length)
        {
            if (this.CompareExchangeIndexes(ref upper, upper + 1, ref lower, lower))
            {
                Volatile.Write<IThreadPoolWorkItem>(ref this.nodes[upper], node);
                return true;
            }
        }
        return false;
    }
This so-called global work queue is actually built from arrays of IThreadPoolWorkItem, and each segment is limited to 256 entries. Why 256? Is it to line up with the IIS thread pool (which only has 256 threads)? Interlocked operations and the memory write barrier Volatile.Write keep nodes consistent, which performs much better than a synchronization lock.
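To make the interlocked technique concrete, here is a simplified, hypothetical segment (SegmentSketch is not the framework's QueueSegment). The excerpt above does not show GetIndexes or CompareExchangeIndexes, so packing both indexes into a single int (upper index in the high 16 bits, lower index in the low 16 bits) and updating them with one Interlocked.CompareExchange is an assumption of this sketch. A matching dequeue is included so that the sketch can be exercised on its own.

```csharp
using System.Threading;

// Hypothetical, simplified segment for illustration; not the framework's QueueSegment.
public sealed class SegmentSketch<T> where T : class
{
    private readonly T[] nodes;

    // Assumption of this sketch: upper index in the high 16 bits, lower index in the low 16 bits.
    private int indexes;

    // Capacity must fit in 16 bits.
    public SegmentSketch(int capacity = 256)
    {
        nodes = new T[capacity];
    }

    public bool TryEnqueue(T item)
    {
        while (true)
        {
            int snapshot = Volatile.Read(ref indexes);
            int upper = (snapshot >> 16) & 0xFFFF;
            int lower = snapshot & 0xFFFF;
            if (upper == nodes.Length)
                return false; // Segment is full; slots are never reused.

            int updated = ((upper + 1) << 16) | lower;
            // Reserve slot `upper`; retry if another thread changed the indexes first.
            if (Interlocked.CompareExchange(ref indexes, updated, snapshot) == snapshot)
            {
                Volatile.Write(ref nodes[upper], item);
                return true;
            }
        }
    }

    public bool TryDequeue(out T item)
    {
        while (true)
        {
            int snapshot = Volatile.Read(ref indexes);
            int upper = (snapshot >> 16) & 0xFFFF;
            int lower = snapshot & 0xFFFF;
            if (lower == upper)
            {
                item = null;
                return false; // Nothing left in this segment.
            }

            int updated = (upper << 16) | (lower + 1);
            // Claim slot `lower`; retry if another thread changed the indexes first.
            if (Interlocked.CompareExchange(ref indexes, updated, snapshot) == snapshot)
            {
                // The enqueuer may have reserved the slot but not written it yet.
                var spin = new SpinWait();
                while ((item = Volatile.Read(ref nodes[lower])) == null)
                    spin.SpinOnce();
                nodes[lower] = null;
                return true;
            }
        }
    }
}
```

Because both indexes change in a single compare-and-swap, an enqueuer and a dequeuer never observe a half-updated pair, and no lock is taken on either path.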
Finally, EnsureThreadRequested is called. It makes a QCall into the CLR to request a thread, and the CLR schedules the ThreadPool.
How the thread pool performs tasks
After the thread is scheduled, the callback is executed through the Dispatch method of ThreadPoolWorkQueue.
    internal static bool Dispatch()
    {
        ThreadPoolWorkQueue threadPoolWorkQueue = ThreadPoolGlobals.workQueue;
        int tickCount = Environment.TickCount;
        threadPoolWorkQueue.MarkThreadRequestSatisfied();
        threadPoolWorkQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, (EventKeywords) 18);
        bool flag1 = true;
        IThreadPoolWorkItem callback = (IThreadPoolWorkItem) null;
        try
        {
            ThreadPoolWorkQueueThreadLocals tl = threadPoolWorkQueue.EnsureCurrentThreadHasQueue();
            // Keep executing work items until the time quantum (tpQuantum) is used up.
            while ((long) (Environment.TickCount - tickCount) < (long) ThreadPoolGlobals.tpQuantum)
            {
                try
                {
                }
                finally
                {
                    bool missedSteal = false;
                    threadPoolWorkQueue.Dequeue(tl, out callback, out missedSteal);
                    if (callback == null)
                        flag1 = missedSteal;
                    else
                        threadPoolWorkQueue.EnsureThreadRequested();
                }
                if (callback == null)
                    return true;
                if (threadPoolWorkQueue.loggingEnabled)
                    FrameworkEventSource.Log.ThreadPoolDequeueWorkObject((object) callback);
                if (ThreadPoolGlobals.enableWorkerTracking)
                {
                    bool flag2 = false;
                    try
                    {
                        try
                        {
                        }
                        finally
                        {
                            ThreadPool.ReportThreadStatus(true);
                            flag2 = true;
                        }
                        callback.ExecuteWorkItem();
                        callback = (IThreadPoolWorkItem) null;
                    }
                    finally
                    {
                        if (flag2)
                            ThreadPool.ReportThreadStatus(false);
                    }
                }
                else
                {
                    callback.ExecuteWorkItem();
                    callback = (IThreadPoolWorkItem) null;
                }
                if (!ThreadPool.NotifyWorkItemComplete())
                    return false;
            }
            return true;
        }
        catch (ThreadAbortException ex)
        {
            if (callback != null)
                callback.MarkAborted(ex);
            flag1 = false;
        }
        finally
        {
            if (flag1)
                threadPoolWorkQueue.EnsureThreadRequested();
        }
        return true;
    }
The while loop keeps executing the next callback as long as less than 30 ms (tpQuantum) has elapsed. The reasoning is that a thread's time slice on most machines is on the order of 30 ms. If a thread ran for less than that and then handed itself back and waited to be scheduled again, the CPU time spent on the thread switch would be wasted.
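The time-quantum loop can be shown in isolation with a small single-threaded sketch. QuantumDemo and RunForQuantum are hypothetical names, and a plain Queue of Action delegates stands in for the work queue; only the pattern of comparing Environment.TickCount against a start value comes from Dispatch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, single-threaded illustration of the quantum loop; not the framework's Dispatch.
public static class QuantumDemo
{
    // Runs queued actions until the queue is empty or `quantumMs` milliseconds have elapsed.
    // Returns the number of actions that were executed.
    public static int RunForQuantum(Queue<Action> work, int quantumMs)
    {
        int start = Environment.TickCount;
        int executed = 0;
        // The elapsed time is checked before each item, never in the middle of one.
        while (Environment.TickCount - start < quantumMs)
        {
            if (work.Count == 0)
                break;
            work.Dequeue()();
            executed++;
        }
        return executed;
    }
}
```

Note that the check happens only between items, so a single long-running callback can still exceed the quantum; the loop merely stops picking up new work once the time is used up.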
Dequeue is responsible for finding the callback that needs to be executed:
    public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem callback, out bool missedSteal)
    {
        callback = (IThreadPoolWorkItem) null;
        missedSteal = false;
        ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue1 = tl.workStealingQueue;
        // 1. Try the current thread's local queue.
        workStealingQueue1.LocalPop(out callback);
        if (callback == null)
        {
            // 2. Try the global queue, moving queueTail past segments that are used up.
            for (ThreadPoolWorkQueue.QueueSegment comparand = this.queueTail;
                 !comparand.TryDequeue(out callback) && comparand.Next != null && comparand.IsUsedUp();
                 comparand = this.queueTail)
                Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueTail, comparand.Next, comparand);
        }
        if (callback != null)
            return;
        // 3. Try to steal from other threads' local queues, starting at a random index.
        ThreadPoolWorkQueue.WorkStealingQueue[] current = ThreadPoolWorkQueue.allThreadQueues.Current;
        int num = tl.random.Next(current.Length);
        for (int length = current.Length; length > 0; --length)
        {
            ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue2 = Volatile.Read<ThreadPoolWorkQueue.WorkStealingQueue>(ref current[num % current.Length]);
            if (workStealingQueue2 != null && workStealingQueue2 != workStealingQueue1 && workStealingQueue2.TrySteal(out callback, ref missedSteal))
                break;
            ++num;
        }
    }
Because we added the callback to the global work queue, it cannot be found in the local work queue (workStealingQueue.LocalPop(out callback)); how callbacks are found in the local work queue will be explained in the Task article. The search then moves to the global work queue, which is scanned from its beginning to its end, so callbacks in the global work queue are executed in FIFO order.
    public bool TryDequeue(out IThreadPoolWorkItem node)
    {
        int upper;
        int lower;
        this.GetIndexes(out upper, out lower);
        while (lower != upper)
        {
            // Claim slot `lower` by advancing the lower index; the upper index stays unchanged.
            if (this.CompareExchangeIndexes(ref upper, upper, ref lower, lower + 1))
            {
                // The enqueuer may have reserved the slot but not written it yet, so spin until it appears.
                SpinWait spinWait = new SpinWait();
                while ((node = Volatile.Read<IThreadPoolWorkItem>(ref this.nodes[lower])) == null)
                    spinWait.SpinOnce();
                this.nodes[lower] = (IThreadPoolWorkItem) null;
                return true;
            }
        }
        node = (IThreadPoolWorkItem) null;
        return false;
    }
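A spin wait and a memory read barrier (Volatile.Read) are used to avoid switching between kernel mode and user mode, which improves the performance of fetching a callback. If there is still no callback, a random starting point is picked among all the local work queues and a task (callback) is "stolen" from one of them.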
Once a callback has been obtained, callback.ExecuteWorkItem() is executed and completion is reported.
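The stealing scan at the end of Dequeue can be sketched as follows. This is a single-threaded simplification that is not thread-safe: StealDemo and TrySteal are hypothetical names, and ordinary Queue objects stand in for the lock-free WorkStealingQueue. It only illustrates the scan order: start at a random index, skip the caller's own queue, and stop at the first queue that yields an item.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, single-threaded illustration of the random-start steal scan.
public static class StealDemo
{
    // Scans all queues starting at a random index, skipping the caller's own queue (`self`),
    // and takes one item from the first non-empty queue it finds.
    public static bool TrySteal<T>(IList<Queue<T>> queues, int self, Random random, out T item)
    {
        int start = random.Next(queues.Count);
        for (int i = 0; i < queues.Count; i++)
        {
            int victim = (start + i) % queues.Count;
            if (victim != self && queues[victim].Count > 0)
            {
                item = queues[victim].Dequeue();
                return true;
            }
        }
        item = default(T);
        return false;
    }
}
```

The random starting index spreads thieves across victims, so that idle threads do not all contend on the same queue first.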
Summary
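ThreadPool provides methods to adjust the minimum number of active threads in the pool in order to cope with different concurrency scenarios. ThreadPool has 2 work queues, one global and one local.
At execution time a thread first looks for a task in its local queue, then in the global queue, and only then picks a random local queue to steal a task from. The global queue is executed in FIFO order.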
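The work queues are actually arrays, and they rely heavily on spinning and memory barriers to improve performance. But could more thought go into task stealing? Picking a local queue at random is too arbitrary.
First, the queue being stolen from must have runnable tasks. Second, the thief could choose the local work queue of a thread that is not currently scheduled, which lowers the chance of spinning and speeds up the steal. Finally, the thief could steal half of the tasks in the other queue, as golang does. After executing the single task it stole, the thread may again have nothing to run the next time it is scheduled and has to steal again, which both wastes CPU time and spreads tasks unevenly across threads, reducing system throughput!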
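The steal-half idea suggested above might look like the sketch below. It is a single-threaded illustration of the proposal, not anything ThreadPool does: StealHalfDemo and StealHalf are hypothetical names, ordinary Queue objects stand in for the local work queues, and rounding the half upwards is an assumption of this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, single-threaded illustration of stealing half of a victim's queue.
public static class StealHalfDemo
{
    // Moves half of the victim's items (rounded up) to the thief, oldest first.
    // Returns the number of items moved; an empty victim yields 0.
    public static int StealHalf<T>(Queue<T> victim, Queue<T> thief)
    {
        int take = (victim.Count + 1) / 2;
        for (int i = 0; i < take; i++)
            thief.Enqueue(victim.Dequeue());
        return take;
    }
}
```

With five items in the victim's queue, one call moves three of them, so the thief has enough work to avoid stealing again right away while the victim keeps the rest.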
In addition, disabling logging and ETW tracing can push ThreadPool performance a little further.