设为首页 加入收藏

TOP

使用ThreadPoolExecutor并行执行独立的单线程任务(一)
2017-09-30 16:23:52 】 浏览:5647
Tags:使用 ThreadPoolExecutor 并行 执行 独立 线程 任务

Java SE 5.0中引入了任务执行框架,这是简化多线程程序设计开发的一大进步。使用这个框架可以方便地管理任务:管理任务的生命周期以及执行策略。

在这篇文章中,我们通过一个简单的例子来展现这个框架所带来的灵活与简单。

基础

执行框架引入了Executor接口来管理任务的执行。Executor是一个用来提交Runnable任务的接口。这个接口将任务提交与任务执行隔离起来:拥有不同执行策略的executor都实现了同一个提交接口。改变执行策略不会影响任务的提交逻辑。

如果你要提交一个Runnable对象来执行,很简单:

1
2
Executor exec = …;
exec.execute(runnable);

线程池

如前所述,executor如何去执行提交的runnable任务并没有在Executor接口中规定,这取决于你所用的executor的具体类型。这个框架提供了几种不同的executor,执行策略针对不同的场景而不同。

你可能会用到的最常见的executor类型就是线程池executor,也就是ThreadPoolExecutor类(及其子类)的实例。ThreadPoolExecutor管理着一个线程池和一个工作队列,线程池存放着用于执行任务的工作线程。

你肯定在其他技术中也了解过“池”的概念。使用“池”的一个最大的好处就是减少资源创建的开销,用过并释放后,还可以重用。另一个间接的好处是你可以控制使用资源的多少。比如,你可以调整线程池的大小达到你想要的负载,而不损害系统的资源。

这个框架提供了一个工厂类,叫Executors,来创建线程池。使用这个工程类你可以创建不同特性的线程池。尽管底层的实现常常是一样的(ThreadPoolExecutor),但工厂类可以使你不必使用复杂的构造函数就可以快速地设置一个线程池。工程类的工厂方法有:

  • newFixedThreadPool:该方法返回一个最大容量固定的线程池。它会按需创建新线程,线程数量不大于配置的数量大小。当线程数达到最大以后,线程池会一直维持这么多不变。
  • newCachedThreadPool:该方法返回一个无界的线程池,也就是没有最大数量限制。但当工作量减小时,这类线程池会销毁没用的线程。
  • newSingleThreadedExecutor:该方法返回一个executor,它可以保证所有的任务都在一个单线程中执行。
  • newScheduledThreadPool:该方法返回一个固定大小的线程池,它支持延时和定时任务的执行。

这仅仅是一个开端。Executor还有一些其他用法已超出了这篇文章的范围,我强烈推荐你研究以下内容:

  • 生命周期管理的方法,这些方法由ExecutorService接口声明(比如shutdown()和awaitTermination())。
  • 使用CompletionService来查询任务状态、获取返回值,如果有返回值的话。

ExecutorService接口特别重要,因为它提供了关闭线程池的方法,并确保清理了不再使用的资源。令人欣慰的是,ExecutorService接口相当简单、一目了然,我建议全面地学习下它的文档。

大致来说,当你向ExecutorService发送了一个shutdown()消息后,它就不会接收新提交的任务,但是仍在队列中的任务会被继续处理完。你可以使用isTerminated()来查询ExecutorService终止状态,或使用awaitTermination(…)方法来等待ExecutorService终止。如果传入一个最大超时时间作为参数,awaitTermination方法就不会永远等待。

警告: 对JVM进程永远不会退出的理解上,存在着一些错误和迷惑。如果你不关闭executorService,只是销毁了底层的线程,JVM就不会退出。当最后一个普通线程(非守护线程)退出后,JVM也会退出。

配置ThreadPoolExecutor

如果你决定不使用Executor的工厂类,而是手动创建一个 ThreadPoolExecutor,你需要使用构造函数来创建并配置。下面是这个类使用最广泛的一个构造函数:

1
2
3
4
5
6
7
public ThreadPoolExecutor(
     int corePoolSize,
     int maxPoolSize,
     long keepAlive,
     TimeUnit unit,
     BlockingQueue<Runnable> workQueue,
     RejectedExecutionHandler handler);

如你所见,你可以配置以下内容:

  • 核心池的大小(线程池将会使用的大小)
  • 最大池大小
  • 存活时间,空闲线程在这个时间后被销毁
  • 存放任务的工作队列
  • 任务提交拒绝后要执行的策略

限制队列中任务数

限制执行任务的并发数、限制线程池大小对应用程序以及程序执行结果的可预期性与稳定性有很大的好处。无尽地创建线程,最终会耗尽运行时资源。你的应用程序因此会产生严重的性能问题,甚至导致程序不稳定。

这只解决了部分问题:限制了并发任务数,但并没有限制提交到等待队列的任务数。如果任务提交的速率一直高于任务执行的速率,那么应用程序最终会出现资源短缺的状况。

解决方法是:

  • 为Executor提供一个存放待执行任务的阻塞队列。如果队列填满,以后提交的任务会被“拒绝”。
  • 当任务提交被拒绝时会触发RejectedExecutionHandler,这也是为什么这个类名中引用动词“rejected”。你可以实现自己的拒绝策略,或者使用框架内置的策略。

默认的拒绝策略可以让executor抛出一个RejectedExecutionException异常。然而,还有其他的内建策略:

  • 悄悄地丢弃一个任务
  • 丢弃最旧的任务,重新提交最新的
  • 在调用者的线程中执行被拒绝的任务

什么时候以及为什么我们才会这样配置线程池?让我们看一个例子。

示例:并行执行独立的单线程任务

最近,我被叫去解决一个很久以前的任务的问题,我的客户之前就运行过这个任务。大致来说,这个任务包含一个组件,这个组件监听目录树所产生的文件系统事件。每当一个事件被触发,必须处理一个文件。一个专门的单线程执行文件处理。说真的,根据任务的特点,即使我能把它并行化,我也不想那么做。一天的某些时候,事件到达率才很高,文件也没必要实时处理,在第二天之前处理完即可。

当前的实现采用了一些混合且匹配的技术,包括使用UNIX SHELL脚本扫描目录结构,并检测是否发生改变。实现完成后,我们采用了双核的执行环境。同样,事件的到达率相当低:目前为止,事件数以百万计,总共要处理1~2T字节的原始数据。

首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇python:关于三级菜单的新手实现 下一篇python 零宽负预测先行断言 (心..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目