在日常开发中我们常常使用到 AtomicLong 原子类
来进行计数等统计,AtomicLong
内部使用 CAS+自旋
来保证并发情况的原子性。
但是 AtomicLong 在高并发情况下的性能会急剧下降,在 JDK8
中 Doug Lea
大神 新写了一个 LongAdder
来解决此问题,阿里巴巴开发手册(泰山版)也推荐高并发下使用 LongAdder,它到底有何种黑科技呢?阅读完本文你将会学到:
- 为什么 AtomicLong 在高并发场景下性能急剧下降?
- LongAdder 为什么快?
- LongAdder 实现原理(图文分析)
- AtomicLong 是否可以被遗弃或替换?
最近阿里巴巴发布了Java 开发手册(泰山版)
,其中第17 条
写到:
对于Java 项目中
计数统计的一些需求,如果是 JDK8,推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观锁的重试次数)
在大多数项目及开源组件中,计数统计使用最多的仍然还是AtomicLong
,虽然是阿里巴巴这样说,但是我们仍然要根据使用场景来决定是否使用LongAdder
。
今天主要是来讲讲LongAdder
的实现原理,还是老方式,通过图文一步步解开LongAdder
神秘的面纱,通过此篇文章你会了解到:
- 为什么 AtomicLong 在高并发场景下性能急剧下降?
- LongAdder 为什么快?
- LongAdder 实现原理(图文分析)
- AtomicLong 是否可以被遗弃或替换?
本文代码全部基于 JDK 1.8,建议边看文章边看源码更加利于消化
AtomicLong当我们在进行计数统计的时,通常会使用AtomicLong
来实现。AtomicLong
能保证并发情况下计数的准确性,其内部通过CAS
来解决并发安全性的问题。
说到线程安全的计数统计工具类,肯定少不了Atomic
下的几个原子类。AtomicLong
就是juc 包下重要的原子类,在并发情况下可以对长整形类型数据进行原子操作,保证并发情况下数据的安全性。
public class AtomicLong extends Number implements java.io.Serializable { public final long incrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; } public final long decrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L; }}
我们在计数的过程中,一般使用incrementAndGet()
和decrementAndGet()
进行加一和减一操作,这里调用了Unsafe
类中的getAndAddLong()
方法进行操作。
接着看看unsafe.getAndAddLong()
方法:
public final class Unsafe { public final long getAndAddLong(Object var1, long var2, long var4) { long var6; do { var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; } public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);}
这里直接进行CAS+自旋操作更新AtomicLong
中的valu
e 值,进而保证value
值的原子性更新。
如上代码所示,我们在使用CAS + 自旋的过程中,在高并发环境下,N 个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong
的自旋会成为瓶颈。
如上图所示,高并发场景下AtomicLong
性能会急剧下降,我们后面也会举例说明。
那么高并发下计数的需求有没有更好的替代方案呢?在JDK8
中 Doug Lea
大神 新写了一个LongAdder
来解决此问题,我们后面来看LongAdder
是如何优化的。
我们说了很多LongAdder
上性能优于AtomicLong
,到底是不是呢?一切还是以代码说话:
/** * Atomic 和 LongAdder 耗时测试 * * @author:一枝花算不算浪漫 * @date:2020-05-12 7:06 */public class AtomicLongAdderTest { public static void main(String[] args) throws Exception{ testAtomicLongAdder(1, 10000000); testAtomicLongAdder(10, 10000000); testAtomicLongAdder(100, 10000000); } static void testAtomicLongAdder(int threadCount, int times) throws Exception{ System.out.println("threadCount: " + threadCount + ", times: " + times); long start = System.currentTimeMillis(); testLongAdder(threadCount, times); System.out.println("LongAdder 耗时:" + (System.currentTimeMillis() - start) + "ms"); System.out.println("threadCount: " + threadCount + ", times: " + times); long atomicStart = System.currentTimeMillis(); testAtomicLong(threadCount, times); System.out.println("AtomicLong 耗时:" + (System.currentTimeMillis() - atomicStart) + "ms"); System.out.println("----------------------------------------"); } static void testAtomicLong(int threadCount, int times) throws Exception{ AtomicLong atomicLong = new AtomicLong(); List list = Lists.newArrayList(); for (int i = 0; i < threadCount; i++) { list.add(new Thread(() -> { for (int j = 0; j < times; j++) { atomicLong.incrementAndGet(); } })); } for (Thread thread : list) { thread.start(); } for (Thread thread : list) { thread.join(); } System.out.println("AtomicLong value is : " + atomicLong.get()); } static void testLongAdder(int threadCount, int times) throws Exception{ LongAdder longAdder = new LongAdder(); List list = Lists.newArrayList(); for (int i = 0; i < threadCount; i++) { list.add(new Thread(() -> { for (int j = 0; j < times; j++) { longAdder.increment(); } })); } for (Thread thread : list) { thread.start(); } for (Thread thread : list) { thread.join(); } System.out.println("LongAdder value is : " + longAdder.longValue()); }}
执行结果:
这里可以看到随着并发的增加,AtomicLong
性能是急剧下降的,耗时是LongAdder
的数倍。至于原因我们还是接着往后看。
先看下LongAdder
的操作原理图:
既然说到LongAdder
可以显著提升高并发环境下的性能,那么它是如何做到的?
1、 设计思想上,LongAdder
采用"分段"的方式降低CAS
失败的频次
这里先简单的说下LongAdder
的思路,后面还会详述LongAdder
的原理。
我们知道,AtomicLong
中有个内部变量value
保存着实际的long
值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value
变量其实是一个热点数据,也就是N 个线程竞争一个热点。
LongAdder
的基本思路就是分散热点,将value
值的新增操作分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个value
值进行CAS
操作,这样热点就被分散了,冲突的概率就小很多。
LongAdder
有一个全局变量volatile long base
值,当并发不高的情况下都是通过CAS
来直接操作base
值,如果CAS
失败,则针对LongAdder
中的Cell[]
数组中的Cell
进行CA
S 操作,减少失败的概率。
例如当前类中base = 10
,有三个线程进行CAS
原子性的+1 操作,线程一执行成功,此时 base=11,线程二、线程三执行失败后开始针对于Cell[]
数组中的Cell
元素进行+1 操作,同样也是CAS
操作,此时数组index=1
和index=2
中Cell
的value
都被设置为了 1.
执行完成后,统计累加数据:sum = 11 + 1 + 1 = 13
,利用LongAdder
进行累加的操作就执行完了,流程图如下:
如果要获取真正的long
值,只要将各个槽中的变量值累加返回。这种分段的做法类似于JDK7
中ConcurrentHashMap
的分段锁。
2、使用 Contended 注解来消除伪共享
在 LongAdder
的父类 Striped64
中存在一个 volatile Cell[] cells;
数组,其长度是2 的幂次方,每个Cell
都使用 @Contended
注解进行修饰,而@Contended
注解可以进行缓存行填充,从而解决伪共享问题。伪共享会导致缓存行失效,缓存一致性开销变大。
@sun.misc.Contended static final class Cell {}
伪共享指的是多个线程同时读写同一个缓存行的不同变量时导致的 CPU 缓存失效
。尽管这些变量之间没有任何关系,但由于在主内存中邻近,存在于同一个缓存行之中,它们的相互覆盖会导致频繁的缓存未命中,引发性能下降。这里对于伪共享我只是提一下概念,并不会深入去讲解,大家可以自行查阅一些资料。
解决伪共享的方法一般都是使用直接填充,我们只需要保证不同线程的变量存在于不同的 CacheLine
即可,使用多余的字节来填充可以做点这一点,这样就不会出现伪共享问题。例如在Disruptor 队列
的设计中就有类似设计(可参考我之前的博客文章:Disruptor 学习笔记):
在Striped64
类中我们可以看看Doug Lea
在Cell
上加的注释也有说明这一点:
红框中的翻译如下:
Cell
类是AtomicLong
添加了padded(via@sun.misc.compended)
来消除伪共享的变种版本。缓存行填充对于大多数原子来说是繁琐的,因为它们通常不规则地分散在内存中,因此彼此之间不会有太大的干扰。但是,驻留在数组中的原子对象往往彼此相邻,因此在没有这种预防措施的情况下,通常会共享缓存行数据(对性能有巨大的负面影响)。
3、惰性求值
LongAdder
只有在使用longValue()
获取当前累加值时才会真正的去结算计数的数据,longValue()
方法底层就是调用sum()
方法,对base
和Cell 数组
的数据累加然后返回,做到数据写入和读取分离。
而AtomicLong
使用incrementAndGet()
每次都会返回long
类型的计数值,每次递增后还会伴随着数据返回,增加了额外的开销。
之前说了,AtomicLong
是多个线程针对单个热点值 value 进行原子操作。而LongAdder
是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS 操作
。
比如有三个线程同时对 value 增加 1,那么 value = 1 + 1 + 1 = 3
但是对于 LongAdder 来说,内部有一个 base 变量,一个 Cell[]数组。base 变量:非竞态条件下,直接累加到该变量上Cell[]数组:竞态条件下,累加个各个线程自己的槽 Cell[i]中最终结果的计算是下面这个形式:
value = base + $\sum\limits_{i=0}^nCell[i]$
LongAdder 源码剖析前面已经用图分析了LongAdder
高性能的原理,我们继续看下LongAdder
实现的源码:
public class LongAdder extends Striped64 implements Serializable { public void increment() { add(1L); } public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); }}
一般我们进行计数时都会使用increment()
方法,每次进行+1 操作,increment()
会直接调用add()
方法。
变量说明:
- as 表示 cells 引用
- b 表示获取的 base 值
- v 表示 期望值,
- m 表示 cells 数组的长度
- a 表示当前线程命中的 cell 单元格
条件分析:
条件一:as == null || (m = as.length - 1) < 0此条件成立说明 cells 数组未初始化。如果不成立则说明 cells 数组已经完成初始化,对应的线程需要找到 Cell 数组中的元素去写值。
条件二:(a = as[getProbe() & m]) == nullgetProbe()获取当前线程的 hash 值,m 表示 cells 长度-1,cells 长度是 2 的幂次方数,原因之前也讲到过,与数组长度取模可以转化为按位与运算,提升计算性能。
当条件成立时说明当前线程通过 hash 计算出来数组位置处的 cell 为空,进一步去执行 longAccumulate()方法。如果不成立则说明对应的 cell 不为空,下一步将要将 x 值通过 CAS 操作添加到 cell 中。
条件三:!(uncontended = a.cas(v = a.value, v + x)主要看 a.cas(v = a.value, v + x),接着条件二,说明当前线程 hash 与数组长度取模计算出的位置的 cell 有值,此时直接尝试一次 CAS 操作,如果成功则退出 if 条件,失败则继续往下执行 longAccumulate()方法。
接着往下看核心的longAccumulate()
方法,代码很长,后面会一步步分析,先上代码:
java.util.concurrent.atomic.Striped64.
:
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true; } boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } } collide = false; } else if (!wasUncontended) wasUncontended = true; else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n 0) { } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) }}
如上所示,第一个if 语句代表 CASE1
,里面再有if 判断
会以CASE1.1
这种形式来讲解,下面接着的else if
为CASE2
, 最后一个为CASE3
if ((as = cells) != null && (n = as.length) > 0) {}
cells 数组
不为空,且数组长度大于 0 的情况会执行CASE1
,CASE1
的实现细节代码较多,放到最后面讲解。
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break;}
CASE2
标识cells 数组
还未初始化,因为判断cells == as
,这个代表当前线程到了这里获取的cells
还是之前的一致。我们可以先看这个case
,最后再回头看最为麻烦的CASE1
实现逻辑。
cellsBusy
上面说了是加锁的状态,初始化cells 数组
和扩容的时候都要获取加锁的状态,这个是通过CAS
来实现的,为 0 代表无锁状态,为 1 代表其他线程已经持有锁了。cells==as
代表当前线程持有的数组未进行修改过,casCellsBusy()
通过CAS 操作
去获取锁。但是里面的if 条件
又再次判断了cell==as
,这一点是不是很奇怪?通过画图来说明下问题:
如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]
表示数组的长度为 2,rs[h & 1] = new Cell(x)
表示创建一个新的Cell 元素
,value 是 x 值,默认为 1。
h & 1
类似于我们之前HashMap
或者ThreadLocal
里面经常用到的计算散列桶index
的算法,通常都是hash & (table.len - 1)
,这里就不做过多解释了。 执行完成后直接退出for 循环
。
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break;
进入到这里说明cells
正在或者已经初始化过了,执行caseBase()
方法,通过CAS 操作
来修改base
的值,如果修改成功则跳出循环,这个CAS
E 只有在初始化Cell 数组
的时候,多个线程尝试CAS
修改cellsBusy
加锁的时候,失败的线程会走到这个分支,然后直接CAS
修改base
数据。
分析完了CASE2 和 CASE3
,我们再折头回看一下CASE1
,进入CASE1
的前提是:cells 数组
不为空,已经完成了初始化赋值操作。
接着还是一点点往下拆分代码,首先看第一个判断分支CASE1.1
:
if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } } collide = false;}
这个 if 条件中(a = as[(n - 1) & h]) == null
代表当前线程对应的数组下标位置的cell
数据为null
,代表没有线程在此处创建Cell
对象。
接着判断cellsBusy==0
,代表当前锁未被占用。然后新创建Cell 对象
,接着又判断了一遍cellsBusy == 0
,然后执行casCellsBusy()
尝试通过 CAS 操作修改cellsBusy=1
,加锁成功后修改扩容意向collide = false;
for (;;) { if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } if (created) break; continue;}
上面代码判断当前线程hash
后指向的数据位置元素是否为空,如果为空则将cell
数据放入数组中,跳出循环。如果不为空则继续循环。
继续往下看代码,CASE1.2:
else if (!wasUncontended) wasUncontended = true;h = advanceProbe(h);
wasUncontended
表示cells
初始化后,当前线程竞争修改失败wasUncontended =false
,这里只是重新设置了这个值为true
,紧接着执行advanceProbe(h)
重置当前线程的hash
,重新循环。
接着看CASE1.3:
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break;
进入CASE1.3
说明当前线程对应的数组中有了数据,也重置过hash 值
,这时通过 CAS 操作尝试对当前数中的value 值
进行累加 x 操作,x 默认为 1,如果CAS
成功则直接跳出循环。
接着看CASE1.4:
else if (n >= NCPU || cells != as) collide = false;
如果cells 数组
的长度达到了CPU 核心数
,或者cells
扩容了,设置扩容意向collide 为 false
并通过下面的h = advanceProbe(h)
方法修改线程的probe
再重新尝试
至于这里为什么要提出和CPU 数量
做判断的问题:每个线程会通过线程对cells[threadHash%cells.length]
位置的Cell
对象中的value
做累加,这样相当于将线程绑定到了cells
中的某个cell
对象上,如果超过CPU 数量
的时候就不再扩容是因为CPU
的数量代表了机器处理能力,当超过CPU
数量时,多出来的cells
数组元素没有太大作用。
接着看CASE1.5:
else if (!collide) collide = true;
如果扩容意向collide
是false
则修改它为true
,然后重新计算当前线程的hash
值继续循环,在CASE1.4
中,如果当前数组的长度已经大于了CPU
的核数,就会再次设置扩容意向collide=false
,这里的意义是保证扩容意向为false
后不再继续往后执行CASE1.6
的扩容操作。
接着看 CASE1.6 分支:
else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?