线程池(Thread pool)是一种线程使用模式,在其内维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。ThreadPoolExecutor线程池是JDK线程池的关键实现,是线程池的核心类,Executors静态工厂中的预定义线程池很多都是以这个类为基础实现的。
4、线程池四种拒绝策略
CallerRunsPolicy、AbortPolicy、DiscardPolicy、DiscardOldestPolicy是四个拒绝策略类。对于正在执行的线程数大于等于maxmumPoolSize以及workQueue容量已满时提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,任务会交给RejectedExecutionHandler来处理。
- AbortPolicy:调用者的线程直接抛出异常,作为默认拒绝策略;
- CallerRunsPolicy:用调用者所在的线程执行任务;
- DiscardOldestPolicy:丢弃队列中最靠前的任务并执行当前任务;
- DiscardPolicy:直接丢弃当前任务;
/* Predefined RejectedExecutionHandlers */
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
// 不抛弃任务,请求调用线程池的主线程(比如main),帮忙执行任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@link RejectedExecutionException}.
*
* This is the default handler for {@link ThreadPoolExecutor} and
* {@link ScheduledThreadPoolExecutor}.
*/
// 抛出异常,丢弃任务
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
// 直接丢弃任务,丢弃等待时间最短的任务
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
// 直接丢弃任务,丢弃等待时间最长的任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
5、如何定义线程池参数
CPU密集型: 线程池的大小推荐为CPU数量 + 1,CPU数量可以根据Runtime.availableProcessors方法获取
IO密集型: CPU数量 * CPU利用率 * (1 + 线程等待时间/线程CPU时间)
混合型: 将任务分为CPU密集型和IO密集型,然后分别使用不同的线程池去处理,从而使每个线程池可以根据各自的工作负载来调整
阻塞队列: 推荐使用有界队列,有界队列有助于避免资源耗尽的情况发生
拒绝策略: 默认采用的是AbortPolicy拒绝策略,直接在程序中抛出RejectedExecutionException异常【因为是运行时异常,不强制catch】,这种处理方式不够优雅。处理拒绝策略有以下几种比较推荐:
- 在程序中捕获RejectedExecutionException异常,在捕获异常中对任务进行处理。针对默认拒绝策略
- 使用CallerRunsPolicy拒绝策略,该策略会将任务交给调用execute的线程执行【一般为主线程】,此时主线程将在一段时间内不能提交任何任务,从而使工作线程处理正在执行的任务。此时提交的线程将被保存在TCP队列中,TCP队列满将会影响客户端,这是一种平缓的性能降低
- 自定义拒绝策略,只需要实现RejectedExecutionHandler接口即可
- 如果任务不是特别重要,使用DiscardPolicy和DiscardOldestPolicy拒绝策略将任务丢弃也是可以的
如果使用Executors的静态方法创建ThreadPoolExecutor对象,可以通过使用Semaphore对任务的执行进行限流也可以避免出现OOM异常
6、使用实战
自定义线程池,可以用ThreadPool Executor类创建,它有多个构造方法来创建线程池,用该类很容易实现自定义的线程池。
线程池统一通过ThreadFactory创建新线程,可以说是工厂模式的应用。默认使用Executors.defaultThreadFactory工厂,该工厂创建的线程全部位于同一个ThreadGroup中,并且具有pool-N-thread-M的线程命名(N表示线程池工厂编号,M表示一个工厂创建的线程编号,都是自增的)和非守护进程状态。
6.1 基础使用方式代码示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest{
public static void main(String[] args){
//创建等待队列
BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
//创建线程池,池中保存的线程数为3,允许的最大线程数为5
ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
//创建七个任务
Runnable t1 = new MyThread();
Runnable t2 = new MyThread();
Runnable t3 = new MyThread();
Runnable t4 = new MyThread();
Runnable t5 = new MyThread();
Runnable t6 = new MyThread();
Runnable t7 = new MyThread();
//每个任务会在一个线程上执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.execute(t6);
pool.execute(t7);
//关闭线程池
pool.shutdown();
}
}
class MyThread implements Runnable{
@Override
public void run(){
System.out.println(Thread.currentThread().getName() + "正在执行...");
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
从结果中可以看出,七个任务是在线程池的三个线程上执行的。
pool-1-thread-1正在执行...
pool-1-thread-2正在执行...
pool-1-thread-3正在执行...
pool-1-thread-1正在执行...
pool-1-thread-3正在执行...
pool-1-thread-2正在执行...
pool-1-thread-1正在执行...
6.2 使用ThreadPoolExecutor.AbortPolicy() 策略示例
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThreadPool2 {
private static int produceTaskSleepTime = 2;
private static int produceTaskMaxNumber = 8;
public static void main( String[] args ) {
// 构造一个线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 4, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue < Runnable >( 3 ), new ThreadPoolExecutor.AbortPolicy() );
for ( int i = 1; i <= produceTaskMaxNumber; i++ ) {
try {
// 产生一个任务,并将其加入到线程池
String task = "task@ " + i;
System.out.println( "put " + task );
threadPool.execute( new ThreadPoolTask( task ) );
// 便于观察,等待一段时间
Thread.sleep( produceTaskSleepTime );
}
catch ( Exception e ) {
e.printStackTrace();
}
}
}
}
/**
* 线程池执行的任务
*/
class ThreadPoolTask implements Runnable, Serializable {
private static final long serialVersionUID = 0;
// 保存任务所需要的数据
private Object threadPoolTaskData;
ThreadPoolTask( Object tasks ) {
this.threadPoolTaskData = tasks;
}
public void run() {
// 处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句
System.out.println( Thread.currentThread().getName() );
System.out.println( "start .." + threadPoolTaskData );
try {
// //便于观察,等待一段时间
Thread.sleep( 2000 );
}
catch ( Exception e ) {
e.printStackTrace();
}
threadPoolTaskData = null;
}
public Object getTask() {
return this.threadPoolTaskData;
}
}
程序运行结果:
put task@ 1
pool-1-thread-1
start ..task@ 1
put task@ 2
pool-1-thread-2
start ..task@ 2
put task@ 3
put task@ 4
put task@ 5
put task@ 6
pool-1-thread-3
start ..task@ 6
put task@ 7
pool-1-thread-4
start ..task@ 7
put task@ 8
java.util.concurrent.RejectedExecutionException: Task outputMml2.ThreadPoolTask@5aaa6d82 rejected from java.util.concurrent.ThreadPoolExecutor@69222c14[Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at outputMml2.TestThreadPool2.main(TestThreadPool2.java:22)
pool-1-thread-1
start ..task@ 3
pool-1-thread-2
start ..task@ 4
pool-1-thread-3
start ..task@ 5
6.3 使用ThreadPoolExecutor.DiscardOldestPolicy() 策略示例
把上述代码的例子的策略改为:new ThreadPoolExecutor.DiscardOldestPolicy() ,程序的执行结果是:
put task@ 1
pool-1-thread-1
start ..task@ 1
put task@ 2
pool-1-thread-2
start ..task@ 2
put task@ 3
put task@ 4
put task@ 5
put task@ 6
pool-1-thread-3
start ..task@ 6
put task@ 7
pool-1-thread-4
start ..task@ 7
put task@ 8
pool-1-thread-1
start ..task@ 4
pool-1-thread-2
start ..task@ 5
pool-1-thread-3
start ..task@ 8
从结果中可以看出来,最早进入队列的任务3被丢弃了。
6.4 ThreadFactory 线程工厂
线程池统一通过ThreadFactory创建新线程,可以说是工厂模式的应用。默认使用Executors.defaultThreadFactory工厂,该工厂创建的线程全部位于同一个ThreadGroup中,并且具有pool-N-thread-M的线程命名(N表示线程池工厂编号,M表示一个工厂创建的线程编号,都是自增的)和非守护进程状态。
通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从new Thread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。也可以通过实现ThreadFactory自定义线程工厂,示例如下:
public class ThreadPool {
private static ExecutorService pool;
public static void main(String[] args) {
//自定义线程池,最大线程数为3
pool = new ThreadPoolExecutor(3, 3, 1000, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10),
//自定义线程工厂,lambda
r -> {
System.out.println("线程" + r.hashCode() + "创建");
//线程命名
return new Thread(r, "threadPool-" + r.hashCode());
}, new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) {
pool.execute(new ThreadTask());
}
pool.shutdown();
}
}
class ThreadTask implements Runnable {
@Override
public void run() {
//输出执行线程的名称
System.out.println("ThreadName:" + Thread.currentThread().getName());
}
}
上例是通过匿名内部类的lambda形式创建ThreadFactory实例的,其他框架也提供了多种现成的方式:
- spring自带的的CustomizableThreadFactory。可以通过构造器和方法设置参数。
- guava的ThreadFactoryBuilder。可以通过方法的链式调用设置参数。
- commons-lang3的BasicThreadFactory。可以通过方法的链式调用设置参数
评论区