Heim  >  Artikel  >  Java  >  Ausführliche Erläuterung des Java-Thread-Pools und Erstellung einfacher Beispiele

Ausführliche Erläuterung des Java-Thread-Pools und Erstellung einfacher Beispiele

高洛峰
高洛峰Original
2017-02-07 14:44:521365Durchsuche

Java Thread Pool

Ich habe kürzlich die Parallelitätsfunktion des Projekts verbessert, aber die Entwicklung verlief holprig. Nachdem ich viele Informationen gelesen hatte, vertiefte ich schließlich mein Verständnis. Deshalb hatte ich vor, gemeinsam den Quellcode zu überprüfen und die Prinzipien der gleichzeitigen Programmierung zusammenzufassen.

Seien Sie darauf vorbereitet, mit dem am häufigsten verwendeten Thread-Pool zu beginnen und die Implementierungsprinzipien des gesamten Lebenszyklus des Thread-Pools rund um die Erstellung, Ausführung und das Herunterfahren zu verstehen. Später werden wir Themen wie atomare Variablen, gleichzeitige Container, blockierende Warteschlangen, Synchronisierungstools, Sperren usw. untersuchen. Die Parallelitätstools in java.util.concurrent sind nicht schwer zu verwenden, aber man kann sie nicht einfach verwenden, wir müssen den verdammten Quellcode lesen, haha. Das von mir verwendete JDK ist übrigens 1.8.

Executor-Framework

Executor ist ein Thread-Pool-Management-Framework. Es gibt nur eine Methodeexecute in der Schnittstelle, die ausführbare Aufgaben ausführt. Die ExecutorService-Schnittstelle erweitert Executor, fügt Thread-Lebenszyklusverwaltung hinzu und stellt Methoden wie die Beendigung von Aufgaben und die Rückgabe von Aufgabenergebnissen bereit. AbstractExecutorService implementiert ExecutorService und stellt Standardimplementierungslogik wie die Submit-Methode bereit.

Dann erbt das heutige Thema, ThreadPoolExecutor, AbstractExecutorService und stellt die spezifische Implementierung des Thread-Pools bereit.

Konstruktormethode

Das Folgende ist der häufigste Konstruktor von ThreadPoolExecutor mit bis zu sieben Parametern. Ich werde nicht den spezifischen Code veröffentlichen, sondern nur einige Anweisungen zur Parameterüberprüfung und -einstellung.

public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler) {
  }

corePoolSize ist die Zielgröße des Thread-Pools. Dies ist die Größe, wenn der Thread-Pool gerade erst erstellt wurde und keine Aufgaben ausgeführt werden müssen. MaximumPoolSize ist die maximale Obergrenze des Thread-Pools. keepAliveTime ist die Überlebenszeit des Threads. Wenn die Anzahl der Threads im Thread-Pool größer als corePoolSize ist, werden inaktive Threads, die die Überlebenszeit überschreiten, recycelt. Selbstverständlich werden die restlichen drei Parameter später analysiert.

Standardmäßiger benutzerdefinierter Thread-Pool

ThreadPoolExecutor stellt einige benutzerdefinierte Thread-Pools vor, die von der Factory-Methode in Executors erstellt wurden. Lassen Sie uns die Erstellungsparameter von newSingleThreadExecutor, newFixedThreadPool und newCachedThreadPool analysieren.

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                   0L, TimeUnit.MILLISECONDS,
                   new LinkedBlockingQueue<Runnable>());
  }

corePoolSize und maximumPoolSize von newFixedThreadPool werden beide auf die eingehende feste Zahl gesetzt, und keepAliveTim wird auf 0 gesetzt. Nachdem der Thread-Pool erstellt wurde, wird die Anzahl der Threads festgelegt, was für Situationen geeignet ist, in denen Thread-Stabilität erforderlich ist.

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
      (new ThreadPoolExecutor(1, 1,
                  0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>()));
  }

newSingleThreadExecutor ist eine Version von newFixedThreadPool mit einer festen Anzahl von Threads von 1, die die Serialisierung von Aufgaben im Pool gewährleistet. Beachten Sie, dass FinalizableDelegatedExecutorService zurückgegeben wird. Schauen wir uns den Quellcode an:

static class FinalizableDelegatedExecutorService
    extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
      super(executor);
    }
    protected void finalize() {
      super.shutdown();
    }
  }

FinalizableDelegatedExecutorService erbt DelegatedExecutorService und fügt nur den Vorgang zum Schließen des Thread-Pools während des GC hinzu :

static class DelegatedExecutorService extends AbstractExecutorService {
    private final ExecutorService e;
    DelegatedExecutorService(ExecutorService executor) { e = executor; }
    public void execute(Runnable command) { e.execute(command); }
    public void shutdown() { e.shutdown(); }
    public List<Runnable> shutdownNow() { return e.shutdownNow(); }
    public boolean isShutdown() { return e.isShutdown(); }
    public boolean isTerminated() { return e.isTerminated(); }
    //...
  }

Der Code ist sehr einfach. Er umhüllt ExecutorService, sodass nur die Methoden von ExecutorService verfügbar gemacht werden, sodass die Parameter des Thread-Pools nicht mehr konfiguriert werden können. Ursprünglich können die vom Thread-Pool erstellten Parameter angepasst werden, und ThreadPoolExecutor stellt die Set-Methode bereit. Der Zweck der Verwendung von newSingleThreadExecutor besteht darin, einen seriellen Single-Thread-Thread-Pool zu generieren. Es wäre langweilig, wenn auch die Thread-Pool-Größe konfiguriert werden könnte.

Executors stellt außerdem die unconfigurableExecutorService-Methode bereit, die den gewöhnlichen Thread-Pool in einen nicht konfigurierbaren Thread-Pool einschließt. Wenn Sie nicht möchten, dass der Thread-Pool von unbekannten zukünftigen Generationen geändert wird, können Sie diese Methode aufrufen.

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                   60L, TimeUnit.SECONDS,
                   new SynchronousQueue<Runnable>());
  }

newCachedThreadPool generiert einen zwischengespeicherten Thread-Pool. Die Anzahl der Threads kann zwischen 0 und Integer.MAX_VALUE liegen, und das Timeout beträgt 1 Minute. Die Verwendung des Thread-Pools hat folgende Auswirkungen: Wenn ein Thread im Leerlauf vorhanden ist, wird er wiederverwendet. Wenn kein Thread im Leerlauf vorhanden ist, wird ein neuer Thread erstellt recycelt.

newScheduledThreadPool

newScheduledThreadPool erstellt einen Thread-Pool, der regelmäßig Aufgaben ausführen kann. Dies soll in diesem Artikel nicht behandelt werden und wird später in einem separaten Artikel ausführlich besprochen.

Wartewarteschlange

Das Thread-Limit von newCachedThreadPool ist nahezu unbegrenzt, aber die Systemressourcen sind begrenzt und die Verarbeitungsgeschwindigkeit der Aufgabe ist möglicherweise nicht so schnell wie die Übermittlungsgeschwindigkeit des Aufgabe. Daher kann für ThreadPoolExecutor eine Blockierungswarteschlange bereitgestellt werden, um ausführbare Aufgaben zu speichern, die aufgrund unzureichender Threads warten.

JDK bietet mehrere Implementierungsmethoden für BlockingQueue. Häufig verwendete Methoden sind:

ArrayBlockingQueue: Blockierungswarteschlange der Array-Struktur

LinkedBlockingQueue: Blockierungswarteschlange der verknüpften Listenstruktur

PriorityBlockingQueue: Priorisierte Blockierungswarteschlange

SynchronousQueue: Blockierungswarteschlange, die keine Elemente speichert

newFixedThreadPool und newSingleThreadExecutor verwenden standardmäßig eine unbegrenzte LinkedBlockingQueue. Es ist zu beachten, dass sich die Warteschlange auf unbestimmte Zeit verlängert und es immer einen Moment gibt, in dem die Systemressourcen erschöpft sind, wenn ständig Aufgaben eingereicht werden, der Thread-Pool sie jedoch nicht rechtzeitig verarbeiten kann. Daher wird empfohlen, eine begrenzte Warteschlange zu verwenden, um eine Erschöpfung der Ressourcen zu vermeiden. Aber die Lösung eines Problems bringt neue Probleme mit sich: Was sollen wir zu diesem Zeitpunkt tun, nachdem die Warteschlange gefüllt ist und neue Aufgaben hinzukommen? Der Umgang mit der Warteschlangensättigung wird später erläutert.

newCachedThreadPool使用的SynchronousQueue十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue是一种移交机制,不能算是队列。newCachedThreadPool生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用SynchronousQueue作为等待队列正合适。

饱和策略

当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
  }

AbortPolicy是默认的实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。除此之外,还有几种饱和策略,来看一下:

public static class DiscardPolicy implements RejectedExecutionHandler {
   public DiscardPolicy() { }
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
   }
 }

DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果队列满了,后续的任务都抛弃掉。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
   public DiscardOldestPolicy() { }
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     if (!e.isShutdown()) {
       e.getQueue().poll();
       e.execute(r);
     }
   }
 }

DiscardOldestPolicy会将等待队列里最旧的任务踢走,让新任务得以执行。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
   public CallerRunsPolicy() { }
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     if (!e.isShutdown()) {
       r.run();
     }
   }
 }

最后一种饱和策略是CallerRunsPolicy,它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务。当前线程一般就是主线程啊,让主线程运行任务,说不定就阻塞了。如果不是想清楚了整套方案,还是少用这种策略为妙。

ThreadFactory

每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的defaultThreadFactory:

public static ThreadFactory defaultThreadFactory() {
  return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
 
    DefaultThreadFactory() {
      SecurityManager s = System.getSecurityManager();
      group = (s != null) ? s.getThreadGroup() :
                 Thread.currentThread().getThreadGroup();
      namePrefix = "pool-" +
             poolNumber.getAndIncrement() +
            "-thread-";
    }
 
    public Thread newThread(Runnable r) {
      Thread t = new Thread(group, r,
                 namePrefix + threadNumber.getAndIncrement(),
                 0);
      if (t.isDaemon())
        t.setDaemon(false);
      if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
      return t;
    }
  }

   

平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可。

不看代码不总结不会知道,光是线程池的创建就可以引出很多学问。别看平时创建线程池是一句代码的事,其实ThreadPoolExecutor提供了很灵活的定制方法。

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

更多Java 线程池详解及创建简单实例相关文章请关注PHP中文网!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn