侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 292 篇文章
  • 累计创建 132 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

Java并发编程之TransferQueue阻塞队列原理及实战

孔子说JAVA
2022-07-30 / 0 评论 / 0 点赞 / 60 阅读 / 17,203 字 / 正在检测是否收录...

JDK7对JDK5中的J.U.C并发工具包进行了增强,其中之一就是新增了TransferQueue接口,该接口在 BlockingQueue 提供的方法基础上,增加了 transfer 方法,与 BlockingQueue 的阻塞方法在队列满或空的时候才阻塞不同,该方法确保生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事),顾名思义,阻塞就是发生在元素从一个线程transfer到另一个线程的过程中,它有效地实现了元素在线程之间的传递。

  • BlockingQueue(和Queue)是Java 5中加入的接口,它是一种阻塞队列:当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素但队列为空时,消费者会被阻塞。
  • TransferQueue还包括了其他的一些方法:两个tryTransfer方法,一个是非阻塞的,另一个带有timeout参数设置超时时间的,还有两个辅助方法hasWaitingConsumer()和getWaitingConsumerCount()。

image-1659149157346

1、TransferQueue介绍

BlockingQueue 继承自Queue接口,是一种阻塞队列,该队列是有大小的。当队列满的时候,生产者会被阻塞。队列空的时候,消费者会被阻塞。

TransferQueue 是在 BlockingQueue 提供的方法基础上,增加了 transfer 方法,就是只有生产者的消息被消费之后,才返回,否则继续阻塞。通俗来讲就是生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事),transfer方法用来实现这种约束。

TransferQueue接口定义的方法规范如下,有阻塞的,有不阻塞的,也有超时的:

// 继承了BlockingQueue接口,并增加若干新方法
public interface TransferQueue<E> extends BlockingQueue<E> {
    /**
     * 将元素 传给等待的消费者【如果有的话】, 返回true, 如果不存在,返回false,不入队。
     */
    boolean tryTransfer(E e);

    /**
     * 将元素传递给等待的消费者【如果有的话】, 如果没有,则将e插入队列尾部,
     * 会一直等待,直到它被消费者接收
     */
    void transfer(E e) throws InterruptedException;

    /**
     * 在transfer的基础上,增加了超时操作,时间到了还没有被消费的话,返回false,并移除元素
     */
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 如果存在消费者线程,返回true
     */
    boolean hasWaitingConsumer();

    /**
     * 得到等待获取元素的消费者线程的数量
     */
    int getWaitingConsumerCount();
}

2、LinkedTransferQueue源码解析

LinkedTransferQueue在JDK1.7版本诞生,是一个由链表结构组成的无界阻塞队列(实现了TransferQueue接口),相对于其他阻塞队列,多提供了tryTransfer和transfer方法,且在性能方面也有了巨大的提升。

  • 与之前的阻塞队列一样,LinkedTransferQueue也继承了AbstractQueue,所以有一些相同的方法,put、offer、add方法往队列中添加数据,由于队列是无界队列,所以这些方法一定会成功都不会阻塞。而take、poll方法消费队列中数据,take方法可能会阻塞,poll有两个方法可以直接返回、或者延时等待一段时间。

  • LinkedTransferQueue实际上是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集。而且LinkedTransferQueue更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。

  • SynchronousQueue使用两个队列(一个用于正在等待的生产者、另一个用于正在等待的消费者)和一个用来保护两个队列的锁。而LinkedTransferQueue使用CAS操作实现一个非阻塞的方法,这是避免序列化处理任务的关键。

2.1 类图结构及重要属性

image-1659147132161

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /** 是否为多核处理器 */
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;

    /**
     * 当一个节点目前是队列的第一个waiter时,阻塞前的自旋次数
     */
    private static final int FRONT_SPINS   = 1 << 7;

    /**
     * 前驱节点正在处理,当前节点需要自旋的次数
     */
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * 
     */
    static final int SWEEP_THRESHOLD = 32;
    
    // 队列中的节点
    static final class Node {...}
    
    // 头节点
    transient volatile Node head;

    /** 尾指针,注意可能不是最后一个节点,初始化为null */
    private transient volatile Node tail;

    /** 删除节点失败的次数 */
    private transient volatile int sweepVotes;
    
    /*
     * xfer方法中使用,定义how,解释很清楚了,每个变量对应不同的方法
     */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer

2.2 双重队列

LinkedTransferQueue使用的队列结构是slack dual queue,和普通的M&S dual queue的区别在于,它不会每次操作的时候都更新head或tail,而是保持有针对性的slack懈怠,所以它的结构可能是下面这样,tail指针指向的节点未必就是最后一个节点。

       head           tail
         |              |
         v              v
         M -> M -> U -> U -> U -> U

slack dual queue译为双重数据结构或者双重队列。该类型队列放取元素使用同一个队列,队列中的节点具有两种模式,一种是数据节点,一种是非数据节点。

  • 放元素时先跟队列头节点对比,如果头节点是非数据节点,就让他们匹配,如果头节点是数据节点,就生成一个数据节点放在队列尾端(入队)。

  • 取元素时也是先跟队列头节点对比,如果头节点是数据节点,就让他们匹配,如果头节点是非数据节点,就生成一个非数据节点放在队列尾端(入队)。

无论是放元素还是取元素,都是先和头节点对比,如果二者模式不一样就匹配它们,如果二者模式一样,就入队。

image-1659148262628

2.3 Node节点(内部类)

LinkedTransferQueue保存队列的底层链表结构是一个内部类Node,Node节点的结构其实和SynchronousQueue公平模式差不太多,包含几个重要属性以及一些CAS方法,其中主要属性如下:

  • boolean isData:添加数据的方法创建的节点true,消费为false;
  • Object item:item表示入队的数据,消费方法为null
  • Node next:下一个节点;
  • Thread waiter:阻塞的线程;
    static final class Node {
        // 是否是数据节点(也就标识了是生产者还是消费者)
        final boolean isData;   // isData == true表示存数据,否则为获取数据
        volatile Object item;   // 存数据,item非null, 获取数据,匹配后,item为null
        volatile Node next; // next域(下一个节点)
        volatile Thread waiter; // 等待线程(持有元素的线程)

        // CAS操作next域 如果next为cmp,则变为val
        final boolean casNext(Node cmp, Node val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
		   // CAS操作item域,如果item为cmp,变为val
        final boolean casItem(Object cmp, Object val) {
            // assert cmp == null || cmp.getClass() != Node.class;
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        // 构造器
        Node(Object item, boolean isData) {
            UNSAFE.putObject(this, itemOffset, item); // relaxed write
            this.isData = isData;
        }

        // 将next指向自身this
        final void forgetNext() {
            UNSAFE.putObject(this, nextOffset, this);
        }
		// 匹配或取消节点调用
        final void forgetContents() {
            UNSAFE.putObject(this, itemOffset, this);
            UNSAFE.putObject(this, waiterOffset, null);
        }

        /**
         * 判断节点是否已经匹配,匹配取消也为true
         */ 
        final boolean isMatched() {
            Object x = item;
            return (x == this) || ((x == null) == isData);
        }

        /**
         * 是否为一个未匹配的请求 item为null表示未匹配
         */
        final boolean isUnmatchedRequest() {
            return !isData && item == null;
        }

        /**
         * 如果给定的节点不能挂到当前节点后面,则返回true
         */
        final boolean cannotPrecede(boolean haveData) {
            boolean d = isData;
            Object x;
            return d != haveData && (x = item) != this && (x != null) == d;
        }

        /**
         * 尝试去匹配一个数据节点
         */
        final boolean tryMatchData() {
            // assert isData;
            Object x = item;
            if (x != null && x != this && casItem(x, null)) {
                LockSupport.unpark(waiter);
                return true;
            }
            return false;
        }

        private static final long serialVersionUID = -3375979862319811754L;

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
        private static final long waiterOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
                waiterOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("waiter"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

典型的单链表结构,内部除了存储元素的值和下一个节点的指针外,还包含了是否为数据节点和持有元素的线程。内部通过isData区分是生产者还是消费者。

2.4 主要构造方法

LinkedTransferQueue的构造方法比较简单,一个无参构造方法,和一个接受一个集合的构造方法,接受的集合就是把集合中的数据放到队列中。并没有初始化其他任何东西了。

  • 从构造方法中可以看到并没有初始容量,所以是无界的一个阻塞队列。
public LinkedTransferQueue() {
}

public LinkedTransferQueue(Collection<? extends E> c) {
    this();
    addAll(c);
}

2.5 xfer方法及队列操作三大类方法

2.5.1 xfer方法

查看源码可以发现put、offer、add、take、poll方法包括tryTransfer、transfer都是调用的xfer方法,所以我们先分析下xfer方法,主要参数:

  • e表示要添加的数据,take与poll为null;
  • haveData表示是否有数据,添加类方法为true,消费类为false;
  • how表示方法阻塞方式,LinkedTransferQueue定义了4个静态变量NOW、ASYNC、SYNC、TIMED,NOW表示不阻塞在poll、tryTransfer方法使用,ASYNC在put、offer、add方法使用,SYNC表示阻塞用于take方法,TIMED用于poll、tryTransfer的延时方法;
  • nanos表示最大阻塞多少时间,poll和tryTransfer方法会用到;
/**
 * xfer方法实现了所有的队列方法
 *
 * @param e take操作传入null, 否则传入具体元素
 * @param haveData put操作为true, take操作为false
 * @param how NOW, ASYNC, SYNC, or TIMED  不同字段,先从名称上猜测一下他们的大意
 * @param nanos 如果是TIMED模式,也就是具有超时机制的方法啦,具体超时的时间
 * @return an item if matched, else e  返回匹配的元素,否则返回e
 * @throws NullPointerException 插入null值抛出空指针异常: haveData==true && e == null
 */
private E xfer(E e, boolean haveData, int how, long nanos) {
    // 不允许放入空元素
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed
    // 外层循环,自旋,失败就重试
    retry:
    for (;;) {                            // restart on append race

        // 下面这个for循环用于控制匹配的过程
        // 同一时刻队列中只会存储一种类型的节点
        // 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
        // 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
        
        for (Node h = head, p = h; p != null;) { // find & match first node
            // p节点的模式
            boolean isData = p.isData;
            // p节点的值
            Object item = p.item;
            // p没有被匹配到
            if (item != p && (item != null) == isData) { // unmatched
                // 如果两者模式一样,则不能匹配,跳出循环后尝试入队
                if (isData == haveData)   // can't match
                    break;
                // 如果两者模式不一样,则尝试匹配
                // 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
                if (p.casItem(item, e)) { // match
                    // 匹配成功
                    // for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
                    // 看不懂可以直接跳过
                    for (Node q = p; q != h;) {
                        // 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点 
                        Node n = q.next;  // update by 2 unless singleton
                        // 如果head还没变,就把它更新成新的节点
                        // 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
                        // 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
                        // 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
                        // 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        // 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    // 唤醒p中等待的线程
                    LockSupport.unpark(p.waiter);
                    // 并返回匹配到的元素
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            // p已经被匹配了或者尝试匹配的时候失败了
            // 也就是其它线程先一步匹配了p
            // 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
            // 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        // 到这里肯定是队列中存储的节点类型和自己一样
        // 或者队列中没有元素了
        // 就入队(不管放元素还是取元素都得入队)
        // 入队又分成四种情况:
        // NOW,立即返回,没有匹配到立即返回,不做入队操作
        // ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
        // SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
        // TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身

        // 如果不是立即返回
        if (how != NOW) {                 // No matches available
            // 新建s节点
            if (s == null)
                s = new Node(e, haveData);
            // 尝试入队
            Node pred = tryAppend(s, haveData);
            // 入队失败,重试
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            // 如果不是异步(同步或者有超时)
            // 就等待被匹配
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

private Node tryAppend(Node s, boolean haveData) {
    // 从tail开始遍历,把s放到链表尾端
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail
        // 如果首尾都是null,说明链表中还没有元素
        if (p == null && (p = head) == null) {
            // 就让首节点指向s
            // 注意,这里插入第一个元素的时候tail指针并没有指向s
            if (casHead(null, s))
                return s;                 // initialize
        }
        else if (p.cannotPrecede(haveData))
            // 如果p无法处理,则返回null
            // 这里无法处理的意思是,p和s节点的类型不一样,不允许s入队
            // 比如,其它线程先入队了一个数据节点,这时候要入队一个非数据节点,就不允许,
            // 队列中所有的元素都要保证是同一种类型的节点
            // 返回null后外面的方法会重新尝试匹配重新入队等
            return null;                  // lost race vs opposite mode
        else if ((n = p.next) != null)    // not last; keep traversing
            // 如果p的next不为空,说明不是最后一个节点
            // 则让p重新指向最后一个节点
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))
            // 如果CAS更新s为p的next失败
            // 则说明有其它线程先一步更新到p的next了
            // 就让p指向p的next,重新尝试让s入队
            p = p.next;                   // re-read on CAS failure
        else {
            // 到这里说明s成功入队了
            // 如果p不等于t,就更新tail指针
            // 还记得上面插入第一个元素时tail指针并没有指向新元素吗?
            // 这里就是用来更新tail指针的
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                       (t = tail)   != null &&
                       (s = t.next) != null && // advance and retry
                       (s = s.next) != null && s != t);
            }
            // 返回p,即s的前一个元素
            return p;
        }
    }
}

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    // 如果是有超时的,计算其超时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 当前线程
    Thread w = Thread.currentThread();
    // 自旋次数
    int spins = -1; // initialized after first item and cancel checks
    // 随机数,随机让一些自旋的线程让出CPU
    ThreadLocalRandom randomYields = null; // bound if needed

    for (;;) {
        Object item = s.item;
        // 如果s元素的值不等于e,说明它被匹配到了
        if (item != e) {                  // matched
            // assert item != s;
            // 把s的item更新为s本身
            // 并把s中的waiter置为空
            s.forgetContents();           // avoid garbage
            // 返回匹配到的元素
            return LinkedTransferQueue.<E>cast(item);
        }
        // 如果当前线程中断了,或者有超时的到期了
        // 就更新s的元素值指向s本身
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            // 尝试解除s与其前一个节点的关系
            // 也就是删除s节点
            unsplice(pred, s);
            // 返回元素的值本身,说明没匹配到
            return e;
        }
        
        // 如果自旋次数小于0,就计算自旋次数
        if (spins < 0) {                  // establish spins at/near front
            // spinsFor()计算自旋次数
            // 如果前面有节点未被匹配就返回0
            // 如果前面有节点且正在匹配中就返回一定的次数,等待
            if ((spins = spinsFor(pred, s.isData)) > 0)
                // 初始化随机数
                randomYields = ThreadLocalRandom.current();
        }
        else if (spins > 0) {             // spin
            // 还有自旋次数就减1
            --spins;
            // 并随机让出CPU
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        }
        else if (s.waiter == null) {
            // 更新s的waiter为当前线程
            s.waiter = w;                 // request unpark then recheck
        }
        else if (timed) {
            // 如果有超时,计算超时时间,并阻塞一定时间
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else {
            // 不是超时的,直接阻塞,等待被唤醒
            // 唤醒后进入下一次循环,走第一个if的逻辑就返回匹配的元素了
            LockSupport.park(this);
        }
    }
}

这三个方法里的内容特别复杂,很大一部分代码都是在控制线程安全,各种CAS,我们这里简单描述一下大致的逻辑:

  1. 来了一个元素,我们先查看队列头的节点,是否与这个元素的模式一样;
  2. 如果模式不一样,就尝试让他们匹配,如果头节点被别的线程先匹配走了,就尝试与头节点的下一个节点匹配,如此一直往后,直到匹配到或到链表尾为止;
  3. 如果模式一样,或者到链表尾了,就尝试入队;
  4. 入队的时候有可能链表尾修改了,那就尾指针后移,再重新尝试入队,依此往复;
  5. 入队成功了,就自旋或阻塞,阻塞了就等待被其它线程匹配到并唤醒;
  6. 唤醒之后进入下一次循环就匹配到元素了,返回匹配到的元素;
  7. 是否需要入队及阻塞有四种情况:
a)`NOW`,立即返回,没有匹配到立即返回,不做入队操作

    对应的方法有:`poll()`、`tryTransfer(e)`

b)`ASYNC`,异步,元素入队但当前线程不会阻塞(相当于无界`LinkedBlockingQueue`的元素入队)

    对应的方法有:`add(e)`、`offer(e)`、`put(e)`、`offer(e, timeout, unit)`

c)`SYNC`,同步,元素入队后当前线程阻塞,等待被匹配到

    对应的方法有:`take()`、`transfer(e)`

d)`TIMED`,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身

    对应的方法有:`poll(timeout, unit)`、`tryTransfer(e, timeout, unit)`

2.5.2 入队方法

四个方法都是一样的,使用异步的方式调用xfer()方法,传入的参数都一模一样,都是非阻塞方法。

  • xfer(E e, boolean haveData, int how, long nanos)的参数分别是:e表示元素;haveData表示是否是数据节点,how表示放取元素的方式,上面提到的四种,NOW、ASYNC、SYNC、TIMED;nanos表示超时时间;
public void put(E e) {
    // 异步模式,不会阻塞,不会超时
    // 因为是放元素,单链表存储,会一直往后加
    xfer(e, true, ASYNC, 0);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

2.5.3 出队方法

出队的四个方法也是直接或间接的调用xfer()方法,放取元素的方式和超时规则略微不同,本质没有大的区别。

  • 取元素就各有各的玩法了,有同步的,有超时的,有立即返回的。
public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
public E take() throws InterruptedException {
    // 同步模式,会阻塞直到取到元素
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 有超时时间
    E e = xfer(null, false, TIMED, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    // 立即返回,没取到元素返回null
    return xfer(null, false, NOW, 0);
}

2.5.4 移交元素的方法

也是直接或间接的调用xfer()方法,注意第二个参数,都是true,也就是这三个方法其实也是放元素的方法

public boolean tryTransfer(E e) {
    // 立即返回
    return xfer(e, true, NOW, 0) == null;
}

public void transfer(E e) throws InterruptedException {
    // 同步模式
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}

public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // 有超时时间
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

3、TransferQueue应用场景及使用示例

3.1 应用场景

当我们不想生产者过度生产消息时,TransferQueue可能非常有用,可避免发生OutOfMemory错误。在这样的设计中,消费者的消费能力将决定生产者产生消息的速度。

3.2 应用示例1

package com.kz.example.common.queue;

import java.util.Random;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class LinkedTransferQueue1 {

    // 无锁算法 无界队列
    static TransferQueue<Integer> queue = new LinkedTransferQueue<>();

    public static void main(String[] args) {

        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "消费 id - " + queue.take());
                    System.out.println("---------------------------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "consumer" + i).start();
        }

        Thread producer = new Thread(() -> {
            while (true) {
                System.out.println("当前队列中等待的线程" + queue.getWaitingConsumerCount());
                // 如果队列中有等待的消费者
                if (queue.hasWaitingConsumer()) {
                    int product = new Random().nextInt(500);
                    try {
                        System.out.println(Thread.currentThread().getName() + "生产 id - " + product);
                        queue.tryTransfer(product);
                        TimeUnit.MILLISECONDS.sleep(100); // 等待消费
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "producer");
        producer.setDaemon(true);
        producer.start();
    }
}

输出结果:

当前队列中等待的线程10
producer生产 id - 396
consumer1消费 id - 396
---------------------------------------------
当前队列中等待的线程9
producer生产 id - 51
consumer5消费 id - 51
---------------------------------------------
当前队列中等待的线程8
producer生产 id - 98
consumer2消费 id - 98
---------------------------------------------
当前队列中等待的线程7
producer生产 id - 218
consumer6消费 id - 218
---------------------------------------------
当前队列中等待的线程6
producer生产 id - 246
consumer9消费 id - 246
---------------------------------------------
当前队列中等待的线程5
producer生产 id - 79
consumer10消费 id - 79
---------------------------------------------
当前队列中等待的线程4
producer生产 id - 489
consumer3消费 id - 489
---------------------------------------------
当前队列中等待的线程3
producer生产 id - 111
consumer4消费 id - 111
---------------------------------------------
当前队列中等待的线程2
producer生产 id - 384
consumer7消费 id - 384
---------------------------------------------
当前队列中等待的线程1
producer生产 id - 210
consumer8消费 id - 210
---------------------------------------------

可以看到,生产者每生产一个产品后,必须等消费者消费了这个产品才能接着生产,生产能力受限于消费能力。

3.3 应用示例2

package com.kz.example.common.queue;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;

public class LinkedTransferQueueDemo {
    static LinkedTransferQueue<String> lnkTransQueue = new LinkedTransferQueue<String>();

    public static void main(String[] args) {
        ExecutorService exService = Executors.newFixedThreadPool(2);
        Producer producer = new LinkedTransferQueueDemo().new Producer();
        Consumer consumer = new LinkedTransferQueueDemo().new Consumer();
        exService.execute(producer);
        exService.execute(consumer);
        exService.shutdown();
    }

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                try {
                    System.out.println("Producer is waiting to transfer...");
                    lnkTransQueue.transfer("A" + i);
                    System.out.println("producer transfered element: A" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                try {
                    System.out.println("Consumer is waiting to take element...");
                    String s = lnkTransQueue.take();
                    System.out.println("Consumer received Element: " + s);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

运行结果:

Producer is waiting to transfer...
Consumer is waiting to take element...
producer transfered element: A0
Producer is waiting to transfer...
Consumer received Element: A0
Consumer is waiting to take element...
Consumer received Element: A1
Consumer is waiting to take element...
producer transfered element: A1
Producer is waiting to transfer...
Consumer received Element: A2
producer transfered element: A2

4、总结

  • LinkedTransferQueue可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体;
  • LinkedTransferQueue的实现方式是使用一种叫做双重队列的数据结构;
  • 不管是取元素还是放元素都会入队;
  • 先尝试跟头节点比较,如果二者模式不一样,就匹配它们,组成CP,然后返回对方的值;
  • 如果二者模式一样,就入队,并自旋或阻塞等待被唤醒;
  • 至于是否入队及阻塞有四种模式,NOW、ASYNC、SYNC、TIMED;
  • LinkedTransferQueue全程都没有使用synchronized、重入锁等比较重的锁,基本是通过自旋 + CAS实现;
  • 对于入队之后,先自旋一定次数后再调用LockSupport.park()或LockSupport.parkNanos()阻塞;

LinkedTransferQueue与SynchronousQueue(公平模式)的区别:

  • 在java8中两者的实现方式基本一致,都是使用的双重队列;
  • 前者完全实现了后者,但比后者更灵活;
  • 后者不管放元素还是取元素,如果没有可匹配的元素,所在的线程都会阻塞;
  • 前者可以自己控制放元素是否需要阻塞线程,比如使用四个添加元素的方法就不会阻塞线程,只入队元素,使用transfer()会阻塞线程;
  • 取元素两者基本一样,都会阻塞等待有新的元素进入被匹配到;
0

评论区