笔记来源: Java工程师面试突击专栏 https://apppukyptrl1086.pc.xiaoe-tech.com/detail/p_5d3114935b4d7_CEcL8yMS/6
文章目录
zk 获取锁的过程
- zk 获取锁的过程
- 创建临时节点的原因
- zk 分布式锁的demo代码
获取锁的时候, 尝试去创建一个临时节点 . 如果这个临时节点不存在, 那么代表该分布式锁创建成功.
另外一个系统, 尝试去创建一个相同名称的临时节点, 如果已经存在, 代表别人已经占有这把锁, 对这个临时节点注册一个监听器.
释放锁: 删除临时节点, 一旦删除, 那么就会通知别的系统这个节点被删除了.
一个系统如果没有获取到锁, 那么就会去注册一个监听器 , 一旦其他的系统释放了锁, 那么就会把消息推送给监听器, 监听器感知到被人释放了锁之后, 再次重新尝试获取锁, 如果再次没有获取到锁, 那么再次注册一个监听器, 直到获取到锁.
假如系统A 获取锁成功, 但是系统A在没有释放锁的时候系统宕机了, 导致其他的系统也无法获取锁, 就造成了死锁. 创建临时节点的好处是, 假如某个系统挂了, 那么zk就能感知到, 把这个系统获取的临时节点删除, 避免死锁, 让别的系统去获取锁.
zk 分布式锁的demo代码/**
* ZooKeeperSession
* @author Administrator
*/
public class ZooKeeperSession {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeper zookeeper;
private CountDownLatch latch;
public ZooKeeperSession() {
try {
// zk 建立连接
this.zookeeper = new ZooKeeper(
"192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181",
50000,
// 注册监听器
new ZooKeeperWatcher());
try {
connectedSemaphore.await();
} catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println("ZooKeeper session established......");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取分布式锁
* @param productId
*/
public Boolean acquireDistributedLock(Long productId) {
// 拼接锁对应的节点, 即锁对应的名字, 此处以商品id拼接为锁的名称
String path = "/product-lock-" + productId;
try {
// 调用zk 客户端, 创建临时节点
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 如果创建成功, 直接返回true
return true;
}
catch (Exception e) {
// 如果创建锁失败, 那么就会进入异常的代码, 去反复的循环尝试上锁.
while(true) {
try {
// stae 如果是Null , 那么代表别人已经释放锁了
Stat stat = zk.exists(path, true); // 相当于是给node注册一个监听器,去看看这个监听器是否存在
if(stat != null) {
// stat 不是null , 代表其他系统没有释放资源.
// 使用CountDownLatch 指定等待的线程数是1
this.latch = new CountDownLatch(1);
// 调用await方法, 该线程进入阻塞状态, 如果别的系统释放了锁, 会唤醒此线程
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
// 此处线程状态变为runnable, latch设置为null
this.latch = null;
}
// 尝试去获取锁
zookeeper.create(path, "".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 成功获取了锁
return true;
}
catch(Exception e) {
// 执行上锁失败, 继续 去循环获取锁
continue;
}
}
// 很不优雅,我呢就是给大家来演示这么一个思路
// 比较通用的,我们公司里我们自己封装的基于zookeeper的分布式锁,我们基于zookeeper的临时顺序节点去实现的,比较优雅的
}
return true;
}
/**
* 释放掉一个分布式锁
* @param productId
*/
public void releaseDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.delete(path, -1);
System.out.println("release the lock for product[id=" + productId + "]......");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 建立zk session的watcher
* @author Administrator
*
*/
private class ZooKeeperWatcher implements Watcher {
// 其他系统释放了锁, 会回调监听器的此process方法
public void process(WatchedEvent event) {
// 在event中判断是否为监听的节点.
System.out.println("Receive watched event: " + event.getState());
if(KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
if(this.latch != null) {
// latch 不为null ,代表在等待别的系统释放锁,
// 此处执行了countDown方法后
//就会把acquireDistributedLock方法中this.latch.await这一行往下去执行代码
this.latch.countDown();
}
}
}
/**
* 封装单例的静态内部类
* @author Administrator
*
*/
private static class Singleton {
private static ZooKeeperSession instance;
static {
instance = new ZooKeeperSession();
}
public static ZooKeeperSession getInstance() {
return instance;
}
}
/**
* 获取单例
* @return
*/
public static ZooKeeperSession getInstance() {
return Singleton.getInstance();
}
/**
* 初始化单例的便捷方法
*/
public static void init() {
getInstance();
}
}