基于临时序号节点来实现分布式锁

为什么要用临时节点呢?如果拿到锁的服务宕机了,会话失效ZK自己也会删除掉临时的序号节点,这样也不会阻塞其他服务。

流程:

1.在一个持久节点下面创建临时的序号节点作为锁节点,如:/lock/lockId00000001 /lock/lockId00000002

2.获取持久节点下面所有子节点,判断其最小的是不是自己,如果是自己则表示获取锁成功。

3.如果最小的结点不是自己,则阻塞等待,并对lock结点添加监听,如果结点数发生变化了,则说明有释放了锁。但是这里如果存在大量监听,当结点发生变化的时候,可能就会出现羊群效应。因为大家都监听了,同时会通知很多客户端,会造成ZK性能突然下降。

所以这里可以只监听自己前面的那个节点,排队执行,03监听02,02监听01

4.当锁释放,节点监听到变化之后,执行第2步。

释放锁只需要删除对应的临时节点,如果获取到锁的服务宕机了,因为是临时节点也会自己删除的。

代码实现:

package com.nijunyang.zookeeper.zklock;

import lombok.Data;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient; import java.util.List;
import java.util.stream.Collectors; /**
* Description:
* Created by nijunyang on 2020/11/29 21:51
*/
public class ZKLock {
private static final String SERVER = "192.168.0.67:2181";
private ZkClient zkClient;
private static final String ROOT_PATH = "/lock"; public ZKLock() {
zkClient = new ZkClient(SERVER, 5000, 20000);
buildRoot();
} private void buildRoot() {
if (!zkClient.exists(ROOT_PATH)) {
zkClient.createPersistent(ROOT_PATH);
}
} /**
* 加锁
*
* @param lockId
* @param timeout
* @return
*/
public Lock lock(String lockId, long timeout) {
Lock lockNode = createLock(lockId);
// 尝试激活锁
tryActiveLock(lockNode);
if (!lockNode.isActive()) {
try {
synchronized (lockNode) {
lockNode.wait(timeout);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (!lockNode.isActive()) {
throw new RuntimeException("获取锁超时");
}
return lockNode;
} /**
* 释放锁
*
* @param lock
*/
public void unlock(Lock lock) {
if (lock.isActive()) {
zkClient.delete(lock.getPath());
}
} /**
* 激活锁
* @param lockNode
*/
private void tryActiveLock(Lock lockNode) {
// 判断当前是否为最小节点
List<String> list = zkClient.getChildren(ROOT_PATH)
.stream()
.sorted()
.map(p -> ROOT_PATH + "/" + p)
.collect(Collectors.toList());
String firstNodePath = list.get(0); //第一个节点是当前节点,将锁设置为激活
if (firstNodePath.equals(lockNode.getPath())) {
lockNode.setActive(true);
} else {
//第一个节点不是当前节点,则监听前面一个节点
String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
//监听之后继续尝试加锁,加锁成功唤醒之前的等待
tryActiveLock(lockNode);
synchronized (lockNode) {
if (lockNode.isActive()) {
lockNode.notify();
}
}
zkClient.unsubscribeDataChanges(upNodePath, this);
}
});
}
}
/**
* 创建锁节点
*
* @param lockId
* @return Lock
*/
private Lock createLock(String lockId) {
String nodePath = zkClient.createEphemeralSequential(ROOT_PATH + "/" + lockId, "write");
return new Lock(lockId, nodePath);
} @Data
public static class Lock { private String lockId;
private String path;
/**
* 是否激活锁
*/
private boolean active; public Lock(String lockId, String path) {
this.lockId = lockId;
this.path = path;
}
}
}

最新文章

  1. [NOIP2010初赛]烽火传递+单调队列详细整理
  2. 【BZOJ-3757】苹果树 块状树 + 树上莫队
  3. PHP7在linux下的安装步骤
  4. 【uTenux实验】时间管理(系统时间/周期性处理/警报处理)
  5. JNI 程序开发
  6. mysql之视图
  7. Java多线程之wait(),notify(),notifyAll()
  8. Pig Apache Hadoop
  9. Linux下关闭node应用
  10. 工厂模式 and 单例模式
  11. How To Handle MLOG$_AP_SUPPLIER_SITES_AL, MLOG$_AP_SUPPLIERS Growing So Much? Having Lots of Data
  12. Python之常见算法介绍
  13. 前端性能优化 —— 添加Expires头
  14. INDY10 IDHTTPSERVER返回中文不乱码
  15. 「NOIP2018」保卫王国
  16. Android画布更新过程OnDraw调用过程
  17. SpringBoot-07:SpringBoot整合PageHelper做多条件分页查询
  18. Wannafly交流赛1 _A_有理数 【水】
  19. HDU1272---(并查集)简单应用
  20. 【转】python面向对象中的元类

热门文章

  1. Docker(8)- docker search 命令详解
  2. Git的全局及单个仓库配置
  3. 什么是4G模块 4G模块的工作原理及特点
  4. cmd,py脚本,py编译的exe,uipath及uibot对它们的调用
  5. VBA_headers_mapping
  6. Vue3.0响应式原理
  7. 指针常量&amp;常量指针&amp;指向常量的指针常量
  8. nice-ni 耗光cpu
  9. t分布与t检验的一点理解
  10. HDU100题简要题解(2000~2009)