Maison  >  Article  >  développement back-end  >  Insider du pool de threads de programmation .NET

Insider du pool de threads de programmation .NET

黄舟
黄舟original
2017-02-06 14:19:531331parcourir

Cet article révèle l'histoire intérieure du pool de threads .NET en analysant et expliquant le code source ThreadPool de .NET4.5, et résume les avantages et les inconvénients de la conception ThreadPool.


Le rôle du pool de threads


Le pool de threads, comme son nom l'indique, est un pool d'objets de threads. Task et TPL utilisent tous deux des pools de threads, donc comprendre l'histoire intérieure des pools de threads peut vous aider à écrire de meilleurs programmes. En raison de l'espace limité, je n'expliquerai ici que les

concepts fondamentaux suivants :

  • La taille du pool de threads

  • Comment appeler des threads Ajout de tâches au pool

  • Comment le pool de threads exécute les tâches

Threadpool prend également en charge la manipulation des threads IOCP, mais nous le ferons Je ne l'étudie pas ici. Task et TPL seront expliqués en détail dans leurs blogs respectifs.

Taille du pool de threads

Quel que soit le type de pool, il y a toujours une taille, et ThreadPool ne fait pas exception. ThreadPool propose 4 méthodes pour ajuster la taille du pool de threads :

  • SetMaxThreads

  • GetMaxThreads

  • SetMinThreads

  • GetMinThreads

SetMaxThreads spécifie le nombre maximum de threads que le pool de threads peut avoir, et GetMaxThreads obtient naturellement cette valeur. SetMinThreads spécifie le nombre minimum de threads survivants dans le pool de threads et GetMinThreads obtient cette valeur.


Pourquoi devons-nous fixer une quantité maximale et une quantité minimale ? Il s'avère que la taille du pool de threads dépend de plusieurs facteurs, tels que la taille de l'espace d'adressage virtuel. Par exemple, votre ordinateur dispose de 4 Go de mémoire et la taille initiale de la pile d'un thread est de 1 m, vous pouvez alors créer jusqu'à 4 Go/1 million de threads (en ignorant le système d'exploitation lui-même et les autres allocations de mémoire de processus précisément parce que les threads ont une surcharge de mémoire) ; , si le thread S'il y a trop de threads dans le pool et qu'ils ne sont pas entièrement utilisés, il s'agit d'un gaspillage de mémoire, il est donc logique de limiter le nombre maximum de pools de threads.


Alors quel est le nombre minimum ? Le pool de threads est le pool d'objets de threads. La plus grande utilisation du pool d'objets est de réutiliser des objets. Pourquoi devrions-nous réutiliser les threads ? Parce que la création et la destruction de threads prennent beaucoup de temps CPU. Par conséquent, dans un état de concurrence élevée, le pool de threads permet de gagner beaucoup de temps car il n'a pas besoin de créer et de détruire des threads, ce qui améliore la réactivité et le débit du système. Le nombre minimum vous permet d'ajuster le nombre minimum de threads survivants pour faire face à différents scénarios à forte concurrence.


Comment appeler le pool de threads pour ajouter une tâche


Le pool de threads propose principalement 2 méthodes pour appeler : QueueUserWorkItem et UnsafeQueueUserWorkItem .


Les codes des deux méthodes sont fondamentalement les mêmes, à l'exception des différents attributs que QueueUserWorkItem peut être appelé par code de confiance partielle, tandis qu'UnsafeQueueUserWorkItem ne peut être appelé que par confiance totale. code.

public static bool QueueUserWorkItem(WaitCallback callBack)
{
StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
return ThreadPool.QueueUserWorkItemHelper(callBack, (object) null, ref stackMark, true);
}

QueueUserWorkItemHelper appelle d'abord ThreadPool.EnsureVMInitialized() pour garantir que la machine virtuelle CLR est initialisée (VM est un terme général, pas seulement la machine virtuelle Java, mais aussi le moteur d'exécution CLR), puis instancie le ThreadPoolWorkQueue , et enfin appelle la méthode Enqueue de ThreadPoolWorkQueue et transmet le rappel et true.

SecurityCritical]
public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
{
ThreadPoolWorkQueueThreadLocals queueThreadLocals = (ThreadPoolWorkQueueThreadLocals) null;
if (!forceGlobal)
queueThreadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals;
if (this.loggingEnabled)
FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject((object) callback);
if (queueThreadLocals != null)
{
queueThreadLocals.workStealingQueue.LocalPush(callback);
}
else
{
ThreadPoolWorkQueue.QueueSegment comparand = this.queueHead;
while (!comparand.TryEnqueue(callback))
{
Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref comparand.Next, new ThreadPoolWorkQueue.QueueSegment(), (ThreadPoolWorkQueue.QueueSegment) null);
for (; comparand.Next != null; comparand = this.queueHead)
Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueHead, comparand.Next, comparand);
}
}
this.EnsureThreadRequested();
}

ThreadPoolWorkQueue contient principalement 2 "files d'attente" (en fait des tableaux), l'une est QueueSegment (file d'attente de travail globale) et l'autre est WorkStealingQueue (file d'attente de travail locale). Les différences spécifiques entre les deux seront expliquées dans Task/TPL et ne seront pas expliquées ici.


Puisque forceGlobal est vrai, comparand.TryEnqueue(callback) est exécuté, ce qui est QueueSegment.TryEnqueue. comparand démarre la mise en file d'attente à partir de la tête de la file d'attente (queueHead). En cas d'échec, continuez la mise en file d'attente. Après le succès, il attribue la valeur à queueHead.

Jetons un coup d'œil au code source de QueueSegment :

public QueueSegment()
{
this.nodes = new IThreadPoolWorkItem[256];
}

public bool TryEnqueue(IThreadPoolWorkItem node)
{
int upper;
int lower;
this.GetIndexes(out upper, out lower);
while (upper != this.nodes.Length)
{
if (this.CompareExchangeIndexes(ref upper, upper + 1, ref lower, lower))
{
Volatile.Write<IThreadPoolWorkItem>(ref this.nodes[upper], node);
return true;
}
}
return false;
}

Cette file d'attente de travail dite globale est en fait un tableau d'IThreadPoolWorkItem, et elle est limitée à 256. Pourquoi ce? Est-ce parce qu'il est aligné sur le pool de threads IIS (qui ne compte que 256 threads) ? Utilisez le verrouillage et la barrière d'écriture mémoire volatile.write pour garantir l'exactitude des nœuds, ce qui améliore considérablement les performances par rapport aux verrous de synchronisation.


Enfin, appelez EnsureThreadRequested. EnsureThreadRequested appellera QCall pour envoyer la demande au CLR, et le CLR planifiera le ThreadPool.

Comment le pool de threads exécute les tâches


Une fois le thread planifié, le rappel est exécuté via la méthode Dispatch de ThreadPoolWorkQueue.

internal static bool Dispatch()
{
ThreadPoolWorkQueue threadPoolWorkQueue = ThreadPoolGlobals.workQueue;
int tickCount = Environment.TickCount;
threadPoolWorkQueue.MarkThreadRequestSatisfied();
threadPoolWorkQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, (EventKeywords) 18);
bool flag1 = true;
IThreadPoolWorkItem callback = (IThreadPoolWorkItem) null;
try
{
ThreadPoolWorkQueueThreadLocals tl = threadPoolWorkQueue.EnsureCurrentThreadHasQueue();
while ((long) (Environment.TickCount - tickCount) < (long) ThreadPoolGlobals.tpQuantum)
{
try
{
}
finally
{
bool missedSteal = false;
threadPoolWorkQueue.Dequeue(tl, out callback, out missedSteal);
if (callback == null)
flag1 = missedSteal;
else
threadPoolWorkQueue.EnsureThreadRequested();
}
if (callback == null)
return true;
if (threadPoolWorkQueue.loggingEnabled)
FrameworkEventSource.Log.ThreadPoolDequeueWorkObject((object) callback);
if (ThreadPoolGlobals.enableWorkerTracking)
{
bool flag2 = false;
try
{
try
{
}
finally
{
ThreadPool.ReportThreadStatus(true);
flag2 = true;
}
callback.ExecuteWorkItem();
callback = (IThreadPoolWorkItem) null;
}
finally
{
if (flag2)
ThreadPool.ReportThreadStatus(false);
}
}
else
{
callback.ExecuteWorkItem();
callback = (IThreadPoolWorkItem) null;
}
if (!ThreadPool.NotifyWorkItemComplete())
return false;
}
return true;
}
catch (ThreadAbortException ex)
{
if (callback != null)
callback.MarkAborted(ex);
flag1 = false;
}
finally
{
if (flag1)
threadPoolWorkQueue.EnsureThreadRequested();
}
return true;
}

L'instruction while détermine que si le temps d'exécution est inférieur à 30 ms, elle continuera à exécuter le prochain rappel. En effet, la plupart des changements de thread machine prennent environ 30 ms. Si le thread ne s'exécute que pendant moins de 30 ms et attend ensuite que le thread d'interruption change, ce serait un gaspillage de CPU. C'est un gaspillage honteux !


Dequeue est chargé de trouver le rappel qui doit être exécuté :

public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem callback, out bool missedSteal)
{
callback = (IThreadPoolWorkItem) null;
missedSteal = false;
ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue1 = tl.workStealingQueue;
workStealingQueue1.LocalPop(out callback);
if (callback == null)
{
for (ThreadPoolWorkQueue.QueueSegment comparand = this.queueTail; !comparand.TryDequeue(out callback) && comparand.Next != null && comparand.IsUsedUp(); 
comparand = this.queueTail)
Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueTail, comparand.Next, comparand);
}
if (callback != null)
return;
ThreadPoolWorkQueue.WorkStealingQueue[] current = ThreadPoolWorkQueue.allThreadQueues.Current;
int num = tl.random.Next(current.Length);
for (int length = current.Length; length > 0; --length)
{
ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue2 = Volatile.Read<ThreadPoolWorkQueue.WorkStealingQueue>(ref current[num % current.Length]);
if (workStealingQueue2 != null && workStealingQueue2 != workStealingQueue1 && workStealingQueue2.TrySteal(out callback, ref missedSteal))
break;
++num;
}
}

Parce que nous avons ajouté le rappel à la file d'attente de travail globale, la file d'attente de travail locale (workStealingQueue.LocalPop (out callback)) finds Si le rappel n'est pas trouvé, la recherche du rappel dans la file d'attente de travail locale sera expliquée dans la tâche. Ensuite, accédez à la file d'attente de travail globale pour effectuer une recherche, recherchez d'abord du début à la fin de la file d'attente de travail globale, de sorte que le rappel dans la file d'attente de travail globale soit l'ordre d'exécution du FIFO.

public bool TryDequeue(out IThreadPoolWorkItem node)
{
int upper;
int lower;
this.GetIndexes(out upper, out lower);
while (lower != upper)
{
// ISSUE: explicit reference operation
// ISSUE: variable of a reference type
int& prevUpper = @upper;
// ISSUE: explicit reference operation
int newUpper = ^prevUpper;
// ISSUE: explicit reference operation
// ISSUE: variable of a reference type
int& prevLower = @lower;
// ISSUE: explicit reference operation
int newLower = ^prevLower + 1;
if (this.CompareExchangeIndexes(prevUpper, newUpper, prevLower, newLower))
{
SpinWait spinWait = new SpinWait();
while ((node = Volatile.Read<IThreadPoolWorkItem>(ref this.nodes[lower])) == null)
spinWait.SpinOnce();
this.nodes[lower] = (IThreadPoolWorkItem) null;
return true;
}
}
node = (IThreadPoolWorkItem) null;
return false;
}

使用自旋锁和内存读屏障来避免内核态和用户态的切换,提高了获取callback的性能。如果还是没有callback,那么就从所有的local work queue里随机选取一个,然后在该local work queue里“偷取”一个任务(callback)。


拿到callback后执行callback.ExecuteWorkItem(),通知完成。

总结

ThreadPool提供了方法调整线程池最少活跃的线程来应对不同的并发场景。ThreadPool带有2个work queue,一个golbal一个local。

执行时先从local找任务,接着去global,最后才会去随机选取一个local偷一个任务,其中global是FIFO的执行顺序。

Work queue实际上是数组,使用了大量的自旋锁和内存屏障来提高性能。但是在偷取任务上,是否可以考虑得更多,随机选择一个local太随意。

首先要考虑偷取的队列上必须有可执行任务;其次可以选取一个不在调度中的线程的local work queue,这样降低了自旋锁的可能性,加快了偷取的速度;最后,偷取的时候可以考虑像golang一样偷取别人queue里一半的任务,因为执行完偷到的这一个任务之后,下次该线程再次被调度到还是可能没任务可执行,还得去偷取别人的任务,这样既浪费CPU时间,又让任务在线程上分布不均匀,降低了系统吞吐量!


另外,如果禁用log和ETW trace,可以使ThreadPool的性能更进一步。

以上就是.NET编程之线程池内幕的内容,更多相关内容请关注PHP中文网(www.php.cn)!


Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn