您当前的位置: 首页 > 

Dongguo丶

暂无认证

  • 1浏览

    0关注

    472博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

ReentrantReadWriteLock读写锁

Dongguo丶 发布时间:2021-09-27 12:26:41 ,浏览量:1

文章目录
  • ReentrantReadWriteLock
    • ReentrantReadWriteLock 类的整体结构
      • 写锁的获取和释放
      • 读锁的获取与释放
      • 读写锁的意义
      • 读写锁有以下三个重要的特性:
        • 注意事项
    • 读写锁的演变
      • 读写锁演变案例
        • 无锁实现
        • 使用ReentrantLock
        • 使用ReentrantReadWriteLock
    • 读写锁的降级
      • jdk8写锁降级为读锁过程
      • 锁降级使用场景
        • 先写后读案例
        • 那么先读再写会怎么样呢?
      • 不可锁升级
    • ReentrantWriteLock应用实践之缓存
      • 使用hashmap实现缓存
      • 缓存更新策略
      • 读写锁实现一致性缓存
  • 总结

ReentrantReadWriteLock

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。

读写锁定义为 一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。

ReentrantReadWriteLock是ReadWriteLock接口的实现,ReentrantReadWriteLock中有两个静态内部类:ReadLock读锁和WriteLock写锁,这两个锁实现了Lock接口,ReentrantReadWriteLock支持可重入,同步功能依赖自定义同步器(AbstractQueuedSynchronizer)实现,读写状态就是其同步器的同步状态。同步状态由一个整型变量表示,因为这个变量需要表示多个线程的读和写的状态,因此读写锁在实现上将该变量的高16位表示读,低16位表示写。它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁。读写互斥,读读共享

ReentrantReadWriteLock 类的整体结构
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    /** 读锁 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 写锁 */
    private final ReentrantReadWriteLock.WriteLock writerLock;

    final Sync sync;

    /** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    /** 返回用于写入操作的锁 */
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    /** 返回用于读取操作的锁 */
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

            ...
}

可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化,在源代码中可以看到ReentrantReadWriteLock实现了自己的序列化逻辑。

写锁的获取和释放

线程进入写锁的前提条件: • 没有其他线程的读锁 • 没有其他线程的写锁

写锁WriteLock是支持重进入的排他锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取读锁时,读锁已经被获取并且该线程不是已获取写锁的线程,则当前线程进入等待状态。读写锁确保写锁的操作对读锁可见。写锁释放每次减少写状态,当前写状态为0时表示写锁已背释放。

读锁的获取与释放

线程进入读锁的前提条件: • 没有其他线程的写锁 • 没有写请求, 或者有写请求,但调用线程和持有锁的线程是同一个(可重入锁)。

读锁ReadLock是支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(写状态为0)时,读锁总是能够被成功地获取,而所做的也只是增加读状态(线程安全)。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已经被获取,则进入等待状态。

读写锁的意义

『读写锁ReentrantReadWriteLock』并不是真正意义上的读写分离,它只允许读读共存,而读写和写写依然是互斥的, 大多实际场景是“读/读”线程间并不存在互斥关系,只有"读/写"线程或"写/写"线程间的操作需要互斥的。因此引入ReentrantReadWriteLock。

一个ReentrantReadWriteLock同时只能存在一个写锁但是可以存在多个读锁,但不能同时存在写锁和读锁 也即一个资源可以被多个读操作访问或一个写操作访问,但两者不能同时进行。

只有在读多写少情境之下,读写锁才具有较高的性能体现。

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的

select …from … lock in share mode (共享锁)

读写锁有以下三个重要的特性:

(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。

(2)重进入:读锁和写锁都支持线程重进入。

(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。

注意事项

读锁不支持条件变量Condition

支持锁降级但不支持锁升级

读写锁的演变

image-20210913222704539

ReentrantReadWriteLock

有锁饥饿问题

锁降级。读线程不能写,写线程可以读 ,

写的时候,正在写的线程还可以进行读操作, 不同线程之间读写互斥,但是同一个线程因为是可重入读写锁,可以先获取写锁,再获取读锁。

读写锁演变案例

模拟缓存的过程,对一个hashmap进行读和写操作

无锁实现
package com.dongguo.readwrite;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/9/4 0004-9:46
 * @description:
 */
public class Demo {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();
        //创建线程放数据
        for (int i = 1; i  {
                myCache.put(num + "", num + "");
            }, "t" + i).start();
        }
        //创建线程取数据
        for (int i = 1; i  {
                myCache.get(num + "");
            }, "t" + i).start();
        }
    }

}

class MyCache {
    private volatile HashMap hashMap = new HashMap();
    //放数据
    public void put(String k, String v) {
        try {
            System.out.println(Thread.currentThread().getName() + "开始写入"+k);
            //暂停一会
            TimeUnit.MICROSECONDS.sleep(300);
            hashMap.put(k, v);
            System.out.println(Thread.currentThread().getName() + "写入完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //取数据
    public void get(String k) {
        System.out.println(Thread.currentThread().getName() + "开始读取");
        String result = hashMap.get(k);
        System.out.println(Thread.currentThread().getName() + "读取完成"+result);
    }
}
运行结果
t1开始写入1
t2开始写入2
t3开始写入3
t4开始写入4
t1开始读取
t5开始写入5
t2开始读取
t1读取完成null
t2读取完成null
t4开始读取
t4读取完成null
t3开始读取
t3读取完成null
t5开始读取
t5读取完成null
t4写入完成
t3写入完成
t1写入完成
t2写入完成
t5写入完成

这样实现还没有写入完成,就有线程进行读取操作,读取的数据肯定不正确。

使用ReentrantLock
package com.dongguo.readwrite;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author Dongguo
 * @date 2021/8/24 0024-19:44
 * @description: ReadWriteLock
 */
public class ReadWriteLockDemo1 {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();
        //创建线程放数据
        for (int i = 0; i  {
                myCache.put(num + "", num + "");
            }, "Thread"+i).start();
        }
        //创建线程取数据
        for (int i = 0; i  {
                myCache.get(num + "");
            }, "Thread"+(i + 5)).start();
        }
    }

    //资源类
    static class MyCache {
        //创建map集合
        private volatile Map map = new HashMap();
        //创建锁对象
        private Lock lock = new ReentrantLock();

        //放数据
        public void put(String key, Object value) {
            //添加锁
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + "写入:" + key);
                //暂停一会
                TimeUnit.MILLISECONDS.sleep(300);
                //放数据
                map.put(key, value);
                System.out.println(Thread.currentThread().getName() + "写完" + key);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放锁
                lock.unlock();
            }
        }

        //取数据
        public Object get(String key) {
            //添加锁
            lock.lock();
            Object result = null;
            try {
                System.out.println(Thread.currentThread().getName() + "取出:" + key);
                result = map.get(key);
                System.out.println(Thread.currentThread().getName() + "取完" + key);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //释放锁
                lock.unlock();
            }
            return result;
        }
    }
}
运行结果
Thread0写入:0
Thread0写完0
Thread1写入:1
Thread1写完1
Thread2写入:2
Thread2写完2
Thread3写入:3
Thread3写完3
Thread4写入:4
Thread4写完4
Thread5取出:0
Thread5取完0
Thread6取出:1
Thread6取完1
Thread7取出:2
Thread7取完2
Thread8取出:3
Thread8取完3
Thread9取出:4
Thread9取完4

只能一次完成一个读或写操作,串行化 都是独占锁 读读也是独占锁无法共享,读的性能低

使用ReentrantReadWriteLock

对一个hashmap进行读和写操作

package com.dongguo.readwrite;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author Dongguo
 * @date 2021/9/4 0004-9:46
 * @description:
 */
public class Demo {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();
        //创建线程放数据
        for (int i = 1; i  {
                myCache.put(num + "", num + "");
            }, "t" + i).start();
        }
        //创建线程取数据
        for (int i = 1; i  {
                myCache.get(num + "");
            }, "t" + i).start();
        }
    }

}

class MyCache {
    private volatile HashMap hashMap = new HashMap();
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    //放数据
    public void put(String k, String v) {
        try {
            readWriteLock.writeLock().lock();
            System.out.println(Thread.currentThread().getName() + "开始写入"+k);
            //暂停一会
            TimeUnit.MICROSECONDS.sleep(300);
            hashMap.put(k, v);
            System.out.println(Thread.currentThread().getName() + "写入完成");
          
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            readWriteLock.writeLock().unlock();
        }
    }
    //取数据
    public void get(String k) {
        try {
            readWriteLock.readLock().lock();
            System.out.println(Thread.currentThread().getName() + "开始读取");
            String result = hashMap.get(k);
            System.out.println(Thread.currentThread().getName() + "读取完成"+result);
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}
运行结果:
t2开始写入2
t2写入完成
t1开始写入1
t1写入完成
t3开始写入3
t3写入完成
t4开始写入4
t4写入完成
t5开始写入5
t5写入完成
t1开始读取
t1读取完成1
t2开始读取
t2读取完成2
t5开始读取
t5读取完成5
t4开始读取
t4读取完成4
t3开始读取
t3读取完成3

ReentrantReadWriteLock的读锁是共享锁, 写锁是独占锁,要注意并发

对于邮戳锁,后续文章会介绍到

读写锁的降级

锁降级 : 是指保持住当前的写锁(已拥有),再获取读锁,随后释放写锁的过程。

《Java 并发编程的艺术》中关于锁降级的说明:

image-20210909193022092

锁的严苛程度变强叫做升级,反之叫做降级

将写锁降级为读锁(可以用类似Linux文件读写权限理解,就像写权限要高于读权限一样)

jdk8写锁降级为读锁过程

image-20210909201725354

Java8 官网说明

image-20210909201749569

重入还允许通过获取写入锁定,然后读取锁然后释放写锁。 可以 从写锁到读锁,

但是,从读锁升级到写锁是不可能的。

Oracle公司ReentrantWriteReadLock源码总结

image-20210909225034608

1 代码中声明了一个volatile类型的cacheValid变量,保证其可见性。

2 首先获取读锁,如果cache不可用,则释放读锁,获取写锁,在更改数据之前,再检查一次cacheValid的值,然后修改数据,将cacheValid置为true,然后在释放写锁前获取读锁;此时,cache中数据可用,处理cache中数据,最后释放读锁。这个过程就是一个完整的锁降级的过程,目的是保证数据可见性。

如果违背锁降级的步骤 如果当前的线程C在修改完cache中的数据后,没有获取读锁而是直接释放了写锁,那么假设此时另一个线程D获取了写锁并修改了数据,那么C线程无法感知到数据已被修改,则数据出现错误。

如果遵循锁降级的步骤 线程C在释放写锁之前获取读锁,那么线程D在获取写锁时将被阻塞,直到线程C完成数据处理过程,释放读锁。这样可以保证返回的数据是这次更新的数据,该机制是专门为了缓存设计的。

锁降级使用场景

锁降级是为了让当前线程感知到数据的变化,目的是保证数据可见性

线程修改完数据之后, 经过耗时操作后再使用数据时,希望使用的是自己修改后的数据,而不是其他线程修改后的数据,这样的话确实是需要锁降级,那么如果其他线程想要修改数据,必须等到当前线程把读锁释放了才能成功获得写锁,这样就可以维持数据的可见性,达到写后立即读的目的

如果只是希望最后使用数据的时候,拿到的是最新的数据,而不一定是自己刚修改过的数据,那么先解写锁,再上读锁,然后使用数据也无妨

先写后读案例
package com.dongguo.readwrite;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author Dongguo
 * @date 2021/9/9 0009-20:21
 * @description: 锁降级:遵循获取写锁→再获取读锁→再释放写锁的次序,写锁能够降级成为读锁。
 * 

* 如果一个线程占有了写锁,在不释放写锁的情况下,它还能占有读锁,即写锁降级为读锁。 */ public class LockDownGradingDemo { public static void main(String[] args) { ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); writeLock.lock(); System.out.println("-------正在写入"); readLock.lock(); System.out.println("-------正在读取"); System.out.println("-------写完了"); writeLock.unlock(); System.out.println("-------读完了"); readLock.unlock(); } } 运行结果 -------正在写入 -------正在读取 -------写完了 -------读完了

代码能够正常运行,在写的过程成也可以读

那么先读再写会怎么样呢?
package com.dongguo.readwrite;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author Dongguo
 * @date 2021/9/9 0009-20:21
 * @description: 锁降级:遵循获取写锁→再获取读锁→再释放写锁的次序,写锁能够降级成为读锁。
 * 

* 如果一个线程占有了写锁,在不释放写锁的情况下,它还能占有读锁,即写锁降级为读锁。 */ public class LockDownGradingDemo { public static void main(String[] args) { ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); readLock.lock(); System.out.println("-------正在读取"); writeLock.lock(); System.out.println("-------正在写入"); System.out.println("-------写完了"); writeLock.unlock(); System.out.println("-------读完了"); readLock.unlock(); } }

运行结果:

image-20210909221110109

结果表示执行写操作的时候是不能进行写操作的。如果有线程在读,那么写线程是无法获取写锁的,是悲观锁的策略

不可锁升级

线程获取读锁是不能直接升级为写入锁的。

image-20210909201927693

image-20210909201933474

在ReentrantReadWriteLock中,当读锁被使用时,如果有线程尝试获取写锁,该写线程会被阻塞。 所以,需要释放所有读锁,才可获取写锁,

image-20210909201945895

package com.dongguo.readwrite;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author Dongguo
 * @date 2021/9/9 0009-20:21
 * @description: 锁降级:遵循获取写锁→再获取读锁→再释放写锁的次序,写锁能够降级成为读锁。
 * 

* 如果一个线程占有了写锁,在不释放写锁的情况下,它还能占有读锁,即写锁降级为读锁。 */ public class LockDownGradingDemo { public static void main(String[] args) { ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); readLock.lock(); System.out.println("-------正在读取"); System.out.println("-------读完了"); readLock.unlock(); writeLock.lock(); System.out.println("-------正在写入"); System.out.println("-------写完了"); writeLock.unlock(); } }

写锁和读锁是互斥的(这里的互斥是指线程间的互斥,当前线程可以获取到写锁又获取到读锁,但是获取到了读锁不能继续获取写锁) ,这是因为读写锁要保持写操作的可见性。

如果允许读锁在被获取的情况下可以对写锁获取,那么正在运行的其他读线程无法感知到当前写线程的操作。

ReentrantWriteLock应用实践之缓存

实体类

package com.dongguo.readwrite;

import java.math.BigDecimal;

class Emp {
    private int empno;
    private String ename;
    private String job;
    private BigDecimal sal;

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public BigDecimal getSal() {
        return sal;
    }

    public void setSal(BigDecimal sal) {
        this.sal = sal;
    }

    @Override
    public String toString() {
        return "Emp{" +
                "empno=" + empno +
                ", ename='" + ename + '\'' +
                ", job='" + job + '\'' +
                ", sal=" + sal +
                '}';
    }
}

Dao

package com.dongguo.readwrite;

import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;
import java.sql.*;
import java.util.*;

public class GenericDao {
    static String URL = "jdbc:mysql://localhost:3306/employee?characterEncoding=utf-8&useSSL=false";
    static String USERNAME = "root";
    static String PASSWORD = "root";

    public  List queryList(Class beanClass, String sql, Object... args) {
        System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
        BeanRowMapper mapper = new BeanRowMapper(beanClass);
        return queryList(sql, mapper, args);
    }

    public  T queryOne(Class beanClass, String sql, Object... args) {
        System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
        BeanRowMapper mapper = new BeanRowMapper(beanClass);
        return queryOne(sql, mapper, args);
    }

    private  List queryList(String sql, RowMapper mapper, Object... args) {
        try (Connection conn = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
            try (PreparedStatement psmt = conn.prepareStatement(sql)) {
                if (args != null) {
                    for (int i = 0; i  查询
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
============> 更新
sql: [update emp set sal = ? where empno = ?] params:[800, 1]
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=800}

问题

对同一个empno进行三次查询,查询同一个数据执行了三次sql

使用hashmap实现缓存

可以使用缓存原理,第一次将查到的数据放入缓存中,只要数据没有被修改,就直接查缓存就行了,

如果数据被修改就删除缓存。

package com.dongguo.readwrite;

import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GenericDaoTest {
    public static void main(String[] args) {
        GenericDao dao = new GenericDaoCached();
        System.out.println("============> 查询");
        String sql = "select * from emp where empno = ?";
        int empno = 1;
        Emp emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);

        System.out.println("============> 更新");
        dao.update("update emp set sal = ? where empno = ?", 800, empno);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
    }
}

/**
 * 缓存
 */
class GenericDaoCached extends GenericDao {
    private GenericDao dao = new GenericDao();

    private Map map = new HashMap();


    //未实现缓存功能
    @Override
    public  List queryList(Class beanClass, String sql, Object... args) {
        return dao.queryList(beanClass, sql, args);
    }


    @Override
    public  T queryOne(Class beanClass, String sql, Object... args) {
        // 先从缓存中找,找到直接返回
        SqlPair key = new SqlPair(sql, args);
        
        T value = (T) map.get(key);
        if (value != null) {
            return value;
        }
        // 缓存中没有,查询数据库
        value = dao.queryOne(beanClass, sql, args);
        map.put(key, value);
        return value;
    }

    @Override
    public int update(String sql, Object... args) {
        // 清空缓存
        map.clear();
        // 更新库
        int update = dao.update(sql, args);
        return update;

    }

    // 作为 key 保证其是不可变的
    class SqlPair {
        private String sql;//sql语句
        private Object[] args;//参数

        public SqlPair(String sql, Object[] args) {
            this.sql = sql;
            this.args = args;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            SqlPair sqlPair = (SqlPair) o;
            return Objects.equals(sql, sqlPair.sql) &&
                    Arrays.equals(args, sqlPair.args);
        }

        @Override
        public int hashCode() {
            int result = Objects.hash(sql);
            result = 31 * result + Arrays.hashCode(args);
            return result;
        }
    }
}

数据库数据恢复

image-20210927112759942

运行结果

============> 查询
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
============> 更新
sql: [update emp set sal = ? where empno = ?] params:[800, 1]
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=800}

第一次查询缓存中没有数据,查询数据库,并将数据加入缓存

第二次,第三次直接使用缓存中的数据

对数据更新后清除了缓存,再次查询去数据库查询

问题

由于hashmap是非线程安全的,并发条件下会出现数据错乱

更新数据时,删除缓存和更新数据库的操作不是原子操作会出现缓存一致性问题

缓存更新策略

就是更新时,是先清缓存还是先更新数据库

先删除缓存,再更新数据库

1.

1.当B去更新数据,清空缓存后,还未将新数据写入数据库中时

2.A查询数据,此时缓存已经被清除,就去数据库中查

3.A查询数据库成功后,将数据写入缓存中

4.B在这个时候才将新的数据写入数据库中

5由于A将旧的数据写到了缓存中,以后的查询获取的都是缓存中的旧数据,而不是数据库的新数据,出现数据库和缓存不一致问题

先更新数据库,再删除缓存

先更新数据库,再删缓存。这是一个经典的缓存更新模式《Cache-Aside pattern》

image-20210927113933046

1.B更新数据时,将新数据写入数据库中,还未清空缓存时,

2.A查询到缓存的值

3.B清空缓存的值

4.A查询,此时缓存已经删除,查数据库成功后,写入缓存

5.后续查询缓存。虽然后续没有发生数据库和缓存不一致问题,但是在步骤2出现了数据库和缓存不一致问题

补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询

image-20210927114746483

1.A读缓存,假设第一次查询缓存中没有数据,查询数据库后,还没有将数据写入缓存中

2.B更新数据,将新数据写入数据库

3.B清空缓存

4.A将旧数据写入缓存

5.后续查询一直查询到的都是缓存中旧的数据,出现了数据库与缓存不一致问题

虽然这种情况的出现几率非常小,但是仍然可能会发生

对于解决该并发问题,可以采用延时双删策略,保证读请求完成以后,再次进行删除缓存操作。

解决

当然这些问题的根源是更新数据库和删除缓存的操作不是原子操作,那么我们可以加锁解决这个问题

**扩展:**对于数据库和缓存数据不一致问题是面试经常被提及到的。比如redis中如何保证缓存与数据库的双写一致性。会提及更多的缓存策略,以及每种缓存策略的优缺和数据不一致的解决办法,当然还有缓存删除失败后的重试机制的实现等问题,如果感兴趣的话可以自行百度。

读写锁实现一致性缓存

我们可以使用读写锁实现一个简单的按需加载的缓存

使用HashMap 作为缓存是非线程安全的

使用读写锁,读读共享,读操作不允许其它线程进行写操作

写操作不允许其他线程进行读操作和写操作。

package com.dongguo.readwrite;

import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GenericDaoTest {
    public static void main(String[] args) {
        GenericDao dao = new GenericDaoCached();
        System.out.println("============> 查询");
        String sql = "select * from emp where empno = ?";
        int empno = 1;
        Emp emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);

        System.out.println("============> 更新");
        dao.update("update emp set sal = ? where empno = ?", 800, empno);
        emp = dao.queryOne(Emp.class, sql, empno);
        System.out.println(emp);
    }
}

/**
 * 实现缓存功能
 */
class GenericDaoCached extends GenericDao {
    private GenericDao dao = new GenericDao();
    // HashMap 作为缓存非线程安全, 需要保护
    private Map map = new HashMap();
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

    //未实现缓存功能
    @Override
    public  List queryList(Class beanClass, String sql, Object... args) {
        return dao.queryList(beanClass, sql, args);
    }


    @Override
    public  T queryOne(Class beanClass, String sql, Object... args) {
        // 先从缓存中找,找到直接返回
        SqlPair key = new SqlPair(sql, args);;
        // 加读锁, 防止其它线程对缓存更改
        rw.readLock().lock();
        try {
            T value = (T) map.get(key);
            if(value != null) {
                return value;
            }
        } finally {
            rw.readLock().unlock();
        }
        // 加写锁, 防止其它线程对缓存读取和更改
        rw.writeLock().lock();
        try {
            // get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
            // 为防止重复查询数据库, 再次验证
            T value = (T) map.get(key);
            if(value == null) {
                // 缓存中没有,查询数据库
                value = dao.queryOne(beanClass, sql, args);
                map.put(key, value);
            }
            return value;
        } finally {
            rw.writeLock().unlock();
        }
    }

    @Override
    public int update(String sql, Object... args) {
        // 加写锁, 防止其它线程对缓存读取和更改
        rw.writeLock().lock();
        try {
            // 先更新库
            int update = dao.update(sql, args);
            // 清空缓存
            map.clear();
            return update;
        } finally {
            rw.writeLock().unlock();
        }
    }
    // 作为 key 保证其是不可变的
    class SqlPair {
        private String sql;//sql语句
        private Object[] args;//参数

        public SqlPair(String sql, Object[] args) {
            this.sql = sql;
            this.args = args;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            SqlPair sqlPair = (SqlPair) o;
            return Objects.equals(sql, sqlPair.sql) &&
                    Arrays.equals(args, sqlPair.args);
        }

        @Override
        public int hashCode() {
            int result = Objects.hash(sql);
            result = 31 * result + Arrays.hashCode(args);
            return result;
        }
    }

}

数据库数据恢复

image-20210927111048633

运行结果

============> 查询
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
Emp{empno=1, ename='张三', job='Java实习', sal=3000}
============> 更新
sql: [update emp set sal = ? where empno = ?] params:[800, 1]
sql: [select * from emp where empno = ?] params:[1]
Emp{empno=1, ename='张三', job='Java实习', sal=800}

注意

以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑

​ 适合读多写少,如果写操作比较频繁,以上实现性能低

​ 没有考虑缓存容量

​ 没有考虑缓存过期

​ 只适合单机

​ 更新方法太过简单粗暴,清空了所有 key(可以考虑按类型分区或重新设计 key)

总结

在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。 在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。 原因: 当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。

分析读写锁ReentrantReadWriteLock,会发现它有个潜在的问题:

读锁全完,写锁有望;写锁独占,读写全堵;

即ReadWriteLock读的过程中不允许写,只有等待线程都释放了读锁,当前线程才能获取写锁, 也就是写入必须等待,这是一种悲观的读锁,人家还在读着那,你先别去写,省的数据乱。

StampedLock为解决读写互斥的问题

它改进之处在于:读的过程中 允许获取写锁介入(读和写两个操作也让你“共享” (注意引号)),这样会导致我们读的数据就可能不一致!所以,需要额外的方法来判断读的过程中是否有写入,这是一种乐观的读锁, 显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。

StampedLock邮戳锁后续会详细介绍。

关注
打赏
1638062488
查看更多评论
立即登录/注册

微信扫码登录

0.0415s