索引文件的刷盘并不是采取定时刷盘机制,而是每更新一次索引文件就会将上一次的改动刷写到磁盘。

同步刷盘:

GroupCommitRequest将被提交到GroupCommitService线程,GroupCommitService线程处理GroupCommitRequest对象后将调用wakeupCustomer方法将消费发送线程唤醒。并将刷盘告知GroupCommitRequest。

这里将写操作和读操作做了分离,避免了任务提交与任务执行的锁冲突

GroupCommitService每处理一批同步刷盘请求后休息10ms,然后继续处理下一批,其任务的核心实现为doCommit方法:

CommitLog.this.mappedFileQueue.flush最终调用MappedByteBuffer#force()方法。

处理完所有同步刷盘任务后,更新刷盘检查点StoreCheckPoint中的physicMsgTimestamp,但是并没有执行检查点的刷盘操作,检查点的刷盘操作将在刷写消息队列文件时触发。

异步刷盘

CommitRealTimeService线程默认每200ms将ByteBuffer新追加的内容的数据提交到MappedByteBuffer中:

FlushRealTimeService线程默认每500ms将MappedByteBuffer中新最佳的内存(wrotePosition减去上一次刷写位置flushedPositiont)通过调用MappedByteBuffer#force()方法将数据刷写到磁盘:

class FlushRealTimeService extends FlushCommitLogService {
private long lastFlushTimestamp = 0;
private long printTimes = 0; public void run() {
CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) {
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; // Print flush progress
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
printFlushProgress = (printTimes++ % 10) == 0;
} try {
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
} if (printFlushProgress) {
this.printFlushProgress();
} long begin = System.currentTimeMillis();
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
} // Normal shutdown, to ensure that all the flush before exit
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
} this.printFlushProgress(); CommitLog.log.info(this.getServiceName() + " service end");
}

最新文章

  1. Object-c 类方法和实例方法的区别和联系
  2. springMVC使用@ResponseBody返回json
  3. IOS开发之--NSPredicate
  4. hdu3294 girl‘s research
  5. 记32位程序(使用3gb用户虚拟内存)使用D3DX9导致的一个崩溃的问题
  6. OpenCV学习 物体检测 人脸识别 填充颜色
  7. sql server 常用的系统存储过程
  8. 动态调用webservice 接口
  9. 【M1】仔细区别pointers和references
  10. NFC(5)编写NFC程序的基本步骤
  11. SQL Server GOTO使用实例详解
  12. 【Shell剧本练习】得出的结论是当前用户
  13. emguCv3.x 实现字符分割,轮廓检测
  14. 灵玖Nlpir Parser智能挖掘汉语精准分词
  15. .NET 简单多线程
  16. css之relative
  17. C++ 打印机设置
  18. ESP8266 wifi钓鱼
  19. oracle系统化学习笔记
  20. PHP中正则表达式函数(Perl兼容)

热门文章

  1. Centos7或RedHat7下安装Mysql
  2. Flask 中的MTV架构之Models
  3. TCP的粘包和拆包问题及解决办法(C#)
  4. Luogu P3324 [SDOI2015]星际战争
  5. find命令的简单使用
  6. 【SpringCloud】01.常见软件架构的区别
  7. 区分部分javascript的选择器
  8. 手动实现简单的tomcat服务器
  9. Fiddler的一系列学习瞎记(没有章法的笔记)
  10. Java入门基础知识点总结(详细篇)