- ReentrantReadWriteLock
- ReentrantReadWriteLock 类的整体结构
- 写锁的获取和释放
- 读锁的获取与释放
- 读写锁的意义
- 读写锁有以下三个重要的特性:
- 注意事项
- 读写锁的演变
- 读写锁演变案例
- 无锁实现
- 使用ReentrantLock
- 使用ReentrantReadWriteLock
- 读写锁的降级
- jdk8写锁降级为读锁过程
- 锁降级使用场景
- 先写后读案例
- 那么先读再写会怎么样呢?
- 不可锁升级
- ReentrantWriteLock应用实践之缓存
- 使用hashmap实现缓存
- 缓存更新策略
- 读写锁实现一致性缓存
- 总结
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
读写锁定义为 一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。
ReentrantReadWriteLock是ReadWriteLock接口的实现,ReentrantReadWriteLock中有两个静态内部类:ReadLock读锁和WriteLock写锁,这两个锁实现了Lock接口,ReentrantReadWriteLock支持可重入,同步功能依赖自定义同步器(AbstractQueuedSynchronizer)实现,读写状态就是其同步器的同步状态。同步状态由一个整型变量表示,因为这个变量需要表示多个线程的读和写的状态,因此读写锁在实现上将该变量的高16位表示读,低16位表示写。它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁。读写互斥,读读共享
ReentrantReadWriteLock 类的整体结构public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** 读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
/** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
this(false);
}
/** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/** 返回用于写入操作的锁 */
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** 返回用于读取操作的锁 */
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
...
}
可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock实现了自己的序列化逻辑。
写锁的获取和释放线程进入写锁的前提条件: • 没有其他线程的读锁 • 没有其他线程的写锁
写锁WriteLock是支持重进入的排他锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取读锁时,读锁已经被获取并且该线程不是已获取写锁的线程,则当前线程进入等待状态。读写锁确保写锁的操作对读锁可见。写锁释放每次减少写状态,当前写状态为0时表示写锁已背释放。
读锁的获取与释放线程进入读锁的前提条件: • 没有其他线程的写锁 • 没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)。
读锁ReadLock是支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(写状态为0)时,读锁总是能够被成功地获取,而所做的也只是增加读状态(线程安全)。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已经被获取,则进入等待状态。
读写锁的意义『读写锁ReentrantReadWriteLock』并不是真正意义上的读写分离,它只允许读读共存,而读写和写写依然是互斥的, 大多实际场景是“读/读”线程间并不存在互斥关系,只有"读/写"线程或"写/写"线程间的操作需要互斥的。因此引入ReentrantReadWriteLock。
一个ReentrantReadWriteLock同时只能存在一个写锁但是可以存在多个读锁,但不能同时存在写锁和读锁 也即一个资源可以被多个读操作访问或一个写操作访问,但两者不能同时进行。
只有在读多写少情境之下,读写锁才具有较高的性能体现。
当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的
select …from … lock in share mode (共享锁)
读写锁有以下三个重要的特性:(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
(2)重进入:读锁和写锁都支持线程重进入。
(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
注意事项读锁不支持条件变量Condition
支持锁降级但不支持锁升级
读写锁的演变ReentrantReadWriteLock
有锁饥饿问题
锁降级。读线程不能写,写线程可以读 ,
写的时候,正在写的线程还可以进行读操作, 不同线程之间读写互斥,但是同一个线程因为是可重入读写锁,可以先获取写锁,再获取读锁。
读写锁演变案例模拟缓存的过程,对一个hashmap进行读和写操作
无锁实现package com.dongguo.readwrite;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/4 0004-9:46
* @description:
*/
public class Demo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//创建线程放数据
for (int i = 1; i {
myCache.put(num + "", num + "");
}, "t" + i).start();
}
//创建线程取数据
for (int i = 1; i {
myCache.get(num + "");
}, "t" + i).start();
}
}
}
class MyCache {
private volatile HashMap hashMap = new HashMap();
//放数据
public void put(String k, String v) {
try {
System.out.println(Thread.currentThread().getName() + "开始写入"+k);
//暂停一会
TimeUnit.MICROSECONDS.sleep(300);
hashMap.put(k, v);
System.out.println(Thread.currentThread().getName() + "写入完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取数据
public void get(String k) {
System.out.println(Thread.currentThread().getName() + "开始读取");
String result = hashMap.get(k);
System.out.println(Thread.currentThread().getName() + "读取完成"+result);
}
}
运行结果
t1开始写入1
t2开始写入2
t3开始写入3
t4开始写入4
t1开始读取
t5开始写入5
t2开始读取
t1读取完成null
t2读取完成null
t4开始读取
t4读取完成null
t3开始读取
t3读取完成null
t5开始读取
t5读取完成null
t4写入完成
t3写入完成
t1写入完成
t2写入完成
t5写入完成
这样实现还没有写入完成,就有线程进行读取操作,读取的数据肯定不正确。
使用ReentrantLockpackage com.dongguo.readwrite;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author Dongguo
* @date 2021/8/24 0024-19:44
* @description: ReadWriteLock
*/
public class ReadWriteLockDemo1 {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//创建线程放数据
for (int i = 0; i {
myCache.put(num + "", num + "");
}, "Thread"+i).start();
}
//创建线程取数据
for (int i = 0; i {
myCache.get(num + "");
}, "Thread"+(i + 5)).start();
}
}
//资源类
static class MyCache {
//创建map集合
private volatile Map map = new HashMap();
//创建锁对象
private Lock lock = new ReentrantLock();
//放数据
public void put(String key, Object value) {
//添加锁
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "写入:" + key);
//暂停一会
TimeUnit.MILLISECONDS.sleep(300);
//放数据
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写完" + key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放锁
lock.unlock();
}
}
//取数据
public Object get(String key) {
//添加锁
lock.lock();
Object result = null;
try {
System.out.println(Thread.currentThread().getName() + "取出:" + key);
result = map.get(key);
System.out.println(Thread.currentThread().getName() + "取完" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
lock.unlock();
}
return result;
}
}
}
运行结果
Thread0写入:0
Thread0写完0
Thread1写入:1
Thread1写完1
Thread2写入:2
Thread2写完2
Thread3写入:3
Thread3写完3
Thread4写入:4
Thread4写完4
Thread5取出:0
Thread5取完0
Thread6取出:1
Thread6取完1
Thread7取出:2
Thread7取完2
Thread8取出:3
Thread8取完3
Thread9取出:4
Thread9取完4
只能一次完成一个读或写操作,串行化 都是独占锁 读读也是独占锁无法共享,读的性能低
使用ReentrantReadWriteLock对一个hashmap进行读和写操作
package com.dongguo.readwrite;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author Dongguo
* @date 2021/9/4 0004-9:46
* @description:
*/
public class Demo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//创建线程放数据
for (int i = 1; i {
myCache.put(num + "", num + "");
}, "t" + i).start();
}
//创建线程取数据
for (int i = 1; i {
myCache.get(num + "");
}, "t" + i).start();
}
}
}
class MyCache {
private volatile HashMap hashMap = new HashMap();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//放数据
public void put(String k, String v) {
try {
readWriteLock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + "开始写入"+k);
//暂停一会
TimeUnit.MICROSECONDS.sleep(300);
hashMap.put(k, v);
System.out.println(Thread.currentThread().getName() + "写入完成");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();
}
}
//取数据
public void get(String k) {
try {
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "开始读取");
String result = hashMap.get(k);
System.out.println(Thread.currentThread().getName() + "读取完成"+result);
} finally {
readWriteLock.readLock().unlock();
}
}
}
运行结果:
t2开始写入2
t2写入完成
t1开始写入1
t1写入完成
t3开始写入3
t3写入完成
t4开始写入4
t4写入完成
t5开始写入5
t5写入完成
t1开始读取
t1读取完成1
t2开始读取
t2读取完成2
t5开始读取
t5读取完成5
t4开始读取
t4读取完成4
t3开始读取
t3读取完成3
ReentrantReadWriteLock的读锁是共享锁, 写锁是独占锁,要注意并发
对于邮戳锁,后续文章会介绍到
读写锁的降级锁降级 : 是指保持住当前的写锁(已拥有),再获取读锁,随后释放写锁的过程。
《Java 并发编程的艺术》中关于锁降级的说明:
锁的严苛程度变强叫做升级,反之叫做降级
将写锁降级为读锁(可以用类似Linux文件读写权限理解,就像写权限要高于读权限一样)
jdk8写锁降级为读锁过程Java8 官网说明
重入还允许通过获取写入锁定,然后读取锁然后释放写锁。 可以 从写锁到读锁,
但是,从读锁升级到写锁是不可能的。
Oracle公司ReentrantWriteReadLock源码总结
1 代码中声明了一个volatile类型的cacheValid变量,保证其可见性。
2 首先获取读锁,如果cache不可用,则释放读锁,获取写锁,在更改数据之前,再检查一次cacheValid的值,然后修改数据,将cacheValid置为true,然后在释放写锁前获取读锁;此时,cache中数据可用,处理cache中数据,最后释放读锁。这个过程就是一个完整的锁降级的过程,目的是保证数据可见性。
如果违背锁降级的步骤 如果当前的线程C在修改完cache中的数据后,没有获取读锁而是直接释放了写锁,那么假设此时另一个线程D获取了写锁并修改了数据,那么C线程无法感知到数据已被修改,则数据出现错误。
如果遵循锁降级的步骤 线程C在释放写锁之前获取读锁,那么线程D在获取写锁时将被阻塞,直到线程C完成数据处理过程,释放读锁。这样可以保证返回的数据是这次更新的数据,该机制是专门为了缓存设计的。
锁降级使用场景锁降级是为了让当前线程感知到数据的变化,目的是保证数据可见性
线程修改完数据之后, 经过耗时操作后再使用数据时,希望使用的是自己修改后的数据,而不是其他线程修改后的数据,这样的话确实是需要锁降级,那么如果其他线程想要修改数据,必须等到当前线程把读锁释放了才能成功获得写锁,这样就可以维持数据的可见性,达到写后立即读的目的
如果只是希望最后使用数据的时候,拿到的是最新的数据,而不一定是自己刚修改过的数据,那么先解写锁,再上读锁,然后使用数据也无妨
先写后读案例package com.dongguo.readwrite;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author Dongguo
* @date 2021/9/9 0009-20:21
* @description: 锁降级:遵循获取写锁→再获取读锁→再释放写锁的次序,写锁能够降级成为读锁。
*
* 如果一个线程占有了写锁,在不释放写锁的情况下,它还能占有读锁,即写锁降级为读锁。
*/
public class LockDownGradingDemo {
public static void main(String[] args) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
writeLock.lock();
System.out.println("-------正在写入");
readLock.lock();
System.out.println("-------正在读取");
System.out.println("-------写完了");
writeLock.unlock();
System.out.println("-------读完了");
readLock.unlock();
}
}
运行结果
-------正在写入
-------正在读取
-------写完了
-------读完了
代码能够正常运行,在写的过程成也可以读
那么先读再写会怎么样呢?package com.dongguo.readwrite;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author Dongguo
* @date 2021/9/9 0009-20:21
* @description: 锁降级:遵循获取写锁→再获取读锁→再释放写锁的次序,写锁能够降级成为读锁。
*
* 如果一个线程占有了写锁,在不释放写锁的情况下,它还能占有读锁,即写锁降级为读锁。
*/
public class LockDownGradingDemo {
public static void main(String[] args) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
readLock.lock();
System.out.println("-------正在读取");
writeLock.lock();
System.out.println("-------正在写入");
System.out.println("-------写完了");
writeLock.unlock();
System.out.println("-------读完了");
readLock.unlock();
}
}
运行结果:
结果表示执行写操作的时候是不能进行写操作的。如果有线程在读,那么写线程是无法获取写锁的,是悲观锁的策略
不可锁升级线程获取读锁是不能直接升级为写入锁的。
在ReentrantReadWriteLock中,当读锁被使用时,如果有线程尝试获取写锁,该写线程会被阻塞。 所以,需要释放所有读锁,才可获取写锁,
package com.dongguo.readwrite;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author Dongguo
* @date 2021/9/9 0009-20:21
* @description: 锁降级:遵循获取写锁→再获取读锁→再释放写锁的次序,写锁能够降级成为读锁。
*
* 如果一个线程占有了写锁,在不释放写锁的情况下,它还能占有读锁,即写锁降级为读锁。
*/
public class LockDownGradingDemo {
public static void main(String[] args) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
readLock.lock();
System.out.println("-------正在读取");
System.out.println("-------读完了");
readLock.unlock();
writeLock.lock();
System.out.println("-------正在写入");
System.out.println("-------写完了");
writeLock.unlock();
}
}
写锁和读锁是互斥的(这里的互斥是指线程间的互斥,当前线程可以获取到写锁又获取到读锁,但是获取到了读锁不能继续获取写锁) ,这是因为读写锁要保持写操作的可见性。
如果允许读锁在被获取的情况下可以对写锁获取,那么正在运行的其他读线程无法感知到当前写线程的操作。
ReentrantWriteLock应用实践之缓存实体类
package com.dongguo.readwrite;
import java.math.BigDecimal;
class Emp {
private int empno;
private String ename;
private String job;
private BigDecimal sal;
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public BigDecimal getSal() {
return sal;
}
public void setSal(BigDecimal sal) {
this.sal = sal;
}
@Override
public String toString() {
return "Emp{" +
"empno=" + empno +
", ename='" + ename + '\'' +
", job='" + job + '\'' +
", sal=" + sal +
'}';
}
}
Dao
package com.dongguo.readwrite;
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;
import java.sql.*;
import java.util.*;
public class GenericDao {
static String URL = "jdbc:mysql://localhost:3306/employee?characterEncoding=utf-8&useSSL=false";
static String USERNAME = "root";
static String PASSWORD = "root";
public List queryList(Class beanClass, String sql, Object... args) {
System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
BeanRowMapper mapper = new BeanRowMapper(beanClass);
return queryList(sql, mapper, args);
}
public T queryOne(Class beanClass, String sql, Object... args) {
System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
BeanRowMapper mapper = new BeanRowMapper(beanClass);
return queryOne(sql, mapper, args);
}
private List queryList(String sql, RowMapper mapper, Object... args) {
try (Connection conn = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
try (PreparedStatement psmt = conn.prepareStatement(sql)) {
if (args != null) {
for (int i = 0; i 查询
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
============> 更新
sql: [update emp set sal = ? where empno = ?] params:[800, 1]
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=800}
问题
对同一个empno进行三次查询,查询同一个数据执行了三次sql
使用hashmap实现缓存可以使用缓存原理,第一次将查到的数据放入缓存中,只要数据没有被修改,就直接查缓存就行了,
如果数据被修改就删除缓存。
package com.dongguo.readwrite;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class GenericDaoTest {
public static void main(String[] args) {
GenericDao dao = new GenericDaoCached();
System.out.println("============> 查询");
String sql = "select * from emp where empno = ?";
int empno = 1;
Emp emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
System.out.println("============> 更新");
dao.update("update emp set sal = ? where empno = ?", 800, empno);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
}
}
/**
* 缓存
*/
class GenericDaoCached extends GenericDao {
private GenericDao dao = new GenericDao();
private Map map = new HashMap();
//未实现缓存功能
@Override
public List queryList(Class beanClass, String sql, Object... args) {
return dao.queryList(beanClass, sql, args);
}
@Override
public T queryOne(Class beanClass, String sql, Object... args) {
// 先从缓存中找,找到直接返回
SqlPair key = new SqlPair(sql, args);
T value = (T) map.get(key);
if (value != null) {
return value;
}
// 缓存中没有,查询数据库
value = dao.queryOne(beanClass, sql, args);
map.put(key, value);
return value;
}
@Override
public int update(String sql, Object... args) {
// 清空缓存
map.clear();
// 更新库
int update = dao.update(sql, args);
return update;
}
// 作为 key 保证其是不可变的
class SqlPair {
private String sql;//sql语句
private Object[] args;//参数
public SqlPair(String sql, Object[] args) {
this.sql = sql;
this.args = args;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqlPair sqlPair = (SqlPair) o;
return Objects.equals(sql, sqlPair.sql) &&
Arrays.equals(args, sqlPair.args);
}
@Override
public int hashCode() {
int result = Objects.hash(sql);
result = 31 * result + Arrays.hashCode(args);
return result;
}
}
}
数据库数据恢复
运行结果
============> 查询
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
============> 更新
sql: [update emp set sal = ? where empno = ?] params:[800, 1]
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=800}
第一次查询缓存中没有数据,查询数据库,并将数据加入缓存
第二次,第三次直接使用缓存中的数据
对数据更新后清除了缓存,再次查询去数据库查询
问题
由于hashmap是非线程安全的,并发条件下会出现数据错乱
更新数据时,删除缓存和更新数据库的操作不是原子操作会出现缓存一致性问题
缓存更新策略就是更新时,是先清缓存还是先更新数据库
先删除缓存,再更新数据库
1.当B去更新数据,清空缓存后,还未将新数据写入数据库中时
2.A查询数据,此时缓存已经被清除,就去数据库中查
3.A查询数据库成功后,将数据写入缓存中
4.B在这个时候才将新的数据写入数据库中
5由于A将旧的数据写到了缓存中,以后的查询获取的都是缓存中的旧数据,而不是数据库的新数据,出现数据库和缓存不一致问题
先更新数据库,再删除缓存
先更新数据库,再删缓存。这是一个经典的缓存更新模式《Cache-Aside pattern》
1.B更新数据时,将新数据写入数据库中,还未清空缓存时,
2.A查询到缓存的值
3.B清空缓存的值
4.A查询,此时缓存已经删除,查数据库成功后,写入缓存
5.后续查询缓存。虽然后续没有发生数据库和缓存不一致问题,但是在步骤2出现了数据库和缓存不一致问题
补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询
1.A读缓存,假设第一次查询缓存中没有数据,查询数据库后,还没有将数据写入缓存中
2.B更新数据,将新数据写入数据库
3.B清空缓存
4.A将旧数据写入缓存
5.后续查询一直查询到的都是缓存中旧的数据,出现了数据库与缓存不一致问题
虽然这种情况的出现几率非常小,但是仍然可能会发生
对于解决该并发问题,可以采用延时双删策略,保证读请求完成以后,再次进行删除缓存操作。
解决
当然这些问题的根源是更新数据库和删除缓存的操作不是原子操作,那么我们可以加锁解决这个问题
**扩展:**对于数据库和缓存数据不一致问题是面试经常被提及到的。比如redis中如何保证缓存与数据库的双写一致性。会提及更多的缓存策略,以及每种缓存策略的优缺和数据不一致的解决办法,当然还有缓存删除失败后的重试机制的实现等问题,如果感兴趣的话可以自行百度。
读写锁实现一致性缓存我们可以使用读写锁实现一个简单的按需加载的缓存
使用HashMap 作为缓存是非线程安全的
使用读写锁,读读共享,读操作不允许其它线程进行写操作
写操作不允许其他线程进行读操作和写操作。
package com.dongguo.readwrite;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class GenericDaoTest {
public static void main(String[] args) {
GenericDao dao = new GenericDaoCached();
System.out.println("============> 查询");
String sql = "select * from emp where empno = ?";
int empno = 1;
Emp emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
System.out.println("============> 更新");
dao.update("update emp set sal = ? where empno = ?", 800, empno);
emp = dao.queryOne(Emp.class, sql, empno);
System.out.println(emp);
}
}
/**
* 实现缓存功能
*/
class GenericDaoCached extends GenericDao {
private GenericDao dao = new GenericDao();
// HashMap 作为缓存非线程安全, 需要保护
private Map map = new HashMap();
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
//未实现缓存功能
@Override
public List queryList(Class beanClass, String sql, Object... args) {
return dao.queryList(beanClass, sql, args);
}
@Override
public T queryOne(Class beanClass, String sql, Object... args) {
// 先从缓存中找,找到直接返回
SqlPair key = new SqlPair(sql, args);;
// 加读锁, 防止其它线程对缓存更改
rw.readLock().lock();
try {
T value = (T) map.get(key);
if(value != null) {
return value;
}
} finally {
rw.readLock().unlock();
}
// 加写锁, 防止其它线程对缓存读取和更改
rw.writeLock().lock();
try {
// get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
// 为防止重复查询数据库, 再次验证
T value = (T) map.get(key);
if(value == null) {
// 缓存中没有,查询数据库
value = dao.queryOne(beanClass, sql, args);
map.put(key, value);
}
return value;
} finally {
rw.writeLock().unlock();
}
}
@Override
public int update(String sql, Object... args) {
// 加写锁, 防止其它线程对缓存读取和更改
rw.writeLock().lock();
try {
// 先更新库
int update = dao.update(sql, args);
// 清空缓存
map.clear();
return update;
} finally {
rw.writeLock().unlock();
}
}
// 作为 key 保证其是不可变的
class SqlPair {
private String sql;//sql语句
private Object[] args;//参数
public SqlPair(String sql, Object[] args) {
this.sql = sql;
this.args = args;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqlPair sqlPair = (SqlPair) o;
return Objects.equals(sql, sqlPair.sql) &&
Arrays.equals(args, sqlPair.args);
}
@Override
public int hashCode() {
int result = Objects.hash(sql);
result = 31 * result + Arrays.hashCode(args);
return result;
}
}
}
数据库数据恢复
运行结果
============> 查询
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
============> 更新
sql: [update emp set sal = ? where empno = ?] params:[800, 1]
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=800}
注意
以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑
适合读多写少,如果写操作比较频繁,以上实现性能低
没有考虑缓存容量
没有考虑缓存过期
只适合单机
更新方法太过简单粗暴,清空了所有 key(可以考虑按类型分区或重新设计 key)
总结在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。 在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。 原因: 当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。
分析读写锁ReentrantReadWriteLock,会发现它有个潜在的问题:
读锁全完,写锁有望;写锁独占,读写全堵;
即ReadWriteLock读的过程中不允许写,只有等待线程都释放了读锁,当前线程才能获取写锁, 也就是写入必须等待,这是一种悲观的读锁,人家还在读着那,你先别去写,省的数据乱。
StampedLock为解决读写互斥的问题
它改进之处在于:读的过程中 允许获取写锁介入(读和写两个操作也让你“共享” (注意引号)),这样会导致我们读的数据就可能不一致!所以,需要额外的方法来判断读的过程中是否有写入,这是一种乐观的读锁, 显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。
StampedLock邮戳锁后续会详细介绍。