
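import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class MyTaskClass implements StreamTask {

  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // process message
  }
}
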
# This is the class above, which Samza will instantiate when the job is run
task.class=com.example.samza.MyTaskClass

# Define a system called "kafka" (you can give it any name, and you can define
# multiple systems if you want to process messages from different sources)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# The job consumes a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent

# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Use the "json" serializer for messages in the "PageViewEvent" topic
systems.kafka.streams.PageViewEvent.samza.msg.serde=json

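A job configuration like this is an ordinary Java properties file. As a rough illustration (not Samza's own config loader), the sketch below loads the keys with java.util.Properties and splits each task.inputs entry into a system name and a stream name. Treating task.inputs as a comma-separated list and splitting each entry at the first dot are assumptions of this sketch, and the class name `JobConfigSketch` is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class JobConfigSketch {

  // The configuration keys from above, inlined so the example is self-contained.
  static final String CONFIG =
      "task.class=com.example.samza.MyTaskClass\n"
    + "systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory\n"
    + "task.inputs=kafka.PageViewEvent\n"
    + "serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory\n";

  /** Splits a comma-separated task.inputs value into {system, stream} pairs at the first dot. */
  static List<String[]> parseInputs(String value) {
    List<String[]> result = new ArrayList<>();
    for (String entry : value.split(",")) {
      String trimmed = entry.trim();
      int dot = trimmed.indexOf('.');
      if (dot <= 0) {
        throw new IllegalArgumentException("Expected system.stream, got: " + trimmed);
      }
      result.add(new String[] {trimmed.substring(0, dot), trimmed.substring(dot + 1)});
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.load(new StringReader(CONFIG));
    System.out.println("task.class = " + props.getProperty("task.class"));
    for (String[] input : parseInputs(props.getProperty("task.inputs"))) {
      System.out.println("system=" + input[0] + ", stream=" + input[1]);
    }
  }
}
```

The point of the split is that every input is named by a (system, stream) pair, so adding a second source only means defining another system and listing it in task.inputs.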
/** Every message that is delivered to a StreamTask is wrapped
 * in an IncomingMessageEnvelope, which contains metadata about
 * the origin of the message. */
public class IncomingMessageEnvelope {
  /** A deserialized message. */
  Object getMessage() { ... }

  /** A deserialized key. */
  Object getKey() { ... }

  /** The stream and partition that this message came from. */
  SystemStreamPartition getSystemStreamPartition() { ... }
}

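Note that the message and the key are both declared as Object and need to be cast to the correct type. If you don't configure a serializer/deserializer, they are typically Java byte arrays. A deserializer can convert these bytes into any other type; for example, a JSON deserializer can turn the byte array into Map, List and String objects.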
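The casting described above can be sketched in plain Java without Samza on the classpath. The sketch assumes a message arrives either as a raw byte[] (no serde configured, decoded here as UTF-8, which is itself an assumption) or as an already-deserialized String or Map. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MessageCastSketch {

  /** Turns an untyped message into text; a raw byte[] is assumed to be UTF-8. */
  static String asText(Object message) {
    if (message instanceof byte[]) {
      // No serde configured: the payload is still the raw bytes.
      return new String((byte[]) message, StandardCharsets.UTF_8);
    }
    if (message instanceof String) {
      // A string serde has already done the conversion; a cast is enough.
      return (String) message;
    }
    throw new IllegalArgumentException("Unexpected message type: "
        + (message == null ? "null" : message.getClass().getName()));
  }

  /** Reads one field from a message that a JSON serde has turned into a Map. */
  @SuppressWarnings("unchecked")
  static Object field(Object message, String name) {
    Map<String, Object> json = (Map<String, Object>) message;
    return json.get(name);
  }

  public static void main(String[] args) {
    System.out.println(asText("hello".getBytes(StandardCharsets.UTF_8)));
    System.out.println(asText("already a string"));

    // Stand-in for what a JSON deserializer might produce for {"page": "/home"}.
    Map<String, Object> parsed = new HashMap<>();
    parsed.put("page", "/home");
    System.out.println(field(parsed, "page"));
  }
}
```

Failing loudly on an unexpected type makes a serde misconfiguration show up at the first message instead of as a ClassCastException deep inside the task logic.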

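The getSystemStreamPartition() method returns a SystemStreamPartition object, which tells you where the message came from. It consists of three parts:

1. The system: the name of the system the message came from, as defined in your job configuration. You can have multiple systems for input and output, each with a different name.
2. The stream name: the name of the stream (topic, queue) within the source system. This is also defined in the job configuration.
3. The partition: a stream is normally split into several partitions, and Samza assigns each partition to a StreamTask instance.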
/** A triple of system name, stream name and partition. */
public class SystemStreamPartition extends SystemStream {

  /** The name of the system which provides this stream. It is
      defined in the Samza job's configuration. */
  public String getSystem() { ... }

  /** The name of the stream/topic/queue within the system. */
  public String getStream() { ... }

  /** The partition within the stream. */
  public Partition getPartition() { ... }
}
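To make the triple and the partition-to-task rule concrete, the plain-Java sketch below models the triple as a small value class and routes messages so that each partition number gets exactly one task-like handler. It is an illustration under stated assumptions, not Samza's container logic: keying by partition number alone is enough for the single input stream in the example configuration, and `Ssp`, `CountingTask` and `PartitionRoutingSketch` are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

public class PartitionRoutingSketch {

  /** Hypothetical stand-in for a SystemStreamPartition: system, stream and partition. */
  static final class Ssp {
    final String system;
    final String stream;
    final int partition;

    Ssp(String system, String stream, int partition) {
      this.system = system;
      this.stream = stream;
      this.partition = partition;
    }
  }

  /** Hypothetical task that only counts the messages it receives. */
  static final class CountingTask {
    int processed = 0;

    void process(Object message) {
      processed++;
    }
  }

  // One task instance per partition number, created the first time that partition is seen.
  final Map<Integer, CountingTask> tasks = new TreeMap<>();

  void deliver(Ssp origin, Object message) {
    tasks.computeIfAbsent(origin.partition, p -> new CountingTask()).process(message);
  }

  public static void main(String[] args) {
    PartitionRoutingSketch router = new PartitionRoutingSketch();
    router.deliver(new Ssp("kafka", "PageViewEvent", 0), "a");
    router.deliver(new Ssp("kafka", "PageViewEvent", 1), "b");
    router.deliver(new Ssp("kafka", "PageViewEvent", 0), "c");
    // prints "partition 0 -> 2 message(s)" then "partition 1 -> 1 message(s)"
    router.tasks.forEach((partition, task) ->
        System.out.println("partition " + partition + " -> " + task.processed + " message(s)"));
  }
}
```

Because a given partition always reaches the same handler, any per-partition state a task keeps stays consistent across messages.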


/** When a task wishes to send a message, it uses this interface. */
public interface MessageCollector {
  void send(OutgoingMessageEnvelope envelope);
}
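To send a message, you create an OutgoingMessageEnvelope object and pass it to the message collector. At a minimum, the envelope specifies the message you want to send and the system and stream name to send it to. Optionally, you can also specify the partitioning key and other parameters. See the javadoc (http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) for details.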
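The send path described above can be mimicked in plain Java, which also shows why a collector interface is convenient for testing. Everything in this sketch is hypothetical: `Envelope` and `Collector` only imitate the shape of OutgoingMessageEnvelope and MessageCollector, and `partitionFor` is an illustrative key-to-partition mapping, not the partitioner any real producer uses.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectorSketch {

  /** Hypothetical outgoing envelope: destination plus message, with an optional partition key. */
  static final class Envelope {
    final String system;
    final String stream;
    final Object key;      // null when no partition key is given
    final Object message;

    Envelope(String system, String stream, Object message) {
      this(system, stream, null, message);
    }

    Envelope(String system, String stream, Object key, Object message) {
      this.system = system;
      this.stream = stream;
      this.key = key;
      this.message = message;
    }
  }

  /** Hypothetical collector interface with a single send method. */
  interface Collector {
    void send(Envelope envelope);
  }

  /** A collector that only records what was sent, handy in unit tests. */
  static final class RecordingCollector implements Collector {
    final List<Envelope> sent = new ArrayList<>();

    @Override
    public void send(Envelope envelope) {
      sent.add(envelope);
    }
  }

  /**
   * Illustrative key-to-partition mapping only; Kafka's default partitioner,
   * for instance, hashes the serialized key bytes instead.
   */
  static int partitionFor(Object key, int numPartitions) {
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  public static void main(String[] args) {
    RecordingCollector collector = new RecordingCollector();
    collector.send(new Envelope("kafka", "words", "hello"));          // minimum: system, stream, message
    collector.send(new Envelope("kafka", "words", "user-42", "hi"));  // with a partition key
    for (Envelope e : collector.sent) {
      String where = e.key == null ? "no key" : "partition " + partitionFor(e.key, 4);
      System.out.println(e.system + "." + e.stream + " <- " + e.message + " (" + where + ")");
    }
  }
}
```

The useful property of any such mapping is that it is deterministic: equal keys always land in the same partition.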



import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class SplitStringIntoWords {
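The comment in the task above mentions a second task that adds up the 1's. The plain-Java sketch below simulates both stages in one process: a split step with the same logic as SplitStringIntoWords, and a hypothetical counting task that sums the values per word. A TreeMap stands in for whatever local state a real counting task would keep; that, and all names here, are assumptions of the sketch.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountPipelineSketch {

  /** Stage 1: same logic as SplitStringIntoWords, emitting (word, 1) pairs into a list. */
  static List<Map.Entry<String, Integer>> split(String message) {
    List<Map.Entry<String, Integer>> out = new ArrayList<>();
    for (String word : message.split(" ")) {
      out.add(new SimpleEntry<>(word, 1));
    }
    return out;
  }

  /** Stage 2: the hypothetical "second task" that adds up the 1's per word. */
  static final class CountTask {
    // A TreeMap stands in for the task's local state; it also keeps the output sorted.
    final Map<String, Integer> counts = new TreeMap<>();

    void process(String word, int value) {
      counts.merge(word, value, Integer::sum);
    }
  }

  public static void main(String[] args) {
    CountTask counter = new CountTask();
    for (String line : new String[] {"the quick fox", "the lazy dog"}) {
      for (Map.Entry<String, Integer> pair : split(line)) {
        counter.process(pair.getKey(), pair.getValue());
      }
    }
    System.out.println(counter.counts); // prints {dog=1, fox=1, lazy=1, quick=1, the=2}
  }
}
```

Using the word as the message key matters here: assuming the "words" stream is partitioned by key, every 1 for a given word reaches the same counting task, so no counts are split across tasks.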


