侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 377 篇文章
  • 累计创建 136 个标签
  • 累计收到 12 条评论

目 录CONTENT

文章目录

Java多线程编程之CountDownLatch的使用方法

孔子说JAVA
2022-12-16 / 0 评论 / 1 点赞 / 51 阅读 / 11,671 字 / 正在检测是否收录...

CountDownLatch 是一个同步工具类,常常被称作"计数器",它允许一些线程(1-N个)一直阻塞,直到其他线程执行完后才继续往下执行,主要用来协调多个线程之间的同步。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。count down是倒数的意思,latch则是门闩的含义,整体含义可以理解为倒数的门栓,该计数器的初始值是线程的数量。

image-1666576584141

1、概述

JUC CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。

  • CountDownLatch在初始化时,需要指定一个正整数作为计数器起始值,每当一个线程执行完毕后,计数器就被countDown方法减1,当计数器为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就被唤醒,恢复工作。从线程执行状态的角度分析,就是允许一个或多个线程等待,直到其它线程执行完成才继续往下走。

  • CountDownLatch的计数器无法被重置,当计数器被减到0时,调用await方法会直接返回。这一点有别于Semaphore,Semaphore是可以通过release操作恢复信号量的。

  • CountDownLatch是在java 1.5 被引入的,与它一起被引入的工具类还包括CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue等,它们都位于java.util.cucurrent包下。

  • CountDownLatch最重要的方法是countDown()和await(),前者主要是计数减一,后者是等待计数到0,如果没有到达0,就继续阻塞等待。

2、原理

CountDownLatch内部使用了AQS锁,其内部有一个state字段,通过该字段来控制锁的操作,CountDownLatch内部是将state作为计数器使用的。当初始化CountDownLatch时,会将state值进行初始化,调用CountDownLatch的awit时,会判断state计数器是否已经变为0,如果没有变为0则挂起当前线程,并加入到AQS的阻塞队列中,如果有线程调用了CountDownLatch的countDown时,这时的操作是将state计数器减1,每当减少操作时都会唤醒阻塞队列中的线程,线程会判断此时state计数器是否已经都执行完了,如果还没有执行完则继续挂起当前线程,直到state计数器清零或线程被中断为止。

  • 比如我们初始化时,state计数器为3,同时开启三个线程,每当有一个线程执行完成后就将state值减少1,直到减少到0时,说明所有线程已经执行完毕。

image-1666576827057

CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就减一。当计数器递减至零时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就被唤醒,恢复执行任务。

2.1 源码解析

public class CountDownLatch {
    /**
     * 同步控制,
     * 使用 AQS的state来表示计数。
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        // 初始化state值(也就是需要等待几个线程完成任务)
        Sync(int count) {
            setState(count);
        }
        // 获取state值。
        int getCount() {
            return getState();
        }
        // 获得锁。
        protected int tryAcquireShared(int acquires) {
            // 这里判断如果state=0的时候才能获得锁,反之获取不到将当前线程放入到队列中阻塞。
            // 这里是关键点。
            return (getState() == 0) ? 1 : -1;
        }
 
        protected boolean tryReleaseShared(int releases) {
            // state进行减少,当state减少为0时,阻塞线程才能进行处理。
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    // 锁对象。
    private final Sync sync;
 
    /**
     * 初始化同步锁对象。
     */
    public CountDownLatch(int count) { 
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
 
    /**
     * 导致当前线程等待直到闩锁倒计时到零,除非线程是被中断。如果当前计数为零,则此方法立即返回。如果当前计数大于零,
     * 则当前线程将被禁用以进行线程调度并处于休眠状态,直到发生以下两种情况:
     * 1.计数达到零。
     * 2.如果当前线程被中断。
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
 
    /**
     * 等待计数器清零或被中断,等待一段时间后如果还是没有
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
 
    /**
     * 使当前线程等待直到闩锁倒计时到零,除非线程被中断或指定的等待时间已过。
     */
    public void countDown() {
        sync.releaseShared(1);
    }
 
    /**
     * 返回state值。
     */
    public long getCount() {
        return sync.getCount();
    }
}

2.2 方法解析

CountDownLatch主要有两个方法:countDown()和await()。countDown()方法用于使计数器减一,其一般是执行任务的线程调用,await()方法则使调用该方法的线程处于等待状态,其一般是主线程调用。

  • 注意,countDown()方法并没有规定一个线程只能调用一次,当同一个线程调用多次countDown()方法时,每次都会使计数器减一;另外,await()方法也并没有规定只能有一个线程执行该方法,如果多个线程同时执行await()方法,那么这几个线程都将处于等待状态,并且以共享模式享有同一个锁。

  • await() 阻塞当前线程,直到计数器为零为止;

  • await(long timeout, TimeUnit unit) await()的重载方法,可以指定阻塞时长;

  • countDown() 计数器减1,如果计数达到零,释放所有等待的线程。

  • getCount() 返回当前计数

awit方法

当调用awit方法时,其实内部调用的AQS的acquireSharedInterruptibly方法,这个方法会调用Sync中tryAcquireShared的方法,如我们初始化时将state值初始化2,但是Sync中判断(getState() == 0) ? 1 : -1;此时state值为2,判定为false,则返回-1,当返回负数时,内部会将当前线程挂起,并且放入AQS的队列中,直到AQS的state值减少到0会唤醒当前线程,或者是当前线程被中断,线程会抛出InterruptedException异常,然后返回。

/**
 * 导致当前线程等待直到闩锁倒计时到零,除非线程是被中断。如果当前计数为零,则此方法立即返回。如果当前计数大于零,
 * 则当前线程将被禁用以进行线程调度并处于休眠状态,直到发生以下两种情况:
 * 1.计数达到零。
 * 2.如果当前线程被中断。
 */
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly内部调用的是CountDownLatch内部类Sync实现的tryAcquireShared方法,tryAcquireShared判断state是否已经清空,也就是计数器是否已经清零了,清零时才能进行执行,此时并没有进行清空,则会将当前线程挂起,并且将挂起的线程放入到AQS的阻塞队列,等待其他线程唤醒动作。

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 调用tryAcquireShared 方法。
        if (tryAcquireShared(arg) < 0)
            // 阻塞线程,将线程加入到阻塞队列等到其他线程恢复线程。
            doAcquireSharedInterruptibly(arg);
    }
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

coutDown方法

当线程执行完成后,会调用CountDownLatch的countDown,countDown方法内部调用的AQS的releaseShared,releaseShared方法实现在Sync类中,该方法主要作用是将state计数器中的值,进行减1操作,先进行判断是否已经将state值修改为0,如果修改为则不进行下面的操作,防止状态已经修改为0时,其他线程还调用了countDown操作导致state值变为负数,当state值减少1时,会通知阻塞队列中的等待线程,假设上面的例子其中一个线程先执行了countDown方法,则此时state=1,并且唤醒阻塞队列中的线程,线程还是会去调用tryAcquireShared方法,发现还是返回-1,则还会将当前线程进行挂起阻塞并且加入到阻塞队列中。

当另外一个线程也执行完成,调用countDown时,state减少1则变为state=0,当这时候唤醒等待的线程时,tryAcquireShared返回的结果是1,则会直接返回成功。

3、使用场景

CountDownLatch非常适合于对任务进行拆分,使其并行执行,实现最大的并行性。如某一线程A在开始运行前一直被阻塞,需等待n个其它线程执行完毕,则需将CountDownLatch的计数器初始化为 new CountDownLatch(n),每当一个任务线程执行完毕,则调用countDown()函数使得计数器减一;当计数器变为0时,在CountDownLatch上await()的线程A就会被唤醒。

  • 场景1:多个线程等待某一个线程的信号,同时开始执行:模拟并发,让并发线程一起执行

  • 场景2:让单个线程等待:多个线程(任务)完成后,进行汇总合并

    • 如:会计汇总一年的财务报表时,可以把任务按照季度拆分为四份,那么就可以将这个大任务拆分为4个子任务,分别交由4个财务(线程)执行,执行完成之后再由主财务(主线程)进行财务汇总。此时,总的执行时间将决定于执行最慢的任务,通常来看,还是可以大大减少总的执行时间。

4、代码示例

4.1 主线程等待其他线程

实现主线程等待其他线程的任务完成之后,才继续执行的代码。

public class UseCountDownLatch {

    static class TaskThread extends Thread {
        CountDownLatch latch;

        public TaskThread(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            super.run();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(getName() + " Task is Done");
                latch.countDown();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int threadNum = 10;
        CountDownLatch latch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            TaskThread taskThread = new TaskThread(latch);
            taskThread.start();
        }
        System.out.println("Task Start!");

        latch.await();

        System.out.println("All Task is Done!");
    }
}

代码解释:

  • 设置 CountDownLatch 的等待线程数为 10
  • 开启 10 个线程,每个线程都会睡眠 1 秒,睡眠结束后就会调用 CountDownLatch 的 countDown() 方法
  • 主线程调用 CountDownLatch 的 await() 方法,所以会开始阻塞,直到 CountDownLatch 的 count 为 0 才继续执行

打印结果:

Task Start!
Thread-5 Task is Done
Thread-8 Task is Done
Thread-4 Task is Done
Thread-3 Task is Done
Thread-9 Task is Done
Thread-6 Task is Done
Thread-0 Task is Done
Thread-7 Task is Done
Thread-1 Task is Done
Thread-2 Task is Done
All Task is Done!

4.2 模拟员工下班关门案例

1)模拟公司所有员工都下班后,值班人员锁门离开公司。

import java.util.concurrent.CountDownLatch;

/**
 * 模拟员工下班关门案例
 */
public class CountDownLatchDemo1 {
    public static void main(String[] args) throws InterruptedException {
        closeDoor();
    }

    private static void closeDoor() throws InterruptedException {
        //创建CountDownLatch,初始容量为10,模拟10名员工
        CountDownLatch countDownLatch=new CountDownLatch(10);
        for (int i=1;i<=10;i++){
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "\t" + "忙完工作,下班。");
                //计数器减1
                countDownLatch.countDown();
            },"员工编号:"+String.valueOf(i)).start();
        }
        //当计数器的值变为0,因调用await方法被阻塞的线程会被唤醒,继续执行
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t值班人员锁门离开公司");
    }
}

image-1666575184500

2)使用枚举方式模拟员工下班关门案例。

import lombok.Getter;
/**
 * 枚举类,定义10名员工
 * */
public enum EmpEnum {
    ONE(1, "刘一"),
    TWO(2, "陈二"),
    THREE(3, "张三"),
    FOUR(4, "李四"),
    FIVE(5, "王五"),
    SIX(6, "赵六"),
    SEVEN(7, "孙七"),
    EIGHT(8, "周八"),
    NINE(9, "吴九"),
    TEN(10, "郑十");

    @Getter
    private int code;//员工编号
    @Getter
    private String name;//员工名称

    EmpEnum(Integer code, String name) {
        this.code = code;
        this.name = name;
    }

    public static EmpEnum forEach(int index) {
        EmpEnum[] EmpEnums = EmpEnum.values();
        for (EmpEnum epmEnum : EmpEnums) {
            if (index == epmEnum.getCode()) {
                return epmEnum;
            }
        }
        return null;
    }

}
public class CountDownLatchDemo2 {
    public static void main(String[] args) throws InterruptedException {
        closeDoor();
    }

    private static void closeDoor() throws InterruptedException {
        //创建CountDownLatch,初始容量为10,模拟10名员工
        CountDownLatch countDownLatch=new CountDownLatch(10);
        for (int i=1;i<=10;i++){
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "\t" + "忙完工作,下班。");
                //计数器减1
                countDownLatch.countDown();
            },EmpEnum.forEach(i).getName()).start();
        }
        //当计数器的值变为0,因调用await方法被阻塞的线程会被唤醒,继续执行
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t值班人员锁门离开公司");
    }
}

4.3 模拟高并发

为了模拟高并发,让一组线程在指定时刻(秒杀时间)执行抢购,这些线程在准备就绪后,进行等待(CountDownLatch.await()),直到秒杀时刻的到来,然后一拥而上;这也是本地测试接口并发的一个简易实现。在这个场景中,CountDownLatch充当的是一个发令枪的角色;就像田径赛跑时,运动员会在起跑线做准备动作,等到发令枪一声响,运动员就会奋力奔跑。

CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
    new Thread(() -> {
        try {
            //准备完毕……运动员都阻塞在这,等待号令
            countDownLatch.await();
            String parter = "【" + Thread.currentThread().getName() + "】";
            System.out.println(parter + "开始执行……");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}
Thread.sleep(2000);// 裁判准备发令
countDownLatch.countDown();// 发令枪:执行发令

运行结果:

【Thread-0】开始执行……
【Thread-1】开始执行……
【Thread-4】开始执行……
【Thread-3】开始执行……
【Thread-2】开始执行……

通过 CountDownLatch.await(),让多个参与者线程启动后阻塞等待,然后在主线程调用CountDownLatch.countdown() 将计数减为 0,让所有线程一起往下执行;以此实现了多个线程在同一时刻并发执行,来模拟并发请求的目的。

4.4 运动员开始跑步比赛

在运动会上开始比赛前,在运动员起跑之前都会等待裁判员发号施令,一声令下运动员统一起跑。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunDemo {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("运动员有5秒的准备时间");
        CountDownLatch latch=new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no=i+1;
            Runnable runnable=new Runnable(){

                @Override
                public void run() {
                    try {

                        System.out.println(no+"号运动员准备完毕,等待裁判员发令。");
                        latch.await();
                        System.out.println(no+"号运动员开始跑步了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.submit(runnable);
        }

       Thread.sleep(5000);
        System.out.println("5秒准备时间已过,发令枪响,比赛开始!");
        latch.countDown();

    }
}

在这段代码中,首先打印出了运动员有 5 秒的准备时间,然后新建了一个 CountDownLatch,其倒数值只有 1;接着,同样是一个 5 线程的线程池,并且用 for 循环的方式往里提交 5 个任务,而这 5 个任务在一开始时就让它调用 await() 方法开始等待。

接下来我们再回到主线程。主线程会首先等待 5 秒钟,这意味着裁判员正在做准备工作,比如他会喊“各就各位,预备”这样的话语;然后 5 秒之后,主线程会打印出“5 秒钟准备时间已过,发令枪响,比赛开始”的信号,紧接着会调用 countDown 方法,一旦主线程调用了该方法,那么之前那 5 个已经调用了 await() 方法的线程都会被唤醒,所以这段程序的运行结果如下:

运动员有5秒的准备时间
2号运动员准备完毕,等待裁判员发令。
1号运动员准备完毕,等待裁判员发令。
3号运动员准备完毕,等待裁判员发令。
4号运动员准备完毕,等待裁判员发令。
5号运动员准备完毕,等待裁判员发令。
5秒准备时间已过,发令枪响,比赛开始!
2号运动员开始跑步了
3号运动员开始跑步了
1号运动员开始跑步了
4号运动员开始跑步了
5号运动员开始跑步了

可以看到,运动员首先会有 5 秒钟的准备时间,然后 5 个运动员分别都准备完毕了,等待发令枪响,紧接着 5 秒之后,发令枪响,比赛开始,于是 5 个子线程几乎同时开始跑步了。

4.5 裁判员结束跑步比赛

在比赛跑步时有 5 个运动员参赛,终点有一个裁判员,什么时候比赛结束呢?那就是当所有人都跑到终点之后,这相当于裁判员等待 5 个运动员都跑到终点,宣布比赛结束。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class RunDemo1 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch=new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no=i+1;
            Runnable runnable=new Runnable(){

                @Override
                public void run() {
                    try {
                        Thread.sleep((long)(Math.random()*10000));
                        System.out.println(no+"号运动员完成了比赛。");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        latch.countDown();
                    }
                }
            };
            service.submit(runnable);
        }

        System.out.println("等待5个运动员都跑完.....");
        latch.await();
        System.out.println("所有人都跑完了,比赛结束");
    }
}

在这段代码中,我们新建了一个初始值为 5 的 CountDownLatch,然后建立了一个固定 5 线程的线程池,用一个 for 循环往这个线程池中提交 5 个任务,每个任务代表一个运动员,这个运动员会首先随机等待一段时间,代表他在跑步,然后打印出他完成了比赛,在跑完了之后,同样会调用 countDown 方法来把计数减 1。

之后我们再回到主线程,主线程打印完“等待 5 个运动员都跑完”这句话后,会调用 await() 方法,代表让主线程开始等待,在等待之前的那几个子线程都执行完毕后,它才会认为所有人都跑完了比赛。这段程序的运行结果如下所示:

等待5个运动员都跑完.....
5号运动员完成了比赛。
2号运动员完成了比赛。
3号运动员完成了比赛。
4号运动员完成了比赛。
1号运动员完成了比赛。
所有人都跑完了,比赛结束

可以看出,直到 5 个运动员都完成了比赛之后,主线程才会继续,而且由于子线程等待的时间是随机的,所以各个运动员完成比赛的次序也是随机的。

5、总结

  1. CountDownLatch的初始值不能重置,只能减少不能增加,最多减少到0;
  2. 当CountDownLatch计数值没减少到0之前,调用await方法可能会让调用线程进入一个阻塞队列,直到计数值减小到0;
  3. 调用countDown方法会让计数值每次都减小1,但是最多减少到0。当CountDownLatch的计数值减少到0的时候,会唤醒所有在阻塞队列中的线程。
  4. await()方法也并没有规定只能有一个线程执行该方法,如果多个线程同时执行await()方法,那么这几个线程都将处于等待状态,并且以共享模式享有同一个锁。
1

评论区