您当前的位置: 首页 >  zookeeper

Charge8

暂无认证

  • 3浏览

    0关注

    447博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Zookeeper分布式锁

Charge8 发布时间:2022-05-23 23:05:42 ,浏览量:3

实现一把分布式锁通常有很多方法,比较常见的有 Redis 和 Zookeeper。

Redis分布式锁可参考之前的文章:

  • Redisson 分布式锁原理分析:https://blog.csdn.net/qq_42402854/article/details/123342331

Zookeeper能实现分布式锁,是因为它有一个特性,就是多个线程去 Zookeeper 里面去创建同一个节点的时候,只会有一个线程执行成功。

锁可理解为 ZooKeeper 上的一个节点,

  • 当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。
  • 当释放锁时,只需要将会话关闭,临时节点就删除了,即释放了锁。如果万一客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点也会自动删除掉。

Zookeeper 的分布式锁实现: 实现有两种方案:

  1. 基于临时节点实现,会产生羊群效应,不推荐。
  2. 基于临时顺序节点实现。

一般我们认为,ZooKeeper 的分布式锁是基于临时顺序节点,然后通过监听机制来实现的。即方案2。

一、基于临时节点实现 1、实现思路

在这里插入图片描述 锁节点:即锁对象

获取锁:锁请求者加锁,则创建锁节点。

  • 如果创建成功,那么加锁成功,
  • 如果创建失败,那么加锁失败,则等待获取锁(等待获取锁成功的客户端释放锁)。注意:这里锁请求者监听的都是 锁节点的删除操作。

释放锁:只需要将会话关闭,临时节点就删除了,即释放了锁。

羊群效应:如果所有的锁请求者都来监听锁持有者,当锁持有者的节点被删除以后,所有的锁请求者都会通知到,即都会同时去竞争锁,但是只有一个锁请求者能拿到锁。这就是羊群效应。

2、实现代码

一般不推荐使用。这里我们自己简单实现一下。使用的是 ZkClient客户端。

2.1 ZkClient客户端连接
public class ZkClientConnectUtils {

    private static String CONNECT_STR = "192.168.xxx.xxx:2181"; //集群模式下用逗号隔开

    /**
     * 使用匿名监听类
     * @throws Exception
     */
    public static ZkClient zKClientConnnect() throws Exception {
        ZkClient zkClient = new ZkClient(CONNECT_STR, 3000,60000);
        TimeUnit.SECONDS.sleep(3);
        return zkClient;
    }

}
2.2 锁实现 2.2.1 锁接口
/**
 * 自定义zk分布式锁:定义通用锁接口
 */
public interface ZKLock {

    /**
     * 加锁
     */
    void lock();

    /**
     * 释放锁
     */
    void unlock();
}
2.2.2 抽象锁对象
/**
 * 抽象锁对象:用到了模板方法设计模式,具体抽象方法由子类实现
 */
public abstract class AbstractZKLock implements ZKLock {

    protected static String path = "/lock";

    protected ZkClient zkClient = null;

    public AbstractZKLock() {
        initClient();
    }

    public void initClient(){
        try {
            zkClient = createClient();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("初始化 zk客户端连接失败,errorMessage=" + e.getMessage());
        }
    }

    /**
     * 交给子类创建
     * @return
     */
    protected abstract ZkClient createClient()  throws Exception;

    /**
     * lock方法(模板方法设计模式):获取锁的方法
     *   1.如果锁获取成功,那么业务代码继续往下走。
     *   2.如果锁获取失败,lock方法需要等待重新获取锁
     *      2.1等待到了前面那个获取锁的客户端释放锁以后(zk监听机制),
     *      2.2然后再去重新获取锁
     */
    @Override
    public void lock() {
        // 尝试去获取锁
        if(tryLock()){
            System.out.println(Thread.currentThread().getName() + "--->获取锁成功!");
        }else {
            // 获取失败,在这里等待
            waitforlock();
            // 重新获取锁
            lock();
        }
    }

    @Override
    public void unlock() {
        // 因为加锁创建的是临时节点,所以会话关闭,临时节点就删除了,即释放了锁
        zkClient.close();
    }

    /**
     * 获取锁
     * @return
     */
    protected abstract boolean tryLock();

    /**
     * 获取失败,等待其他释放锁,重新获取锁
     */
    protected abstract void waitforlock();
}
2.2.3 ZkClient客户端实现
/**
 * 基于ZkClient客户端实现锁:
 */
public class ZkClientLock extends AbstractZKLock {

    private CountDownLatch cdl = null;

    @Override
    protected ZkClient createClient() throws Exception{
        return ZkClientConnectUtils.zKClientConnnect();
    }

    /**
     * 尝试获取锁
     * @return
     */
    @Override
    protected boolean tryLock() {
        try {
            // 加锁:创建临时节点,创建成功表示加锁成功,否则加锁失败。zookeeper 的特性,节点名不能重复,否则创建失败。
            if(zkClient.exists(path)) {
                zkClient.delete(path);
            }
            //创建临时节点
            zkClient.createEphemeral(path);
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    /**
     * 等待获取锁:
     * 等前面那个获取锁成功的客户端释放锁
     * 没有获取到锁的客户端都会走到这里
     * 1、没有获取到锁的要注册对 path节点的watcher
     * 2、这个方法需要等待
     */
    @Override
    protected void waitforlock() {
        IZkDataListener iZkDataListener = new IZkDataListener() {

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }

            // 一旦 path节点被删除(释放锁)以后,就会触发这个方法
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                // 让等待的代码不再等待了
                // 即 waitforlock方法执行结束,重新去获取锁
                if (cdl != null) {
                    cdl.countDown();
                }
            }
        };
        // 注册对 path节点的watcher
        zkClient.subscribeDataChanges(path, iZkDataListener);

        // 等待
        if (zkClient.exists(path)) {
            cdl = new CountDownLatch(1);
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 取消该客户端的订阅关系
        zkClient.unsubscribeDataChanges(path, iZkDataListener);
    }
}
2.3 业务代码
public class OrderService {
    private OrderNumGenereteUtils orderNumFactory = new OrderNumGenereteUtils();

    // ZkClient分布式锁
    private ZKLock lock = new ZkClientLock();

    /**
     * 创建订单号,模拟业务
     */
    public void createOrderNum() {
        lock.lock();
        String orderNum = generateOrderNum();
        System.out.println(Thread.currentThread().getName() + "创建了订单号:[" + orderNum + "]");
        lock.unlock();
    }


    /**
     * 生成时间格式的订单编号
     * @return
     */
    private static int orderNum = 0;
    public String generateOrderNum() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");
        return simpleDateFormat.format(new Date()) + ++orderNum;
    }

}
2.4 测试
public class ZkClientLockTest {

    private static Integer count = 50;
    private static CountDownLatch cdl = new CountDownLatch(count);

    public static void main(String[] args) throws InterruptedException {

        for (int i = 0; i             
关注
打赏
1664721914
查看更多评论
0.0386s