在很多应用场景中,分布式系统的可靠性保障尤其重要。比如电商平台中,客户的购买请求需要可靠处理,不能因为节点故障等原因丢失请求;比如告警系统中,产生的核心告警必须及时完整的知会监控人员,不能因为网络故障而丢失数据。

Storm消息可靠性保障是Storm核心特性之一,其中消息树的跟踪管理机制是Storm核心算法之一,本文将详细介绍Storm消息可靠处理机制。我们从Storm初探中的例子入手。

一、消息处理流程

1、 Spout节点

(1) Spout接收到一个文本消息;

msg1

刘备 关羽 张飞

曹操 郭嘉 荀彧

(2) Spout把文本消息拆分为2个行字符串消息,并把2个消息发送给NamesSplit Bolt节点。

2、 NamesSplit Bolt节点

(1) NamesSplit Bolt接收到两个行字符串消息;

msg2 刘备 关羽 张飞

msg3 曹操 郭嘉 荀彧

(2) NamesSplit Bolt把2个行字符串消息拆分为6个名字消息,发送给HelloWorld Bolt节点;

(3) NamesSplit Bolt确认,msg2、msg3处理完成。

3、 HelloWorld Bolt节点

(1) HelloWorld Bolt接收到6个名字消息;

msg4 刘备

msg5 关羽

msg6 张飞

msg7 曹操

msg8 郭嘉

msg9 荀彧

(2) HelloWorld Bolt SayHello;

(3) HelloWorld Bolt确认,msg4、msg5、msg6、msg7、msg8、msg9处理完成。

 二、关键代码

 1、 Spout

下面代码表示Spout节点发送消息,消息绑定到messageId上,这里的messageId可以看做上述例子中的msg1,tuple可以看做上述例子中的msg2或msg3。

public void nextTuple()
{
this.collector.emit(List<Object> tuple, Object messageId);
}

下面代码会在消息处理成功或失败后调用。

public void ack(Object msgId)
{
} public void fail(Object msgId)
{
}

2、 Bolt

这段代码是Bolt消息处理发送代码,我们详细看一下标红代码。

public void execute(Tuple input)
{
String[] nameArray = names.split(" ");
for(String name : nameArray)
{
List<Object> splitList = new ArrayList<Object>();
splitList.add(name);
collector.emit(inputList, splitList);
}
collector.ack(input);
}
OutputCollector.emit(Collection<Tuple> anchors, List<Object> tuple) 中tuple表示发送的子消息,anchors表示子消息的父节点。
这段代码既发送了子消息,又把子消息锚定到了消息树上。上述例子中,相当于把消息msg4 刘备,msg5关羽,msg6张飞锚定到消息msg2 刘备 关羽 张飞上。
OutputCollector.ack(Tuple input)表示回答消息处理完成。上述例子中,相当于确认msg2 刘备 关羽 张飞处理完成。
下面代码会在消息处理成功或失败后调用。
public void ack(Object msgId)
{
} public void fail(Object msgId)
{
}

三、消息重发机制

可以看到,一条消息从Spout发送后,会产生一棵消息树,只有当消息树中的所有消息都被确认后(ack),Storm才认为消息处理完成。

代码上可以轻易看出,我们只需要指定根节点消息ID(即Spout接收到的消息ID),其他消息ID系统会自动生成。同时,我们只需要确认非根节点消息处理完成。

实际上,Spout或者Bolt每发送一条消息,消息便会存储到kestrel队列中,Bolt每接收到一条消息,kestrel便会标记这条消息在处理中(pengding),直到该条消息被确认处理完成,kestrel才把它移除出队列。

Bolt消息处理过程中,发生异常或者超时,kestrel会把该条消息从处理中状态重新置为待处理状态,等待Storm下一次调度处理。

四、消息树管理算法

可以看到,Spout每处理一个消息,就会生成一棵消息树,如果Storm存储每棵消息树每个节点的状态,内存很快便会耗尽,显然是不可取的。

实际上,Storm仅仅采用20字节管理一棵消息树,数据结构如下:

treeId|{64bit}

treeId用于区分不同的消息树(和代码中指定的根节点ID一一对应),{64bit}则用于消息树节非根点异或计算。

每生成一个64位msgid,则与{64bit}异或计算一次,直到该消息确认处理完成后,再与{64bit}异或计算一次。(异或计算结果为新的{64bit}值)

{64bit}==0时,表示消息处理完毕。(一目了然,在此不再证明)

最新文章

  1. 对路径的访问被拒绝,解决之后又报-未在本地计算机上注册“Microsoft.Jet.OLEDB.4.0”提供程序。
  2. bzoj4462: [Jsoi2013]编程作业
  3. 练习sql语句的好去处——http://www.sqlzoo.cn/
  4. study notes for python
  5. 利用 Python 只连接一次 MySQL
  6. 一个人的Scrum之准备工作
  7. PowerDesigner12逆向生成oracle数据表时,错误解决
  8. C# DbHelperSQLP,操作不同的数据库帮助类 (转载)
  9. javascript倒置再次被否定作用
  10. 小兴趣:用python生成excel格式座位表
  11. 新建maven项目遇到Select an Archetype时没有maven-archetype-webapp处理方法
  12. pta-树种统计
  13. Metaclasses
  14. lnmp环境部署脚本-y
  15. eclipse工具类及插件(修改eclipse.ini文件及作者名字)
  16. &quot;美女相册&quot;的 js 实现代码
  17. [ZJOI2005]午餐
  18. web前端开发与iOS终端开发的异同[转]
  19. pomelo 安装
  20. iOS之富文本(二)

热门文章

  1. Java秒杀简单设计一:搭建springboot环境
  2. 神奇的thrust::device_vector与nvcc编译选项
  3. DevOps的概念
  4. zabbix中文乱码的问题
  5. 51nod 1009 - 数字1的数量 - [数位DP][模板的应用以及解释]
  6. HDU 1045 - Fire Net - [DFS][二分图最大匹配][匈牙利算法模板][最大流求二分图最大匹配]
  7. initrd和initramfs的区别
  8. vue.js个人学习心得
  9. 5839Special Tetrahedron---hdu5839(计算几何,求特殊四面体个数)
  10. eclipse spket插件 错误 Syntax error on token &quot;(&quot;, FunctionExpressionHeader expected after this