《HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程》一文中,我们详细了解了数据节点DataNode周期性发送心跳给名字节点NameNode的BPServiceActor工作线程,了解了它实现心跳的大体流程:

1、与NameNode握手:

1.1、第一阶段:获取命名空间信息并验证、设置;

1.2、第二阶段:DataNode注册;

2、周期性调用sendHeartBeat()方法发送心跳信息,并处理来自心跳响应中的命令;

3、调用reportReceivedDeletedBlocks()方法发送数据库增量汇报:包括正在接收的、已接收的和已删除的数据块;

4、调用blockReport()方法周期性进行数据块汇报,并处理返回的相关命令。

本文,我们重点讲解下其中的第三步:调用reportReceivedDeletedBlocks()方法发送数据库增量汇报:包括正在接收的、已接收的和已删除的数据块。

首先,这个数据块增量汇报是什么情况下发生的呢?在DataNode与NameNode握手并注册后实现心跳的offerService()方法的while循环内,有这么一段代码,如下:

  1. // 如果标志位sendImmediateIBR为true,或者数据块增量汇报时间已到,
  2. // 数据块增量汇报时间间隔是心跳时间间隔的100倍,默认情况下是5分钟
  3. if (sendImmediateIBR ||
  4. (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
  5. // 调用reportReceivedDeletedBlocks()方法发送数据块增量汇报
  6. reportReceivedDeletedBlocks();
  7. // 设置上次数据块增量汇报时间lastDeletedReport为startTime
  8. lastDeletedReport = startTime;
  9. }

首先,这个sendImmediateIBR是一个标志位,它标识着是否立即发送一个数据块增量汇报,在BPServiceActor工作线程初始化时默认为false。而数据块增量汇报是否发送,这里有两个条件,只要满足其中一个即可发送数据块增量汇报:

1、是否立即发送一个数据块增量汇报的标志位sendImmediateIBR为true;

2、数据块增量汇报的时间间隔已到:数据块增量汇报的时间间隔是心跳时间间隔的100倍,默认情况下是5分钟。

在讲解reportReceivedDeletedBlocks()方法前,我们先看BPServiceActor工作线程的一个成员变量,定义如下:

  1. /**
  2. * Between block reports (which happen on the order of once an hour) the
  3. * DN reports smaller incremental changes to its block list. This map,
  4. * keyed by block ID, contains the pending changes which have yet to be
  5. * reported to the NN. Access should be synchronized on this object.
  6. *
  7. * 在数据块汇报(通常一小时一次)之间,DataNode会汇报其数据块列表的增量变化情况。
  8. * 这个Map,包含尚未汇报给NameNode的DataNode上数据块正在发生的变化。
  9. * 访问它必须使用synchronized关键字。
  10. */
  11. private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
  12. pendingIncrementalBRperStorage = Maps.newHashMap();

先说下这个pendingIncrementalBRperStorage变量对应的数据结构,它是一个Map,key为DatanodeStorage类型,value为PerStoragePendingIncrementalBR类型。而这个PerStoragePendingIncrementalBR类型在其内部封装了一个叫做pendingIncrementalBR的HashMap,key为blockId,value为ReceivedDeletedBlockInfo,ReceivedDeletedBlockInfo对Block做了一层封装了,它标识了对应Block在DataNode上的状态BlockStatus,BlockStatus是一个枚举类,包含的Block状态分别有正在接收的数据块RECEIVING_BLOCK(1)、已经接收的数据块RECEIVED_BLOCK(2)、已被删除的数据块DELETED_BLOCK(3)三种状态。

也就是说,pendingIncrementalBRperStorage实际上存储了DataNode上每个DatanodeStorage到对应的增量数据块集合的映射关系,而这个增量数据块,包含正在接收的、已接受的和已删除的。

在数据块汇报(通常一小时一次)之间,DataNode会汇报其数据块列表的增量变化情况,这个是作为一个小的(smaller)汇报进行的。这个Map,包含尚未汇报给NameNode的DataNode上数据块正在发生的变化,访问它必须使用synchronized关键字。而这个数据块增量汇报,其主要目的就应该是尽早让名字节点NameNode了解数据节点DataNode上数据块的变化情况,而不是通过正常的每小时一次的数据块汇报来告知名字节点,那样的话对于整个文件系统来说,是很被动的一见事。

好了,我们再看下reportReceivedDeletedBlocks()方法,它是完成数据块增量汇报的核心方法,代码如下:

  1. /**
  2. * Report received blocks and delete hints to the Namenode for each
  3. * storage.
  4. *
  5. * @throws IOException
  6. */
  7. private void reportReceivedDeletedBlocks() throws IOException {
  8. // Generate a list of the pending reports for each storage under the lock
  9. // 创建一个存储StorageReceivedDeletedBlocks的ArrayList列表reports,
  10. // 大小为pendingIncrementalBRperStorage的大小
  11. // StorageReceivedDeletedBlocks是对DatanodeStorage和ReceivedDeletedBlockInfo数组的一个封装,
  12. // 实际上就是将pendingIncrementalBRperStorage由Map转换为List列表形式
  13. ArrayList<StorageReceivedDeletedBlocks> reports =
  14. new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
  15. // 使用synchronized对pendingIncrementalBRperStorage进行同步:
  16. synchronized (pendingIncrementalBRperStorage) {
  17. // 遍历pendingIncrementalBRperStorage
  18. for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
  19. pendingIncrementalBRperStorage.entrySet()) {
  20. // 取出每个DatanodeStorage、PerStoragePendingIncrementalBR进行处理
  21. final DatanodeStorage storage = entry.getKey();
  22. final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
  23. // 如果perStorageMap中存在发生变化的数据块:
  24. if (perStorageMap.getBlockInfoCount() > 0) {
  25. // Send newly-received and deleted blockids to namenode
  26. // 发送新接收的或者已删除的数据块ID给NameNode
  27. // 从perStorageMap中获得ReceivedDeletedBlockInfo数组
  28. ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
  29. // 将根据DatanodeStorage和ReceivedDeletedBlockInfo数组构造的StorageReceivedDeletedBlocks加入reports列表
  30. reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
  31. }
  32. }
  33. // 立即汇报的标志位sendImmediateIBR设置为false
  34. sendImmediateIBR = false;
  35. }
  36. if (reports.size() == 0) {// reports大小为0的话,直接返回null
  37. // Nothing new to report.
  38. return;
  39. }
  40. // Send incremental block reports to the Namenode outside the lock
  41. // 发送是否成功的标志位success初始化为false
  42. boolean success = false;
  43. try {
  44. // 通过NameNode代理的blockReceivedAndDeleted()方法,将新接收的或者已删除的数据块汇报给NameNode,汇报的信息包括:
  45. // 1、数据节点注册信息DatanodeRegistration;
  46. // 2、数据块池ID;
  47. // 3、需要汇报的数据块及其状态信息列表StorageReceivedDeletedBlocks;
  48. bpNamenode.blockReceivedAndDeleted(bpRegistration,
  49. bpos.getBlockPoolId(),
  50. reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
  51. // 发送是否成功的标志位success设置为true
  52. success = true;
  53. } finally {
  54. if (!success) {// 汇报不成功的话
  55. synchronized (pendingIncrementalBRperStorage) {
  56. for (StorageReceivedDeletedBlocks report : reports) {
  57. // If we didn't succeed in sending the report, put all of the
  58. // blocks back onto our queue, but only in the case where we
  59. // didn't put something newer in the meantime.
  60. // 将数据块再放回到perStorageMap
  61. PerStoragePendingIncrementalBR perStorageMap =
  62. pendingIncrementalBRperStorage.get(report.getStorage());
  63. perStorageMap.putMissingBlockInfos(report.getBlocks());
  64. // 立即汇报的标志位sendImmediateIBR设置为true
  65. sendImmediateIBR = true;
  66. }
  67. }
  68. }
  69. }
  70. }

这个reportReceivedDeletedBlocks()方法的大致处理流程如下:

1、创建一个存储StorageReceivedDeletedBlocks的ArrayList列表reports:

大小为pendingIncrementalBRperStorage的大小。StorageReceivedDeletedBlocks是对DatanodeStorage和ReceivedDeletedBlockInfo数组的一个封装,实际上就是将pendingIncrementalBRperStorage由Map转换为List列表形式;

2、使用synchronized对pendingIncrementalBRperStorage进行同步,遍历pendingIncrementalBRperStorage:

2.1、取出每个DatanodeStorage、PerStoragePendingIncrementalBR进行处理;

2.2、如果perStorageMap中存在发生变化的数据块,发送新接收的或者已删除的数据块ID给NameNode:

2.2.1、从perStorageMap中获得ReceivedDeletedBlockInfo数组;

2.2.3、将根据DatanodeStorage和ReceivedDeletedBlockInfo数组构造的StorageReceivedDeletedBlocks加入reports列表;

3、立即汇报的标志位sendImmediateIBR设置为false;

4、reports大小为0的话,直接返回null;

5、发送是否成功的标志位success初始化为false;

6、通过NameNode代理bpNamenode的blockReceivedAndDeleted()方法,将新接收的或者已删除的数据块汇报给NameNode,汇报的信息包括:

6.1、数据节点注册信息DatanodeRegistration;

6.2、数据块池ID;

6.3、需要汇报的数据块及其状态信息列表StorageReceivedDeletedBlocks;

7、 发送是否成功的标志位success设置为true;

8、汇报不成功的话,遍历reports:

8.1、将数据块再放回到perStorageMap;

8.2、立即汇报的标志位sendImmediateIBR设置为true。

针对上述流程,我们先说下是否应立即汇报增量数据块信息的标志位sendImmediateIBR。当BPServiceActor工作线程创建时,这个标志位默认为false,即不会立即发送数据块增量汇报,而是周期性的到期才会发送。而当该发送数据块增量汇报时,无论标志位之前为true还是false,统一设置为false,因为此时数据块增量汇报已经发送了,下次没必要再立即发送了。而只有当数据块增量汇报不成功时,该标志位才会被设置为true,以便下次循环直接发送之前未成功的数据块增量汇报,而不用管数据块增量汇报的时间间隔是否到期。这个标志位就是为了在数据块增量汇报失败的情况下,下次循环中能立即发送出去,以便让NameNode及时了解DataNode数据块情况。

那么,数据块增量汇报是如何发送给NameNode的呢?我们先看下NameNode在DataNode上的代理bpNamenode,它的定义如下:

  1. DatanodeProtocolClientSideTranslatorPB bpNamenode;

它是BPServiceActor线程中一个DatanodeProtocolClientSideTranslatorPB类型的变量,也就意味着每个与NameNode通讯的BPServiceActor工作线程,都持有一个NameNode的代理,其初始化是在BPServiceActor工作线程与NameNode连接时完成的,我们看下DatanodeProtocolClientSideTranslatorPB类中完成数据块增量汇报的blockReceivedAndDeleted()方法,代码如下:

  1. @Override
  2. public void blockReceivedAndDeleted(DatanodeRegistration registration,
  3. String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
  4. throws IOException {
  5. BlockReceivedAndDeletedRequestProto.Builder builder =
  6. BlockReceivedAndDeletedRequestProto.newBuilder()
  7. .setRegistration(PBHelper.convert(registration))
  8. .setBlockPoolId(poolId);
  9. for (StorageReceivedDeletedBlocks storageBlock : receivedAndDeletedBlocks) {
  10. StorageReceivedDeletedBlocksProto.Builder repBuilder =
  11. StorageReceivedDeletedBlocksProto.newBuilder();
  12. repBuilder.setStorageUuid(storageBlock.getStorage().getStorageID());  // Set for wire compatibility.
  13. repBuilder.setStorage(PBHelper.convert(storageBlock.getStorage()));
  14. for (ReceivedDeletedBlockInfo rdBlock : storageBlock.getBlocks()) {
  15. repBuilder.addBlocks(PBHelper.convert(rdBlock));
  16. }
  17. builder.addBlocks(repBuilder.build());
  18. }
  19. try {
  20. // 通过实现了DatanodeProtocolPB接口的blockReceivedAndDeleted()方法发送的
  21. // rpcProxy最终加载的是参数rpc.engine.DatanodeProtocolPB配置的类
  22. rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, builder.build());
  23. } catch (ServiceException se) {
  24. throw ProtobufHelper.getRemoteException(se);
  25. }
  26. }

而rpcProxy最终加载的是参数rpc.engine.DatanodeProtocolPB配置的类,实际上也就是DatanodeProtocolServerSideTranslatorPB类,由它负责向NamNode发送RPC请求,而NameNode对应RPC请求处理的方法在NameNodeRpcServer类中的blockReceivedAndDeleted()方法,代码如下:

  1. @Override // DatanodeProtocol
  2. public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
  3. StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
  4. verifyRequest(nodeReg);
  5. metrics.incrBlockReceivedAndDeletedOps();
  6. if(blockStateChangeLog.isDebugEnabled()) {
  7. blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
  8. +"from "+nodeReg+" "+receivedAndDeletedBlocks.length
  9. +" blocks.");
  10. }
  11. for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
  12. // 最终遍历StorageReceivedDeletedBlocks数组,针对每个StorageReceivedDeletedBlocks,
  13. // 调用FSNamesystem的processIncrementalBlockReport()方法进行处理
  14. namesystem.processIncrementalBlockReport(nodeReg, r);
  15. }
  16. }

最终遍历StorageReceivedDeletedBlocks数组,针对每个StorageReceivedDeletedBlocks,调用FSNamesystem的processIncrementalBlockReport()方法进行处理。ok,继续追踪,如下:

  1. public void processIncrementalBlockReport(final DatanodeID nodeID,
  2. final StorageReceivedDeletedBlocks srdb)
  3. throws IOException {
  4. <span style="white-space:pre">    </span>
  5. <span style="white-space:pre">    </span>// 典型的写锁模式
  6. <span style="white-space:pre">    </span>
  7. <span style="white-space:pre">    </span>// 获取写锁
  8. writeLock();
  9. try {
  10. <span style="white-space:pre">  </span>
  11. // 调用BlockManager的processIncrementalBlockReport()方法处理数据块增量汇报
  12. blockManager.processIncrementalBlockReport(nodeID, srdb);
  13. } finally {
  14. <span style="white-space:pre">  </span>
  15. // 释放写锁
  16. writeUnlock();
  17. }
  18. }

FSNamesystem的processIncrementalBlockReport()方法是典型的一个读写锁中写锁模式,获取写锁,try模块中处理业务逻辑,finally模块中释放写锁。而业务逻辑的处理,则是通过调用BlockManager的processIncrementalBlockReport()方法来完成的。FSNamesystem相当于名字节点NameNod门面模式中的门面,由它负责一切文件系统操作相关的处理。而BlockManager则是名字节点NameNode中针对所有block状态保持、变更处理等的大管家,我们会在后续文章后陆续介绍这两个重要的变量。

好吧,我们先看下BlockManager的processIncrementalBlockReport()方法,代码如下:

  1. /**
  2. * The given node is reporting incremental information about some blocks.
  3. * This includes blocks that are starting to be received, completed being
  4. * received, or deleted.
  5. *
  6. * This method must be called with FSNamesystem lock held.
  7. */
  8. public void processIncrementalBlockReport(final DatanodeID nodeID,
  9. final StorageReceivedDeletedBlocks srdb) throws IOException {
  10. assert namesystem.hasWriteLock();
  11. int received = 0;
  12. int deleted = 0;
  13. int receiving = 0;
  14. final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
  15. if (node == null || !node.isAlive) {
  16. blockLog
  17. .warn("BLOCK* processIncrementalBlockReport"
  18. + " is received from dead or unregistered node "
  19. + nodeID);
  20. throw new IOException(
  21. "Got incremental block report from unregistered or dead node");
  22. }
  23. DatanodeStorageInfo storageInfo =
  24. node.getStorageInfo(srdb.getStorage().getStorageID());
  25. if (storageInfo == null) {
  26. // The DataNode is reporting an unknown storage. Usually the NN learns
  27. // about new storages from heartbeats but during NN restart we may
  28. // receive a block report or incremental report before the heartbeat.
  29. // We must handle this for protocol compatibility. This issue was
  30. // uncovered by HDFS-6094.
  31. storageInfo = node.updateStorage(srdb.getStorage());
  32. }
  33. // 取出每个ReceivedDeletedBlockInfo进行处理
  34. for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
  35. switch (rdbi.getStatus()) {
  36. case DELETED_BLOCK:// 如果是已被删除的数据块
  37. // 调用removeStoredBlock()方法在NameNode中移除node对应数据块元信息
  38. removeStoredBlock(rdbi.getBlock(), node);
  39. // 计数器deleted加1
  40. deleted++;
  41. break;
  42. case RECEIVED_BLOCK:// 如果是已接收的数据块
  43. // 调用addBlock()方法在NameNode中添加数据块元信息
  44. addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
  45. // 计数器received加1
  46. received++;
  47. break;
  48. case RECEIVING_BLOCK:// 如果是正在接收的数据块
  49. // 计数器receiving加1
  50. receiving++;
  51. // 调用processAndHandleReportedBlock()方法在NameNode中处理正在接收的数据块
  52. processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
  53. ReplicaState.RBW, null);
  54. break;
  55. default:
  56. String msg =
  57. "Unknown block status code reported by " + nodeID +
  58. ": " + rdbi;
  59. blockLog.warn(msg);
  60. assert false : msg; // if assertions are enabled, throw.
  61. break;
  62. }
  63. if (blockLog.isDebugEnabled()) {
  64. blockLog.debug("BLOCK* block "
  65. + (rdbi.getStatus()) + ": " + rdbi.getBlock()
  66. + " is received from " + nodeID);
  67. }
  68. }
  69. blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
  70. + nodeID + " receiving: " + receiving + ", " + " received: " + received
  71. + ", " + " deleted: " + deleted);
  72. }

整个逻辑非常清晰,取出每个ReceivedDeletedBlockInfo进行处理:

1、如果是已被删除的数据块:

1.1、调用removeStoredBlock()方法在NameNode中移除node对应数据块元信息;

1.2、计数器deleted加1;

2、如果是已接收的数据块:

2.1、调用addBlock()方法在NameNode中添加数据块元信息;

2.2、计数器received加1;

3、如果是正在接收的数据块:

3.1、计数器receiving加1;

3.2、调用processAndHandleReportedBlock()方法在NameNode中处理正在接收的数据块。

至于NameNode的BlockManager到底是何如处理的,我们留到以后分析NameNode和BlockManager时再做详细分析吧!

这里做个简单总结:

数据块增量汇报是负责向NameNode发送心跳信息工作线程BPServiceActor中周期性的一个工作,它负责向NameNode及时汇报DataNode节点上数据块的变化情况,比如数据块正在接收、已接收或者已被删除。它的工作周期要小于正常的数据块汇报,目的就是为了能够让NameNode及时掌握DataNode上数据块变化情况,以便HDFS系统运行正常,略显机智!而且,当数据块增量汇报不成功时,下一个循环会接着立即发送数据块增量汇报,而不是等其下一个周期的到来,这显示了HDFS良好的容错性,是一个值得我们借鉴的设计方法。

最新文章

  1. 5-udev多路径
  2. winform公共标签和常用属性
  3. jdk的设置及安装android studio提示does not point to a valid jvm问题
  4. ArcGIS Engine断开其他ArcSDE用户连接的解决方案
  5. 开始使用版本控制,局域网搭个SVN
  6. Html.RenderPartial与Html.RenderAction区别(转)
  7. Android Tombstone/Crash的log分析和定位
  8. 关于string转整数
  9. 让 Python 带你进入开源的世界——Git 从入门到与他人协作开发
  10. 单元测试系列:Mock工具Jmockit使用介绍
  11. 如何彻底关闭windows defender
  12. c#枚举位运算操作
  13. Complex类的实现
  14. Java8-对map排序
  15. ldap 导出、导入ldif数据
  16. 204. jetcache:在Spring Boot中怎么玩?
  17. 【转】SpringMVC,获取request的几种方法,及线程安全性
  18. 湾区求职分享:三个月刷题拿到 Google offer,欢迎踊跃提问
  19. Android 将拼接好并加上边框的图片保存到内存卡中
  20. Java设计模式(9)适配器模式(Adapter模式)

热门文章

  1. RMQ 算法 学习整理
  2. 关于mysql编码问题
  3. Linux signal 那些事儿(4)信号的deliver顺序【转】
  4. shell script timer and 無限迴圈
  5. CentOS 7系统添加启动项
  6. javascript 数据类型的一些方法总结
  7. TopCoder SRM 722 Div1 Problem 600 DominoTiling(简单插头DP)
  8. python安装在windows server2008,故障排除
  9. 列表的 sort
  10. OpenSSL使用1(用OpenSSL生成自签名证书在IIS上搭建Https站点)(用于iOS的https访问)