搜索
首页Javajava教程Java中使用ThreadPoolExecutor并行执行独立的单线程任务的详细介绍

Java SE 5.0中引入了任务执行框架,这是简化多线程程序设计开发的一大进步。使用这个框架可以方便地管理任务:管理任务的生命周期以及执行策略。

在这篇文章中,我们通过一个简单的例子来展现这个框架所带来的灵活与简单。

基础

执行框架引入了Executor接口来管理任务的执行。Executor是一个用来提交Runnable任务的接口。这个接口将任务提交与任务执行隔离起来:拥有不同执行策略的executor都实现了同一个提交接口。改变执行策略不会影响任务的提交逻辑。

如果你要提交一个Runnable对象来执行,很简单:

Executor exec = …;
exec.execute(runnable);

线程池

如前所述,executor如何去执行提交的runnable任务并没有在Executor接口中规定,这取决于你所用的executor的具体类型。这个框架提供了几种不同的executor,执行策略针对不同的场景而不同。

你可能会用到的最常见的executor类型就是线程池executor,也就是ThreadPoolExecutor类(及其子类)的实例。ThreadPoolExecutor管理着一个线程池和一个工作队列,线程池存放着用于执行任务的工作线程。

你肯定在其他技术中也了解过“池”的概念。使用“池”的一个最大的好处就是减少资源创建的开销,用过并释放后,还可以重用。另一个间接的好处是你可以控制使用资源的多少。比如,你可以调整线程池的大小达到你想要的负载,而不损害系统的资源。

这个框架提供了一个工厂类,叫Executors,来创建线程池。使用这个工程类你可以创建不同特性的线程池。尽管底层的实现常常是一样的(ThreadPoolExecutor),但工厂类可以使你不必使用复杂的构造函数就可以快速地设置一个线程池。工程类的工厂方法有:

  • newFixedThreadPool:该方法返回一个最大容量固定的线程池。它会按需创建新线程,线程数量不大于配置的数量大小。当线程数达到最大以后,线程池会一直维持这么多不变。

  • newCachedThreadPool:该方法返回一个无界的线程池,也就是没有最大数量限制。但当工作量减小时,这类线程池会销毁没用的线程。

  • newSingleThreadedExecutor:该方法返回一个executor,它可以保证所有的任务都在一个单线程中执行。

  • newScheduledThreadPool:该方法返回一个固定大小的线程池,它支持延时和定时任务的执行。

这仅仅是一个开端。Executor还有一些其他用法已超出了这篇文章的范围,我强烈推荐你研究以下内容:

  • 生命周期管理的方法,这些方法由ExecutorService接口声明(比如shutdown()和awaitTermination())。

  • 使用CompletionService来查询任务状态、获取返回值,如果有返回值的话。

ExecutorService接口特别重要,因为它提供了关闭线程池的方法,并确保清理了不再使用的资源。令人欣慰的是,ExecutorService接口相当简单、一目了然,我建议全面地学习下它的文档。

大致来说,当你向ExecutorService发送了一个shutdown()消息后,它就不会接收新提交的任务,但是仍在队列中的任务会被继续处理完。你可以使用isTerminated()来查询ExecutorService终止状态,或使用awaitTermination(…)方法来等待ExecutorService终止。如果传入一个最大超时时间作为参数,awaitTermination方法就不会永远等待。

警告: 对JVM进程永远不会退出的理解上,存在着一些错误和迷惑。如果你不关闭executorService,只是销毁了底层的线程,JVM就不会退出。当最后一个普通线程(非守护线程)退出后,JVM也会退出。

配置ThreadPoolExecutor

如果你决定不使用Executor的工厂类,而是手动创建一个 ThreadPoolExecutor,你需要使用构造函数来创建并配置。下面是这个类使用最广泛的一个构造函数:

public ThreadPoolExecutor(
    int corePoolSize,
    int maxPoolSize,
    long keepAlive,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    RejectedExecutionHandler handler);

如你所见,你可以配置以下内容:

  • 核心池的大小(线程池将会使用的大小)

  • 最大池大小

  • 存活时间,空闲线程在这个时间后被销毁

  • 存放任务的工作队列

  • 任务提交拒绝后要执行的策略

限制队列中任务数

限制执行任务的并发数、限制线程池大小对应用程序以及程序执行结果的可预期性与稳定性有很大的好处。无尽地创建线程,最终会耗尽运行时资源。你的应用程序因此会产生严重的性能问题,甚至导致程序不稳定。

这只解决了部分问题:限制了并发任务数,但并没有限制提交到等待队列的任务数。如果任务提交的速率一直高于任务执行的速率,那么应用程序最终会出现资源短缺的状况。

解决方法是:

  • 为Executor提供一个存放待执行任务的阻塞队列。如果队列填满,以后提交的任务会被“拒绝”。

  • 当任务提交被拒绝时会触发RejectedExecutionHandler,这也是为什么这个类名中引用动词“rejected”。你可以实现自己的拒绝策略,或者使用框架内置的策略。

默认的拒绝策略可以让executor抛出一个RejectedExecutionException异常。然而,还有其他的内建策略:

  • 悄悄地丢弃一个任务

  • 丢弃最旧的任务,重新提交最新的

  • 在调用者的线程中执行被拒绝的任务

什么时候以及为什么我们才会这样配置线程池?让我们看一个例子。

示例:并行执行独立的单线程任务

最近,我被叫去解决一个很久以前的任务的问题,我的客户之前就运行过这个任务。大致来说,这个任务包含一个组件,这个组件监听目录树所产生的文件系统事件。每当一个事件被触发,必须处理一个文件。一个专门的单线程执行文件处理。说真的,根据任务的特点,即使我能把它并行化,我也不想那么做。一天的某些时候,事件到达率才很高,文件也没必要实时处理,在第二天之前处理完即可。

当前的实现采用了一些混合且匹配的技术,包括使用UNIX SHELL脚本扫描目录结构,并检测是否发生改变。实现完成后,我们采用了双核的执行环境。同样,事件的到达率相当低:目前为止,事件数以百万计,总共要处理1~2T字节的原始数据。

运行处理程序的主机是12核的机器:很好机会去并行化这些旧的单线程任务。基本上,我们有了食谱的所有原料,我们需要做的仅仅是把程序建立起来并调节。在写代码前,我们必须了解下程序的负载。我列一下我检测到的内容:

  • 有非常多的文件需要被周期性地扫描:每个目录包含1~2百万个文件

  • 扫描算法很快,可以并行化

  • 处理一个文件至少需要1s,甚至上升到2s或3s

  • 处理文件时,性能瓶颈主要是CPU

  • CPU利用率必须可调,根据一天时间的不同而使用不同的负载配置。

我需要这样一个线程池,它的大小在程序运行的时候通过负载配置来设置。我倾向于根据负载策略创建一个固定大小的线程池。由于线程的性能瓶颈在CPU,它的核心使用率是100%,不会等待其他资源,那么负载策略就很好计算了:用执行环境的CPU核心数乘以一个负载因子(保证计算的结果在峰值时至少有一个核心):

int cpus = Runtime.getRuntime().availableProcessors();
int maxThreads = cpus * scaleFactor;
maxThreads = (maxThreads > 0 ? maxThreads : 1);

然后我需要使用阻塞队列创建一个ThreadPoolExecutor,可以限制提交的任务数。为什么?是这样,扫描算法执行很快,很快就产生庞大数量需要处理的文件。数量有多庞大呢?很难预测,因为变动太大了。我不想让executor内部的队列不加选择地填满了要执行的任务实例(这些实例包含了庞大的文件描述符)。我宁愿在队列填满时,拒绝这些文件。

而且,我将使用ThreadPoolExecutor.CallerRunsPolicy作为拒绝策略。为什么?因为当队列已满时,线程池的线程忙于处理文件,我让提交任务的线程去执行它(被拒绝的任务)。这样,扫面会停止,转而去处理一个文件,处理结束后马上又会扫描目录。

下面是创建executor的代码:

ExecutorService executorService =
    new ThreadPoolExecutor(
        maxThreads, // core thread pool size
        maxThreads, // maximum thread pool size
        1, // time to wait before resizing pool
        TimeUnit.MINUTES, 
        new ArrayBlockingQueue<Runnable>(maxThreads, true),
        new ThreadPoolExecutor.CallerRunsPolicy());

 下面是程序的框架(极其简化版):

// scanning loop: fake scanning
while (!dirsToProcess.isEmpty()) {
    File currentDir = dirsToProcess.pop();

    // listing children
    File[] children = currentDir.listFiles();

    // processing children
    for (final File currentFile : children) {
        // if it&#39;s a directory, defer processing
        if (currentFile.isDirectory()) {
            dirsToProcess.add(currentFile);
            continue;
        }

        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // if it&#39;s a file, process it
                    new ConvertTask(currentFile).perform();
                } catch (Exception ex) {
                    // error management logic
                }
            }
        });
    }
}

// ...
// wait for all of the executor threads to finish
executorService.shutdown();
try {
    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
        // pool didn&#39;t terminate after the first try
        executorService.shutdownNow();
    }

    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
        // pool didn&#39;t terminate after the second try
    }
} catch (InterruptedException ex) {
    executorService.shutdownNow();
    Thread.currentThread().interrupt();
}

总结

看到了吧,Java并发API非常简单易用,十分灵活,也很强大。真希望我多年前可以多花点功夫写一个这样简单的程序。这样我就可以在几小时内解决由传统单线程组件所引发的扩展性问题。

以上是Java中使用ThreadPoolExecutor并行执行独立的单线程任务的详细介绍的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
在平台独立性的平台独立性上使用字节码优于本机代码的优点是什么?在平台独立性的平台独立性上使用字节码优于本机代码的优点是什么?Apr 30, 2025 am 12:24 AM

ByteCodeachievesPlatFormIndenceByByByByByByExecutedBoviratualMachine(VM),允许CodetorunonanyplatformwithTheApprepreprepvm.Forexample,Javabytecodecodecodecodecanrunonanydevicewithajvm

Java真的100%独立于平台吗?为什么或为什么不呢?Java真的100%独立于平台吗?为什么或为什么不呢?Apr 30, 2025 am 12:18 AM

Java不能做到100%的平台独立性,但其平台独立性通过JVM和字节码实现,确保代码在不同平台上运行。具体实现包括:1.编译成字节码;2.JVM的解释执行;3.标准库的一致性。然而,JVM实现差异、操作系统和硬件差异以及第三方库的兼容性可能影响其平台独立性。

Java的平台独立性如何支持代码可维护性?Java的平台独立性如何支持代码可维护性?Apr 30, 2025 am 12:15 AM

Java通过“一次编写,到处运行”实现平台独立性,提升代码可维护性:1.代码重用性高,减少重复开发;2.维护成本低,只需一处修改;3.团队协作效率高,方便知识共享。

为新平台创建JVM面临哪些挑战?为新平台创建JVM面临哪些挑战?Apr 30, 2025 am 12:15 AM

在新平台上创建JVM面临的主要挑战包括硬件兼容性、操作系统兼容性和性能优化。1.硬件兼容性:需要确保JVM能正确使用新平台的处理器指令集,如RISC-V。2.操作系统兼容性:JVM需正确调用新平台的系统API,如Linux。3.性能优化:需进行性能测试和调优,调整垃圾回收策略以适应新平台的内存特性。

Javafx库如何试图解决GUI开发中的平台不一致?Javafx库如何试图解决GUI开发中的平台不一致?Apr 30, 2025 am 12:01 AM

javafxeffectife addressEddressEndressInconSiscies uningies uningusing inaplatform-agnosticsCenegraphandCssStyling.1)itabstractsplactsplatsplatsplatsplatformsthercensthascenegenceenceNaSceneGraph,确保ConsistSistEntertRenderingRenderingRenderingRenderingAccomWindows,MacOs,MacOS,MacOS,andlinux.2)

说明JVM如何充当Java代码和基础操作系统之间的中介。说明JVM如何充当Java代码和基础操作系统之间的中介。Apr 29, 2025 am 12:23 AM

JVM的工作原理是将Java代码转换为机器码并管理资源。1)类加载:加载.class文件到内存。2)运行时数据区:管理内存区域。3)执行引擎:解释或编译执行字节码。4)本地方法接口:通过JNI与操作系统交互。

解释Java虚拟机(JVM)在Java平台独立性中的作用。解释Java虚拟机(JVM)在Java平台独立性中的作用。Apr 29, 2025 am 12:21 AM

JVM使Java实现跨平台运行。1)JVM加载、验证和执行字节码。2)JVM的工作包括类加载、字节码验证、解释执行和内存管理。3)JVM支持高级功能如动态类加载和反射。

您将采取哪些步骤来确保Java应用程序在不同的操作系统上正确运行?您将采取哪些步骤来确保Java应用程序在不同的操作系统上正确运行?Apr 29, 2025 am 12:11 AM

Java应用可通过以下步骤在不同操作系统上运行:1)使用File或Paths类处理文件路径;2)通过System.getenv()设置和获取环境变量;3)利用Maven或Gradle管理依赖并测试。Java的跨平台能力依赖于JVM的抽象层,但仍需手动处理某些操作系统特定的功能。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

Dreamweaver Mac版

Dreamweaver Mac版

视觉化网页开发工具

SublimeText3 英文版

SublimeText3 英文版

推荐:为Win版本,支持代码提示!

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

VSCode Windows 64位 下载

VSCode Windows 64位 下载

微软推出的免费、功能强大的一款IDE编辑器

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境