java中大数据高并发、异步处理、系统资源被占用造成的阻塞等场景中我们经常会用到多线程,而线程是稀缺资源,它的创建与销毁是个相对偏重且耗资源的操作,Java线程又依赖于内核线程,创建线程需要进行操作系统内核状态切换,为避免资源过度消耗需要设法重用线程执行多个任务,这时就出现了线程池的概念,其实就是一个线程缓存,负责对线程进行统一分配、调优与监控。
JDK1.5之前,对于多线程编程的支持还是比较原始的,只能通过继承 Thread类 或实现 Runnable接口 来编写多线程逻辑,功能也并不多。在JDK1.5的JUC包中,对Java的多线程应用做了一次全面的扩展,比如lock锁、并发容器等,还有一个重要的扩展就是出现了Executor执行框架。Executor执行框架将Java线程的应用做了更细致的功能划分,并且进行了功能的增强。
1、Executor框架概述
Executor执行框架对Java线程进行了功能的增强,我们只需创建线程任务、然后交给指定的线程池去执行,执行完毕之后等待获取返回结果即可,不再需要关注线程的创建、开启、执行、回收等基础性的中间工作,将任务与线程解耦,程序员可以更加的关注和业务相关的线程任务逻辑。
Executor框架的最大优点是把任务的提交和执行解耦。要执行任务的人只需把Task描述清楚,然后提交即可。这个Task是怎么被执行的,被谁执行的,什么时候执行的,提交的人就不用关心了。具体点讲,提交一个Callable对象给ExecutorService(如最常用的线程池ThreadPoolExecutor),将得到一个Future对象,调用Future对象的get方法等待执行结果就好了。
1.1 Executor 组成
1.1.1 线程任务
线程任务也就是工作单元,包括被执行任务需要实现的接口:Runnable 接口或者 Callable 接口。
- JDK1.5之前,只有Runnable代表线程任务,对于受检异常必须手动在 try catch 中处理,不支持 throws 声明可能抛出的异常,不支持任务返回值。
- JDK1.5开始,出现了Callable接口,可以看作 Runnable 的增强,对于受检异常可以不用在 try catch 中处理,支持 throws 声明可能抛出的异常,并且支持任务返回值。
1.1.2 执行器
执行器会把任务分派给多个线程去执行,包括 Executor 接口及继承自 Executor 接口的 ExecutorService 接口,Executor 框架有两个关键类实现了 ExecutorService 接口 (ThreadPoolExecutor 和 ScheduThreadPoolExecutor)
- JDK1.5之前,线程任务的执行需要我们手动创建Thread对象、传入任务、并调用start方法,一个任务对应一个线程,它们之间的关联非常紧密,这样对于线程任务的管理、线程资源的复用等功能几乎没有,或者只能自己手动实现,非常麻烦。
- JDK1.5开始,出现了Executor线程池。线程池作为任务执行器,我们只需要创建指定的线程池,随后将线程任务传入线程池中,由线程池来确定是将任务直接分配给池中的线程去执行、还是创建线程并执行、或者是加入任务队列等待等等逻辑,使用线程池之后我们不再需要手动创建线程去执行,并且可以实现线程的复用以及线程任务的管理等强大的功能。执行器(线程池)将任务与线程解耦!
1.1.3 异步执行结果
异步执行结果包括 Future 接口及实现了 Future 接口的 FutureTask 类。
- JDK1.5之前,在线程任务启动之后,对于线程任务监控几乎没有,我们不知道任务有没有完成,也没办法定义任务的返回值等一系列信息。
- JDK1.5开始,出现了Future接口以及它的各种实现。这个接口体系代表了线程任务异步计算的结果,通常与Callable线程任务连用。利用了Future设计模式,在一个线程A执行线程任务的时候,我么可以在另一个线程B中异步的通过Future的实现的相关方法完成判断对应线程任务是否执行完毕、是否被取消、手动取消正在执行的线程任务、以及从执行完毕的线程任务中获取返回值等功能。
1.2 Thread 与 Executor 的比较
Thread存在的问题:
- 性能差,每次调用new Thread() 时都会创建新的对象。
- 线程缺乏统一的管理,可能会无限的新建线程,相互之间竞争可能会导致占用过多的系统资源而发生死机或者内存溢出问题。
- 缺乏对线程的功能控制. 比如定时执行,定期执行,线程中断等。
Executor特点:
- Executor启动线程比Thread中的start() 方法启动线程更加易于管理,效率更高,节约开销。
- 有助于避免this逃逸问题: 如果在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能访问到初始化一半的对象,导致引用不完整问题。
- 实现线程资源的合理复用,性能高,可以重用已经存在的线程,减少对象的创建开销,避免过多的资源竞争,造成阻塞。
- 可以对线程和线程任务实现实时监控和管理。比如目前活动线程数、曾经的最大线程数、已完成的任务数量等功能;控制最大并发线程数,在线程任务执行前-执行完毕后-线程池停止后具有可选的回调方法、移除某个线程任务、立即停止线程池等功能。
- 提供线程的定时执行,定期执行,单线程,并发数控制等功能。
- 提升任务执行效率。当新来一个线程任务的时候,由于具有线程复用计数因此可以直接利用现有的线程去执行任务,不需要新建线程,这样一定程度上提升了执行效率。
- Executor中实现了对生命周期的支持,统计信息收集,应用程序管理机制和性能监视机制。
- Executor作为异步执行框架,支持多种不同类型的任务执行策略,提供了一种标准方法将任务的提交过程和执行过程进行解耦。(基于生产者-消费者模式,使用Runnable来表示任务,提交任务的线程相当于生产者,执行任务的线程相当于消费者)。
线程池的好处:
线程池减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。同时可以根据系统的承受能力,调整线程池中工作线线程的数目,防止消耗过多的内存。
- 降低资源消耗 - 重用存在的线程,减少对象创建、消亡的开销,性能好。
- 提高响应速度 - 可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞。当任务到达时,任务可不用等待线程创建就能立即执行。
- 提高线程的可管理性 - 提供定时执行、定期执行、单线程、并发数控制等功能。
1.3 Executor 框架的两级调度
Executor 框架为两级调度,在上层,Java 多线程程序通常把应用分解成为若干任务,然后使用用户级的调度器 (Executor 框架) 将这些任务映射成为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。
- 应用程序通过 Executor 框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
2、Executor类及接口
2.1 Executor 接口
Executor 是"执行者"接口,是 Executor 框架的基础,提供了任务和执行的解耦机制,它将任务的提交与任务的执行分离开来。准确的说,Executor提供了execute()接口来执行已提交的 Runnable 任务的对象。Executor存在的目的是提供一种将 “任务提交” 与 “任务如何运行” 分离开来的机制。它只包含一个函数接口。
Executor定义了一个接收 Runnable 对象的方法 executor,其方法签名为 executor(Runnable command),该方法接收一个 Runable 实例,用来执行一个任务,任务即一个实现了 Runnable 接口的类。
- Executors 可以把一个 Runnable 对象封装成为一个 Callable 对象,然后将 Runnable 对象或者 Callable 对象直接交给 ExecutorService 执行。
- 传统多线程编程中,Runnable 任务在新线程中的使用方法为:
new Thread(new RunnableTask())).start()
。 - 而在 Executor 中,可以使用 Executor 而不用显示地创建线程:
executor.execute(new RunnableTask())
; 该方法是异步执行的。
Future就是异步执行的结果,是submit方法返回,通过futrue.get()获取计算结果。
Callable和Runnable接口,这两个接口的不同之处就是Callable可以返回结果,Runnable则不可。如果有submit提交一个Runnable,后台会用Executors.callable(runnable, result)转成Callable。
方法 | 描述 |
---|---|
void execute(Runnable command) |
在将来的某个时间执行给定的命令。 |
2.2 ExecutorService 接口
ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口,使用更广泛。其提供了生命周期管理的方法,返回 Future 对象,以及可跟踪一个或多个异步任务执行状况返回Future的方法。
ExecutorService 将返回一个实现 Future 接口的对象。由于 FutureTask 实现了 Runnable接口,所以我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。主线程可以执行 FutureTask.get() 方法等待任务执行完成并获取计算结果。
-
新增了可为跟踪一个或多个异步任务执行状况而生成 Future 的方法,比如submit方法,作为execute方法的扩展。
- 调用 ExecutorService.submit() 方法返回的 Future 对象,可以调用isDone()方法查询Future是否已经完成。当任务完成时,它具有一个结果,你可以调用get()方法来获取该结果。你也可以不用isDone()进行检查就直接调用get()获取结果,在这种情况下,get()将阻塞,直至结果准备就绪,还可以取消任务的执行。Future 提供了 cancel() 方法用来取消执行 pending 中的任务。
-
新增了可以关闭线程池的方法,比如 shutdown 和 shutdownNow 方法。
- 调用 ExecutorService 的 shutdown()方法可以平滑地关闭 ExecutorService,调用该方法后,ExecutorService 将停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭 ExecutorService。因此我们一般用该接口来实现和管理多线程。
-
新增了批量执行任务的方法,比如 invokeAny 和 invokeAll 方法。
方法 | 描述 |
---|---|
boolean awaitTermination(long timeout, TimeUnit unit) |
阻止所有任务在关闭请求完成后执行,或发生超时,或当前线程中断,以先到者为准。 |
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) |
执行给定的任务,返回持有它们的状态和结果的所有完成的列表。 |
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) |
执行给定的任务,返回在所有完成或超时到期时持有其状态和结果的列表,以先发生者为准。 |
<T> T invokeAny(Collection<? extends Callable<T>> tasks) |
执行给定的任务,返回一个已经成功完成的结果(即不抛出异常)。 |
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) |
执行给定的任务,返回一个已经成功完成的结果(即,不抛出异常),如果有则在给定的超时过去之前。 |
boolean isShutdown() |
如果执行程序已关闭,则返回true。 |
boolean isTerminated() |
如果所有任务在关闭后完成,则返回true。 |
void shutdown() |
启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。 |
List<Runnable> shutdownNow() |
尝试停止所有主动执行的任务,停止等待任务的处理,并返回正在等待执行的任务列表。 |
<T> Future<T> submit(Callable<T> task) |
提交值返回任务以执行,并返回代表任务待处理结果。 |
Future<?> submit(Runnable task) |
提交一个可运行的任务执行,并返回一个表示该任务的Future。 |
<T> Future<T> submit(Runnable task, T result) |
提交一个可运行的任务执行,并返回一个表示该任务的Future。 |
2.3 ScheduleExcutorService 接口
ScheduleExcutorService 接口扩展了ExecutorService 接口,支持Future和定期执行任务。
方法 | 描述 |
---|---|
ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) |
创建并执行在给定延迟后启用ScheduledFuture。 |
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) |
创建并执行在给定延迟后启用的单次操作。 |
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) |
创建并执行在给定的初始延迟之后,随后以给定的时间段首先启用的周期性动作; 那就是执行会在initialDelay之后开始,然后是 initialDelay + period ,然后是 initialDelay + 2 * period ,等等。 |
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) |
创建并执行在给定的初始延迟之后首先启用的定期动作,随后在一个执行的终止和下一个执行的开始之间给定的延迟。 |
2.4 AbstractExecutorService 抽象类
实现了ExecutorService的抽象类,提供 ExecutorService 执行方法的默认实现。比如对ExecutorService返回Future,实现为返回RunnableFuture。另一个作用是作为骨干实现最大限度对ExecutorService的实现功能进行了抽象。
2.5 ThreadPoolExecutor
继承了 AbstractExecutorService 的普通类,这是JDK线程池的核心实现。
- 它的构造器提供了各种可配置参数,比如线程数量、任务队列、拒绝策略等,方便我们自定义自己的线程池,以及各种钩子 (hook) 方法,方便追踪线程任务的执行。
// 完成给定Runnable的执行后调用的方法。
protected void afterExecute(Runnable r, Throwable t)
// 设置管理核心线程是否可以超时并在保持活动时间内没有任务到达时终止的策略,在新任务到达时根据需要进行替换。
void allowCoreThreadTimeOut(boolean value)
// 如果此池允许核心线程超时并在keepAlive时间内没有任务到达时终止,则返回true,在新任务到达时根据需要进行替换。
boolean allowsCoreThreadTimeOut()
// 阻止所有任务在关闭请求之后完成执行,或发生超时,或者当前线程被中断,以先发生者为准。
boolean awaitTermination(long timeout, TimeUnit unit)
// 在给定线程中执行给定Runnable之前调用的方法。
protected void beforeExecute(Thread t, Runnable r)
// 将来某个时候执行给定的任务。
void execute(Runnable command)
// 当不再引用此执行程序且没有线程时,调用shutdown。
protected void finalize()
// 返回正在执行任务的大致线程数。
int getActiveCount()
// 返回已完成执行的大致任务总数。
long getCompletedTaskCount()
// 返回核心线程数。
int getCorePoolSize()
// 返回线程保持活动时间,该时间是超过核心池大小的线程在终止之前保持空闲的时间量。
long getKeepAliveTime(TimeUnit unit)
// 返回同时存在于池中的最大线程数。
int getLargestPoolSize()
// 返回允许的最大线程数。
int getMaximumPoolSize()
// 返回池中当前的线程数。
int getPoolSize()
// 返回此执行程序使用的任务队列。
BlockingQueue getQueue() BlockingQueue getQueue()
// 返回不可执行任务的当前处理程序。
RejectedExecutionHandler getRejectedExecutionHandler()
// 返回已安排执行的大致任务总数。
long getTaskCount()
// 返回用于创建新线程的线程工厂。
ThreadFactory getThreadFactory()
// 如果此执行程序已关闭,则返回true。
boolean isShutdown()
// 如果关闭后所有任务都已完成,则返回true。
boolean isTerminated()
// 如果此执行程序在shutdown()或shutdownNow()之后终止但尚未完全终止,则返回true。
boolean isTerminating()
// 启动所有核心线程,导致它们无所事事地等待工作。
int prestartAllCoreThreads()
// 启动一个核心线程,导致它无所事事地等待工作。
boolean prestartCoreThread()
// 尝试从工作队列中删除已取消的所有Future任务。
void purge()
// 如果执行程序的内部队列存在,则从执行程序的内部队列中删除此任务,从而导致它在尚未启动时不会运行。
boolean remove(Runnable task)
// 设置核心线程数。
void setCorePoolSize(int corePoolSize)
// 设置线程在终止之前可以保持空闲的时间限制。
void setKeepAliveTime(long time, TimeUnit unit)
// 设置允许的最大线程数。
void setMaximumPoolSize(int maximumPoolSize)
// 为不可执行的任务设置新的处理程序。
void setRejectedExecutionHandler(RejectedExecutionHandler handler)
// 设置用于创建新线程的线程工厂。
void setThreadFactory(ThreadFactory threadFactory)
// 启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。
void shutdown()
// 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
List《Runnable》 shutdownNow()
// Executor终止时调用的方法。
protected void terminated()
// 返回标识此池及其状态的字符串,包括运行状态和估计的工作和任务计数的指示。
String toString()
2.6 ScheduledThreadPoolExecutor
继承了 ThreadPoolExecutor 的普通类,并实现了ScheduleExcutorService 接口。可以看作 ThreadPoolExecutor 功能的扩展或增强。
它能够将线程任务延迟指定时间后执行,或者间隔固定时间多次执行。功能与Timer类似,但ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。Timer中一个任务出现异常之后会影响其他任务的执行,但是ScheduledThreadPoolExecutor不会。Timer中一个任务耗时较常会影响其他任务的执行,ScheduledThreadPoolExecutor不会。
// 修改或替换用于执行可调用的任务。
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task)
// 修改或替换用于执行runnable的任务。
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task)
// 执行所需延迟为零的命令。
void execute(Runnable command)
// 获取有关是否继续执行现有定期任务的策略,即使此执行程序已关闭也是如此。
boolean getContinueExistingPeriodicTasksAfterShutdownPolicy()
// 获取有关是否执行现有延迟任务的策略,即使此执行程序已关闭也是如此。
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy()
// 返回此执行程序使用的任务队列。
BlockingQueue <Runnable> getQueue()
// 获取有关是否应在取消时立即从工作队列中删除已取消任务的策略。
boolean getRemoveOnCancelPolicy()
// 创建并执行在给定延迟后变为启用的ScheduledFuture。
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
// 创建并执行在给定延迟后启用的一次性操作。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// 创建并执行一个周期性操作,该操作在给定的初始延迟后首先启用,随后在给定的时间段内启用; 即执行将在initialDelay之后开始,然后是initialDelay + period,然后是initialDelay + 2 * period,依此类推。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// 创建并执行一个周期性动作,该动作在给定的初始延迟之后首先被启用,并且随后在一次执行的终止和下一次执行的开始之间给定延迟。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
// 设置是否继续执行现有周期性任务的策略,即使此执行程序已关闭。
void setContinueExistingPeriodicTasksAfterShutdownPolicy (boolean value)
// 设置是否执行现有延迟任务的策略,即使此执行程序已关闭也是如此。
void setExecuteExistingDelayedTasksAfterShutdownPolicy (boolean value)
// 设置关于是否应在取消时立即从工作队列中删除已取消任务的策略。
void setRemoveOnCancelPolicy(boolean value)
// 启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。
void shutdown()
// 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
List <Runnable> shutdownNow()
// 提交值返回任务以执行并返回表示任务的挂起结果的Future。
<T> Future<T> submit(Callable<T> task)
// 提交Runnable任务以执行并返回表示该任务的Future。
Future<?> submit(Runnable task)
// 提交Runnable任务以执行并返回表示该任务的Future。
<T> Future<T> submit(Runnable task, T result)
2.7 Executors
Executors 是独立出来的一个普通类(没有继承和实现关系,采用组合/聚合关系,图上没有注明),作为一个线程池静态工厂类。它通过静态工厂方法来创建不同类型的线程池,比如CachedThreadPool、FixedThreadPool等;提供了将Runnable包装、转换为Callable的方法;提供默认的ThreadFactory线程工厂的实现等功能。
使用Executors可以创建以下几种线程池:
2.7.1 newCachedThreadPool——不推荐使用
newCachedThreadPool() 创建一个 ExecutorService,该 ExecutorService 根据需要来创建线程,可以重复利用已存在的线程来执行任务(如果已有线程是空闲的可以重用已有的线程)。底层使用ThreadPoolExector。
- newCachedThreadPool先查看线程池中有没有以前建立的线程,如果有,就reuse(重新使用)。如果没有,就建一个新的线程加入池中。
- 缓存型池子通常用于执行一些生存期很短的异步型任务,因此在一些面向连接的daemon型SERVER中用得不多。但对于生存期短的异步任务,它是Executor的首选。
- 能reuse的线程,必须是timeout IDLE内的池中线程,缺省 timeout 是60s,超过这个IDLE时长,线程实例将被终止及移出池。注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
-
特点:没有核心线程,等待队列使用同步队列,出现一个任务就创建一个临时线程去执行任务
-
问题:不会出现内存溢出,但是会浪费CPU资源,导致机器卡死。
2.7.2 newFixedThreadPool——不推荐使用
newFixedThreadPool(int numberOfThreads) 定长线程池,创建一个可重复使用的、固定线程数量的 ExecutorService。底层使用ThreadPoolExector。核心线程数和最大线程数由用户传入,可以设置线程的最大并发数,超出在队列中等待。newFixedThreadPool和SingleThreadExecutor类似,唯一的区别就是核心线程数不同,并且由于使用的是LinkedBlockingQueue,在资源有限的时候容易引起OOM异常。
- newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程。
- 其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子。
- 和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器。
- 从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同: fixed池线程数固定,并且是0秒IDLE(无IDLE)。cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
-
特定:核心线程,无临时线程。等待队列使用链表,等待队列无限长度。
-
问题:会导致内存溢出,因为等待队列无限长。
2.7.3 newSingleThreadExecutor——不推荐使用
newSingleThreadExecutor():创建单个工作线程 ExecutorService。
- 单例线程,任意时间池中只能有一个线程。用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 特点:创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO,LIFO, 优先级)执行。只有一个核心线程,依次执行任务。
2.7.4 newScheduledThreadPool
newScheduledThreadPool(int corePoolSize)定长线程池,核心线程数由用户传入,支持定时和周期任务执行。根据时间计划,延迟给定时间后创建 ExecutorService(或者周期性地创建 ExecutorService)。
- 调度型线程池,这个池子里的线程可以按schedule依次delay执行,或周期执行。
2.7.5 newSingleThreadScheduledExecutor
newSingleThreadScheduledExecutor():根据时间计划延迟创建单个工作线程 ExecutorService(或者周期性的创建)。
2.7.6 newWorkStealingPool
newWorkStealingPool():创建一个拥有多个任务队列(以便减少连接数)的 ExecutorService。
2.8 Runnable接口和Callable接口
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结果。
-
Callable和Runnable接口,这两个接口的不同之处就是Callable可以返回结果,Runnable则不可。如果有submit提交一个Runnable,后台会用Executors.callable(runnable, result)转成Callable。
-
除了可以自己创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个Runnable包装成一个Callable。
3、简单用例
3.1 Executor接口
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
Executor executor = Executors.newCachedThreadPool();
executor.execute(new Task());
ThreadPoolExecutor pool = (ThreadPoolExecutor)executor;
pool.shutdown();
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 5);
System.out.println("Running Task!");
TimeUnit.SECONDS.sleep(duration);
System.out.println("Task Completed");
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果如下:
Running Task!
Task Completed
3.2 ExecutorService接口
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
executor.submit(new Task());
System.out.println("Shutdown executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
System.err.println("tasks interrupted");
}
finally {
if (!executor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 20);
System.out.println("Running Task!");
TimeUnit.SECONDS.sleep(duration);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果如下:
Shutdown executor
Running Task!
shutdown finished
cancel non-finished tasks
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:302)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:328)
at TestThread$Task.run(TestThread.java:39)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
3.3 ScheduledExecutorService接口
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
final ScheduledFuture<?> beepHandler =
scheduler.scheduleAtFixedRate(new BeepTask(), 2, 2, TimeUnit.SECONDS);
scheduler.schedule(new Runnable() {
@Override
public void run() {
beepHandler.cancel(true);
scheduler.shutdown();
}
}, 10, TimeUnit.SECONDS);
}
static class BeepTask implements Runnable {
public void run() {
System.out.println("beep");
}
}
}
运行结果如下:
beep
beep
beep
beep
beep
3.4 newSingleThreadExecutor()方法
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
executor.submit(new Task());
System.out.println("Shutdown executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
System.err.println("tasks interrupted");
}
finally {
if (!executor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 20);
System.out.println("Running Task!");
TimeUnit.SECONDS.sleep(duration);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果如下:
Shutdown executor
Running Task!shutdown finished
cancel non-finished tasks
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:302)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:328)
at TestThread$Task.run(TestThread.java:39)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
评论区