侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 292 篇文章
  • 累计创建 132 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

Java并发编程之BlockingQueue阻塞队列原理及实战

孔子说JAVA
2022-06-26 / 0 评论 / 0 点赞 / 77 阅读 / 13,963 字 / 正在检测是否收录...

java.util.concurrent 包里的 BlockingQueue(阻塞队列) 接口是 Queue 接口的子接口,是一个在队列基础上又支持了两个附加操作的队列,常用来解耦。这两个附加的操作是:1)在队列为空时,获取元素的线程会等待队列变为非空。2)当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里获取元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

  • 支持阻塞的插入方法put: 队列满时,队列会阻塞插入元素的线程,直到队列不满。

  • 支持阻塞的移除方法take: 队列空时,获取元素的线程会等待队列变为非空。

image-1656030281462

1、常用方法

阻塞队列 BlockingQueue 通常用于一个线程生产对象,另外一个线程消费这些对象的场景。和其他类型队列的最重要的区别就是“阻塞”两个字。阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。

1.1 方法分类

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

方法 抛出异常 返回特定值 阻塞 阻塞特定时间
入队/插入 add(e) offer(e) put(e) offer(e,timeout,unit)
出队/移 remove() poll() take() poll(timeout,unit)
获取队首元素/检查 element(e) peek(e) 不支持 不支持

四组不同的行为方式解释:

  • 抛异常:如果试图的操作无法立即执行,抛一个异常。
  • 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。
可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉,那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。

1.2 入队方法的区别

boolean add(E e):不阻塞方法,添加一个元素到此队列中,添加成功返回true, 如果队列满了,则抛出 IllegalStateException 异常。

boolean offer(E e):不阻塞方法,添加一个元素到此队列中,添加成功返回true, 如果队列满了,立即返回false。

void put(E e):阻塞方法,添加一个元素到此队列中,如果队列满了,一直阻塞,直到空间/容量可用或者线程被中断。

boolean offer(E e, long timeout, TimeUnit unit):阻塞特定时间的方法,在队尾插入一个元素,如果队列已满,则等待指定的等待时间(如有必要)才能使空间变得可用,直到出现以下三种情况:

  • 被唤醒
  • 等待时间超时
  • 当前线程被中断

1.3 出队方法的区别

E remove():不阻塞方法,返回并删除队首元素,队列为空则抛出异常。

boolean remove(Object o):不阻塞方法,从此队列中删除指定元素的单个实例(如果存在)。

E poll():不阻塞方法,返回并删除队首元素,如果没有元素,直接返回null;如果有元素则出队。

E take():阻塞方法,如果队列空了,一直阻塞,直到队列不为空或者线程被中断。

E poll(long timeout, TimeUnit unit):阻塞特定时间,检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用。如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:

  • 被唤醒
  • 等待时间超时
  • 当前线程被中断

1.4 其他方法

element():返回队首元素,但不移除,队列为空则抛出异常。

peek():获取队首元素,但不移除,队列为空则返回null。

boolean contains(Object o):如果此队列包含指定的元素,则返回true。

int drainTo(Collection<? super E> c):从该队列中删除所有可用的元素,并将它们添加到给定的集合中。

int drainTo(Collection<? super E> c, int maxElements):最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中。

2、常用阻塞队列及生产场景

2.1 常用阻塞队列

BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue。常见阻塞队列如下:

  • ArrayBlockingQueue(常用):基于数组结构实现的一个有界阻塞队列。
BlockingQueue queue = new ArrayBlockingQueue(1024);  
  
queue.put("1");  
  
Object object = queue.take();  

// 以下是使用了 Java 泛型的一个 BlockingQueue 示例

BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);  
  
queue.put("1");  
  
String string = queue.take();  
  • LinkedBlockingQueue(常用):基于链表结构实现的一个有界阻塞队列,大小默认为Integer.MAX_VALUE。
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();  
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);  
  
bounded.put("Value");  
  
String value = bounded.take();  
  • PriorityBlockingQueue:支持按优先级排序的无界阻塞队列。
BlockingQueue queue   = new PriorityBlockingQueue();  
  
//String implements java.lang.Comparable  
queue.put("Value");  
  
String value = queue.take();  
  • DelayQueue:基于优先级队列(PriorityBlockingQueue)实现的延迟无界阻塞队列。
// Delayed 接口也继承了 java.lang.Comparable 接口,这也就意味着 Delayed 对象之间可以进行对比。
// 这个可能在对 DelayQueue 队列中的元素进行排序时有用,因此它们可以根据过期时间进行有序释放。
// DelayedElement 是我所创建的一个 DelayedElement 接口的实现类,它不在 Java.util.concurrent 包里。

DelayQueue queue = new DelayQueue();  
Delayed element1 = new DelayedElement();  
queue.put(element1);  
Delayed element2 = queue.take();  
  • SynchronousQueue:存储单个元素的阻塞队列。SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

  • LinkedTransferQueue:基于链表结构实现的一个无界阻塞队列。

  • LinkedBlockingDeque:基于链表结构实现的一个双端(双向)阻塞队列。

2.2 阻塞队列的边界

阻塞队列根据容量的大小,可以分为有界和无界两种。

  • 无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。

  • 还有的阻塞队列是有界的,如 ArrayBlockingQueue,如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。

2.3 阻塞队列的生产场景

BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。因为 BlockingQueue(阻塞队列)是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问题。

  • 生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问题。这也就意味着,考虑锁等线程安全问题的重任从“你”转移到了“队列”上,降低了我们开发的难度和工作量。

  • 队列它还能起到一个隔离的作用。生产者不需要关注消费者的逻辑。实现了具体任务与执行任务类之间的解耦,提高了安全性。

生产者线程

image-1656033915536

生产者线程将会持续生产新对象并将其插入到队列之中(put方法),直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞,且会一直处于阻塞之中,直到队列里有了空闲空间。比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。

消费者线程

image-1656034163177

负责消费的线程将会一直从该阻塞队列中拿出对象(take方法的功能是获取并移除队列的头结点),通常在队列里有数据的时候是可以正常移除的。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,一旦队列里有数据了(如一个生产线程把一个对象丢进了队列),就会立刻解除阻塞状态,并且取到数据。

3、源码解析

下面主要对两个常用的阻塞队列LinkedBlockingQueue、ArrayBlockingQueue进行源码解读。

3.1 LinkedBlockingQueue

LinkedBlockingQueue 是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部。

  • LinkedBlockingQueue 的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。

  • LinkedBlockingQueue 的底层数据结构是使用链表实现的一个队列,有别于一般的队列的区别在于该队列至少有一个节点,头节点不含有元素。结构图如下:

image-1656034545021

3.1.1 原理

LinkedBlockingQueue中维持两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其余执行入队的线程将会被阻塞;同时,可以有另一个线程执行出队,其余执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以是一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInterger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的。

具体入队与出队的原理图(图中每一个节点前半部分表示封装的数据x,后边的表示指向的下一个引用):

初始化:初始化之后,有一个初始化数据为null,且head和last节点都是这个节点。

image-1656035335981

入队两个元素过后

image-1656035407108

出队一个元素后

image-1656035422926

3.1.2 属性定义

LinkedBlockingQueue可以指定容量,内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。

//容量,如果没有指定,该值为Integer.MAX_VALUE;

private final int capacity;

//当前队列中的元素

private final AtomicInteger count =new AtomicInteger();

//队列头节点,始终满足head.item==null

transient Node head;

//队列的尾节点,始终满足last.next==null

private transient Node last;

//用于出队的锁

private final ReentrantLock takeLock =new ReentrantLock();

//当队列为空时,保存执行出队的线程

private final Condition notEmpty = takeLock.newCondition();

//用于入队的锁

private final ReentrantLock putLock =new ReentrantLock();

//当队列满时,保存执行入队的线程

private final Condition notFull = putLock.newCondition();

3.1.3 put(E e)方法

put(E e)方法用于将一个元素插入到队列的尾部,其实现如下:

public void put(E e)throws InterruptedException {

//不允许元素为null

    if (e ==null)

throw new NullPointerException();

    int c = -1;

    //以当前元素新建一个节点

    Node node =new Node(e);

    final ReentrantLock putLock =this.putLock;

    final AtomicInteger count =this.count;

    //获得入队的锁

    putLock.lockInterruptibly();

    try {

       //如果队列已满,那么将该线程加入到Condition的等待队列中

        while (count.get() == capacity) {

             notFull.await();

        }

       //将节点入队

        enqueue(node);

        //得到插入之前队列的元素个数

        c = count.getAndIncrement();

        //如果还可以插入元素,那么释放等待的入队线程

        if (c +1 < capacity){

              notFull.signal();

        }

}finally {

//解锁

        putLock.unlock();

    }

//通知出队线程队列非空

    if (c ==0)

signalNotEmpty();

}

put方法总结:

  1. LinkedBlockingQueue不允许元素为null。

  2. 同一时刻,只能有一个线程执行入队操作,因为putLock在将元素插入到队列尾部时加锁了

  3. 如果队列满了,那么将会调用notFull的await()方法将该线程加入到Condition等待队列中。await()方法就会释放线程占有的锁,这将导致之前由于被锁阻塞的入队线程将会获取到锁,执行到while循环处,不过可能因为由于队列仍旧是满的,也被加入到条件队列中。

  4. 一旦一个出队线程取走了一个元素,并通知了入队等待队列中可以释放线程了,那么第一个加入到Condition队列中的将会被释放,那么该线程将会重新获得put锁,继而执行enqueue()方法,将节点插入到队列的尾部

  5. 然后得到插入一个节点之前的元素个数,如果队列中还有空间可以插入,那么就通知notFull条件的等待队列中的线程。

  6. 通知出队线程队列为空了,因为插入一个元素之前的个数为0,而插入一个之后队列中的元素就从无变成了有,就可以通知因队列为空而阻塞的出队线程了。

3.1.4 take()方法

take()方法用于得到队头的元素,在队列为空时会阻塞,知道队列中有元素可取。其实现如下:

public E take() throws InterruptedException {

        E x;

        int c = -1;

        final AtomicInteger count = this.count;

        final ReentrantLock takeLock = this.takeLock;

        //获取takeLock锁       

         takeLock.lockInterruptibly();

        try {

            //如果队列为空,那么加入到notEmpty条件的等待队列中           

            while (count.get() == 0) {

                notEmpty.await();

            }

            //得到队头元素           

             x = dequeue();

            //得到取走一个元素之前队列的元素个数           

               c = count.getAndDecrement();

            //如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程           

                if (c > 1)

                notEmpty.signal();

        } finally {

            takeLock.unlock();

        }

        //如果队列中的元素从满到非满,通知put线程       

           if (c == capacity)

            signalNotFull();

        return x;

    }

take方法总结:

  1. 当队列为空时,就加入到notEmpty(的条件等待队列中,当队列不为空时就取走一个元素,当取完发现还有元素可取时,再通知一下自己的伙伴(等待在条件队列中的线程);最后,如果队列从满到非满,通知一下put线程。

3.1.5 remove()方法

remove()方法用于删除队列中一个元素,如果队列中不含有该元素,那么返回false;有的话则删除并返回true。入队和出队都是只获取一个锁,而remove()方法需要同时获得两把锁,其实现如下:

public boolean remove(Object o) {

        //因为队列不包含null元素,返回false     

          if (o == null) return false;

        //获取两把锁        fullyLock();

        try {

            //从头的下一个节点开始遍历           

            for (Node trail = head, p = trail.next;

                p != null;

                trail = p, p = p.next) {

                //如果匹配,那么将节点从队列中移除,trail表示前驱节点               

               if (o.equals(p.item)) {

                    unlink(p, trail);

                    return true;

                }

            }

            return false;

        } finally {

            //释放两把锁         

            fullyUnlock();

        }

}
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

3.1.6 总结

LinkedBlockingQueue是允许两个线程同时在两端进行入队或出队的操作的,但一端同时只能有一个线程进行操作,这是通过两把锁来区分的;

  • 为了维持底部数据的统一,引入了AtomicInteger的一个count变量,表示队列中元素的个数。count只能在两个地方变化,一个是入队的方法(可以+1),另一个是出队的方法(可以-1),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步。

3.2 ArrayBlockingQueue

3.2.1 原理

ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里(底层是使用一个数组实现的队列)。有界也就意味着它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。

  • ArrayBlockingQueue 是一个容量限制的阻塞队列,因此,在队列全满时执行入队将会阻塞,在队列为空时出队同样将会阻塞。

  • ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

3.2.2 属性定义

/** The queued items */ 
final Object[] items;

/** Main lock guarding all access */ 
final ReentrantLock lock;

/** Condition for waiting takes */    
private final Condition notEmpty;

/** Condition for waiting puts */   
private final Condition notFull;

3.2.3 put(E e)方法

put(E e)方法在队列不满的情况下,将会将元素添加到队列尾部,如果队列已满,将会阻塞,直到队列中有剩余空间可以插入。该方法的实现如下:

public void put(E e) throws InterruptedException {

        //检查元素是否为null,如果是,抛出NullPointerException       

       checkNotNull(e);

        final ReentrantLock lock = this.lock;

        //加锁       

        lock.lockInterruptibly();

        try {

            //如果队列已满,阻塞,等待队列成为不满状态           

            while (count == items.length)

                notFull.await();

            //将元素入队           

            enqueue(e);

        } finally {

            lock.unlock();

        }

}

put方法总结:

  1. ArrayBlockingQueue不允许元素为null

  2. ArrayBlockingQueue在队列已满时将会调用notFull的await()方法释放锁并处于阻塞状态

  3. 一旦ArrayBlockingQueue不为满的状态,就将元素入队

3.2.4 take()方法

take()方法用于取走队头的元素,当队列为空时将会阻塞,直到队列中有元素可取走时将会被释放。其实现如下:

public E take() throws InterruptedException {

        final ReentrantLock lock = this.lock;

        //首先加锁       

         lock.lockInterruptibly();

        try {

            //如果队列为空,阻塞           

            while (count == 0)

                notEmpty.await();

            //队列不为空,调用dequeue()出队           

            return dequeue();

        } finally {

            //释放锁           

          lock.unlock();

        }

}

take方法总结:

  1. 一旦获得了锁之后,如果队列为空,那么将阻塞;否则调用dequeue()出队一个元素。

3.2.5 总结

ArrayBlockingQueue的并发阻塞是通过ReentrantLock和Condition来实现的,ArrayBlockingQueue内部只有一把锁,意味着同一时刻只有一个线程能进行入队或者出队的操作。

3.3 LinkedBlockingQueue、ArrayBlockingQueue比较

ArrayBlockingQueue:

  1. 一个对象数组+一把锁+两个条件

  2. 入队与出队都用同一把锁

  3. 在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高

  4. 采用了数组,必须指定大小,即容量有限

LinkedBlockingQueue:

  1. 一个单向链表+两把锁+两个条件

  2. 两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。

  3. 在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多

  4. 采用了链表,最大容量为整数最大值,可看做容量无限

4、API使用示例

4.1 会抛出异常API演示

add和remove方法操作元素的时候,在队列满时执行add会抛出异常,在队列为空时执行remove会抛出异常。

public class Test {
    public static void main(String[] args) {
        //指定队列大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        //add添加成功返回true
        System.out.println(arrayBlockingQueue.add("1"));
        System.out.println(arrayBlockingQueue.add("2"));
        System.out.println(arrayBlockingQueue.add("3"));
        //查看队首的元素是谁 1
        System.out.println(arrayBlockingQueue.element());
        //超过队列大小 add会抛出异常  Queue full
        // System.out.println(arrayBlockingQueue.add("4"));
        //remove取出一个元素  返回取出的值 如果队列为空  remove会抛出异常
        // NoSuchElementException
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
    }
}

image-1656036776765

4.2 有返回值无异常API演示

public class Test {
    public static void main(String[] args) {
        //指定队列大小
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(2);
        //offer  添加一个元素  返回一个boolean值   成功返回true失败返回true
        System.out.println(blockingQueue.offer(1));
        System.out.println(blockingQueue.offer(2));
        System.out.println(blockingQueue.offer(3));
        System.out.println("----------------");
        //检测队首元素
        System.out.println(blockingQueue.peek());
        //poll  取出一个元素  返回一个元素    队列为空时 取出null
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.peek());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
    }
}

image-1656036798817

4.3 阻塞API使用

队列已满的情况下会一直阻塞直到有空位。

public class Test {
    public static void main(String[] args) {
        //指定队列大小
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(2);
        try {
            //put添加元素 没有返回值 满了一直阻塞
            //队列大小为二   第三个元素放不进去   阻塞两秒过后就会结束
            blockingQueue.put("1");
            blockingQueue.put("2");
            blockingQueue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            //取出元素  空了一直阻塞  返回值取出的元素
            System.out.println(blockingQueue.take());;
            System.out.println(blockingQueue.take());
            System.out.println(blockingQueue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

image-1656036854493

4.4 阻塞API使用

设置阻塞时间,超过阻塞时间没放进去就放弃等待。

public class Test {
    public static void main(String[] args) {
        //指定队列大小
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(2);

        try {
            //参数 插入的数值  超时时间 和 单位
            blockingQueue.offer("1");
            blockingQueue.offer("2");
            blockingQueue.offer("3",2, TimeUnit.SECONDS);
            System.out.println("------");
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll());
            System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

image-1656036944186

5、生产者消费者实例

以下是使用 ArrayBlockingQueue 实现的生产者消费者实例,BlockingQueueExample 类分别在两个独立的线程中启动了一个 Producer 和 一个 Consumer。Producer 向一个共享的 BlockingQueue 中注入字符串,而 Consumer 则会从中把它们拿出来。

public class BlockingQueueExample {  
  
    public static void main(String[] args) throws Exception {  
  
        BlockingQueue queue = new ArrayBlockingQueue(1024);  
  
        Producer producer = new Producer(queue);  
        Consumer consumer = new Consumer(queue);  
  
        new Thread(producer).start();  
        new Thread(consumer).start();  
  
        Thread.sleep(4000);  
    }  
}  

以下是 Producer 类。注意它在每次 put() 调用时休眠一秒钟,这将导致 Consumer 在等待队列中对象的时候发生阻塞。

public class Producer implements Runnable{  
  
    protected BlockingQueue queue = null;  
  
    public Producer(BlockingQueue queue) {  
        this.queue = queue;  
    }  
  
    public void run() {  
        try {  
            queue.put("1");  
            Thread.sleep(1000);  
            queue.put("2");  
            Thread.sleep(1000);  
            queue.put("3");  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}

以下是 Consumer 类。它只负责把对象从队列中抽取出来,然后将它们打印出来。

public class Consumer implements Runnable{  
  
    protected BlockingQueue queue = null;  
  
    public Consumer(BlockingQueue queue) {  
        this.queue = queue;  
    }  
  
    public void run() {  
        try {  
            System.out.println(queue.take());  
            System.out.println(queue.take());  
            System.out.println(queue.take());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}
0

评论区