侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 285 篇文章
  • 累计创建 125 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

Java并发编程之原子操作类AtomicReference实战详解

孔子说JAVA
2022-06-10 / 0 评论 / 0 点赞 / 63 阅读 / 11,016 字 / 正在检测是否收录...

并发编程原子类完整教程:Java并发编程之原子操作类实战教程

AtomicReference 是java.util.concurrent(简写为JUC)包下的类,和 AtomicInteger、AtomicBoolean 等类是一样(这些类也是JUC下的),也是基于CAS无锁理论实现的,但是不同的是 AtomicInteger、AtomicBoolean 等类只能并发修改一个属性,而 AtomicReference 是可以操控多个属性的原子性的并发类(即对多个属性同时进行并发修改),该类提供了对象引用的非阻塞原子性读写操作,并且提供了其他一些高级的用法。

  • AtomicReference 和 AtomicInteger ⾮常类似,不同之处就在于 AtomicInteger 是对整数的封装,⽽ AtomicReference 则对应普通的对
    象引⽤,是操控多个属性的原子性的并发类。也就是它可以保证你在修改对象引⽤时的线程安全性。
  • AtomicReference 是作⽤是对”对象”进⾏原⼦操作,提供了⼀种读和写都是原⼦性的对象引⽤变量。原⼦意味着多个线程试图改变同
    ⼀个 AtomicReference (例如⽐较和交换操作) 不会使 AtomicReference 处于不⼀致的状态。

1、AtomicReference 主要方法

1.1 构造方法

AtomicReference 是一个泛型类,它的构造与其他原子类型的构造一样,也提供了无参和一个有参的构造函数。示例如下:

// 当使用无参构造函数创建AtomicReference对象的时候,
// 需要再次调用set()方法为AtomicReference内部的value指定初始值。
AtomicReference()

// 使用给定的初始值创建新的 AtomicReference 对象。
AtomicReference(V initialValue);

1.2 主要方法

// 获取AtomicReference的当前对象引用值。
V get()

// 设置AtomicReference最新的对象引用值,该新值的更新对其他线程立即可见。
void set(V newValue)

// 设置AtomicReference的对象引用值最终为给定值。
void lazySet(V newValue)

// 原子性地更新AtomicReference内部的value值,并且返回AtomicReference的旧值。
V getAndSet(V newValue)

/**
原子性地更新AtomicReference内部的value值,
其中expect代表当前AtomicReference的value值,update则是需要设置的新引用值。
该方法会返回一个boolean的结果,
如果当前值 == 预期值expect,则以原子方式将该值设置为给定的更新值,并返回true。
当expect和AtomicReference的当前值不相等时,修改会失败,返回值为false,
**/
boolean compareAndSet(V expect, V update)

// 如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
boolean weakCompareAndSet(V expect, V update)

// 原子性地更新value值,并且返回AtomicReference的旧值,该方法需要传入一个Function接口。
getAndUpdate(UnaryOperator<V> updateFunction)

// 原子性地更新value值,并且返回AtomicReference更新后的新值,该方法需要传入一个Function接口。
updateAndGet(UnaryOperator<V> updateFunction)

// 原子性地更新value值,并且返回AtomicReference更新前的旧值。
// 该方法需要传入两个参数,第一个是更新后的新值,第二个是BinaryOperator接口。
getAndAccumulate(V x, BinaryOperator<V> accumulatorFunction)

// 原子性地更新value值,并且返回AtomicReference更新后的值。
// 该方法需要传入两个参数,第一个是更新的新值,第二个是BinaryOperator接口。
accumulateAndGet(V x, BinaryOperator<V> accumulatorFunction)

2、AtomicReference 演示示例

2.1 演示示例1

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {
    
    public static void main(String[] args){

        // 创建两个Person对象,它们的id分别是101和102。
        Person p1 = new Person(101);
        Person p2 = new Person(102);
        // 新建AtomicReference对象,初始化它的值为p1对象
        AtomicReference ar = new AtomicReference(p1);
        // 通过CAS设置ar。如果ar的值为p1的话,则将其设置为p2。
        ar.compareAndSet(p1, p2);

        Person p3 = (Person)ar.get();
        System.out.println("p3 is "+p3);
        System.out.println("p3.equals(p1)="+p3.equals(p1));
    }
}

class Person {
    volatile long id;
    public Person(long id) {
        this.id = id;
    }
    public String toString() {
        return "id:"+id;
    }
}

运行结果:

p3 is id:102
p3.equals(p1)=false

运行过程解析:

首先新建一个 AtomicReference 对象 ar 并将它初始化为p1。紧接着,通过CAS函数对它进行条件设置,当ar的值为 p1 时将其设置为 p2。最后获取ar对应的对象,并打印结果。p3.equals(p1)的结果为false,这是因为Person并没有覆盖equals()方法,而是采用继承自Object.java的equals()方法;而Object.java中的equals()实际上是调用"=="去比较两个对象,即比较两个对象的地址是否相等。

2.2 演示示例2

AtomicReference 的引入是为了可以用一种类似乐观锁的方式操作共享资源,在特定场景下用以提升性能。

  • synchronized是一种阻塞式的解决方案,同一时刻只能有一个线程真正在工作,其他线程都将陷入阻塞,因此这并不是一种高效的解决方案,这个时候就可以利用 AtomicReference 的非阻塞原子性解决方案提供更加高效的方式了。

如我们用多个线程同时访问共享资源时,一般情况下需要以加锁的方式控制并发,这是一种对共享资源加悲观锁的访问方式,类似代码如下:

volatile Foo sharedValue = value;
Lock lock = new ReentrantLock();

lock.lock();
try{
    // 操作共享资源sharedValue
}
finally{
    lock.unlock();
}

AtomicReference 则提供了一种以无锁方式访问共享资源的能力(乐观锁),让我们通过具体示例看下如何通过 AtomicReference 来保证线程安全:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {
    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Integer> ref=new AtomicReference<Integer>(new Integer(1000));
        List<Thread> list=new ArrayList<Thread>();
        for(int i=0;i<1000;i++){
            Thread t=new Thread(new Task(ref),"Thread-"+i);
            list.add(t);
            t.start();
        }
        for(Thread t: list){
            System.out.println(t.getName());
            t.join();
        }
        System.out.println(ref.get());
    }
}

class Task implements Runnable{
    private AtomicReference<Integer> ref;
    public Task(AtomicReference<Integer> ref) {
        this.ref=ref;
    }
    @Override
    public void run() {
        for(;;){
            Integer oldV=ref.get();
            System.out.println(Thread.currentThread().getName()+":"+oldV);
            if(ref.compareAndSet(oldV, oldV+1)){
                break;
            }
        }
    }
}

上述代码最终会打印的结果为:2000

可以看到在本例中我们并没有使用锁,而是使用 自旋+CAS的无锁操作 保证共享变量的线程安全。1000个线程,每个线程对金额增加1,最终结果为2000,如果线程不安全,最终结果应该会小于2000。

通过示例,可以总结出AtomicReference的一般使用模式如下:

AtomicReference<Object> ref = new AtomicReference<>(new Object());
Object oldCache = ref.get();

// 对缓存oldCache做一些操作
Object newCache  =  someFunctionOfOld(oldCache); 

// 如果期间没有其它线程改变了缓存值,则更新
boolean success = ref.compareAndSet(oldCache , newCache);

2.3 演示示例3

在高并发的场景中,根据业务的需要,要求同时更新sequence和timestamp(并发修改多个属性)。

public class AtomicReferenceDemo {

    private Reference reference;
    
    private AtomicReference<Reference> atomicReference;
    
    /**
     * 构建器中初始化AtomicReference
     *
     * @param reference
     */
    public AtomicReferenceDemo(Reference reference) {
        this.reference = reference;
        this.atomicReference = new AtomicReference<>(reference);
    }
    
    public void atomic(Reference reference) {
        Reference referenceOld;
        Reference referenceNew;
    
        long sequence;
        long timestamp;
    
        while (true) {
            referenceOld = this.atomicReference.get();
            sequence = referenceOld.getSequence();
            sequence++;
            timestamp = System.currentTimeMillis();
    
            referenceNew = new Reference(sequence, timestamp);
            /**
             * 比较交换
             */
            if (this.atomicReference.compareAndSet(referenceOld, referenceNew)) {
                reference.setSequence(sequence);
                reference.setTimestamp(timestamp);
                break;
            }
        }
    }
}

/**
 * 序列需要自增并且时间需要更新成最新的时间戳
 */

@Data
@AllArgsConstructor
class Reference {
    /**
     * 序列
     */
    private long sequence;
    /**
     * 时间戳
     */
    private long timestamp;
}

上述示例代码的逻辑如下:

  • 获取并缓存原来的变量,这个变量包含原来的序列和时间戳
  • 基于原来的变量来更新新的时间戳和序列
  • 计算后,使用CAS操作更新原来的变量,更新的过程中,需要传递保存原来的变量
  • 如果保存的原来变量被其他线程修改了,就需要在这里重新拿到最新的变量,并再次计算和重试更新

3、源码分析

AtomicReference 的源码比较简单,它是通过"volatile"和"Unsafe提供的CAS函数实现"原子操作。下面对 AtomicReference 部分源码进行解析。

  • value是volatile类型。这保证了:当某线程修改value的值时,其他线程看到的value值都是最新的value值,即修改之后的volatile的值。
  • 通过CAS设置value。这保证了:当某线程池通过CAS函数(如compareAndSet函数)设置value时,它的操作是原子的,即线程在操作value时不会被中断。

3.1 实现基础 - 主要字段

AtomicReference 和 AtomicInteger 非常相似,它们内部都是用了下面三个属性:

  • Unsafe 是 sun.misc 包下面的类,AtomicReference 主要是依赖于 sun.misc.Unsafe 提供的一些 native 方法保证操作的原子性。
  • Unsafe 的 objectFieldOffset 方法可以获取成员属性在内存中的地址相对于对象内存地址的偏移量。这个偏移量也就是 valueOffset ,说得简单点就是找到这个变量在内存中的地址,便于后续通过内存地址直接进行操作。
  • value 就是 AtomicReference 中的实际值,因为有 volatile ,这个值实际上就是内存值。
// 序列化号
private static final long serialVersionUID = -1848883965231344442L;

// Unsafe是所有并发工具的基础工具类
private static final Unsafe unsafe = Unsafe.getUnsafe();

// 获取value属性的偏移量
private static final long valueOffset;

// 引用变量value
private volatile V value;

静态块主要是对静态常量valueOffset初始化。

// 静态代码块,通过unsafe获取value属性的偏移量
static {
   try {
      valueOffset = unsafe.objectFieldOffset(AtomicReference.class.getDeclaredField("value"));
   } catch (Exception ex) { throw new Error(ex); }
}

3.2 实现基础 - 构造方法

构造方法有以下2个:

// 无参构造函数,创建一个关联对象为 null 的 AtomicReference 实例
public AtomicReference() {
}
    
// 使用给定的初始值创建一个新的AtomicReference对象
public AtomicReference(V initialValue) {
   value = initialValue;
}
  • AtomicReference()创建一个空的AtomicReference对象,一般情况下我们还需要再次调用set()方法为AtomicReference内部的value指定初始值.

  • AtomicReference(V initialValue)使用给定的初始值创建新的 AtomicReference 对象。

3.3 实现基础 - 主要方法

3.3.1 get和set方法

get() : 获取当前 AtomicReference 的值。set() : 设置当前 AtomicReference 的值

  • get() 可以原子性的读取 AtomicReference 中的数据,set() 可以原子性的设置当前的值,因为 get() 和 set() 最终都是作用于 value 变量,而 value 是由 volatile 修饰的,所以 get 、set 相当于都是对内存进行读取和设置。
/**
 * 读取当前值
 * with memory effects as specified by {@link VarHandle#getVolatile}.
 */
public final V get() {
   return value;
}

/**
 * 设为指定值
 */
public final void set(V newValue) {
   value = newValue;
}

3.3.2 lazySet方法

/**
 * 最终设为指定值,但其它线程不能马上看到变化,会延时一会
 */
public final void lazySet(V newValue) {
   unsafe.putOrderedObject(this, valueOffset, newValue);
}

lazySet与set实现的功能类似,二者区别如下:

  • lazySet实现的是最终一致性,set实现的是强一致性
  • lazySet:多线程并发时,线程A调用unsafe.putOrderedInt更新newValue值时,只是把线程A内存中的元素更新成功了,但其它线程此时看不到线程A更新的newValue值,需过一会,线程A更新的newValue才会刷入到主内存中,此时其它线程才能看到线程A更新的值
  • set: 多线程并发时,线程A更新的值会立即刷入主内存中
  • 总结:对于数据一致性要求高的建议用set

语义上,内存屏障之前的所有写操作都要写入内存;内存屏障之后的读操作都可以获得同步屏障之前的写操作的结果。因此,对于敏感的程序块,写操作之后、读操作之前可以插入内存屏障。内存屏障的开销非常轻量级,但是再小也是有开销的,LazySet 的作用正是如此,它会以普通变量的形式来读写变量。也可以说是:懒得设置屏障了

  • CPU 使用了很多优化,使用缓存、指令重排等,其最终的目的都是为了性能,也就是说,当一个程序执行时,只要最终的结果是一样的,指令是否被重排并不重要。所以指令的执行时序并不是顺序执行的,而是乱序执行的,这就会带来很多问题,这也促使着内存屏障的出现。
  • 内存屏障,也称内存栅栏,内存栅障,屏障指令等, 是一类同步屏障指令,是 CPU 或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作。也是一个让CPU 处理单元中的内存状态对其它处理单元可见的一项技术。

3.3.3 获取设置方法

getAndSet方法为获取设置方法,即以原子方式设置为给定值,并返回旧值。

  • getAndSet 方法涉及两个 cpp 实现的方法,一个是 getObjectVolatile ,一个是 compareAndSwapObject 方法,他们用在 do…while 循环中,也就是说,每次都会先获取最新对象引用的值,如果使用 CAS 成功交换两个对象的话,就会直接返回 var5 的值,var5 此时应该就是更新前的内存值,也就是旧值。
// 以原子方式设置为给定值,并返回旧值
public final V getAndSet(V newValue) {
   return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}

3.3.4 比较设置方法

比较设置方法有 compareAndSet 和 weakCompareAndSet。

  • 原子的将引用值设置为 newValue,如果旧值 == expectedValue,设置成功返回 true,否则返回 false。这个方法是支持CAS操作的,同一个时间只有一个线程调用 compareAndSet()方法。
  • compareAndSet() 是 AtomicReference 非常关键的 CAS 方法了,与 AtomicInteger 不同的是,AtomicReference 是调用的 compareAndSwapObject ,而 AtomicInteger 调用的是 compareAndSwapInt 方法。
  • weakCompareAndSet() 方法与 compareAndSet() 类似,但 weakCompareAndSet() 不会插入内存屏障,不能保障volatile的原子性
/**
 * CAS操作,现代CPU已广泛支持,是一种原子操作;
 * 简单地说,当期待值expect与valueOffset地址处的值相等时,设置为update值
 */
public final boolean compareAndSet(V expect, V update) {
   return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

//和compareAndSet()方法相同,区别是不会插入内存屏障,不能保障volatile的原子性
public final boolean weakCompareAndSet(V expect, V update) {
   return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

unsafe是通过Unsafe.getUnsafe()返回的一个Unsafe对象。通过Unsafe的CAS函数对Object引用类型数组的元素进行原子操作。如compareAndSetRaw()就是调用Unsafe的CAS函数。

3.3.5 获取更新方法

获取更新方法包括 getAndUpdate 和 updateAndGet。

  • getAndUpdate():使用函数式接口 updateFunction 基于旧值计算新值,并将旧值替换为计算值,返回旧值
  • updateAndGet():使用函数式接口 updateFunction 基于旧值计算新值,并将旧值替换为计算值,返回新值
/**
 * 使用函数式接口 updateFunction 基于旧值计算新值,并将旧值替换为计算值,返回旧值
 */
public final V getAndUpdate(UnaryOperator<V> updateFunction) {
   V prev = get(), next = null;
   for (boolean haveNext = false;;) {
     if (!haveNext) {
       next = updateFunction.apply(prev);
     }
     if (weakCompareAndSetVolatile(prev, next)) {
       return prev;
     }
     haveNext = prev == (prev = get());
   }
}

/**
 * 使用函数式接口 updateFunction 基于旧值计算新值,并将旧值替换为计算值,返回新值
 */
public final V updateAndGet(UnaryOperator<V> updateFunction) {
   V prev = get(), next = null;
   for (boolean haveNext = false;;) {
     if (!haveNext) {
       next = updateFunction.apply(prev);
     }
     if (weakCompareAndSetVolatile(prev, next)) {
       return next;
     }
     haveNext = prev == (prev = get());
   }
}

3.3.6 获取累加方法

获取累加方法包括 getAndAccumulate 和 accumulateAndGet。

  • getAndAccumulate():使用函数式接口 accumulatorFunction 基于旧值和形参 x 计算新值,并将旧值替换为计算值,返回旧值
  • accumulateAndGet():使用函数式接口 accumulatorFunction 基于旧值和形参 x 计算新值,并将旧值替换为计算值,返回新值
/**
 *  使用函数式接口 accumulatorFunction 基于旧值和形参 x 计算新值,并将旧值替换为计算值,返回旧值
 */
public final V getAndAccumulate(V x, BinaryOperator<V> accumulatorFunction) {
   V prev = get(), next = null;
   for (boolean haveNext = false;;) {
      if (!haveNext) {
         next = accumulatorFunction.apply(prev, x);
      }
      if (weakCompareAndSetVolatile(prev, next)) {
         return prev;
      }
      haveNext = prev == (prev = get());
   }
}

/**
 *  使用函数式接口 accumulatorFunction 基于旧值和形参 x 计算新值,并将旧值替换为计算值,返回新值
 */
public final V accumulateAndGet(V x, BinaryOperator<V> accumulatorFunction) {
   V prev = get(), next = null;
   for (boolean haveNext = false;;) {
     if (!haveNext) {
        next = accumulatorFunction.apply(prev, x);
     }
     if (weakCompareAndSetVolatile(prev, next)) {
        return next;
     }
     haveNext = prev == (prev = get());
   }
}

3.3.7 toString方法

toString方法返回数组当前值的String表示形式。

//返回v类的toString()方法得到的字符串
public String toString() {
    return String.valueOf(get());
}

4、总结

AtomicReference 其实是通过 CAS 来比较对象的引用。而 CAS 可能存在ABA的问题,也就是说:假如一个值原来是A,变成了B,又变成了A,那么CAS检查时会发现它的值没有发生变化,但是实际上却变化了。一般来讲这并不是什么问题,比如数值运算,线程其实根本不关心变量中途如何变化,只要最终的状态和预期值一样即可。但是,有些操作会依赖于对象的变化过程,此时的解决思路一般就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A - 2B - 3A。

0

评论区