>Java >java지도 시간 >ThreadPoolExecutor를 사용하여 Java에서 독립적인 단일 스레드 작업을 병렬로 실행하는 방법에 대한 자세한 소개

ThreadPoolExecutor를 사용하여 Java에서 독립적인 단일 스레드 작업을 병렬로 실행하는 방법에 대한 자세한 소개

黄舟
黄舟원래의
2017-03-23 11:06:442997검색

작업 실행 프레임워크는 Java SE 5.0에 도입되었으며, 이는 다중 스레드 프로그래밍 개발을 단순화하는 데 큰 진전을 이루었습니다. 이 프레임워크를 사용하면 작업을 쉽게 관리할 수 있습니다. 작업의 수명주기 및 실행 전략을 관리하세요.

이 기사에서는 간단한 예를 사용하여 이 프레임워크가 제공하는 유연성과 단순성을 보여줍니다.

기본

실행 프레임워크에서는 작업 실행을 관리하기 위해 Executor 인터페이스를 도입합니다. Executor는 Runnable 작업을 제출하는 데 사용되는 인터페이스입니다. 이 인터페이스는 작업 실행에서 작업 제출을 분리합니다. 다양한 실행 전략을 가진 실행기는 모두 동일한 제출 인터페이스를 구현합니다. 실행 전략을 변경해도 작업 제출 논리에는 영향을 미치지 않습니다.

실행을 위해 Runnable 개체를 제출하려는 경우 방법은 매우 간단합니다.

Executor exec = …;
exec.execute(runnable);

스레드 풀

앞서 언급했듯이 실행기가 제출된 실행 가능 작업을 실행하는 방법은 다음과 같습니다. 실행기 인터페이스에 지정된 대로 이는 사용 중인 특정 실행기 유형에 따라 다릅니다. 이 프레임워크는 여러 가지 실행기를 제공하며 실행 전략은 시나리오에 따라 다릅니다.

사용할 수 있는 가장 일반적인 실행기 유형은 ThreadPoolExecutor 클래스(및 해당 하위 클래스)의 인스턴스인 스레드 풀 실행기입니다. ThreadPoolExecutor는 스레드 풀과 작업 대기열 을 관리합니다. 스레드 풀은 작업을 실행하는 데 사용되는 작업자 스레드를 저장합니다.

다른 기술에서도 '풀'이라는 개념을 이해하셨을 텐데요. "풀" 사용의 가장 큰 이점 중 하나는 사용 및 해제 후 재사용이 가능하다는 것입니다. 또 다른 간접적인 이점은 사용되는 리소스의 양을 제어할 수 있다는 것입니다. 예를 들어, 시스템 리소스를 손상시키지 않고 원하는 로드를 달성하기 위해 스레드 풀의 크기를 조정할 수 있습니다.

이 프레임워크는 스레드 풀을 생성하기 위해 Executors라는 팩토리 클래스를 제공합니다. 이 엔지니어링 클래스를 사용하면 다양한 특성을 가진 스레드 풀을 만들 수 있습니다. 기본 구현은 동일한 경우가 많지만(ThreadPoolExecutor) 팩토리 클래스를 사용하면 복잡한 생성자를 사용하지 않고도 스레드 풀을 빠르게 설정할 수 있습니다. 엔지니어링 클래스의 팩토리 메소드는 다음과 같습니다.

  • newFixedThreadPool: 이 메소드는 최대 용량이 고정된 스레드 풀을 반환합니다. 요청 시 새 스레드를 생성하며 스레드 수는 구성된 수보다 크지 않습니다. 스레드 수가 최대값에 도달하면 스레드 풀은 변경되지 않고 유지됩니다.

  • newCachedThreadPool: 이 메소드는 제한되지 않은 스레드 풀을 반환합니다. 즉, 최대 개수 제한이 없습니다. 그러나 작업 부하가 감소하면 이러한 유형의 스레드 풀은 사용되지 않는 스레드를 파괴합니다.

  • newSingleThreadedExecutor: 이 메소드는 모든 작업이 단일 스레드에서 실행되도록 보장할 수 있는 실행기를 반환합니다.

  • newScheduledThreadPool: 이 메소드는 지연된 작업과 예약된 작업의 실행을 지원하는 고정 크기 스레드 풀을 반환합니다.

이것은 시작에 불과합니다. 이 기사의 범위를 벗어나는 Executor의 다른 용도가 있습니다.

  • ExecutorService 인터페이스에 의해 선언된 수명 주기 관리 방법(예: shutdown() 및 waitTermination())과 같습니다.

  • CompletionService를 사용하여 작업 상태를 쿼리하고 반환 값이 있으면 반환 값을 가져옵니다.

ExecutorService 인터페이스는 스레드 풀을 종료하고 더 이상 사용되지 않는 리소스를 정리하는 방법을 제공하므로 특히 중요합니다. 다행스럽게도 ExecutorService 인터페이스는 매우 간단하고 설명이 필요하지 않습니다. 해당 문서를 포괄적으로 연구하는 것이 좋습니다.

대략 말하면 ExecutorService에 shutdown() 메시지를 보내면 새로 제출된 작업을 받지 않지만 대기열에 있는 작업은 계속 처리됩니다. isTerminating()을 사용하여 ExecutorService 종료 상태를 쿼리하거나 waitTermination(...) 메서드를 사용하여 ExecutorService가 종료될 때까지 기다릴 수 있습니다. 최대 제한 시간을 매개변수로 전달하면 waitTermination 메서드는 영원히 기다리지 않습니다.

경고: JVM 프로세스가 절대 종료되지 않는다는 점을 이해하면 약간의 오류와 혼란이 있습니다. executorService를 닫지 않고 기본 스레드만 삭제하면 JVM이 종료되지 않습니다. 마지막 일반 스레드(데몬이 아닌 스레드)가 종료되면 JVM도 종료됩니다.

ThreadPoolExecutor 구성

Executor 팩토리 클래스를 사용하지 않고 수동으로 ThreadPoolExecutor를 생성하기로 결정한 경우 생성자를 사용하여 생성하고 구성해야 합니다. 다음은 이 클래스에서 가장 널리 사용되는 생성자 중 하나입니다.

public ThreadPoolExecutor(
    int corePoolSize,
    int maxPoolSize,
    long keepAlive,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    RejectedExecutionHandler handler);

보시다시피 다음을 구성할 수 있습니다.

  • 코어 풀의 크기( 스레드 풀은 사용될 크기)

  • 최대 풀 크기

  • 생존 시간, 이 시간 이후에는 유휴 스레드가 소멸됩니다

  • 작업 저장을 위한 작업 대기열

  • 작업 제출이 거부된 후 실행되는 전략

限制队列中任务数

限制执行任务的并发数、限制线程池大小对应用程序以及程序执行结果的可预期性与稳定性有很大的好处。无尽地创建线程,最终会耗尽运行时资源。你的应用程序因此会产生严重的性能问题,甚至导致程序不稳定。

这只解决了部分问题:限制了并发任务数,但并没有限制提交到等待队列的任务数。如果任务提交的速率一直高于任务执行的速率,那么应用程序最终会出现资源短缺的状况。

解决方法是:

  • 为Executor提供一个存放待执行任务的阻塞队列。如果队列填满,以后提交的任务会被“拒绝”。

  • 当任务提交被拒绝时会触发RejectedExecutionHandler,这也是为什么这个类名中引用动词“rejected”。你可以实现自己的拒绝策略,或者使用框架内置的策略。

默认的拒绝策略可以让executor抛出一个RejectedExecutionException异常。然而,还有其他的内建策略:

  • 悄悄地丢弃一个任务

  • 丢弃最旧的任务,重新提交最新的

  • 在调用者的线程中执行被拒绝的任务

什么时候以及为什么我们才会这样配置线程池?让我们看一个例子。

示例:并行执行独立的单线程任务

最近,我被叫去解决一个很久以前的任务的问题,我的客户之前就运行过这个任务。大致来说,这个任务包含一个组件,这个组件监听目录树所产生的文件系统事件。每当一个事件被触发,必须处理一个文件。一个专门的单线程执行文件处理。说真的,根据任务的特点,即使我能把它并行化,我也不想那么做。一天的某些时候,事件到达率才很高,文件也没必要实时处理,在第二天之前处理完即可。

当前的实现采用了一些混合且匹配的技术,包括使用UNIX SHELL脚本扫描目录结构,并检测是否发生改变。实现完成后,我们采用了双核的执行环境。同样,事件的到达率相当低:目前为止,事件数以百万计,总共要处理1~2T字节的原始数据。

运行处理程序的主机是12核的机器:很好机会去并行化这些旧的单线程任务。基本上,我们有了食谱的所有原料,我们需要做的仅仅是把程序建立起来并调节。在写代码前,我们必须了解下程序的负载。我列一下我检测到的内容:

  • 有非常多的文件需要被周期性地扫描:每个目录包含1~2百万个文件

  • 扫描算法很快,可以并行化

  • 处理一个文件至少需要1s,甚至上升到2s或3s

  • 处理文件时,性能瓶颈主要是CPU

  • CPU利用率必须可调,根据一天时间的不同而使用不同的负载配置。

我需要这样一个线程池,它的大小在程序运行的时候通过负载配置来设置。我倾向于根据负载策略创建一个固定大小的线程池。由于线程的性能瓶颈在CPU,它的核心使用率是100%,不会等待其他资源,那么负载策略就很好计算了:用执行环境的CPU核心数乘以一个负载因子(保证计算的结果在峰值时至少有一个核心):

int cpus = Runtime.getRuntime().availableProcessors();
int maxThreads = cpus * scaleFactor;
maxThreads = (maxThreads > 0 ? maxThreads : 1);

然后我需要使用阻塞队列创建一个ThreadPoolExecutor,可以限制提交的任务数。为什么?是这样,扫描算法执行很快,很快就产生庞大数量需要处理的文件。数量有多庞大呢?很难预测,因为变动太大了。我不想让executor内部的队列不加选择地填满了要执行的任务实例(这些实例包含了庞大的文件描述符)。我宁愿在队列填满时,拒绝这些文件。

而且,我将使用ThreadPoolExecutor.CallerRunsPolicy作为拒绝策略。为什么?因为当队列已满时,线程池的线程忙于处理文件,我让提交任务的线程去执行它(被拒绝的任务)。这样,扫面会停止,转而去处理一个文件,处理结束后马上又会扫描目录。

下面是创建executor的代码:

ExecutorService executorService =
    new ThreadPoolExecutor(
        maxThreads, // core thread pool size
        maxThreads, // maximum thread pool size
        1, // time to wait before resizing pool
        TimeUnit.MINUTES, 
        new ArrayBlockingQueue<Runnable>(maxThreads, true),
        new ThreadPoolExecutor.CallerRunsPolicy());

 下面是程序的框架(极其简化版):

// scanning loop: fake scanning
while (!dirsToProcess.isEmpty()) {
    File currentDir = dirsToProcess.pop();

    // listing children
    File[] children = currentDir.listFiles();

    // processing children
    for (final File currentFile : children) {
        // if it&#39;s a directory, defer processing
        if (currentFile.isDirectory()) {
            dirsToProcess.add(currentFile);
            continue;
        }

        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // if it&#39;s a file, process it
                    new ConvertTask(currentFile).perform();
                } catch (Exception ex) {
                    // error management logic
                }
            }
        });
    }
}

// ...
// wait for all of the executor threads to finish
executorService.shutdown();
try {
    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
        // pool didn&#39;t terminate after the first try
        executorService.shutdownNow();
    }

    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
        // pool didn&#39;t terminate after the second try
    }
} catch (InterruptedException ex) {
    executorService.shutdownNow();
    Thread.currentThread().interrupt();
}

总结

看到了吧,Java并发API非常简单易用,十分灵活,也很强大。真希望我多年前可以多花点功夫写一个这样简单的程序。这样我就可以在几小时内解决由传统单线程组件所引发的扩展性问题。

위 내용은 ThreadPoolExecutor를 사용하여 Java에서 독립적인 단일 스레드 작업을 병렬로 실행하는 방법에 대한 자세한 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.