zk 节点是一个 QuorumPeer,选举结束后,leader 和 follower 各自执行自己的逻辑:

org.apache.zookeeper.server.quorum.QuorumPeer#run

org.apache.zookeeper.server.quorum.QuorumPeer#setLeader
org.apache.zookeeper.server.quorum.Leader#lead org.apache.zookeeper.server.quorum.QuorumPeer#setFollower
org.apache.zookeeper.server.quorum.Follower#followLeader

不管是 leader 还是 follower,都封装了一个 QuorumPeer 对象,QuorumPeer.ServerCnxnFactory 监听端口,处理客户端 io 事件。
请求处理的入口:把请求交给 procesor 链的 firstProcessor

org.apache.zookeeper.server.ZooKeeperServer#processPacket

leader

构建 processor 链

// org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader().toBeApplied);
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false,
getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
proposalProcessor.initialize();
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
((PrepRequestProcessor)firstProcessor).start();
}

请求处理链:PrepRequestProcessor, ProposalRequestProcessor, CommitProcessor, ToBeAppliedRequestProcessor, FinalRequestProcessor

由 ProposalRequestProcessor 构造函数可,还有一条链:SyncRequestProcessor, AckRequestProcessor

public ProposalRequestProcessor(LeaderZooKeeperServer zks,
RequestProcessor nextProcessor) {
this.zks = zks;
this.nextProcessor = nextProcessor;
AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
}

follower

构建 processor 链

org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#setupRequestProcessors

链1:FollowerRequestProcessor, CommitProcessor, FinalRequestProcessor

链2:SyncRequestProcessor, SendAckRequestProcessor

两阶段写

leader 接收写请求,反序列化请求体

org.apache.zookeeper.server.PrepRequestProcessor#processRequest

leader 发送建议给 follower

// org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest
zks.getLeader().propose(request);

leader 发送建议给自身

// 由 leader 的 SyncRequestProcessor 把请求写入事务日志
syncProcessor.processRequest(request);

leader 的 AckRequestProcessor 返回 ack 给自身

org.apache.zookeeper.server.quorum.AckRequestProcessor#processRequest

follower 接收并处理 leader 发送的建议

org.apache.zookeeper.server.quorum.Follower#processPacket

follower 的 SyncRequestProcessor 写入事务日志

org.apache.zookeeper.server.SyncRequestProcessor#run

follower 的 SendAckRequestProcessor 发送 ACK 给 leader

org.apache.zookeeper.server.quorum.SendAckRequestProcessor#processRequest

leader 接收并处理 follower 的 ack

org.apache.zookeeper.server.quorum.LearnerHandler#run
org.apache.zookeeper.server.quorum.Leader#processAck

ack 超过半数,向 follower 发送提交事件(把数据放入发送队列),并把写操作应用到 DataTree

follower 提交写操作,把数据写入 DataTree

CommitProcessor 和 FinalRequestProcessor

数据写入的 2 个关键时机
写事务日志

// org.apache.zookeeper.server.SyncRequestProcessor
zks.getZKDatabase().append(si)

写 DataTree

// org.apache.zookeeper.server.FinalRequestProcessor
getZKDatabase().processTxn(hdr, txn)

异常情况一:follower 处理 propose 超时,leader 如何处理?
leader 定期发送 ping 给 follower

// org.apache.zookeeper.server.quorum.LearnerHandler#ping
public void ping() {
long id;
if (syncLimitCheck.check(System.nanoTime())) {
synchronized(leader) {
id = leader.lastProposed;
}
QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
queuePacket(ping);
} else {
LOG.warn("Closing connection to peer due to transaction timeout.");
shutdown();
}
}

如果发现上一条 propose 超时,则断开与 follower 的连接,follower 会 shutdown,然后重新创建对象,重连 leader,会发送 ack 消息

查看事务日志:

org.apache.zookeeper.server.LogFormatter#main 

参数 D:/zk_test/data1/version-2/log.100000001

最新文章

  1. Goodbye2014,Hello2015
  2. 查看最点CPU的语句
  3. jQuery源码解读-事件分析
  4. 截取视图某一段另存为部分视图(Partial View)
  5. Pyqt 打包资源文件
  6. 64位Win7系统下vs2010调试无法连接oracle
  7. Opencv Linux环境搭建(2)
  8. virsh常用命令
  9. 谈谈作为一个菜B的培训感受
  10. 总结一下SQL语句中引号(')、quotedstr()、('')、format()在SQL语句中的用法
  11. highcharts动态获取数据生成图表问题
  12. js冒泡排序和二分查找
  13. Template - Strategy
  14. [Selenium With C#学习笔记] Lesson-02 Web元素定位
  15. Xamarin+Prism开发之.net standard化
  16. SQL Server 2016 Alwayson新增功能
  17. asp.net web api客户端调用
  18. C++中的const总结
  19. 三种方法实现Hadoop(MapReduce)全局排序(1)
  20. Java框架spring Boot学习笔记(四):Spring Boot操作MySQL数据库

热门文章

  1. SSD源码解读——网络搭建
  2. Java并发(基础知识)—— Executor框架及线程池
  3. 读《JavaScript面向对象编程指南》(一)
  4. DeepFaceLab更新至2019.12.23
  5. QR分解迭代求特征值——原生python实现(不使用numpy)
  6. Lambda学习总结(二)--Stream流
  7. MyEclipse导航代码第二弹,Java开发更便捷
  8. VS插件CodeRush for Visual Studio全新发布v19.1.7|附下载
  9. k8s管理pod资源对象(上)
  10. k8s配置文件模板