侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 285 篇文章
  • 累计创建 125 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

java并发编程之ThreadPoolExecutor线程池详解及实战一

孔子说JAVA
2022-06-20 / 0 评论 / 0 点赞 / 51 阅读 / 12,777 字 / 正在检测是否收录...

线程池(Thread pool)是一种线程使用模式,在其内维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。ThreadPoolExecutor线程池是JDK线程池的关键实现,是线程池的核心类,Executors静态工厂中的预定义线程池很多都是以这个类为基础实现的。

  • 线程过多会带来调度开销,进而影响缓存局部性和整体性能。线程池主要提供任务执行,线程调度,线程池管理等等服务。
  • 可以用Executors静态工厂实现线程池,也可以通过 ThreadPoolExecutor 来自定义线程池。

image-1655426701870

Worker是对线程池中工作线程的包装,用于对于传递的任务或者任务队列中的任务进行执行、中断控制等操作。这其中还涉及到AQS,即Worker实现了简单的不可重入锁。

1、ThreadPoolExecutor API

1.1 线程池的工作流程和优先级

1.1.1 线程池的工作流程

  1. 判断核心线程数
  2. 判断任务能否加入到任务队列
  3. 判断最大线程数量
  4. 根据线程池的拒绝策略处理任务

线程池处理流程

image-1655435129256

1.1.2 线程池的提交优先级和执行优先级

提交优先级: 核心线程 > 工作等待队列 > 非核心线程
执行优先级: 核心线程 > 非核心线程 > 工作等待队列

验证代码:

public class ThreadPoolApp {
	public static void main(String[] args) throws Exception {
        ExecutorService executorService = new ThreadPoolExecutor(
                3, 6, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                new CustomizableThreadFactory("TK-")
        );

        for (int i = 1; i <= 10; i++) {
            final int index = i;
            TimeUnit.MILLISECONDS.sleep(10);
            executorService.execute(() -> {
                try {
                    System.out.println(
                            String.format("%s Thread %s, index %d.",
                                    LocalDateTime.now().format(
                                            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                                    ),
                                    Thread.currentThread().getName(), index)
                    );
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

结果:

2022-06-09 16:56:10 Thread TK-2, index 2.
2022-06-09 16:56:10 Thread TK-3, index 3.
2022-06-09 16:56:10 Thread TK-1, index 1.			// TK-1~TK-3为核心线程任务
2022-06-09 16:56:10 Thread TK-4, index 7.
2022-06-09 16:56:10 Thread TK-5, index 8.
2022-06-09 16:56:10 Thread TK-6, index 9.			// TK-7~TK-9为非核心线程任务		
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.jaemon.demo.ThreadPoolApp$$Lambda$1/1780132728@52f759d7 rejected from java.util.concurrent.ThreadPoolExecutor@7cbd213e[Running, pool size = 6, active threads = 6, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.jaemon.demo.ThreadPoolApp.main(ThreadPoolApp.java:137)	
2022-06-09 16:56:15 Thread TK-2, index 4.
2022-06-09 16:56:15 Thread TK-1, index 5.
2022-06-09 16:56:15 Thread TK-3, index 6.			// TK-4~TK-6为工作队列中任务

1.2 ThreadPoolExecutor 构造方法

线程池核心类 ThreadPoolExecutor 为自定义线程池,提供了多个构造方法来创建线程池。由七个参数组合成四个构造方法。单从构造器来说,实际上很简单,首先对参数进行校验,随后对一些全局属性进行初始化。

image-1655260662852

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler);

参数说明:ThreadPoolExecutor一共有七个参数,这七个参数配合起来,构成了线程池强大的功能。

corePoolSize:核心线程数量(线程池中保存的核心线程数,包括空闲线程)。

  • 当提交一个任务到线程池时,如果此时线程池的线程数量小于核心线程数,那么线程池会新创建一个线程来执行任务,即使此时存在空闲线程也不例外。默认情况创建0个核心线程,如果调用了线程池的prestartAllCoreThreads()方法,线程池会立即创建并启动所有核心线程。

maximumPoolSize:线程最大线程数(线程池中允许的最大线程数)。不能小于corePoolSize,不能小于等于0。

  • 当workQueue(任务队列)放不下线程任务,并且已创建的线程数小于最大线程数,则线程池会再次创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列(任务队列没有上限大小)这个参数就没什么效果。

keepAliveTime: 线程池中的空闲线程持续的最长时间。

  • 当线程中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是等待到超过keepAliveTime后将该线程回收。直到数量减少到corePoolSize为止。
  • 如果允许CoreThreadTimeOut(前提是keepAliveTime大于0),那核心线程也会使用此超时时间,超过该时间没有任务则关闭线程;否则,核心线程将永远等待新的任务。

unit: 线程池中的空闲线程持续时间的单位(keepAliveTime的时间单位)。

workQueue: 阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响。只保存由execute() 方法提交的Runnable任务。

  • 当线程任务添加的速度超过所有核心线程执行速度时,新来的来不及执行的线程任务将被存放到workQueue阻塞任务队列中。
  • 任务队列一定是阻塞队列,常见的有以下四种,实际上有很多种:
    • ArrayBlockingQueue:有界阻塞任务队列,构造函数一定要传入具体队列容量。
    • LinkedBlockingQueu:通常作为无界阻塞任务队列(构造函数不传大小会默认为Integer.MAX_VALUE ),当有大量任务提交时,容易造成内存耗尽。
    • SynchronousQueue:一个没有容量的阻塞队列,会将任务同步交付给工作线程。
    • PriorityBlockingQueue : 具有优先级的无界阻塞任务队列。

threadFactory: 线程工厂,用来创建线程,使用Executor创建新线程时用到的工厂方法。这样新创建出来的线程有相同的优先级,是非守护线程、设置好了名称。

  • 线程工厂用于创建工作线程,默认线程工厂:Executors.defaultThreadFactory。

handler: 拒绝策略,是线程执行因为线程数和队列满时发生阻塞的处理方法。

  • 对于正在执行的线程数等于maxmumPoolSize以及workQueue容量已满时提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,即这些任务都直接被非线程池线程处理了。
  • ThreadPoolExecutor中提供了4种拒绝策略(阻塞队列满时拒绝处理任务时的策略)的实现:
    • AbortPolicy:调用者的线程直接抛出异常,作为默认拒绝策略;
    • CallerRunsPolicy:用调用者所在的线程执行任务;
    • DiscardOldestPolicy:丢弃队列中最靠前的任务并执行当前任务;
    • DiscardPolicy:直接丢弃当前任务;

1.2.1 构造方法一

使用给定的初始参数和默认的线程工厂及默认的拒绝策略创建新的 ThreadPoolExecutor。实际处理是调用参数最多的构造函数。

/**
 * 使用给定的初始参数和默认的线程工厂及默认的拒绝策略创建新的 ThreadPoolExecutor。
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    //内部调用最多参数的构造器
    //线程工厂传递的Executors的默认实现:Executors.defaultThreadFactory()
    //拒绝策略传递的默认实现:defaultHandler
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

1.2.2 构造方法二

使用给定的初始参数和默认的拒绝策略创建新的 ThreadPoolExecutor。实际处理是调用参数最多的构造函数。

/**
 * 使用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    //内部调用最多参数的构造器
    //线程工厂传递的Executors的默认实现:Executors.defaultThreadFactory()
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), handler);
}

1.2.3 构造方法三

使用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。实际处理是调用参数最多的构造函数。

/**
 * 使用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    //内部调用最多参数的构造器
    //线程工厂传递的Executors的默认实现:Executors.defaultThreadFactory()
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), handler);
}

1.2.4 构造方法四

使用给定的初始参数创建新的 ThreadPoolExecutor,一共有7个参数。

/**
 1. 使用给定的初始参数创建新的 ThreadPoolExecutor。
 2.  3. @param corePoolSize    核心线程数
 3. @param maximumPoolSize 最大线程数
 4. @param keepAliveTime   空闲线程等待超时时间
 5. @param unit            超时时间单位
 6. @param workQueue       阻塞任务队列
 7. @param threadFactory   线程工厂
 8. @param handler         拒绝策略
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    //一系列参数校验
    /*
     * 如果核心线程数小于0
     * 或者 如果最大线程数小于等于0
     * 或者 如果最大线程数小于核心线程数
     * 或者 如果空闲线程等待超时时间小于0
     *
     * 满足上面一项,都将抛出IllegalArgumentException异常
     */
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    /*
     * 如果阻塞任务队列为null
     * 或者 如果线程工厂为null
     * 或者 如果拒绝策略为null
     *
     * 满足上面一项,都将抛出NullPointerException异常
     */
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    //初始化用于安全管理器的上下文参数
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    //初始化核心线程数
    this.corePoolSize = corePoolSize;
    //初始化最大线程数
    this.maximumPoolSize = maximumPoolSize;
    //初始化阻塞任务队列
    this.workQueue = workQueue;
    //初始化空闲线程等待超时时间
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    //初始化线程工厂
    this.threadFactory = threadFactory;
    //初始化拒绝策略
    this.handler = handler;
}

1.3 ThreadPoolExecutor 常用方法

execute();	  //提交任务,交给线程池执行	
submit();         //提交任务,能够返回执行结果 execute+Future
shutdown();	  //关闭线程池,等待任务都执行完
shutdownNow();    //立刻关闭线程池,不等待任务执行完
getTaskCount();   //线程池已执行和未执行的任务总数
getCompleteTaskCount();  //已完成的任务数量
getPoolSize();    //线程池当前的线程数量
getActiveCount();     //当前线程池中正在执行任务的线程数量

ExecutorService、ScheduledExecutorService接口的submit()和execute()方法都是把任务提交到线程池中,但二者的区别是

  • 接收的参数不一样,execute只能接收Runnable类型、submit可以接收Runnable和Callable两种类型;
  • submit有返回值,而execute没有返回值;submit方便Exception处理;

2、corePoolSize、maximumPoolSize、workQueue 三者关系

  • 默认情况下,新的线程池不会启动任何线程。如果运行的线程数小于corePoolSize的时候,新提交任务都将通过线程工厂直接创建新线程来处理任务。即使线程池中的其他线程是空闲的。

  • 当创建的线程数量达到corePoolSize时,新提交的任务会判断是否有空闲的线程,如果有就让空闲线程去执行,没有空闲线程时新提交的任务将被放入workQueue中,当有线程执行完当前任务,就会从任务队列中拉取任务继续执行。

  • 如果运行中的线程数大于corePoolSize且小于maximumPoolSize时,那么只有当workQueue满的时候才创建新的线程去处理任务。

  • 如果corePoolSize与maximumPoolSize是相同的,那么创建的线程池大小是固定的。这时有新任务提交,当workQueue未满时,就把请求放入workQueue中。等待空线程从workQueue取出任务。如果workQueue此时也满了,那么就使用另外的拒绝策略参数去执行拒绝策略。

  • 对于线程数大于等于maxmumPoolSize以及workQueue容量已满时新提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,即这些任务都直接被非线程池线程处理了。

  • 如果线程池中存在超过corePoolSize的线程数并且存在空闲线程。如果空闲线程在keepAliveTime(unit表示时间单位)时间范围内都没有工作时,将清理空闲线程,减少资源浪费,直到线程数量被清理减少至核心线程数为止,预留一定数量的核心资源。而通过调用allowCoreThreadTimeOut(true)方法(设置成功的要求是超时时间大于0),核心线程也将应用超时时间规则,即此时如果没有新任务,那么所有的线程都将可能被清理。

corePoolSize、maximumPoolSize、keepAliveTime+unit、ThreadFactory、RejectedExecutionHandler都可以在线程池启动(创建)之后动态调整!通过这些参数,线程池可以动态的调整线程数量,我们也可以创建属于自己的特殊的线程池。

unit表示keepAliveTime的时间单位,在TimeUnit类中有7种静态属性:

  • TimeUnit.DAYS; //天
  • TimeUnit.HOURS; //小时
  • TimeUnit.MINUTES; //分钟
  • TimeUnit.SECONDS; //秒
  • TimeUnit.MILLISECONDS; //毫秒
  • TimeUnit.MICROSECONDS; //微妙
  • TimeUnit.NANOSECONDS; //纳秒

3、缓冲队列的排队策略

任务队列用于存放提交的来不及执行的任务,一定是一个阻塞队列BlockingQueue,JDK自带的阻塞队列如下:

队列 描述
ArrayBlockingQueue 有界阻塞队列
LinkedBlockingQueue 无界阻塞队列
LinkedBlockingDeque 无界阻塞双端队列
SynchronousQueue 没有容量的阻塞队列
DelayQueue 支持延时操作的无界阻塞队列
PriorityBlockingQueue 任务具有优先级的无界阻塞队列
LinkedTransferQueue JDK1.7的新无界阻塞队列

当我们提交一个新的任务到线程池,线程池会根据当前池中正在运行的线程数量来决定该任务的处理方式。处理方式有三种:

3.1 直接提交

直接提交也可以称为直接切换,采用SynchronousQueue, 该队列没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者(将任务直接提交给线程处理),必须等队列中的添加元素被消费后才能继续添加新的元素。

  • 使用synchronousQueue阻塞队列一般要求maximumRoolsizes为无界,避免线程拒绝执行操作。

  • 如果线程池中的线程都在工作,不存在用于立即执行任务的线程,则试图将任务加入到缓冲队列就会失败,此时会构造一个新的线程来处理添加的任务,并将线程加入到线程池中

  • 直接提交要求是无界的,即最大线程数量maximumPoolSize的值为Integer.MAX_VALUE. 这样才能避免拒绝新提交的任务,newCachedThreadPool采用的就是直接提交策略。

  • 当队列中没有任务时,获取任务的动作会被阻塞;

  • 当队列中有任务时,存入任务的动作会被阻塞

采用SynchronousQueue作为任务队列的线程池不能缓存任务,一个任务要么被执行要么被拒绝策略处理,这就是“直接提交”的由来。

public class SynchronousQueueThreadPool {

    public static void main(String[] args) {
        //maximumPoolSize设置为2,拒绝策略为AbortPolicy策略,直接抛出异常
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
                //采用SynchronousQueue作为任务队列,不能缓存任务
                new SynchronousQueue<>(),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        //核心线程应用超时等待机制
        pool.allowCoreThreadTimeOut(true);
        //提交三个任务,明显会超出最大线程数量
        for (int i = 0; i < 3; i++) {
            pool.execute(new SynchronousQueueThreadTask());
        }
    }
}

class SynchronousQueueThreadTask implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

运行结果:

pool-1-thread-2
pool-1-thread-1
Exception in thread “main” java.util.concurrent.RejectedExecutionException: Task com.thread.test.threadpool.ThreadTaskSyn@2f0e140b rejected from java.util.concurrent.ThreadPoolExecutor@7440e464[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.thread.test.threadpool.SynchronousQueueThreadPool.main(SynchronousQueueThreadPool.java:17)

3.2 无界队列

LinkedBlockingQueue是个无界缓存等待队列, 理论上可以对无限多的任务进行排序。

LinkedBlockingQueue是一个特殊的BlockingQueue,它虽然可以设置容量,但是不设置容量就作为无界任务队列。
由于LinkedBlockingQueue也可以设置容量,因此也可以作为有界任务队列,并且由于它采用了两把锁,性能好于采用一把锁的ArrayBlockingQueue。

  • 使用无界队列可以在线程池中所有corePoolSize个线程都工作的情况下将任务添加到缓冲队列中。

  • 当前执行的线程数量达到corePoolsize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时 maximumPoolSize 就相当于无效了),每个线程完全独立于其他线程。这时不用创建新的线程来处理添加的任务,线程池中线程的数量不会超过corePoolSize。

  • 当每个任务完全独立于其余的任务,各个任务之间的执行互不影响时,适合使用无界队列。newFixedThreadPool采用的就是无界队列策略。

  • 生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。

3.3 有界队列

ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小, 队列的最大长度为maximumPoolSize。

采用ArrayBlockingQueue作为任务队列的线程池可以缓存任务,是较为常见的任务队列。

  • 有界队列使用有限的maxmumPoolSize线程数, 有助于防止资源耗尽。

  • 当正在执行的线程数等于corePoolsize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行

  • 当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败, 会开启新的线程去执行

  • 当线程数已经达到最大的maximumPoolsizes时, 再有新的元素尝试加入ArrayBlocki ngQueue时会报错。

  • 有界队列较难调整和控制,需要设定合理的参数,对队列的大小和线程池最大数量的大小相互进行折衷。因为线程池与队列容量都是有限的。所以想让线程池的吞吐率和处理任务达到一个合理的范围,又想使我们的线程调度相对简单,并且还尽可能降低资源的消耗,我们就需要合理的限制这两个数量。

    • 分配技巧:如果想降低资源的消耗包括降低cpu使用率、操作系统资源的消耗、上下文切换的开销等等,可以设置一个较大的队列容量和较小的线程池容量,这样会降低线程池的吞吐量。
    • 如果我们提交的任务经常发生阻塞,我们可以调整maximumPoolSize。如果我们的队列容量较小,我们需要把线程池大小设置的大一些,这样cpu的使用率相对来说会高一些。
    • 如果线程池的容量设置的过大,提高任务的数量过多的时候,并发量会增加,那么线程之间的调度就是一个需要考虑的问题。这样反而可能会降低处理任务的吞吐量。

3.4 优先任务队列

阻塞队列设置为PriorityBlockingQueue。PriorityBlockingQueue是一个特殊的BlockingQueue,它可以为任务设置优先级,优先最高的任务将会先出队列被执行,它要求任务可比较大小。

  • 使用PriorityBlockingQueue队列,通常设置corePoolSize为0,这样新来的任务将会直接进入PriorityBlockingQueue,如果没有指定容量,那么可以无限制的保存添加的新任务(由于底层是数组,实际上最大容量为 Integer.MAX_VALUE,但是基本上达不到就会抛出内存溢出异常)。

  • 采用PriorityBlockingQueue作为任务队列的线程池可以无限缓存任务,如果没有设置容量,maximumPoolSize参数是无效的,当线程池的线程数达到corePoolSize后就不会再增加了,此时需要注意内存资源耗尽的问题。

public class PriorityBlockingQueueThreadPool {

    public static void main(String[] args) {
        //一个线程池,使用优先级的PriorityBlockingQueue阻塞队列作为任务队列
        ExecutorService pool = new ThreadPoolExecutor(0, 1, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        /*
         * 循环从 priority=0开始添加任务,即最先添加的任务优先级最低
         * 查看输出执行情况,可以发现最后添加的任务(优先级最高)最先被执行
         */
        for (int i = 0; i < 20; i++) {
            pool.execute(new ThreadTask(i));
        }
        pool.shutdown();
    }

    /**
     * 任务实现,同时作为Comparable的实现类
     */
    static class ThreadTask implements Runnable, Comparable<ThreadTask> {
        private int priority;

        ThreadTask(int priority) {
            this.priority = priority;
        }

        /**
         * 当前对象和其他对象做比较,当前priority大就返回-1,当前priority小就返回1,
         *
         * @param o 被比较的线程任务
         * @return 返回-1就表示当前任务出队列优先级更高;返回1就表示当前任务出队列优先级更低;即priority值越大,出队列的优先级越高;
         */
        @Override
        public int compareTo(ThreadTask o) {
            return this.priority > o.priority ? -1 : 1;
        }

        @Override
        public void run() {
            System.out.println("priority:" + this.priority + ",ThreadName:" + Thread.currentThread().getName());
        }
    }
}
0

评论区