问题

  • index 文件有什么作用,结构又是如何

概述

index 文件主要是为了 message key 服务的,rocketmq 发送消息的时候可以带上 key , messge key 是为了标识某个消息的一个标志。

思考

我们思考一下,message key 是由用户生成的,我们需要尽可能地保证散列保存,这样当我们就可以快速地拿出来了。那么通常的作法就是利用哈希散列,当然最重要的是如何解决冲突。我们下面看一下rocketmq 是如何实现的。

总体思路

下面两张图片来自参考文章。侵删(作者的文章写得真的好)

我们从这里可以看到index文件分为三部分,头,散列值,索引文件。其中散列值会一一对应索引文件中的一个值,该值就是储存该message信息的。

可以看到假如有冲突(即找到散列值那个位置的时候已经有一个对应的索引位了),那么索引位就存放在新的索引位的“上一个索引位”的属性里,这里就形成一条单链表。

源码分析

index文件和其他consumerQueue文件的思路是一样的,同样是利用持久化在文件中,然后通过mappedFile 文件加载到内存中,开启一个服务,当发消息对消息进行持久化的时候,将消息的key持久化在index文件。

写入

DefaultMessageStore$$CommitLogDispatcherBuildIndex内部类

    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

        @Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}

该内部类就是当接受到消息对index进行记录的分发器,可以看到最主要的还是利用了indexService,我们来看一下indexService到底执行了什么操作。

从方法名我们对index文件的加载,会刷,获取,写入等。我们看一下 buildIndex 方法

IndexFile&putKey方法

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
//获取hashCode 后取模
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
//计算绝对位置
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
} long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); timeDiff = timeDiff / 1000; if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
//为什么要把 hashSlot 那部分的总长也加进来呢?
//因为这个pos是相对于 mapfile的位移量进行获取的
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
//先保存上一个槽位的值
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
//然后覆盖掉旧的
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
//更新index属性
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp); return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
} return false;
} //直接取 hashCode,并返回正数
public int indexKeyHashMethod(final String key) {
int keyHash = key.hashCode();
int keyHashPositive = Math.abs(keyHash);
if (keyHashPositive < 0)
keyHashPositive = 0;
return keyHashPositive;
}

读取的方法

读取方法的调用链如下 :
DefaultMessageStore&queryMessage -> indexService&queryOffset -> IndexFile&selectPhyOffset

DefaultMessageStore&queryMessage 方法

  @Override
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
QueryMessageResult queryMessageResult = new QueryMessageResult(); long lastQueryMsgTime = end; //重试
for (int i = 0; i < 3; i++) {
//查找到返回结果
QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
if (queryOffsetResult.getPhyOffsets().isEmpty()) {
break;
}
//封装返回结果
Collections.sort(queryOffsetResult.getPhyOffsets()); queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp()); for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
long offset = queryOffsetResult.getPhyOffsets().get(m); try { boolean match = true;
MessageExt msg = this.lookMessageByOffset(offset);
if (0 == m) {
lastQueryMsgTime = msg.getStoreTimestamp();
} // String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR);
// if (topic.equals(msg.getTopic())) {
// for (String k : keyArray) {
// if (k.equals(key)) {
// match = true;
// break;
// }
// }
// } if (match) {
SelectMappedBufferResult result = this.commitLog.getData(offset, false);
if (result != null) {
int size = result.getByteBuffer().getInt(0);
result.getByteBuffer().limit(size);
result.setSize(size);
queryMessageResult.addMessage(result);
}
} else {
log.warn("queryMessage hash duplicate, {} {}", topic, key);
}
} catch (Exception e) {
log.error("queryMessage exception", e);
}
} if (queryMessageResult.getBufferTotalSize() > 0) {
break;
} if (lastQueryMsgTime < begin) {
break;
}
} return queryMessageResult;
}

indexService&queryOffset 逻辑很好懂,定位文件,解决冲突的链表进行查找

    public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
List<Long> phyOffsets = new ArrayList<Long>(maxNum); long indexLastUpdateTimestamp = 0;
long indexLastUpdatePhyoffset = 0;
maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
try {
this.readWriteLock.readLock().lock();
if (!this.indexFileList.isEmpty()) {
for (int i = this.indexFileList.size(); i > 0; i--) {
IndexFile f = this.indexFileList.get(i - 1);
boolean lastFile = i == this.indexFileList.size();
if (lastFile) {
indexLastUpdateTimestamp = f.getEndTimestamp();
indexLastUpdatePhyoffset = f.getEndPhyOffset();
} if (f.isTimeMatched(begin, end)) { f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
} if (f.getBeginTimestamp() < begin) {
break;
} if (phyOffsets.size() >= maxNum) {
break;
}
}
}
} catch (Exception e) {
log.error("queryMsg exception", e);
} finally {
this.readWriteLock.readLock().unlock();
} return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
}

总结

参考资料

  • https://www.kunzhao.org/blog/2018/04/08/rocketmq-message-index-flow/

最新文章

  1. TermServDevices错误
  2. 自定义的dialog
  3. 【HDOJ】5296 Annoying problem
  4. 符号表实现(Symbol Table Implementations)
  5. HBASE学习笔记--配置信息
  6. 提问!同一ajax请求获取的图片路劲,在谷歌浏览器能正确展示图片,在火狐浏览器则显示路径undefined
  7. LoadRunner性能测试-LoadRunner工具安装
  8. HDU1097-A hard puzzle-快速幂+取模
  9. 开源数字媒体资产管理系统:Razuna安装方法
  10. Scrapyd 改进第一步: Web Interface 添加 charset=UTF-8, 避免查看 log 出现中文乱码
  11. java 11 新的Epsilon垃圾收集器
  12. parrot os 更新到3.7后无法上网(DNS暂时无法解析)
  13. poj 2955&quot;Brackets&quot;(区间DP)
  14. 微信小程序ext_json示例
  15. 无法连接ssh,fatal: daemon() failed: No such device
  16. Android Studio 使用USB真机调试教程
  17. vc通过webbrowser操作ie元素
  18. maven安装和与IDE集成
  19. 【算法笔记】B1025 反转链表
  20. 卷积 &amp; 杜教筛

热门文章

  1. SVN提交时没有写注释
  2. IText异常 NoClassDefFoundError: org/bouncycastle/asn1/ASN1Encodable
  3. C++分割string字符串(转)
  4. Linux - Deepin Linux,intel无线网卡下载慢、不能跑满宽带的解决方案
  5. JS高级---总结apply和call方法的使用
  6. Java+Selenium+Testng自动化测试学习(三)— 断言
  7. 九、c++容器
  8. C++-POJ1018-Communication System
  9. torchvision的理解和学习 加载常用数据集,对主流模型的调用.md
  10. 【Python】【爬虫】爬取酷狗音乐网络红歌榜