zookeeper实现分布式锁

在学习zk实现分布锁之前,我们应该需要了解一些zk的知识

  • 1、持久节点:客户端断开连接zk不删除persistent类型节点
  • 2、临时节点:客户端断开连接zk删除ephemeral类型节点
  • 3、顺序节点:节点后面会自动生成类似0000001的数字表示顺序
  • 4、节点变化的通知:客户端注册了监听节点变化的时候,会调用回调方法

 

一、zk实现的简单的分布式锁

1、zk实现简单的分布式锁的思路,主要是抓住一下三点

(1)当一个客户端成功创建一个节点,另外一个客户端是无法创建同名的节点(达到互斥的效果)

(2)我们注册该节点的监听时间,当节点删除,会通知其他的客户端,这个时候其他的客户端可以重新去创建该节点(可以认为时拿到锁的客户端释放锁,其他的客户端可以抢锁)

(3)创建的节点应该时临时节点,这样保证我们在已经拿到锁的客户端挂掉了会自动释放锁

       

导入maven jar

        <!--zookeeper-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
        </dependency>
        <!--客户端-->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>

View Code

 

抽象模版:

AbstractLock.java

package com.opendev.redssion.zookeeper;

import org.I0Itec.zkclient.ZkClient;

public abstract class AbstractLock {

    //zk地址和端口
    public static final String ZK_ADDR = "192.168.0.230:2181";
    //超时时间
    public static final int SESSION_TIMEOUT = 10000;
    //创建zk
    protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);


    /**
     * 可以认为是模板模式,两个子类分别实现它的抽象方法
     * 1,简单的分布式锁
     * 2,高性能分布式锁
     */
    public void getLock() {
        String threadName = Thread.currentThread().getName();
        if (tryLock()) {
            System.out.println(threadName+"-获取锁成功");
        }else {
            System.out.println(threadName+"-获取锁失败,进行等待...");
            waitLock();
            //递归重新获取锁
            getLock();
        }
    }

    public abstract void releaseLock();

    public abstract boolean tryLock();

    public abstract void waitLock();

}

View Code

SimpleZkLock.java

package com.opendev.redssion.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;

import java.util.concurrent.CountDownLatch;

public class SimpleZkLock extends AbstractLock {

    private static final String NODE_NAME = "/test_simple_lock";

    private CountDownLatch countDownLatch;

    @Override
    public void releaseLock() {
        if (null != zkClient) {
            //删除节点
            zkClient.delete(NODE_NAME);
            zkClient.close();
            System.out.println(Thread.currentThread().getName() + "-释放锁成功");
        }
    }

    //直接创建临时节点,如果创建成功,则表示获取了锁,创建不成功则处理异常
    @Override
    public boolean tryLock() {
        if (null == zkClient) {
            return false;
        }
        try {
            zkClient.createEphemeral(NODE_NAME);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void waitLock() {
        //监听器
        IZkDataListener iZkDataListener = new IZkDataListener() {
            //节点被删除回调
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }

            //节点改变被回调
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                // TODO Auto-generated method stub

            }
        };
        zkClient.subscribeDataChanges(NODE_NAME, iZkDataListener);
        //如果存在则阻塞
        if (zkClient.exists(NODE_NAME)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " 等待获取锁...");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        //删除监听
        zkClient.unsubscribeDataChanges(NODE_NAME, iZkDataListener);
    }
}

View Code

 

LockTest.java

package com.opendev.redssion.zookeeper;

public class LockTest {
    public static void main(String[] args) {
        //模拟多个10个客户端
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new LockRunnable());
            thread.start();
        }

    }

    static class LockRunnable implements Runnable {

        @Override
        public void run() {
            AbstractLock zkLock = new SimpleZkLock();
            //AbstractLock zkLock = new HighPerformanceZkLock();
            zkLock.getLock();
            //模拟业务操作
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            zkLock.releaseLock();
        }

    }
}

View Code

 

 

二、高性能分布式锁

上面使用zk实现的简单的分布式锁,实现比较简单,但是存在性能问题,从上面的打印的结果可以看出、每一次客户端释放锁的时候,其他的客户端都会去抢锁,这就造成了不必要的浪费。那么如果提升性能呢?

1、思路:客户端在抢锁的时候进行排队,客户端只要监听它前一个节点的变化就行,如果前一个节点释放了锁,客户端才去进行抢锁操作,这个时候我们就需要创建顺序节点了

2、图解

(1)客户端排队

       

 

 (2)获取锁的逻辑

 

 HighPerformanceZkLock .java

package com.opendev.redssion.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * 高性能分布式锁
 *
 * @author hongtaolong
 */
public class HighPerformanceZkLock extends AbstractLock {

    private static final String PATH = "/highPerformance_zklock";
    //当前节点路径
    private String currentPath;
    //前一个节点的路径
    private String beforePath;

    private CountDownLatch countDownLatch = null;

    public HighPerformanceZkLock() {
        //如果不存在这个节点,则创建持久节点
        if (!zkClient.exists(PATH)) {
            zkClient.createPersistent(PATH);
        }
    }

    @Override
    public void releaseLock() {
        if (null != zkClient) {
            zkClient.delete(currentPath);
            zkClient.close();
        }

    }

    @Override
    public boolean tryLock() {
        //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        if (null == currentPath || "".equals(currentPath)) {
            //在path下创建一个临时的顺序节点
            currentPath = zkClient.createEphemeralSequential(PATH + "/", "lock");
        }
        //获取所有的临时节点,并排序
        List<String> childrens = zkClient.getChildren(PATH);
        Collections.sort(childrens);
        if (currentPath.equals(PATH + "/" + childrens.get(0))) {
            return true;
        } else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
            int pathLength = PATH.length();
            int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength + 1));
            beforePath = PATH + "/" + childrens.get(wz - 1);
        }
        return false;
    }

    @Override
    public void waitLock() {
        IZkDataListener lIZkDataListener = new IZkDataListener() {

            //节点删除的回调机制
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (null != countDownLatch) {
                    countDownLatch.countDown();
                }
            }

            //节点数据改变的回调机制
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };
        //监听前一个节点的变化
        zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
        if (zkClient.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);
    }
}

View Code