01 安装并允许zookeeper

  1. 安装jdk
  2. 官网下载zookeeper的压缩包,我这里下载的是3.4.10版本
  3. 解压后进入到zookeeper-3.4.10/conf,修改zoo_sample.cfg文件修改为zoo.cfg文件
mv zoo_sample.cfg zoo.cfg
  • 1
  1. 打开zoo.cfg文件,修改dataDir路径。修改后在/usr/local/zookeeper-3.4.10目录创建文件夹mkdir zkData
dataDir=/usr/local/zookeeper-3.4.10/zkData
  • 1
  1. 启动zookeeper
/usr/local/zookeeper-3.4.10/bin/zkServer.sh start
  • 1

02 springboot应用配置CuratorFramework

  1. 导入maven依赖
<!-- zookeeper 客户端 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency> <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
  1. 配置CuratorFramework

zookeeper的默认端口是2181

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.scheduling.annotation.EnableScheduling; @EnableScheduling
@EnableJpaAuditing
@SpringBootApplication
public class MyDemoApplication { public static void main(String[] args) {
SpringApplication.run(MyDemoApplication.class, args);
} @Bean
public CuratorFramework curatorFramework() {
return CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryNTimes(5, 1000));
}
}
  1. 启动CuratorFramework客户端
import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service; /**
* 实现了ApplicationRunner接口后,当容器启动后,会执行实现的run方法
*
* @author 594781919
*/
@Service
public class StartService implements ApplicationRunner { @Autowired
private CuratorFramework curatorFramework; @Autowired
private ListenerService listenerService; @Override
public void run(ApplicationArguments applicationArguments) {
// 非常重要!!!Start the client. Most mutator methods will not work until the client is started
curatorFramework.start();
System.out.println("zookeeper client start");
// 初始化监听方法
listenerService.listener();
}
}

03 使用zookeeper实现集群只一个应用实例执行定时任务

当我们启动多个实例时,需要其中一个实例执行定时任务,其它实例不执行。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import java.io.IOException;
import java.util.Date; /**
* 实现多个应用实例只一个执行定时任务
*
* @author 594781919@qq.com
*/
@Service
public class TimerTaskService { @Autowired
private CuratorFramework curatorFramework; @Value("${server.port}")
private String port; @Scheduled(cron = "0/5 * * * * *")
public void task() {
LeaderLatch leaderLatch = new LeaderLatch(curatorFramework, "/timerTask");
try {
leaderLatch.start();
// Leader选举需要一些时间,等待2秒
Thread.sleep(2000);
// 判断是否为主节点
if (leaderLatch.hasLeadership()) {
System.out.println(new Date() + " port=" + port + " 是领导");
// 定时任务的业务逻辑代码
} else {
System.out.println(new Date() + " port=" + port + " 是从属");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
leaderLatch.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

04 使用zookeeper实现分布式锁

import com.igola.domain.Employee;
import com.igola.repository.EmployeeRepository;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController; /**
* @author 594781919@qq.com
*/
@RestController
public class EmployeeController {
@Autowired
private EmployeeRepository employeeRepository; @Autowired
private CuratorFramework curatorFramework; @GetMapping("/emp/save")
public Employee save(String name) { // 获取锁
InterProcessSemaphoreMutex balanceLock = new InterProcessSemaphoreMutex(curatorFramework, "/zktest" + name);
Employee employee = new Employee();
try {
// 执行加锁操作
balanceLock.acquire();
System.out.println("已经加锁了, name=" + name);
employee.setName(name);
if ("abc".equals(name)) {
Thread.sleep(30000);
}
employee.setAge((int) (Math.random() * 100));
employee.setSex(false);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放锁资源
balanceLock.release();
} catch (Exception e) {
e.printStackTrace();
}
} employeeRepository.save(employee); return employee;
}
}
  • 1
  • 2

05 使用zookeeper实现调度任务

当我们在启动多个服务后,访问了其中一个服务,执行了一些方法。然后我们需要其它服务也要执行这些方法,就需要用到NodeCache。

比如我们把一些数据缓存到Map对象中,当需要更新这个Map对象的数据时,我们就可以用NodeCache将每个服务都更新自己的Map对象。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service; import javax.annotation.PreDestroy;
import java.util.Date; /**
* @author 594781919
*/
@Service
public class ListenerService {
private final CuratorFramework curatorFramework;
private NodeCache nodeCache; public static final String path = "/hello/world"; public ListenerService(CuratorFramework curatorFramework) {
this.curatorFramework = curatorFramework; } public void listener() {
try {
// 创建路径
Stat stat = curatorFramework.checkExists().forPath(path);
if (stat == null) {
curatorFramework.create().creatingParentsIfNeeded().forPath(path);
}
} catch (Exception e) {
e.printStackTrace();
}
nodeCache = new NodeCache(curatorFramework, path);
// 添加监听的路径改变后需要执行的任务
nodeCache.getListenable().addListener(this::run);
try {
nodeCache.start();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("开始监听......");
} @PreDestroy
public void preDestroy() {
CloseableUtils.closeQuietly(nodeCache);
} public void notifyNodeCache() {
try {
curatorFramework.setData().forPath(path);
} catch (Exception e) {
e.printStackTrace();
}
} // 需要执行的调度任务
private void run() {
System.out.println(new Date().toLocaleString() + ", 开始执行监听任务");
}
}

最新文章

  1. ng-repeat循环出来的部分调用同一个函数并且实现每个模块之间不能相互干扰
  2. Hui之Hui.js 官方文档
  3. [No00000F]Excel快捷键大全 Excel2013/2010/2007/2003常用快捷键大全
  4. [转]Python程序员必须知道的30条编程技巧
  5. SQLServer 触发器 同时插入多条记录有关问题
  6. Linux+Apache+Tomcat集群配置
  7. java ant 命令大全
  8. CXF整合Spring开发WebService
  9. python描述符 descriptor
  10. 数字雨Shopex 4.8.5 SQL Injection Exp
  11. oracle之sql语句优化
  12. MySQL --当AUTO_INCREMENT自增遇到0
  13. 接收JSON类型转成对象
  14. iOS----------四舍五入(只舍不入)
  15. 自己遇到的ajax调用ashx文件无法获取返回值的一种情况
  16. error loading midas.dll问题
  17. day11:装饰器
  18. 在线即时展现 Html、JS、CSS 编辑工具 - JSFiddle
  19. 20145324王嘉澜 《网络对抗》进阶实践之 shellcode注入和Return-to-libc攻击深入
  20. 2013长沙网赛E题Travel by Bike

热门文章

  1. NOIP 模拟赛 day5 T2 水 故事题解
  2. 备战-Java IO
  3. [.NET大牛之路 003] .NET 的发展简史
  4. 案例分享:Qt+Arm基于RV1126平台的内窥镜软硬整套解决方案(实时影像、冻结、拍照、录像、背光调整、硬件光源调整,其他产品也可使用该平台,如视频监控,物联网产品等等)
  5. 结对开发_石家庄地铁查询web系统_psp表
  6. [刘阳Java]_什么是EasyUI_第1讲
  7. 【LeetCode】389.找不同
  8. Bigdecimal用法
  9. GhostScript 沙箱绕过(命令执行)漏洞(CVE-2018-19475)
  10. 记录21.07.22 —— Vue.js基础(一)