java.util.concurrent 包里的 BlockingQueue(阻塞队列) 接口是 Queue 接口的子接口,是一个在队列基础上又支持了两个附加操作的队列,常用来解耦。这两个附加的操作是:1)在队列为空时,获取元素的线程会等待队列变为非空。2)当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里获取元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
-
支持阻塞的插入方法put: 队列满时,队列会阻塞插入元素的线程,直到队列不满。
-
支持阻塞的移除方法take: 队列空时,获取元素的线程会等待队列变为非空。
1、常用方法
阻塞队列 BlockingQueue 通常用于一个线程生产对象,另外一个线程消费这些对象的场景。和其他类型队列的最重要的区别就是“阻塞”两个字。阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。
1.1 方法分类
BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
方法 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞特定时间 |
---|---|---|---|---|
入队/插入 | add(e) | offer(e) | put(e) | offer(e,timeout,unit) |
出队/移 | remove() | poll() | take() | poll(timeout,unit) |
获取队首元素/检查 | element(e) | peek(e) | 不支持 | 不支持 |
四组不同的行为方式解释:
- 抛异常:如果试图的操作无法立即执行,抛一个异常。
- 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
- 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
- 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。
可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉,那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。
1.2 入队方法的区别
boolean add(E e)
:不阻塞方法,添加一个元素到此队列中,添加成功返回true, 如果队列满了,则抛出 IllegalStateException 异常。
boolean offer(E e)
:不阻塞方法,添加一个元素到此队列中,添加成功返回true, 如果队列满了,立即返回false。
void put(E e)
:阻塞方法,添加一个元素到此队列中,如果队列满了,一直阻塞,直到空间/容量可用或者线程被中断。
boolean offer(E e, long timeout, TimeUnit unit)
:阻塞特定时间的方法,在队尾插入一个元素,如果队列已满,则等待指定的等待时间(如有必要)才能使空间变得可用,直到出现以下三种情况:
- 被唤醒
- 等待时间超时
- 当前线程被中断
1.3 出队方法的区别
E remove()
:不阻塞方法,返回并删除队首元素,队列为空则抛出异常。
boolean remove(Object o)
:不阻塞方法,从此队列中删除指定元素的单个实例(如果存在)。
E poll()
:不阻塞方法,返回并删除队首元素,如果没有元素,直接返回null;如果有元素则出队。
E take()
:阻塞方法,如果队列空了,一直阻塞,直到队列不为空或者线程被中断。
E poll(long timeout, TimeUnit unit)
:阻塞特定时间,检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用。如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:
- 被唤醒
- 等待时间超时
- 当前线程被中断
1.4 其他方法
element()
:返回队首元素,但不移除,队列为空则抛出异常。
peek()
:获取队首元素,但不移除,队列为空则返回null。
boolean contains(Object o)
:如果此队列包含指定的元素,则返回true。
int drainTo(Collection<? super E> c)
:从该队列中删除所有可用的元素,并将它们添加到给定的集合中。
int drainTo(Collection<? super E> c, int maxElements)
:最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中。
2、常用阻塞队列及生产场景
2.1 常用阻塞队列
BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue。常见阻塞队列如下:
- ArrayBlockingQueue(常用):基于数组结构实现的一个有界阻塞队列。
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");
Object object = queue.take();
// 以下是使用了 Java 泛型的一个 BlockingQueue 示例
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
queue.put("1");
String string = queue.take();
- LinkedBlockingQueue(常用):基于链表结构实现的一个有界阻塞队列,大小默认为Integer.MAX_VALUE。
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);
bounded.put("Value");
String value = bounded.take();
- PriorityBlockingQueue:支持按优先级排序的无界阻塞队列。
BlockingQueue queue = new PriorityBlockingQueue();
//String implements java.lang.Comparable
queue.put("Value");
String value = queue.take();
- DelayQueue:基于优先级队列(PriorityBlockingQueue)实现的延迟无界阻塞队列。
// Delayed 接口也继承了 java.lang.Comparable 接口,这也就意味着 Delayed 对象之间可以进行对比。
// 这个可能在对 DelayQueue 队列中的元素进行排序时有用,因此它们可以根据过期时间进行有序释放。
// DelayedElement 是我所创建的一个 DelayedElement 接口的实现类,它不在 Java.util.concurrent 包里。
DelayQueue queue = new DelayQueue();
Delayed element1 = new DelayedElement();
queue.put(element1);
Delayed element2 = queue.take();
-
SynchronousQueue:存储单个元素的阻塞队列。SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。
-
LinkedTransferQueue:基于链表结构实现的一个无界阻塞队列。
-
LinkedBlockingDeque:基于链表结构实现的一个双端(双向)阻塞队列。
2.2 阻塞队列的边界
阻塞队列根据容量的大小,可以分为有界和无界两种。
-
无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。
-
还有的阻塞队列是有界的,如 ArrayBlockingQueue,如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。
2.3 阻塞队列的生产场景
BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。因为 BlockingQueue(阻塞队列)是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问题。
-
生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问题。这也就意味着,考虑锁等线程安全问题的重任从“你”转移到了“队列”上,降低了我们开发的难度和工作量。
-
队列它还能起到一个隔离的作用。生产者不需要关注消费者的逻辑。实现了具体任务与执行任务类之间的解耦,提高了安全性。
生产者线程
生产者线程将会持续生产新对象并将其插入到队列之中(put方法),直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞,且会一直处于阻塞之中,直到队列里有了空闲空间。比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。
消费者线程
负责消费的线程将会一直从该阻塞队列中拿出对象(take方法的功能是获取并移除队列的头结点),通常在队列里有数据的时候是可以正常移除的。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,一旦队列里有数据了(如一个生产线程把一个对象丢进了队列),就会立刻解除阻塞状态,并且取到数据。
3、源码解析
下面主要对两个常用的阻塞队列LinkedBlockingQueue、ArrayBlockingQueue进行源码解读。
3.1 LinkedBlockingQueue
LinkedBlockingQueue 是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部。
-
LinkedBlockingQueue 的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。
-
LinkedBlockingQueue 的底层数据结构是使用链表实现的一个队列,有别于一般的队列的区别在于该队列至少有一个节点,头节点不含有元素。结构图如下:
3.1.1 原理
LinkedBlockingQueue中维持两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其余执行入队的线程将会被阻塞;同时,可以有另一个线程执行出队,其余执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以是一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInterger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的。
具体入队与出队的原理图(图中每一个节点前半部分表示封装的数据x,后边的表示指向的下一个引用):
初始化:初始化之后,有一个初始化数据为null,且head和last节点都是这个节点。
入队两个元素过后
出队一个元素后
3.1.2 属性定义
LinkedBlockingQueue可以指定容量,内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。
//容量,如果没有指定,该值为Integer.MAX_VALUE;
private final int capacity;
//当前队列中的元素
private final AtomicInteger count =new AtomicInteger();
//队列头节点,始终满足head.item==null
transient Node head;
//队列的尾节点,始终满足last.next==null
private transient Node last;
//用于出队的锁
private final ReentrantLock takeLock =new ReentrantLock();
//当队列为空时,保存执行出队的线程
private final Condition notEmpty = takeLock.newCondition();
//用于入队的锁
private final ReentrantLock putLock =new ReentrantLock();
//当队列满时,保存执行入队的线程
private final Condition notFull = putLock.newCondition();
3.1.3 put(E e)方法
put(E e)方法用于将一个元素插入到队列的尾部,其实现如下:
public void put(E e)throws InterruptedException {
//不允许元素为null
if (e ==null)
throw new NullPointerException();
int c = -1;
//以当前元素新建一个节点
Node node =new Node(e);
final ReentrantLock putLock =this.putLock;
final AtomicInteger count =this.count;
//获得入队的锁
putLock.lockInterruptibly();
try {
//如果队列已满,那么将该线程加入到Condition的等待队列中
while (count.get() == capacity) {
notFull.await();
}
//将节点入队
enqueue(node);
//得到插入之前队列的元素个数
c = count.getAndIncrement();
//如果还可以插入元素,那么释放等待的入队线程
if (c +1 < capacity){
notFull.signal();
}
}finally {
//解锁
putLock.unlock();
}
//通知出队线程队列非空
if (c ==0)
signalNotEmpty();
}
put方法总结:
LinkedBlockingQueue不允许元素为null。
同一时刻,只能有一个线程执行入队操作,因为putLock在将元素插入到队列尾部时加锁了
如果队列满了,那么将会调用notFull的await()方法将该线程加入到Condition等待队列中。await()方法就会释放线程占有的锁,这将导致之前由于被锁阻塞的入队线程将会获取到锁,执行到while循环处,不过可能因为由于队列仍旧是满的,也被加入到条件队列中。
一旦一个出队线程取走了一个元素,并通知了入队等待队列中可以释放线程了,那么第一个加入到Condition队列中的将会被释放,那么该线程将会重新获得put锁,继而执行enqueue()方法,将节点插入到队列的尾部
然后得到插入一个节点之前的元素个数,如果队列中还有空间可以插入,那么就通知notFull条件的等待队列中的线程。
通知出队线程队列为空了,因为插入一个元素之前的个数为0,而插入一个之后队列中的元素就从无变成了有,就可以通知因队列为空而阻塞的出队线程了。
3.1.4 take()方法
take()方法用于得到队头的元素,在队列为空时会阻塞,知道队列中有元素可取。其实现如下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//获取takeLock锁
takeLock.lockInterruptibly();
try {
//如果队列为空,那么加入到notEmpty条件的等待队列中
while (count.get() == 0) {
notEmpty.await();
}
//得到队头元素
x = dequeue();
//得到取走一个元素之前队列的元素个数
c = count.getAndDecrement();
//如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果队列中的元素从满到非满,通知put线程
if (c == capacity)
signalNotFull();
return x;
}
take方法总结:
- 当队列为空时,就加入到notEmpty(的条件等待队列中,当队列不为空时就取走一个元素,当取完发现还有元素可取时,再通知一下自己的伙伴(等待在条件队列中的线程);最后,如果队列从满到非满,通知一下put线程。
3.1.5 remove()方法
remove()方法用于删除队列中一个元素,如果队列中不含有该元素,那么返回false;有的话则删除并返回true。入队和出队都是只获取一个锁,而remove()方法需要同时获得两把锁,其实现如下:
public boolean remove(Object o) {
//因为队列不包含null元素,返回false
if (o == null) return false;
//获取两把锁 fullyLock();
try {
//从头的下一个节点开始遍历
for (Node trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//如果匹配,那么将节点从队列中移除,trail表示前驱节点
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
//释放两把锁
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
3.1.6 总结
LinkedBlockingQueue是允许两个线程同时在两端进行入队或出队的操作的,但一端同时只能有一个线程进行操作,这是通过两把锁来区分的;
- 为了维持底部数据的统一,引入了AtomicInteger的一个count变量,表示队列中元素的个数。count只能在两个地方变化,一个是入队的方法(可以+1),另一个是出队的方法(可以-1),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步。
3.2 ArrayBlockingQueue
3.2.1 原理
ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里(底层是使用一个数组实现的队列)。有界也就意味着它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。
-
ArrayBlockingQueue 是一个容量限制的阻塞队列,因此,在队列全满时执行入队将会阻塞,在队列为空时出队同样将会阻塞。
-
ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
3.2.2 属性定义
/** The queued items */
final Object[] items;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
3.2.3 put(E e)方法
put(E e)方法在队列不满的情况下,将会将元素添加到队列尾部,如果队列已满,将会阻塞,直到队列中有剩余空间可以插入。该方法的实现如下:
public void put(E e) throws InterruptedException {
//检查元素是否为null,如果是,抛出NullPointerException
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//如果队列已满,阻塞,等待队列成为不满状态
while (count == items.length)
notFull.await();
//将元素入队
enqueue(e);
} finally {
lock.unlock();
}
}
put方法总结:
ArrayBlockingQueue不允许元素为null
ArrayBlockingQueue在队列已满时将会调用notFull的await()方法释放锁并处于阻塞状态
一旦ArrayBlockingQueue不为满的状态,就将元素入队
3.2.4 take()方法
take()方法用于取走队头的元素,当队列为空时将会阻塞,直到队列中有元素可取走时将会被释放。其实现如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//首先加锁
lock.lockInterruptibly();
try {
//如果队列为空,阻塞
while (count == 0)
notEmpty.await();
//队列不为空,调用dequeue()出队
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}
take方法总结:
- 一旦获得了锁之后,如果队列为空,那么将阻塞;否则调用dequeue()出队一个元素。
3.2.5 总结
ArrayBlockingQueue的并发阻塞是通过ReentrantLock和Condition来实现的,ArrayBlockingQueue内部只有一把锁,意味着同一时刻只有一个线程能进行入队或者出队的操作。
3.3 LinkedBlockingQueue、ArrayBlockingQueue比较
ArrayBlockingQueue:
-
一个对象数组+一把锁+两个条件
-
入队与出队都用同一把锁
-
在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高
-
采用了数组,必须指定大小,即容量有限
LinkedBlockingQueue:
-
一个单向链表+两把锁+两个条件
-
两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。
-
在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多
-
采用了链表,最大容量为整数最大值,可看做容量无限
4、API使用示例
4.1 会抛出异常API演示
add和remove方法操作元素的时候,在队列满时执行add会抛出异常,在队列为空时执行remove会抛出异常。
public class Test {
public static void main(String[] args) {
//指定队列大小
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
//add添加成功返回true
System.out.println(arrayBlockingQueue.add("1"));
System.out.println(arrayBlockingQueue.add("2"));
System.out.println(arrayBlockingQueue.add("3"));
//查看队首的元素是谁 1
System.out.println(arrayBlockingQueue.element());
//超过队列大小 add会抛出异常 Queue full
// System.out.println(arrayBlockingQueue.add("4"));
//remove取出一个元素 返回取出的值 如果队列为空 remove会抛出异常
// NoSuchElementException
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
}
}
4.2 有返回值无异常API演示
public class Test {
public static void main(String[] args) {
//指定队列大小
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(2);
//offer 添加一个元素 返回一个boolean值 成功返回true失败返回true
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(2));
System.out.println(blockingQueue.offer(3));
System.out.println("----------------");
//检测队首元素
System.out.println(blockingQueue.peek());
//poll 取出一个元素 返回一个元素 队列为空时 取出null
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
}
4.3 阻塞API使用
队列已满的情况下会一直阻塞直到有空位。
public class Test {
public static void main(String[] args) {
//指定队列大小
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(2);
try {
//put添加元素 没有返回值 满了一直阻塞
//队列大小为二 第三个元素放不进去 阻塞两秒过后就会结束
blockingQueue.put("1");
blockingQueue.put("2");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//取出元素 空了一直阻塞 返回值取出的元素
System.out.println(blockingQueue.take());;
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4.4 阻塞API使用
设置阻塞时间,超过阻塞时间没放进去就放弃等待。
public class Test {
public static void main(String[] args) {
//指定队列大小
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(2);
try {
//参数 插入的数值 超时时间 和 单位
blockingQueue.offer("1");
blockingQueue.offer("2");
blockingQueue.offer("3",2, TimeUnit.SECONDS);
System.out.println("------");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5、生产者消费者实例
以下是使用 ArrayBlockingQueue 实现的生产者消费者实例,BlockingQueueExample 类分别在两个独立的线程中启动了一个 Producer 和 一个 Consumer。Producer 向一个共享的 BlockingQueue 中注入字符串,而 Consumer 则会从中把它们拿出来。
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {
BlockingQueue queue = new ArrayBlockingQueue(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
以下是 Producer 类。注意它在每次 put() 调用时休眠一秒钟,这将导致 Consumer 在等待队列中对象的时候发生阻塞。
public class Producer implements Runnable{
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
以下是 Consumer 类。它只负责把对象从队列中抽取出来,然后将它们打印出来。
public class Consumer implements Runnable{
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
评论区