从Java5开始对Java线程的类库做了大量的扩展,其中线程池就是Java5的新特征之一。Executor框架作为一个灵活且强大的异步执行框架,支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程进行了解耦,基于生产者和消费者模型,还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能检测等机制。并提供了三个核心接口来满足使用者的需求:
-
Executor:提交普通的可执行任务
-
ExecutorService:提供对线程池生命周期的管理、异步任务的支持
-
ScheduledExecutorService:提供对任务的周期性执行支持
对应上面三种执行器接口,J.U.C提供了许多默认的接口实现,如果用户自己去创建这些类的实例,就需要了解这些类的细节,使用成本较高,所以J.U.C中还提供了一个Executors类,专门用于创建上述接口的实现类对象。
1、Executors介绍
Executors可以看作一个工具类,其实就是一个简单工厂,它的所有方法都是static的(静态方法),用户可以根据需要,选择需要创建的执行器实例,即内置线程池。Executors一共提供了六类可供创建的Executor执行器实例。
- newFixedThreadPool:创建一个固定大小的线程池
- newSingleThreadExecutor:创建单个线程的线程池
- newSingleThreadScheduledExecutor:创建执行定时任务的单个线程的线程池
- newScheduledThreadPool:创建执行定时任务的线程池
- newCachedThreadPool:带缓存的线程池,适用于短时间有大量任务的场景,但有可能会占用更多的资源;线程数量随任务量而定。
- newWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定),注意此方法只有在 JDK 1.8+ 版本中才能使用。创建时如果不设置任何参数,则以当前机器处理器个数作为线程个数。
在《阿里巴巴java开发手册》中不推荐使用 Executors 创建线程池,在强制编程规约中有这样一条:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式。这样的处理方式可以让写的人更加明确线程池的运行规则,规避资源耗尽的风险。
那么 Executors 创建的线程池对象有哪些弊端呢?
- FixedThreadPool 和 SingleThreadPool : 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积⼤量的请求,从⽽导致 OOM 。
- CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建⼤量的线程,从⽽导致 OOM。
2、使用Executors内置线程池
2.1 newFixedThreadPool固定大小线程池
使用 newFixedThreadPool 可以创建一个固定线程数的线程池,以共享的无界队列方式来运行这些线程。超出的任务会在队列中等待空闲的线程,可用于控制程序的最大并发数。
源码
/**
* 创建一个具有固定线程数的Executor.
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* 创建一个具有固定线程数的Executor.
* 在需要时使用提供的 ThreadFactory 创建新线程.
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
}
-
corePoolSize与maximumPoolSize相等,即最大线程数就是核心线程数;keepAliveTime = 0 该参数默认对核心线程无效,因此不会超时,在线程池被关闭之前,池中被创建的线程将一直存在。
-
workQueue为LinkedBlockingQueue,是一个无界阻塞队列,队列容量为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列堆积大量任务,很有可能在执行拒绝策略之前就造成内存溢出。
-
适用于持续不断地提交任务的场景,并且要求任务提交速度不得超过线程处理速度。
使用语法
ExecutorService fixedPool = Executors.newFixedThreadPool(2);
- 最多2个线程将处于活动状态。
- 如果提交了两个以上的线程,那么它们将保持在队列中,直到线程可用。
- 如果一个线程由于执行关闭期间的失败而终止,则执行器尚未被调用,则创建一个新线程。
- 线程会一直存在,直到池关闭。
应用实例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* FixedThreadPool线程池
*/
public class FixedThreadPoolTest {
public static void main(String[] args) {
//创建一个可重用,固定线程数的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
//创建实现了Runnable接口的类,如Thread
MyThread t1 = new MyThread();
MyThread t2 = new MyThread();
MyThread t3 = new MyThread();
MyThread t4 = new MyThread();
//将线程放入池中执行
threadPool.execute(t1);
threadPool.execute(t2);
threadPool.execute(t3);
threadPool.execute(t4);
//关闭线程池
threadPool.shutdown();
}
}
class MyThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"正在执行...");
}
}
程序运行结果:
pool-1-thread-2正在执行...
pool-1-thread-2正在执行...
pool-1-thread-1正在执行...
pool-1-thread-2正在执行...
2.2 newSingleThreadExecutor单个线程的线程池
使用 newSingleThreadExecutor 可以创建一个单个线程的线程池,这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
源码
/**
* 创建一个使用单个 worker 线程的 Executor.
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* 创建一个使用单个 worker 线程的 Executor.
* 在需要时使用提供的 ThreadFactory 创建新线程.
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory));
}
可以看到,只有单个线程的线程池其实就是指定线程数为1的固定线程池,主要区别就是,返回的Executor实例用了一个FinalizableDelegatedExecutorService对象进行包装。FinalizableDelegatedExecutorService 类只定义了一个finalize方法:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
核心是其继承的DelegatedExecutorService ,这是一个包装类,实现了ExecutorService的所有方法,但是内部实现其实都委托给了传入的ExecutorService 实例:
/**
* ExecutorService实现类的包装类.
*/
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) {
e = executor;
}
public void execute(Runnable command) {
e.execute(command);
}
public void shutdown() {
e.shutdown();
}
public List<Runnable> shutdownNow() {
return e.shutdownNow();
}
public boolean isShutdown() {
return e.isShutdown();
}
public boolean isTerminated() {
return e.isTerminated();
}
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
为什么要多此一举,加上这样一个委托层?因为返回的ThreadPoolExecutor包含一些设置线程池大小的方法——比如setCorePoolSize,对于只有单个线程的线程池来说,我们是不希望用户通过强转的方式使用这些方法的,所以需要一个包装类,只暴露ExecutorService本身的方法。
使用语法
ExecutorService executor = Executors.newSingleThreadExecutor();
- newSingleThreadExecutor方法创建一次执行单个任务的执行程序。
应用实例:
复用上述代码,将上例中创建线程池的代码改为:
ExecutorService threadPool = Executors.newSingleThreadExecutor();
程序运行结果可以看出只有1个线程在执行:
pool-1-thread-1正在执行...
pool-1-thread-1正在执行...
pool-1-thread-1正在执行...
pool-1-thread-1正在执行...
2.3 newScheduledThreadPool:创建执行定时任务的线程池
如果有任务需要延迟/周期调用,可以使用 newScheduledThreadPool 创建一个数量固定的线程池,它支持执行定时性或周期性任务。
源码
/**
* 创建一个具有固定线程数的 可调度Executor.
* 它可安排任务在指定延迟后或周期性地执行.
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* 创建一个具有固定线程数的 可调度Executor.
* 它可安排任务在指定延迟后或周期性地执行.
* 在需要时使用提供的 ThreadFactory 创建新线程.
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
使用语法
ExecutorService executor = Executors.newScheduledThreadPool(1);
应用实例:
复用上述代码,将上例中创建线程池的代码改为:
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(2);
MyThread t1 = new MyThread();
MyThread t2 = new MyThread();
MyThread t3 = new MyThread();
MyThread t4 = new MyThread();
//延迟执行
threadPool.schedule(t1,5, TimeUnit.MILLISECONDS);
threadPool.schedule(t2,5, TimeUnit.MILLISECONDS);
threadPool.schedule(t3,5, TimeUnit.MILLISECONDS);
threadPool.schedule(t4,5, TimeUnit.MILLISECONDS);
//关闭线程池
threadPool.shutdown();
2.4 newSingleThreadScheduledExecutor创建执行定时任务的单个线程的线程池
使用 newSingleThreadScheduledExecutor 可以创建一个执行定时任务的单个线程的线程池,此线程池就是单线程的 newScheduledThreadPool。
源码
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
使用语法
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- newSingleThreadScheduledExecutor:产生一个ScheduledExecutorService对象,这个对象的线程池大小为1,如果任务多于一个,任务将按先后顺序执行。
应用实例:
复用上述代码,将上例中创建线程池的代码改为:
ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
2.5 newCachedThreadPool带缓存的线程池
有些情况下,我们虽然创建了具有一定线程数的线程池,但出于资源利用率的考虑,可能希望在特定的时候对线程进行回收(比如线程超过指定时间没有被使用),Executors就提供了这种类型的线程池:可变尺寸的线程池 newCachedThreadPool(带缓存的线程池)。
newCachedThreadPool是一个根据需要创建新线程的线程池,当一个任务提交时,corePoolSize为0不创建核心线程,SynchronousQueue是一个不存储元素的队列,可以理解为队里永远是满的,因此最终会创建非核心线程来执行任务。
对于非核心线程空闲60s时将被回收。因为Integer.MAX_VALUE非常大,可以认为是可以无限创建线程的,在资源有限的情况下容易引起OOM异常。
- 短时间内处理大量工作的线程池,会根据任务数量产生对应的线程,并试图缓存线程以便重复使用,如果限制 60 秒没被使用,则会被移除缓存。如果现有线程没有可用的,则创建一个新线程并添加到池中,如果有被使用完但是还没销毁的线程,就复用该线程。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。
源码
/**
* 创建一个可缓存线程的Execotor.
* 如果线程池中没有线程可用, 则创建一个新线程并添加到池中;
* 如果有线程长时间未被使用(默认60s, 可通过threadFactory配置), 则从缓存中移除.
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* 创建一个可缓存线程的Execotor.
* 如果线程池中没有线程可用, 则创建一个新线程并添加到池中;
* 如果有线程长时间未被使用(默认60s, 可通过threadFactory配置), 则从缓存中移除.
* 在需要时使用提供的 ThreadFactory 创建新线程.
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory);
}
可以看到,返回的还是ThreadPoolExecutor对象,只是指定了超时时间,另外线程池中线程的数量在[0, Integer.MAX_VALUE]之间。
使用语法
ExecutorService executor = Executors.newCachedThreadPool();
- newCachedThreadPool()方法创建一个具有可扩展线程池的执行器。
- 这样的执行者适合于发起许多短命的任务的应用程序。
应用实例:
复用上述代码,将上例中创建线程池的代码改为:
ExecutorService threadPool = Executors.newCachedThreadPool();
程序运行结果:
pool-1-thread-2正在执行...
pool-1-thread-1正在执行...
pool-1-thread-3正在执行...
pool-1-thread-4正在执行..
2.6 newWorkStealingPool抢占式执行的线程池
使用 newWorkStealingPool 可以创建一个抢占式执行的线程池(任务执行顺序不确定),是Java 8 新增创建线程池的方法,只有在 JDK 1.8+ 版本中才能使用。创建时如果不设置任何参数,则以当前机器处理器个数作为线程个数。
- newWorkStealingPool简单翻译是任务窃取线程池,核心实现就是ForkJoinPool类,Fork/Join线程池是比较特殊的一类线程池。
- 使用ForkJoinPool的好处是,把1个任务拆分成多个“小任务”,把这些“小任务”分发到多个线程上执行。这些“小任务”都执行完成后,再将结果合并。
- 之前的线程池中,多个线程共有一个阻塞队列,而newWorkStealingPool 中每一个线程都有一个自己的队列。
- 当线程发现自己的队列没有任务了,就会到别的线程的队列里获取任务执行。可以简单理解为”窃取“。
- 一般是自己的本地队列采取LIFO(后进先出),窃取时采用FIFO(先进先出),一个从头开始执行,一个从尾部开始执行,由于偷取的动作十分快速,会大量降低这种冲突,也是一种优化方式。
/**
* 创建具有指定并行级别的ForkJoin线程池.
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
}
/**
* 创建并行级别等于CPU核心数的ForkJoin线程池.
*/
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
使用语法
ExecutorService executorService = Executors.newWorkStealingPool(3);
应用实例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class WorkStealingTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newWorkStealingPool(3);
for (int i=1; i<= 100; i++){
executorService.submit(new MyWorker(i));
}
while (true){}
}
static class MyWorker implements Runnable {
int time;
public MyWorker(int time) {
this.time = time;
}
public void run() {
System.out.println(Thread.currentThread().getName()+"正在执行,数值:"+time);
}
}
}
发现确实创建了3个线程来执行任务。具体结果如下:
ForkJoinPool-1-worker-2正在执行,数值:2
ForkJoinPool-1-worker-3正在执行,数值:3
ForkJoinPool-1-worker-1正在执行,数值:1
ForkJoinPool-1-worker-3正在执行,数值:5
ForkJoinPool-1-worker-3正在执行,数值:7
ForkJoinPool-1-worker-3正在执行,数值:8
ForkJoinPool-1-worker-3正在执行,数值:9
......
把newWorkStealingPool(3)中参数去掉改成newWorkStealingPool(),结果如下:
ForkJoinPool-1-worker-1正在执行,数值:1
ForkJoinPool-1-worker-3正在执行,数值:2
ForkJoinPool-1-worker-4正在执行,数值:3
ForkJoinPool-1-worker-2正在执行,数值:6
ForkJoinPool-1-worker-4正在执行,数值:7
ForkJoinPool-1-worker-4正在执行,数值:8
ForkJoinPool-1-worker-4正在执行,数值:9
ForkJoinPool-1-worker-4正在执行,数值:10
ForkJoinPool-1-worker-7正在执行,数值:11
ForkJoinPool-1-worker-4正在执行,数值:12
ForkJoinPool-1-worker-4正在执行,数值:13
ForkJoinPool-1-worker-4正在执行,数值:14
ForkJoinPool-1-worker-4正在执行,数值:15
ForkJoinPool-1-worker-0正在执行,数值:16
......
发现确实创建了8个线程共同完成任务,因为我的CPU有8个核。
3、自定义线程池
ThreadPoolExecutor继承自AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口。
以下只是一个简单的示例,更多详情可参考:java并发编程之ThreadPoolExecutor线程池详解及实战一
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 自定义线程池
*/
public class Test2 {
public static void main(String[] args) {
//创建等待队列
BlockingQueue bQueue = new ArrayBlockingQueue(20);
//创建一个单线程执行程序,可安排在给定延迟时间后执行
ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,2,TimeUnit.MILLISECONDS,bQueue);
//创建实现了Runnable接口的类,如Thread
MyThread t1 = new MyThread();
MyThread t2 = new MyThread();
MyThread t3 = new MyThread();
MyThread t4 = new MyThread();
//将线程放入池中执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
//关闭线程池
pool.shutdown();
}
}
class MyThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"正在执行..."+System.currentTimeMillis());
}
}
4、Executor 源码
4.1 Executor 接口源码
package java.util.concurrent;
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
4.2 ExecutorService 接口源码
package java.util.concurrent;
import java.util.List;
import java.util.Collection;
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
4.3 Executors 线程池工厂源码
package java.util.concurrent;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.security.PrivilegedActionException;
import java.security.AccessControlException;
import sun.security.util.SecurityConstants;
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedScheduledExecutorService(executor);
}
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
public static ThreadFactory privilegedThreadFactory() {
return new PrivilegedThreadFactory();
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
public static Callable<Object> callable(final PrivilegedAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() { return action.run(); }};
}
public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() throws Exception { return action.run(); }};
}
public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallable<T>(callable);
}
public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
static final class PrivilegedCallable<T> implements Callable<T> {
private final Callable<T> task;
private final AccessControlContext acc;
PrivilegedCallable(Callable<T> task) {
this.task = task;
this.acc = AccessController.getContext();
}
public T call() throws Exception {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<T>() {
public T run() throws Exception {
return task.call();
}
}, acc);
} catch (PrivilegedActionException e) {
throw e.getException();
}
}
}
static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
private final Callable<T> task;
private final AccessControlContext acc;
private final ClassLoader ccl;
PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Calls to getContextClassLoader from this class
// never trigger a security check, but we check
// whether our callers have this permission anyways.
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
// Whether setContextClassLoader turns out to be necessary
// or not, we fail fast if permission is not available.
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.task = task;
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}
public T call() throws Exception {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<T>() {
public T run() throws Exception {
Thread t = Thread.currentThread();
ClassLoader cl = t.getContextClassLoader();
if (ccl == cl) {
return task.call();
} else {
t.setContextClassLoader(ccl);
try {
return task.call();
} finally {
t.setContextClassLoader(cl);
}
}
}
}, acc);
} catch (PrivilegedActionException e) {
throw e.getException();
}
}
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final AccessControlContext acc;
private final ClassLoader ccl;
PrivilegedThreadFactory() {
super();
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Calls to getContextClassLoader from this class
// never trigger a security check, but we check
// whether our callers have this permission anyways.
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
// Fail fast
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}
public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}
private Executors() {}
}
4.4 默认线程工厂
Executors提供了一个默认的线程工厂实现!线程池中线程的默认名字的由来以及线程所属线程组等属性都是通过线程工厂设置的!
public static ThreadFactory defaultThreadFactory()
源码如下:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
/**
* DefaultThreadFactory 是默认的线程工厂,使用Executors.defaultThreadFactory()获取
*/
static class DefaultThreadFactory implements ThreadFactory {
//静态的原子变量, 用来统计线程工厂的个数。是static 的原子变量,用来记录当前线程池的编号,它是应用级别的,所有线程池共用一个,比如创建第一个线程池时线程池编号为l ,创建第二个线程池时线程池的编号为2,所以pool-1-thread-1 里面的pool -1 中的l 就是这个值。
private static final AtomicInteger poolNumber = new AtomicInteger(1);
//线程组
private final ThreadGroup group;
//threadNumber 用来记录每个线程工厂创建了多少线程, 这两个值也作为线程池和线程的名称的一部分。线程池级别的,每个线程池使用该变量来记录该线程池中线程的编号, 所以pool- 1-thread- l 里面的thread-I 中的l 就是这个值。
private final AtomicInteger threadNumber = new AtomicInteger(1);
//线程名字的前缀,线程池中线程名称的前缀,默认固定为pool o
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
/**
* newThread 方法是对线程的一个修饰
* @param r
* @return
*/
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
//线程名字由来
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
4.5 Runnable转换为Callable源码
public static < T > Callable< T > callable(Runnable task, T result)
- 返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果。这在把需要 Callable 的方法应用到其他无结果的操作时很有用。
public static Callable< Object > callable(Runnable task)
- 返回 Callable 对象,调用它时可运行给定的任务并返回 null。
Runnable转换为Callable的原理实际就是适配器模式的应用,返回的是一个RunnableAdapter适配器类的实例。
/**
* 返回 Callable 对象,调用它时可运行给定的任务并返回 null。
*
* @param task 被适配的Runnable任务
* @return 一个Callable任务
* @throws NullPointerException 如果task为inull
*/
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
/**
* 返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果。
* 这在把需要 Callable 的方法应用到其他无结果的操作时很有用。
*
* @param task 被适配的Runnable任务
* @param result 指定返回值
* @return 一个Callable任务
* @throws NullPointerException 如果task为inull
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
/**
* 该类是Executors的内部类,实现了callable接口
* 作为一个适配器类,用于将runnable任务和result结果适配成Callable对象
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
/**
* 重写了call方法
* 适配原理很简单,调用task.run(),并且返回result
*
* @return result
*/
public T call() {
task.run();
return result;
}
}
评论区