Samza的task可以把数据进行本地存储,并且对这些数据进行丰富的查询。

 
比较SQL中的select ... where...并不需要保存状态。但是aggregation和join就需要存储row之间的状态。
Samza提供了一些基本功能,能够使得streaming join和aggregation以及其它的有状态的处理更容易实现。
 
何处需要stateful processing?
  • windowed aggregation 比如:每个用户每小时的点击量
    • 这种windowed processing通常用于ranking和relevance , 发现"trending topics ", 以及简单的实时reporting和monitoring。
    • 困难在于:当一个window处理的消息很多时,如果这个window 失败了,当重启时应该如何避免需要把全部消息重新处理一遍。
  • table-table join
  • stream-table join
  • stream-stream join
如何管理task state? 如何支持这样的stateful processing?
 
有以下几种常用方案
In-memory state with checkpointing
周期性的把task在内存中的数据做checkpoint. S4的状态管理就是这样做的。
缺点是当作为state的数据量很大时,每次都完全dump所有数据不切实际,如果用diff又太复杂。
 
Using an external store
另一种常见的方案是把状态写进一个外部的数据库或者key-value store中。
samza支持这种方式,但是提供了本地持久化作为更好的选项。
 
Local state in Samza
Samza allows tasks to maintain persistent, mutable, queryable state that is physically co-located with each task.
Samza支持task在跟task同一台机器上维持持久化的、可变的、可查询的状态。
每个task在写OutputStream的同时,还会写Changlog Stream。
 
Key-value storage
Kafka自带一个key-value store的实现,使用LevelDB。这个k-v store使用一个高可靠的"changelog" stream做为支撑,这个stream通过做为一个"redo log"来为task的状态提供了fault-tolerance功能。
 
Fault-tolerance
一个task的local storage实际上是一个缓存,那么当一个机器fail之后,怎么才能在另一个机器上重建这个缓存呢?
这里有两种选择
  1. 在一个task重启时,重新读取曾经的所有输入以重建它的state。但是通常这个state会比input stream小得多,或者input stream是不可重放的。所以重新处理原有输入是一种浪费
  2. 使用一个changelog流。task把它的每次状态的改变记在这个流里。changelog就是一个普通的流,可以被其它人订阅。当然changelog是不断增长的,为了避免它占用太多空间,可以使用Kafka 0.8.1提供的log compaction功能,来去掉重复的条目。
Using the key-value store
首先在config里加上以下配置
 
# Use the key-value store implementation for a store called "my-store"
stores.my-store.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory

# Log changes to the store to an output stream for restore
# If no changelog is specified the store will not be logged (but you can still rebuild off your input streams)
stores.my-store.changelog=kafka.my-stream-name

# The serialization format to use
stores.my-store.key.serde=string
stores.my-store.msg.serde=string

 
然后在StreamTask里这么写
 
public class MyStatefulTask implements StreamTask, InitableTask {
  private KeyValueStore<String, String> store;

public void init(Config config, TaskContext context) {
    this.store = (KeyValueStore<String, String>) context.getStore("store");
  }

public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    System.out.println("Adding " + envelope.getKey() + " => " + envelope.getMessage() + " to the store.");
    store.put((String) envelope.getKey(), (String) envelope.getMessage());
  }
}

 
 
 

最新文章

  1. “插件(application/x-vlc-plugin)不受支持”NPAPI和PPAPI的问题
  2. json_encode charset
  3. hibernate 建表一对一 就是一对多,多的一方外键唯一unique
  4. 解决eclipse插件svn不显示svn信息和显示的信息为数字的问题
  5. int? 参数是这个的时候 是可以传入null的 而int的就不行
  6. [转载]10 Best Tools For Websites And Apps Development Ever
  7. js 当前系统时间
  8. SQL Server save transaction
  9. css3---线性渐变
  10. springboot 实现配置文件给常量赋值
  11. 编写ROS程序--HelloROS
  12. centos7.5 安装mysql8.0
  13. Mysql数据库报错:Cannot add or update a child row: a foreign key constraint fails(添加多对多关系)
  14. 32个Python爬虫实战项目,满足你的项目慌
  15. Grunt、Gulp和Webpack对比
  16. 1-求组合数(c(n, m))的几种方法
  17. 复利计算程序单元测试(C语言)
  18. sudo -s/sodo -i/su root
  19. Python 微信公众号发送消息
  20. jvm-垃圾收集器与内存分配策略

热门文章

  1. Web 服务器上的请求筛选被配置为拒绝该请求,因为查询字符串过长
  2. 在Ubuntu下配置Apache多域名服务器
  3. C#对象XML序列化
  4. JavaScript学习笔记(10)——JavaScript语法之操作DOM
  5. OC8_NSData
  6. java SimpleDateFormat非线程安全测试
  7. javascript笔记---算法基础学习
  8. bzoj 1005 HNOI2008 明明的烦恼
  9. java学习笔记_GUI(2)
  10. 对整站的a链接进行监控,对匹配规则进行指定页面的跳转