【mq读书笔记】mq索引文件刷盘
2024-09-21 09:22:37
索引文件的刷盘并不是采取定时刷盘机制,而是每更新一次索引文件就会将上一次的改动刷写到磁盘。
同步刷盘:
GroupCommitRequest将被提交到GroupCommitService线程,GroupCommitService线程处理GroupCommitRequest对象后将调用wakeupCustomer方法将消费发送线程唤醒。并将刷盘告知GroupCommitRequest。
这里将写操作和读操作做了分离,避免了任务提交与任务执行的锁冲突
GroupCommitService每处理一批同步刷盘请求后休息10ms,然后继续处理下一批,其任务的核心实现为doCommit方法:
CommitLog.this.mappedFileQueue.flush最终调用MappedByteBuffer#force()方法。
处理完所有同步刷盘任务后,更新刷盘检查点StoreCheckPoint中的physicMsgTimestamp,但是并没有执行检查点的刷盘操作,检查点的刷盘操作将在刷写消息队列文件时触发。
异步刷盘
CommitRealTimeService线程默认每200ms将ByteBuffer新追加的内容的数据提交到MappedByteBuffer中:
FlushRealTimeService线程默认每500ms将MappedByteBuffer中新最佳的内存(wrotePosition减去上一次刷写位置flushedPositiont)通过调用MappedByteBuffer#force()方法将数据刷写到磁盘:
class FlushRealTimeService extends FlushCommitLogService {
private long lastFlushTimestamp = 0;
private long printTimes = 0; public void run() {
CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) {
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; // Print flush progress
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
printFlushProgress = (printTimes++ % 10) == 0;
} try {
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
} if (printFlushProgress) {
this.printFlushProgress();
} long begin = System.currentTimeMillis();
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
} // Normal shutdown, to ensure that all the flush before exit
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
} this.printFlushProgress(); CommitLog.log.info(this.getServiceName() + " service end");
}
最新文章
- Object-c 类方法和实例方法的区别和联系
- springMVC使用@ResponseBody返回json
- IOS开发之--NSPredicate
- hdu3294 girl‘s research
- 记32位程序(使用3gb用户虚拟内存)使用D3DX9导致的一个崩溃的问题
- OpenCV学习 物体检测 人脸识别 填充颜色
- sql server 常用的系统存储过程
- 动态调用webservice 接口
- 【M1】仔细区别pointers和references
- NFC(5)编写NFC程序的基本步骤
- SQL Server GOTO使用实例详解
- 【Shell剧本练习】得出的结论是当前用户
- emguCv3.x 实现字符分割,轮廓检测
- 灵玖Nlpir Parser智能挖掘汉语精准分词
- .NET 简单多线程
- css之relative
- C++ 打印机设置
- ESP8266 wifi钓鱼
- oracle系统化学习笔记
- PHP中正则表达式函数(Perl兼容)