Exchanger(交换者)是一个用于线程间协作的工具类,主要用于两个线程间的通信,Exchanger提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
1、概述
Exchanger来自于JDK1.5的JUC包,顾名思义就是用来做交换的。这里主要是两个线程之间交换持有的对象。当Exchanger在一个线程中调用exchange方法之后,会等待另外的线程调用同样的exchange方法。
- 每次只能两个线程交换数据,如果有多个线程,也只有两个能交换数据。
- Exchanger是双向的数据传输,2个线程在一个同步点,交换数据。先到的线程会等待第二个线程执行exchange, SynchronousQueue,是2个线程之间单向的数据传输,一个put,一个take。
2、源码解析
Exchanger 使用是非常简单的,但是实现原理和前面几种工具比较确实最难的,前面几种工具都是通过同步器或者锁来实现,而Exchanger 是一种无锁算法,和前面SynchronousQueue 一样,都是通过循环 cas 来实现线程安全,因此这种方式就会显得比较抽象和麻烦。
Exchanger 有单槽位和多槽位之分,单个槽位在同一时刻只能用于两个线程交换数据,这样在竞争比较激烈的时候,会影响到性能,多个槽位就是多个线程可以同时进行两个的数据交换,彼此之间不受影响,这样可以很好的提高吞吐量。
Exchanger直接继承自Object。类定义如下:
public class Exchanger<V>
其中V表示需要交换的对象类型。
2.1 构造器
Exchanger提供一个无参构造函数。
public Exchanger() {
participant = new Participant();
}
2.2 属性
Exchanger包含的属性如下:
//ThreadLocal变量,每个线程都有自己的一个副本
private final Participant participant;
//高并发下使用的,保存待匹配的Node实例
private volatile Node[] arena;
//低并发下,arena未初始化时使用的保存待匹配的Node实例
private volatile Node slot;
//初始值为0,当创建arena后会被赋值成SEQ,用来记录arena数组的可用最大索引,会随着并发的增大而增大直到等于最大值FULL,会随着并行的线程逐一匹配成功而减少恢复成初始值
private volatile int bound;
还有多个表示字段偏移量的静态属性,通过static代码块初始化,如下:
Exchanger 定义了多个静态常量,如下:
// 初始化arena时使用,1 << ASHIFT是一个缓存行的大小,避免不同的Node落入到同一个高速缓存行
// 这里实际是把数组容量扩大了8倍,原来索引相邻的两个元素,扩容后中间隔了7个元素,从元素的起始地址上看就隔了8个元素,中间的7个都是空的,为了避免原来相邻的两个元素落到同一个缓存行中
//因为arena是对象数组,一个元素占8字节,8个就是64字节
private static final int ASHIFT = 7;
//arena数组元素的索引最大值即255
private static final int MMASK = 0xff;
//arena数组的最大长度即256
private static final int SEQ = MMASK + 1;
//获取CPU核数
private static final int NCPU = Runtime.getRuntime().availableProcessors();
//实际的数组长度,因为是线程两两配对的,所以最大长度是核数除以2
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
//自旋等待的次数
private static final int SPINS = 1 << 10;
//如果交换的对象是null,则返回此对象
private static final Object NULL_ITEM = new Object();
//如果等待超时导致交换失败,则返回此对象
private static final Object TIMED_OUT = new Object();
上述属性中Node和Participant都是内部类,其定义如下:
其中Contended注解是为了避免高速缓存行导致的伪共享问题,index用来记录arena数组的索引,bound用于记录上一次的Exchanger bound属性,collides用于记录在bound不变的情况下CAS抢占失败的次数,hash是自旋等待时计算随机数使用的,item表示当前线程请求交换的对象,match是同其他线程交换的结果,match不为null表示交换成功,parked为跟该Node关联的处于休眠状态的线程。
2.3 数据结构
槽位定义:
@sun.misc.Contended static final class Node {
int index; //arena的下标,多个槽位的时候利用
int bound; // 上一次记录的Exchanger.bound;
int collides; // 在当前bound下CAS失败的次数;
int hash; // 用于自旋;
Object item; // 这个线程的当前项,也就是需要交换的数据;
volatile Object match; // 交换的数据
volatile Thread parked; // 线程
}
/**
* Value representing null arguments/returns from public
* methods. Needed because the API originally didn't disallow null
* arguments, which it should have.
* 如果交换的数据为 null,则用NULL_ITEM 代替
*/
private static final Object NULL_ITEM = new Object();
Node 定义中,index,bound,collides 这些都是用于多槽位的,item 是本线程需要交换的数据,match 是和其它线程交换后的数据,开始是为null,交换数据成功后,就是我们需要的数据了,parked记录线程,用于阻塞和唤醒线程。
/** The number of CPUs, for sizing and spin control */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* The bound for spins while waiting for a match. The actual
* number of iterations will on average be about twice this value
* due to randomization. Note: Spinning is disabled when NCPU==1.
*/
private static final int SPINS = 1 << 10; // 自旋次数
/**
* Slot used until contention detected.
*/
private volatile Node slot; // 用于交换数据的槽位
Node是每个线程自身用于数据交换的节点,相当于每个Node就代表了每个线程,为了保证线程安全,把线程的Node 节点 放在哪里呢,当然是ThreadLocal咯。
/**
* Per-thread state 每个线程的数据,ThreadLocal 子类
*/
private final Participant participant;
/** The corresponding thread local class */
static final class Participant extends ThreadLocal<Node> {
// 初始值返回Node
public Node initialValue() { return new Node(); }
}
2.4 方法解析
Exchanger提供了两个主要方法:
// 阻塞当前线程,直到另外一个线程调用exchange方法或者当前线程被中断。
public V exchange(V x) throws InterruptedException
当这个方法被调用的时候,当前线程将会等待直到其他的线程调用同样的方法。当其他的线程调用exchange之后,当前线程将会继续执行。
- x: 需要交换的对象。
- 在等待过程中,如果有其他的线程interrupt当前线程,则会抛出InterruptedException。
// 阻塞当前线程,直到另外一个线程调用exchange方法或者当前线程被中断或者等待超时。
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
和第一个方法类似,区别是多了一个timeout时间。如果在timeout时间之内没有其他线程调用exchange方法,则会抛出TimeoutException。
- x: 需要交换的对象。
- timeout:超时时间。
- unit:超时时间单位。
2.4.1 exchange方法解析
没有设定超时时间的exchange 方法
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
具有超时功能的exchange 方法
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
Object v;
Object item = (x == null) ? NULL_ITEM : x;
long ns = unit.toNanos(timeout);
if ((arena != null ||
(v = slotExchange(item, true, ns)) == null) &&
((Thread.interrupted() ||
(v = arenaExchange(item, true, ns)) == null)))
throw new InterruptedException();
if (v == TIMED_OUT)
throw new TimeoutException();
return (v == NULL_ITEM) ? null : (V)v;
}
exchange 执行单槽位交换还是多槽位交换,同时如果发生中断,则返回前会重设中断标志位 这几个操作通过一个语句来实现,因此看的时候可能需要仔细一点。
两个方法,主要的不同还是在于是否有超时时间设置,如果有超时时间设置,那么如果在指定的时间内没有交换到数据,那么就会返回(抛出超时异常),不会一直等待。
2.4.2 单槽位交换解析
单槽位交换的核心方法slotExchange
private final Object slotExchange(Object item, boolean timed, long ns) {
// 得到一个初试的Node
Node p = participant.get();
// 当前线程
Thread t = Thread.currentThread();
// 如果发生中断,返回null,会重设中断标志位,并没有直接抛异常
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
for (Node q;;) {
// 槽位 solt不为null,则说明已经有线程在这里等待交换数据了
if ((q = slot) != null) {
// 重置槽位
if (U.compareAndSwapObject(this, SLOT, q, null)) {
//获取交换的数据
Object v = q.item;
//等待线程需要的数据
q.match = item;
//等待线程
Thread w = q.parked;
//唤醒等待的线程
if (w != null)
U.unpark(w);
return v; // 返回拿到的数据,交换完成
}
// create arena on contention, but continue until slot null
//存在竞争,其它线程抢先了一步该线程,因此需要采用多槽位模式,这个后面再分析
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
else if (arena != null) //多槽位不为空,需要执行多槽位交换
return null; // caller must reroute to arenaExchange
else { //还没有其他线程来占据槽位
p.item = item;
// 设置槽位为p(也就是槽位被当前线程占据)
if (U.compareAndSwapObject(this, SLOT, null, p))
break; // 退出无限循环
p.item = null; // 如果设置槽位失败,则有可能其他线程抢先了,重置item,重新循环
}
}
//当前线程占据槽位,等待其它线程来交换数据
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
// 直到成功交换到数据
while ((v = p.match) == null) {
if (spins > 0) { // 自旋
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
// 主动让出cpu,这样可以提供cpu利用率(反正当前线程也自旋等待,还不如让其它任务占用cpu)
Thread.yield();
}
else if (slot != p) //其它线程来交换数据了,修改了solt,但是还没有设置match,再稍等一会
spins = SPINS;
//需要阻塞等待其它线程来交换数据
//没发生中断,并且是单槽交换,没有设置超时或者超时时间未到 则继续执行
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
// cas 设置BLOCKER,可以参考Thread 中的parkBlocker
U.putObject(t, BLOCKER, this);
// 需要挂起当前线程
p.parked = t;
if (slot == p)
U.park(false, ns); // 阻塞当前线程
// 被唤醒后
p.parked = null;
// 清空 BLOCKER
U.putObject(t, BLOCKER, null);
}
// 不满足前面 else if 条件,交换失败,需要重置solt
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
//清空match
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
// 返回交换得到的数据(失败则为null)
return v;
}
当一个线程来交换数据时,如果发现槽位(solt)有数据时,说明其它线程已经占据了槽位,等待交换数据,那么当前线程就和该槽位进行数据交换,设置相应字段,如果交换失败,则说明其它线程抢先了该线程一步和槽位交换了数据,那么这个时候就存在竞争了,这个时候就会生成多槽位(area),后面就会进行多槽位交换了。
如果来交换的线程发现槽位没有被占据,啊哈,这个时候自己就把槽位占据了,如果占据失败,则有可能其他线程抢先了占据了槽位,重头开始循环。
当来交换的线程占据了槽位后,就需要等待其它线程来进行交换数据了,首先自己需要进行一定时间的自旋,因为自旋期间有可能其它线程就来了,那么这个时候就可以进行数据交换工作,而不用阻塞等待了,如果不幸,进行了一定自旋后,没有其他线程到来,那么还是避免不了需要阻塞(如果设置了超时等待,发生了超时或中断异常,则退出,不阻塞等待),当准备阻塞线程的时候,发现槽位值变了,那么说明其它线程来交换数据了,但是还没有完全准备好数据,这个时候就不阻塞了,再稍微等那么一会,如果始终没有等到其它线程来交换,那么就挂起当前线程。
当其它线程到来并成功交换数据后,会唤醒被阻塞的线程,阻塞的线程被唤醒后,拿到数据(如果是超时,或中断,则数据为null)返回,结束。
单槽位的交换就结束了,整个过程应该不难,如果竞争激烈,那么一个槽位显然就成了性能瓶颈了,因此就衍生出了多槽位交换,各自交换各自的,互不影响。
2.4.3 多槽位交换解析
多槽位实际就是一个 Node 数组,代表了很多的槽位。多槽位交换用 arenaExchange 方法。
多槽的交换大致思想就是:当一个线程来交换的时候,如果”第一个”槽位是空的,那么自己就在那里等待,如果发现”第一个”槽位有等待线程,那么就直接交换,如果交换失败,说明其它线程在进行交换,那么就往后挪一个槽位,如果有数据就交换,没数据就等一会,但是不会阻塞在这里,在这里等了一会,发现还没有其它线程来交换数据,那么就往“第一个”槽位的方向挪,如果反复这样过后,挪到了第一个槽位,没有线程来交换数据了,那么自己就在”第一个”槽位阻塞等待。
简单来说,如果有竞争冲突,那么就寻找后面的槽位,在后面的槽位等待一定时间,没有线程来交换,那么就又往前挪。
private final Object arenaExchange(Object item, boolean timed, long ns) {
// 槽位数组
Node[] a = arena;
//代表当前线程的Node
Node p = participant.get(); // p.index 初始值为 0
for (int i = p.index;;) { // access slot at i
int b, m, c; long j; // j is raw array offset
//在槽位数组中根据"索引" i 取出数据 j相当于是 "第一个"槽位
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
// 该位置上有数据(即有线程在这里等待交换数据)
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
// 进行数据交换,这里和单槽位的交换是一样的
Object v = q.item; // release
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
// bound 是最大的有效的 位置,和MMASK相与,得到真正的存储数据的索引最大值
else if (i <= (m = (b = bound) & MMASK) && q == null) {
// i 在这个范围内,该槽位也为空
//将需要交换的数据 设置给p
p.item = item; // offer
//设置该槽位数据(在该槽位等待其它线程来交换数据)
if (U.compareAndSwapObject(a, j, null, p)) {
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
Thread t = Thread.currentThread(); // wait
// 进行一定时间的自旋
for (int h = p.hash, spins = SPINS;;) {
Object v = p.match;
//在自旋的过程中,有线程来和该线程交换数据
if (v != null) {
//交换数据后,清空部分设置,返回交换得到的数据,over
U.putOrderedObject(p, MATCH, null);
p.item = null; // clear for next use
p.hash = h;
return v;
}
else if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per wait
}
// 交换数据的线程到来,但是还没有设置好match,再稍等一会
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS;
//符合条件,特别注意m==0 这个说明已经到达area 中最小的存储数据槽位了
//没有其他线程在槽位等待了,所有当前线程需要阻塞在这里
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this); // emulate LockSupport
p.parked = t; // minimize window
// 再次检查槽位,看看在阻塞前,有没有线程来交换数据
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns); // 挂起
p.parked = null;
U.putObject(t, BLOCKER, null);
}
// 当前这个槽位一直没有线程来交换数据,准备换个槽位试试
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
//更新bound
if (m != 0) // try to shrink
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
// 减小索引值 往"第一个"槽位的方向挪动
i = p.index >>>= 1; // descend
// 发送中断,返回null
if (Thread.interrupted())
return null;
// 超时
if (timed && m == 0 && ns <= 0L)
return TIMED_OUT;
break; // expired; restart 继续主循环
}
}
}
else
//占据槽位失败,先清空item,防止成功交换数据后,p.item还引用着item
p.item = null; // clear offer
}
else { // i 不在有效范围,或者被其它线程抢先了
//更新p.bound
if (p.bound != b) { // stale; reset
p.bound = b;
//新bound ,重置collides
p.collides = 0;
//i如果达到了最大,那么就递减
i = (i != m || m == 0) ? m : m - 1;
}
else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
p.collides = c + 1; // 更新冲突
// i=0 那么就从m开始,否则递减i
i = (i == 0) ? m : i - 1; // cyclically traverse
}
else
//递增,往后挪动
i = m + 1; // grow
// 更新index
p.index = i;
}
}
}
3、使用场景
Exchanger应用在遗传算法和管道的设计。遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。
Exchanger也可以用于校对工作。比如我们需要将纸制银流通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对这两个Excel数据进行校对,看看是否录入的一致。
使用 Exchanger 在线程间交换缓冲区,因此,在需要时,填充缓冲区的线程获取一个新腾空的缓冲区,并将填满的缓冲区传递给腾空缓冲区的线程。
4、代码示例
4.1 示例一
package com.kz.example.juc;
import java.util.concurrent.Exchanger;
/**
* 使用Exchanger
*
* @Author kongzi
* @Date 2022/12/8 18:30
* @Version 1.0
*/
public class UseExchangerDemo2 {
static Exchanger<String> exchanger = new Exchanger<String>();
static class Task implements Runnable {
@Override
public void run() {
try {
String result = exchanger.exchange(Thread.currentThread().getName());
System.out.println("this is " + Thread.currentThread().getName() + " receive data:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(new Task(), "thread1");
Thread t2 = new Thread(new Task(), "thread2");
t1.start();
t2.start();
t1.join();
t2.join();
}
}
执行结果:
this is thread1 receive data:thread2
this is thread2 receive data:thread1
4.2 示例二
每次只能两个线程交换数据,如果有多个线程,也只有两个能交换数据。下面看个通俗的例子:一手交钱一首交货!
package com.kz.example.juc;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 使用Exchanger
*
* @Author kongzi
* @Date 2022/12/8 18:48
* @Version 1.0
*/
public class UseExchangerTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
// 定义一个交换对象,用来交换数据
final Exchanger exchanger = new Exchanger();
// 开启一个线程执行任务
service.execute(new Runnable() {
@Override
public void run() {
try {
String data1 = "工艺品";
System.out.println("线程" + Thread.currentThread().getName() + "正在把货物" + data1 + "拿出来");
Thread.sleep((long) (Math.random() * 10000));
// 把要交换的数据传到exchange方法中,然后被阻塞,等待另一个线程与之交换。返回交换后的数据
String data2 = (String) exchanger.exchange(data1);
System.out.println("线程" + Thread.currentThread().getName() + "用" + data1 + "换来了" + data2);
} catch (Exception e) {
} finally {
service.shutdown();
System.out.println("交易完毕,拿着钱快跑!");
}
}
});
// 开启另一个线程执行任务
service.execute(new Runnable() {
@Override
public void run() {
try {
String data1 = "300万";
System.out.println("线程" + Thread.currentThread().getName() + "正在把" + data1 + "拿出来");
Thread.sleep((long) (Math.random() * 10000));
String data2 = (String) exchanger.exchange(data1);
System.out.println("线程" + Thread.currentThread().getName() + "用300万弄到了" + data2);
} catch (Exception e) {
} finally {
service.shutdown();
System.out.println("交易完毕,拿着工艺品快跑!");
}
}
});
}
}
执行结果:
线程pool-1-thread-1正在把货物工艺品拿出来
线程pool-1-thread-2正在把300万拿出来
线程pool-1-thread-2用300万弄到了工艺品
线程pool-1-thread-1用工艺品换来了300万
交易完毕,拿着工艺品快跑!
交易完毕,拿着钱快跑!
4.3 示例三
调用exchange超时方法,若超时未交换数据则会抛出异常。如:卖家被放鸽子,交易失败!
package com.kz.example.juc;
import java.util.concurrent.*;
/**
* 使用Exchanger
*
* @Author kongzi
* @Date 2022/12/8 18:59
* @Version 1.0
*/
public class UseExchangerTest2 {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
// 定义一个交换对象,用来交换数据
final Exchanger exchanger = new Exchanger();
// 开启一个线程执行任务
service.execute(new Runnable() {
@Override
public void run() {
String data2 = null;
try {
String data1 = "工艺品";
System.out.println("线程" + Thread.currentThread().getName() + "正在把货物" + data1 + "拿出来");
// 把要交换的数据传到exchange方法中,然后被阻塞,等待另一个线程与之交换。返回交换后的数据
data2 = (String) exchanger.exchange(data1, 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException ex) {
System.out.println("等待超时-TimeoutException");
} finally {
service.shutdown();
}
if (data2 == null) {
System.out.println("卖家被放鸽子,交易失败!");
}
}
});
}
}
执行结果:
线程pool-1-thread-1正在把货物工艺品拿出来
等待超时-TimeoutException
卖家被放鸽子,交易失败!
4.4 示例四
使用Exchanger在线程之间交换缓冲区:填满的缓冲区得到清空,清空的缓冲区得到填满。
package com.kz.example.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
/**
* 使用Exchanger在线程之间交换缓冲区:填满的缓冲区得到清空,清空的缓冲区得到填满。
*
* @Author kongzi
* @Date 2022/12/8 19:44
* @Version 1.0
*/
public class UseExchangerFillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<>();
DataBuffer initialEmptyBuffer = new DataBuffer();
DataBuffer initialFullBuffer = new DataBuffer();
class FillingLoop implements Runnable {
@Override
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.isFull()) {
currentBuffer = exchanger.exchange(currentBuffer);
}
}
} catch (InterruptedException ex) {
System.out.println("... handle ...");
}
}
}
private void addToBuffer(DataBuffer currentBuffer) {
if (currentBuffer.list.size() == 0) {
for (int i = 0; i < 10; i++) {
currentBuffer.add(i);
}
}
}
class EmptyingLoop implements Runnable {
@Override
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.isEmpty()) {
currentBuffer = exchanger.exchange(currentBuffer);
}
}
} catch (InterruptedException ex) {
System.out.println("... handle ...");
}
}
}
private void takeFromBuffer(DataBuffer currentBuffer) {
if (currentBuffer.list.size() == 10) {
for (int i = 0; i < 10; i++) {
currentBuffer.remove(i);
}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
public static void main(String[] args) {
new UseExchangerFillAndEmpty().start();
}
class DataBuffer {
private List<Integer> list = new ArrayList<>();
public boolean isFull() {
if (list.size() == 10) {
return true;
} else {
return false;
}
}
public boolean isEmpty() {
if (list.size() == 0) {
return true;
} else {
return false;
}
}
public void add(Integer i) {
list.add(i);
}
public void remove(Integer i) {
list.remove(i);
}
}
}
4.5 示例五
Exchanger用于两个线程之间交换数据,当然实际参与的线程可以不止两个。
package com.kz.example.juc;
import java.security.SecureRandom;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
/**
* Exchanger用于两个线程之间交换数据,当然实际参与的线程可以不止两个
*
* @Author kongzi
* @Date 2022/12/8 18:30
* @Version 1.0
*/
public class UseExchangerDemo3 {
public static void main(String[] args) throws Exception {
Exchanger<String> exchanger = new Exchanger<>();
CountDownLatch countDownLatch = new CountDownLatch(5);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
SecureRandom rand = new SecureRandom();
String origMsg = String.valueOf(rand.nextInt(1000));
//先到达的线程会在此等待,直到有一个线程跟他交互数据或者等待超时
String exchangeMsg = exchanger.exchange(origMsg, 5, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + ",origMsg->" + origMsg + ",exchangeMsg->" + exchangeMsg);
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
};
for (int i = 0; i < 5; i++) {
new Thread(runnable).start();
}
countDownLatch.await();
}
}
执行结果,第5个线程因为没有匹配的线程而等待超时:
Thread-3,origMsg->452,exchangeMsg->477
Thread-0,origMsg->477,exchangeMsg->452
Thread-4,origMsg->998,exchangeMsg->880
Thread-2,origMsg->880,exchangeMsg->998
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at com.kz.example.juc.UseExchangerDemo3$1.run(UseExchangerDemo3.java:28)
at java.lang.Thread.run(Thread.java:745)
4.6 示例六
package com.kz.example.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
/**
* 模拟消息消费的场景来观察Exchanger的行为
*
* @Author kongzi
* @Date 2022/12/8 18:30
* @Version 1.0
*/
public class UseExchangerConsumer {
static class Producer implements Runnable {
// 生产者、消费者交换的数据结构
private List<String> buffer;
// 生产者和消费者的交换对象
private Exchanger<List<String>> exchanger;
Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 1; i < 5; i++) {
System.out.println("生产者第" + i + "次提供");
for (int j = 1; j <= 3; j++) {
System.out.println("生产者装入" + i + "--" + j);
buffer.add("buffer:" + i + "--" + j);
}
System.out.println("生产者装满,等待与消费者交换...");
try {
exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
private List<String> buffer;
private final Exchanger<List<String>> exchanger;
public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 1; i < 5; i++) {
//调用exchange()与消费者进行数据交换
try {
buffer = exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者第" + i + "次提取");
for (int j = 1; j <= 3; j++) {
System.out.println("消费者 : " + buffer.get(0));
buffer.remove(0);
}
}
}
}
public static void main(String[] args) {
List<String> buffer1 = new ArrayList<String>();
List<String> buffer2 = new ArrayList<String>();
Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
Thread producerThread = new Thread(new Producer(buffer1, exchanger));
Thread consumerThread = new Thread(new Consumer(buffer2, exchanger));
producerThread.start();
consumerThread.start();
}
}
执行结果:
生产者第1次提供
生产者装入1--1
生产者装入1--2
生产者装入1--3
生产者装满,等待与消费者交换...
消费者第1次提取
消费者 : buffer:1--1
消费者 : buffer:1--2
消费者 : buffer:1--3
生产者第2次提供
生产者装入2--1
生产者装入2--2
生产者装入2--3
生产者装满,等待与消费者交换...
生产者第3次提供
生产者装入3--1
消费者第2次提取
生产者装入3--2
消费者 : buffer:2--1
生产者装入3--3
消费者 : buffer:2--2
生产者装满,等待与消费者交换...
消费者 : buffer:2--3
消费者第3次提取
生产者第4次提供
消费者 : buffer:3--1
生产者装入4--1
消费者 : buffer:3--2
生产者装入4--2
消费者 : buffer:3--3
生产者装入4--3
生产者装满,等待与消费者交换...
消费者第4次提取
消费者 : buffer:4--1
消费者 : buffer:4--2
消费者 : buffer:4--3
首先生产者Producer、消费者Consumer首先都创建一个缓冲列表,通过Exchanger来同步交换数据。消费中通过调用Exchanger与生产者进行同步来获取数据,而生产者则通过for循环向缓存队列存储数据并使用exchanger对象消费者同步。到消费者从exchanger哪里得到数据后,他的缓冲列表中有3个数据,而生产者得到的则是一个空的列表。上面的例子充分展示了消费者-生产者是如何利用Exchanger来完成数据交换的。
在Exchanger中,如果一个线程已经到达了exchanger节点时,对于它的伙伴节点的情况有三种:
- 如果它的伙伴节点在该线程到达之前已经调用了exchanger方法,则它会唤醒它的伙伴然后进行数据交换,得到各自数据返回。
- 如果它的伙伴节点还没有到达交换点,则该线程将会被挂起,等待它的伙伴节点到达被唤醒,完成数据交换。
- 如果当前线程被中断了则抛出异常,或者等待超时了,则抛出超时异常。
5、总结
Exchanger 还是很有意思的,用于线程之间两两交换数据,在多线程下,互相交换数据的两个线程是不确定的。
在竞争比较小的时候,采用单槽位进行交换数据,当线程来交换数据时,发现槽位为空,则自己在这里等待,否则就和槽位进行交换数据,同时会唤醒等待的线程。
在竞争比较激烈的情况下,就会转到多槽位的交换,这个多槽位的交换,其实思想还是很好理解,但是局部有些细节确实还没有理解透,同时调试也困难,只有自己脑海里不挺的模拟,但是这个可能模拟出错(哈哈)。但是对多槽位的大致思想应该还是明白了,当一个线程来交换的时候,如果”第一个”槽位是空的,那么自己就在那里等待,如果发现”第一个”槽位有等待线程,那么就直接交换,如果交换失败,说明其它线程在进行交换,那么就往后挪一个槽位,如果有数据就交换,没数据就等一会,但是不会阻塞在这里,在这里等了一会,发现还没有其它线程来交换数据,那么就往“第一个”槽位的方向挪,如果反复这样过后,挪到了第一个槽位,没有线程来交换数据了,那么自己就在”第一个”槽位阻塞等待。
第一个槽位并不是指的数组中的第一个,而是逻辑第一个,因为存在伪共享,多槽位中,部分空间没有被利用。
Exchanger,Semaphore,CountDownLatch与CyclicBarrier区别:
- CountDownLatch和CyclicBarrier都能够实现线程之间的等待,Semaphore主要是计数。
- Exchanger主要是数据交换
评论区