Latar belakang: Dengan peningkatan perkakasan komputer, perisian kami mempunyai keupayaan untuk melaksanakan tugas dalam berbilang rangkaian. Apabila kita melakukan pengaturcaraan berbilang utas, kita perlu mencipta utas Jika konkurensi program adalah sangat tinggi, kita akan mencipta sejumlah besar utas, dan setiap utas akan melakukan tugas yang singkat dan kemudiannya akan menyebabkannya sangat mengurangkan prestasi sistem dan meningkatkan overhed pelayan, kerana mencipta dan memusnahkan benang memerlukan penggunaan tambahan.
Pada masa ini kita boleh menggunakan teknologi pengumpulan untuk mengoptimumkan kecacatan ini, dan kumpulan benang telah dilahirkan.
Intipati teknologi pengumpulan adalah untuk mencapai penggunaan semula sumber dan mengurangkan kos penciptaan dan pemusnahan sumber dalam senario konkurensi yang tinggi Jika bilangan mata wang adalah kecil, tiada kelebihan yang jelas (sumber sentiasa menduduki memori sistem dan tidak mempunyai peluang untuk digunakan) ).
Pengenalan kepada teknologi pengumpulan: Bilakah teknologi pengumpulan? Teknologi penyatuan ialah teknik pengaturcaraan yang boleh mengoptimumkan program dengan ketara apabila terdapat keselarasan yang tinggi dalam program dan mengurangkan overhed tambahan seperti penciptaan yang kerap dan pemusnahan sambungan dalam sistem. Teknologi pengumpulan yang sering kami hubungi termasuk kumpulan sambungan pangkalan data, kumpulan benang, kumpulan objek, dsb. Ciri teknologi pengumpulan adalah untuk mengekalkan beberapa sumber kos tinggi dalam kumpulan tertentu (memori), dan menentukan bilangan sambungan minimum, bilangan sambungan maksimum, baris gilir menyekat, peraturan limpahan dan konfigurasi lain untuk memudahkan pengurusan bersatu. Dalam keadaan biasa, ia juga akan datang dengan beberapa fungsi sokongan seperti pemantauan dan kitar semula paksa.
Teknologi pengumpulan ialah teknologi penggunaan sumber yang biasa ialah:
Apabila kos untuk mendapatkan sumber adalah tinggi
Pengkelasan sumber teknologi pengumpulan:
Pelaksana, yang menyediakan satu siri kaedah kilang untuk mencipta utas Kumpulan dan kumpulan utas yang dikembalikan semuanya melaksanakan antara muka ExecutorService. Antara muka
ExecutorService ialah antara muka subkelas antara muka Executor dan digunakan secara lebih meluas Ia menyediakan kaedah pengurusan kitaran hayat kumpulan benang dan mengembalikan Objek Masa Depan.
Maksudnya, kami mencipta kumpulan benang melalui Pelaksana, dapatkan dan laksanakan tugas tak segerak melalui ExecutorService
(laksanakan antara muka Runnable) ExecutorService
Pelaksana boleh mencipta beberapa jenis Kumpulan benang:
Cipta kumpulan benang yang boleh disimpan dalam cache Jika bilangan benang kumpulan benang berlebihan, sumber benang yang berlebihan akan dikitar semula selepas 60 saat buku tugasan bertambah , jika benang tidak mencukupi, benang baharu akan dibuat. newCachedThreadPool
Buat kumpulan benang panjang tetap untuk menyokong pelaksanaan tugas berjadual dan berkala. newScheduledThreadPool
Buat kumpulan benang satu-benang dan hanya gunakan satu-satunya utas untuk melaksanakan tugasan, memastikan tugasan diselesaikan mengikut susunan yang diserahkan. newSingleThreadExecutor
Tujuh parameter ThreadPoolExecutor:
(1)Bilangan utas teras, utas teras akan sentiasa dikekalkan dan tidak akan dimusnahkan. corePoolSize
Bilangan maksimum utas Apabila utas teras tidak dapat memenuhi keperluan tugas, sistem akan mencipta utas baharu untuk melaksanakan tugas. maximumPoolSize
Masa hidup, berapa lama benang selain daripada benang teras melahu akan dimusnahkan. keepAliveTime
mewakili unit masa untuk kemandirian benang. timeUnit
Menyekat baris gilirBlockingQueue
Jenis baris gilir adalah seperti berikut: LinkedBlockingQueue ArrayBlockingQueue SynchronousQueue TransferQueue.
(6)threadFactory
Kilang benang digunakan untuk mencipta benang boleh disesuaikan Contohnya, kita boleh menentukan nama kumpulan benang, yang sangat membantu semasa menyelesaikan masalah jstack.
(7)rejectedExecutionHandler
Dasar penolakan,
Apabila semua urutan (bilangan maksimum urutan) sibuk dan baris gilir tugasan penuh dengan tugasan, dasar penolakan akan dilaksanakan.
JDK memberi kita empat strategi penolakan, yang kita semua mesti biasa dengan
AbortPolicy: buang tugas, Dan buang pengecualian RejectedExecutionException. Lalai
DiscardPolicy: Buang tugas terbaharu tanpa membuang pengecualian.
DiscardOldestPolicy: Buang tugasan dengan masa giliran paling lama, iaitu tugasan tertua.
CallerRuns: Tugas diproses oleh pemanggil (urutan yang menyerahkan tugas tak segerak).
Untuk melaksanakan pool thread, kita perlu mengambil berat tentang kelas ThreadPoolExecutor, kerana Executors juga membuat pool thread melalui objek ThreadPoolExecutor baharu .
Melihat perhubungan warisan kelas ThreadPoolExecutor
, anda boleh melihat sebab kumpulan benang yang dicipta oleh Executors
mengembalikan hasil ExecutorService, kerana ThreadPoolExecutor ialah kelas pelaksanaan antara muka ExecutorService dan intipati penciptaan kumpulan benang oleh Pelaksana adalah untuk mencipta objek ThreadPoolExecutor.
Mari kita lihat kod sumber ThreadPoolExecutor
yang pertama ialah pembolehubah dan pemalar yang ditakrifkan dalam ThreadPoolExecutor
:
// 复合类型变量 是一个原子整数 控制状态(运行状态|线程池活跃线程数量) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 低29位 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 容量 // 运行状态存储在高位3位 private static final int RUNNING = -1 << COUNT_BITS; // 接受新任务,并处理队列任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接受新任务,但会处理队列任务 private static final int STOP = 1 << COUNT_BITS; // 不接受新任务,不会处理队列任务,中断正在处理的任务 private static final int TIDYING = 2 << COUNT_BITS; // 所有的任务已结束,活跃线程为0,线程过渡到TIDYING状 态,将会执行terminated()钩子方法 private static final int TERMINATED = 3 << COUNT_BITS; // terminated()方法已经完成 // 设置 ctl 参数方法 private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } /** * 阻塞队列 */ private final BlockingQueue<Runnable> workQueue; /** * Lock 锁. */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 工人们 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * 等待条件支持等待终止 */ private final Condition termination = mainLock.newCondition(); /** * 最大的池大小. */ private int largestPoolSize; /** * 完成任务数 */ private long completedTaskCount; /** * 线程工厂 */ private volatile ThreadFactory threadFactory; /** * 拒绝策略 */ private volatile RejectedExecutionHandler handler; /** * 存活时间 */ private volatile long keepAliveTime; /** * 允许核心线程数 */ private volatile boolean allowCoreThreadTimeOut; /** * 核心线程数 */ private volatile int corePoolSize; /** * 最大线程数 */ private volatile int maximumPoolSize; /** * 默认拒绝策略 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); /** * shutdown and shutdownNow权限 */ private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
constructor ,, menyokong empat pembina dengan minimum lima parameter dan maksimum tujuh parameter:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }Pekerja, yang melaksanakan tugas dalam kumpulan benang, kumpulan benang berfungsi melalui pekerja ini, dan di sana ialah pekerja teras (Benang Teras) dan pekerja sementara (dicipta buat sementara waktu apabila tidak cukup orang, dan akan diberhentikan kerja jika mereka melahu),
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; // 工人的本质就是个线程 final Thread thread; // 第一件工作任务 Runnable firstTask; volatile long completedTasks; /** * 构造器 */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 工作 */ public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
kaedah teras, laksanakan tugas melalui kumpulan benang (ini juga merupakan utas Prinsip operasi kumpulan):
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }ditakrifkan oleh kelas induk abstraknya melihat dengan jelas perbezaan antara serah dan laksanakan Dengan memanggil serah, kami akan mencipta
dan mengembalikan Masa Depan Di sini kami boleh menukar jenis nilai pulangan , memaklumkan kaedah serah, dan ia akan mengembalikan nilai melalui kekangan generik. submit()
public abstract class AbstractExecutorService implements ExecutorService { public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } ... }
RunnableFuture
addWorker() ialah cara untuk merekrut orang: private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断状态,及任务列表
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
Atas ialah kandungan terperinci Analisis kod sumber prinsip pelaksanaan kolam benang Java. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!