首頁 >Java >java教程 >java線程池的使用

java線程池的使用

大家讲道理
大家讲道理原創
2017-05-28 11:33:171617瀏覽

一、任務與執行策略之間的隱性耦合

  Executor可以將任務的提交和任務的執行策略解耦

  只有任務是同類型的且執行時間差異不大,才能發揮最大性能,否則,如將一些耗時長的任務和耗時短的任務放在一個線程池,除非線程池很大,否則會造成死鎖等問題

1.執行緒飢餓死鎖

  類似:將兩個任務提交給一個單執行緒池,且兩個任務之間相互依賴,一個任務等待另一個任務,則會發生死鎖;表現為池不夠

  定義:某個任務必須等待池中其他任務的運行結果,有可能發生飢餓死鎖

2.執行緒池大小

#  

#  注意:執行緒池的大小還受其他的限制,如其他資源池:資料庫連接池

    如果每個任務都是一個連接,那麼執行緒池的大小就受制於資料庫連接池的大小

3.設定ThreadPoolExecutor執行緒池實例:

  1.透過Executors的工廠方法傳回預設的一些實作

#  2.透過實例化ThreadPoolExecutor(.....)自訂實作

執行緒池的

佇列

  1.無界佇列:任務到達,線程池飽滿,則任務在隊列中等待,如果任務無限達到,則隊列會無限擴張#    如:單例和固定大小的線程池用的就是此種

  2.有界佇列:如果新任務到達,佇列滿則使用飽和策略

    3.同步移交:如果執行緒池很大,將任務放入佇列後在移交就會產生延時,如果任務生產者很快也會導致任務排隊    SynchronousQueue直接將任務移交給工作執行緒

#機制:將一個任務放入,必須有一個執行緒等待接受,如果沒有,則新增線程,如果執行緒飽和,則拒絕任務    如:

Cache

ThreadPool就是使用的這種策略

飽和策略:#  

set

RejectedExecutionHandl# er來修改飽和策略  1.終止

Abort

(預設):

拋出例外

由呼叫者處理

  2.拋棄

Discard

  3.拋棄DiscardOldest:拋棄最舊的任務,注意:如果是

優先權

佇列將拋棄優先順序最高的任務

  4.
C

all

erRuns
:回退任務,有呼叫者執行緒自行處理

4.執行緒工廠ThreadFactoy

#   每當創建執行緒時:其實是呼叫了執行緒工廠來完成

   自訂執行緒工廠:implements ThreadFactory

   可以自訂該執行緒工廠的行為:如Uncaught

Exception

Handler等  

public class MyAppThread extends Thread {    public static final String DEFAULT_NAME = "MyAppThread";    private static volatile boolean debugLifecycle = false;    private static final AtomicInteger created = new AtomicInteger();    private static final AtomicInteger alive = new AtomicInteger();    private static final Logger log = Logger.getAnonymousLogger();    public MyAppThread(Runnable r) {        this(r, DEFAULT_NAME);
    }    public MyAppThread(Runnable runnable, String name) {        super(runnable, name + "-" + created.incrementAndGet());        //设置该线程工厂创建的线程的 未捕获异常的行为
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {            public void uncaughtException(Thread t,
                                          Throwable e) {
                log.log(Level.SEVERE,                        "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }    public void run() {        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;        if (debug) log.log(Level.FINE, "Created " + getName());        try {
            alive.incrementAndGet();            super.run();
        } finally {
            alive.decrementAndGet();            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }    public static int getThreadsCreated() {        return created.get();
    }    public static int getThreadsAlive() {        return alive.get();
    }    public static boolean getDebug() {        return debugLifecycle;
    }    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}

 

5.擴充ThreadPoolExecutor#  可以被自訂子類別覆寫的方法:  1.afterExecute:結束後,如果拋出Run

#time

Exception則方法不會執行

  2.
before

Execute:開始前,如果拋出RuntimeException則任務不會執行


  3.terminated:在執行緒池關閉時,可以用來釋放資源等

 

二、遞歸演算法的平行化

1.

循環  

  在循環中,每次循環運算都是獨立的

#########
//串行化
    void processSequentially(List<Element> elements) {        for (Element e : elements)
            process(e);
    }    //并行化
    void processInParallel(Executor exec, List<Element> elements) {        for (final Element e : elements)
            exec.execute(new Runnable() {                public void run() {
                    process(e);
                }
            });
    }
############# #########2.迭​​代###### ###    如果每個迭代操作是彼此獨立的,則可以串行執行######  如:深度優先###搜尋###演算法;注意:遞歸還是串行的,但是,每個節點的計算是平行的######  ####


//串行 计算compute 和串行迭代
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }    //并行 计算compute 和串行迭代
    public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {        for (final Node<T> n : nodes) {
            exec.execute(() -> results.add(n.compute()));
            parallelRecursive(exec, n.getChildren(), results);
        }
    }    //调用并行方法的操作
    public <T> Collection<T> getParallelResults(List<Node<T>> nodes)            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);        return resultQueue;
    }


 

  实例:

  


public class ConcurrentPuzzleSolver <P, M> {    private final Puzzle<P, M> puzzle;    private final ExecutorService exec;    private final ConcurrentMap<P, Boolean> seen;    protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();    public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {        this.puzzle = puzzle;        this.exec = initThreadPool();        this.seen = new ConcurrentHashMap<P, Boolean>();        if (exec instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
            tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        }
    }    private ExecutorService initThreadPool() {        return Executors.newCachedThreadPool();
    }    public List<M> solve() throws InterruptedException {        try {
            P p = puzzle.initialPosition();
            exec.execute(newTask(p, null, null));            // 等待ValueLatch中闭锁解开,则表示已经找到答案
            PuzzleNode<P, M> solnPuzzleNode = solution.getValue();            return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
        } finally {
            exec.shutdown();//最终主线程关闭线程池        }
    }    protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {        return new SolverTask(p, m, n);
    }    protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
        SolverTask(P pos, M move, PuzzleNode<P, M> prev) {            super(pos, move, prev);
        }        public void run() {            //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;            //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
            if (solution.isSet() || seen.putIfAbsent(pos, true) != null){                return; // already solved or seen this position            }            if (puzzle.isGoal(pos)) {
                solution.setValue(this);
            } else {                for (M m : puzzle.legalMoves(pos))
                    exec.execute(newTask(puzzle.move(pos, m), m, this));
            }
        }
    }
}


 

  

 

以上是java線程池的使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn