侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 377 篇文章
  • 累计创建 136 个标签
  • 累计收到 12 条评论

目 录CONTENT

文章目录

Java多线程编程之CyclicBarrier的使用方法

孔子说JAVA
2022-12-17 / 0 评论 / 0 点赞 / 58 阅读 / 34,669 字 / 正在检测是否收录...

CyclicBarrier是一个同步工具类,拆开来翻译就是循环(Cycle)和屏障(Barrier)。它的主要作用是用来进行线程协作,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会被打开,所有被屏障阻塞的线程才会继续执行,不过它是可以循环执行的,这是它与CountDownLanch最大的不同。CountDownLanch是只有当最后一个线程把计数器置为0的时候,其他阻塞的线程才会继续执行。

v2-3c3e8a7232b791846fb23b0c74d1338c_b

1、概述

并发编程的三大核心是分工,同步和互斥。在日常开发中,经常会碰到需要在主线程中开启多个子线程去并行的执行任务,并且主线程需要等待所有子线程执行完毕再进行汇总的场景,这就涉及到分工与同步的内容了。

JUC CyclicBarrier是一个同步工具类,用来协调多个线程之间的同步。能够允许一组线程去互相等待直到都到达了屏障,CyclicBarrier对于涉及到固定大小的线程是非常有用的,线程们必须相互等待。该屏障称之所以被称为循环屏障,是因为当等待屏障的线程被释放之后,该屏障能循环使用。

  • 官网解释:允许一组线程全部等待彼此达到共同屏障点的同步辅助。循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。

  • 意思就是每个线程都得执行到等待点进行等待,直到所有线程都执行到等待点,才会继续往下执行。相当于日常开会,只有等每个参会的人都到之后才会开始会议。

fa4d24955103ee1c8c0564ab45eebe26

看上图,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。

2、源码解析

20201229140729713

  • 借助ReentrantLock和Condition作为线程间通信机制。

  • 等到所有parties参与线程都到达阻塞屏障,会唤醒所有parties参与线程(会优先执行barrierAction线程),到达数不足parties则所有线程需要阻塞等待。

源码结构如下:

image-1670289535025

  • parties 变量,表示拦截线程的总数量。
  • count 变量,表示拦截线程的剩余需要数量。
  • barrierAction 变量,为 CyclicBarrier 接收的 Runnable 命令,用于在线程到达屏障时,优先执行
  • barrierAction ,用于处理更加复杂的业务场景
  • generation 变量,表示 CyclicBarrier 的更新换代

CyclicBarrier 的源码实现和 CountDownLatch 大同小异,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier是基于ReentrantLock重入锁和Condition实现的,当然ReentrantLock也是基于AQS实现的。因为 CyclicBarrier 的源码相对来说简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。

2.1 成员变量

在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理,看下CyclicBarrier有哪些成员变量。

//同步操作锁
private final ReentrantLock lock = new ReentrantLock();
//线程拦截器
private final Condition trip = lock.newCondition();
//每次拦截的线程数
private final int parties;
//换代前执行的任务
private final Runnable barrierCommand;
//表示栅栏的当前代
private Generation generation = new Generation();
//计数器
private int count;
 
//静态内部类Generation
private static class Generation {
  boolean broken = false;
}

可以看到CyclicBarrier内部是通过条件队列trip来对线程进行阻塞的,并且其内部维护了两个int型的变量parties和count,parties表示每次拦截的线程数,该值在构造时进行赋值。count是内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒。CyclicBarrier有一个静态内部类Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待。barrierCommand表示换代前执行的任务,当count减为0时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定barrierCommand来执行自己的任务。我用一图来描绘下 CyclicBarrier 里面的一些概念:

image-1670290056277

2.2 构造器

//构造器1
public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) throw new IllegalArgumentException();
  this.parties = parties;
  this.count = parties;
  this.barrierCommand = barrierAction;
}
 
//构造器2
public CyclicBarrier(int parties) {
  this(parties, null);
}

CyclicBarrier有两个构造器,其中构造器1是它的核心构造器,在这里你可以指定本局游戏的参与者数量(要拦截的线程数)以及本局结束时要执行的任务,还可以看到计数器count的初始值被设置为parties。CyclicBarrier类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待。

2.3 方法解析

CyclicBarrier不管是定时等待还是非定时等待,它们都调用了dowait方法,只不过是传入的参数不同而已。

//非定时等待
public int await() throws InterruptedException, BrokenBarrierException {
  try {
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe);
  }
}
 
//定时等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  return dowait(true, unit.toNanos(timeout));
}

最终调用的是dowait方法

//核心等待方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    final Generation g = generation;
    //检查当前栅栏是否被打翻
    if (g.broken) {
      throw new BrokenBarrierException();
    }
    //检查当前线程是否被中断
    if (Thread.interrupted()) {
      //如果当前线程被中断会做以下三件事
      //1.打翻当前栅栏
      //2.唤醒拦截的所有线程
      //3.抛出中断异常
      breakBarrier();
      throw new InterruptedException();
    }
    //每次都将计数器的值减1
    int index = --count;
    //计数器的值减为0则需唤醒所有线程并转换到下一代
    if (index == 0) {
      boolean ranAction = false;
      try {
        //唤醒所有线程前先执行指定的任务
        final Runnable command = barrierCommand;
        if (command != null) {
          command.run();
        }
        ranAction = true;
        //唤醒所有线程并转到下一代
        nextGeneration();
        return 0;
      } finally {
        //确保在任务未成功执行时能将所有线程唤醒
        if (!ranAction) {
          breakBarrier();
        }
      }
    }
 
    //如果计数器不为0则执行此循环
    for (;;) {
      try {
        //根据传入的参数来决定是定时等待还是非定时等待
        if (!timed) {
          trip.await();
        }else if (nanos > 0L) {
          nanos = trip.awaitNanos(nanos);
        }
      } catch (InterruptedException ie) {
        //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程
        if (g == generation && ! g.broken) {
          breakBarrier();
          throw ie;
        } else {
          //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作
          Thread.currentThread().interrupt();
        }
      }
      //如果线程因为打翻栅栏操作而被唤醒则抛出异常
      if (g.broken) {
        throw new BrokenBarrierException();
      }
      //如果线程因为换代操作而被唤醒则返回计数器的值
      if (g != generation) {
        return index;
      }
      //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
      if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
      }
    }
  } finally {
    lock.unlock();
  }
}

可以看到,是通过index字段控制线程等待的,当index不为0的时候,线程统一会进行阻塞,直到index为0的时候,才会唤醒所有线程,这时候所有线程才会继续往下执行。

在dowait方法中每次都将count减1,减完后立马进行判断看看是否等于0,如果等于0的话就会先去执行之前指定好的任务,执行完之后再调用nextGeneration方法将栅栏转到下一代,在该方法中会将所有线程唤醒,将计数器的值重新设为parties,最后会重新设置栅栏代次,在执行完nextGeneration方法之后就意味着游戏进入下一局。如果计数器此时还不等于0的话就进入for循环,根据参数来决定是调用trip.awaitNanos(nanos)还是trip.await()方法,这两方法对应着定时和非定时等待。如果在等待过程中当前线程被中断就会执行breakBarrier方法,该方法叫做打破栅栏,意味着游戏在中途被掐断,设置generation的broken状态为true并唤醒所有线程。同时这也说明在等待过程中有一个线程被中断整盘游戏就结束,所有之前被阻塞的线程都会被唤醒。线程醒来后会执行下面三个判断,看看是否因为调用breakBarrier方法而被唤醒,如果是则抛出异常;看看是否是正常的换代操作而被唤醒,如果是则返回计数器的值;看看是否因为超时而被唤醒,如果是的话就调用breakBarrier打破栅栏并抛出异常。这里还需要注意的是,如果其中有一个线程因为等待超时而退出,那么整盘游戏也会结束,其他线程都会被唤醒。下面贴出nextGeneration方法和breakBarrier方法的具体代码。

//切换栅栏到下一代
private void nextGeneration() {
  //唤醒条件队列所有线程
  trip.signalAll();
  //设置计数器的值为需要拦截的线程数
  count = parties;
  //重新设置栅栏代次
  generation = new Generation();
}
 
//打翻当前栅栏
private void breakBarrier() {
  //将当前栅栏状态设置为打翻
  generation.broken = true;
  //设置计数器的值为需要拦截的线程数
  count = parties;
  //唤醒所有线程
  trip.signalAll();
}

重复使用,即重置一个栅栏

和CountdownLatch不一样的是,CountdownLatch是一次性的,而CycliBarrier是可以重复使用的,只需调用一下reset方法。

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        1、破坏当前的屏障点并唤醒所有线程 
        breakBarrier();   
        2、生成下一代
        nextGeneration(); 
    } finally {
        lock.unlock();
    }
}


private void breakBarrier() {
    generation.broken = true;
    将等待线程数量重置
    count = parties;
    唤醒所有线程 
    trip.signalAll();
}

private void nextGeneration() {
    唤醒所有线程
    trip.signalAll();
    将等待线程数量重置
    count = parties;
    generation = new Generation();
}

我们设想一下,如果初始化时,指定了线程 parties = 4,前面有 3 个线程调用了 await 等待,在第 4 个线程调用 await 之前,我们调用 reset 方法,那么会发生什么?

  • 首先,打破栅栏,那意味着所有等待的线程(3个等待的线程)会唤醒,await 方法会通过抛出 BrokenBarrierException 异常返回。然后开启新的一代,重置了 count 和 generation,相当于一切归零了。

3、CyclicBarrier的底层执行流程

关于CyclicBarrier的底层执行流程总结:

  1. 初始化CyclicBarrier中的各种成员变量,包括parties、count以及Runnable(可选);
  2. 当调用await()方法时,底层会先检查计数器是否已经归零,如果是的话,那么就首先执行可选的Runnable,接下来开始下一个generation;(注意:这里只是调用Runnable的run()方法,并不是调用start()方法开启另一个线程)
  3. 在下一个分代中,将会重置count值为parties,并且创建新的Generation实例;
  4. 同时会调用Condition的singalAll方法,唤醒所有在屏障前面等待的线程,让其开始继续执行;(注意:当有可选的Runnable时,是执行完run()方法中的汇总操作,其他线程才会继续执行)
  5. 如果计数器没有归零,那么当前的调用线程将会通过Condition的await方法,在屏障前进行等待;
  6. 以上所有执行流程均在lock锁的控制范围内,不会出现并发情况。
  7. 在下一个分代时,该屏障又可以继续使用,例如计数器是3,线程1,线程2和线程3冲破了当前屏障后,下一个分代的屏障可以去给线程4,线程5和线程6使用,也可以又给线程1,线程2和线程3使用。

4、使用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。

  • 场景1:比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

  • 场景2:生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是餐厅规定必须等到所有人到齐之后才会上菜。这里的朋友们就是各个线程,餐厅就是 CyclicBarrier。

  • 场景3:模拟秒杀场景,让一组线程同时等待,同时恢复执行,实现最大程度的并行性。

5、代码示例

使用 CyclicBarrier 时要注意:在线程池中使用 CyclicBarrier 时一定要注意线程的数量要多于 CyclicBarrier 实例中设置的阻塞线程的数量就会发生死锁。 调用 await() 方法的次数一定要等于屏障中设置的阻塞线程的数量,否则也会死锁。

5.1 示例一

package com.kz.example.juc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * 类说明:演示CyclicBarrier用法,共4个子线程,他们全部完成工作后,交出自己结果,
 * 再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串
 *
 * @Author kongzi
 * @Date 2022/12/6 10:17
 * @Version 1.0
 */

public class UseCyclicBarrier {
    /** 创建一个CyclicBarrier实例,屏障数据设为3,处理完之后执行CollectThread类的run方法 */
    private static CyclicBarrier barrier
            = new CyclicBarrier(4, new CollectThread());

    /** 创建一个ConcurrentHashMap,用来保存每个线程的id */
    private static ConcurrentHashMap<String, Long> resultMap
            = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            Thread thread = new Thread(new SubThread());
            thread.start();
        }

    }

    /** 汇总的任务 */
    private static class CollectThread implements Runnable {
        /** 等到所有的线程到达屏障 */
        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
                result.append("[" + workResult.getValue() + "]");
            }
            System.out.println(" the result = " + result);
            System.out.println("do other business........");
        }
    }

    /*相互等待的子线程*/
    private static class SubThread implements Runnable {

        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            resultMap.put(Thread.currentThread().getId() + "", id);
            try {
                Thread.sleep(1000 + id);
                System.out.println("Thread_" + id + " ....do something ");
                /** 线程完成工作后调用await 设置屏障 */
                barrier.await();
//                Thread.sleep(1000 + id);
//                System.out.println("Thread_" + id + " ....do its business ");
//                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

打印结果:

Thread_12 ....do something 
Thread_13 ....do something 
Thread_14 ....do something 
Thread_15 ....do something 
 the result = [12][13][14][15]
do other business........

放开 SubThread 线程中以下代码注释:

Thread.sleep(1000 + id);
System.out.println("Thread_" + id + " ....do its business ");
barrier.await();

打印结果:

Thread_12 ....do something 
Thread_13 ....do something 
Thread_14 ....do something 
Thread_15 ....do something 
 the result = [12][13][14][15]
do other business........
Thread_12 ....do its business 
Thread_13 ....do its business 
Thread_14 ....do its business 
Thread_15 ....do its business 
 the result = [12][13][14][15]
do other business........

从以上打印结果中可以看出 CyclicBarrier 的计数器可以反复使用。

5.2 示例二:赛马

package com.kz.example.juc;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 赛马程序
 *
 * @Author kongzi
 * @Date 2022/12/6 10:24
 * @Version 1.0
 */
class Horse implements Runnable {

    private static int counter = 0;
    private final int id = counter++;
    private int strides = 0;
    private static Random rand = new Random(47);
    private static CyclicBarrier barrier;

    public Horse(CyclicBarrier b) { barrier = b; }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                synchronized(this) {
                    //赛马每次随机跑几步
                    strides += rand.nextInt(3);
                }
                barrier.await();
            }
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

    public String tracks() {
        StringBuilder s = new StringBuilder();
        for(int i = 0; i < getStrides(); i++) {
            s.append("*");
        }
        s.append(id);
        return s.toString();
    }

    public synchronized int getStrides() { return strides; }
    public String toString() { return "Horse " + id + " "; }

}

public class UseCyclicBarrierHorseRace implements Runnable {

    private static final int FINISH_LINE = 75;
    private static List<Horse> horses = new ArrayList<Horse>();
    private static ExecutorService exec = Executors.newCachedThreadPool();

    @Override
    public void run() {
        StringBuilder s = new StringBuilder();
        //打印赛道边界
        for(int i = 0; i < FINISH_LINE; i++) {
            s.append("=");
        }
        System.out.println(s);
        //打印赛马轨迹
        for(Horse horse : horses) {
            System.out.println(horse.tracks());
        }
        //判断是否结束
        for(Horse horse : horses) {
            if(horse.getStrides() >= FINISH_LINE) {
                System.out.println(horse + "won!");
                exec.shutdownNow();
                return;
            }
        }
        //休息指定时间再到下一轮
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch(InterruptedException e) {
            System.out.println("barrier-action sleep interrupted");
        }
    }

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(7, new UseCyclicBarrierHorseRace());
        for(int i = 0; i < 7; i++) {
            Horse horse = new Horse(barrier);
            horses.add(horse);
            exec.execute(horse);
        }
    }

}

该赛马程序主要是通过在控制台不停的打印各赛马的当前轨迹,以此达到动态显示的效果。整场比赛有多个轮次,每一轮次各个赛马都会随机走上几步然后调用await方法进行等待,当所有赛马走完一轮的时候将会执行任务将所有赛马的当前轨迹打印到控制台上。这样每一轮下来各赛马的轨迹都在不停的增长,当其中某个赛马的轨迹最先增长到指定的值的时候将会结束整场比赛,该赛马成为整场比赛的胜利者!程序的运行结果如下:

20181218144511713

5.3 示例三

package com.kz.example.juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * CyclicBarrier模拟跑步场景
 *
 * @Author kongzi
 * @Date 2022/12/6 10:40
 * @Version 1.0
 */
public class UseCyclicBarrierRunner {
    public static void main(String[] args) {
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> {
            System.out.println("所有人都准备好了裁判开始了");
        });
        for (int i = 0; i < 10; i++) {
            // lambda中只能只用final的变量
            final int times = i;
            new Thread(() -> {
                try {
                    System.out.println("子线程" + Thread.currentThread().getName() + "正在准备");
                    Thread.sleep(1000 * times);
                    System.out.println("子线程" + Thread.currentThread().getName() + "准备好了");
                    cyclicBarrier.await();
                    System.out.println("子线程" + Thread.currentThread().getName() + "开始跑了");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }


    }
}

执行结果:

子线程Thread-2正在准备
子线程Thread-3正在准备
子线程Thread-0正在准备
子线程Thread-0准备好了
子线程Thread-1正在准备
子线程Thread-4正在准备
子线程Thread-6正在准备
子线程Thread-7正在准备
子线程Thread-5正在准备
子线程Thread-8正在准备
子线程Thread-9正在准备
子线程Thread-1准备好了
子线程Thread-2准备好了
子线程Thread-3准备好了
子线程Thread-4准备好了
子线程Thread-5准备好了
子线程Thread-6准备好了
子线程Thread-7准备好了
子线程Thread-8准备好了
子线程Thread-9准备好了
所有人都准备好了裁判开始了
子线程Thread-9开始跑了
子线程Thread-0开始跑了
子线程Thread-1开始跑了
子线程Thread-4开始跑了
子线程Thread-2开始跑了
子线程Thread-3开始跑了
子线程Thread-8开始跑了
子线程Thread-5开始跑了
子线程Thread-7开始跑了
子线程Thread-6开始跑了

可以看到所有线程在其他线程没有准备好之前都在被阻塞中,等到所有线程都准备好了才继续执行 我们在创建CyclicBarrier对象时传入了一个方法,当调用CyclicBarrier的await方法后,当前线程会被阻塞等到所有线程都调用了await方法后 调用传入CyclicBarrier的方法,然后让所有的被阻塞的线程一起运行。

5.4 示例四

和示例三一样,不同的是用了线程池实现。

package com.kz.example.juc;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * CyclicBarrier模拟小猪跑步
 *
 * @Author kongzi
 * @Date 2022/12/6 10:45
 * @Version 1.0
 */
public class UseCyclicBarrierPig {
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(5);
        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            System.out.println("主人看看哪个猪跑最快,最肥...");
        });

        // 循环跑3次
        for (int i = 0; i < 3; i++) {
            // 5条猪开始跑
            for(int j = 0; j<5; j++) {
                int finalJ = j;
                service.submit(() -> {
                    System.out.println(finalJ + " pig is run .....");
                    try {
                        // 随机时间,模拟跑花费的时间
                        Thread.sleep(new Random().nextInt(5000));
                        System.out.println(finalJ + " pig reach barrier .....");
                        barrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
        service.shutdown();
    }
}

5.5 示例五

开会场景,需要等待所有的人都到达后才开始会议。

package com.kz.example.juc;

import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * 开会
 *
 * @Author kongzi
 * @Date 2022/12/6 10:17
 * @Version 1.0
 */

public class UseCyclicBarrierMetting {
    public static void main(String[] args) {
        // 1、会议需要三个人
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                System.out.println("三个人都已到达会议室");
            }
        });

        // 2、定义三个线程,相当于三个参会的人
        for (int i = 0; i < 3; i++) {
            final int finalI = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 3、模拟每人到会议室所需时间
                        Thread.sleep((long) (Math.random()*5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第"+Thread.currentThread().getName()+"个人到达会议室");
                    try {
                        // 4、等待其他人到会议室
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+"开始开会");
                }
            }, String.valueOf(finalI)).start();
        }
    }
}

5.6 示例六

假设有一家公司要全体员工进行团建活动,活动内容为翻越三个障碍物,每一个人翻越障碍物所用的时间是不一样的。但是公司要求所有人在翻越当前障碍物之后再开始翻越下一个障碍物,也就是所有人翻越第一个障碍物之后,才开始翻越第二个,以此类推比如跨栏比赛,我们修改一下规则,当所有选手都跨过第一个栏杆是,才去跨第二个,以此类推,每一个员工都是一个“其他线程”。当所有人都翻越的所有的障碍物之后,程序才结束。而主线程可能早就结束了,这里我们不用管主线程。

package com.kz.example.juc;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * CyclicBarrier模拟翻越障碍物跑步
 * 
 * @Author kongzi
 * @Date 2022/12/6 10:59
 * @Version 1.0
 */
public class UseCyclicBarrierDemo {
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

    public static class Surmount implements Runnable{
        @Override
        public void run() {

            try {
                for(int i = 1; i < 4; i++){
                    Random rand = new Random();
                    int randomNum = rand.nextInt((3000 - 1000) + 1) + 1000;//产生1000到3000之间的随机整数
                    Thread.sleep(randomNum);
                    String name = Thread.currentThread().getName();
                    System.out.println(name+"翻过了第" + i +"个障碍");
                    cyclicBarrier.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args){
        for (int i = 1; i < 6; i++){
            Thread thread = new Thread(new Surmount(),"选手"+ i );
            thread.start();
        }
        System.out.println("main is end");
    }
}

执行结果:

main is end
选手5翻过了第1个障碍
选手1翻过了第1个障碍
选手4翻过了第1个障碍
选手3翻过了第1个障碍
选手2翻过了第1个障碍
选手5翻过了第2个障碍
选手2翻过了第2个障碍
选手1翻过了第2个障碍
选手3翻过了第2个障碍
选手4翻过了第2个障碍
选手4翻过了第3个障碍
选手1翻过了第3个障碍
选手5翻过了第3个障碍
选手3翻过了第3个障碍
选手2翻过了第3个障碍

5.7 示例7

公司组织旅游,大家都有经历过,10个人,中午到饭点了,需要等到10个人都到了才能开饭,先到的人坐那等着,吃饭完毕之后,所有人都去车上,待所有人都到车上之后,驱车去下一景点玩。代码如下:

package com.kz.example.juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * CyclicBarrier模拟公司团建旅游
 *
 * @Author kongzi
 * @Date 2022/12/6 11:39
 * @Version 1.0
 */
public class UseCyclicBarrierTravel {
    public static CyclicBarrier cyclicBarrier = new CyclicBarrier(10);

    public static class T extends Thread {
        int sleep;

        public T(String name, int sleep) {
            super(name);
            this.sleep = sleep;
        }

        //等待吃饭
        void eat() {
            try {
                //模拟休眠
                TimeUnit.SECONDS.sleep(sleep);
                long starTime = System.currentTimeMillis();
                //调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
                cyclicBarrier.await();
                long endTime = System.currentTimeMillis();
                System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),开始吃饭了!");

                //休眠sleep时间,模拟当前员工吃饭耗时
                TimeUnit.SECONDS.sleep(sleep);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        //等待所有人到齐之后,开车去下一站
        void drive() {
            try {
                long starTime = System.currentTimeMillis();
                //调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
                cyclicBarrier.await();
                long endTime = System.currentTimeMillis();
                System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),去下一景点的路上!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            //等待所有人到齐之后吃饭,先到的人坐那等着,什么事情不要干
            this.eat();
            //等待所有人到齐之后开车去下一景点,先到的人坐那等着,什么事情不要干
            this.drive();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= 10; i++) {
            new T("员工" + i, i).start();
        }
    }

}

最后到的人给大家倒酒赔罪

package com.kz.example.juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * CyclicBarrier模拟公司团建旅游-最后到的人给大家倒酒赔罪
 *
 * @Author kongzi
 * @Date 2022/12/6 11:39
 * @Version 1.0
 */
public class UseCyclicBarrierTravel2 {
    public static CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> {
        //模拟倒酒,花了2秒,又得让其他9个人等2秒
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "说,不好意思,让大家久等了,给大家倒酒赔罪!");
    });

    public static class T extends Thread {
        int sleep;

        public T(String name, int sleep) {
            super(name);
            this.sleep = sleep;
        }

        @Override
        public void run() {
            try {
                //模拟休眠
                TimeUnit.SECONDS.sleep(sleep);
                long starTime = System.currentTimeMillis();
                //调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
                cyclicBarrier.await();
                long endTime = System.currentTimeMillis();
                System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),开始吃饭了!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= 10; i++) {
            new T("员工" + i, i).start();
        }
    }
}

代码中创建CyclicBarrier对象时,多传入了一个参数(内部是倒酒操作),先到的人先等待,待所有人都到齐之后,需要先给大家倒酒,然后唤醒所有等待中的人让大家开饭。从输出结果中我们发现,倒酒操作是由最后一个人操作的,最后一个人倒酒完毕之后,才唤醒所有等待中的其他员工,让大家开饭。

执行结果:

image-1670298606517

其中一个人等待中被打断了

员工5等待中,突然接了个电话,有点急事,然后就拿起筷子开吃了,其他人会怎么样呢?看着他吃么?

package com.kz.example.juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * CyclicBarrier模拟公司团建旅游-最后一个人等待中被打断了
 *
 * @Author kongzi
 * @Date 2022/12/6 11:39
 * @Version 1.0
 */
public class UseCyclicBarrierTravel3 {

    public static CyclicBarrier cyclicBarrier = new CyclicBarrier(10);

    public static class T extends Thread {
        int sleep;

        public T(String name, int sleep) {
            super(name);
            this.sleep = sleep;
        }

        @Override
        public void run() {
            long starTime = 0, endTime = 0;
            try {
                //模拟休眠
                TimeUnit.SECONDS.sleep(sleep);
                starTime = System.currentTimeMillis();
                //调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
                System.out.println(this.getName() + "到了!");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            endTime = System.currentTimeMillis();
            System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),开始吃饭了!");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= 10; i++) {
            int sleep = 0;
            if (i == 10) {
                sleep = 10;
            }
            T t = new T("员工" + i, sleep);
            t.start();
            if (i == 5) {
                //模拟员工5接了个电话,将自己等待吃饭给打断了
                TimeUnit.SECONDS.sleep(1);
                System.out.println(t.getName() + ",有点急事,我先开干了!");
                t.interrupt();
                TimeUnit.SECONDS.sleep(2);
            }
        }
    }
}

执行结果:

员工3到了!
员工4到了!
员工1到了!
员工5到了!
员工2到了!
员工5,有点急事,我先开干了!
员工4,sleep:0 等待了1006(ms),开始吃饭了!
员工1,sleep:0 等待了995(ms),开始吃饭了!
员工5,sleep:0 等待了996(ms),开始吃饭了!
员工3,sleep:0 等待了1008(ms),开始吃饭了!
员工2,sleep:0 等待了996(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel3$T.run(UseCyclicBarrierTravel3.java:35)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel3$T.run(UseCyclicBarrierTravel3.java:35)
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:234)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel3$T.run(UseCyclicBarrierTravel3.java:35)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel3$T.run(UseCyclicBarrierTravel3.java:35)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel3$T.run(UseCyclicBarrierTravel3.java:35)
员工6到了!
员工7到了!
员工8到了!
员工9到了!
员工6,sleep:0 等待了1(ms),开始吃饭了!
员工9,sleep:0 等待了1(ms),开始吃饭了!
员工8,sleep:0 等待了1(ms),开始吃饭了!
员工7,sleep:0 等待了1(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel3$T.run(UseCyclicBarrierTravel3.java:35)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel3$T.run(UseCyclicBarrierTravel3.java:35)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel3$T.run(UseCyclicBarrierTravel3.java:35)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel3$T.run(UseCyclicBarrierTravel3.java:35)
员工10到了!
员工10,sleep:10 等待了0(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel3$T.run(UseCyclicBarrierTravel3.java:35)

输出的信息看着有点乱,给大家理一理,员工5遇到急事,拿起筷子就是吃,这样好么,当然不好,他这么做了,后面看他这么做了都跟着这么做(这种场景是不是很熟悉,有一个人拿起筷子先吃起来,其他人都跟着上了),直接不等其他人了,拿起筷子就开吃了。CyclicBarrier遇到这种情况就是这么处理的。前面4个员工都在await()处等待着,员工5也在await()上等待着,等了1秒(TimeUnit.SECONDS.sleep(1);),接了个电话,然后给员工5发送中断信号后(t.interrupt();),员工5的await()方法会触发InterruptedException异常,此时其他等待中的前4个员工,看着5开吃了,自己立即也不等了,内部从await()方法中触发BrokenBarrierException异常,然后也开吃了,后面的6/7/8/9/10员工来了以后发现大家都开吃了,自己也不等了,6-10员工调用await()直接抛出了BrokenBarrierException异常,然后继续向下。

结论:

  • 内部有一个人把规则破坏了(接收到中断信号),其他人都不按规则来了,不会等待了
  • 接收到中断信号的线程,await方法会触发InterruptedException异常,然后被唤醒向下运行
  • 其他等待中或者后面到达的线程,会在await()方法上触发 BrokenBarrierException异常,然后继续执行

其中一个人只愿意等待5秒

员工1只愿意等的5秒,5s后如果大家还没到期,自己要开吃了,员工1开吃了,其他人会怎么样呢?

package com.kz.example.juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * CyclicBarrier模拟公司团建旅游-其中一个人只愿意等待5秒
 *
 * @Author kongzi
 * @Date 2022/12/6 11:39
 * @Version 1.0
 */
public class UseCyclicBarrierTravel4 {

    public static CyclicBarrier cyclicBarrier = new CyclicBarrier(10);

    public static class T extends Thread {
        int sleep;

        public T(String name, int sleep) {
            super(name);
            this.sleep = sleep;
        }

        @Override
        public void run() {
            long starTime = 0, endTime = 0;
            try {
                //模拟休眠
                TimeUnit.SECONDS.sleep(sleep);
                starTime = System.currentTimeMillis();
                //调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
                System.out.println(this.getName() + "到了!");
                if (this.getName().equals("员工1")) {
                    cyclicBarrier.await(5, TimeUnit.SECONDS);
                } else {
                    cyclicBarrier.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            endTime = System.currentTimeMillis();
            System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),开始吃饭了!");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= 10; i++) {
            T t = new T("员工" + i, i);
            t.start();
        }
    }
}

执行结果:

员工1到了!
员工2到了!
员工3到了!
员工4到了!
员工5到了!
员工6到了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel4$T.run(UseCyclicBarrierTravel4.java:39)
java.util.concurrent.TimeoutException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at com.kz.example.juc.UseCyclicBarrierTravel4$T.run(UseCyclicBarrierTravel4.java:37)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel4$T.run(UseCyclicBarrierTravel4.java:39)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel4$T.run(UseCyclicBarrierTravel4.java:39)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel4$T.run(UseCyclicBarrierTravel4.java:39)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel4$T.run(UseCyclicBarrierTravel4.java:39)
员工5,sleep:5 等待了1005(ms),开始吃饭了!
员工1,sleep:1 等待了5011(ms),开始吃饭了!
员工2,sleep:2 等待了4015(ms),开始吃饭了!
员工3,sleep:3 等待了3013(ms),开始吃饭了!
员工6,sleep:6 等待了17(ms),开始吃饭了!
员工4,sleep:4 等待了2010(ms),开始吃饭了!
员工7到了!
员工7,sleep:7 等待了0(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel4$T.run(UseCyclicBarrierTravel4.java:39)
员工8到了!
员工8,sleep:8 等待了1(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel4$T.run(UseCyclicBarrierTravel4.java:39)
员工9到了!
员工9,sleep:9 等待了0(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel4$T.run(UseCyclicBarrierTravel4.java:39)
员工10到了!
员工10,sleep:10 等待了0(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel4$T.run(UseCyclicBarrierTravel4.java:39)

从输出结果中我们可以看到:1等待5秒之后,开吃了,其他等待人都开吃了,后面来的人不等待,直接开吃了。

员工1调用有参await方法等待5秒之后,触发了TimeoutException异常,然后继续向下运行,其他的在5开吃之前已经等了一会的的几个员工,他们看到5开吃了,自己立即不等待了,也开吃了(他们的await抛出了BrokenBarrierException异常);还有几个员工在5开吃之后到达的,他们直接不等待了,直接抛出BrokenBarrierException异常,然后也开吃了。

结论:

  • 等待超时的方法 public int await(long timeout, TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException
  • 内部有一个人把规则破坏了(等待超时),其他人都不按规则来了,不会等待了
  • 等待超时的线程,await方法会触发TimeoutException异常,然后被唤醒向下运行
  • 其他等待中或者后面到达的线程,会在await()方法上触发 BrokenBarrierException 异常,然后继续执行

重建规则

上例中改造一下,员工1等待5秒超时之后,开吃了,打破了规则,先前等待中的以及后面到达的都不按规则来了,都拿起筷子开吃。过了一会,导游重新告知大家,要按规则来,然后重建了规则,大家都按规则来了。

package com.kz.example.juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * CyclicBarrier模拟公司团建旅游-重建规则
 *
 * @Author kongzi
 * @Date 2022/12/6 11:39
 * @Version 1.0
 */
public class UseCyclicBarrierTravel5 {
    public static CyclicBarrier cyclicBarrier = new CyclicBarrier(10);

    //规则是否已重建
    public static boolean guizhe = false;

    public static class T extends Thread {
        int sleep;

        public T(String name, int sleep) {
            super(name);
            this.sleep = sleep;
        }

        @Override
        public void run() {
            long starTime = 0, endTime = 0;
            try {
                //模拟休眠
                TimeUnit.SECONDS.sleep(sleep);
                starTime = System.currentTimeMillis();
                //调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
                System.out.println(this.getName() + "到了!");
                if (!guizhe) {
                    if (this.getName().equals("员工1")) {
                        cyclicBarrier.await(5, TimeUnit.SECONDS);
                    } else {
                        cyclicBarrier.await();
                    }
                } else {
                    cyclicBarrier.await();

                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            endTime = System.currentTimeMillis();
            System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),开始吃饭了!");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= 10; i++) {
            T t = new T("员工" + i, i);
            t.start();
        }

        //等待10秒之后,重置,重建规则
        TimeUnit.SECONDS.sleep(15);
        cyclicBarrier.reset();
        guizhe = true;
        System.out.println("---------------大家太皮了,请大家按规则来------------------");
        //再来一次
        for (int i = 1; i <= 10; i++) {
            T t = new T("员工" + i, i);
            t.start();
        }
    }
}

执行结果:

员工1到了!
员工2到了!
员工3到了!
员工4到了!
员工5到了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel5$T.run(UseCyclicBarrierTravel5.java:42)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel5$T.run(UseCyclicBarrierTravel5.java:42)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel5$T.run(UseCyclicBarrierTravel5.java:42)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel5$T.run(UseCyclicBarrierTravel5.java:42)
java.util.concurrent.TimeoutException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
	at com.kz.example.juc.UseCyclicBarrierTravel5$T.run(UseCyclicBarrierTravel5.java:40)
员工2,sleep:2 等待了3981(ms),开始吃饭了!
员工4,sleep:4 等待了1981(ms),开始吃饭了!
员工5,sleep:5 等待了990(ms),开始吃饭了!
员工3,sleep:3 等待了2979(ms),开始吃饭了!
员工1,sleep:1 等待了5006(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel5$T.run(UseCyclicBarrierTravel5.java:42)
员工6到了!
员工6,sleep:6 等待了1(ms),开始吃饭了!
员工7到了!
员工7,sleep:7 等待了1(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel5$T.run(UseCyclicBarrierTravel5.java:42)
员工8到了!
员工8,sleep:8 等待了0(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel5$T.run(UseCyclicBarrierTravel5.java:42)
员工9到了!
员工9,sleep:9 等待了0(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel5$T.run(UseCyclicBarrierTravel5.java:42)
员工10到了!
员工10,sleep:10 等待了0(ms),开始吃饭了!
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
	at com.kz.example.juc.UseCyclicBarrierTravel5$T.run(UseCyclicBarrierTravel5.java:42)
---------------大家太皮了,请大家按规则来------------------
员工1到了!
员工2到了!
员工3到了!
员工4到了!
员工5到了!
员工6到了!
员工7到了!
员工8到了!
员工9到了!
员工10到了!
员工10,sleep:10 等待了0(ms),开始吃饭了!
员工1,sleep:1 等待了9000(ms),开始吃饭了!
员工3,sleep:3 等待了6989(ms),开始吃饭了!
员工2,sleep:2 等待了8001(ms),开始吃饭了!
员工8,sleep:8 等待了2000(ms),开始吃饭了!
员工7,sleep:7 等待了2996(ms),开始吃饭了!
员工6,sleep:6 等待了3995(ms),开始吃饭了!
员工5,sleep:5 等待了4994(ms),开始吃饭了!
员工4,sleep:4 等待了5990(ms),开始吃饭了!
员工9,sleep:9 等待了998(ms),开始吃饭了!

可以看到第一次规则被打乱了,过了一会导游重建了规则(cyclicBarrier.reset();),接着又重来来了一次模拟等待吃饭的操作,正常了。

6、总结

CyclicBarrier 和 CountDownLatch 的区别

二者都能让一个或多个线程阻塞等待,都可以用在多个线程间的协调,起到线程同步的作用。

  • CountDownLatch是一个同步的辅助类,允许一个或多个线程,等待其他一组线程完成操作,被等待线程(例如主线程)再继续执行。
  • CyclicBarrier是一个同步的辅助类,允许一组线程相互之间等待,达到一个共同点,子线程再继续执行。CyclicBarrier可以被重用,比如有三个线程,执行逻辑到达同步点阻塞,到齐后被唤醒,又再次执行逻辑,到达下一个同步点,到齐后再被唤醒
  • CountdownLatch适用于所有线程通过某一点后通知方法,而CyclicBarrier则适合让所有线程在同一点同时执行。
  • CountdownLatch利用继承AQS的共享锁来进行线程的通知,利用CAS来进行,而CyclicBarrier则利用ReentrantLock的Condition来阻塞和通知线程。
  • CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以反复使用(可以使用reset() 方法重置)。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
  • CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、isBroken(用来知道阻塞的线程是否被中断)等方法。
  • 在控制多个线程同时运行上,CountDownLatch 可以不限线程数量,而 CyclicBarrier 是固定线程数。
  • CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。

注意:当高并发请求时,countdownlatch的await方法有可能会引起死锁。

  • 如果线程池中线程的数量较少,在高并发时会出现多个请求占用了全部的线程,但是每个请求又需要await其他线程,被等待的线程拿不到线程资源无法执行,导致多个请求同时进入线程阻塞,最后形成死锁。
  • 解决方法:使用自定义线程池,扩大线程数量,并且建立线程池拒绝机制。
0

评论区