侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 297 篇文章
  • 累计创建 134 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

java并发编程之ThreadPoolExecutor线程池详解及实战二

孔子说JAVA
2022-06-21 / 0 评论 / 0 点赞 / 71 阅读 / 8,702 字 / 正在检测是否收录...

线程池(Thread pool)是一种线程使用模式,在其内维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。ThreadPoolExecutor线程池是JDK线程池的关键实现,是线程池的核心类,Executors静态工厂中的预定义线程池很多都是以这个类为基础实现的。

4、线程池四种拒绝策略

image-1655433884413

CallerRunsPolicy、AbortPolicy、DiscardPolicy、DiscardOldestPolicy是四个拒绝策略类。对于正在执行的线程数大于等于maxmumPoolSize以及workQueue容量已满时提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,任务会交给RejectedExecutionHandler来处理。

  • AbortPolicy:调用者的线程直接抛出异常,作为默认拒绝策略;
  • CallerRunsPolicy:用调用者所在的线程执行任务;
  • DiscardOldestPolicy:丢弃队列中最靠前的任务并执行当前任务;
  • DiscardPolicy:直接丢弃当前任务;
    /* Predefined RejectedExecutionHandlers */
 
    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    // 不抛弃任务,请求调用线程池的主线程(比如main),帮忙执行任务
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }
 
        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
 
    /**
     * A handler for rejected tasks that throws a
     * {@link RejectedExecutionException}.
     *
     * This is the default handler for {@link ThreadPoolExecutor} and
     * {@link ScheduledThreadPoolExecutor}.
     */
    // 抛出异常,丢弃任务
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }
 
        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
 
    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    // 直接丢弃任务,丢弃等待时间最短的任务
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }
 
        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
 
    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    // 直接丢弃任务,丢弃等待时间最长的任务
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }
 
        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

5、如何定义线程池参数

CPU密集型: 线程池的大小推荐为CPU数量 + 1,CPU数量可以根据Runtime.availableProcessors方法获取
IO密集型: CPU数量 * CPU利用率 * (1 + 线程等待时间/线程CPU时间)
混合型: 将任务分为CPU密集型和IO密集型,然后分别使用不同的线程池去处理,从而使每个线程池可以根据各自的工作负载来调整

阻塞队列: 推荐使用有界队列,有界队列有助于避免资源耗尽的情况发生
拒绝策略: 默认采用的是AbortPolicy拒绝策略,直接在程序中抛出RejectedExecutionException异常【因为是运行时异常,不强制catch】,这种处理方式不够优雅。处理拒绝策略有以下几种比较推荐:

  • 在程序中捕获RejectedExecutionException异常,在捕获异常中对任务进行处理。针对默认拒绝策略
  • 使用CallerRunsPolicy拒绝策略,该策略会将任务交给调用execute的线程执行【一般为主线程】,此时主线程将在一段时间内不能提交任何任务,从而使工作线程处理正在执行的任务。此时提交的线程将被保存在TCP队列中,TCP队列满将会影响客户端,这是一种平缓的性能降低
  • 自定义拒绝策略,只需要实现RejectedExecutionHandler接口即可
  • 如果任务不是特别重要,使用DiscardPolicy和DiscardOldestPolicy拒绝策略将任务丢弃也是可以的

如果使用Executors的静态方法创建ThreadPoolExecutor对象,可以通过使用Semaphore对任务的执行进行限流也可以避免出现OOM异常

6、使用实战

自定义线程池,可以用ThreadPool Executor类创建,它有多个构造方法来创建线程池,用该类很容易实现自定义的线程池。

线程池统一通过ThreadFactory创建新线程,可以说是工厂模式的应用。默认使用Executors.defaultThreadFactory工厂,该工厂创建的线程全部位于同一个ThreadGroup中,并且具有pool-N-thread-M的线程命名(N表示线程池工厂编号,M表示一个工厂创建的线程编号,都是自增的)和非守护进程状态。

6.1 基础使用方式代码示例

import java.util.concurrent.ArrayBlockingQueue;   
import java.util.concurrent.BlockingQueue;   
import java.util.concurrent.ThreadPoolExecutor;   
import java.util.concurrent.TimeUnit;   
  
public class ThreadPoolTest{   
    public static void main(String[] args){   
        //创建等待队列   
        BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);   
        //创建线程池,池中保存的线程数为3,允许的最大线程数为5  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);   
        //创建七个任务   
        Runnable t1 = new MyThread();   
        Runnable t2 = new MyThread();   
        Runnable t3 = new MyThread();   
        Runnable t4 = new MyThread();   
        Runnable t5 = new MyThread();   
        Runnable t6 = new MyThread();   
        Runnable t7 = new MyThread();   
        //每个任务会在一个线程上执行  
        pool.execute(t1);   
        pool.execute(t2);   
        pool.execute(t3);   
        pool.execute(t4);   
        pool.execute(t5);   
        pool.execute(t6);   
        pool.execute(t7);   
        //关闭线程池   
        pool.shutdown();   
    }   
}   
  
class MyThread implements Runnable{   
    @Override   
    public void run(){   
        System.out.println(Thread.currentThread().getName() + "正在执行...");   
        try{   
            Thread.sleep(100);   
        }catch(InterruptedException e){   
            e.printStackTrace();   
        }   
    }   
}

从结果中可以看出,七个任务是在线程池的三个线程上执行的。

pool-1-thread-1正在执行...
pool-1-thread-2正在执行...
pool-1-thread-3正在执行...
pool-1-thread-1正在执行...
pool-1-thread-3正在执行...
pool-1-thread-2正在执行...
pool-1-thread-1正在执行...

6.2 使用ThreadPoolExecutor.AbortPolicy() 策略示例

import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class TestThreadPool2 {
    private static int produceTaskSleepTime = 2;
    private static int produceTaskMaxNumber = 8;
 
    public static void main( String[] args ) {
        // 构造一个线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 4, 3, TimeUnit.SECONDS,
                        new ArrayBlockingQueue < Runnable >( 3 ), new ThreadPoolExecutor.AbortPolicy() );
 
        for ( int i = 1; i <= produceTaskMaxNumber; i++ ) {
            try {
                // 产生一个任务,并将其加入到线程池
                String task = "task@ " + i;
                System.out.println( "put " + task );
                threadPool.execute( new ThreadPoolTask( task ) );
 
                // 便于观察,等待一段时间
                Thread.sleep( produceTaskSleepTime );
            }
            catch ( Exception e ) {
                e.printStackTrace();
            }
 
           
        }
    }
}
 
 
/**
 * 线程池执行的任务
 */
class ThreadPoolTask implements Runnable, Serializable {
    private static final long serialVersionUID = 0;
    // 保存任务所需要的数据
    private Object threadPoolTaskData;
 
    ThreadPoolTask( Object tasks ) {
        this.threadPoolTaskData = tasks;
    }
 
    public void run() {
        // 处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句
        System.out.println( Thread.currentThread().getName() );
        System.out.println( "start .." + threadPoolTaskData );
 
        try {
            // //便于观察,等待一段时间
            Thread.sleep( 2000 );
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
        threadPoolTaskData = null;
    }
 
    public Object getTask() {
        return this.threadPoolTaskData;
    }
 
}

程序运行结果:

put task@ 1
pool-1-thread-1
start ..task@ 1
put task@ 2
pool-1-thread-2
start ..task@ 2
put task@ 3
put task@ 4
put task@ 5
put task@ 6
pool-1-thread-3
start ..task@ 6
put task@ 7
pool-1-thread-4
start ..task@ 7
put task@ 8
java.util.concurrent.RejectedExecutionException: Task outputMml2.ThreadPoolTask@5aaa6d82 rejected from java.util.concurrent.ThreadPoolExecutor@69222c14[Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
	at outputMml2.TestThreadPool2.main(TestThreadPool2.java:22)
pool-1-thread-1
start ..task@ 3
pool-1-thread-2
start ..task@ 4
pool-1-thread-3
start ..task@ 5

6.3 使用ThreadPoolExecutor.DiscardOldestPolicy() 策略示例

把上述代码的例子的策略改为:new ThreadPoolExecutor.DiscardOldestPolicy() ,程序的执行结果是:

put task@ 1
pool-1-thread-1
start ..task@ 1
put task@ 2
pool-1-thread-2
start ..task@ 2
put task@ 3
put task@ 4
put task@ 5
put task@ 6
pool-1-thread-3
start ..task@ 6
put task@ 7
pool-1-thread-4
start ..task@ 7
put task@ 8
pool-1-thread-1
start ..task@ 4
pool-1-thread-2
start ..task@ 5
pool-1-thread-3
start ..task@ 8

从结果中可以看出来,最早进入队列的任务3被丢弃了。

6.4 ThreadFactory 线程工厂

线程池统一通过ThreadFactory创建新线程,可以说是工厂模式的应用。默认使用Executors.defaultThreadFactory工厂,该工厂创建的线程全部位于同一个ThreadGroup中,并且具有pool-N-thread-M的线程命名(N表示线程池工厂编号,M表示一个工厂创建的线程编号,都是自增的)和非守护进程状态。

通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从new Thread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。也可以通过实现ThreadFactory自定义线程工厂,示例如下:

public class ThreadPool {
    private static ExecutorService pool;

    public static void main(String[] args) {
        //自定义线程池,最大线程数为3
        pool = new ThreadPoolExecutor(3, 3, 1000, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10),
                //自定义线程工厂,lambda
                r -> {
                    System.out.println("线程" + r.hashCode() + "创建");
                    //线程命名
                    return new Thread(r, "threadPool-" + r.hashCode());
                }, new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            pool.execute(new ThreadTask());
        }
        pool.shutdown();
    }
}

class ThreadTask implements Runnable {
    @Override
    public void run() {
        //输出执行线程的名称
        System.out.println("ThreadName:" + Thread.currentThread().getName());
    }
}

上例是通过匿名内部类的lambda形式创建ThreadFactory实例的,其他框架也提供了多种现成的方式:

  • spring自带的的CustomizableThreadFactory。可以通过构造器和方法设置参数。
  • guava的ThreadFactoryBuilder。可以通过方法的链式调用设置参数。
  • commons-lang3的BasicThreadFactory。可以通过方法的链式调用设置参数
0

评论区