侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 297 篇文章
  • 累计创建 134 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

Java并发编程之SynchronousQueue阻塞队列原理及实战

孔子说JAVA
2022-07-31 / 0 评论 / 0 点赞 / 55 阅读 / 16,362 字 / 正在检测是否收录...

SynchronousQueue是一个没有数据缓冲的BlockingQueue,容量为0,它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介(实际上它不是一个真正的队列)。与其余队列不一样的是,它维护一组线程,这些线程在等待着把元素加入或移出队列,即生产者线程对其的插入操作put必须等待消费者的移除操作take。

1、SynchronousQueue介绍

SynchronousQueue是一个同步阻塞队列,没有存储功能,所以put和take会一直阻塞,直到有另外一个线程已经准备好参与到交付过程当中。即每一个 put操作都必须等待一个take操作。每一个take操作也必须等待一个put操作。

  • 仅当有足够多的消费者,而且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

  • SynchronousQueue是没有容量的,无法存储元素节点信息,不能通过peek方法获取元素,peek方法会直接返回null。由于没有元素,所以不能被迭代,它的iterator方法会返回一个空的迭代器Collections.emptyIterator()。

  • SynchronousQueue比较适合线程通信、传递信息、状态切换等应用场景,一个线程必须等待另一个线程传递某些信息给他才可以继续执行。

SynchronousQueue 的整体设计比较抽象,在内部抽象出了两种算法实现,一种是先入先出的队列,一种是后入先出的堆栈,两种算法被两个内部类实现,而直接对外的put,take方法的实现就比较简单,都是直接调用两个内部类的transfer方法进行实现,整体的调用关系如下图所示:

image-1659151534652

SynchronousQueue的公平与非公平模式

SynchronousQueue 内部分为公平(队列)和非公平(栈),队列的性能相对而言会好点。默认是非公平的,通常非公平(栈 FIFO)的性能会高那么一点点。

  • 公平模式(TransferQueue):队尾匹配(判断模式),队头出队,先进先出。

  • 非公平模式(默认策略:TransferStack):栈顶匹配,栈顶出栈,后进先出。

SynchronousQueue的特点

  • SynchronousQueue 最大的特点在于,它的容量为0,没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。

  • SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。

  • 由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

  • 使用的数据结构是链表。

  • 使用CAS+自旋(无锁),自旋了一定次数后调用 LockSupport.park()进行阻塞。

2、SynchronousQueue源码解析

2.1 数据结构

    // 堆栈和队列共同的接口
    // 负责执行 put or take
    abstract static class Transferer<E> {
        // e 为空的,会直接返回特殊值,不为空会传递给消费者
        // timed 为 true,说明会有超时时间
        abstract E transfer(E e, boolean timed, long nanos);
    }

    // 堆栈 后入先出 非公平
    // Scherer-Scott 算法
    static final class TransferStack<E> extends Transferer<E> {
    }

    // 队列 先入先出 公平
    static final class TransferQueue<E> extends Transferer<E> {
    }

    private transient volatile Transferer<E> transferer;

    // 无参构造器默认为非公平的
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

从源码中我们可以得到几点:

  • 堆栈和队列都有一个共同的接口,叫做Transferer,该接口有个方法:transfer,该方法很神奇,会承担take和put的双重功能;
  • 在我们初始化的时候,是可以选择是使用堆栈还是队列的,如果你选择,默认的就是堆栈,类注释也说明了这一点,堆栈的效率比队列更高。

2.2 构造方法

/**
 * 默认无参的构造方法:直接调用有参的构造方法,传入false(非公平模式)
 */
public SynchronousQueue() {
    this(false);
}

/**
 * 有参构造方法,参数是是否公平模式。
 * fair:true公平模式、false非公平模式。
 */
public SynchronousQueue(boolean fair) {
    // 将队列结构或者栈结构对象放在transferer对象上
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

2.3 入队方法

/**
 * SynchronousQueue的入队方法
 */
public void put(E e) throws InterruptedException {
    // 如果元素是NULL,抛出异常
    if (e == null) throw new NullPointerException();
    // 调用公平或者非公平模式的transfer方法
    if (transferer.transfer(e, false, 0) == null) {
        // 线程中断
        Thread.interrupted();
        // 抛出中断异常
        throw new InterruptedException();
    }
}

2.4 出队方法

/**
 * SynchronousQueue的出方法
 */
public E take() throws InterruptedException {
    // 调用公平或者非公平模式的transfer方法 
    E e = transferer.transfer(null, false, 0);
    // 元素不为null,返回元素
    if (e != null)
        return e;
    // 元素为null,中断线程
    Thread.interrupted();
    // 抛出中断异常
    throw new InterruptedException();
}

2.5 公平模式(队列)源码分析

// 下一个节点
volatile QNode next;          // next node in queue
// 当前元素
volatile Object item;         // CAS'ed to or from null
// 当前所属的线程
volatile Thread waiter;       // to control park/unpark
// put(true)还是take(false)的标记
final boolean isData;

/**
 * 公平模式节点的构造方法
 */
TransferQueue() {
    // 构建一个当前的元素为空,标记为take的节点
    QNode h = new QNode(null, false); // initialize to dummy node.
    // 将头节点设置为当前节点
    head = h;
    // 将尾节点设置为当前节点
    tail = h;
}

/**
 * 公平模式的存取方法
 */
E transfer(E e, boolean timed, long nanos) {
    // 定义一个可能被使用的当前节点
    QNode s = null; // constructed/reused as needed
    // 判断当前的元素是不是空:区分是put(true)还是take(flse)
    boolean isData = (e != null);

    for (;;) {
        // 得到队列的尾结点
        QNode t = tail;
        // 得到队列的头结点
        QNode h = head;
        // 头尾为空,说明没有被初始化。去下次循环
        if (t == null || h == null)         // saw uninitialized value
            // 继续下次循环
            continue;                       // spin

        // 头尾节点相同(只有一个节点,或者为空的时候)或者模式相同
        if (h == t || t.isData == isData) { // empty or same-mode
            // 得到当前节点的下一个节点
            QNode tn = t.next;
            // 尾结点不一致,直接继续循环(由于其他节点入队了,导致读不一致)。
            if (t != tail)                  // inconsistent read
                // 继续下次循环
                continue;
            // 当前节点的下个节点有数据:有其他节点入队,并且下一个节点是其他的节点
            if (tn != null) {               // lagging tail
                // 将下一个节点通过CAS的方式设置到尾结点上
                advanceTail(t, tn);
                // 继续下次循环
                continue;
            }
            // 目前来看put和take传入的值timed都是false。
            // 这里表示有时间限制的时候,超时了,直接返回null。
            if (timed && nanos <= 0)        // can't wait
                return null;
            // 当前的节点不存在,构建一个新节点,用当前传入的元素
            if (s == null)
                s = new QNode(e, isData);
            // 下一个节点是null,并且将新节点通过CAS的方式放入到下一个节点
            if (!t.casNext(null, s))        // failed to link in
                // 失败的话继续下次循环
                continue;
            // 将当前节点设置为尾结点
            advanceTail(t, s);              // swing tail and wait
            // 等待完成:自旋或阻塞线程,直到满足s.item != e(传入的数据不是当前数据)
            Object x = awaitFulfill(s, e, timed, nanos);
            // 节点被取消、中断或超时
            if (x == s) {                   // wait was cancelled
                // 清除s节点(当前节点)
                clean(t, s);
                // 节点被清除,返回null
                return null;
            }
            // 没有被取消、中断或超时
            if (!s.isOffList()) {           // not already unlinked
                // 如果是头部,取消连接
                advanceHead(t, s);          // unlink if head
                // 当前元素不是空的
                if (x != null)              // and forget fields
                    // 在当前元素的设置到节点上
                    s.item = s;
                // 当前节点的持有线程变为null
                s.waiter = null;
            }
            // 返回处理后的元素:这里就是反着来,传入null返回数据,传入数据返回null
            return (x != null) ? (E)x : e;
        // 模式不相同的逻辑
        } else {                            // complementary-mode
            // 得到头结点的下一个节点(要处理的节点)
            QNode m = h.next;               // node to fulfill
            // 尾结点不为空,或者头结点的下个节点不为空,或者头结点有其他线程的变化。进行下次循环(其他线程进行了更改)。
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read
            // 得到要处理节点的元素
            Object x = m.item;

            if (isData == (x != null) ||    // m already fulfilled : 要处理的节点位置有值(相同的模式)
                x == m ||                   // m cancelled : 节点被取消、中断或超时
                !m.casItem(x, e)) {         // lost CAS : cas不能讲当前元素转变(无值的时候变为有值,或者有值的时候变为无值)
                // 尝试跳过头结点
                advanceHead(h, m);          // dequeue and retry
                // 继续下次循环
                continue;
            }
            // 匹配成功,将m变为head,虚拟节点
            advanceHead(h, m);              // successfully fulfilled
            // 唤醒在要处理节点上等待的线程
            LockSupport.unpark(m.waiter);
            // 返回处理后的元素:这里就是反着来,传入null返回数据,传入数据返回null
            return (x != null) ? (E)x : e;
        }
    }
}

/**
 * 将新节点通过CAS的方式设置到尾结点上
 */
void advanceTail(QNode t, QNode nt) {
    if (tail == t)
        UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}

/**
 * 等待完成:自旋或阻塞线程,直到满足s.item != e(传入的数据不是当前数据)
 */
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    // 超时时间的计算,有超时时间就计算,没有超时时间就是0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 得到当前的线程
    Thread w = Thread.currentThread();
    // 计算需要自旋的次数
    // 头结点的下一个节点是当前节点的时候:根据电脑性能计算自选次数
    // 头结点的下一个节点不是当前节点的时候:自选次数为0
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 如果当前线程被中断
        if (w.isInterrupted())
            // 尝试取消当前节点
            s.tryCancel(e);
        // 得到当前节点的具体元素
        Object x = s.item;
        // 元素不相同,直接返回当前的元素。tryCancel方法会设置为this导致这里不一致
        if (x != e)
            return x;
        // 如果设置了超时时间,判断超时时间
        if (timed) {
            // 计算剩余时间
            nanos = deadline - System.nanoTime();
            // 剩余时间小于0
            if (nanos <= 0L) {
                // 取消当前节点
                s.tryCancel(e);
                // 继续下次循环
                continue;
            }
        }
        // 步数大于0,步数减一:减少可自选的总数
        if (spins > 0)
            --spins;
        // 次数用完了,设置一下s的等待线程为当前线程
        else if (s.waiter == null)
            s.waiter = w;
        // 没有超时设置的阻塞
        else if (!timed)
            LockSupport.park(this);
        // 超时时间大于1000L的时候,使用判断时间的阻塞(时间nanos大于0才会阻塞)
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

/**
 * 尝试取消当前的节点
 */
void tryCancel(Object cmp) {
    // 通过CAS的方式将传入的变量设置为this
    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}

/**
 * 清除s节点
 */
void clean(QNode pred, QNode s) {
    // 设置S的线程为null
    s.waiter = null; // forget thread
    /*
     * At any given time, exactly one node on list cannot be
     * deleted -- the last inserted node. To accommodate this,
     * if we cannot delete s, we save its predecessor as
     * "cleanMe", deleting the previously saved version
     * first. At least one of node s or the node previously
     * saved can always be deleted, so this always terminates.
     */
    // 最后一个节点要保留一个为null的节点
    while (pred.next == s) { // Return early if already unlinked
        // 得到头结点
        QNode h = head;
        // 得到头结点的下一个节点
        QNode hn = h.next;   // Absorb cancelled first node as head
        // 下一个节点不为空并且被取消了
        if (hn != null && hn.isCancelled()) {
            // 头结点变为下一个节点
            advanceHead(h, hn);
            // 继续下次循环
            continue;
        }
        // 得到尾结点
        QNode t = tail;      // Ensure consistent read for tail
        // 头尾相等,队列空了,直接返回
        if (t == h)
            return;
        // 得到下一个节点的下一个节点
        QNode tn = t.next;
        // 尾结点不一致,直接继续循环(由于其他节点入队了,导致读不一致)。
        if (t != tail)
            // 继续下次循环
            continue;
        // 头结点的下一个节点不是空的时候
        if (tn != null) {
            // 尾结点变为下一个节点
            advanceTail(t, tn);
            // 继续下次循环
            continue;
        }
        // 当前节点不是尾结点,尝试赋值到尾结点
        if (s != t) {        // If not tail, try to unsplice
            // 得到当前节点的下一个节点
            QNode sn = s.next;
            // 当前节点和下一个节点相同或者cas成功将当前的节点设置为下一个(跳过了当前节点,删除成功)
            if (sn == s || pred.casNext(s, sn))
                // 直接返回
                return;
        }
        // 走到这里,说明需要删除的s节点是队尾节点,需要使用cleanMe
        QNode dp = cleanMe;
        // 尝试取消上一个已取消的节点的链接
        if (dp != null) {    // Try unlinking previous cancelled node
            // 得到删除节点的下一个节点(要删除的元素)
            QNode d = dp.next;
            // 得到要删除元素的下一个元素
            QNode dn;
            if (d == null ||               // d is gone or :要删除的元素已经删除了
                d == dp ||                 // d is off list or : 要删除的元素不在列表上
                !d.isCancelled() ||        // d not cancelled or :要删除的元素没有被取消
                (d != t &&                 // d not tail and : 要删除的元素不是尾结点
                 (dn = d.next) != null &&  //   has successor : 要删除的元素有后继节点
                 dn != d &&                //   that is on list : 在这个名单上
                 dp.casNext(d, dn)))       // d unspliced : 可以cas到下一步
                // 通过CAS的方式把cleanMe变为null
                casCleanMe(dp, null);
            // 要删除的元素和传入的元素一致:当前节点已保存到节点
            if (dp == pred)
                return;      // s is already saved node
        // 当前节点可以放入到cleanMe中,推迟清除,直接返回
        } else if (casCleanMe(null, pred))
            return;          // Postpone cleaning s
    }
}

/**
 * 判断节点是否被取消:取消的时候会cas节点为this
 */
boolean isCancelled() {
    return item == this;
}

/**
 * 头结点变为下一个节点
 */
void advanceHead(QNode h, QNode nh) {
    if (h == head &&
        UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
        h.next = h; // forget old next
}

线程被阻塞住后,当前线程是如何把自己的数据传给阻塞线程的?假设线程 1 从队列中 take 数据,被阻塞,变成阻塞线程 A 然后线程 2 开始往队列中 put 数据 B,大致的流程如下:

  1. 线程 1 从队列 take 数据,发现队列内无数据,于是被阻塞,成为 A
  2. 线程 2 往队尾 put 数据,会从队尾往前找到第一个被阻塞的节点,假设此时能找到的就是节点 A,然后线程 B 把将 put 的数据放到节点 A 的 item 属性里面,并唤醒线程 1
  3. 线程 1 被唤醒后,就能从 A.item 里面拿到线程 2 put 的数据了,线程 1 成功返回。

在这个过程中,公平主要体现在,每次 put 数据的时候,都 put 到队尾上,而每次拿数据时,并不是直接从队头拿数据,而是从队尾往前寻找第一个被阻塞的线程,这样就会按照顺序释放被阻塞的线程。

2.6 非公平模式(堆栈)源码分析

image-1659152694829

从上图中我们可以看到,我们有一个大的堆栈池,池的开口叫做堆栈头,put的时候,就往堆栈池中放数据。take的时候,就从堆栈池中拿数据,两者操作都是在堆栈头上操作数据,从图中可以看到,越靠近堆栈头,数据越新,所以每次take的时候,都会拿到堆栈头的最新数据,这就是我们说的后入先出,也就是非公平的。

/**
 * 非公平模式的存取方法
 */
E transfer(E e, boolean timed, long nanos) {
    // 义一个可能被使用的当前节点
    SNode s = null; // constructed/reused as needed
    // 得到模式:true为take,false为put
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        //得到投机诶单
        SNode h = head;
        // 头结点为空或者节点模式与头结点相同,压栈
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 有超时时间的设置,并且超时时间设置小于0
            if (timed && nanos <= 0) {      // can't wait
                // 头结点不为空,并且被取消了
                if (h != null && h.isCancelled())
                    // 弹出头结点。无后续代码,进入下次循环
                    casHead(h, h.next);     // pop cancelled node
                else
                    // 超时了直接返回null
                    return null;
            // 没有超时的情况,尝试将s设置为头结点
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 自选,等待线程匹配成功
                SNode m = awaitFulfill(s, timed, nanos);
                // 等待的线程被取消了,清除当前节点
                if (m == s) {               // wait was cancelled
                    // 清除s节点(当前节点)
                    clean(s);
                    // 节点被清除,直接返回null
                    return null;
                }
                // 头结点不是空的并且头节点的下一个节点是当前节点
                if ((h = head) != null && h.next == s)
                    // 将s的下一个节点设置为头结点
                    casHead(h, s.next);     // help s's fulfiller
                // 返回处理后的元素:这里就是反着来,传入null返回数据,传入数据返回null
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        // 节点模式不相同,并且(m & FULFILLING) != 0 。model的值为1或者0(上门的逻辑创建的节点)返回false,model的值2或者3(这个判断里面创建的节点)返回true
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // 头结点被取消
            if (h.isCancelled())            // already cancelled
                // 移除头结点
                casHead(h, h.next);         // pop and retry
            // 创建一个节点,这个节点的状态为2或者3。2 & 2 = 2。3 & 2 = 2。
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    // 得到当前节点的下一个节点:新创建的元素中,m是头结点
                    SNode m = s.next;       // m is s's match
                    // 头结点被其他线程执行了
                    if (m == null) {        // all waiters are gone
                        // 移除当前节点
                        casHead(s, null);   // pop fulfill node
                        // 取消节点的引用
                        s = null;           // use new node next time
                        // 跳出内部的循环
                        break;              // restart main loop
                    }
                    // 得到头结点的下一个节点
                    SNode mn = m.next;
                    // 尝试匹配:匹配成功会唤醒等待的线程
                    if (m.tryMatch(s)) {
                        // s是头结点的时候,移除!
                        casHead(s, mn);     // pop both s and m
                        // 返回处理后的元素:这里就是反着来,传入null返回数据,传入数据返回null
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    // 唤醒失败了,s不是头结点
                    } else                  // lost match
                        // 移除真正的头结点
                        s.casNext(m, mn);   // help unlink
                }
            }
        // 这里是出栈的逻辑
        } else {                            // help a fulfiller
            // 得到头结点的下一个节点
            SNode m = h.next;               // m is h's match
            // 下一个节点为空
            if (m == null)                  // waiter is gone
                // 栈顶变为null
                casHead(h, null);           // pop fulfilling node
            // 下一个节点有值
            else {
                // 得到栈顶元素
                SNode mn = m.next;
                // 头节点尝试匹配:匹配成功会唤醒等待的线程
                if (m.tryMatch(h))          // help match
                    // 移除头结点
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    // 匹配失败,m是栈顶,尝试移除当前栈顶
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

/**
 * 栈结构的构造方法
 */
static SNode snode(SNode s, Object e, SNode next, int mode) {
    // 节点为空,以传入的元素构建一个节点
    if (s == null) s = new SNode(e);
    // 设置当前节点的模式
    s.mode = mode;
    // 设置当前节点的下一个节点
    s.next = next;
    // 返回当前的节点
    return s;
}

/**
 * 等待完成:自旋或阻塞线程,直到匹配成功
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 超时时间的计算,有超时时间就计算,没有超时时间就是0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 得到当前的线程
    Thread w = Thread.currentThread();
    // 计算需要自旋的次数
    // 头结点的下一个节点是当前节点的时候:根据电脑性能计算自选次数
    // 头结点的下一个节点不是当前节点的时候:自选次数为0
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 如果当前线程被中断
        if (w.isInterrupted())
            // 尝试取消当前节点
            s.tryCancel();
        // 获取当前节点的匹配节点
        SNode m = s.match;
        // 节点不为空,直接返回
        if (m != null)
            return m;
        // 如果设置了超时时间,判断超时时间
        if (timed) {
            // 计算剩余时间
            nanos = deadline - System.nanoTime();
            // 剩余时间小于0
            if (nanos <= 0L) {
                // 尝试取消当前节点
                s.tryCancel();
                // 继续下次循环
                continue;
            }
        }
        // 步数大于0,步数减一:减少可自选的总数
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        // 次数用完了,设置一下s的等待线程为当前线程
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        // 没有超时设置的阻塞
        else if (!timed)
            LockSupport.park(this);
        // 超时时间大于1000L的时候,使用判断时间的阻塞(时间nanos大于0才会阻塞)
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

/**
 * 尝试取消当前的节点
 */
void tryCancel() {
    UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}

/**
 * 清除s节点
 */
void clean(SNode s) {
    // 设置当前的元素为null
    s.item = null;   // forget item
    // 设置S的线程为null
    s.waiter = null; // forget thread

    // 得到当前节点的下一个节点
    SNode past = s.next;
    // 下一个个节点有值但是被取消了,尝试获取下下个
    if (past != null && past.isCancelled())
        past = past.next;

    // Absorb cancelled nodes at head
    // 定义一个临时的节点
    SNode p;
    // 头结点不为空并且头结点不是下一个节点并且头结点是被取消
    while ((p = head) != null && p != past && p.isCancelled())
        // 通过CAS的方式将头结点变为下一个节点,直到找到一个没有被取消的
        casHead(p, p.next);

    // Unsplice embedded nodes
    // 头结点不为空,并且和下一个节点不相等(头结点为null)
    while (p != null && p != past) {
        // 得到下一个节点
        SNode n = p.next;
        // 下一个节点不为空,并且没有被取消
        if (n != null && n.isCancelled())
            // 尝试头结点变为下一个:清除掉现在的头结点
            p.casNext(n, n.next);
        else
            // 被取消了的话,直接跳过头结点!
            p = n;
    }
}

/**
 * 尝试匹配:匹配成功会唤醒等待的线程
 */
boolean tryMatch(SNode s) {
    // 匹配节点为空并且CAS成功变为s节点
    if (match == null &&
        UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        // 得到现在持有锁的线程
        Thread w = waiter;
        // 有现成持有锁
        if (w != null) {    // waiters need at most one unpark
            // 取消持有锁的线程
            waiter = null;
            // 唤醒持有锁的线程
            LockSupport.unpark(w);
        }
        // 返回成功
        return true;
    }
    // 节点相同返回成功,否则返回失败
    return match == s;
}

总结一下操作思路:

  1. 判断是 put 方法还是 take 方法
  2. 判断栈头数据是否为空,如果为空或者栈头的操作和本次操作一致,是的话走 3,否则走 5
  3. 判断操作有无设置超时时间,如果设置了超时时间并且已经超时,返回 null,否则走 4
  4. 如果栈头为空,把当前操作设置成栈头,或者栈头不为空,但栈头的操作和本次操作相同,也把当前操作设置成栈头,并看看其它线程能否满足自己,不能满足则阻塞自己。比如当前操作是 take,但队列中没有数据,则阻塞自己
  5. 如果栈头已经是阻塞住的,需要别人唤醒的,判断当前操作能否唤醒栈头,可以唤醒走 6,否则走 4
  6. 把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点
  7. 栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回。

3、存取操作及执行逻辑

常用方法

iterator() 永远返回空,因为里面没有东西

peek() 永远返回null

put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。

offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。

take() 取出并且remove掉queue里的element,取不到东西他会一直等。

poll() 取出并且remove掉queue里的element,只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null。

isEmpty() 永远返回true

remove()&removeAll() 永远返回false

SynchronousQueue的存取操作

  • 存取调用同一个方法:transfer()。
  • put、offer 为生产者,携带了数据 e,为 Data 模式,设置到 SNode或QNode 属性中。
  • take、poll 为消费者,不携帯数据,为 Request 模式,设置到 SNode或QNode属性中。

SynchronousQueue的大概执行逻辑

  • 第一个线程Thread0是消费者访问,此时队列为空,则入队(创建Node结点并赋值)。
  • 第二个线程Thread1也是消费者访问,与队尾模式相同,继续入队。
  • 第三个线程Thread2是生产者,携带了数据e,与队尾模式不同,不进行入队操作。直接将该线程携带的数据e返回给队首的消费者,并唤醒队首线程Thread1(默认非公平策略是栈结构),出队。

SynchronousQueue的执行过程

  • 线程访问阻塞队列,先判断队尾节点或者栈顶节点的 Node 与当前入队模式是否相同。
  • 相同则构造节点 Node 入队,并阻塞当前线程,元素 e 和线程赋值给 Node 属性。
  • 不同则将元素 e(不为 null) 返回给取数据线程,队首或栈顶线程被唤醒,出队。

4、应用场景

  • SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。

  • SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。

  • Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

5、应用示例

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueExample {

    static class SynchronousQueueProducer implements Runnable {

        protected BlockingQueue<String> blockingQueue;
        final Random random = new Random();

        public SynchronousQueueProducer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    System.out.println("Put: " + data);
                    blockingQueue.put(data);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    static class SynchronousQueueConsumer implements Runnable {

        protected BlockingQueue<String> blockingQueue;

        public SynchronousQueueConsumer(BlockingQueue<String> queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()
                            + " take(): " + data);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public static void main(String[] args) {
        final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();

        SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
                synchronousQueue);
        new Thread(queueProducer).start();

        SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer1).start();

        SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer2).start();

    }
}

执行结果:

Put: 8ef86daa-7ef2-43a4-9848-1041cfb85934
Thread-1 take(): 8ef86daa-7ef2-43a4-9848-1041cfb85934
Put: 3d647a9a-62cd-4996-96c0-22b4dd169a85
Thread-2 take(): 3d647a9a-62cd-4996-96c0-22b4dd169a85
Put: 3fa489c5-37c6-4e98-8527-2306bbb5b308
Thread-1 take(): 3fa489c5-37c6-4e98-8527-2306bbb5b308
Put: dc62de60-c052-49ef-bf62-90ba56de5194
Thread-2 take(): dc62de60-c052-49ef-bf62-90ba56de5194
Put: e248c117-0e7f-4ebb-b75b-d44abf23a255
Thread-1 take(): e248c117-0e7f-4ebb-b75b-d44abf23a255
Put: 62ec50ee-5718-4167-b3ed-c310e7f40130
Thread-2 take(): 62ec50ee-5718-4167-b3ed-c310e7f40130
......

6、总结

  • SynchronousQueue队列不存储数据,所以没有大小,也无法迭代;
  • 插入操作的返回必须等待另一个线程完成对应数据的删除操作,反之亦然;
  • 队列由两种数据结构组成,分别是后入先出的堆栈和先入先出的队列,堆栈是非公平的,队列是公平的。
0

评论区