侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 285 篇文章
  • 累计创建 125 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

LinkedBlockingQueue阻塞队列和ConcurrentLinkedQueue并发队列使用比较

孔子说JAVA
2022-07-15 / 0 评论 / 0 点赞 / 41 阅读 / 8,391 字 / 正在检测是否收录...

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。在正文开始之前需要了解线程安全、并行、并发等相关概念。

  • 线程安全就是说多线程访问同一代码,不会产生不确定的结果。
  • 并行是指两者同时执行一件事,比如赛跑,两个人都在不停的往前跑;
  • 并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率

本文侧重介绍阻塞队列LinkedBlockingQueue和并发队列ConcurrentLinkedQueue的用法比较,有关并发队列与阻塞队列的详细介绍请移步:

1、阻塞队列与非阻塞队列

1.1 阻塞队列简介

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里获取元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

image-1656030281462

java.util.concurrent 包里的 BlockingQueue(阻塞队列) 接口是 Queue 接口的子接口,是一个在队列基础上又支持了两个附加操作的队列,常用来解耦。这两个附加的阻塞操作是插入与取出:

  • 支持阻塞的移除方法take: 队列空时,获取/移除元素的线程会一直被阻塞,直到队列中有元素时才可以继续取出(等待队列变为非空)。
  • 支持阻塞的插入方法put: 队列满时,队列会阻塞插入元素的线程(存储元素的线程会等待队列可用),直到队列有空闲时停止阻塞,新元素才可以继续插入。

除拥有普通队列的方法之外,阻塞队列提供了另外4个常用的方法:

  • put(E e):向队尾插入元素,若队列已满,则被阻塞等待,直到有空闲才继续插入。
  • take():从队首取出元素,若队列为空,则被阻塞等待,直到有元素才继续取出。
  • offer(E e,long timeout, TimeUnit unit):向队尾插入元素,若队列已满,则计时等待,当时间期限达到时,若队列还是满的,则返回false;若等待在期限内,队列空闲,则插入成功,返回true。
  • poll(long timeout, TimeUnit unit):从队首取出元素,如果队列空,则计时等待,当时间期限达到时,若队列还是空的,则返回null;若等待在期限内,队列中有元素,否则返回取得的元素。

1.2 非阻塞队列简介

非阻塞队列也就是一般的队列,没有阻塞队列的两个阻塞功能。其主要方法如下:

  • boolean add(E e):将元素e插入到队列末尾,插入成功,返回true;插入失败(即队列已满),抛出异常;
  • boolean offer(E e):将元素e插入到队列末尾,插入成功,则返回true;插入失败(即队列已满),返回false;
  • E remove():移除队首元素,若移除成功,则返回true;移除失败(队列为空),则会抛出异常;
  • E poll():获取队首元素并移除,若队列不为空,则返回队首元素;否则返回null;
  • E element():获取队首元素并不移除元素,若队列不为空,则返回队首元素;否则抛出异常;
  • E peek():获取队首元素并不移除元素,若队列不为空,则返回队首元素;否则返回null;

image-1657763048868

从类图可以发现非阻塞队列与阻塞队列的一个明显区别:ConcurrentLinkedQueue没有实现BlockingQueue接口,所以ConcurrentLinkedQueue没有提供具有阻塞性质的put、take等方法。

2、阻塞队列LinkedBlockingQueue用例

由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

  • LinkedBlockingQueue是一个线程安全的阻塞队列,它实现了BlockingQueue接口,BlockingQueue接口继承自java.util.Queue接口,并在这个接口的基础上增加了take和put方法,这两个方法正是队列操作的阻塞版本。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 多线程模拟实现生产者/消费者模型
 */
public class BlockingQueueTest {
    /**
     *
     * 定义装苹果的篮子
     *
     */
    public class Basket {
        // 篮子,能够容纳3个苹果
        BlockingQueue<String> basket = new LinkedBlockingQueue<>(3);
 
        // 生产苹果,放入篮子
        public void produce(String instance) throws InterruptedException {
            // 生产苹果
            long time1=System.currentTimeMillis();
            // put方法放入一个苹果,若basket满了,等到basket有位置
            basket.put("An apple");
            System.out.println(instance +"放入一个苹果!耗时"+(System.currentTimeMillis()-time1)/1000+"S");
        }
 
        // 消费苹果,从篮子中取走
        public void consume(String instance) throws InterruptedException {
            basket.take();
            System.out.println(instance +"吃掉一个苹果!");
        }
 
        public int size(){
            return basket.size();
        }
    }
 
    // 定义苹果生产者
    class Producer implements Runnable {
        private String instance;
        private Basket basket;
 
        public Producer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }
 
        public void run() {
            try {
                while (true) {
                    //放入一个苹果,若basket满了,会被阻塞,直到basket有位置
                    basket.produce(instance);
                }
            } catch (InterruptedException ex) {
                System.out.println("Producer Interrupted");
            }
        }
    }
 
    // 定义苹果消费者
    class Consumer implements Runnable {
        private String instance;
        private Basket basket;
 
        public Consumer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }
 
        public void run() {
            try {
                while (true) {
                    // 消费苹果
                    basket.consume(instance);
                    TimeUnit.SECONDS.sleep(10);
                }
            } catch (InterruptedException ex) {
                System.out.println("Consumer Interrupted");
            }
        }
    }
 
    public static void main(String[] args) {
        BlockingQueueTest test = new BlockingQueueTest();
 
        // 建立一个装苹果的篮子
        Basket basket = test.new Basket();
 
        Producer producer = test.new Producer("  农夫001", basket);
        Producer producer2 = test.new Producer("  农夫002", basket);
        Consumer consumer = test.new Consumer("吃货", basket);
 
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(producer);
        service.submit(producer2);
        service.submit(consumer);
    }
 
}

例子中,两个生产者(农夫)线程负责摘苹果,孜孜不倦,争先恐后的摘。但是篮子里最多能装三个苹果,满了就放不进去了。生产者就得等有人消费了(篮子不满了)再摘。一个消费者(吃货)慢悠悠的吃苹果,每隔10S吃一个。执行结果如下:

  农夫001放入一个苹果!耗时0S
  农夫001放入一个苹果!耗时0S
  农夫002放入一个苹果!耗时0S
吃货吃掉一个苹果!
  农夫001放入一个苹果!耗时0S
吃货吃掉一个苹果!
  农夫002放入一个苹果!耗时10S
吃货吃掉一个苹果!
  农夫001放入一个苹果!耗时20S
吃货吃掉一个苹果!
  农夫002放入一个苹果!耗时20S
吃货吃掉一个苹果!
  农夫001放入一个苹果!耗时20S
吃货吃掉一个苹果!
  农夫002放入一个苹果!耗时20S

3、并发队列ConcurrentLinkedQueue用例

ConcurrentLinkedQueue是Queue的一个安全实现,Queue中元素按FIFO原则(先进先出原则)进行排序,新元素从队列尾部插入,而获取队列元素,则需要从队列头部获取。采用CAS操作,来保证元素的一致性。

  • ConcurrentLinkedQueue采用循环CAS算法实现,所以也被称为非阻塞队列。
  • ConcurrentLinkedQueue不允许null存入队列,队列中所有未删除的节点的item都不能为null且都能从head节点遍历到。
  • 在入队的最后一个元素的next为null
  • 删除节点是将item设置为null, 队列迭代时跳过item为null节点
  • head节点跟tail不一定指向头节点或尾节点,可能存在滞后性

使用ConcurrentLinkedQueue时,我们需要注意以下几点:

  1. 入列出列线程安全,遍历不安全
  2. 不允许添加null元素
  3. 底层使用列表与cas算法包装入列出列安全
  4. 避免使用(queue.size() > 0),而需要用 (!queue.isEmpty())代替,因为size方法需要遍历整个队列

3.1 例子1

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentLinkedQueueTest {
    private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
    private static int count = 2; // 线程个数
    
    //CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
    private static CountDownLatch latch = new CountDownLatch(count);

    public static void main(String[] args) throws InterruptedException {
        long timeStart = System.currentTimeMillis();
        ExecutorService es = Executors.newFixedThreadPool(4);
        ConcurrentLinkedQueueTest.offer();
        for (int i = 0; i < count; i++) {
            es.submit(new Poll());
        }
        latch.await(); //使得主线程(main)阻塞直到latch.countDown()为零才继续执行
        System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
        es.shutdown();
    }
    
    /**
     * 生产者
     */
    public static void offer() {
        for (int i = 0; i < 100000; i++) {
            queue.offer(i);
        }
    }


    /**
     * 消费者
     */
    static class Poll implements Runnable {
        public void run() {
            // while (queue.size()>0) {
            while (!queue.isEmpty()) {
                System.out.println(queue.poll());
            }
            latch.countDown();
        }
    }
}

运行结果:

costtime 2360ms

如果把消费者中的判断 while (!queue.isEmpty()) 改用 while (queue.size()>0) 后,运行结果:

cost time 46422ms

结果居然相差那么大,看了下ConcurrentLinkedQueue的API,原来 .size() 是要遍历一遍集合的,难怪那么慢,所以尽量要避免用size而改用isEmpty()。

3.2 例子2

public class ConcurrentLinkedQueueTest {
 
    public class Basket {
        // 篮子,能够容纳3个苹果
        ConcurrentLinkedQueue<String> basket = new ConcurrentLinkedQueue<>();
 
        // 生产苹果,放入篮子
        public void produce(String instance) throws InterruptedException {
            basket.offer("An apple");
            System.out.println(instance +"放入一个苹果!");
        }
 
        // 消费苹果,从篮子中取走
        public void consume(String instance) throws InterruptedException {
            long time1=System.currentTimeMillis();
            if (basket.poll()!=null){
                System.out.println(instance +"吃掉一个苹果!耗时"+(System.currentTimeMillis()-time1)/1000+"S");
            }
        }
    }
 
    // 定义苹果生产者
    class Producer implements Runnable {
        private String instance;
        private Basket basket;
 
        public Producer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }
 
        public void run() {
            try {
                while (true) {
                    basket.produce(instance);
                    TimeUnit.SECONDS.sleep(10);
                }
            } catch (InterruptedException ex) {
                System.out.println("Producer Interrupted");
            }
        }
    }
 
    // 定义苹果消费者
    class Consumer implements Runnable {
        private String instance;
        private Basket basket;
 
        public Consumer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }
 
        public void run() {
            try {
                while (true) {
                    // 消费苹果
                    basket.consume(instance);
                    //TimeUnit.SECONDS.sleep(10);
                }
            } catch (InterruptedException ex) {
                System.out.println("Consumer Interrupted");
            }
        }
    }
 
    public static void main(String[] args) {
        ConcurrentLinkedQueueTest test = new ConcurrentLinkedQueueTest();
 
        // 建立一个装苹果的篮子
        Basket basket = test.new Basket();
 
        Producer producer = test.new Producer("  农夫", basket);
        Consumer consumer1 = test.new Consumer("吃货001", basket);
        Consumer consumer2 = test.new Consumer("吃货002", basket);
 
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(producer);
        service.submit(consumer1);
        service.submit(consumer2);
    }
 
}

例子中,一个农夫线程负责摘苹果,每隔10S摘一个。两个吃货线程负责吃苹果,不间断,只要篮子里有苹果就拿来吃。运行结果如下:

  农夫放入一个苹果!
吃货001吃掉一个苹果!耗时0S
  农夫放入一个苹果!
吃货001吃掉一个苹果!耗时0S
  农夫放入一个苹果!
吃货002吃掉一个苹果!耗时0S
  农夫放入一个苹果!
吃货001吃掉一个苹果!耗时0S
  农夫放入一个苹果!
吃货002吃掉一个苹果!耗时0S
  农夫放入一个苹果!
吃货002吃掉一个苹果!耗时0S
  农夫放入一个苹果!
吃货001吃掉一个苹果!耗时0S

4、二者区别

image-1657764333730

首先二者都是线程安全的队列,都可以用于生产与消费模型的场景。

  • LinkedBlockingQueue是阻塞队列,其好处是:多线程操作共同的队列时不需要额外的同步,由于具有插入与移除的双重阻塞功能,对插入与移除进行阻塞,队列会自动平衡负载,从而减少生产与消费的处理速度差距。

    • 由于LinkedBlockingQueue有阻塞功能,其阻塞是基于锁机制实现的,当有多个线程消费时候,队列为空时消费线程被阻塞,有元素时需要再唤醒消费线程,队列元素可能时有时无,导致用户态与内核态切换频繁,消耗系统资源。从此方面来讲,LinkedBlockingQueue更适用于多线程插入,单线程取出,即多个生产者与单个消费者。
  • ConcurrentLinkedQueue非阻塞队列,采用 CAS+自旋操作,解决多线程之间的竞争,多写操作增加冲突几率,增加自选次数,并不适合多写入的场景。当许多线程共享访问一个公共集合时,ConcurrentLinkedQueue 是一个恰当的选择。从此方面来讲,ConcurrentLinkedQueue更适用于单线程插入,多线程取出,即单个生产者与多个消费者。

对于几个线程生产与几个线程消费,二者并没有严格的规定,只有谁更适合。

0

评论区