多线程环境可以使用 读用volatile 、写用锁保证线程安全(基本数据类型)
public class T3
{
volatile int number = 0;
//读取
public int getNumber()
{
return number;
}
//写入加锁保证原子性
public synchronized void setNumber()
{
number++;
}
}
多线程环境 还可以使用原子类保证线程安全(基本数据类型)
public class T3
{
volatile int number = 0;
//读取
public int getNumber()
{
return number;
}
//写入加锁保证原子性
public synchronized void setNumber()
{
number++;
}
//=================================
AtomicInteger atomicInteger = new AtomicInteger();
public int getAtomicInteger()
{
return atomicInteger.get();
}
public void setAtomicInteger()
{
atomicInteger.getAndIncrement();
}
}
前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
Unsafe.class
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
使用到了compareAndSwapInt ,它其实是CAS理论的一种实现
CASCompare And Swap(exchange),比较并交换(无锁机制)。可以通过 CAS 来进行同步,保证原子性,是实现并发算法时常用到的一种技术。
无论是 ReentrantLock 内部的 AQS,还是各种 Atomic 开头的原子类,内部都应用到了 CAS。
CAS 可以保证一次的读-改-写操作是原子操作。
当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,并可以再次尝试。
原理:
在 CAS 中有三个参数:内存值 V、旧的预期值 A、新值 B ,当且仅当内存值 V 的值等于旧的预期值 A 时,才会将内存值 V 的值修改为 B,否则什么都不干。并再次尝试。
获取共享变量V时,为了保证该变量的可见性,需要使用 volatile 修饰。
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果
CAS是思想 具体实现比如atomic原子类
自旋锁也借鉴了CAS思想
源码CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。 它是非阻塞的且自身原子性,也就是说这玩意效率更高且通过硬件保证,说明这玩意更可靠。
CAS是一条CPU的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe提供的CAS方法(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg。 执行cmpxchg指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行cas操作,也就是说CAS的原子性实际上是CPU实现的, 其实在这一点上还是有排他锁的,只是比起用synchronized, 这里的排他时间要短的多, 所以在多线程情况下性能会比较好
其实 CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。
在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线(有的说法是锁的北桥信号TODO)锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的
public class CASDemo
{
public static void main(String[] args) throws InterruptedException
{
AtomicInteger atomicInteger = new AtomicInteger(5);
System.out.println(atomicInteger.compareAndSet(5, 2020)+"\t"+atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(5, 1024)+"\t"+atomicInteger.get());
}
}
运行结果
5
true 308
false 308
源码分析compareAndSet(int expect,int update)
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);//valueOffset内存位置偏移量v
}
compareAndSet()方法的源代码:
Unsafe.class中
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
上面三个方法都是类似的,主要对4个参数做一下说明。 var1:表示要操作的对象 var2:表示要操作对象中属性地址的偏移量 var4:表示需要修改数据的期望的值 var5/var6:表示需要修改为的新值
AtomicInteger中的UnSafe类public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
...
}
1 Unsafe 是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于Unsafe类的方法。 注意Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务
2 变量valueOffset,表示该变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的。
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
3 变量value用volatile修饰,保证了多线程之间的内存可见性。
我们知道i++线程不安全的,那atomicInteger.getAndIncrement()为什么能保证线程安全?在多线程环境下,int 类型的自增操作(i++)不是原子的,线程不安全,可以使用 AtomicInteger 代替。
CAS的全称为Compare-And-Swap,它是一条CPU并发原语。 它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。 AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。
AtomicInteger atomicInteger = new AtomicInteger(0);
atomicInteger.incrementAndGet();
incrementAndGet()内部调用unsafe.getAndAddInt(this, valueOffset, 1)
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
字节码Unsafe.class
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
//CAS操作
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
compareAndSwapInt是native方法实现
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。
C++中底层实现OpenJDK源码里面查看下Unsafe.java
假设线程A和线程B两个线程同时执行getAndAddInt操作(分别跑在不同CPU上):
1 AtomicInteger里面的value原始值为3,即主内存中AtomicInteger的value为3,根据JMM模型,线程A和线程B各自持有一份值为3的value的副本分别到各自的工作内存。
2 线程A通过getIntVolatile(var1, var2)拿到value值3,这时线程A被挂起。
3 线程B也通过getIntVolatile(var1, var2)方法获取到value值3,此时刚好线程B没有被挂起并执行compareAndSwapInt方法比较内存值也为3,成功修改内存值为4,线程B打完收工,一切OK。
4 这时线程A恢复,执行compareAndSwapInt方法比较,发现自己手里的值数字3和主内存的值数字4不一致,说明该值已经被其它线程抢先一步修改过了,那A线程本次修改失败,只能重新读取重新来一遍了。
5 线程A重新获取value值,因为变量value被volatile修饰,所以其它线程对它的修改,线程A总是能够看到,线程A继续执行compareAndSwapInt进行比较替换,直到成功。
native修饰的方法代表是底层方法
Unsafe类中的compareAndSwapInt,是一个本地方法,该方法的实现位于unsafe.cpp
中
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 先想办法拿到变量value在内存中的地址,根据偏移量valueOffset,计算 value 的地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// 调用 Atomic 中的函数 cmpxchg来进行比较交换,其中参数x是即将更新的值,参数e是原内存的值
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
Atomic::cmpxchg(x, addr, e)
调用 Atomic 中的函数 cmpxchg来进行比较交换,其中参数x是即将更新的值,参数e是原内存的值
atomic.cpp
unsigned Atomic::cmpxchg(unsigned int exchange_value,volatile unsigned int* dest, unsigned int compare_value) {
assert(sizeof(unsigned int) == sizeof(jint), "more work to do");
/*
* 根据操作系统类型调用不同平台下的重载函数,这个在预编译期间编译器会决定调用哪个平台下的重载函数*/
return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest, (jint)compare_value);
}
在不同的操作系统下会调用不同的cmpxchg重载函数,
Window环境atomic_windows_x86.inline.hpp
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
//判断是否是多核CPU
int mp = os::is_MP();
__asm {
//三个move指令表示的是将后面的值移动到前面的寄存器上
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
//CPU原语级别,CPU触发
LOCK_IF_MP(mp)// 多CPU底层加锁
//比较并交换指令
//cmpxchg: 即“比较并交换”指令
//dword: 全称是 double word 表示两个字,一共四个字节
//ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元
//将 eax 寄存器中的值(compare_value)与 [edx] 双字内存单元中的值进行对比,
//如果相同,则将 ecx 寄存器中的值(exchange_value)存入 [edx] 内存单元中
cmpxchg dword ptr [edx], ecx
}
}
CAS真正实现的机制了,它最终是由操作系统的汇编指令完成的。
linux环境atomic_linux_x86.inline.hpp
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
调用cmpxchg(x,addr,e)方法
接着cmpxchg是一个内联方法
asm_ volatile 汇编语言 是具体的实现
Lock_IF_MP multi processors 如果是多个CPU就加Lock
cmpxchg指令 即Compare And exchange 如果一个CPU执行cmpxchg指令
os.hpp
// Interface for detecting multiprocessor system
static inline bool is_MP() {
assert(_processor_count > 0, "invalid processor count");
return _processor_count > 1 || AssumeMP;
}
atomic_linux_x86.inline.hpp
// Adding a lock prefix to an instruction on MP machine
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
总结
CAS是靠硬件实现的从而在硬件层面提升效率,最底层还是交给硬件来保证原子性和可见性
实现方式是基于硬件平台的汇编指令 lock cmpxchg
指令,
在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线(有的说法是锁的北桥信号)锁住,在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。
cmpxchg = cas修改变量值的操作
核心思想就是:比较要更新变量的值V和预期值E(compare),相等才会将V的值设为新值N(swap)如果不相等自旋再来。
原子类原子类中使用到了CAS
除了AtomicInteger原子整型,可否有其它原子类型?
比如AtomicReference原子引用AtomicBook 、AtomicOrder。
之后的文章会详细介绍原子类
package com.dongguo.atomic;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author Dongguo
* @date 2021/9/7 0007-10:25
* @description:
*/
public class AtomicReferenceDemo {
public static void main(String[] args) {
User z3 = new User("z3", 24);
User li4 = new User("li4", 26);
AtomicReference atomicReferenceUser = new AtomicReference();
atomicReferenceUser.set(z3);
//如果User是z3对象就修改为li4对象
System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
System.out.println(atomicReferenceUser.compareAndSet(z3, li4) + "\t" + atomicReferenceUser.get().toString());
}
}
@Getter
@ToString
@AllArgsConstructor
class User {
String userName;
int age;
}
运行结果
true User(userName=li4, age=26)
false User(userName=li4, age=26)
自旋锁,借鉴CAS思想
自旋锁(spinlock)
是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁, 当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU
OpenJDK源码里面查看下Unsafe.java do while循环
自己实现一个自旋锁SpinLockDemo
package com.dongguo.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author Dongguo
* @date 2021/9/7 0007-10:36
* 题目:实现一个自旋锁
* 自旋锁好处:循环比较获取没有类似wait的阻塞。
*
* 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒钟,B随后进来后发现
* 当前有线程持有锁,不是null,所以只能通过自旋等待,直到A释放锁后B随后抢到。
*/
public class SpinLockDemo {
AtomicReference atomicReference = new AtomicReference();
public void MyLock() {
System.out.println(Thread.currentThread().getName() + " come in ");
//修改不成功
while (!atomicReference.compareAndSet(null, Thread.currentThread())) {
//循环等待(自旋)
}
//修改成功
System.out.println(Thread.currentThread().getName() + "加锁成功");
}
public void MyUnLock() {
atomicReference.compareAndSet(Thread.currentThread(), null);
System.out.println(Thread.currentThread().getName() + "释放锁成功");
}
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(() -> {
spinLockDemo.MyLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockDemo.MyUnLock();
}, "t1").start();
//暂停一会儿线程,保证A线程先于B线程启动并完成
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
spinLockDemo.MyLock();
spinLockDemo.MyUnLock();
}, "t2").start();
}
}
运行结果
t1 come in
t1加锁成功
t2 come in
t1释放锁成功
t2加锁成功
t2释放锁成功
CAS 的特点
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。 CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我再重试。 synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。 CAS 体现的是无锁并发、无阻塞并发, 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
CAS缺点: ABA问题: 加版本号CAS会导致“ABA问题”。
CAS算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差类会导致数据的变化。
比如说一号线程读取的值为 A,二号线程也从内存中
读取了 A,并且进行了一些操作变成了 B,然后又变成 A,
这时候一号线程发现值仍然是 A,尽管一号线程cas的操作成功,但是不代表这个过程就是没有问题的。可能存在潜藏的问题。
package com.dongguo.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Dongguo
* @date 2021/9/7 0007-12:48
* @description:
*/
public class ABADemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(10);
new Thread(()->{
atomicInteger.compareAndSet(10,100);
atomicInteger.compareAndSet(100,10);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
//保证t1发生在t2之前
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
boolean result = atomicInteger.compareAndSet(10, 123);
System.out.println("结果为"+result+" 值为"+atomicInteger.get());
},"t2").start();
}
}
解决:
1使用版本号确认有没有被修改
在变量上追加上版本号,每次变量更新的时候把版本号加1,
A-B-A 就变成了A1-B2-A3
2时间戳
3 AtomicReference
从 Java1.5 开始 JDK 的 atomic包里提供了一个类 AtomicStampedReference 来解决 ABA 问题。AtomicMarkableReference(通过引入一个boolean来反映中间有没有变过),
AtomicStampedReference(通过引入一个int来累加来反映中间有没有变过)
这个类的compareAndSet方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是 否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
package com.dongguo.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* @author Dongguo
* @date 2021/9/7 0007-12:48
* @description:
*/
public class ABADemo {
public static void main(String[] args) {
AtomicStampedReference atomicStampedReference = new AtomicStampedReference(10, 1);
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();//首次版本号
System.out.println(Thread.currentThread().getName() + " 1首次版本号" + stamp + "值为" + atomicStampedReference.getReference());
//保证t1、t2得到相同版本号
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(10, 100, stamp, stamp + 1);
stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + " 2次版本号" + stamp + "值为" + atomicStampedReference.getReference());
atomicStampedReference.compareAndSet(100, 10, stamp, stamp + 1);
stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + " 3次版本号" + stamp + "值为" + atomicStampedReference.getReference());
}, "t1").start();
//保证t1发生在t2之前
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();//首次版本号
System.out.println(Thread.currentThread().getName() + " 1首次版本号" + stamp + "值为" + atomicStampedReference.getReference());
//保证t1、t2得到相同版本号 并且t1发生ABA问题
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = atomicStampedReference.compareAndSet(10, 123, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "修改结果为" + result + " 值为" + atomicStampedReference.getReference());
}, "t2").start();
}
}
t2 1首次版本号1值为10
t1 1首次版本号1值为10
t1 2次版本号2值为100
t1 3次版本号3值为10
t2修改结果为false 值为10
循环时间长开销大:
对于资源竞争严重(线程冲突严重)的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized。 Cas失败,会一直进行尝试
我们可以看到getAndAddInt方法执行时,有个do while
AtomicInteger.java
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
Unsafe.class
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销。
只能保证一个共享变量的原子操作:当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。
还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java 1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。