Java并发编程学习七:原子类

一、原子类
1. 简介

在介绍原子类之前,先得了解什么是原子性。所谓原子性,意味着"一组操作要么全都操作成功,要么全都失败,不能只操作成功其中的一部分",而原子类就是有原则性的类。

典型的操作,如i++就是一个非原子性操作,不能保证线程安全,可以采用原子类的getAndIncrement 方法解决。

原子类和锁都可以保证并发情况下的线程安全,但是相比于锁,原子类有以下的优势:

  • 粒度更细:原子变量可以把竞争范围缩小到变量级别,通常情况下,锁的粒度都要大于原子变量的粒度。
  • 效率更高:除了高度竞争的情况之外,使用原子类的效率通常会比使用同步互斥锁的效率更高,因为原子类底层利用了 CAS 操作,不会阻塞线程。
2. 6大原子类

原子类一共可以分为6大类

a. Atomic\ 基本类型原子类

这一类称为基本类型原子类,包括三种:AtomicInteger、AtomicLong 和 AtomicBoolean。

以AtomicInteger为例,它是对于 int 类型的封装,并且提供了原子性的访问和更新。在并发场景下,使用该类,就可以不用基本类型 int,也不使用包装类型 Integer,而是直接使用 AtomicInteger。

AtomicInteger 类的常见方法有:

public final int getAndSet(int newValue) //获取当前的值,并设置新的值
public final int getAndIncrement() //获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值更新为输入值(update)

b. Array 数组类型原子类

这种数组中的元素都具备原子性,包括三种:

  • AtomicIntegerArray:整形数组原子类;
  • AtomicLongArray:长整形数组原子类;
  • AtomicReferenceArray :引用类型数组原子类。

c. Atomic\Reference 引用类型原子类

AtomicReference 类的作用和AtomicInteger 并没有本质区别, AtomicInteger 可以让一个整数保证原子性,而AtomicReference 可以让一个对象保证原子性。AtomicReference 的能力明显比 AtomicInteger 强,因为一个对象里可以包含很多属性。

除了AtomicReference,同属于这一类的还包括:

  • AtomicStampedReference:它是对 AtomicReference 的升级,在此基础上还加了时间戳,用于解决 CAS 的 ABA 问题。
  • AtomicMarkableReference:和 AtomicReference 类似,多了一个绑定的布尔值,可以用于表示该对象已删除等场景。

d. Atomic\FieldUpdater 原子更新器

该类一共有三种:

  • AtomicIntegerFieldUpdater:原子更新整形的更新器;
  • AtomicLongFieldUpdater:原子更新长整形的更新器;
  • AtomicReferenceFieldUpdater:原子更新引用的更新器。

这类的使用场景是,将已经声明的变量进行升级,比如对非原子性的int,利用AtomicIntegerFieldUpdater 将其升级为原子操作。

那么既然要保证原子性,为什么不一开始就声明为原子类,如AtomicInteger,而是要用原子更新器将其升级为原子操作。这里就涉及到以下几种场景:

  • 一是这个变量已经被声明过了而且被广泛运用(即历史原因),修改它成本很高
  • 二是在大部分情况下并不需要使用到它的原子性,直接声明为原子类更加耗费资源。

以下是对原子更新器的使用演示:

public class AtomicIntegerFieldUpdaterDemo implements Runnable{

   static Score math;
   static Score computer;

   public static AtomicIntegerFieldUpdater<Score> scoreUpdater = AtomicIntegerFieldUpdater
           .newUpdater(Score.class, "score");

   @Override
   public void run() {

       for (int i = 0; i < 1000; i++) {

           computer.score++;
           scoreUpdater.getAndIncrement(math);
       }
   }

   public static class Score {

       volatile int score;
   }

   public static void main(String[] args) throws InterruptedException {

       math =new Score();
       computer =new Score();
       AtomicIntegerFieldUpdaterDemo2 r = new AtomicIntegerFieldUpdaterDemo2();
       Thread t1 = new Thread(r);
       Thread t2 = new Thread(r);
       t1.start();
       t2.start();
       t1.join();
       t2.join();
       System.out.println("普通变量的结果:"+ computer.score);
       System.out.println("升级后的结果:"+ math.score);
   }
}

Score 类型内部会有一个分数,也叫作 core,是int类型。声明两个 Score 类型的实例分别叫作数学 math 和计算机 computer,同时声明一个 AtomicIntegerFieldUpdater,在它构造的时候传入了两个参数,第一个是 Score.class,这是类名,第二个是属性名,叫作 score。

分别在多线程下对math 和computer进行自增操作,对于math而言显然是非线程安全的,对于computer,由于AtomicIntegerFieldUpdater的存在,可以保证线程安全。

最终的运行结果:math的结果是1942,而computer是2000,computer的结果符合预期。

e. Adder 加法器

这一类有两种:LongAdder 和 DoubleAdder,后面详细介绍

f. Accumulator 积累器

这一类有两种:LongAccumulator 和 DoubleAccumulator,同样在后面详细介绍

3. 原子类的CAS实现

上面提到过,原子类是基于CAS操作保证线程安全的,具体是怎么实现的呢?

以AtomicInteger为例,查看getAndAdd 方法的源码

//JDK 1.8实现
public final int getAndAdd(int delta) {

   return unsafe.getAndAddInt(this, valueOffset, delta);
}

内部使用了Unsafe 类,并调用了 unsafe.getAndAddInt 方法。

Unsafe 类是用于和操作系统打交道的,因为大部分的 Java 代码自身无法直接操作内存,所以在必要的时候,可以利用 Unsafe 类来和操作系统进行交互,CAS 正是利用到了 Unsafe 类。

AtomicInteger 的内部部分源码如下:

public class AtomicInteger extends Number implements java.io.Serializable {

   // setup to use Unsafe.compareAndSwapInt for updates
   private static final Unsafe unsafe = Unsafe.getUnsafe();
   private static final long valueOffset;

   static {

       try {

           valueOffset = unsafe.objectFieldOffset
               (AtomicInteger.class.getDeclaredField("value"));
       } catch (Exception ex) {

      throw new Error(ex); }
   }

   private volatile int value;
   public final int get() {

     return value;}
   ...
}

源码中获取了 Unsafe 实例,并且定义了 valueOffset。static代码块会在类加载的时候执行,执行时会调用 Unsafe 的 objectFieldOffset 方法,从而得到当前这个原子类的 value 的偏移量,并且赋给 valueOffset 变量,这样一来就获取到了 value 的偏移量,它的含义是在内存中的偏移地址。Unsafe 就是根据内存偏移地址获取数据的原值的,这样我们就能通过 Unsafe 来实现 CAS 了

value 是用 volatile 修饰的,它是原子类存储的值的变量,由于它被 volatile 修饰,可以保证在多线程之间看到的 value 是同一份,保证了可见性。

继续查看Unsafe 的 getAndAddInt 方法:

public final int getAndAddInt(Object var1, long var2, int var4) {

   int var5;
   do {

       var5 = this.getIntVolatile(var1, var2);
   } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
   return var5;
}

传入的参数中,var1是当前原子类,var2是最开始获取到的 offset,var4是我们希望原子类所改变的数值,比如可以传入 +1,也可以传入 -1。

var5 = this.getIntVolatile(var1, var2) 是一个native 方法,作用就是获取在 var1 中的 var2 偏移处的值,并保存到var5中。

do-while 循环是一个死循环,退出条件由compareAndSwapInt方法决定。该方法的参数是:var1, var2, var5, var5 + var4,分别用object、offset、expectedValue、newValue。具体含义如下:

  • 第一个参数 object 就是将要操作的对象,传入的是 this,也就是 atomicInteger 这个对象本身;
  • 第二个参数是 offset,也就是偏移量,借助它就可以获取到 value 的数值;
  • 第三个参数 expectedValue,代表“期望值”,传入的是刚才获取到的 var5;
  • 最后一个参数 newValue 是希望修改的数值 ,等于之前取到的数值 var5 再加上 var4,而 var4 就是之前所传入的 delta

compareAndSwapInt 方法的作用就是,判断如果现在原子类里 value 的值和之前获取到的 var5 相等的话,那么就把计算出来的 var5 + var4 给更新上去,所以说这行代码就实现了 CAS 的过程。

二、 Adder 加法器

1. Atomic\ 基本类型原子类的问题

前面提到 Atomic\ 基本类型原子类分为三种,可以保证多线程下的线程安全。但是,在并发量很大的场景下,Atomic\ 基本类型原子类(其实是AtomicInteger 和 AtomicLong)有很大的性能问题。

以AtomicLong 为例进行说明:

/**
* 描述: 在16个线程下使用AtomicLong
*/
public class AtomicLongDemo {

   public static void main(String[] args) throws InterruptedException {

       AtomicLong counter = new AtomicLong(0);
       ExecutorService service = Executors.newFixedThreadPool(16);
       for (int i = 0; i  < 100; i++) {

           service.submit(new Task(counter));
       }

       Thread.sleep(2000);
       System.out.println(counter.get());
   }

   static class Task implements Runnable {

       private final AtomicLong counter;

       public Task(AtomicLong counter) {

           this.counter = counter;
       }

       @Override
       public void run() {

           counter.incrementAndGet();
       }
   }
}

新建了一个原始值为 0 的 AtomicLong。新建一个线程数为 16 的线程池,并且往这个线程池中添加了 100 次相同的一个Task 任务。Task 任务中调用AtomicLong 的 incrementAndGet 方法进行自增操作。

毫无疑问,最终的运行结果是100,并不会发生线程安全问题。下面对其内部的过程进行说明,以两个线程并发进行简化说明:

file

每个线程有自己的本地内存,而所有的线程都共用共享内存。对于 AtomicLong 内部的 value 属性而言,它是由volatile 修饰的,需要保证可见性。因此每次数值有变化的时候,它都需要进行 flush 和 refresh,比如线程1实现自增1后,需要先从线程1的本地内存中将结果flush到共享内存中,再从共享内存中将结果reflash到线程2的本地内存.

在高并发的场景下,频繁的flush 和 refresh 操作会耗费很多资源,而且 CAS 也会经常失败。

2. Adder 加法器的改进

针对Atomic\ 基本类型原子类的问题高并发下的性能问题,Adder 加法器进行了改进。同样的场景,使用Adder 加法器如下:

/**
* 描述: 在16个线程下使用LongAdder
*/
public class LongAdderDemo {

   public static void main(String[] args) throws InterruptedException {

       LongAdder counter = new LongAdder();
       ExecutorService service = Executors.newFixedThreadPool(16);
       for (int i = 0; i  < 100; i++) {

           service.submit(new Task(counter));
       }

       Thread.sleep(2000);
       System.out.println(counter.sum());
   }
   static class Task implements Runnable {

       private final LongAdder counter;

       public Task(LongAdder counter) {

           this.counter = counter;
       }

       @Override
       public void run() {

           counter.increment();
       }
   }
}

运行的结果依然是100,但是运行速度比刚才 AtomicLong 的实现要快。

那么是什么改进,能够让Adder 加法器更快呢?以LongAdder 为例,其内部引入了分段累加的概念,内部一共有两个参数参与计数:第一个叫作 base,它是一个变量,第二个是 Cell[] ,是一个数组。

当竞争不激烈的时候,可以直接把累加结果改到 base 变量上。而当竞争激烈的时候,会使用Cell[],通过计算出每个线程的 hash 值来给线程分配到不同的 Cell 上去,每个 Cell 相当于是一个独立的计数器,这样一来就不会和其他的计数器干扰,Cell 之间并不存在竞争关系,所以在自加的过程中,就大大减少了刚才的 flush 和 refresh,以及降低了冲突的概率。

那么最终如何实现多线程计数呢?答案就在最后一步的求和 sum 方法,执行 LongAdder.sum() 的时候,会把各个线程里的 Cell 累计求和,并加上 base,形成最终的总和。

public long sum() {

   Cell[] as = cells; Cell a;
   long sum = base;
   if (as != null) {

       for (int i = 0; i  < as.length; ++i) {

           if ((a = as[i]) != null)
               sum += a.value;
       }
   }
   return sum;
}

sum方法中,先取 base 的值,然后遍历所有 Cell,把每个 Cell 的值都加上去,形成最终的总和。需要注意的是,由于在统计的时候并没有进行加锁操作,所以这里得出的 sum 不一定是完全准确的,因为有可能在计算 sum 的过程中 Cell 的值被修改了。

从上面可以知道,LongAdder在统计的时候如果有并发更新,可能导致统计的数据有误差。

3. Atomic\ 基本类型原子类 vs Adder 加法器

以AtomicLong 和LongAdder 为例,上面提到LongAdder 的在高并发场景下性能高于AtomicLong ,那是否意味着AtomicLong 可以被LongAdder 取代呢?

答案是,不行!

LongAdder 只提供了 add、increment 等简单的方法,适合的是统计求和计数的场景,场景比较单一,而 AtomicLong 还具有 compareAndSet 等高级方法,可以应对除了加减之外的更复杂的需要 CAS 的场景。

因此,如果仅仅是需要用到加和减操作的话,那么可以直接使用更高效的 LongAdder,但如果需要利用 CAS 比如 compareAndSet 等操作的话,就需要使用 AtomicLong 来完成。

三、Accumulator 积累器

Accumulator 和Adder 非常相似,实际上 Accumulator 就是一个更通用版本的 Adder,比如 LongAccumulator 是 LongAdder 的功能增强版,因为 LongAdder 的 API 只有对数值的加减,而 LongAccumulator 提供了自定义的函数操作。

以下是使用示例

public  class  LongAccumulatorDemo  {

        public  static  void  main(String[]  args)  throws  InterruptedException  {

                LongAccumulator  accumulator  =  new  LongAccumulator((x,  y)  ->  x  +  y,  0);
                ExecutorService  executor  =  Executors.newFixedThreadPool(8);
                IntStream.range(1,  10).forEach(i  ->  executor.submit(()  ->  accumulator.accumulate(i)));

                Thread.sleep(2000);
                System.out.println(accumulator.getThenReset());
        }
}

上述代码中:

  • 首先新建了一个 LongAccumulator,同时给它传入了两个参数;
  • 然后又新建了一个 8 线程的线程池,并且利用整形流也就是 IntStream 往线程池中提交了从 1 ~ 9 这 9 个任务;
  • 之后等待了两秒钟,这两秒钟的作用是等待线程池的任务执行完毕;
  • 最后把 accumulator 的值打印出来。

最终的运行结果是 45,代表 0+1+2+3+…+8+9=45。这个结果是怎么出来的?

LongAccumulator  accumulator  =  new  LongAccumulator((x,  y)  ->  x  +  y,  0);

LongAccumulator 的构造函数中,第一个参数是二元表达式;第二个参数是 x 的初始值,传入的是 0。在二元表达式中,x 是上一次计算的结果(除了第一次的时候需要传入),y 是本次新传入的值。

具体过程是,x首先是构造函数中的0,y首先是 accumulator.accumulate(1) 方法所传入的 1;之后计算结果为1并赋给x,而y是ccumulator.accumulate(2) 传入的 2…以此类推。

除了简单的加减,Accumulator 还可以进行更多的函数操作:

LongAccumulator  counter  =  new  LongAccumulator((x,  y)  ->  x  +  y,  0);
LongAccumulator  result  =  new  LongAccumulator((x,  y)  ->  x  *  y,  0);
LongAccumulator  min  =  new  LongAccumulator((x,  y)  ->  Math.min(x,  y),  0);
LongAccumulator  max  =  new  LongAccumulator((x,  y)  ->  Math.max(x,  y),  0);

这里有个问题,既然是这种累计的计算,为什么不用for循环实现呢?原因是for循环执行的时候是串行,而LongAccumulator 可以使用线程池,一旦使用了线程池,那么多个线程之间是可以并行计算的,效率要比之前的串行高得多。但是,多线程的计算并不是顺序固定的,需要保证运算即使不按顺序进行也能得到正确结果。

Accumulator 的使用需要满足以下几个条件

  • 一是需要大量的计算,此时累加器的计算效率比for循环更高
  • 二是计算的执行顺序并不关键,也就是说它不要求各个计算之间的执行顺序,执行的先后并不影响最终的结果。

四、原子类 VS volatile / synchronized

1. 原子类 VS volatile

前面提到,volatile可以保证可见性,主要的原理是基于本地内存和共享内存之间的flush和reflush操作。但是volatile也仅仅只能做到保证可见性,大部分场景下由于无法保证原子性,是无法保证线程安全的。

对于原子类来说,天生支持原子性,同时基于CAS可以保证线程安全。

以具体场景来说,volatile可以用于解决可见性问题,也可以用来修饰 boolean 类型的标记位,因为对于标记位来讲,直接的赋值操作本身就是具备原子性的,再加上 volatile 保证了可见性,那么就是线程安全的了。而对于组合操作,需要用同步来解决原子性问题的话,那么可以使用原子变量,不能用volatile

2. 原子类 VS synchronized

原子类和synchronized都可以保证线程安全,但是两者之间还是有区别的:

  • 原理不同:synchronized内部基于monitor 锁,在执行同步代码之前,需要首先获取到 monitor 锁,执行完毕后,再释放锁。而原子类则是利用了 CAS 操作
  • 使用范围不同:原子类的使用范围是比较局限的。因为一个原子类仅仅是一个对象,不够灵活。synchronized 的使用范围要广泛得多,既可以修饰一个方法,又可以修饰一段代码
  • 粒度不同:原子变量的粒度是比较小的,它可以把竞争范围缩小到变量级别
  • 性能问题: synchronized 是一种典型的悲观锁,而原子类利用的是乐观锁,两者的性能比较本质上是悲观锁和乐观锁的比较。悲观锁虽然比较重量级的,但是它的开销是固定的,随着时间的增加,这种开销并不会线性增长;乐观锁虽然在短期内的开销不大,但是随着时间的增加,它的开销也是逐步上涨的。在竞争非常激烈的情况下,推荐使用 synchronized;而在竞争不激烈的情况下,使用原子类会得到更好的效果。
    [nbsp]: https://pottercoding.cn/wp-content/uploads/cloud/images/2024/4/5/2017/1712319448600.png