功能:将多个tuple组合成为一个批次,并保障每个批次的tuple被且仅被处理一次。

storm事务处理中,把一个批次的tuple的处理分为两个阶段processing和commit阶段。

  • processing阶段运行多个批次的tuple并行处理。
  • commit阶段各批次之间需强制按照顺序进行提交。

 

事务Topologies

在Transactional Topologies内部主要管理以下事情:

  1. 管理状态: 把所有实现Transactional Topologies所必须的状态保存在zookeeper里面,包括当前transaction id及定义每个batch的一些元数据。
  2. 协调事务: 决定在任何一个时间点是该proccessing还是该committing。
  3. 错误检测: 利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm然后会相应地replay对应的batch。不需要手动做任何acking或者anchoring (emit时发生的动作)。
  4. 中间数据清理:决定什么时候一个bolt接收到一个特定transaction的所有tuple。Storm同时也会自动清理每个transaction所产生的中间数据。

 

事务Topologies的实现

Spout

事务性的spout需要实现ITransactionalSpout,这个接口包含两个内部接口类Coordinator和Emitter。在topology运行的时候,事务性的spout内部包含一个子Topology.这里面有两种类型的tuple,一种是事务性的tuple,一种是batch中的tuple.

coordinator用于开启一个事务,并在准备进入一个事务的processing阶段时,发射一个事务性 tuple(transactionAttempt & metadata)到”batch emit”流,coordinator只有一个,emitter根据并行度可以有多个实例.

Emitter以all grouping(广播)的方式订阅coordinator的”batch emit”流,负责为每个batch实际发射tuple。发送的tuple都必须以TransactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。

coordinator与Emitter关系结构图

 

TransactionAttempt

TransactionAttempt中包含两个值:一个transaction id,一个attempt id。transaction id的作用就是我们上面介绍的对于每个batch中的tuple是唯一的,而且不管这个batch replay多少次都是一样的。

attempt id是对于每个batch唯一的一个id, 但是对于同一个batch,它replay之后的attempt id跟replay之前就不一样了,storm利用这个id来区别一个batch发射的tuple的不同版本。

事务性Bolt

BaseTransactionalBolt

  • 处理batch在一起的tuples,对于每一个tuple调用execute方 法,而在整个batch处理(processing)完成的时候调用finishBatch方法。如果BatchBolt被标记成committer,则 只能在commit阶段调用finishBatch方法。一个batch的commit阶段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。那么如何知道batch的processing完成了,也就是bolt是否接收处理了batch里面所有的tuple,在bolt内部有一个 CoordinatedBolt的模型。
  • 被标记成committer的BatchBolt需要实现ICommitter接口或者通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology。

CoordinateBolt具体原理如下:

 

CoordinateBolt

  • 每个CoordinateBolt记录两个值:有哪些task给我发送了tuple(根据topology的grouping信息);我要给哪些task发送信息(同样根据groping信息)。
  • 等所有的tuple都发送完了之后,CoordinateBolt通过另外一个特殊的stream以emitDirect的方式告诉所有它发送过 tuple的task,它发送了多少tuple给这个task。下游task会将这个数字和自己已经接收到的tuple数量做对比,如果相等,则说明处理 完了所有的tuple。
  • 下游CoordinateBolt会重复上面的步骤,通知其下游。

事务内部处理流程图

最新文章

  1. js条件判断时隐式类型转换
  2. WooCommerce
  3. AngularJS 指令(使浏览器认识自己定义的标签)
  4. struts2最新s2-016代码执行漏洞CVE-2013-2251
  5. Codeforces Round #263 (Div. 2) D. Appleman and Tree(树形DP)
  6. BZOJ2626: JZPFAR
  7. HDU 2112 HDU Today -- from lanshui_Yang
  8. MySQL结果集处理
  9. openwrt教程 第一章 物联网&openwrt开发概述
  10. 记录WEUI中滚动加载的一个BUG
  11. AS3编程规范
  12. scala的多种集合的使用(4)之列表List(ListBuffer)的操作
  13. PSR-PHP开发规范(本文版权归作者:luluyrt@163.com)
  14. EntityFramework Code-First 简易教程(十一)-------从已存在的数据库中映射出表
  15. Suricata在ubuntu14.04环境下安装
  16. linux 使用spinlock的配对关系问题
  17. 自定义 mapper
  18. 【洛谷】P4585 [FJOI2015]火星商店问题
  19. LeetCode——Nth Digit
  20. Outlook自动回复功能无法使用

热门文章

  1. 微信支付JS API使用心得
  2. python 脚本
  3. EntityFramework-DBFirst-重新生成后写的验证消失(解决办法)
  4. 一步步学习NHibernate(7)——HQL查询(1)
  5. 对于数组使用sizeof(a)和使用sizeof(a[0])
  6. Hadoop集群(第5期)_Hadoop安装配置
  7. 十款优秀的在线JavaScript工具介绍
  8. python re.sub
  9. [jobdu]用两个栈实现队列
  10. SPRING IN ACTION 第4版笔记-第五章BUILDING SPRING WEB APPLICATIONS-006-处理表单数据(注册、显示用户资料)