SynchronousQueue是一个没有数据缓冲的BlockingQueue,容量为0,它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介(实际上它不是一个真正的队列)。与其余队列不一样的是,它维护一组线程,这些线程在等待着把元素加入或移出队列,即生产者线程对其的插入操作put必须等待消费者的移除操作take。
1、SynchronousQueue介绍
SynchronousQueue是一个同步阻塞队列,没有存储功能,所以put和take会一直阻塞,直到有另外一个线程已经准备好参与到交付过程当中。即每一个 put操作都必须等待一个take操作。每一个take操作也必须等待一个put操作。
-
仅当有足够多的消费者,而且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。
-
SynchronousQueue是没有容量的,无法存储元素节点信息,不能通过peek方法获取元素,peek方法会直接返回null。由于没有元素,所以不能被迭代,它的iterator方法会返回一个空的迭代器Collections.emptyIterator()。
-
SynchronousQueue比较适合线程通信、传递信息、状态切换等应用场景,一个线程必须等待另一个线程传递某些信息给他才可以继续执行。
SynchronousQueue 的整体设计比较抽象,在内部抽象出了两种算法实现,一种是先入先出的队列,一种是后入先出的堆栈,两种算法被两个内部类实现,而直接对外的put,take方法的实现就比较简单,都是直接调用两个内部类的transfer方法进行实现,整体的调用关系如下图所示:
SynchronousQueue的公平与非公平模式
SynchronousQueue 内部分为公平(队列)和非公平(栈),队列的性能相对而言会好点。默认是非公平的,通常非公平(栈 FIFO)的性能会高那么一点点。
-
公平模式(TransferQueue):队尾匹配(判断模式),队头出队,先进先出。
-
非公平模式(默认策略:TransferStack):栈顶匹配,栈顶出栈,后进先出。
SynchronousQueue的特点
-
SynchronousQueue 最大的特点在于,它的容量为0,没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。
-
SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。
-
由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。
-
使用的数据结构是链表。
-
使用CAS+自旋(无锁),自旋了一定次数后调用 LockSupport.park()进行阻塞。
2、SynchronousQueue源码解析
2.1 数据结构
// 堆栈和队列共同的接口
// 负责执行 put or take
abstract static class Transferer<E> {
// e 为空的,会直接返回特殊值,不为空会传递给消费者
// timed 为 true,说明会有超时时间
abstract E transfer(E e, boolean timed, long nanos);
}
// 堆栈 后入先出 非公平
// Scherer-Scott 算法
static final class TransferStack<E> extends Transferer<E> {
}
// 队列 先入先出 公平
static final class TransferQueue<E> extends Transferer<E> {
}
private transient volatile Transferer<E> transferer;
// 无参构造器默认为非公平的
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
从源码中我们可以得到几点:
- 堆栈和队列都有一个共同的接口,叫做Transferer,该接口有个方法:transfer,该方法很神奇,会承担take和put的双重功能;
- 在我们初始化的时候,是可以选择是使用堆栈还是队列的,如果你选择,默认的就是堆栈,类注释也说明了这一点,堆栈的效率比队列更高。
2.2 构造方法
/**
* 默认无参的构造方法:直接调用有参的构造方法,传入false(非公平模式)
*/
public SynchronousQueue() {
this(false);
}
/**
* 有参构造方法,参数是是否公平模式。
* fair:true公平模式、false非公平模式。
*/
public SynchronousQueue(boolean fair) {
// 将队列结构或者栈结构对象放在transferer对象上
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
2.3 入队方法
/**
* SynchronousQueue的入队方法
*/
public void put(E e) throws InterruptedException {
// 如果元素是NULL,抛出异常
if (e == null) throw new NullPointerException();
// 调用公平或者非公平模式的transfer方法
if (transferer.transfer(e, false, 0) == null) {
// 线程中断
Thread.interrupted();
// 抛出中断异常
throw new InterruptedException();
}
}
2.4 出队方法
/**
* SynchronousQueue的出方法
*/
public E take() throws InterruptedException {
// 调用公平或者非公平模式的transfer方法
E e = transferer.transfer(null, false, 0);
// 元素不为null,返回元素
if (e != null)
return e;
// 元素为null,中断线程
Thread.interrupted();
// 抛出中断异常
throw new InterruptedException();
}
2.5 公平模式(队列)源码分析
// 下一个节点
volatile QNode next; // next node in queue
// 当前元素
volatile Object item; // CAS'ed to or from null
// 当前所属的线程
volatile Thread waiter; // to control park/unpark
// put(true)还是take(false)的标记
final boolean isData;
/**
* 公平模式节点的构造方法
*/
TransferQueue() {
// 构建一个当前的元素为空,标记为take的节点
QNode h = new QNode(null, false); // initialize to dummy node.
// 将头节点设置为当前节点
head = h;
// 将尾节点设置为当前节点
tail = h;
}
/**
* 公平模式的存取方法
*/
E transfer(E e, boolean timed, long nanos) {
// 定义一个可能被使用的当前节点
QNode s = null; // constructed/reused as needed
// 判断当前的元素是不是空:区分是put(true)还是take(flse)
boolean isData = (e != null);
for (;;) {
// 得到队列的尾结点
QNode t = tail;
// 得到队列的头结点
QNode h = head;
// 头尾为空,说明没有被初始化。去下次循环
if (t == null || h == null) // saw uninitialized value
// 继续下次循环
continue; // spin
// 头尾节点相同(只有一个节点,或者为空的时候)或者模式相同
if (h == t || t.isData == isData) { // empty or same-mode
// 得到当前节点的下一个节点
QNode tn = t.next;
// 尾结点不一致,直接继续循环(由于其他节点入队了,导致读不一致)。
if (t != tail) // inconsistent read
// 继续下次循环
continue;
// 当前节点的下个节点有数据:有其他节点入队,并且下一个节点是其他的节点
if (tn != null) { // lagging tail
// 将下一个节点通过CAS的方式设置到尾结点上
advanceTail(t, tn);
// 继续下次循环
continue;
}
// 目前来看put和take传入的值timed都是false。
// 这里表示有时间限制的时候,超时了,直接返回null。
if (timed && nanos <= 0) // can't wait
return null;
// 当前的节点不存在,构建一个新节点,用当前传入的元素
if (s == null)
s = new QNode(e, isData);
// 下一个节点是null,并且将新节点通过CAS的方式放入到下一个节点
if (!t.casNext(null, s)) // failed to link in
// 失败的话继续下次循环
continue;
// 将当前节点设置为尾结点
advanceTail(t, s); // swing tail and wait
// 等待完成:自旋或阻塞线程,直到满足s.item != e(传入的数据不是当前数据)
Object x = awaitFulfill(s, e, timed, nanos);
// 节点被取消、中断或超时
if (x == s) { // wait was cancelled
// 清除s节点(当前节点)
clean(t, s);
// 节点被清除,返回null
return null;
}
// 没有被取消、中断或超时
if (!s.isOffList()) { // not already unlinked
// 如果是头部,取消连接
advanceHead(t, s); // unlink if head
// 当前元素不是空的
if (x != null) // and forget fields
// 在当前元素的设置到节点上
s.item = s;
// 当前节点的持有线程变为null
s.waiter = null;
}
// 返回处理后的元素:这里就是反着来,传入null返回数据,传入数据返回null
return (x != null) ? (E)x : e;
// 模式不相同的逻辑
} else { // complementary-mode
// 得到头结点的下一个节点(要处理的节点)
QNode m = h.next; // node to fulfill
// 尾结点不为空,或者头结点的下个节点不为空,或者头结点有其他线程的变化。进行下次循环(其他线程进行了更改)。
if (t != tail || m == null || h != head)
continue; // inconsistent read
// 得到要处理节点的元素
Object x = m.item;
if (isData == (x != null) || // m already fulfilled : 要处理的节点位置有值(相同的模式)
x == m || // m cancelled : 节点被取消、中断或超时
!m.casItem(x, e)) { // lost CAS : cas不能讲当前元素转变(无值的时候变为有值,或者有值的时候变为无值)
// 尝试跳过头结点
advanceHead(h, m); // dequeue and retry
// 继续下次循环
continue;
}
// 匹配成功,将m变为head,虚拟节点
advanceHead(h, m); // successfully fulfilled
// 唤醒在要处理节点上等待的线程
LockSupport.unpark(m.waiter);
// 返回处理后的元素:这里就是反着来,传入null返回数据,传入数据返回null
return (x != null) ? (E)x : e;
}
}
}
/**
* 将新节点通过CAS的方式设置到尾结点上
*/
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
/**
* 等待完成:自旋或阻塞线程,直到满足s.item != e(传入的数据不是当前数据)
*/
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
// 超时时间的计算,有超时时间就计算,没有超时时间就是0
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 得到当前的线程
Thread w = Thread.currentThread();
// 计算需要自旋的次数
// 头结点的下一个节点是当前节点的时候:根据电脑性能计算自选次数
// 头结点的下一个节点不是当前节点的时候:自选次数为0
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 如果当前线程被中断
if (w.isInterrupted())
// 尝试取消当前节点
s.tryCancel(e);
// 得到当前节点的具体元素
Object x = s.item;
// 元素不相同,直接返回当前的元素。tryCancel方法会设置为this导致这里不一致
if (x != e)
return x;
// 如果设置了超时时间,判断超时时间
if (timed) {
// 计算剩余时间
nanos = deadline - System.nanoTime();
// 剩余时间小于0
if (nanos <= 0L) {
// 取消当前节点
s.tryCancel(e);
// 继续下次循环
continue;
}
}
// 步数大于0,步数减一:减少可自选的总数
if (spins > 0)
--spins;
// 次数用完了,设置一下s的等待线程为当前线程
else if (s.waiter == null)
s.waiter = w;
// 没有超时设置的阻塞
else if (!timed)
LockSupport.park(this);
// 超时时间大于1000L的时候,使用判断时间的阻塞(时间nanos大于0才会阻塞)
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
/**
* 尝试取消当前的节点
*/
void tryCancel(Object cmp) {
// 通过CAS的方式将传入的变量设置为this
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
/**
* 清除s节点
*/
void clean(QNode pred, QNode s) {
// 设置S的线程为null
s.waiter = null; // forget thread
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this,
* if we cannot delete s, we save its predecessor as
* "cleanMe", deleting the previously saved version
* first. At least one of node s or the node previously
* saved can always be deleted, so this always terminates.
*/
// 最后一个节点要保留一个为null的节点
while (pred.next == s) { // Return early if already unlinked
// 得到头结点
QNode h = head;
// 得到头结点的下一个节点
QNode hn = h.next; // Absorb cancelled first node as head
// 下一个节点不为空并且被取消了
if (hn != null && hn.isCancelled()) {
// 头结点变为下一个节点
advanceHead(h, hn);
// 继续下次循环
continue;
}
// 得到尾结点
QNode t = tail; // Ensure consistent read for tail
// 头尾相等,队列空了,直接返回
if (t == h)
return;
// 得到下一个节点的下一个节点
QNode tn = t.next;
// 尾结点不一致,直接继续循环(由于其他节点入队了,导致读不一致)。
if (t != tail)
// 继续下次循环
continue;
// 头结点的下一个节点不是空的时候
if (tn != null) {
// 尾结点变为下一个节点
advanceTail(t, tn);
// 继续下次循环
continue;
}
// 当前节点不是尾结点,尝试赋值到尾结点
if (s != t) { // If not tail, try to unsplice
// 得到当前节点的下一个节点
QNode sn = s.next;
// 当前节点和下一个节点相同或者cas成功将当前的节点设置为下一个(跳过了当前节点,删除成功)
if (sn == s || pred.casNext(s, sn))
// 直接返回
return;
}
// 走到这里,说明需要删除的s节点是队尾节点,需要使用cleanMe
QNode dp = cleanMe;
// 尝试取消上一个已取消的节点的链接
if (dp != null) { // Try unlinking previous cancelled node
// 得到删除节点的下一个节点(要删除的元素)
QNode d = dp.next;
// 得到要删除元素的下一个元素
QNode dn;
if (d == null || // d is gone or :要删除的元素已经删除了
d == dp || // d is off list or : 要删除的元素不在列表上
!d.isCancelled() || // d not cancelled or :要删除的元素没有被取消
(d != t && // d not tail and : 要删除的元素不是尾结点
(dn = d.next) != null && // has successor : 要删除的元素有后继节点
dn != d && // that is on list : 在这个名单上
dp.casNext(d, dn))) // d unspliced : 可以cas到下一步
// 通过CAS的方式把cleanMe变为null
casCleanMe(dp, null);
// 要删除的元素和传入的元素一致:当前节点已保存到节点
if (dp == pred)
return; // s is already saved node
// 当前节点可以放入到cleanMe中,推迟清除,直接返回
} else if (casCleanMe(null, pred))
return; // Postpone cleaning s
}
}
/**
* 判断节点是否被取消:取消的时候会cas节点为this
*/
boolean isCancelled() {
return item == this;
}
/**
* 头结点变为下一个节点
*/
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
线程被阻塞住后,当前线程是如何把自己的数据传给阻塞线程的?假设线程 1 从队列中 take 数据,被阻塞,变成阻塞线程 A 然后线程 2 开始往队列中 put 数据 B,大致的流程如下:
- 线程 1 从队列 take 数据,发现队列内无数据,于是被阻塞,成为 A
- 线程 2 往队尾 put 数据,会从队尾往前找到第一个被阻塞的节点,假设此时能找到的就是节点 A,然后线程 B 把将 put 的数据放到节点 A 的 item 属性里面,并唤醒线程 1
- 线程 1 被唤醒后,就能从 A.item 里面拿到线程 2 put 的数据了,线程 1 成功返回。
在这个过程中,公平主要体现在,每次 put 数据的时候,都 put 到队尾上,而每次拿数据时,并不是直接从队头拿数据,而是从队尾往前寻找第一个被阻塞的线程,这样就会按照顺序释放被阻塞的线程。
2.6 非公平模式(堆栈)源码分析
从上图中我们可以看到,我们有一个大的堆栈池,池的开口叫做堆栈头,put的时候,就往堆栈池中放数据。take的时候,就从堆栈池中拿数据,两者操作都是在堆栈头上操作数据,从图中可以看到,越靠近堆栈头,数据越新,所以每次take的时候,都会拿到堆栈头的最新数据,这就是我们说的后入先出,也就是非公平的。
/**
* 非公平模式的存取方法
*/
E transfer(E e, boolean timed, long nanos) {
// 义一个可能被使用的当前节点
SNode s = null; // constructed/reused as needed
// 得到模式:true为take,false为put
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
//得到投机诶单
SNode h = head;
// 头结点为空或者节点模式与头结点相同,压栈
if (h == null || h.mode == mode) { // empty or same-mode
// 有超时时间的设置,并且超时时间设置小于0
if (timed && nanos <= 0) { // can't wait
// 头结点不为空,并且被取消了
if (h != null && h.isCancelled())
// 弹出头结点。无后续代码,进入下次循环
casHead(h, h.next); // pop cancelled node
else
// 超时了直接返回null
return null;
// 没有超时的情况,尝试将s设置为头结点
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 自选,等待线程匹配成功
SNode m = awaitFulfill(s, timed, nanos);
// 等待的线程被取消了,清除当前节点
if (m == s) { // wait was cancelled
// 清除s节点(当前节点)
clean(s);
// 节点被清除,直接返回null
return null;
}
// 头结点不是空的并且头节点的下一个节点是当前节点
if ((h = head) != null && h.next == s)
// 将s的下一个节点设置为头结点
casHead(h, s.next); // help s's fulfiller
// 返回处理后的元素:这里就是反着来,传入null返回数据,传入数据返回null
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 节点模式不相同,并且(m & FULFILLING) != 0 。model的值为1或者0(上门的逻辑创建的节点)返回false,model的值2或者3(这个判断里面创建的节点)返回true
} else if (!isFulfilling(h.mode)) { // try to fulfill
// 头结点被取消
if (h.isCancelled()) // already cancelled
// 移除头结点
casHead(h, h.next); // pop and retry
// 创建一个节点,这个节点的状态为2或者3。2 & 2 = 2。3 & 2 = 2。
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
// 得到当前节点的下一个节点:新创建的元素中,m是头结点
SNode m = s.next; // m is s's match
// 头结点被其他线程执行了
if (m == null) { // all waiters are gone
// 移除当前节点
casHead(s, null); // pop fulfill node
// 取消节点的引用
s = null; // use new node next time
// 跳出内部的循环
break; // restart main loop
}
// 得到头结点的下一个节点
SNode mn = m.next;
// 尝试匹配:匹配成功会唤醒等待的线程
if (m.tryMatch(s)) {
// s是头结点的时候,移除!
casHead(s, mn); // pop both s and m
// 返回处理后的元素:这里就是反着来,传入null返回数据,传入数据返回null
return (E) ((mode == REQUEST) ? m.item : s.item);
// 唤醒失败了,s不是头结点
} else // lost match
// 移除真正的头结点
s.casNext(m, mn); // help unlink
}
}
// 这里是出栈的逻辑
} else { // help a fulfiller
// 得到头结点的下一个节点
SNode m = h.next; // m is h's match
// 下一个节点为空
if (m == null) // waiter is gone
// 栈顶变为null
casHead(h, null); // pop fulfilling node
// 下一个节点有值
else {
// 得到栈顶元素
SNode mn = m.next;
// 头节点尝试匹配:匹配成功会唤醒等待的线程
if (m.tryMatch(h)) // help match
// 移除头结点
casHead(h, mn); // pop both h and m
else // lost match
// 匹配失败,m是栈顶,尝试移除当前栈顶
h.casNext(m, mn); // help unlink
}
}
}
}
/**
* 栈结构的构造方法
*/
static SNode snode(SNode s, Object e, SNode next, int mode) {
// 节点为空,以传入的元素构建一个节点
if (s == null) s = new SNode(e);
// 设置当前节点的模式
s.mode = mode;
// 设置当前节点的下一个节点
s.next = next;
// 返回当前的节点
return s;
}
/**
* 等待完成:自旋或阻塞线程,直到匹配成功
*/
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 超时时间的计算,有超时时间就计算,没有超时时间就是0
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 得到当前的线程
Thread w = Thread.currentThread();
// 计算需要自旋的次数
// 头结点的下一个节点是当前节点的时候:根据电脑性能计算自选次数
// 头结点的下一个节点不是当前节点的时候:自选次数为0
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 如果当前线程被中断
if (w.isInterrupted())
// 尝试取消当前节点
s.tryCancel();
// 获取当前节点的匹配节点
SNode m = s.match;
// 节点不为空,直接返回
if (m != null)
return m;
// 如果设置了超时时间,判断超时时间
if (timed) {
// 计算剩余时间
nanos = deadline - System.nanoTime();
// 剩余时间小于0
if (nanos <= 0L) {
// 尝试取消当前节点
s.tryCancel();
// 继续下次循环
continue;
}
}
// 步数大于0,步数减一:减少可自选的总数
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
// 次数用完了,设置一下s的等待线程为当前线程
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
// 没有超时设置的阻塞
else if (!timed)
LockSupport.park(this);
// 超时时间大于1000L的时候,使用判断时间的阻塞(时间nanos大于0才会阻塞)
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
/**
* 尝试取消当前的节点
*/
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
/**
* 清除s节点
*/
void clean(SNode s) {
// 设置当前的元素为null
s.item = null; // forget item
// 设置S的线程为null
s.waiter = null; // forget thread
// 得到当前节点的下一个节点
SNode past = s.next;
// 下一个个节点有值但是被取消了,尝试获取下下个
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head
// 定义一个临时的节点
SNode p;
// 头结点不为空并且头结点不是下一个节点并且头结点是被取消
while ((p = head) != null && p != past && p.isCancelled())
// 通过CAS的方式将头结点变为下一个节点,直到找到一个没有被取消的
casHead(p, p.next);
// Unsplice embedded nodes
// 头结点不为空,并且和下一个节点不相等(头结点为null)
while (p != null && p != past) {
// 得到下一个节点
SNode n = p.next;
// 下一个节点不为空,并且没有被取消
if (n != null && n.isCancelled())
// 尝试头结点变为下一个:清除掉现在的头结点
p.casNext(n, n.next);
else
// 被取消了的话,直接跳过头结点!
p = n;
}
}
/**
* 尝试匹配:匹配成功会唤醒等待的线程
*/
boolean tryMatch(SNode s) {
// 匹配节点为空并且CAS成功变为s节点
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
// 得到现在持有锁的线程
Thread w = waiter;
// 有现成持有锁
if (w != null) { // waiters need at most one unpark
// 取消持有锁的线程
waiter = null;
// 唤醒持有锁的线程
LockSupport.unpark(w);
}
// 返回成功
return true;
}
// 节点相同返回成功,否则返回失败
return match == s;
}
总结一下操作思路:
- 判断是 put 方法还是 take 方法
- 判断栈头数据是否为空,如果为空或者栈头的操作和本次操作一致,是的话走 3,否则走 5
- 判断操作有无设置超时时间,如果设置了超时时间并且已经超时,返回 null,否则走 4
- 如果栈头为空,把当前操作设置成栈头,或者栈头不为空,但栈头的操作和本次操作相同,也把当前操作设置成栈头,并看看其它线程能否满足自己,不能满足则阻塞自己。比如当前操作是 take,但队列中没有数据,则阻塞自己
- 如果栈头已经是阻塞住的,需要别人唤醒的,判断当前操作能否唤醒栈头,可以唤醒走 6,否则走 4
- 把自己当作一个节点,赋值到栈头的 match 属性上,并唤醒栈头节点
- 栈头被唤醒后,拿到 match 属性,就是把自己唤醒的节点的信息,返回。
3、存取操作及执行逻辑
常用方法
iterator() 永远返回空,因为里面没有东西
peek() 永远返回null
put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。
offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。
take() 取出并且remove掉queue里的element,取不到东西他会一直等。
poll() 取出并且remove掉queue里的element,只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null。
isEmpty() 永远返回true
remove()&removeAll() 永远返回false
SynchronousQueue的存取操作
- 存取调用同一个方法:transfer()。
- put、offer 为生产者,携带了数据 e,为 Data 模式,设置到 SNode或QNode 属性中。
- take、poll 为消费者,不携帯数据,为 Request 模式,设置到 SNode或QNode属性中。
SynchronousQueue的大概执行逻辑
- 第一个线程Thread0是消费者访问,此时队列为空,则入队(创建Node结点并赋值)。
- 第二个线程Thread1也是消费者访问,与队尾模式相同,继续入队。
- 第三个线程Thread2是生产者,携带了数据e,与队尾模式不同,不进行入队操作。直接将该线程携带的数据e返回给队首的消费者,并唤醒队首线程Thread1(默认非公平策略是栈结构),出队。
SynchronousQueue的执行过程
- 线程访问阻塞队列,先判断队尾节点或者栈顶节点的 Node 与当前入队模式是否相同。
- 相同则构造节点 Node 入队,并阻塞当前线程,元素 e 和线程赋值给 Node 属性。
- 不同则将元素 e(不为 null) 返回给取数据线程,队首或栈顶线程被唤醒,出队。
4、应用场景
-
SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
-
SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。
-
Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
5、应用示例
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueExample {
static class SynchronousQueueProducer implements Runnable {
protected BlockingQueue<String> blockingQueue;
final Random random = new Random();
public SynchronousQueueProducer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
String data = UUID.randomUUID().toString();
System.out.println("Put: " + data);
blockingQueue.put(data);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class SynchronousQueueConsumer implements Runnable {
protected BlockingQueue<String> blockingQueue;
public SynchronousQueueConsumer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
String data = blockingQueue.take();
System.out.println(Thread.currentThread().getName()
+ " take(): " + data);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();
SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
synchronousQueue);
new Thread(queueProducer).start();
SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
synchronousQueue);
new Thread(queueConsumer1).start();
SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
synchronousQueue);
new Thread(queueConsumer2).start();
}
}
执行结果:
Put: 8ef86daa-7ef2-43a4-9848-1041cfb85934
Thread-1 take(): 8ef86daa-7ef2-43a4-9848-1041cfb85934
Put: 3d647a9a-62cd-4996-96c0-22b4dd169a85
Thread-2 take(): 3d647a9a-62cd-4996-96c0-22b4dd169a85
Put: 3fa489c5-37c6-4e98-8527-2306bbb5b308
Thread-1 take(): 3fa489c5-37c6-4e98-8527-2306bbb5b308
Put: dc62de60-c052-49ef-bf62-90ba56de5194
Thread-2 take(): dc62de60-c052-49ef-bf62-90ba56de5194
Put: e248c117-0e7f-4ebb-b75b-d44abf23a255
Thread-1 take(): e248c117-0e7f-4ebb-b75b-d44abf23a255
Put: 62ec50ee-5718-4167-b3ed-c310e7f40130
Thread-2 take(): 62ec50ee-5718-4167-b3ed-c310e7f40130
......
6、总结
- SynchronousQueue队列不存储数据,所以没有大小,也无法迭代;
- 插入操作的返回必须等待另一个线程完成对应数据的删除操作,反之亦然;
- 队列由两种数据结构组成,分别是后入先出的堆栈和先入先出的队列,堆栈是非公平的,队列是公平的。
评论区