上一篇atomic原子类中对比了LongAdder、LongAccumulator 性能要比AtomicLong快
那么我们看一看LongAdder为什么这么快吧。
架构LongAdder是Striped64的子类
public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
}
Striped64继承自Number
abstract class Striped64 extends Number {
Striped64
LongAdder是Striped64的子类
Striped64有几个比较重要的成员函数
Striped64.java
/** Number of CPUS, to place bound on table size CPU数量,即cells数组的最大长度 */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
cells数组,为2的幂,2,4,8,16.....,方便以后位运算
*/
transient volatile Cell[] cells;//最大值为CPU数量
/**基础value值,当并发较低时,只累加该值主要用于没有竞争的情况,通过CAS更新。
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
/**创建或者扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁。 1为加锁
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy;
Striped64中一些变量或者方法的定义
其中最重要的两个变量是cells和base
是 java.util.concurrent.atomic 下 Striped64 的一个内部类
cas(long prev, long next)
方法 表示用 cas 方式进行累加, prev 表示旧值, next 表示新值
@sun.misc.Contended
注解为了防止缓存行伪共享
缓存与内存的速度比较
因为 CPU 与 内存的速度差异很大,如果每次都到内存中去读数据,那么时间都耗费在读取上了,
那么可以预读数据至缓存来提升效率。 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long) 缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU0使用这份数据,CPU1也使用这份数据、他们都会将数据读到各自的缓存中。
CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效 (64byte的数据都失效)
因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因 此缓存行可以存下 2 个的 Cell 对象。这样问题来了: Core-0 要修改 Cell[0] Core-1 要修改 Cell[1] 无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效 @sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding填充,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
官网说明
当多个线程更新用于收集统计信息(而不是细粒度同步控制)等目的的公共和时,该类通常比AtomicLong更可取。在低更新竞争下,这两个类具有相似的特性。但在高争用情况下,该类的预期吞吐量明显更高,而代价是更高的空间消耗。
阿里Java开发手册
LongAdder的基本思路是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
//获得long值的LongAdder.longValue()调用的是sum()
public long longValue() {
return sum();
}
sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。
Striped64内部有一个base变量,一个Cell[]数组。
base变量:非竞态条件下,直接累加到该变量上
Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中
数学表达
sum= Base +
AtomicLong 基于 base + CAS自旋
5000W并发使用CAS就有几千W自旋的请求
LongAdder 基于base+cell[]
采用分散热点思想 ,将5000W并发分散到cell数组中
比如可以线程id对cell个数进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标
然后获取值时,将各个槽中的结果累加返回 ,即 base+cell[]
其中cell[]大小为2的次幂,cell[最大值为CPU核数
所以
并发低时 LongAdder 相当于AtomicLong =>base
并发高时LongAdder 相当于AtomicLong+cell数组=》base+cell[]
可以拿购物来举例,如果买的少就可用口袋装,口袋就是base
如果买的多了,口袋装不下,就要买购物袋装,购物袋就是cell数组
买的所有东西就是口袋装的加上购物袋装的==》sum=base+cell数组
longAdder.increment()源码解读LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。
数学表达
对于自增的方法incremen
LongAdder longAdder = new LongAdder();
public void add_LongAdder() {
longAdder.increment();
}
//longAdder.increment()
public void increment() {
add(1L);
}
increment调用的是add()方法
add()public void add(long x) {//1
// as 为累加单元数组
// b 为基础值
Cell[] as; long b, v; int m; Cell a;
// 进入 if 的两个条件
// 1. as 有值, 表示已经发生过竞争, 初始化了cells,
// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
//此时cells未初始化,cells=null ;
//casBase(b = base, b + x) cas操作能够成功
if ((as = cells) != null || !casBase(b = base, b + x)) {//结果为(false||false)=》if条件不满足
// uncontended= true
boolean uncontended = true;
if (as == null || (m = as.length - 1) 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n 0 &&
//true
rs[j = (m - 1) & h] == null) {
//将cells单元附到cell[]数组上 即该槽位的值设置为1.
rs[j] = r;
created = true;
}
} finally {
//释放锁
cellsBusy = 0;
}
if (created)//created = true; break
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?