Zookeeper--集群管理

在多台服务器组成的集群中,需要监控每台服务器的状态,一旦某台服务器挂掉了或有新的机器加入集群,集群都要感知到,从而采取相应的措施。一个主动的集群可以自动感知节点的死亡和新节点的加入,它才对更高效的提供服务。通常的做法是有台主机器定时的去获取其他机器的心跳,或其他机器定时主动汇报自己的状态,这种方式存在一定的延时,并且主机器成为单点,一旦挂掉便影响整个集群。

使用Zookeeper可以方便的实现集群管理的功能。思路如下,每个服务器启动时都向zk服务器提出创建临时节点的请求,并且使用getChildren设置父节点的观察,当该服务器挂掉之后,它创建的临时节点也被Zookeeper服务器删除,然后会触发监视器,其他服务器便得到通知。创建新节点也是同理。

并且利用Zookeeper的Leader选举功能可以选出服务中的一台作为Leader,在比如任务调度类似的场景中有用。

下面是一个简单的模拟:

ServerUnit模拟在不同机器上启动的服务,启动时向Zookeeper服务器注册自己,并保存自己的IP和端口;实现CallBack接口:在其他节点发生变化时执行的逻辑

public class ServerUnit {

    public static final String SER_NAME = "ServerUnit";

    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {

        System.out.println("begin register to Zookeeper..");

        String address = IPUtil.getLoaclIP() + ":" + new Random().nextInt(255);

        ServiceMng mng = new ServiceMng(SER_NAME);

        String serverId = mng.register(address, new CallBack<ServiceMng.ChildrenChangedResult>() {
@Override
public void callback(ServiceMng.ChildrenChangedResult cn) throws KeeperException, InterruptedException {
for (String str : cn.getUp()) {
System.out.println("检测到服务加入: " + mng.queryAddress(str));
}
for (String str : cn.getDown()) {
System.out.println("检测到服务退出: " + mng.queryAddress(str));
}
}
}); System.out.println("ServerUnit started at: " + address);
TimeUnit.HOURS.sleep(1);
}
}

---

CallBack接口:

public interface CallBack<T> {

    void callback(T t) throws Exception;

}

---

ServerMng 提供向Zookeeper注册服务和获取服务等方法,被服务单元依赖

public class ServiceMng {

    private static final String APPS_PATH = "/__apps__";
private String serviceName;
private ZooKeeper zk;
private CountDownLatch latch = new CountDownLatch(1);
private List<String> serList;
private Map<String, String> serMap = new HashMap<>(); ServiceMng(String serviceName) {
this.serviceName = serviceName;
} public String register(String address, CallBack callback) throws KeeperException, InterruptedException, IOException {
if (zk != null) {
throw new IllegalArgumentException("method should not invoke twice.");
} zk = new ZooKeeper("localhost", 30000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
try {
List list = zk.getChildren(APPS_PATH + "/" + serviceName, true);
refresh(list);
callback.callback(new ChildrenChangedResult(list, serList));
serList = list;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}); latch.await();
if (zk.exists(APPS_PATH, false) == null) {
zk.create(APPS_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
if (zk.exists(APPS_PATH + "/" + serviceName, false) == null) {
zk.create(APPS_PATH + "/" + serviceName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} String path = zk.create(APPS_PATH + "/" + serviceName + "/", address.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List list = zk.getChildren(APPS_PATH + "/" + serviceName, true);
refresh(list);
serList = list;
return path;
} private void refresh(List<String> paths) throws KeeperException, InterruptedException {
for (String path : paths) {
byte[] b = zk.getData(APPS_PATH + "/" + serviceName + "/" + path, false, null);
serMap.put(path, new String(b));
}
} public String queryLeaderIp(String serviceName) throws KeeperException, InterruptedException {
List<String> apps = zk.getChildren(APPS_PATH + "/" + serviceName, false);
if (apps.isEmpty()) {
return null;
}
Collections.sort(apps);
byte[] data = zk.getData(apps.get(0), false, null);
return new String(data);
} public String queryRandomServerIp(String serviceName) throws KeeperException, InterruptedException {
List<String> apps = zk.getChildren(APPS_PATH + "/" + serviceName, false);
if (apps.isEmpty()) {
return null;
}
Random r = new Random();
byte[] data = zk.getData(apps.get(r.nextInt(apps.size())), false, null);
return new String(data);
} public String queryAddress(String path) {
return serMap.get(path);
} public static class ChildrenChangedResult {
List<String> up = null;
List<String> down = null; ChildrenChangedResult(List now, List last) {
up = new LinkedList(now);
up.removeAll(last);
down = new LinkedList(last);
down.removeAll(now);
} public List<String> getUp() {
return up;
} public List<String> getDown() {
return down;
}
} }

---

依次启动3个ServerUnit,查看控制台:

  

依次关闭2个ServerUnit,查看控制台:

 

从关闭ServerUnit到控制台打印退出大概延迟8s左右,配置zk的tickTime=1000,比预期的要慢一些。

end

最新文章

  1. docfx预热中
  2. 002商城项目:maven工程的测试以及svn的使用
  3. EF增删改查操作
  4. Units Problem: How to read text size as custom attr from xml and set it to TextView in java code
  5. 快排java实现
  6. 。。。Hibernate注解配置的注意事项。。。
  7. 《使用this作为返回值的相关问题》
  8. oracle11g服务项及其启动顺序
  9. javascript_22_for_二维数组
  10. 关于cmd模式下切换目录
  11. Sencha Touch Guide
  12. HDU 2553 N皇后问题(dfs)
  13. java反射机制入门04
  14. 一张图搞定Java设计模式——工厂模式! 就问你要不要学!
  15. IT之光
  16. setInterval()与setTimeout()的区别
  17. P4478 [BJWC2018]上学路线
  18. haproxy实现会话保持
  19. CodeSmith的基础模版类(CodeSmith help中的内容)
  20. CentOS配置JDK环境

热门文章

  1. js停止(阻止)浏览器继续加载内容
  2. 同一主机,不同域名绑定不同网站(IIS主机头实现方法)
  3. C# 常用字符串处理办法
  4. 一月收集几个有用的谷歌Chrome插件
  5. JSP的EL和JSTL解析
  6. html hidefocus=&quot;true&quot;
  7. 从头搭建一个React应用
  8. 网络编程 socket编程 - Asyncsocke
  9. 关于python的包
  10. tableview小结-初学者的问题