在之前《Redis实现分布式锁》一文中我已经介绍了使用Redis实现分布式锁原理,今天主要是介绍一下如何使用zookeeper实现分布式锁。
一.zookeeper介绍和安装 1.zookeeper介绍 1.1.什么是zookeeperZooKeeper由雅虎研发,是Google Chubby的开源实现,目前托管给Apache,是一个经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调服务。
分布式应用程序可以基于ZooKeeper实现数据发布与订阅、负载均衡、命名服务、分布式协调与通知、集群管理、Leader选举、分布式锁、分布式队列等功能 它基于层次型的目录树的数据结构,并通过对树上的节点进行有效管理,可以设计出各种各样的分布式集群管理功能,其本身也是分布式的。
1.2.zookeeper存储方式
Zookeeper会维护一个具有层次关系的树状的数据结构,它非常类似于一个标准的文件系统,如下图所
示:同一个目录下不能有相同名称的目录节点
ZooKeeper 节点是有生命周期的这取决于节点的类型,在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及时序节点(SEQUENTIAL ),具体在节点创建过程中,一般是组合使用,可以生成以下 4 种节点类型。
- 持久节点(PERSISTENT) 所谓持久节点,是指在节点创建后,就一直存在,直到有删除操作来主动清除这个节点——不会因为创建该节点的客户端会话失效而消失。
- 持久顺序节点(PERSISTENT_SEQUENTIAL) 这类节点的基本特性和上面的节点类型是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整型的最大值。
- 临时节点(EPHEMERAL) 和持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。另外,在临时节点下面不能创建子节点。
- 临时顺序节点(EPHEMERAL_SEQUENTIAL) 在临时几点的基础上增加了顺序,可以用来实现分布式锁
顺序节点可以用来为所有的事件进行全局排序,这样客户端可以通过序号推断事件的顺序。
2.zookeeper安装(windows) 2.1.下载zookeeper下载地址:https://mirrors.bfsu.edu.cn/apache/zookeeper/,下载之后解压即可
2.2.配置zookeeper解压之后,找到config/ zoo_sample.cfg 文件,复制一份重命名为:zoo.cfg ,这个是zookeeper配置文件,编辑zoo.cfg ,将dataDir 和 dataLogDir 修改成磁盘的某个地址
#数据目录 dataDir=D:/opensource/zookeeper-3.4.9/data #日志目录 dataLogDir=D:/opensource/zookeeper-3.4.9/log
回到bin目录中,执行zkServer.cmd 启动zookeeper
执行bin/zsCli.cmd 启动zookeeper自带客户端,执行 "ls /"查看根节点
[zk: localhost:2181(CONNECTED) 20] ls / [zookeeper]
创建节点
[zk: localhost:2181(CONNECTED) 21] create /testkey testvalue Created /testkey
获取节点值
[zk: localhost:2181(CONNECTED) 23] get /testkey testvalue cZxid = 0x82 ctime = Fri May 14 23:15:45 CST 2021 mZxid = 0x82 mtime = Fri May 14 23:15:45 CST 2021 pZxid = 0x82 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 9 numChildren = 0 [zk: localhost:2181(CONNECTED) 24]
删除节点
[zk: localhost:2181(CONNECTED) 24] delete /testkey [zk: localhost:2181(CONNECTED) 25] ls / [zookeeper] [zk: localhost:2181(CONNECTED) 26]
其他命令
stat path [watch] set path data [version] ls path [watch] delquota [-n|-b] path ls2 path [watch] setAcl path acl setquota -n|-b val path history redo cmdno printwatches on|off delete path [version] sync path listquota path rmr path get path [watch] create [-s] [-e] path data acl addauth scheme auth quit getAcl path close connect host:port二.基于Zookeeper的分布式锁 1.zookeeper分布式锁原理
分布式锁就是基于zk的 临时顺序节点+watch监听机制完成的。临时顺序节点特点是客户端断开节点释放,且自己维护节点顺序值,当多个线程同时创建节点我们就可以按照顺序创建N个顺序临时节点,然后依次从第一个往后获取锁。只不过能拿到锁的只能是第一个节点的线程,所以后面的线程需要监听自己上一个节点的节点释放。轮到谁,谁就拿到锁。
<parent> <groupId>org.springframework.boot <dependency> <groupId>org.apache.zookeeper //zookeeper客户端对象 private ZooKeeper zooKeeper; //当前节点 private String currentNode; //通过构造器初始化zookeeper ZookeeperLock(){ try { //参数:链接字符串 , 会话超时 , 监听器 this.zooKeeper = new ZooKeeper("172.16.2.54:2181",10000,this); } catch (IOException e) { e.printStackTrace(); } } /**====================================================================================== * 方法描述:提供获取锁的方法: code作为业务相关的编码,比如传入订单ID,就锁住这个订单 * 逻辑步骤: 1.创建根节点 2.进来一个线程,就为线程在zookeeper的根节点创建一个瞬时有序节点 2. 3. ======================================================================================*/ public boolean getLock(String code){ try { //1.创建根节点,如果不存在,stat为null 就创建 String path = "/" + code; Stat exists = zooKeeper.exists(path, false); //创建节点 if(null == exists){ // 节点路径 ,节点值 ,权限:不需要账户密码就能链接 , 创建模式:顺序临时节点 zooKeeper.create(path,code.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //2.进来一个线程,就为线程在zookeeper创建一个瞬时有序节点 , 如: /xx/xx000000001 ;/xx/xx000000002 //把当前节点保存为成员变量,后面用来做判断 currentNode = zooKeeper.create(path+path,code.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //3.对所有的子节点进行排序 List<String> children = zooKeeper.getChildren(path, false); Collections.sort(children); //4.如果排序过的children中的第一个和 currentNode 一致,拿到锁 String firstNode = children.get(0); if(currentNode.endsWith(firstNode)){ return true; } //5.如果不是第一个节点,需要监听前一个节点 //用一个临时变量记录当前节点的上一个节点 String previousNode = firstNode; for(String node : children){ if(currentNode.endsWith(node)){ //如果当前节点是node节点 ,那么就监听它的上一个节点 :比如 currentNode 这里是 0003节点 ,那 node就是 0002节点 //第一个参数是监听的节点,第二个参数是是否要监听,zooKeeper在初始化的时候设置好了监听器 log.info("监听上一个节点:{}",node); zooKeeper.exists(path+"/"+previousNode,true); }else{ //把children中的节点复制给上一个节点 previousNode = node; } } //代码到这里,节点已经做好监听了,只需要等待,等待上一个节点完成工作后唤醒他 synchronized (this){ //wait会释放锁 wait(); } //到这里说明被唤醒,说明获取到锁 log.info("拿到锁:{}",currentNode); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } @Override public void close() { //释放锁 :节点路径, 节点版本号(-1匹配所有版本) log.info("释放节点:{}",currentNode); if(null != currentNode){ try { zooKeeper.delete(currentNode ,-1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } } @Override public void process(WatchedEvent watchedEvent) { //当节点被释放,就会到这里被监听到 if(watchedEvent.getType() == Event.EventType.NodeDeleted){ synchronized (this){ //唤醒等待的线程 log.info("当前节点:{},唤醒",watchedEvent.getPath()); notify(); } } } }2.3.测试代码
public class ZKTest { @Test public void testZK() throws Exception { for(int i = 0 ; i < 10 ; i++){ new Thread(()->{ ZookeeperLock zookeeperLock = new ZookeeperLock(); boolean getLock = zookeeperLock.getLock("order"); System.out.println("是否获取到锁:"+getLock); zookeeperLock.close(); }).start(); } Thread.sleep(5000); } }
打印日志:
09:18:24.500 [Thread-0] INFO cn.itsource.ZookeeperLock - /order/order0000000014监听上一个节点:/order/order0000000013 09:18:24.500 [Thread-9] INFO cn.itsource.ZookeeperLock - /order/order0000000017监听上一个节点:/order/order0000000016 09:18:24.500 [Thread-1] INFO cn.itsource.ZookeeperLock - /order/order0000000019监听上一个节点:/order/order0000000018 09:18:24.500 [Thread-8] INFO cn.itsource.ZookeeperLock - /order/order0000000018监听上一个节点:/order/order0000000017 09:18:24.500 [Thread-4] INFO cn.itsource.ZookeeperLock - /order/order0000000016监听上一个节点:/order/order0000000015 09:18:24.500 [Thread-2] INFO cn.itsource.ZookeeperLock - /order/order0000000020监听上一个节点:/order/order0000000019 09:18:24.500 [Thread-3] INFO cn.itsource.ZookeeperLock - /order/order0000000013监听上一个节点:/order/order0000000012 09:18:24.500 [Thread-7] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000011 09:18:24.500 [Thread-6] INFO cn.itsource.ZookeeperLock - /order/order0000000015监听上一个节点:/order/order0000000014 09:18:24.500 [Thread-5] INFO cn.itsource.ZookeeperLock - /order/order0000000012监听上一个节点:/order/order0000000011 09:18:24.509 [Thread-5-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000011,唤醒 09:18:24.509 [Thread-5] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000012 是否获取到锁:true 09:18:24.509 [Thread-5] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000012 09:18:24.511 [Thread-3-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000012,唤醒 09:18:24.512 [Thread-3] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000013 是否获取到锁:true 09:18:24.512 [Thread-3] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000013 09:18:24.514 [Thread-0-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000013,唤醒 09:18:24.515 [Thread-0] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000014 是否获取到锁:true 09:18:24.515 [Thread-0] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000014 09:18:24.517 [Thread-6-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000014,唤醒 09:18:24.517 [Thread-6] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000015 是否获取到锁:true 09:18:24.517 [Thread-6] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000015 09:18:24.520 [Thread-4-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000015,唤醒 09:18:24.520 [Thread-4] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000016 是否获取到锁:true 09:18:24.520 [Thread-4] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000016 09:18:24.522 [Thread-9] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000017 是否获取到锁:true 09:18:24.522 [Thread-9] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000017 09:18:24.525 [Thread-8-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000017,唤醒 09:18:24.525 [Thread-8] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000018 是否获取到锁:true 09:18:24.525 [Thread-8] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000018 09:18:24.527 [Thread-1-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000018,唤醒 09:18:24.527 [Thread-1] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000019 是否获取到锁:true 09:18:24.527 [Thread-1] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000019 09:18:24.530 [Thread-2-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000019,唤醒 09:18:24.530 [Thread-2] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000020 是否获取到锁:true 09:18:24.530 [Thread-2] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000020
这个效果看起来是对的。但是如果自己基于zookeeper封装分布式锁未免太过麻烦,而且容易出BUG,Apache提供了一个基于Zookeeper的客户端工具curator已经实现了分布式锁的封装,我们使用它就可以了。
3.使用curator实现分布式锁 3.1.curator介绍Apache Curator是用于Apache ZooKeeper(一种分布式协调服务)的Java / JVM客户端库。它包括一个高级API框架和实用程序,使使用Apache ZooKeeper变得更加轻松和可靠。解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。
重点是它对分布式锁进行了封装 :http://curator.apache.org/getting-started.html
3.1.导入依赖<dependency> <groupId>org.apache.curator //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //创建客户端 CuratorFramework client = CuratorFrameworkFactory.newClient("172.16.2.54:2181", retryPolicy); return client; }3.3.分布式锁案例
@Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes = AppStart.class) public class ZKTest { @Autowired private CuratorFramework curatorFramework; /**====================================================================================== * 方法描述:使用curator实现分布式锁 ======================================================================================*/ @Test public void testCurator() throws Exception { for(int i = 0 ; i < 10 ; i++){ new Thread(()->{ testCuratorLock(); }).start(); } Thread.sleep(5000); } public void testCuratorLock(){ //分布式锁 InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/order"); try { if ( lock.acquire(1, TimeUnit.SECONDS) ){ //处理业务逻辑 log.info("获取到锁"); } } catch (Exception e) { e.printStackTrace(); } finally{ try { //释放锁 log.info("释放锁"); lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }
打印效果
2021-05-15 09:54:32.251 INFO 22488 --- [ Thread-4] cn.itsource.ZKTest : 获取到锁 2021-05-15 09:54:32.252 INFO 22488 --- [ Thread-4] cn.itsource.ZKTest : 释放锁 2021-05-15 09:54:32.267 INFO 22488 --- [ Thread-9] cn.itsource.ZKTest : 获取到锁 2021-05-15 09:54:32.267 INFO 22488 --- [ Thread-9] cn.itsource.ZKTest : 释放锁 2021-05-15 09:54:32.274 INFO 22488 --- [ Thread-5] cn.itsource.ZKTest : 获取到锁 2021-05-15 09:54:32.274 INFO 22488 --- [ Thread-5] cn.itsource.ZKTest : 释放锁 2021-05-15 09:54:32.277 INFO 22488 --- [ Thread-8] cn.itsource.ZKTest : 获取到锁 2021-05-15 09:54:32.277 INFO 22488 --- [ Thread-8] cn.itsource.ZKTest : 释放锁 2021-05-15 09:54:32.280 INFO 22488 --- [ Thread-6] cn.itsource.ZKTest : 获取到锁 2021-05-15 09:54:32.280 INFO 22488 --- [ Thread-6] cn.itsource.ZKTest : 释放锁 2021-05-15 09:54:32.283 INFO 22488 --- [ Thread-3] cn.itsource.ZKTest : 获取到锁 2021-05-15 09:54:32.283 INFO 22488 --- [ Thread-3] cn.itsource.ZKTest : 释放锁 2021-05-15 09:54:32.285 INFO 22488 --- [ Thread-11] cn.itsource.ZKTest : 获取到锁 2021-05-15 09:54:32.286 INFO 22488 --- [ Thread-11] cn.itsource.ZKTest : 释放锁 2021-05-15 09:54:32.288 INFO 22488 --- [ Thread-10] cn.itsource.ZKTest : 获取到锁 2021-05-15 09:54:32.288 INFO 22488 --- [ Thread-10] cn.itsource.ZKTest : 释放锁 2021-05-15 09:54:32.292 INFO 22488 --- [ Thread-12] cn.itsource.ZKTest : 获取到锁 2021-05-15 09:54:32.292 INFO 22488 --- [ Thread-12] cn.itsource.ZKTest : 释放锁 2021-05-15 09:54:32.294 INFO 22488 --- [ Thread-7] cn.itsource.ZKTest : 获取到锁 2021-05-15 09:54:32.294 INFO 22488 --- [ Thread-7] cn.itsource.ZKTest : 释放锁
看这个效果是没有问题的 , 好吧到这文章结束,喜欢的话给个好评吧!!!