RocketMQ 的主和从一直在使用 nio 进行数据同步:

master

master 监听端口
org.apache.rocketmq.store.ha.HAService.AcceptSocketService#beginAccept master 建立连接
org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run master 读取 slave 上报的 maxOffset
org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run master 传输数据给 slave
org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run

slave

slave 连接 master
org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster slave 定时报告 maxOffset 给 master
org.apache.rocketmq.store.ha.HAService.HAClient#run slave 接收 master 传输来的数据
org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent

这里的同步,暂时只涉及到 commitLog。

同步双写的本质,master 先写磁盘,然后等待 slave 同步消息成功。

写磁盘:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}

同步到 slave:

// org.apache.rocketmq.store.CommitLog#handleHA
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// SYNC_MASTER 则执行逻辑
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
// slave 没有落后 master 太多
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
// 创建 GroupCommitRequest 放入 GroupTransferService 的 requestsWrite 中
// GroupTransferService.run 会一直比较 GroupCommitRequest#nextOffset 和 slave 已提交的位移
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
// 等待 5 秒,检查 slave 的同步结果
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}

最新文章

  1. SpringCloud: 服务发现
  2. 从零开始学Python07作业思路:模拟人生小游戏
  3. 删除数组中重复的元素(JSON)
  4. codeforce Pashmak and Buses(dfs枚举)
  5. knockout 第一个实例visible
  6. 缓存方案之Redis
  7. ubuntu 系统出错一览
  8. .NET开发Windows Service程序 - Topshelf
  9. Learning Django Resources
  10. pip 的 Assert Error
  11. cobaltstrike3.5使用记录
  12. javascript 中 arguments.callee属性
  13. hihocoder1388 Periodic Signal
  14. 鹅厂优文|打通小程序音视频和webRTC
  15. C++使用类和对象
  16. 带你由浅入深探索webpack4(二)
  17. js 秒数格式化
  18. easyui tagbox 自动触发回车事件
  19. JDBC-通用查询
  20. CAD绘制室外平台步骤5.3

热门文章

  1. js 继承,Object.setPrototypeOf | Object.getPrototypeOf | Object.create class
  2. js事件冒泡、阻止事件冒泡以及阻止默认行为
  3. js 向数组对象中添加属性和属性值
  4. Linux--磁盘管理--04
  5. gcc编译动态链接库
  6. P5200 [USACO19JAN]Sleepy Cow Sorting 牛客假日团队赛6 D 迷路的牛 (贪心)
  7. 牛客假日团队赛5 L Catch That Cow HDU 2717 (BFS)
  8. Zookeeper客户端使用(使用原生zookeeper)
  9. u-boot中bss段的使用
  10. linux命令历史