>  기사  >  Java  >  Java 스레드 풀 사용

Java 스레드 풀 사용

大家讲道理
大家讲道理원래의
2017-05-28 11:33:171588검색

1. 작업과 실행 전략 간의 암시적 결합

실행자는 작업 제출과 작업 실행 전략을 분리할 수 있습니다.

동일한 유형이고 실행 시간의 차이가 거의 없는 작업만 최대 성능을 얻을 수 있습니다. -스레드 풀에서 작업을 소비하고 시간이 짧은 작업을 수행하는 경우 스레드 풀이 매우 크지 않으면 교착 상태 및 기타 문제가 발생합니다

1. 스레드 부족 교착 상태

다음과 유사합니다. 단일 스레드 풀이 주어지면, 두 작업은 서로 종속되어 있습니다. 한 작업이 다른 작업을 기다리면 교착 상태가 발생합니다.

정의: 작업은 풀에 있는 다른 작업의 실행 결과를 기다려야 합니다. 기아 상태가 발생합니다. 교착 상태가 발생할 수 있습니다

2. 스레드 풀 크기

  ​​

참고: 스레드 풀의 크기에는 다른 리소스 풀과 같은 다른 제한 사항도 적용됩니다. 데이터베이스 연결 풀

각 작업이 연결인 경우 thread 풀의 크기는 데이터베이스 연결 풀의 크기에 따라 달라집니다

3. ThreadPoolExecutor 스레드 풀 구성

Instance:

  1. Executors의 팩토리 메소드를 통해 일부 기본 구현을 반환합니다.

2. By ThreadPoolExecutor 인스턴스화 (.. ...)

스레드 풀의 대기열

사용자 정의 구현 1. 무제한 대기열: 작업이 도착하고 스레드 풀이 가득 차면 작업은 대기열에서 대기합니다. 무한히, 큐는 무한히 확장됩니다

예: 싱글톤 및 고정 크기 스레드 풀이 사용하는 것입니다

  2. 제한된 큐: 새 작업이 도착하고 큐가 가득 차면 포화 전략

을 사용합니다.

3. 동기식 핸드오버: 스레드 풀이 큰 경우 작업을 대기열에 넣은 후 핸드오버가 지연됩니다. 작업 생산자가 작업을 빠르게 대기열에 추가하면

  SynchronousQueue가 직접 작업을 작업자 스레드에 넘겨주세요

 메커니즘: 작업을 넣습니다. 수락 대기 중인 스레드가 있어야 합니다. 그렇지 않은 경우 추가스레드, 스레드가 포화되면 작업을 거부합니다

 예: CacheThreadPool은 사용된 전략입니다

포화 전략:

  setRejectedExecutionHandler로 포화 전략을 수정

1. 종료 Abort(기본값): 예외 발생 처리자 호출자

2. 폐기 Discard

3. 폐기 DiscardOldest: 폐기 가장 오래된 작업, 참고: priority인 경우 대기열은 우선 순위가 가장 높은 작업

 4.CallerRuns을 포기합니다. : 작업을 롤백하면 호출자 스레드가 자체적으로 작업을 처리합니다

4. 스레드 팩토리 ThreadFactoy

스레드가 생성될 때마다 실제로 스레드 팩토리를 호출하여 완료합니다.

사용자 정의 스레드 팩토리: ThreadFactory를 구현합니다.

사용자 정의할 수 있습니다. 스레드 팩토리의 동작: Uncaught Exception Handler 등


public class MyAppThread extends Thread {    public static final String DEFAULT_NAME = "MyAppThread";    private static volatile boolean debugLifecycle = false;    private static final AtomicInteger created = new AtomicInteger();    private static final AtomicInteger alive = new AtomicInteger();    private static final Logger log = Logger.getAnonymousLogger();    public MyAppThread(Runnable r) {        this(r, DEFAULT_NAME);
    }    public MyAppThread(Runnable runnable, String name) {        super(runnable, name + "-" + created.incrementAndGet());        //设置该线程工厂创建的线程的 未捕获异常的行为
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {            public void uncaughtException(Thread t,
                                          Throwable e) {
                log.log(Level.SEVERE,                        "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }    public void run() {        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;        if (debug) log.log(Level.FINE, "Created " + getName());        try {
            alive.incrementAndGet();            super.run();
        } finally {
            alive.decrementAndGet();            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }    public static int getThreadsCreated() {        return created.get();
    }    public static int getThreadsAlive() {        return alive.get();
    }    public static boolean getDebug() {        return debugLifecycle;
    }    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}


5. 사용자 정의 하위 클래스로 재정의할 수 있는 메서드:

1. afterExecute: 종료 후 Run

time

Exception이 발생하면 메소드가 실행되지 않음

2. before

Execute: 시작 전 RuntimeException이 발생하면 작업이 실행되지 않음

 3.terminating : 스레드 풀이 닫히면 리소스 해제 등에 사용할 수 있습니다.

2.

recursive

알고리즘

의 병렬화 1.

Loop

  루프에서 각 루프 작업은 independent

//串行化
    void processSequentially(List<Element> elements) {        for (Element e : elements)
            process(e);
    }    //并行化
    void processInParallel(Executor exec, List<Element> elements) {        for (final Element e : elements)
            exec.execute(new Runnable() {                public void run() {
                    process(e);
                }
            });
    }



2. Iteration

각 반복 작업이 서로 독립적인 경우 직렬 실행이 가능합니다

 예: 깊이 우선

search

알고리즘; 여전히 직렬이지만 각 노드의 계산은 병렬입니다

 


//串行 计算compute 和串行迭代
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }    //并行 计算compute 和串行迭代
    public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {        for (final Node<T> n : nodes) {
            exec.execute(() -> results.add(n.compute()));
            parallelRecursive(exec, n.getChildren(), results);
        }
    }    //调用并行方法的操作
    public <T> Collection<T> getParallelResults(List<Node<T>> nodes)            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);        return resultQueue;
    }


 

  实例:

  


public class ConcurrentPuzzleSolver <P, M> {    private final Puzzle<P, M> puzzle;    private final ExecutorService exec;    private final ConcurrentMap<P, Boolean> seen;    protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();    public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {        this.puzzle = puzzle;        this.exec = initThreadPool();        this.seen = new ConcurrentHashMap<P, Boolean>();        if (exec instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
            tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        }
    }    private ExecutorService initThreadPool() {        return Executors.newCachedThreadPool();
    }    public List<M> solve() throws InterruptedException {        try {
            P p = puzzle.initialPosition();
            exec.execute(newTask(p, null, null));            // 等待ValueLatch中闭锁解开,则表示已经找到答案
            PuzzleNode<P, M> solnPuzzleNode = solution.getValue();            return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
        } finally {
            exec.shutdown();//最终主线程关闭线程池        }
    }    protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {        return new SolverTask(p, m, n);
    }    protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
        SolverTask(P pos, M move, PuzzleNode<P, M> prev) {            super(pos, move, prev);
        }        public void run() {            //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;            //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
            if (solution.isSet() || seen.putIfAbsent(pos, true) != null){                return; // already solved or seen this position            }            if (puzzle.isGoal(pos)) {
                solution.setValue(this);
            } else {                for (M m : puzzle.legalMoves(pos))
                    exec.execute(newTask(puzzle.move(pos, m), m, this));
            }
        }
    }
}


 

  

 

위 내용은 Java 스레드 풀 사용의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
이전 기사:자바 NIO: I/O 모델다음 기사:자바 NIO: I/O 모델