并发编程原子类完整教程:Java并发编程之原子操作类实战教程
AtomicReferenceArray 用于原子的更新引用数组的值,即以整个数组对象为单位,数组元素可以执行原子性的操作。该类是JUC原子包(java.util.concurrent.atomic.AtomicReferenceArray)中的数组类。AtomicReferenceArray内部维护了一个Object[]数组,保证了对象的原子操作。AtomicReferenceArray 使用的是 final关键字在多线程环境下的语义(如果把数组定义为volatile类型,其里面的数组元素在读写方面是没有volatile语义的)。JDK官方提供了3个原子数组,它们提供了通过原子的方式更新数组里的某个元素的能力,主要借助 Unsafe 类实现其核心功能。
- AtomicIntegerArray:原子更新整型数组里的元素
- AtomicLongArray:原子更新长整型数组里的元素。
- AtomicReferenceArray:原子更新引用类型数组里的元素。
AtomicReferenceArray有以下特点:
- 可以存放引用数值的原子性数组
- 以整个数组对象为单位,里面的元素操作都是原子性的
1、AtomicReferenceArray主要方法
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray这3个类提供的方法几乎一模一样,以 AtomicReferenceArray 类为例,它主要是提供原子的方式更新数组里的引用类型,其常用方法如下:
1.1 构造方法
AtomicReferenceArray 提供了两个构造函数,一个是根据参数length来new一个长度为length的数组,另外一个根据传入的数组参数将此参数数组赋值给变量array。示例如下:
//构造方法创建长度为length的数组,并且赋值给数组元素array
public AtomicReferenceArray(int length){
array = new Object[length];
}
//将参数数组赋值给元素变量数组
public AtomicReferenceArray(E[] array){
this.array = Arrays.copy(array, array.length, Object[].class);
}
1.2 主要方法
// 将位置i处的元素设置为给定值。
public void set(int i, E newValue)
// 获取位置i的当前值。
public E get(int i)
// 以原子方式将位置i处的元素设置为给定值并返回旧值。
public E getAndSet(int i, E newValue)
// 最终将位置i的元素设置为给定值。
public void lazySet(int i, E newValue)
// 返回数组的长度。
public int length()
// 如果当前值==期望值,则以原子方式将位置i处的元素设置为给定的更新值。
public boolean compareAndSet(int i, E expect, E update)
// 如果当前值==期望值,则以原子方式将位置i处的元素设置为给定的更新值。
public boolean weakCompareAndSet(int i, E expect, E update)
// 返回数组当前值的String表示形式。
public String toString()
2、AtomicReferenceArray演示示例
2.1 演示示例1
import java.util.concurrent.atomic.AtomicReferenceArray;
public class AtomicReferenceArrayTest {
public static void main(String[] args) {
//1、创建给定长度的新 AtomicReferenceArray。
AtomicReferenceArray<Integer> atomicReferenceArray = new AtomicReferenceArray<>(10);
//2、set:将位置 i 的元素设置为给定值。
atomicReferenceArray.set(1,5);
System.out.println("Vaule: " + atomicReferenceArray.get(1));
//3、compareAndSet()方法:如果当前值 == 预期值,则以原子方式将位置 i 的元素设置为给定的更新值。
AtomicReferenceArray<Integer> atomicReferenceArray1 = new AtomicReferenceArray<>(10);
atomicReferenceArray1.set(2,10);
System.out.println("Value: " + atomicReferenceArray1.get(2));
boolean bool = atomicReferenceArray1.compareAndSet(2,10,20);
System.out.println("是否预期值与当前值相等:" + bool);
System.out.println("Value: " + atomicReferenceArray1.get(2));
//4、length()方法:返回该数组的长度。
AtomicReferenceArray<Integer> atomicReferenceArray2 = new AtomicReferenceArray<>(10);
System.out.println("数组长度:" + atomicReferenceArray2.length());
//5、getAndSet()方法:获取当前索引的值,再将此索引设置新的值
AtomicReferenceArray<Integer> atomicReferenceArray3 = new AtomicReferenceArray<>(10);
atomicReferenceArray3.set(2,10);
System.out.println("旧值:" + atomicReferenceArray3.get(2));
Integer result = atomicReferenceArray3.getAndSet(2,50);
System.out.println("获取的result值:" + result);
System.out.println("新值:" + atomicReferenceArray3.get(2));
}
}
2.2 演示示例2
TestThread 代码显示了在多线程环境中 AtomicReferenceArray 的用法。
import java.util.concurrent.atomic.AtomicReferenceArray;
public class TestThread {
private static String[] source = new String[10];
private static AtomicReferenceArray<String> atomicReferenceArray
= new AtomicReferenceArray<String>(source);
public static void main(final String[] arguments) throws InterruptedException {
for (int i = 0; i<atomicReferenceArray.length(); i++) {
atomicReferenceArray.set(i, "item-2");
}
Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();
t1.join();
t2.join();
}
static class Increment implements Runnable {
public void run() {
for(int i = 0; i<atomicReferenceArray.length(); i++) {
String add = atomicReferenceArray.getAndSet(i,"item-"+ (i+1));
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: "+ add);
}
}
}
static class Compare implements Runnable {
public void run() {
for(int i = 0; i<atomicReferenceArray.length(); i++) {
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: "+ atomicReferenceArray.get(i));
boolean swapped = atomicReferenceArray.compareAndSet(i, "item-2", "updated-item-2");
System.out.println("Item swapped: " + swapped);
if(swapped) {
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", updated-item-2");
}
}
}
}
}
上述代码是在2个线程中同时处理 AtomicReferenceArray 对象,并达到了预期的效果。
Thread 9, index 0, value: item-2
Thread 10, index 0, value: item-1
Item swapped: false
Thread 10, index 1, value: item-2
Item swapped: true
Thread 9, index 1, value: updated-item-2
Thread 10, index 1, updated-item-2
Thread 10, index 2, value: item-3
Item swapped: false
Thread 10, index 3, value: item-2
Item swapped: true
Thread 10, index 3, updated-item-2
Thread 10, index 4, value: item-2
Item swapped: true
Thread 10, index 4, updated-item-2
Thread 10, index 5, value: item-2
Item swapped: true
Thread 10, index 5, updated-item-2
Thread 10, index 6, value: item-2
Thread 9, index 2, value: item-2
Item swapped: true
Thread 9, index 3, value: updated-item-2
Thread 10, index 6, updated-item-2
Thread 10, index 7, value: item-2
Thread 9, index 4, value: updated-item-2
Item swapped: true
Thread 9, index 5, value: updated-item-2
Thread 10, index 7, updated-item-2
Thread 9, index 6, value: updated-item-2
Thread 10, index 8, value: item-2
Thread 9, index 7, value: updated-item-2
Item swapped: true
Thread 9, index 8, value: updated-item-2
Thread 10, index 8, updated-item-2
Thread 9, index 9, value: item-2
Thread 10, index 9, value: item-10
Item swapped: false
3、源码分析
AtomicReferenceArray 的代码很简单,下面对 AtomicReferenceArray 部分源码进行解析。
3.1 实现基础 - 主要字段
AtomicLong 是依赖 volatile 关键字和 CAS 指令实现的,AtomicLongArray 则不同,它的实现主要依赖于三个变量:
private static final long serialVersionUID = -2308431214976778248L;
// Unsafe是所有并发工具的基础工具类
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 数组对象头到数组首元素间的地址偏移量
private static final int base = unsafe.arrayBaseOffset(long[].class);
// 数组中相邻元素的地址偏移量的位移表示形式(若shift=x,那么相邻元素的偏移量就是"1<<shift")
private static final int shift;
private static final long arrayFieldOffset;
// 实际存放元素的数组
private final Object[] array;
从属性的定义中可以知道,数组元素最终是存储在Object[]数组中,虽然类定义是泛型的,但是,最终数组中的元素,并不存在类型信息。
- base:base变量表示内存中Object[]的初始地址。
- shift:Object[]数组中元素位移个数。
- array:用来存储真实Object[]数组的变量。
- arrayBaseOffset和arrayIndexScale是Unsafe类提供的native方法。
静态块主要是对静态常量shift初始化。
static {
try{
unsafe = Unsafe.getUnsafe();
arrayFieldOffset = unsafe.objectFieldOffset(AtomicReferenceArray.class.getDeclaredField("array"));
base = unsafe.arrayBaseOffset(Object[].class);
int scale = unsafe.arrayIndexScale(Object[].class);
if((scale & (scale - 1)) != 0){
throw new Error("data type scale not a power of two");
}
shift = 31 - Integer.numberOfLeadingZeros(scale);
}catch(Exception e){
throw new Error(e);
}
}
- scale表示相邻元素的地址偏移量,因为内存对齐的原因,scale一定是2n。
- shift是相邻元素的地址偏移量的位移表示形式,Integer.numberOfLeadingZeros(scale) 取scale进制前导零的个数,所以 shift 是 scale 二进制有效位数减1,即有效的0的个数。如scale=16,即scale=0b00000000_00000000_00000000_00010000 , 那么shift=4。
3.2 实现基础 - 构造方法
构造方法有以下2个:
public AtomicReferenceArray(int length);
public AtomicReferenceArray(E[] array);
AtomicReferenceArray(int)创建指定长度的原子数组.
- 以 int length 作为参数的构造方法会初始化一个length长度的Object[],默认值均为null
AtomicReferenceArray(E[])利用克隆(这里是深度克隆,源数组和原子数组互不影响)将指定的普通数组包装成原子数组。
- E[] array作为参数的构造方法使用参数array作为内部数组。
//构造方法创建长度为length的数组,并且赋值给数组元素array
public AtomicReferenceArray(int length){
array = new Object[length];
}
//将参数数组赋值给元素变量数组
public AtomicReferenceArray(E[] array){
this.array = Arrays.copy(array, array.length, Object[].class);
}
3.3 实现基础 - 主要方法
3.3.1 工具方法
/**
* 该函数调用byteOffset就是为了获取数组array的第i个元素相对AtomicReferenceArray的内存偏移量
*/
private long checkedByteOffset(int i){
if(i < 0 || i >= array.length){
throw new IndexOutOfBoundsException("index " + i);
}
return byteOffset(i);
}
/**
* 获取数组array的第i个元素相对AtomicReferenceArray的内存偏移量
* base为数组首地址相对AtomicReferenceArray的内存偏移量
* array数组中第i个元素相对AtomicLongArray的内存偏移量计算方法 = i * scale + base,其中scale为上面介绍的数组中一个元素的大小
* 由于位移方法效率比乘法效率快,乘法转换成位移计算方式 = i << shift + base,其中shift为上面介绍的scale中最大的1所在的位数
*/
private static long byteOffset(int i){
return ((long) i << shift) + base;
}
private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException, java.io.InvalidObjectException {
Object a = s.readFields().get("array", null);
if(a == null || !a.getClass().isArray()){
throw new java.io.InvalidObjectException("Not array type");
}
if(a.getClass() != Object[].class){
a = Arrays.copyOf((Object[])a, Array.getLength(a), Object[].class);
}
unsafe.putObjectVolatile(this, arrayFieldOffset, a);
}
3.3.2 get和set方法
可以通过set()方法设置指定元素的值,通过get()方法获取指定元素的值。
private E getRaw(long offset){
return (E) unsafe.getObjectVolatile(array, offset);
}
public final E get(int i){
return (E) getRaw(checkedByteOffset(i));
}
public final void set(int i, E newValue){
unsafe.putObjectVolatile(array, checkedByteOffset(i), newValue);
}
可以看到,array 虽然没有使用 volatile,但它调用的都是 unsafe 里面具有 volatile 语义的方法,也就是通过内存下地址对数组元素的操作,也是具有volatile语义的,即具有可见性。
3.3.3 lazySet方法
public final void lazySet(int i, E newValue){
unsafe.putOrderedObject(array, checkedByteOffset(i), newValue);
}
lazySet与set实现的功能类似,二者区别如下:
- lazySet实现的是最终一致性,set实现的是强一致性
- lazySet:多线程并发时,线程A调用unsafe.putOrderedInt更新newValue值时,只是把线程A内存中的元素更新成功了,但其它线程此时看不到线程A更新的newValue值,需过一会,线程A更新的newValue才会刷入到主内存中,此时其它线程才能看到线程A更新的值
- set: 多线程并发时,线程A更新的值会立即刷入主内存中
- 总结:对于数据一致性要求高的建议用set
3.3.4 获取设置方法
getAndSet方法为获取设置方法,即先获取数组array的第i号元素的值,然后把newValue更新到第i号元素中。
public final E getAndSet(int i, E newValue){
return (E)unsafe.getAndSetObject(array, checkedByteOffset(i), newValue);
}
3.3.5 比较设置方法
比较设置方法有 compareAndSet 和 weakCompareAndSet。
- compareAndSet()方法用于设置指定元素的值,如果当前值与期望值相等则更新为新的值,这个方法是支持CAS操作的,同一个时间只有一个线程调用 compareAndSet()方法。
- compareAndSet()方法通过乐观锁的方式把update更新到数组array的第i号元素中,其中expect为读取到的数组第i号元素的的值,通过乐观锁的方式,先比较expect是否与第i号元素的值相等,如果相等,把update更新到第i号元素
- weakCompareAndSet() 方法与 compareAndSet() 类似,但 weakCompareAndSet() 不会插入内存屏障,不能保障volatile的原子性
private boolean compareAndSetRaw(long offset, E expect, E update){
return unsafe.compareAndSwapObject(array, offset, expect, update);
}
public final boolean compareAndSet(int i, E expect, E update){
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
public final boolean weakCompareAndSet(int i, E expect, E update){
compareAndSet(i, expect, update);
}
unsafe是通过Unsafe.getUnsafe()返回的一个Unsafe对象。通过Unsafe的CAS函数对Object引用类型数组的元素进行原子操作。如compareAndSetRaw()就是调用Unsafe的CAS函数。
3.3.6 获取更新方法
获取更新方法包括 getAndUpdate 和 updateAndGet。
- getAndUpdate():先获取数组array第i号元素的值,然后第i号元素的值经过updateFunction函数处理后的值更新到第i号元素中
- updateAndGet():先把数组第i号元素的值经过updateFunction处理后的值更新到第i元素中,然后返回更新后的值
public final E getAndUpdate(int i, UnaryOperator<E> updateFunction){
long offset = checkedByteOffset(i);
E prev, next;
do{
prev = getRaw(offset);
next = updateFunction.apply(prev);
}while(!compareAndSetRaw(offset, prev, next));
return prev;
}
public final E updateAndGet(int i, UnaryOperator<E> updateFunction){
long offset = checkedByteOffset(i);
E prev, next;
do{
prev = getRaw(offset);
next = updateFunction.apply(prev);
}while(!compareAndSetRaw(offset, prev, next));
return next;
}
3.3.7 获取累加方法
获取累加方法包括 getAndAccumulate 和 accumulateAndGet。
- getAndAccumulate():先获取第i号元素的值,然后把输入的x经过updateFunction函数处理后的值更新到第i号元素中
- accumulateAndGet():先把输入的x经过accumulatorFunction函数出来了后的值更新到第i号元素中,然后返回更新后的值
public final E getAndAccumulate(int i, E x, BinaryOperator<E> accumulatorFunction){
long offset = checkedByteOffset(i);
E prev, next;
do{
prev = getRaw(offset);
next = accumulatorFunction.apply(prev, x);
}while(!compareAndSetRaw(offset, prev, next));
return prev;
}
public final E accumulateAndGet(int i, E x, BinaryOperator<E> accumulatorFunction){
long offset = checkedByteOffset(i);
E prev, next;
do{
prev = getRaw(offset);
next = accumulatorFunction.apply(prev, x);
}while(!compareAndSetRaw(offset, prev, next));
return next;
}
3.3.8 数组长度方法
length方法返回该数组的长度。
public final int length(){
return array.length;
}
3.3.9 toString方法
toString方法返回数组当前值的String表示形式。
public String toString(){
int iMax = array.length - 1;
if(iMax == -1){
return "[]";
}
StringBuilder b = new StringBuilder();
b.append(']');
for(int i = 0; ; i++){
b.append(getRaw(byteOffset(i)));
if(i == iMax)
return b.append(']').toString();
b.append(',').append(' ');
}
}
评论区