您当前的位置: 首页 >  zookeeper

java持续实践

暂无认证

  • 1浏览

    0关注

    746博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

zookeeper 分布式锁原理

java持续实践 发布时间:2021-06-01 06:21:45 ,浏览量:1

笔记来源: Java工程师面试突击专栏 https://apppukyptrl1086.pc.xiaoe-tech.com/detail/p_5d3114935b4d7_CEcL8yMS/6

文章目录
      • zk 获取锁的过程
      • 创建临时节点的原因
      • zk 分布式锁的demo代码

zk 获取锁的过程

获取锁的时候, 尝试去创建一个临时节点 . 如果这个临时节点不存在, 那么代表该分布式锁创建成功.

另外一个系统, 尝试去创建一个相同名称的临时节点, 如果已经存在, 代表别人已经占有这把锁, 对这个临时节点注册一个监听器.

释放锁: 删除临时节点, 一旦删除, 那么就会通知别的系统这个节点被删除了.

一个系统如果没有获取到锁, 那么就会去注册一个监听器 , 一旦其他的系统释放了锁, 那么就会把消息推送给监听器, 监听器感知到被人释放了锁之后, 再次重新尝试获取锁, 如果再次没有获取到锁, 那么再次注册一个监听器, 直到获取到锁.

创建临时节点的原因

假如系统A 获取锁成功, 但是系统A在没有释放锁的时候系统宕机了, 导致其他的系统也无法获取锁, 就造成了死锁. 创建临时节点的好处是, 假如某个系统挂了, 那么zk就能感知到, 把这个系统获取的临时节点删除, 避免死锁, 让别的系统去获取锁.

zk 分布式锁的demo代码
/**
 * ZooKeeperSession
 * @author Administrator
 */
public class ZooKeeperSession {
	
	private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
	
	private ZooKeeper zookeeper;
    private CountDownLatch latch;

	public ZooKeeperSession() {
		try {
		  //  zk 建立连接  
			this.zookeeper = new ZooKeeper(
					"192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 
					50000, 
					// 注册监听器 
					new ZooKeeperWatcher());			
			try {
				connectedSemaphore.await();
			} catch(InterruptedException e) {
				e.printStackTrace();
			}

			System.out.println("ZooKeeper session established......");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 获取分布式锁
	 * @param productId
	 */
	public Boolean acquireDistributedLock(Long productId) {
	   // 拼接锁对应的节点, 即锁对应的名字, 此处以商品id拼接为锁的名称
		String path = "/product-lock-" + productId;
	
		try {
		//  调用zk 客户端, 创建临时节点  
			zookeeper.create(path, "".getBytes(), 
				Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
				// 如果创建成功, 直接返回true
			return true;
			} 
			catch (Exception e) {
			// 如果创建锁失败, 那么就会进入异常的代码, 去反复的循环尝试上锁. 
		while(true) {
		try {
		// stae 如果是Null , 那么代表别人已经释放锁了 
			Stat stat = zk.exists(path, true); // 相当于是给node注册一个监听器,去看看这个监听器是否存在
			if(stat != null) {
			// stat 不是null , 代表其他系统没有释放资源. 
			//  使用CountDownLatch 指定等待的线程数是1 
				this.latch = new CountDownLatch(1); 
				// 调用await方法, 该线程进入阻塞状态, 如果别的系统释放了锁, 会唤醒此线程
				this.latch.await(waitTime, TimeUnit.MILLISECONDS);
				// 此处线程状态变为runnable, latch设置为null
				this.latch = null;
			}
			// 尝试去获取锁
			zookeeper.create(path, "".getBytes(), 
									Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
			// 成功获取了锁
			return true;
			} 
			catch(Exception e) {
			// 执行上锁失败, 继续 去循环获取锁
			continue;
	}
}
// 很不优雅,我呢就是给大家来演示这么一个思路
// 比较通用的,我们公司里我们自己封装的基于zookeeper的分布式锁,我们基于zookeeper的临时顺序节点去实现的,比较优雅的
		}
	return true;
}
	
	/**
	 * 释放掉一个分布式锁
	 * @param productId
	 */
	public void releaseDistributedLock(Long productId) {
		String path = "/product-lock-" + productId;
		try {
			zookeeper.delete(path, -1); 
			System.out.println("release the lock for product[id=" + productId + "]......");  
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 建立zk session的watcher
	 * @author Administrator
	 *
	 */
	private class ZooKeeperWatcher implements Watcher {
		// 其他系统释放了锁, 会回调监听器的此process方法
		public void process(WatchedEvent event) {
		//  在event中判断是否为监听的节点. 
			System.out.println("Receive watched event: " + event.getState());

			if(KeeperState.SyncConnected == event.getState()) {
				connectedSemaphore.countDown();
			} 

			if(this.latch != null) {  
			// latch 不为null ,代表在等待别的系统释放锁, 
			// 此处执行了countDown方法后
			//就会把acquireDistributedLock方法中this.latch.await这一行往下去执行代码
				this.latch.countDown();  
			}
		}
		
	}
	
	/**
	 * 封装单例的静态内部类
	 * @author Administrator
	 *
	 */
	private static class Singleton {
		
		private static ZooKeeperSession instance;
		
		static {
			instance = new ZooKeeperSession();
		}
		
		public static ZooKeeperSession getInstance() {
			return instance;
		}
		
	}
	
	/**
	 * 获取单例
	 * @return
	 */
	public static ZooKeeperSession getInstance() {
		return Singleton.getInstance();
	}
	
	/**
	 * 初始化单例的便捷方法
	 */
	public static void init() {
		getInstance();
	}
	
}

关注
打赏
1658054974
查看更多评论
立即登录/注册

微信扫码登录

0.0414s