下面小編就為大家帶來一篇java並發程式_線程池的使用方法(詳解)。小編覺得蠻不錯的,現在就分享給大家,也給大家做個參考。一起跟著小編過來看看吧
一、任務和執行策略之間的隱性耦合
##Executor可以將任務的提交和任務的執行策略解耦只有任務是同類型的且執行時間差別不大,才能發揮最大性能,否則,如將一些耗時長的任務和耗時短的任務放在一個線程池,除非線程池很大,否則會造成死鎖等問題1.線程飢餓死鎖
類似於:將兩個任務提交給一個單一執行緒池,且兩個任務之間相互依賴,一個任務等待另一個任務,則會發生死鎖;表現為池不夠定義:某個任務必須等待池中其他任務的運行結果,有可能發生飢餓死鎖2.執行緒池大小
##注意:執行緒池的大小也受其他的限制,如其他資源池:資料庫連接池
如果每個任務都是一個連接,那麼執行緒池的大小就受制於資料庫連接池的大小
3.設定ThreadPoolExecutor執行緒池
實例:1.透過Executors的工廠方法傳回預設的一些實作
2.透過實例化ThreadPoolExecutor(.....)自訂實作
##執行緒池的
佇列1.無界佇列:任務到達,執行緒池飽滿,則任務在佇列中等待,如果任務無限達到,則佇列會無限擴張如:單例和固定大小的執行緒池用的就是此種
2.有界佇列:如果新任務到達,佇列滿則使用飽和策略
3.同步移交:如果執行緒池很大,將任務放入佇列後在移交就會產生延時,如果任務生產者很快也會導致任務排隊Synchr
onousQueue直接將任務移交給工作執行緒機制:將一個任務放入,必須有一個線程等待接受,如果沒有,則新增
線程,如果線程飽和,則拒絕任務#如:Cache
ThreadPool就是使用的這種策略set
RejectedExecutionHandler來修改飽和策略##1.終止Abort(預設):拋出例外
由呼叫者處理##3.拋棄DiscardOldest :拋棄最舊的任務,注意:如果是優先權佇列將拋棄優先順序最高的任務
4.CallerRuns:回退任務,有呼叫者執行緒自行處理
每當建立執行緒時:其實是呼叫了線程工廠來完成
自訂線程工廠:implements ThreadFactory
##可以自訂該執行緒工廠的行為:如Uncaught
ExceptionHandler等public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r) {
this(r, DEFAULT_NAME);
}
public MyAppThread(Runnable runnable, String name) {
super(runnable, name + "-" + created.incrementAndGet());
//设置该线程工厂创建的线程的 未捕获异常的行为
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t,
Throwable e) {
log.log(Level.SEVERE,
"UNCAUGHT in thread " + t.getName(), e);
}
});
}
public void run() {
// Copy debug flag to ensure consistent value throughout.
boolean debug = debugLifecycle;
if (debug) log.log(Level.FINE, "Created " + getName());
try {
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug) log.log(Level.FINE, "Exiting " + getName());
}
}
public static int getThreadsCreated() {
return created.get();
}
public static int getThreadsAlive() {
return alive.get();
}
public static boolean getDebug() {
return debugLifecycle;
}
public static void setDebug(boolean b) {
debugLifecycle = b;
}
}
5.擴充ThreadPoolExecutor
#可以被自定義子類別覆寫的方法:1.afterExecute:結束後,如果拋出RuntimeException則方法不會執行
2.
beforeExecute:開始前,如果拋出RuntimeException則任務不會執行3.terminated:在執行緒池關閉時,可以用來釋放資源等
遞歸演算法的平行化
# 1.循环 在循环中,每次循环操作都是独立的 2.迭代 如果每个迭代操作是彼此独立的,则可以串行执行 如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的 实例://串行化
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
//并行化
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements)
exec.execute(new Runnable() {
public void run() {
process(e);
}
});
}
//串行 计算compute 和串行迭代
public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
//并行 计算compute 和串行迭代
public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(() -> results.add(n.compute()));
parallelRecursive(exec, n.getChildren(), results);
}
}
//调用并行方法的操作
public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
public class ConcurrentPuzzleSolver <P, M> {
private final Puzzle<P, M> puzzle;
private final ExecutorService exec;
private final ConcurrentMap<P, Boolean> seen;
protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();
public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
this.puzzle = puzzle;
this.exec = initThreadPool();
this.seen = new ConcurrentHashMap<P, Boolean>();
if (exec instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
}
}
private ExecutorService initThreadPool() {
return Executors.newCachedThreadPool();
}
public List<M> solve() throws InterruptedException {
try {
P p = puzzle.initialPosition();
exec.execute(newTask(p, null, null));
// 等待ValueLatch中闭锁解开,则表示已经找到答案
PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
} finally {
exec.shutdown();//最终主线程关闭线程池
}
}
protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
return new SolverTask(p, m, n);
}
protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
super(pos, move, prev);
}
public void run() {
//如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;
//为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
if (solution.isSet() || seen.putIfAbsent(pos, true) != null){
return; // already solved or seen this position
}
if (puzzle.isGoal(pos)) {
solution.setValue(this);
} else {
for (M m : puzzle.legalMoves(pos))
exec.execute(newTask(puzzle.move(pos, m), m, this));
}
}
}
}
以上是java並發編程之線程池的使用方法詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!