您当前的位置: 首页 > 

Dongguo丶

暂无认证

  • 2浏览

    0关注

    472博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

LongAdder源码解析

Dongguo丶 发布时间:2021-09-25 09:00:44 ,浏览量:2

LongAdder源码

上一篇atomic原子类中对比了LongAdder、LongAccumulator 性能要比AtomicLong快

那么我们看一看LongAdder为什么这么快吧。

架构

image-20210907171758445

LongAdder是Striped64的子类

public class LongAdder extends Striped64 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;
   
}

Striped64继承自Number

abstract class Striped64 extends Number {
Striped64

LongAdder是Striped64的子类

Striped64有几个比较重要的成员函数

Striped64.java

 
/** Number of CPUS, to place bound on table size        CPU数量,即cells数组的最大长度 */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
 * Table of cells. When non-null, size is a power of 2.
cells数组,为2的幂,2,4,8,16.....,方便以后位运算
 */
transient volatile Cell[] cells;//最大值为CPU数量

/**基础value值,当并发较低时,只累加该值主要用于没有竞争的情况,通过CAS更新。
 * Base value, used mainly when there is no contention, but also as
 * a fallback during table initialization races. Updated via CAS.
 */
transient volatile long base;

/**创建或者扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁。  1为加锁
 * Spinlock (locked via CAS) used when resizing and/or creating Cells. 
 */
transient volatile int cellsBusy;

Striped64中一些变量或者方法的定义

image-20210907173447887

其中最重要的两个变量是cells和base

image-20210924203115219

Cell 累加单元

是 java.util.concurrent.atomic 下 Striped64 的一个内部类

image-20210907173509728

cas(long prev, long next) 方法 表示用 cas 方式进行累加, prev 表示旧值, next 表示新值

@sun.misc.Contended注解为了防止缓存行伪共享

缓存行伪共享

缓存与内存的速度比较

image-20210912204426151

从 cpu 到大约需要的时钟周期寄存器1 cycle (4GHz 的 CPU 约为0.25ns)L13~4 cycleL210~20 cycleL340~45 cycle内存120~240 cycle

因为 CPU 与 内存的速度差异很大,如果每次都到内存中去读数据,那么时间都耗费在读取上了,

那么可以预读数据至缓存来提升效率。 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long) 缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中

​ CPU0使用这份数据,CPU1也使用这份数据、他们都会将数据读到各自的缓存中。

CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效 (64byte的数据都失效)

image-20210912204619829

因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因 此缓存行可以存下 2 个的 Cell 对象。这样问题来了: Core-0 要修改 Cell[0] Core-1 要修改 Cell[1] 无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效 @sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding填充,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效

image-20210912204713413

LongAdder为什么这么快

官网说明

image-20210907171957149

当多个线程更新用于收集统计信息(而不是细粒度同步控制)等目的的公共和时,该类通常比AtomicLong更可取。在低更新竞争下,这两个类具有相似的特性。但在高争用情况下,该类的预期吞吐量明显更高,而代价是更高的空间消耗。

阿里Java开发手册

image-20210907132620107

LongAdder的基本思路是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

//获得long值的LongAdder.longValue()调用的是sum()
    public long longValue() {
        return sum();
    }

sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

Striped64内部有一个base变量,一个Cell[]数组。

base变量:非竞态条件下,直接累加到该变量上

Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中

数学表达

sum= Base + image-20210907174640111

image-20210907174648198

AtomicLong 基于 base + CAS自旋

5000W并发使用CAS就有几千W自旋的请求

LongAdder 基于base+cell[]

采用分散热点思想 ,将5000W并发分散到cell数组中

比如可以线程id对cell个数进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标

然后获取值时,将各个槽中的结果累加返回 ,即 base+cell[]

其中cell[]大小为2的次幂,cell[最大值为CPU核数

所以

并发低时 LongAdder 相当于AtomicLong =>base

并发高时LongAdder 相当于AtomicLong+cell数组=》base+cell[]

可以拿购物来举例,如果买的少就可用口袋装,口袋就是base

如果买的多了,口袋装不下,就要买购物袋装,购物袋就是cell数组

买的所有东西就是口袋装的加上购物袋装的==》sum=base+cell数组

longAdder.increment()源码解读

LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。

image-20210907183507346

数学表达

image-20210907173616479

对于自增的方法incremen

LongAdder longAdder = new LongAdder();

public void add_LongAdder() {
    longAdder.increment();
}
//longAdder.increment()
    public void increment() {
        add(1L);
    }

increment调用的是add()方法

add()

1

1当竞争不激烈,cells还未初始化
public void add(long x) {//1
    // as 为累加单元数组
	// b 为基础值
    Cell[] as; long b, v; int m; Cell a;
    // 进入 if 的两个条件
    // 1. as 有值, 表示已经发生过竞争, 初始化了cells,
    // 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
    //此时cells未初始化,cells=null  ;
    //casBase(b = base, b + x)  cas操作能够成功
    if ((as = cells) != null || !casBase(b = base, b + x)) {//结果为(false||false)=》if条件不满足
        // uncontended= true 
        boolean uncontended = true;
        if (as == null || (m = as.length - 1)  0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            //
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n  0 &&
                                //true
                                rs[j = (m - 1) & h] == null) {
                                //将cells单元附到cell[]数组上 即该槽位的值设置为1.
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            //释放锁
                            cellsBusy = 0;
                        }
                        if (created)//created = true;  break
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n             
关注
打赏
1638062488
查看更多评论
0.0377s