侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 292 篇文章
  • 累计创建 132 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

Java并发编程之原子操作类AtomicReferenceArray实战详解

孔子说JAVA
2022-06-09 / 0 评论 / 0 点赞 / 67 阅读 / 11,496 字 / 正在检测是否收录...

并发编程原子类完整教程:Java并发编程之原子操作类实战教程

AtomicReferenceArray 用于原子的更新引用数组的值,即以整个数组对象为单位,数组元素可以执行原子性的操作。该类是JUC原子包(java.util.concurrent.atomic.AtomicReferenceArray)中的数组类。AtomicReferenceArray内部维护了一个Object[]数组,保证了对象的原子操作。AtomicReferenceArray 使用的是 final关键字在多线程环境下的语义(如果把数组定义为volatile类型,其里面的数组元素在读写方面是没有volatile语义的)。JDK官方提供了3个原子数组,它们提供了通过原子的方式更新数组里的某个元素的能力,主要借助 Unsafe 类实现其核心功能。

  • AtomicIntegerArray:原子更新整型数组里的元素
  • AtomicLongArray:原子更新长整型数组里的元素。
  • AtomicReferenceArray:原子更新引用类型数组里的元素。

AtomicReferenceArray有以下特点:

  • 可以存放引用数值的原子性数组
  • 以整个数组对象为单位,里面的元素操作都是原子性的

1、AtomicReferenceArray主要方法

AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray这3个类提供的方法几乎一模一样,以 AtomicReferenceArray 类为例,它主要是提供原子的方式更新数组里的引用类型,其常用方法如下:

1.1 构造方法

AtomicReferenceArray 提供了两个构造函数,一个是根据参数length来new一个长度为length的数组,另外一个根据传入的数组参数将此参数数组赋值给变量array。示例如下:

//构造方法创建长度为length的数组,并且赋值给数组元素array
public AtomicReferenceArray(int length){
    array = new Object[length];
}

//将参数数组赋值给元素变量数组
public AtomicReferenceArray(E[] array){
    this.array = Arrays.copy(array, array.length, Object[].class);
}

1.2 主要方法

// 将位置i处的元素设置为给定值。
public void set(int i, E newValue)

// 获取位置i的当前值。
public E get(int i)

// 以原子方式将位置i处的元素设置为给定值并返回旧值。
public E getAndSet(int i, E newValue)

// 最终将位置i的元素设置为给定值。
public void lazySet(int i, E newValue)

// 返回数组的长度。
public int length()

// 如果当前值==期望值,则以原子方式将位置i处的元素设置为给定的更新值。
public boolean compareAndSet(int i, E expect, E update)

// 如果当前值==期望值,则以原子方式将位置i处的元素设置为给定的更新值。
public boolean weakCompareAndSet(int i, E expect, E update)

// 返回数组当前值的String表示形式。
public String toString()

2、AtomicReferenceArray演示示例

2.1 演示示例1

import java.util.concurrent.atomic.AtomicReferenceArray;

public class AtomicReferenceArrayTest {
   public static void main(String[] args) {
      //1、创建给定长度的新 AtomicReferenceArray。
      AtomicReferenceArray<Integer> atomicReferenceArray = new AtomicReferenceArray<>(10);
      
      //2、set:将位置 i 的元素设置为给定值。
      atomicReferenceArray.set(1,5);
      System.out.println("Vaule: " + atomicReferenceArray.get(1));
      
      //3、compareAndSet()方法:如果当前值 == 预期值,则以原子方式将位置 i 的元素设置为给定的更新值。
      AtomicReferenceArray<Integer> atomicReferenceArray1 = new AtomicReferenceArray<>(10);
      atomicReferenceArray1.set(2,10);
      System.out.println("Value: " + atomicReferenceArray1.get(2));
      boolean bool = atomicReferenceArray1.compareAndSet(2,10,20);
      System.out.println("是否预期值与当前值相等:" + bool);
      System.out.println("Value: " + atomicReferenceArray1.get(2));
      
      //4、length()方法:返回该数组的长度。
      AtomicReferenceArray<Integer> atomicReferenceArray2 = new AtomicReferenceArray<>(10);
      System.out.println("数组长度:" + atomicReferenceArray2.length());
        
      //5、getAndSet()方法:获取当前索引的值,再将此索引设置新的值
      AtomicReferenceArray<Integer> atomicReferenceArray3 = new AtomicReferenceArray<>(10);
      atomicReferenceArray3.set(2,10);
      System.out.println("旧值:" + atomicReferenceArray3.get(2));
      Integer result = atomicReferenceArray3.getAndSet(2,50);
      System.out.println("获取的result值:" + result);
      System.out.println("新值:" + atomicReferenceArray3.get(2));
   }
}


2.2 演示示例2

TestThread 代码显示了在多线程环境中 AtomicReferenceArray 的用法。

import java.util.concurrent.atomic.AtomicReferenceArray;
public class TestThread {
   private static String[] source = new String[10];
   private static AtomicReferenceArray<String> atomicReferenceArray 
      = new AtomicReferenceArray<String>(source);
   public static void main(final String[] arguments) throws InterruptedException {
      for (int i = 0; i<atomicReferenceArray.length(); i++) {
         atomicReferenceArray.set(i, "item-2");
      }
      Thread t1 = new Thread(new Increment());
      Thread t2 = new Thread(new Compare());
      t1.start();
      t2.start();
      t1.join();
      t2.join();		
   }  
   static class Increment implements Runnable {
      public void run() {
         for(int i = 0; i<atomicReferenceArray.length(); i++) {
            String add = atomicReferenceArray.getAndSet(i,"item-"+ (i+1));
            System.out.println("Thread " + Thread.currentThread().getId() 
               + ", index " +i + ", value: "+ add);
         }
      }
   }
   static class Compare implements Runnable {
      public void run() {
         for(int i = 0; i<atomicReferenceArray.length(); i++) {
            System.out.println("Thread " + Thread.currentThread().getId() 
               + ", index " +i + ", value: "+ atomicReferenceArray.get(i));
            boolean swapped = atomicReferenceArray.compareAndSet(i, "item-2", "updated-item-2");
            System.out.println("Item swapped: " + swapped);
            if(swapped) {
               System.out.println("Thread " + Thread.currentThread().getId() 
                  + ", index " +i + ", updated-item-2");
            }
         }
      }
   }
}

上述代码是在2个线程中同时处理 AtomicReferenceArray 对象,并达到了预期的效果。

Thread 9, index 0, value: item-2
Thread 10, index 0, value: item-1
Item swapped: false
Thread 10, index 1, value: item-2
Item swapped: true
Thread 9, index 1, value: updated-item-2
Thread 10, index 1, updated-item-2
Thread 10, index 2, value: item-3
Item swapped: false
Thread 10, index 3, value: item-2
Item swapped: true
Thread 10, index 3, updated-item-2
Thread 10, index 4, value: item-2
Item swapped: true
Thread 10, index 4, updated-item-2
Thread 10, index 5, value: item-2
Item swapped: true
Thread 10, index 5, updated-item-2
Thread 10, index 6, value: item-2
Thread 9, index 2, value: item-2
Item swapped: true
Thread 9, index 3, value: updated-item-2
Thread 10, index 6, updated-item-2
Thread 10, index 7, value: item-2
Thread 9, index 4, value: updated-item-2
Item swapped: true
Thread 9, index 5, value: updated-item-2
Thread 10, index 7, updated-item-2
Thread 9, index 6, value: updated-item-2
Thread 10, index 8, value: item-2
Thread 9, index 7, value: updated-item-2
Item swapped: true
Thread 9, index 8, value: updated-item-2
Thread 10, index 8, updated-item-2
Thread 9, index 9, value: item-2
Thread 10, index 9, value: item-10
Item swapped: false

3、源码分析

AtomicReferenceArray 的代码很简单,下面对 AtomicReferenceArray 部分源码进行解析。

3.1 实现基础 - 主要字段

AtomicLong 是依赖 volatile 关键字和 CAS 指令实现的,AtomicLongArray 则不同,它的实现主要依赖于三个变量:

private static final long serialVersionUID = -2308431214976778248L;
// Unsafe是所有并发工具的基础工具类
private static final Unsafe unsafe = Unsafe.getUnsafe();

// 数组对象头到数组首元素间的地址偏移量
private static final int base = unsafe.arrayBaseOffset(long[].class);

// 数组中相邻元素的地址偏移量的位移表示形式(若shift=x,那么相邻元素的偏移量就是"1<<shift")
private static final int shift;

private static final long arrayFieldOffset;

// 实际存放元素的数组
private final Object[] array;

从属性的定义中可以知道,数组元素最终是存储在Object[]数组中,虽然类定义是泛型的,但是,最终数组中的元素,并不存在类型信息。

  • base:base变量表示内存中Object[]的初始地址。
  • shift:Object[]数组中元素位移个数。
  • array:用来存储真实Object[]数组的变量。
  • arrayBaseOffset和arrayIndexScale是Unsafe类提供的native方法。

静态块主要是对静态常量shift初始化。

static {
    try{
        unsafe = Unsafe.getUnsafe();
        arrayFieldOffset = unsafe.objectFieldOffset(AtomicReferenceArray.class.getDeclaredField("array"));
        base = unsafe.arrayBaseOffset(Object[].class);
        int scale = unsafe.arrayIndexScale(Object[].class);
        if((scale & (scale - 1)) != 0){
            throw new Error("data type scale not a power of two");
        }
        shift = 31 - Integer.numberOfLeadingZeros(scale);
    }catch(Exception e){
        throw new Error(e);
    }
}
  • scale表示相邻元素的地址偏移量,因为内存对齐的原因,scale一定是2n。
  • shift是相邻元素的地址偏移量的位移表示形式,Integer.numberOfLeadingZeros(scale) 取scale进制前导零的个数,所以 shift 是 scale 二进制有效位数减1,即有效的0的个数。如scale=16,即scale=0b00000000_00000000_00000000_00010000 , 那么shift=4。

3.2 实现基础 - 构造方法

构造方法有以下2个:

public AtomicReferenceArray(int length);

public AtomicReferenceArray(E[] array);

AtomicReferenceArray(int)创建指定长度的原子数组.

  • 以 int length 作为参数的构造方法会初始化一个length长度的Object[],默认值均为null

AtomicReferenceArray(E[])利用克隆(这里是深度克隆,源数组和原子数组互不影响)将指定的普通数组包装成原子数组。

  • E[] array作为参数的构造方法使用参数array作为内部数组。
//构造方法创建长度为length的数组,并且赋值给数组元素array
public AtomicReferenceArray(int length){
    array = new Object[length];
}

//将参数数组赋值给元素变量数组
public AtomicReferenceArray(E[] array){
    this.array = Arrays.copy(array, array.length, Object[].class);
}

3.3 实现基础 - 主要方法

3.3.1 工具方法

/**
 * 该函数调用byteOffset就是为了获取数组array的第i个元素相对AtomicReferenceArray的内存偏移量
 */
private long checkedByteOffset(int i){
    if(i < 0 || i >= array.length){
        throw new IndexOutOfBoundsException("index " + i);
    }
    
    return byteOffset(i);
}

/**
 * 获取数组array的第i个元素相对AtomicReferenceArray的内存偏移量
 * base为数组首地址相对AtomicReferenceArray的内存偏移量
 * array数组中第i个元素相对AtomicLongArray的内存偏移量计算方法 = i * scale + base,其中scale为上面介绍的数组中一个元素的大小
 * 由于位移方法效率比乘法效率快,乘法转换成位移计算方式 = i << shift + base,其中shift为上面介绍的scale中最大的1所在的位数
 */
private static long byteOffset(int i){
    return ((long) i << shift) + base;
}

private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException, java.io.InvalidObjectException {
    Object a = s.readFields().get("array", null);
    if(a == null || !a.getClass().isArray()){
        throw new java.io.InvalidObjectException("Not array type");
    }
    if(a.getClass() != Object[].class){
        a = Arrays.copyOf((Object[])a, Array.getLength(a), Object[].class);
    }
    unsafe.putObjectVolatile(this, arrayFieldOffset, a);
}

3.3.2 get和set方法

可以通过set()方法设置指定元素的值,通过get()方法获取指定元素的值。

private E getRaw(long offset){
    return (E) unsafe.getObjectVolatile(array, offset);
}

public final E get(int i){
    return (E) getRaw(checkedByteOffset(i));
}

public final void set(int i, E newValue){
    unsafe.putObjectVolatile(array, checkedByteOffset(i), newValue);
}

可以看到,array 虽然没有使用 volatile,但它调用的都是 unsafe 里面具有 volatile 语义的方法,也就是通过内存下地址对数组元素的操作,也是具有volatile语义的,即具有可见性。

3.3.3 lazySet方法

public final void lazySet(int i, E newValue){
    unsafe.putOrderedObject(array, checkedByteOffset(i), newValue);
}

lazySet与set实现的功能类似,二者区别如下:

  • lazySet实现的是最终一致性,set实现的是强一致性
  • lazySet:多线程并发时,线程A调用unsafe.putOrderedInt更新newValue值时,只是把线程A内存中的元素更新成功了,但其它线程此时看不到线程A更新的newValue值,需过一会,线程A更新的newValue才会刷入到主内存中,此时其它线程才能看到线程A更新的值
  • set: 多线程并发时,线程A更新的值会立即刷入主内存中
  • 总结:对于数据一致性要求高的建议用set

3.3.4 获取设置方法

getAndSet方法为获取设置方法,即先获取数组array的第i号元素的值,然后把newValue更新到第i号元素中。

public final E getAndSet(int i, E newValue){
    return (E)unsafe.getAndSetObject(array, checkedByteOffset(i), newValue);
}

3.3.5 比较设置方法

比较设置方法有 compareAndSet 和 weakCompareAndSet。

  • compareAndSet()方法用于设置指定元素的值,如果当前值与期望值相等则更新为新的值,这个方法是支持CAS操作的,同一个时间只有一个线程调用 compareAndSet()方法。
  • compareAndSet()方法通过乐观锁的方式把update更新到数组array的第i号元素中,其中expect为读取到的数组第i号元素的的值,通过乐观锁的方式,先比较expect是否与第i号元素的值相等,如果相等,把update更新到第i号元素
  • weakCompareAndSet() 方法与 compareAndSet() 类似,但 weakCompareAndSet() 不会插入内存屏障,不能保障volatile的原子性
private boolean compareAndSetRaw(long offset, E expect, E update){
    return unsafe.compareAndSwapObject(array, offset, expect, update);
}

public final boolean compareAndSet(int i, E expect, E update){
    return compareAndSetRaw(checkedByteOffset(i), expect, update);
}

public final boolean weakCompareAndSet(int i, E expect, E update){
    compareAndSet(i, expect, update);
}

unsafe是通过Unsafe.getUnsafe()返回的一个Unsafe对象。通过Unsafe的CAS函数对Object引用类型数组的元素进行原子操作。如compareAndSetRaw()就是调用Unsafe的CAS函数。

3.3.6 获取更新方法

获取更新方法包括 getAndUpdate 和 updateAndGet。

  • getAndUpdate():先获取数组array第i号元素的值,然后第i号元素的值经过updateFunction函数处理后的值更新到第i号元素中
  • updateAndGet():先把数组第i号元素的值经过updateFunction处理后的值更新到第i元素中,然后返回更新后的值
public final E getAndUpdate(int i, UnaryOperator<E> updateFunction){
    long offset = checkedByteOffset(i);
    E prev, next;
    do{
        prev = getRaw(offset);
        next = updateFunction.apply(prev);
    }while(!compareAndSetRaw(offset, prev, next));
    return prev;
}

public final E updateAndGet(int i, UnaryOperator<E> updateFunction){
    long offset = checkedByteOffset(i);
    E prev, next;
    do{
        prev = getRaw(offset);
        next = updateFunction.apply(prev);
    }while(!compareAndSetRaw(offset, prev, next));
    return next;
}

3.3.7 获取累加方法

获取累加方法包括 getAndAccumulate 和 accumulateAndGet。

  • getAndAccumulate():先获取第i号元素的值,然后把输入的x经过updateFunction函数处理后的值更新到第i号元素中
  • accumulateAndGet():先把输入的x经过accumulatorFunction函数出来了后的值更新到第i号元素中,然后返回更新后的值
public final E getAndAccumulate(int i, E x, BinaryOperator<E> accumulatorFunction){
    long offset = checkedByteOffset(i);
    E prev, next;
    do{
        prev = getRaw(offset);
        next = accumulatorFunction.apply(prev, x);
    }while(!compareAndSetRaw(offset, prev, next));
    return prev;
}

public final E accumulateAndGet(int i, E x, BinaryOperator<E> accumulatorFunction){
    long offset = checkedByteOffset(i);
    E prev, next;
    do{
        prev = getRaw(offset);
        next = accumulatorFunction.apply(prev, x);
    }while(!compareAndSetRaw(offset, prev, next));
    return next;
}

3.3.8 数组长度方法

length方法返回该数组的长度。

public final int length(){
    return array.length;
}

3.3.9 toString方法

toString方法返回数组当前值的String表示形式。

public String toString(){
    int iMax = array.length - 1;
    if(iMax == -1){
        return "[]";
    }
    
    StringBuilder b = new StringBuilder();
    b.append(']');
    for(int i = 0; ; i++){
        b.append(getRaw(byteOffset(i)));
        if(i == iMax)
            return b.append(']').toString();
        b.append(',').append(' ');
    }
}
0

评论区