zk实现分布式锁纵观网络各种各样的帖子层出不穷,笔者查阅很多资料发现一个问题,有些文章只写原理并没有具体实现,有些文章虽然写了实现但是并不全面

借这个周末给大家做一个总结,代码拿来就可以用并且每一种实现都经过了测试没有bug。下面我们先从最简单的实现开始介绍:

  • 简单的实现
package com.srr.lock;

/**
* @Description 分布式锁的接口
*/
abstract public interface DistributedLock {
/**
* 获取锁
*/
boolean lock();
/**
* 解锁
*/
void unlock(); abstract boolean readLock();
abstract boolean writeLock();
} package com.srr.lock; /**
* 简单的zk分布式做实现策略
* 性能比较低会导致羊群效应
*/
public abstract class SimplerZKLockStrategy implements DistributedLock{
/**
* 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现
* @throws Exception
*/
@Override
public boolean lock() {
//获取锁成功
if (tryLock()){
System.out.println(Thread.currentThread().getName()+"获取锁成功");
return true;
}else{ //获取锁失败
//阻塞一直等待
waitLock();
//递归,再次获取锁
return lock();
}
} /**
* 尝试获取锁,子类实现
*/
protected abstract boolean tryLock() ;
/**
* 等待获取锁,子类实现
*/
protected abstract void waitLock();
/**
* 解锁:删除key
*/
@Override
public abstract void unlock();
} package com.srr.lock; import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.concurrent.CountDownLatch; /**
* 分布式锁简单实现
*/
public class SimpleZKLock extends SimplerZKLockStrategy{
private static final String PATH = "/lowPerformance_zklock";
private CountDownLatch countDownLatch = null;
//zk地址和端口
public static final String ZK_ADDR = "192.168.32.129:2181";
//创建zk
protected ZkClient zkClient = new ZkClient(ZK_ADDR); @Override
protected boolean tryLock() {
//如果不存在这个节点,则创建持久节点
try{
zkClient.createEphemeral(PATH, "lock");
return true;
}catch (Exception e){
return false;
}
} @Override
protected void waitLock() {
IZkDataListener lIZkDataListener = new IZkDataListener() { @Override
public void handleDataDeleted(String dataPath) throws Exception {
if (null != countDownLatch){
countDownLatch.countDown();
}
System.out.println("listen lock unlock");
} @Override
public void handleDataChange(String dataPath, Object data) throws Exception { }
};
//监听前一个节点的变化
zkClient.subscribeDataChanges(PATH, lIZkDataListener);
if (zkClient.exists(PATH)) {
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
zkClient.unsubscribeDataChanges(PATH, lIZkDataListener); } @Override
public void unlock() {
if (null != zkClient) {
System.out.println("lock unclock");
zkClient.delete(PATH);
}
} @Override
public boolean readLock() {
return true;
} @Override
public boolean writeLock() {
return true;
}
} package com.srr.lock; import redis.clients.jedis.Jedis; import java.util.concurrent.CountDownLatch; /**
* 测试场景
* count从1加到4
* 使用简单的分布式锁在分布式环境下保证结果正确
*/
public class T { volatile int count = 1; public void inc(){
for(int i = 0;i<3;i++){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
System.out.println("count == "+count);
}
} public int getCount(){
return count;
} public static void main(String[] args) throws InterruptedException {
final T t = new T();
final Lock lock = new Lock();
final CountDownLatch countDownLatch = new CountDownLatch(5);
for(int i = 0;i<5;i++){
new Thread(new Runnable() {
@Override
public void run() {
DistributedLock distributedLock = new SimpleZKLock();
if(lock.lock(distributedLock)){
t.inc();
lock.unlock(distributedLock);
countDownLatch.countDown();
}
System.out.println("count == "+t.getCount());
}
}).start();
}
countDownLatch.await();
}
}

运行结果:

这种方式实现虽然简单,但是会引发羊群效应,因为每个等待锁的客户端都需要注册监听lock节点的删除事件,如果客户端并发请求很多,那么这将会非常消耗zookeeper集群

的资源,严重的化则会导致zookeeper集群宕机也不是没有可能。

  • 高性能实现,解决羊群效应问题
package com.srr.lock;

/**
* @Description 分布式锁的接口
*/
abstract public interface DistributedLock {
/**
* 获取锁
*/
boolean lock();
/**
* 解锁
*/
void unlock(); abstract boolean readLock();
abstract boolean writeLock();
} package com.srr.lock; public abstract class BlockingZKLockStrategy implements DistributedLock{
/**
* 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现
* @throws Exception
*/
@Override
public final boolean lock() {
//获取锁成功
if (tryLock()){
System.out.println(Thread.currentThread().getName()+"获取锁成功");
return true;
}else{ //获取锁失败
//阻塞一直等待
waitLock();
//递归,再次获取锁
return true;
}
} /**
* 尝试获取锁,子类实现
*/
protected abstract boolean tryLock() ;
/**
* 等待获取锁,子类实现
*/
protected abstract void waitLock();
/**
* 解锁:删除key
*/
@Override
public abstract void unlock();
} package com.srr.lock; import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient; import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch; public class BlockingZKLock extends BlockingZKLockStrategy{
private static final String PATH = "/highPerformance_zklock";
//当前节点路径
private String currentPath;
//前一个节点的路径
private String beforePath;
private CountDownLatch countDownLatch = null;
//zk地址和端口
public static final String ZK_ADDR = "192.168.32.129:2181";
//超时时间
public static final int SESSION_TIMEOUT = 30000;
//创建zk
protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT); public BlockingZKLock() {
//如果不存在这个节点,则创建持久节点
if (!zkClient.exists(PATH)) {
zkClient.createPersistent(PATH);
}
} @Override
protected boolean tryLock() {
//如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
//if (null == currentPath || "".equals(currentPath)) {
//在path下创建一个临时的顺序节点
currentPath = zkClient.createEphemeralSequential(PATH+"/", "lock");
//}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//获取所有的临时节点,并排序
List<String> childrens = zkClient.getChildren(PATH); Collections.sort(childrens); if (currentPath.equals(PATH+"/"+childrens.get(0))) {
return true;
}else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
int pathLength = PATH.length();
int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));
beforePath = PATH+"/"+childrens.get(wz-1);
}
return false; } @Override
protected void waitLock() {
IZkDataListener lIZkDataListener = new IZkDataListener() { @Override
public void handleDataDeleted(String dataPath) throws Exception {
if (null != countDownLatch){
countDownLatch.countDown();
}
System.out.println("listen lock unlock");
} @Override
public void handleDataChange(String dataPath, Object data) throws Exception { }
};
//监听前一个节点的变化
zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
if (zkClient.exists(beforePath)) {
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener); } @Override
public void unlock() {
if (null != zkClient) {
System.out.println("lock unclock");
zkClient.delete(currentPath);
}
} @Override
public boolean readLock() {
return true;
} @Override
public boolean writeLock() {
return true;
}
} package com.srr.lock; import java.util.concurrent.CountDownLatch; /**
* 测试场景
* count从1加到4
* 使用高性能的分布式锁在分布式环境下保证结果正确
*/
public class T { volatile int count = 1; public void inc(){
for(int i = 0;i<3;i++){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
System.out.println("count == "+count);
}
} public int getCount(){
return count;
} public static void main(String[] args) throws InterruptedException {
final T t = new T();
final Lock lock = new Lock();
final CountDownLatch countDownLatch = new CountDownLatch(5);
for(int i = 0;i<5;i++){
new Thread(new Runnable() {
@Override
public void run() {
DistributedLock distributedLock = new BlockingZKLock();
if(lock.lock(distributedLock)){
t.inc();
lock.unlock(distributedLock);
countDownLatch.countDown();
}
System.out.println("count == "+t.getCount());
}
}).start();
}
countDownLatch.await();
}
}

这种实现客户端只需监听它前一个节点的变化,不需要监听所有的节点,从而提高了zookeeper锁的性能。

  • 共享锁(S锁)
  • 写到这个,看了网络上很多错误的文章实现把排它锁当做共享锁

共享锁正确是实现姿势如下:

package com.srr.lock;

/**
* @Description 分布式锁的接口
*/
abstract public interface DistributedLock {
/**
* 获取锁
*/
boolean lock();
/**
* 解锁
*/
void unlock(); abstract boolean readLock();
abstract boolean writeLock();
} package com.srr.lock; /**
* 共享锁策略
*/
abstract public class ZKSharedLockStrategy implements DistributedLock{
@Override
public boolean readLock() {
//获取锁成功
if (tryReadLock()){
System.out.println(Thread.currentThread().getName()+"获取读锁成功");
return true;
}else{ //获取锁失败
//阻塞一直等待
waitLock();
//递归,再次获取锁
return true;
}
} @Override
public boolean writeLock() {
//获取锁成功
if (tryWriteLock()){
System.out.println(Thread.currentThread().getName()+"获取写锁成功");
return true;
}else{ //获取锁失败
//阻塞一直等待
waitLock();
//递归,再次获取锁
return true;
}
} /**
* 尝试获取锁,子类实现
*/
protected abstract boolean tryWriteLock() ; /**
* 尝试获取锁,子类实现
*/
protected abstract boolean tryReadLock() ; /**
* 等待获取锁,子类实现
*/
protected abstract void waitLock(); /**
* 解锁:删除key
*/
@Override
public abstract void unlock();
} package com.srr.lock; import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient; import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch; /**
* 共享锁
*/
public class ZKSharedLock extends ZKSharedLockStrategy{ private static final String PATH = "/zk-root-readwrite-lock";
//当前节点路径
private String currentPath;
//前一个节点的路径
private String beforePath;
private CountDownLatch countDownLatch = null;
//zk地址和端口
public static final String ZK_ADDR = "192.168.32.129:2181";
//超时时间
public static final int SESSION_TIMEOUT = 30000;
//创建zk
protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);
public ZKSharedLock() {
//如果不存在这个节点,则创建持久节点
if (!zkClient.exists(PATH)) {
zkClient.createPersistent(PATH);
}
} @Override
protected boolean tryWriteLock() {
//如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
if (null == currentPath || "".equals(currentPath)) {
//在path下创建一个临时的顺序节点
currentPath = zkClient.createEphemeralSequential(PATH+"/w", "writelock");
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//获取所有的临时节点,并排序
List<String> childrens = zkClient.getChildren(PATH);
Collections.sort(childrens); if (currentPath.equals(PATH+"/"+childrens.get(0))) {
return true;
}else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
int pathLength = PATH.length();
int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));
beforePath = PATH+"/"+childrens.get(wz-1);
}
return false;
} @Override
protected boolean tryReadLock() {
//如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
if (null == currentPath || "".equals(currentPath)) {
//在path下创建一个临时的顺序节点
currentPath = zkClient.createEphemeralSequential(PATH+"/r", "readklock");
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//获取所有的临时节点,并排序
List<String> childrens = zkClient.getChildren(PATH);
Collections.sort(childrens); if (currentPath.equals(PATH+"/"+childrens.get(0))) {
return true;
}else if(isAllReadNodes(childrens)){
return true;
}else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
int pathLength = PATH.length();
int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1)); for (int i = wz - 1; i > 0; i--) {
// 找到了离得最近的一个写节点,那么它的后一个节点要么是一个读节点,要么就是待加锁的节点本身
if (childrens.get(i).indexOf("w") >= 0) {
beforePath = PATH + "/" + childrens.get(i);
break;
}
}
}
return false;
} // 判断比自已小的节点是否都是读节点
private boolean isAllReadNodes(List<String> sortNodes) {
int pathLength = PATH.length();
int currentIndex = Collections.binarySearch(sortNodes, currentPath.substring(pathLength+1));
for (int i = 0; i < currentIndex - 1; i++) {
// 只要有一个写锁,则不能直接获取读锁
if (sortNodes.get(i).indexOf("w") >= 0) {
return false;
}
} return true;
} @Override
protected void waitLock() {
IZkDataListener lIZkDataListener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
if (null != countDownLatch){
countDownLatch.countDown();
}
System.out.println("listen lock unlock");
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception { }
};
//监听前一个节点的变化
zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
if (zkClient.exists(beforePath)) {
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);
} @Override
public boolean lock() {
return false;
} @Override
public void unlock() {
if (null != zkClient) {
System.out.println("lock unclock");
zkClient.delete(currentPath);
zkClient.close();
}
}
} package com.srr.lock; /**
* 锁工具类
*/
public class Lock {
/**
* 获取锁
*/
boolean lock(DistributedLock lock) {
return lock.lock();
}; /**
* 获取读锁
*/
boolean readlock(DistributedLock lock) {
return lock.readLock();
}; /**
* 获取读锁
*/
boolean writeLock(DistributedLock lock) {
return lock.writeLock();
}; /**
* 释放锁
*/
void unlock(DistributedLock lock) {
lock.unlock();
};
} package com.srr.lock; import java.util.concurrent.CountDownLatch; /**
* 测试共享锁
*/
public class SharedLockTest {
private static volatile int count = 0;
public static void main(String[] args) throws Exception {
final Lock lock = new Lock();
final CountDownLatch countDownLatch = new CountDownLatch(10); new Thread(new Runnable() {
@Override
public void run() {
testWriteLock(8);
}
}).start(); new Thread(new Runnable() {
@Override
public void run() {
testReadLock(10);
}
}).start(); new Thread(new Runnable() {
@Override
public void run() {
testReadLock(20);
}
}).start(); new Thread(new Runnable() {
@Override
public void run() {
testWriteLock(11);
}
}).start(); new Thread(new Runnable() {
@Override
public void run() {
testWriteLock(30);
}
}).start(); new Thread(new Runnable() {
@Override
public void run() {
testReadLock(9);
}
}).start(); countDownLatch.await();
} // 读锁
private static void testReadLock(long sleepTime) {
try {
Lock lock = new Lock();
DistributedLock dlock = new ZKSharedLock();
lock.readlock(dlock);
System.out.println("i get readlock ->" + sleepTime);
System.out.println("count = "+ count);
Thread.sleep(sleepTime);
lock.unlock(dlock);
} catch (Exception e) {
e.printStackTrace();
}
} // 写锁
private static void testWriteLock(long sleepTime) {
try {
Lock lock = new Lock();
DistributedLock dlock = new ZKSharedLock();
lock.writeLock(dlock);
System.out.println("i get writelock ->" + sleepTime);
count++;
Thread.sleep(sleepTime);
lock.unlock(dlock);
} catch (Exception e) {
e.printStackTrace();
}
} }

运行结果:

从结果可以看出读锁和读锁可以共享锁,而写锁必须等待读锁或者写锁释放之后才能获取锁。

最后,zk分布式锁完美解决方案:

  • Apache Curator
  • Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a highlevel API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.
  • Curator n ˈkyoor͝ˌātər: a keeper or custodian of a museum or other collection - A ZooKeeper Keeper.

网上很多文章竟然标题用Curator实现分布式锁,大哥Curator框架本身已经实现了分布式锁而且提供了各种各样的锁api供大家使用,我们不用再基于Curator实现分布式锁,这不是多此一举吗?这里给出一个简单的使用案例,旨在说明意图:

package com.srr.lock;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; /**
* 测试场景
* count从1加到101
* 使用redis分布式锁在分布式环境下保证结果正确
*/
public class CuratorDistributedLockTest {
private static final String lockPath = "/curator_lock";
//zk地址和端口
public static final String zookeeperConnectionString = "192.168.32.129:2181"; volatile int count = 1; public void inc(){
for(int i = 0;i<10;i++){
count++;
System.out.println("count == "+count);
}
} public int getCount(){
return count;
} public static void main(String[] args) throws InterruptedException {
final T t = new T();
final Lock lock = new Lock();
final CountDownLatch countDownLatch = new CountDownLatch(4);
for(int i = 0;i<4;i++){
new Thread(new Runnable() {
@Override
public void run() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(10, 5000);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
try {
if (lock.acquire(10 * 1000, TimeUnit.SECONDS))
{
try
{
System.out.println("get the lock");
t.inc();
}
finally
{
lock.release();
System.out.println("unlock the lock");
}
}
}catch (Exception e){
e.printStackTrace();
}
countDownLatch.countDown();
}
}).start();
} countDownLatch.await();
System.out.println("total count == "+t.getCount());
}
}

运行结果:

如果想更多了解Curator框架,请移步http://curator.apache.org/,官网给出了详细的使用案例及介绍。至此zk实现分布式锁总结完毕!

原创不易,请多多关注!

最新文章

  1. Lesson 23 A new house
  2. 关于Web服务器的认识
  3. Visual Studio Code 智能提示文件
  4. publishing failed with multiple errors resource is out of sync with the file system--转
  5. 简单实用的Log4net帮助类
  6. jNotify:操作结果信息提示条
  7. Java陷阱之assert关键字
  8. 利用 Django REST framework 编写 RESTful API
  9. 【drp 11】使用Junit简单测试接口方法
  10. 51nod1627 瞬间移动
  11. css3 transition 实现图片放大缩小
  12. Bluetooth in Android 4.2 and 4.3(三):Enable Bluetooth
  13. 在自定义的web监听器中嵌入web中的定时事件
  14. Jenkins的plugin开发
  15. c读写文件相关
  16. 分页技术之PageDataSource类
  17. 慕课linux学习笔记(八)常用命令(5)
  18. Nancy 框架
  19. LWP::UserAgent介绍3 -&gt; cookie设置
  20. oracle导入TYPE对象报错ORA-02304

热门文章

  1. Pycharm中设置encoding
  2. for-loop 与 json.Unmarshal 性能分析概要
  3. Next.js 7发布,构建速度提升40%
  4. Vue学习—— Vuex学习笔记
  5. 翻译 - Kafka Streams 介绍(一)
  6. matlab混合编程向导(vc,vb,.net...)
  7. 数学--数论--HDU - 6322 打表找规律
  8. RobotFrameWork 自动化环境搭建(基于 python3.6)
  9. Prime Path素数筛与BFS动态规划
  10. ztree根据参数动态控制是否显示复选框/单选框(静态JSON数据)