1. Zookeeper的基本操作

  zookeeper中的节点可以持久化/有序的两个维度分为四种类型:

  PERSIST:持久化无序(保存在磁盘中)

  PERSIST_SEQUENTIAL:持久化有序递增

  EPHEMERAL:非持久化的无序的,保存在内存中,当客户端关闭后消失。

  EPHEMERAL_SEQUENTIAL:非持久有序递增,保存在内存中,当客户端关闭后消失

  每个节点都可以注册Watch操作,用于监听节点的变化,有四种事件类型如下:

  Created event: Enabled with a call to exists

  Deleted event: Enabled with a call to exists, getData, and getChildren

  Changed event: Enabled with a call to exists and getData

  Child event: Enabled with a call to getChildren

  Watch的基本特征是客户端先得到通知,然后才能得到数据,Watch被fire之后就立即取消了,不会再有Watch后续变化,想要监听只能重新注册;

使用原生Zookeeper创建节点和监听节点变化代码如下:

  1. 引入依赖,pom.xml

 <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>

  2. 客户端连接类

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
//创建链接,并监听连接状态
ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("链接客户端");
System.out.println(watchedEvent.getState());
}
});
//创建节点,/parent:节点路径, data.xx:数据,Ids:设置权限CreateNode.PERSISTENT:创建节点类型
String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
//监听节点变化
zooKeeper.exists("/testRoot", new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("state" + watchedEvent.getState());
}
});
System.out.println(parent);
Thread.sleep(10000000);
}
}

  运行创建一个持久化的节点。

  查看客户端可以看到:

  

  parent节点创建成功。

  删除parent节点,观察watche变化。

  控制台打印:

  

  表示监听了删除节点事件,此时再在客户端手动创建节点,观察变化

  

  控制台并没有打印任何创建信息,说明没有监听到,这就是我们说的一旦watche被fire之后就会被关闭,此时改造一下代码:

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
//创建链接,并监听连接状态
final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("链接客户端");
System.out.println(watchedEvent.getState());
}
});
//创建节点
String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
//监听节点变化
zooKeeper.exists("/parent", new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("state" + watchedEvent.getState());
try {
//重新注册监听事件
zooKeeper.exists("/parent", this);
} catch (KeeperException e) {
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// System.out.println(newNode);
Thread.sleep(10000000);
}
}

  删除节点,再手动创建节点:

  

  控制台打印如下:

  

  这样创建节点的事件就又被重新注册并监听到了。

2. 基于Zookeeper的Leader Election

  1. 抢注Leader节点——非公平模式

  编码流程:

  1. 创建Leader父节点,如/chroot,并将其设置为persist节点

  2. 各客户端通过在/chroot下创建Leader节点,如/chroot/leader,来竞争Leader。该节点应被设置为ephemeral

  3. 若某创建Leader节点成功,则该客户端成功竞选为Leader

  4. 若创建Leader节点失败,则竞选Leader失败,在/chroot/leader节点上注册exist的watch,一旦该节点被删除则获得通知

  5. Leader可通过删除Leader节点来放弃Leader

  6. 如果Leader宕机,由于Leader节点被设置为ephemeral,Leader节点会自行删除。而其它节点由于在Leader节点上注册了watch,故可得到通知,参与下一轮竞选,从而保证总有客户端以Leader角色工作。

  实现代码如下:

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
//创建链接,并监听连接状态
final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("链接客户端");
System.out.println(watchedEvent.getState());
}
});
//创建节点
String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //监听节点变化
zooKeeper.exists("/parent", new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("state" + watchedEvent.getState());
try {
zooKeeper.exists("/parent", this);
} catch (KeeperException e) {
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
String newNode1 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
String newNode2 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
String newNode3 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
// System.out.println(newNode);
Thread.sleep(10000000);
}
}

  当存在节点之后,会抛出异常,这样就会导致节点创建不成功,所以只有创建成功的node才能成为leader。使用watcher监听可以在节点被删除或宕机之后来抢占leader.

  2.  先到先得,后者监视前者——公平模式

  1. 创建Leader父节点,如/chroot,并将其设置为persist节点

  2. 各客户端通过在/chroot下创建Leader节点,如/chroot/leader,来竞争Leader。该节点应被设置为ephemeral_sequential

  3. 客户端通过getChildren方法获取/chroot/下所有子节点,如果其注册的节点的id在所有子节点中最小,则当前客户端竞选Leader成功

  4. 否则,在前面一个节点上注册watch,一旦前者被删除,则它得到通知,返回step 3(并不能直接认为自己成为新Leader,因为可能前面的节点只是宕机了)

  5. Leader节点可通过自行删除自己创建的节点以放弃Leader

  代码实现如下:

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

public class ZkDemo {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
//创建链接,并监听连接状态
final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("链接客户端");
System.out.println(watchedEvent.getState());
}
});
//创建节点
String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //监听节点变化
zooKeeper.exists("/parent", new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("state" + watchedEvent.getState());
try {
zooKeeper.exists("/parent", this);
} catch (KeeperException e) {
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}); String newNode1 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
String newNode2 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
String newNode3 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
// System.out.println(newNode);
Thread.sleep(10000000);
}
}

  可以看到zk中的parent下多出了三个节点:

  

  默认以node+十个十进制数命名节点名称,数据递增。

  当id在所有子节点中最小,选举成为leader.

3. Leader Election在Curator中的实现

  手下引入Curator依赖,pom.xml如下:

 <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>3.2.1</version>
</dependency>

  1. Curator LeaderLatch特点及api的作用:

  1. 竞选为Leader后,不可自行放弃领导权

  2. 只能通过close方法放弃领导权

  3. 强烈建议增加ConnectionStateListener,当连接SUSPENDED或者LOST时视为丢失领导权

  4. 可通过await方法等待成功获取领导权,并可加入timeout

  5. 可通过hasLeadership方法判断是否为Leader

  6. 可通过getLeader方法获取当前Leader

  7. 可通过getParticipants方法获取当前竞选Leader的参与方

  简单实现:

package com.wangx.kafka.zk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorLeaderLatch {
public static void main(String[] args) throws Exception {
//设置重试策略,这里是沉睡一秒后开始重试,重试五次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,5);
//通过工厂类获取curatorFramework
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181",retryPolicy);
//leader节点创建
LeaderLatch leaderLatch = new LeaderLatch(curatorFramework,"/parent","node");
//监听leader节点
leaderLatch.addListener(new LeaderLatchListener() {
//当前节点是leader时回调
public void isLeader() {
System.out.println("I am a listener");
}
//不再是leader时回调
public void notLeader() {
System.out.println("I am not a listener");
}
});
//启动
curatorFramework.start();
leaderLatch.start();
Thread.sleep(100000000);
leaderLatch.close();
curatorFramework.close();
}
}

  2. Curator LeaderSelector特点及api的作用:

  1. 竞选Leader成功后回调takeLeadership方法

  2. 可在takeLeadership方法中实现业务逻辑

  3. 一旦takeLeadership方法返回,即视为放弃领导权

  4. 可通过autoRequeue方法循环获取领导权

  5. 可通过hasLeadership方法判断是否为Leader

  6. 可通过getLeader方法获取当前Leader

  7. 可通过getParticipants方法获取当前竞选Leader的参与方

简单实现:

package com.wangx.kafka.zk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.*;
import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorLeaderSelector {
public static void main(String[] args) throws Exception {
//设置重试策略,这里是沉睡一秒后开始重试,重试五次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,5);
//通过工厂类获取curatorFramework
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181",retryPolicy);
//leader节点创建,监听Leader状态,并在takeLeadership回调函数中做自己的业务逻辑
LeaderSelector leaderSelector = new LeaderSelector(curatorFramework,"/node", new LeaderSelectorListenerAdapter() {
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
Thread.sleep(1000);
System.out.println("启动了 takeLeadership");
}
});
leaderSelector.autoRequeue();
leaderSelector.start();
//启动
curatorFramework.start();
Thread.sleep(100000000);
leaderSelector.close();
curatorFramework.close();
}
}

  这里的LeaderSelectorListenerAdapter实现了LeaderSelectorListener接口,源码如下:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
// package org.apache.curator.framework.recipes.leader; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState; public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
public LeaderSelectorListenerAdapter() {
}
//当连接失败时,会抛出异常,这样就会中断takeLeadership方法,防止业务逻辑错误操作
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (client.getConnectionStateErrorPolicy().isErrorState(newState)) {
throw new CancelLeadershipException();
}
}
}

4. Kafka的Leader Election

  1. Kafka“各自为政”Leader Election

  每个Partition的多个Replica同时竞争Leader,这样做的好处是实现起来比较简单,但是同样出现的问题的就是Herd Effect(可能会有很多的leader节点),Zookeeper负载过重,Latency较大(可能会产生很多其他的问题)

  2. Kafka基于Controller的Leader Election

  原理是在整个集群中选举出一个Broker作为Controller,Controller为所有Topic的所有Partition指定Leader及Follower,Kafka通过在zookeeper上创建/controller临时节点来实现leader选举,并在该节点中写入当前broker的信息 {“version”:1,”brokerid”:1,”timestamp”:”1512018424988”}

  利用Zookeeper的强一致性特性,一个节点只能被一个客户端创建成功,创建成功的broker即为leader,即先到先得原则,leader也就是集群中的controller,负责集群中所有大小事务。 当leader和zookeeper失去连接时,临时节点会删除,而其他broker会监听该节点的变化,当节点删除时,其他broker会收到事件通知,重新发起leader选举。

  这样做极大缓解Herd Effect问题,减轻Zookeeper负载,Controller与Leader及Follower间通过RPC通信,高效且实时,但是由于引入Controller增加了复杂度,同时需要考虑Controller的Failover(容错)

  

最新文章

  1. leetcode 406
  2. Windows Store App JavaScript 开发:页面加载
  3. [Guava官方文档翻译] 6. 用Guava辅助Throwable异常处理 (Throwables Explained)
  4. 安卓查询当前所在地天气及查询地区(城市)代码cityCode localCode
  5. KMP模式匹配 三(弦)
  6. 文本去重-----awk或者uniq
  7. angular中复制文字到剪切板
  8. 7. Selenium的基本使用
  9. Ubuntu 服务器443端口证书配置
  10. Docker介绍基本概念(一)
  11. selenium 淘宝登入反爬虫解决方案(亲测有效)
  12. 3分钟快速presentation
  13. Linux内核分析——字符集总结与分析
  14. 分区表主键不包含分区键报错ERROR 1105 (HY000)
  15. Thinkpad X220 升级 Windows 10 后无线网卡消失问题
  16. oracle归档日志的操作
  17. poj1056
  18. (转)mysql主从切换步骤
  19. Jenkins启动报端口被占用,解决办法FAILED ServerConnector@2a265ea9{HTTP/1.1}{0.0.0.0:8080}: java
  20. H4CK1T CTF 2016 Mexico-Remote pentest writeup

热门文章

  1. TCP连接之未连接队列的理解
  2. 【ACM】nyoj_540_奇怪的排序_201308050951
  3. Codeforces Round #256 (Div. 2) B
  4. Linux 端口 为什么要有端口
  5. Maven—Windows操作系统中安装配置Maven环境
  6. 【HDU 1847】 Good Luck in CET-4 Everybody!
  7. c语言递归讲解分析
  8. Citrix架构
  9. 为什么用Mysql?
  10. JS+jquery 计算服务器控件textbox的值并显示在lable上