在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。在正文开始之前需要了解线程安全、并行、并发等相关概念。
- 线程安全就是说多线程访问同一代码,不会产生不确定的结果。
- 并行是指两者同时执行一件事,比如赛跑,两个人都在不停的往前跑;
- 并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率
本文侧重介绍阻塞队列LinkedBlockingQueue和并发队列ConcurrentLinkedQueue的用法比较,有关并发队列与阻塞队列的详细介绍请移步:
1、阻塞队列与非阻塞队列
1.1 阻塞队列简介
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里获取元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
java.util.concurrent 包里的 BlockingQueue(阻塞队列) 接口是 Queue 接口的子接口,是一个在队列基础上又支持了两个附加操作的队列,常用来解耦。这两个附加的阻塞操作是插入与取出:
- 支持阻塞的移除方法take: 队列空时,获取/移除元素的线程会一直被阻塞,直到队列中有元素时才可以继续取出(等待队列变为非空)。
- 支持阻塞的插入方法put: 队列满时,队列会阻塞插入元素的线程(存储元素的线程会等待队列可用),直到队列有空闲时停止阻塞,新元素才可以继续插入。
除拥有普通队列的方法之外,阻塞队列提供了另外4个常用的方法:
- put(E e):向队尾插入元素,若队列已满,则被阻塞等待,直到有空闲才继续插入。
- take():从队首取出元素,若队列为空,则被阻塞等待,直到有元素才继续取出。
- offer(E e,long timeout, TimeUnit unit):向队尾插入元素,若队列已满,则计时等待,当时间期限达到时,若队列还是满的,则返回false;若等待在期限内,队列空闲,则插入成功,返回true。
- poll(long timeout, TimeUnit unit):从队首取出元素,如果队列空,则计时等待,当时间期限达到时,若队列还是空的,则返回null;若等待在期限内,队列中有元素,否则返回取得的元素。
1.2 非阻塞队列简介
非阻塞队列也就是一般的队列,没有阻塞队列的两个阻塞功能。其主要方法如下:
- boolean add(E e):将元素e插入到队列末尾,插入成功,返回true;插入失败(即队列已满),抛出异常;
- boolean offer(E e):将元素e插入到队列末尾,插入成功,则返回true;插入失败(即队列已满),返回false;
- E remove():移除队首元素,若移除成功,则返回true;移除失败(队列为空),则会抛出异常;
- E poll():获取队首元素并移除,若队列不为空,则返回队首元素;否则返回null;
- E element():获取队首元素并不移除元素,若队列不为空,则返回队首元素;否则抛出异常;
- E peek():获取队首元素并不移除元素,若队列不为空,则返回队首元素;否则返回null;
从类图可以发现非阻塞队列与阻塞队列的一个明显区别:ConcurrentLinkedQueue没有实现BlockingQueue接口,所以ConcurrentLinkedQueue没有提供具有阻塞性质的put、take等方法。
2、阻塞队列LinkedBlockingQueue用例
由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。
- LinkedBlockingQueue是一个线程安全的阻塞队列,它实现了BlockingQueue接口,BlockingQueue接口继承自java.util.Queue接口,并在这个接口的基础上增加了take和put方法,这两个方法正是队列操作的阻塞版本。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 多线程模拟实现生产者/消费者模型
*/
public class BlockingQueueTest {
/**
*
* 定义装苹果的篮子
*
*/
public class Basket {
// 篮子,能够容纳3个苹果
BlockingQueue<String> basket = new LinkedBlockingQueue<>(3);
// 生产苹果,放入篮子
public void produce(String instance) throws InterruptedException {
// 生产苹果
long time1=System.currentTimeMillis();
// put方法放入一个苹果,若basket满了,等到basket有位置
basket.put("An apple");
System.out.println(instance +"放入一个苹果!耗时"+(System.currentTimeMillis()-time1)/1000+"S");
}
// 消费苹果,从篮子中取走
public void consume(String instance) throws InterruptedException {
basket.take();
System.out.println(instance +"吃掉一个苹果!");
}
public int size(){
return basket.size();
}
}
// 定义苹果生产者
class Producer implements Runnable {
private String instance;
private Basket basket;
public Producer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
//放入一个苹果,若basket满了,会被阻塞,直到basket有位置
basket.produce(instance);
}
} catch (InterruptedException ex) {
System.out.println("Producer Interrupted");
}
}
}
// 定义苹果消费者
class Consumer implements Runnable {
private String instance;
private Basket basket;
public Consumer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 消费苹果
basket.consume(instance);
TimeUnit.SECONDS.sleep(10);
}
} catch (InterruptedException ex) {
System.out.println("Consumer Interrupted");
}
}
}
public static void main(String[] args) {
BlockingQueueTest test = new BlockingQueueTest();
// 建立一个装苹果的篮子
Basket basket = test.new Basket();
Producer producer = test.new Producer(" 农夫001", basket);
Producer producer2 = test.new Producer(" 农夫002", basket);
Consumer consumer = test.new Consumer("吃货", basket);
ExecutorService service = Executors.newCachedThreadPool();
service.submit(producer);
service.submit(producer2);
service.submit(consumer);
}
}
例子中,两个生产者(农夫)线程负责摘苹果,孜孜不倦,争先恐后的摘。但是篮子里最多能装三个苹果,满了就放不进去了。生产者就得等有人消费了(篮子不满了)再摘。一个消费者(吃货)慢悠悠的吃苹果,每隔10S吃一个。执行结果如下:
农夫001放入一个苹果!耗时0S
农夫001放入一个苹果!耗时0S
农夫002放入一个苹果!耗时0S
吃货吃掉一个苹果!
农夫001放入一个苹果!耗时0S
吃货吃掉一个苹果!
农夫002放入一个苹果!耗时10S
吃货吃掉一个苹果!
农夫001放入一个苹果!耗时20S
吃货吃掉一个苹果!
农夫002放入一个苹果!耗时20S
吃货吃掉一个苹果!
农夫001放入一个苹果!耗时20S
吃货吃掉一个苹果!
农夫002放入一个苹果!耗时20S
3、并发队列ConcurrentLinkedQueue用例
ConcurrentLinkedQueue是Queue的一个安全实现,Queue中元素按FIFO原则(先进先出原则)进行排序,新元素从队列尾部插入,而获取队列元素,则需要从队列头部获取。采用CAS操作,来保证元素的一致性。
- ConcurrentLinkedQueue采用循环CAS算法实现,所以也被称为非阻塞队列。
- ConcurrentLinkedQueue不允许null存入队列,队列中所有未删除的节点的item都不能为null且都能从head节点遍历到。
- 在入队的最后一个元素的next为null
- 删除节点是将item设置为null, 队列迭代时跳过item为null节点
- head节点跟tail不一定指向头节点或尾节点,可能存在滞后性
使用ConcurrentLinkedQueue时,我们需要注意以下几点:
- 入列出列线程安全,遍历不安全
- 不允许添加null元素
- 底层使用列表与cas算法包装入列出列安全
- 避免使用(queue.size() > 0),而需要用 (!queue.isEmpty())代替,因为size方法需要遍历整个队列
3.1 例子1
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentLinkedQueueTest {
private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
private static int count = 2; // 线程个数
//CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
private static CountDownLatch latch = new CountDownLatch(count);
public static void main(String[] args) throws InterruptedException {
long timeStart = System.currentTimeMillis();
ExecutorService es = Executors.newFixedThreadPool(4);
ConcurrentLinkedQueueTest.offer();
for (int i = 0; i < count; i++) {
es.submit(new Poll());
}
latch.await(); //使得主线程(main)阻塞直到latch.countDown()为零才继续执行
System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
es.shutdown();
}
/**
* 生产者
*/
public static void offer() {
for (int i = 0; i < 100000; i++) {
queue.offer(i);
}
}
/**
* 消费者
*/
static class Poll implements Runnable {
public void run() {
// while (queue.size()>0) {
while (!queue.isEmpty()) {
System.out.println(queue.poll());
}
latch.countDown();
}
}
}
运行结果:
costtime 2360ms
如果把消费者中的判断 while (!queue.isEmpty())
改用 while (queue.size()>0)
后,运行结果:
cost time 46422ms
结果居然相差那么大,看了下ConcurrentLinkedQueue的API,原来 .size()
是要遍历一遍集合的,难怪那么慢,所以尽量要避免用size而改用isEmpty()。
3.2 例子2
public class ConcurrentLinkedQueueTest {
public class Basket {
// 篮子,能够容纳3个苹果
ConcurrentLinkedQueue<String> basket = new ConcurrentLinkedQueue<>();
// 生产苹果,放入篮子
public void produce(String instance) throws InterruptedException {
basket.offer("An apple");
System.out.println(instance +"放入一个苹果!");
}
// 消费苹果,从篮子中取走
public void consume(String instance) throws InterruptedException {
long time1=System.currentTimeMillis();
if (basket.poll()!=null){
System.out.println(instance +"吃掉一个苹果!耗时"+(System.currentTimeMillis()-time1)/1000+"S");
}
}
}
// 定义苹果生产者
class Producer implements Runnable {
private String instance;
private Basket basket;
public Producer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
basket.produce(instance);
TimeUnit.SECONDS.sleep(10);
}
} catch (InterruptedException ex) {
System.out.println("Producer Interrupted");
}
}
}
// 定义苹果消费者
class Consumer implements Runnable {
private String instance;
private Basket basket;
public Consumer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 消费苹果
basket.consume(instance);
//TimeUnit.SECONDS.sleep(10);
}
} catch (InterruptedException ex) {
System.out.println("Consumer Interrupted");
}
}
}
public static void main(String[] args) {
ConcurrentLinkedQueueTest test = new ConcurrentLinkedQueueTest();
// 建立一个装苹果的篮子
Basket basket = test.new Basket();
Producer producer = test.new Producer(" 农夫", basket);
Consumer consumer1 = test.new Consumer("吃货001", basket);
Consumer consumer2 = test.new Consumer("吃货002", basket);
ExecutorService service = Executors.newCachedThreadPool();
service.submit(producer);
service.submit(consumer1);
service.submit(consumer2);
}
}
例子中,一个农夫线程负责摘苹果,每隔10S摘一个。两个吃货线程负责吃苹果,不间断,只要篮子里有苹果就拿来吃。运行结果如下:
农夫放入一个苹果!
吃货001吃掉一个苹果!耗时0S
农夫放入一个苹果!
吃货001吃掉一个苹果!耗时0S
农夫放入一个苹果!
吃货002吃掉一个苹果!耗时0S
农夫放入一个苹果!
吃货001吃掉一个苹果!耗时0S
农夫放入一个苹果!
吃货002吃掉一个苹果!耗时0S
农夫放入一个苹果!
吃货002吃掉一个苹果!耗时0S
农夫放入一个苹果!
吃货001吃掉一个苹果!耗时0S
4、二者区别
首先二者都是线程安全的队列,都可以用于生产与消费模型的场景。
-
LinkedBlockingQueue是阻塞队列,其好处是:多线程操作共同的队列时不需要额外的同步,由于具有插入与移除的双重阻塞功能,对插入与移除进行阻塞,队列会自动平衡负载,从而减少生产与消费的处理速度差距。
- 由于LinkedBlockingQueue有阻塞功能,其阻塞是基于锁机制实现的,当有多个线程消费时候,队列为空时消费线程被阻塞,有元素时需要再唤醒消费线程,队列元素可能时有时无,导致用户态与内核态切换频繁,消耗系统资源。从此方面来讲,LinkedBlockingQueue更适用于多线程插入,单线程取出,即多个生产者与单个消费者。
-
ConcurrentLinkedQueue非阻塞队列,采用 CAS+自旋操作,解决多线程之间的竞争,多写操作增加冲突几率,增加自选次数,并不适合多写入的场景。当许多线程共享访问一个公共集合时,ConcurrentLinkedQueue 是一个恰当的选择。从此方面来讲,ConcurrentLinkedQueue更适用于单线程插入,多线程取出,即单个生产者与多个消费者。
对于几个线程生产与几个线程消费,二者并没有严格的规定,只有谁更适合。
评论区