侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 377 篇文章
  • 累计创建 136 个标签
  • 累计收到 12 条评论

目 录CONTENT

文章目录

Java多线程编程之Exchanger的使用方法

孔子说JAVA
2022-12-19 / 0 评论 / 0 点赞 / 66 阅读 / 20,496 字 / 正在检测是否收录...

Exchanger(交换者)是一个用于线程间协作的工具类,主要用于两个线程间的通信,Exchanger提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

1、概述

Exchanger来自于JDK1.5的JUC包,顾名思义就是用来做交换的。这里主要是两个线程之间交换持有的对象。当Exchanger在一个线程中调用exchange方法之后,会等待另外的线程调用同样的exchange方法。

  • 每次只能两个线程交换数据,如果有多个线程,也只有两个能交换数据。
  • Exchanger是双向的数据传输,2个线程在一个同步点,交换数据。先到的线程会等待第二个线程执行exchange, SynchronousQueue,是2个线程之间单向的数据传输,一个put,一个take。

2、源码解析

Exchanger 使用是非常简单的,但是实现原理和前面几种工具比较确实最难的,前面几种工具都是通过同步器或者锁来实现,而Exchanger 是一种无锁算法,和前面SynchronousQueue 一样,都是通过循环 cas 来实现线程安全,因此这种方式就会显得比较抽象和麻烦。

Exchanger 有单槽位和多槽位之分,单个槽位在同一时刻只能用于两个线程交换数据,这样在竞争比较激烈的时候,会影响到性能,多个槽位就是多个线程可以同时进行两个的数据交换,彼此之间不受影响,这样可以很好的提高吞吐量。

Exchanger直接继承自Object。类定义如下:

public class Exchanger<V>

其中V表示需要交换的对象类型。

2.1 构造器

Exchanger提供一个无参构造函数。

public Exchanger() {
   participant = new Participant();
}

2.2 属性

Exchanger包含的属性如下:

    //ThreadLocal变量,每个线程都有自己的一个副本
    private final Participant participant;
 
    //高并发下使用的,保存待匹配的Node实例
    private volatile Node[] arena;
 
    //低并发下,arena未初始化时使用的保存待匹配的Node实例
    private volatile Node slot;
    
    //初始值为0,当创建arena后会被赋值成SEQ,用来记录arena数组的可用最大索引,会随着并发的增大而增大直到等于最大值FULL,会随着并行的线程逐一匹配成功而减少恢复成初始值
    private volatile int bound;

还有多个表示字段偏移量的静态属性,通过static代码块初始化,如下:

image-1670542411279

Exchanger 定义了多个静态常量,如下:

    // 初始化arena时使用,1 << ASHIFT是一个缓存行的大小,避免不同的Node落入到同一个高速缓存行
    // 这里实际是把数组容量扩大了8倍,原来索引相邻的两个元素,扩容后中间隔了7个元素,从元素的起始地址上看就隔了8个元素,中间的7个都是空的,为了避免原来相邻的两个元素落到同一个缓存行中
    //因为arena是对象数组,一个元素占8字节,8个就是64字节
    private static final int ASHIFT = 7;
 
    //arena数组元素的索引最大值即255
    private static final int MMASK = 0xff;
 
    //arena数组的最大长度即256
    private static final int SEQ = MMASK + 1;
 
    //获取CPU核数
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
 
    //实际的数组长度,因为是线程两两配对的,所以最大长度是核数除以2
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
 
    //自旋等待的次数
    private static final int SPINS = 1 << 10;
 
    //如果交换的对象是null,则返回此对象
    private static final Object NULL_ITEM = new Object();
    
    //如果等待超时导致交换失败,则返回此对象
    private static final Object TIMED_OUT = new Object();

上述属性中Node和Participant都是内部类,其定义如下:

image-1670542453417

其中Contended注解是为了避免高速缓存行导致的伪共享问题,index用来记录arena数组的索引,bound用于记录上一次的Exchanger bound属性,collides用于记录在bound不变的情况下CAS抢占失败的次数,hash是自旋等待时计算随机数使用的,item表示当前线程请求交换的对象,match是同其他线程交换的结果,match不为null表示交换成功,parked为跟该Node关联的处于休眠状态的线程。

2.3 数据结构

槽位定义:

@sun.misc.Contended static final class Node {
    int index;              //arena的下标,多个槽位的时候利用
    int bound;              // 上一次记录的Exchanger.bound;
    int collides;           // 在当前bound下CAS失败的次数;
    int hash;               // 用于自旋;
    Object item;            // 这个线程的当前项,也就是需要交换的数据;
    volatile Object match;  // 交换的数据
    volatile Thread parked; // 线程
}
/**
 * Value representing null arguments/returns from public
 * methods. Needed because the API originally didn't disallow null
 * arguments, which it should have.
 * 如果交换的数据为 null,则用NULL_ITEM  代替
 */
private static final Object NULL_ITEM = new Object();

Node 定义中,index,bound,collides 这些都是用于多槽位的,item 是本线程需要交换的数据,match 是和其它线程交换后的数据,开始是为null,交换数据成功后,就是我们需要的数据了,parked记录线程,用于阻塞和唤醒线程。

/** The number of CPUs, for sizing and spin control */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
 * The bound for spins while waiting for a match. The actual
 * number of iterations will on average be about twice this value
 * due to randomization. Note: Spinning is disabled when NCPU==1.
 */
private static final int SPINS = 1 << 10; // 自旋次数
/**
 * Slot used until contention detected.
 */
private volatile Node slot; // 用于交换数据的槽位

Node是每个线程自身用于数据交换的节点,相当于每个Node就代表了每个线程,为了保证线程安全,把线程的Node 节点 放在哪里呢,当然是ThreadLocal咯。

/**
 * Per-thread state  每个线程的数据,ThreadLocal 子类
 */
private final Participant participant;

/** The corresponding thread local class */
 static final class Participant extends ThreadLocal<Node> {
     // 初始值返回Node
     public Node initialValue() { return new Node(); }
 }

2.4 方法解析

Exchanger提供了两个主要方法:

// 阻塞当前线程,直到另外一个线程调用exchange方法或者当前线程被中断。
public V exchange(V x) throws InterruptedException

v2-a9664bb8a0ca8f01248251d68bb04735_r

当这个方法被调用的时候,当前线程将会等待直到其他的线程调用同样的方法。当其他的线程调用exchange之后,当前线程将会继续执行。

  • x: 需要交换的对象。
  • 在等待过程中,如果有其他的线程interrupt当前线程,则会抛出InterruptedException。
// 阻塞当前线程,直到另外一个线程调用exchange方法或者当前线程被中断或者等待超时。
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

v2-c46ae50381bd62d8243b24c6ec85612d_r

和第一个方法类似,区别是多了一个timeout时间。如果在timeout时间之内没有其他线程调用exchange方法,则会抛出TimeoutException。

  • x: 需要交换的对象。
  • timeout:超时时间。
  • unit:超时时间单位。

2.4.1 exchange方法解析

没有设定超时时间的exchange 方法

public V exchange(V x) throws InterruptedException {
    Object v;
    Object item = (x == null) ? NULL_ITEM : x; // translate null args
    if ((arena != null ||
         (v = slotExchange(item, false, 0L)) == null) &&
        ((Thread.interrupted() || // disambiguates null return
          (v = arenaExchange(item, false, 0L)) == null)))
        throw new InterruptedException();
    return (v == NULL_ITEM) ? null : (V)v;
}

具有超时功能的exchange 方法

public V exchange(V x, long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException {
    Object v;
    Object item = (x == null) ? NULL_ITEM : x;
    long ns = unit.toNanos(timeout);
    if ((arena != null ||
         (v = slotExchange(item, true, ns)) == null) &&
        ((Thread.interrupted() ||
          (v = arenaExchange(item, true, ns)) == null)))
        throw new InterruptedException();
    if (v == TIMED_OUT)
        throw new TimeoutException();
    return (v == NULL_ITEM) ? null : (V)v;
}

exchange 执行单槽位交换还是多槽位交换,同时如果发生中断,则返回前会重设中断标志位 这几个操作通过一个语句来实现,因此看的时候可能需要仔细一点。

两个方法,主要的不同还是在于是否有超时时间设置,如果有超时时间设置,那么如果在指定的时间内没有交换到数据,那么就会返回(抛出超时异常),不会一直等待。

2.4.2 单槽位交换解析

单槽位交换的核心方法slotExchange

private final Object slotExchange(Object item, boolean timed, long ns) {
    // 得到一个初试的Node
    Node p = participant.get();
    // 当前线程
    Thread t = Thread.currentThread();
    // 如果发生中断,返回null,会重设中断标志位,并没有直接抛异常
    if (t.isInterrupted()) // preserve interrupt status so caller can recheck
        return null;

    for (Node q;;) {
        // 槽位 solt不为null,则说明已经有线程在这里等待交换数据了
        if ((q = slot) != null) {
            // 重置槽位
            if (U.compareAndSwapObject(this, SLOT, q, null)) {
                //获取交换的数据
                Object v = q.item;
                //等待线程需要的数据
                q.match = item;
                //等待线程
                Thread w = q.parked;
                //唤醒等待的线程
                if (w != null)
                    U.unpark(w);
                return v; // 返回拿到的数据,交换完成
            }
            // create arena on contention, but continue until slot null
            //存在竞争,其它线程抢先了一步该线程,因此需要采用多槽位模式,这个后面再分析
            if (NCPU > 1 && bound == 0 &&
                U.compareAndSwapInt(this, BOUND, 0, SEQ))
                arena = new Node[(FULL + 2) << ASHIFT];
        }
        else if (arena != null) //多槽位不为空,需要执行多槽位交换
            return null; // caller must reroute to arenaExchange
        else { //还没有其他线程来占据槽位
            p.item = item;
            // 设置槽位为p(也就是槽位被当前线程占据)
            if (U.compareAndSwapObject(this, SLOT, null, p))
                break; // 退出无限循环
            p.item = null; // 如果设置槽位失败,则有可能其他线程抢先了,重置item,重新循环
        }
    }

    //当前线程占据槽位,等待其它线程来交换数据
    int h = p.hash;
    long end = timed ? System.nanoTime() + ns : 0L;
    int spins = (NCPU > 1) ? SPINS : 1;
    Object v;
    // 直到成功交换到数据
    while ((v = p.match) == null) {
        if (spins > 0) { // 自旋
            h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
            if (h == 0)
                h = SPINS | (int)t.getId();
            else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                // 主动让出cpu,这样可以提供cpu利用率(反正当前线程也自旋等待,还不如让其它任务占用cpu)
                Thread.yield(); 
        }
        else if (slot != p) //其它线程来交换数据了,修改了solt,但是还没有设置match,再稍等一会
            spins = SPINS;
        //需要阻塞等待其它线程来交换数据
        //没发生中断,并且是单槽交换,没有设置超时或者超时时间未到 则继续执行
        else if (!t.isInterrupted() && arena == null &&
                 (!timed || (ns = end - System.nanoTime()) > 0L)) {
            // cas 设置BLOCKER,可以参考Thread 中的parkBlocker
            U.putObject(t, BLOCKER, this);
            // 需要挂起当前线程
            p.parked = t;
            if (slot == p)
                U.park(false, ns); // 阻塞当前线程
            // 被唤醒后    
            p.parked = null;
            // 清空 BLOCKER
            U.putObject(t, BLOCKER, null);
        }
        // 不满足前面 else if 条件,交换失败,需要重置solt
        else if (U.compareAndSwapObject(this, SLOT, p, null)) {
            v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
            break;
        }
    }
    //清空match
    U.putOrderedObject(p, MATCH, null);
    p.item = null;
    p.hash = h;
    // 返回交换得到的数据(失败则为null)
    return v;
}

当一个线程来交换数据时,如果发现槽位(solt)有数据时,说明其它线程已经占据了槽位,等待交换数据,那么当前线程就和该槽位进行数据交换,设置相应字段,如果交换失败,则说明其它线程抢先了该线程一步和槽位交换了数据,那么这个时候就存在竞争了,这个时候就会生成多槽位(area),后面就会进行多槽位交换了。

如果来交换的线程发现槽位没有被占据,啊哈,这个时候自己就把槽位占据了,如果占据失败,则有可能其他线程抢先了占据了槽位,重头开始循环。

当来交换的线程占据了槽位后,就需要等待其它线程来进行交换数据了,首先自己需要进行一定时间的自旋,因为自旋期间有可能其它线程就来了,那么这个时候就可以进行数据交换工作,而不用阻塞等待了,如果不幸,进行了一定自旋后,没有其他线程到来,那么还是避免不了需要阻塞(如果设置了超时等待,发生了超时或中断异常,则退出,不阻塞等待),当准备阻塞线程的时候,发现槽位值变了,那么说明其它线程来交换数据了,但是还没有完全准备好数据,这个时候就不阻塞了,再稍微等那么一会,如果始终没有等到其它线程来交换,那么就挂起当前线程。

当其它线程到来并成功交换数据后,会唤醒被阻塞的线程,阻塞的线程被唤醒后,拿到数据(如果是超时,或中断,则数据为null)返回,结束。

单槽位的交换就结束了,整个过程应该不难,如果竞争激烈,那么一个槽位显然就成了性能瓶颈了,因此就衍生出了多槽位交换,各自交换各自的,互不影响。

2.4.3 多槽位交换解析

多槽位实际就是一个 Node 数组,代表了很多的槽位。多槽位交换用 arenaExchange 方法。

多槽的交换大致思想就是:当一个线程来交换的时候,如果”第一个”槽位是空的,那么自己就在那里等待,如果发现”第一个”槽位有等待线程,那么就直接交换,如果交换失败,说明其它线程在进行交换,那么就往后挪一个槽位,如果有数据就交换,没数据就等一会,但是不会阻塞在这里,在这里等了一会,发现还没有其它线程来交换数据,那么就往“第一个”槽位的方向挪,如果反复这样过后,挪到了第一个槽位,没有线程来交换数据了,那么自己就在”第一个”槽位阻塞等待。

简单来说,如果有竞争冲突,那么就寻找后面的槽位,在后面的槽位等待一定时间,没有线程来交换,那么就又往前挪。

private final Object arenaExchange(Object item, boolean timed, long ns) {
    // 槽位数组
    Node[] a = arena;
    //代表当前线程的Node
    Node p = participant.get(); // p.index 初始值为 0
    for (int i = p.index;;) {                      // access slot at i
        int b, m, c; long j;                       // j is raw array offset
        //在槽位数组中根据"索引" i 取出数据 j相当于是 "第一个"槽位
        Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
        // 该位置上有数据(即有线程在这里等待交换数据)
        if (q != null && U.compareAndSwapObject(a, j, q, null)) {
            // 进行数据交换,这里和单槽位的交换是一样的
            Object v = q.item;                     // release
            q.match = item;
            Thread w = q.parked;
            if (w != null)
                U.unpark(w);
            return v;
        }
        // bound 是最大的有效的 位置,和MMASK相与,得到真正的存储数据的索引最大值
        else if (i <= (m = (b = bound) & MMASK) && q == null) {
            // i 在这个范围内,该槽位也为空

            //将需要交换的数据 设置给p
            p.item = item;                         // offer
            //设置该槽位数据(在该槽位等待其它线程来交换数据)
            if (U.compareAndSwapObject(a, j, null, p)) {
                long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                Thread t = Thread.currentThread(); // wait
                // 进行一定时间的自旋
                for (int h = p.hash, spins = SPINS;;) {
                    Object v = p.match;
                    //在自旋的过程中,有线程来和该线程交换数据
                    if (v != null) {
                        //交换数据后,清空部分设置,返回交换得到的数据,over
                        U.putOrderedObject(p, MATCH, null);
                        p.item = null;             // clear for next use
                        p.hash = h;
                        return v;
                    }
                    else if (spins > 0) {
                        h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                        if (h == 0)                // initialize hash
                            h = SPINS | (int)t.getId();
                        else if (h < 0 &&          // approx 50% true
                                 (--spins & ((SPINS >>> 1) - 1)) == 0)
                            Thread.yield();        // two yields per wait
                    }
                    // 交换数据的线程到来,但是还没有设置好match,再稍等一会
                    else if (U.getObjectVolatile(a, j) != p)
                        spins = SPINS; 
                    //符合条件,特别注意m==0 这个说明已经到达area 中最小的存储数据槽位了
                    //没有其他线程在槽位等待了,所有当前线程需要阻塞在这里     
                    else if (!t.isInterrupted() && m == 0 &&
                             (!timed ||
                              (ns = end - System.nanoTime()) > 0L)) {
                        U.putObject(t, BLOCKER, this); // emulate LockSupport
                        p.parked = t;              // minimize window
                        // 再次检查槽位,看看在阻塞前,有没有线程来交换数据
                        if (U.getObjectVolatile(a, j) == p) 
                            U.park(false, ns); // 挂起
                        p.parked = null;
                        U.putObject(t, BLOCKER, null);
                    }
                    // 当前这个槽位一直没有线程来交换数据,准备换个槽位试试
                    else if (U.getObjectVolatile(a, j) == p &&
                             U.compareAndSwapObject(a, j, p, null)) {
                        //更新bound
                        if (m != 0)                // try to shrink
                            U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                        p.item = null;
                        p.hash = h;
                        // 减小索引值 往"第一个"槽位的方向挪动
                        i = p.index >>>= 1;        // descend
                        // 发送中断,返回null
                        if (Thread.interrupted())
                            return null;
                        // 超时
                        if (timed && m == 0 && ns <= 0L)
                            return TIMED_OUT;
                        break;                     // expired; restart 继续主循环
                    }
                }
            }
            else
                //占据槽位失败,先清空item,防止成功交换数据后,p.item还引用着item
                p.item = null;                     // clear offer
        }
        else { // i 不在有效范围,或者被其它线程抢先了
            //更新p.bound
            if (p.bound != b) {                    // stale; reset
                p.bound = b;
                //新bound ,重置collides
                p.collides = 0;
                //i如果达到了最大,那么就递减
                i = (i != m || m == 0) ? m : m - 1;
            }
            else if ((c = p.collides) < m || m == FULL ||
                     !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                p.collides = c + 1; // 更新冲突
                // i=0 那么就从m开始,否则递减i
                i = (i == 0) ? m : i - 1;          // cyclically traverse
            }
            else
                //递增,往后挪动
                i = m + 1;                         // grow
            // 更新index
            p.index = i;
        }
    }
}

3、使用场景

Exchanger应用在遗传算法和管道的设计。遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。

Exchanger也可以用于校对工作。比如我们需要将纸制银流通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对这两个Excel数据进行校对,看看是否录入的一致。

使用 Exchanger 在线程间交换缓冲区,因此,在需要时,填充缓冲区的线程获取一个新腾空的缓冲区,并将填满的缓冲区传递给腾空缓冲区的线程。

4、代码示例

4.1 示例一

package com.kz.example.juc;

import java.util.concurrent.Exchanger;

/**
 * 使用Exchanger
 *
 * @Author kongzi
 * @Date 2022/12/8 18:30
 * @Version 1.0
 */
public class UseExchangerDemo2 {
    static Exchanger<String> exchanger = new Exchanger<String>();

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                String result = exchanger.exchange(Thread.currentThread().getName());
                System.out.println("this is " + Thread.currentThread().getName() + " receive data:" + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Thread t1 = new Thread(new Task(), "thread1");
        Thread t2 = new Thread(new Task(), "thread2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}

执行结果:

this is thread1 receive data:thread2
this is thread2 receive data:thread1

4.2 示例二

每次只能两个线程交换数据,如果有多个线程,也只有两个能交换数据。下面看个通俗的例子:一手交钱一首交货!

package com.kz.example.juc;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 使用Exchanger
 *
 * @Author kongzi
 * @Date 2022/12/8 18:48
 * @Version 1.0
 */
public class UseExchangerTest {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        // 定义一个交换对象,用来交换数据
        final Exchanger exchanger = new Exchanger();
        // 开启一个线程执行任务
        service.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String data1 = "工艺品";
                    System.out.println("线程" + Thread.currentThread().getName() + "正在把货物" + data1 + "拿出来");
                    Thread.sleep((long) (Math.random() * 10000));
                    // 把要交换的数据传到exchange方法中,然后被阻塞,等待另一个线程与之交换。返回交换后的数据
                    String data2 = (String) exchanger.exchange(data1);
                    System.out.println("线程" + Thread.currentThread().getName() + "用" + data1 + "换来了" + data2);
                } catch (Exception e) {
                } finally {
                    service.shutdown();
                    System.out.println("交易完毕,拿着钱快跑!");
                }
            }
        });
        // 开启另一个线程执行任务
        service.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String data1 = "300万";
                    System.out.println("线程" + Thread.currentThread().getName() + "正在把" + data1 + "拿出来");
                    Thread.sleep((long) (Math.random() * 10000));
                    String data2 = (String) exchanger.exchange(data1);
                    System.out.println("线程" + Thread.currentThread().getName() + "用300万弄到了" + data2);
                } catch (Exception e) {
                } finally {
                    service.shutdown();
                    System.out.println("交易完毕,拿着工艺品快跑!");
                }
            }
        });
    }
}

执行结果:

线程pool-1-thread-1正在把货物工艺品拿出来
线程pool-1-thread-2正在把300万拿出来
线程pool-1-thread-2用300万弄到了工艺品
线程pool-1-thread-1用工艺品换来了300万
交易完毕,拿着工艺品快跑!
交易完毕,拿着钱快跑!

4.3 示例三

调用exchange超时方法,若超时未交换数据则会抛出异常。如:卖家被放鸽子,交易失败!

package com.kz.example.juc;

import java.util.concurrent.*;

/**
 * 使用Exchanger
 *
 * @Author kongzi
 * @Date 2022/12/8 18:59
 * @Version 1.0
 */
public class UseExchangerTest2 {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        // 定义一个交换对象,用来交换数据
        final Exchanger exchanger = new Exchanger();
        // 开启一个线程执行任务
        service.execute(new Runnable() {
            @Override
            public void run() {
                String data2 = null;
                try {
                    String data1 = "工艺品";
                    System.out.println("线程" + Thread.currentThread().getName() + "正在把货物" + data1 + "拿出来");
                    // 把要交换的数据传到exchange方法中,然后被阻塞,等待另一个线程与之交换。返回交换后的数据
                    data2 = (String) exchanger.exchange(data1, 5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (TimeoutException ex) {
                    System.out.println("等待超时-TimeoutException");
                } finally {
                    service.shutdown();
                }
                if (data2 == null) {
                    System.out.println("卖家被放鸽子,交易失败!");
                }
            }
        });
    }
}

执行结果:

线程pool-1-thread-1正在把货物工艺品拿出来
等待超时-TimeoutException
卖家被放鸽子,交易失败!

4.4 示例四

使用Exchanger在线程之间交换缓冲区:填满的缓冲区得到清空,清空的缓冲区得到填满。

package com.kz.example.juc;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * 使用Exchanger在线程之间交换缓冲区:填满的缓冲区得到清空,清空的缓冲区得到填满。
 *
 * @Author kongzi
 * @Date 2022/12/8 19:44
 * @Version 1.0
 */
public class UseExchangerFillAndEmpty {
    Exchanger<DataBuffer> exchanger = new Exchanger<>();
    DataBuffer initialEmptyBuffer = new DataBuffer();
    DataBuffer initialFullBuffer = new DataBuffer();

    class FillingLoop implements Runnable {
        @Override
        public void run() {
            DataBuffer currentBuffer = initialEmptyBuffer;
            try {
                while (currentBuffer != null) {
                    addToBuffer(currentBuffer);
                    if (currentBuffer.isFull()) {
                        currentBuffer = exchanger.exchange(currentBuffer);
                    }
                }
            } catch (InterruptedException ex) {
                System.out.println("... handle ...");
            }
        }
    }

    private void addToBuffer(DataBuffer currentBuffer) {
        if (currentBuffer.list.size() == 0) {
            for (int i = 0; i < 10; i++) {
                currentBuffer.add(i);
            }
        }

    }

    class EmptyingLoop implements Runnable {
        @Override
        public void run() {
            DataBuffer currentBuffer = initialFullBuffer;
            try {
                while (currentBuffer != null) {
                    takeFromBuffer(currentBuffer);
                    if (currentBuffer.isEmpty()) {
                        currentBuffer = exchanger.exchange(currentBuffer);
                    }
                }
            } catch (InterruptedException ex) {
                System.out.println("... handle ...");
            }
        }
    }

    private void takeFromBuffer(DataBuffer currentBuffer) {
        if (currentBuffer.list.size() == 10) {
            for (int i = 0; i < 10; i++) {
                currentBuffer.remove(i);
            }
        }
    }

    void start() {
        new Thread(new FillingLoop()).start();
        new Thread(new EmptyingLoop()).start();
    }

    public static void main(String[] args) {
        new UseExchangerFillAndEmpty().start();
    }

    class DataBuffer {
        private List<Integer> list = new ArrayList<>();


        public boolean isFull() {
            if (list.size() == 10) {
                return true;
            } else {
                return false;
            }
        }

        public boolean isEmpty() {
            if (list.size() == 0) {
                return true;
            } else {
                return false;
            }
        }

        public void add(Integer i) {
            list.add(i);
        }

        public void remove(Integer i) {
            list.remove(i);
        }

    }
}

4.5 示例五

Exchanger用于两个线程之间交换数据,当然实际参与的线程可以不止两个。

package com.kz.example.juc;

import java.security.SecureRandom;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;

/**
 * Exchanger用于两个线程之间交换数据,当然实际参与的线程可以不止两个
 *
 * @Author kongzi
 * @Date 2022/12/8 18:30
 * @Version 1.0
 */
public class UseExchangerDemo3 {

    public static void main(String[] args) throws Exception {

        Exchanger<String> exchanger = new Exchanger<>();
        CountDownLatch countDownLatch = new CountDownLatch(5);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    SecureRandom rand = new SecureRandom();
                    String origMsg = String.valueOf(rand.nextInt(1000));
                    //先到达的线程会在此等待,直到有一个线程跟他交互数据或者等待超时
                    String exchangeMsg = exchanger.exchange(origMsg, 5, TimeUnit.SECONDS);
                    System.out.println(Thread.currentThread().getName() + ",origMsg->" + origMsg + ",exchangeMsg->" + exchangeMsg);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }
        };
        for (int i = 0; i < 5; i++) {
            new Thread(runnable).start();
        }
        countDownLatch.await();
    }
}

执行结果,第5个线程因为没有匹配的线程而等待超时:

Thread-3,origMsg->452,exchangeMsg->477
Thread-0,origMsg->477,exchangeMsg->452
Thread-4,origMsg->998,exchangeMsg->880
Thread-2,origMsg->880,exchangeMsg->998
java.util.concurrent.TimeoutException
	at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
	at com.kz.example.juc.UseExchangerDemo3$1.run(UseExchangerDemo3.java:28)
	at java.lang.Thread.run(Thread.java:745)

4.6 示例六

package com.kz.example.juc;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * 模拟消息消费的场景来观察Exchanger的行为
 *
 * @Author kongzi
 * @Date 2022/12/8 18:30
 * @Version 1.0
 */
public class UseExchangerConsumer {
    static class Producer implements Runnable {

        // 生产者、消费者交换的数据结构
        private List<String> buffer;

        // 生产者和消费者的交换对象
        private Exchanger<List<String>> exchanger;

        Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
            this.buffer = buffer;
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            for (int i = 1; i < 5; i++) {
                System.out.println("生产者第" + i + "次提供");
                for (int j = 1; j <= 3; j++) {
                    System.out.println("生产者装入" + i + "--" + j);
                    buffer.add("buffer:" + i + "--" + j);
                }

                System.out.println("生产者装满,等待与消费者交换...");
                try {
                    exchanger.exchange(buffer);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer implements Runnable {
        private List<String> buffer;

        private final Exchanger<List<String>> exchanger;

        public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
            this.buffer = buffer;
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            for (int i = 1; i < 5; i++) {
                //调用exchange()与消费者进行数据交换
                try {
                    buffer = exchanger.exchange(buffer);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("消费者第" + i + "次提取");
                for (int j = 1; j <= 3; j++) {
                    System.out.println("消费者 : " + buffer.get(0));
                    buffer.remove(0);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> buffer1 = new ArrayList<String>();
        List<String> buffer2 = new ArrayList<String>();

        Exchanger<List<String>> exchanger = new Exchanger<List<String>>();

        Thread producerThread = new Thread(new Producer(buffer1, exchanger));
        Thread consumerThread = new Thread(new Consumer(buffer2, exchanger));

        producerThread.start();
        consumerThread.start();
    }
}

执行结果:

生产者第1次提供
生产者装入1--1
生产者装入1--2
生产者装入1--3
生产者装满,等待与消费者交换...
消费者第1次提取
消费者 : buffer:1--1
消费者 : buffer:1--2
消费者 : buffer:1--3
生产者第2次提供
生产者装入2--1
生产者装入2--2
生产者装入2--3
生产者装满,等待与消费者交换...
生产者第3次提供
生产者装入3--1
消费者第2次提取
生产者装入3--2
消费者 : buffer:2--1
生产者装入3--3
消费者 : buffer:2--2
生产者装满,等待与消费者交换...
消费者 : buffer:2--3
消费者第3次提取
生产者第4次提供
消费者 : buffer:3--1
生产者装入4--1
消费者 : buffer:3--2
生产者装入4--2
消费者 : buffer:3--3
生产者装入4--3
生产者装满,等待与消费者交换...
消费者第4次提取
消费者 : buffer:4--1
消费者 : buffer:4--2
消费者 : buffer:4--3

首先生产者Producer、消费者Consumer首先都创建一个缓冲列表,通过Exchanger来同步交换数据。消费中通过调用Exchanger与生产者进行同步来获取数据,而生产者则通过for循环向缓存队列存储数据并使用exchanger对象消费者同步。到消费者从exchanger哪里得到数据后,他的缓冲列表中有3个数据,而生产者得到的则是一个空的列表。上面的例子充分展示了消费者-生产者是如何利用Exchanger来完成数据交换的。

在Exchanger中,如果一个线程已经到达了exchanger节点时,对于它的伙伴节点的情况有三种:

  1. 如果它的伙伴节点在该线程到达之前已经调用了exchanger方法,则它会唤醒它的伙伴然后进行数据交换,得到各自数据返回。
  2. 如果它的伙伴节点还没有到达交换点,则该线程将会被挂起,等待它的伙伴节点到达被唤醒,完成数据交换。
  3. 如果当前线程被中断了则抛出异常,或者等待超时了,则抛出超时异常。

5、总结

Exchanger 还是很有意思的,用于线程之间两两交换数据,在多线程下,互相交换数据的两个线程是不确定的。

在竞争比较小的时候,采用单槽位进行交换数据,当线程来交换数据时,发现槽位为空,则自己在这里等待,否则就和槽位进行交换数据,同时会唤醒等待的线程。

在竞争比较激烈的情况下,就会转到多槽位的交换,这个多槽位的交换,其实思想还是很好理解,但是局部有些细节确实还没有理解透,同时调试也困难,只有自己脑海里不挺的模拟,但是这个可能模拟出错(哈哈)。但是对多槽位的大致思想应该还是明白了,当一个线程来交换的时候,如果”第一个”槽位是空的,那么自己就在那里等待,如果发现”第一个”槽位有等待线程,那么就直接交换,如果交换失败,说明其它线程在进行交换,那么就往后挪一个槽位,如果有数据就交换,没数据就等一会,但是不会阻塞在这里,在这里等了一会,发现还没有其它线程来交换数据,那么就往“第一个”槽位的方向挪,如果反复这样过后,挪到了第一个槽位,没有线程来交换数据了,那么自己就在”第一个”槽位阻塞等待。

第一个槽位并不是指的数组中的第一个,而是逻辑第一个,因为存在伪共享,多槽位中,部分空间没有被利用。

Exchanger,Semaphore,CountDownLatch与CyclicBarrier区别:

  • CountDownLatch和CyclicBarrier都能够实现线程之间的等待,Semaphore主要是计数。
  • Exchanger主要是数据交换
0

评论区