《HDFS源码分析心跳汇报之整体结构》一文中,我们详细了解了HDFS中关于心跳的整体结构,知道了BlockPoolManager、BPOfferService和BPServiceActor三者之间的关系。那么,HDFS心跳相关的这些数据结构,都是如何被初始化的呢?本文,我们就开始研究HDFS心跳汇报之数据结构初始化。

首先,在DataNode节点启动时所必须执行的startDataNode()方法中,有如下代码:

  1. // DataNode启动时执行的startDataNode()方法
  2. // 构造一个BlockPoolManager实例
  3. // 调用其refreshNamenodes()方法
  4. blockPoolManager = new BlockPoolManager(this);
  5. blockPoolManager.refreshNamenodes(conf);

它构造了一个BlockPoolManager实例,并调用其refreshNamenodes()方法,完成NameNodes的刷新。我们来看下这个方法:

  1. void refreshNamenodes(Configuration conf)
  2. throws IOException {
  3. LOG.info("Refresh request received for nameservices: " + conf.get
  4. (DFSConfigKeys.DFS_NAMESERVICES));
  5. // 从配置信息conf中获取nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap
  6. Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
  7. .getNNServiceRpcAddressesForCluster(conf);
  8. // 需要通过使用synchronized关键字在refreshNamenodesLock上加互斥锁
  9. synchronized (refreshNamenodesLock) {
  10. // 调用doRefreshNamenodes()方法执行集合newAddressMap中的刷新
  11. doRefreshNamenodes(newAddressMap);
  12. }
  13. }

很简单,两大步骤:第一步,从配置信息conf中获取nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap,第二步调用doRefreshNamenodes()方法执行集合newAddressMap中NameNodes的刷新。

首先,我们看下如何从配置信息conf中获取nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap,相关代码如下:

  1. /**
  2. * Returns list of InetSocketAddresses corresponding to the namenode
  3. * that manages this cluster. Note this is to be used by datanodes to get
  4. * the list of namenode addresses to talk to.
  5. *
  6. * Returns namenode address specifically configured for datanodes (using
  7. * service ports), if found. If not, regular RPC address configured for other
  8. * clients is returned.
  9. *
  10. * @param conf configuration
  11. * @return list of InetSocketAddress
  12. * @throws IOException on error
  13. */
  14. public static Map<String, Map<String, InetSocketAddress>>
  15. getNNServiceRpcAddressesForCluster(Configuration conf) throws IOException {
  16. / Use default address as fall back
  17. String defaultAddress;
  18. try {
  19. // 获取默认地址defaultAddress
  20. defaultAddress = NetUtils.getHostPortString(NameNode.getAddress(conf));
  21. } catch (IllegalArgumentException e) {
  22. defaultAddress = null;
  23. }
  24. // 获取hdfs的内部命名服务:dfs.internal.nameservices,得到集合parentNameServices
  25. Collection<String> parentNameServices = conf.getTrimmedStringCollection
  26. (DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
  27. if (parentNameServices.isEmpty()) {// 如果没有配置dfs.internal.nameservices
  28. // 获取dfs.nameservices,赋值给集合parentNameServices
  29. parentNameServices = conf.getTrimmedStringCollection
  30. (DFSConfigKeys.DFS_NAMESERVICES);
  31. } else {
  32. // Ensure that the internal service is ineed in the list of all available
  33. // nameservices.
  34. // 获取dfs.nameservices
  35. Set<String> availableNameServices = Sets.newHashSet(conf
  36. .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));
  37. // 验证parentNameServices中的每个nsId在dfs.nameservices中是否都存在
  38. // 即参数dfs.internal.nameservices包含在参数dfs.nameservices范围内
  39. for (String nsId : parentNameServices) {
  40. if (!availableNameServices.contains(nsId)) {
  41. throw new IOException("Unknown nameservice: " + nsId);
  42. }
  43. }
  44. }
  45. // 调用getAddressesForNsIds()方法,获取nameserviceId->{nameNodeId->InetSocketAddress}对应关系的集合
  46. // dfs.namenode.servicerpc-address
  47. // dfs.namenode.rpc-address
  48. Map<String, Map<String, InetSocketAddress>> addressList =
  49. getAddressesForNsIds(conf, parentNameServices, defaultAddress,
  50. DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
  51. if (addressList.isEmpty()) {
  52. throw new IOException("Incorrect configuration: namenode address "
  53. + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
  54. + DFS_NAMENODE_RPC_ADDRESS_KEY
  55. + " is not configured.");
  56. }
  57. return addressList;
  58. }

这个方法的处理逻辑如下:

1、首先,根据NameNode类的静态方法getAddress()从配置信息中获取默认地址defaultAddress;

2、然后,获取hdfs的内部命名服务:dfs.internal.nameservices,得到集合parentNameServices:

2.1、如果没有配置dfs.internal.nameservices,获取dfs.nameservices,赋值给集合parentNameServices;

2.2、如果配置了dfs.internal.nameservices,再获取获取dfs.nameservices,得到availableNameServices,验证parentNameServices中的每个nsId在availableNameServices中是否都存在,即参数dfs.internal.nameservices包含在参数dfs.nameservices范围内;

3、调用getAddressesForNsIds()方法,利用conf、parentNameServices、defaultAddress等获取nameserviceId->{nameNodeId->InetSocketAddress}对应关系的集合addressList,并返回。

下面,我们再看下getAddressesForNsIds()方法,代码如下:

  1. /**
  2. * Returns the configured address for all NameNodes in the cluster.
  3. * @param conf configuration
  4. * @param nsIds
  5. *@param defaultAddress default address to return in case key is not found.
  6. * @param keys Set of keys to look for in the order of preference   @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
  7. */
  8. private static Map<String, Map<String, InetSocketAddress>>
  9. getAddressesForNsIds(Configuration conf, Collection<String> nsIds,
  10. String defaultAddress, String... keys) {
  11. // Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
  12. // across all of the configured nameservices and namenodes.
  13. // dfs.namenode.servicerpc-address
  14. // dfs.namenode.rpc-address
  15. Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
  16. // 遍历每个nameserviceId,做以下处理:
  17. for (String nsId : emptyAsSingletonNull(nsIds)) {
  18. // 通过getAddressesForNameserviceId()方法获取nameNodeId->InetSocketAddress的对应关系,nameNodeId来自参数dfs.ha.namenodes.nsId
  19. Map<String, InetSocketAddress> isas =
  20. getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
  21. if (!isas.isEmpty()) {
  22. // 将nameserviceId->{nameNodeId->InetSocketAddress}的对应关系放入集合ret
  23. ret.put(nsId, isas);
  24. }
  25. }
  26. // 返回nameserviceId->{nameNodeId->InetSocketAddress}对应关系的集合ret
  27. return ret;
  28. }

非常简单,遍历每个nameserviceId,做以下处理:

1、通过getAddressesForNameserviceId()方法获取nameNodeId->InetSocketAddress的对应关系,nameNodeId来自参数dfs.ha.namenodes.nsId;

2、将nameserviceId->{nameNodeId->InetSocketAddress}的对应关系放入集合ret;

3、最后返回nameserviceId->{nameNodeId->InetSocketAddress}对应关系的集合ret。

继续看getAddressesForNameserviceId()方法,如下:

  1. private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
  2. Configuration conf, String nsId, String defaultValue,
  3. String... keys) {
  4. // keys
  5. // dfs.namenode.servicerpc-address
  6. // dfs.namenode.rpc-address
  7. // 获取dfs.ha.namenodes.nsId
  8. Collection<String> nnIds = getNameNodeIds(conf, nsId);
  9. Map<String, InetSocketAddress> ret = Maps.newHashMap();
  10. for (String nnId : emptyAsSingletonNull(nnIds)) {
  11. String suffix = concatSuffixes(nsId, nnId);
  12. // 根据keys获取address
  13. String address = getConfValue(defaultValue, suffix, conf, keys);
  14. if (address != null) {
  15. // 将address封装成InetSocketAddress,得到isa
  16. InetSocketAddress isa = NetUtils.createSocketAddr(address);
  17. if (isa.isUnresolved()) {
  18. LOG.warn("Namenode for " + nsId +
  19. " remains unresolved for ID " + nnId +
  20. ".  Check your hdfs-site.xml file to " +
  21. "ensure namenodes are configured properly.");
  22. }
  23. // 将nnId->InetSocketAddress的对应关系放入到Map中
  24. ret.put(nnId, isa);
  25. }
  26. }
  27. return ret;
  28. }

它通过参数获取dfs.ha.namenodes.nsId获取到NameNodeId的集合nnIds,然后针对每个NameNode,根据keys获取address,这keys传递进来的就是dfs.namenode.servicerpc-address、dfs.namenode.rpc-address,也就是优先取前一个参数,前一个取不到的话,再取第二个参数,然后将address封装成InetSocketAddress,得到isa,将nnId->InetSocketAddress的对应关系放入到Map中,最终返回给上层应用。

至此,从配置信息conf中获取nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap就分析完了。下面,我们再看下初始化的重点:调用doRefreshNamenodes()方法执行集合newAddressMap中的刷新。代码如下:

  1. private void doRefreshNamenodes(
  2. Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
  3. // 确保当前线程在refreshNamenodesLock上拥有互斥锁
  4. assert Thread.holdsLock(refreshNamenodesLock);
  5. // 定义三个集合,分别为待刷新的toRefresh、待添加的toAdd和待移除的toRemove
  6. Set<String> toRefresh = Sets.newLinkedHashSet();
  7. Set<String> toAdd = Sets.newLinkedHashSet();
  8. Set<String> toRemove;
  9. // 使用synchronized关键字在当前对象上获得互斥锁
  10. synchronized (this) {
  11. // Step 1. For each of the new nameservices, figure out whether
  12. // it's an update of the set of NNs for an existing NS,
  13. // or an entirely new nameservice.
  14. // 第一步,针对所有新的nameservices中的每个nameservice,
  15. // 确认它是一个已存在nameservice中的被更新了的NN集合,还是完全的一个新的nameservice
  16. // 判断的依据就是对应nameserviceId是否在bpByNameserviceId结合中存在
  17. // 循环addrMap,放入待添加或者待刷新集合
  18. for (String nameserviceId : addrMap.keySet()) {
  19. // 如果bpByNameserviceId结合中存在nameserviceId,加入待刷新集合toRefresh,否则加入到待添加集合toAdd
  20. if (bpByNameserviceId.containsKey(nameserviceId)) {
  21. toRefresh.add(nameserviceId);
  22. } else {
  23. toAdd.add(nameserviceId);
  24. }
  25. }
  26. // Step 2. Any nameservices we currently have but are no longer present
  27. // need to be removed.
  28. // 第二步,删除所有我们目前拥有但是现在不再需要的,也就是bpByNameserviceId中存在,而配置信息addrMap中没有的
  29. // 加入到待删除集合toRemove
  30. toRemove = Sets.newHashSet(Sets.difference(
  31. bpByNameserviceId.keySet(), addrMap.keySet()));
  32. // 验证,待刷新集合toRefresh的大小与待添加集合toAdd的大小必须等于配置信息addrMap中的大小
  33. assert toRefresh.size() + toAdd.size() ==
  34. addrMap.size() :
  35. "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
  36. "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
  37. "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);
  38. // Step 3. Start new nameservices
  39. // 第三步,启动所有新的nameservices
  40. if (!toAdd.isEmpty()) {// 待添加集合toAdd不为空
  41. LOG.info("Starting BPOfferServices for nameservices: " +
  42. Joiner.on(",").useForNull("<default>").join(toAdd));
  43. // 针对待添加集合toAdd中的每个nameserviceId,做以下处理:
  44. for (String nsToAdd : toAdd) {
  45. // 从addrMap中根据nameserviceId获取对应Socket地址InetSocketAddress,创建集合addrs
  46. ArrayList<InetSocketAddress> addrs =
  47. Lists.newArrayList(addrMap.get(nsToAdd).values());
  48. // 根据addrs创建BPOfferService
  49. BPOfferService bpos = createBPOS(addrs);
  50. // 将nameserviceId->BPOfferService的对应关系添加到集合bpByNameserviceId中
  51. bpByNameserviceId.put(nsToAdd, bpos);
  52. // 将BPOfferService添加到集合offerServices中
  53. offerServices.add(bpos);
  54. }
  55. }
  56. // 启动所有BPOfferService,实际上是通过调用它的start()方法启动
  57. startAll();
  58. }
  59. // Step 4. Shut down old nameservices. This happens outside
  60. // of the synchronized(this) lock since they need to call
  61. // back to .remove() from another thread
  62. // 第4步,停止所有旧的nameservices。这个是发生在synchronized代码块外面的,是因为它们需要回调另外一个线程的remove()方法
  63. if (!toRemove.isEmpty()) {
  64. LOG.info("Stopping BPOfferServices for nameservices: " +
  65. Joiner.on(",").useForNull("<default>").join(toRemove));
  66. // 遍历待删除集合toRemove中的每个nameserviceId
  67. for (String nsToRemove : toRemove) {
  68. // 根据nameserviceId从集合bpByNameserviceId中获取BPOfferService
  69. BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
  70. // 调用BPOfferService的stop()和join()方法停止服务
  71. bpos.stop();
  72. bpos.join();
  73. // they will call remove on their own
  74. // 它们会调用本身的remove()方法
  75. }
  76. }
  77. // Step 5. Update nameservices whose NN list has changed
  78. // 第5步,更新NN列表已变化的nameservices
  79. if (!toRefresh.isEmpty()) {// 待更新集合toRefresh不为空时
  80. LOG.info("Refreshing list of NNs for nameservices: " +
  81. Joiner.on(",").useForNull("<default>").join(toRefresh));
  82. // 遍历待更新集合toRefresh中的每个nameserviceId
  83. for (String nsToRefresh : toRefresh) {
  84. // 根据nameserviceId从集合bpByNameserviceId中取出对应的BPOfferService
  85. BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
  86. // 根据BPOfferService从配置信息addrMap中取出NN的Socket地址InetSocketAddress,形成列表addrs
  87. ArrayList<InetSocketAddress> addrs =
  88. Lists.newArrayList(addrMap.get(nsToRefresh).values());
  89. // 调用BPOfferService的refreshNNList()方法根据addrs刷新NN列表
  90. bpos.refreshNNList(addrs);
  91. }
  92. }
  93. }

整个doRefreshNamenodes()方法比较长,但是主体逻辑很清晰,主要分五大步骤,分别如下:

1、第一步,针对nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap中每个nameserviceid,确认它是一个完全新加的nameservice,还是一个其NameNode列表被更新的nameservice,分别加入待添加toAdd和待刷新toRefresh集合;

2、第二步,针对newAddressMap中没有,而目前DataNode内存bpByNameserviceId中存在的nameservice,需要删除,添加到待删除toRemove集合;

3、第三步,处理待添加toAdd集合,启动所有新的nameservices:根据addrs创建BPOfferService,维护BPOfferService相关映射集合,然后启动所有的BPOfferService;

4、第四步,处理待删除toRemove集合,停止所有旧的nameservices;

5、第五步,处理待刷新toRefresh集合,更新NN列表已变化的nameservices。

对,就是这么简单,将需要处理的nameservice分别加入到不同的集合,然后按照添加、删除、更新的顺序针对处理类型相同的nameservice一并处理即可。

接下来,我们分别研究下每一步的细节:

1、第一步,针对nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap中每个nameserviceid,确认它是一个完全新加的nameservice,还是一个其NameNode列表被更新的nameservice,分别加入待添加toAdd和待刷新toRefresh集合;

它的处理思路是,循环addrMap中每个nameserviceid,放入待添加toAdd或者待刷新toRefresh集合;如果bpByNameserviceId结合中存在nameserviceId,加入待刷新集合toRefresh,否则加入到待添加集合toAdd。

2、第二步,针对newAddressMap中没有,而目前DataNode内存bpByNameserviceId中存在的nameservice,需要删除,添加到待删除toRemove集合;

它的处理思路是:利用Sets的difference()方法,比较bpByNameserviceId和addrMap两个集合的keySet,找出bpByNameserviceId中存在,但是addrMap中不存在的nameserviceid,生成待删除集合toRemove。

3、第三步,处理待添加toAdd集合,启动所有新的nameservices:根据addrs创建BPOfferService,维护BPOfferService相关映射集合,然后启动所有的BPOfferService;

这一步针对待添加集合toAdd中的每个nameserviceId,做以下处理:

3.1、从addrMap中根据nameserviceId获取对应Socket地址InetSocketAddress,创建集合addrs;

3.2、根据addrs创建BPOfferService实例bpos;

3.3、将nameserviceId->BPOfferService的对应关系添加到集合bpByNameserviceId中

3.4、将BPOfferService添加到集合offerServices中;

最后,调用startAll()方法启动所有BPOfferService,实际上是通过调用它的start()方法启动。

其中,创建BPOfferService实例bpos时,BPOfferService的构造方法如下:

  1. // 构造方法
  2. BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
  3. Preconditions.checkArgument(!nnAddrs.isEmpty(),
  4. "Must pass at least one NN.");
  5. this.dn = dn;
  6. // 遍历nnAddrs,为每个namenode添加一个构造的BPServiceActor线城实例,加入到bpServices列表
  7. for (InetSocketAddress addr : nnAddrs) {
  8. this.bpServices.add(new BPServiceActor(addr, this));
  9. }
  10. }

它实际上是遍历nnAddrs,为每个namenode添加一个构造的BPServiceActor线城实例,加入到bpServices列表。
        而调用startAll()方法启动所有BPOfferService时,执行的代码如下:

  1. synchronized void startAll() throws IOException {
  2. try {
  3. UserGroupInformation.getLoginUser().doAs(
  4. new PrivilegedExceptionAction<Object>() {
  5. @Override
  6. public Object run() throws Exception {
  7. // 遍历offerServices,启动所有的BPOfferService
  8. for (BPOfferService bpos : offerServices) {
  9. bpos.start();
  10. }
  11. return null;
  12. }
  13. });
  14. } catch (InterruptedException ex) {
  15. IOException ioe = new IOException();
  16. ioe.initCause(ex.getCause());
  17. throw ioe;
  18. }
  19. }

它会遍历offerServices,启动所有的BPOfferService,而BPOfferService的启动,实际上就是将其所持有的每个NameNode对应的BPServiceActor线程启动,代码如下:

  1. //This must be called only by blockPoolManager
  2. void start() {
  3. for (BPServiceActor actor : bpServices) {
  4. actor.start();
  5. }
  6. }

4、第四步,处理待删除toRemove集合,停止所有旧的nameservices;

在这一步中,遍历待删除集合toRemove中的每个nameserviceId:

4.1、根据nameserviceId从集合bpByNameserviceId中获取BPOfferService;

4.2、调用BPOfferService的stop()和join()方法停止服务,它们会调用本身的remove()方法;

而BPOfferService的stop()和join()方法,则是依次调用BPOfferService所包含的所有BPServiceActor线程的stop()和join()方法,代码如下:

  1. //This must be called only by blockPoolManager.
  2. void stop() {
  3. for (BPServiceActor actor : bpServices) {
  4. actor.stop();
  5. }
  6. }
  7. //This must be called only by blockPoolManager
  8. void join() {
  9. for (BPServiceActor actor : bpServices) {
  10. actor.join();
  11. }
  12. }

5、第五步,处理待刷新toRefresh集合,更新NN列表已变化的nameservices;

在最后一步中,遍历待更新集合toRefresh中的每个nameserviceId:

5.1、根据nameserviceId从集合bpByNameserviceId中取出对应的BPOfferService;

5.2、根据BPOfferService从配置信息addrMap中取出NN的Socket地址InetSocketAddress,形成列表addrs;

5.3、调用BPOfferService的refreshNNList()方法根据addrs刷新NN列表。

好了,HDFS心跳相关数据结构的初始化已分析完毕,至此,涉及到每个命名空间服务中每个NameNode相关的BPServiceActor线程均已启动,它是真正干活的苦力,真正的底层劳动人民啊!至于它是怎么运行来完成HDFS心跳的,我们下一节再分析吧!

最新文章

  1. 查看.NET Core源代码通过Autofac实现依赖注入到Controller属性
  2. 解决Gradle编译时出现: 编码GBK的不可映射字符
  3. js数组的队栈操作
  4. canvas &amp; animation
  5. tcp/IP点对点通信程序
  6. EF入门 IQueryable和IEnumberable的区别
  7. perl中执行linux命令,及其区别
  8. div、span
  9. 用Qemu搭建aarch32学习环境
  10. Java基础笔记(1)----语言基础
  11. 强如 Disruptor 也发生内存溢出?
  12. WebSocket刨根问底(四)之五子棋大战江湖
  13. tensorflow如何正确加载预训练词向量
  14. Apache+Tomcat+Memcached实现会话保持
  15. Asp.net动态生成表单
  16. Luogu5110 块速递推
  17. python基础之while语句continue以及break --语法以及案例
  18. HDUOJ-------1753大明A+B(大数之小数加法)
  19. C++多态之虚函数
  20. 软工alpha阶段个人总结

热门文章

  1. Extension Methods &quot;点&quot;函数方法 扩展方法
  2. linux内核之系统调用nanosleep与pause()
  3. 关于 gstreamer 和 webrtc 的结合,有点小突破
  4. LeetCode OJ-- Integer to Roman @
  5. Java原子操作类,你知道多少?
  6. 判断vps类型
  7. [置顶] zabbix告警信息-lykchat信息发送系统
  8. goreplay使用
  9. MFC中 报错:error : bitmap file Res\tankBattle.ico is not in 3.00 format
  10. 转: CSRF(Cross Site Request Forgery 跨站域请求伪造) 背景与介绍