State Management
- windowed aggregation 比如:每个用户每小时的点击量
- 这种windowed processing通常用于ranking和relevance , 发现"trending topics ", 以及简单的实时reporting和monitoring。
- 困难在于:当一个window处理的消息很多时,如果这个window 失败了,当重启时应该如何避免需要把全部消息重新处理一遍。
- table-table join
- stream-table join
- stream-stream join
- 在一个task重启时,重新读取曾经的所有输入以重建它的state。但是通常这个state会比input stream小得多,或者input stream是不可重放的。所以重新处理原有输入是一种浪费
- 使用一个changelog流。task把它的每次状态的改变记在这个流里。changelog就是一个普通的流,可以被其它人订阅。当然changelog是不断增长的,为了避免它占用太多空间,可以使用Kafka 0.8.1提供的log compaction功能,来去掉重复的条目。
stores.my-store.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
# Log changes to the store to an output stream for restore
# If no changelog is specified the store will not be logged (but you can still rebuild off your input streams)
stores.my-store.changelog=kafka.my-stream-name
# The serialization format to use
stores.my-store.key.serde=string
stores.my-store.msg.serde=string
private KeyValueStore<String, String> store;
public void init(Config config, TaskContext context) {
this.store = (KeyValueStore<String, String>) context.getStore("store");
}
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
System.out.println("Adding " + envelope.getKey() + " => " + envelope.getMessage() + " to the store.");
store.put((String) envelope.getKey(), (String) envelope.getMessage());
}
}
最新文章
- “插件(application/x-vlc-plugin)不受支持”NPAPI和PPAPI的问题
- json_encode charset
- hibernate 建表一对一 就是一对多,多的一方外键唯一unique
- 解决eclipse插件svn不显示svn信息和显示的信息为数字的问题
- int? 参数是这个的时候 是可以传入null的 而int的就不行
- [转载]10 Best Tools For Websites And Apps Development Ever
- js 当前系统时间
- SQL Server save transaction
- css3---线性渐变
- springboot 实现配置文件给常量赋值
- 编写ROS程序--HelloROS
- centos7.5 安装mysql8.0
- Mysql数据库报错:Cannot add or update a child row: a foreign key constraint fails(添加多对多关系)
- 32个Python爬虫实战项目,满足你的项目慌
- Grunt、Gulp和Webpack对比
- 1-求组合数(c(n, m))的几种方法
- 复利计算程序单元测试(C语言)
- sudo -s/sodo -i/su root
- Python 微信公众号发送消息
- jvm-垃圾收集器与内存分配策略