参考,

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

http://kafka.apache.org/08/configuration.html , 0.8版本,关于producer,consumer,broker所有的配置

 

因为Producer相对于consumer比较简单,直接看代码,需要注意的点

1. 配置参数,详细参考上面链接
    1.1 metadata.broker.list, 不同于0.7,不需要给出zk的地址,而是给出一些broker地址,不用全部,这里建议给两个防止一个不可用
          Kafka会自己找到相应topic,partition的leader broker
    1.2 serializer.class,需要给出message的序列化的encoder,这里使用的是简单的StringEncoder
          并且对于key还可以单独的设定,"key.serializer.class" 
          注意,除非明确知道message编码,否则不要直接使用StringEncoder,
          因为源码中的逻辑是如果没有在初始化时指定编码会默认按UTF8转码,会导致乱码
          所以不明确的时候,不要指定serializer.class,默认的encoder逻辑是直接将byte[]放入broker,不会做转码
    1.3 partitioner.class,可以不设置,默认就是random partition,当然这里可以自定义,如何根据key来选择partition
    1.4 request.required.acks, 是否要求broker给出ack,如果不设置默认是'fire and forget', 会丢数据
          默认为0,即和0.7一样,发完不会管是否成功,lowest latency but the weakest durability
          1, 等待leader replica的ack,否则重发,折中的方案,当leader在同步数据前dead,会丢数据
          -1,等待all in-sync replicas的ack,只要有一个replica活着,就不会丢数据
    1.5 producer.type, 
         sync,单条发送
         async,buffer一堆请求后,再一起发送
         如果不是对丢数据非常敏感,请设为async,因为对throughput帮助很大,但是当client crash时,会丢数据
    1.6 compression.codec
         支持"none", "gzip" and "snappy"
         可以通过,compressed.topics,来指定压缩的topic

    当producer.type选择async的时候,需要关注如下配置
    queue.buffering.max.ms (5000), 最大buffer数据的时间,默认是5秒
    batch.num.messages (200), batch发送的数目,默认是200,producer会等待buffer的messages数目达到200或时间超过5秒,才发送数据
    queue.buffering.max.messages (10000), 最多可以buffer的message数目,超过要么producer block或把数据丢掉
    queue.enqueue.timeout.ms (-1), 默认是-1,即达到buffer最大meessage数目时,producer会block
                                                       设为0,达到buffer最大meessage数目时会丢掉数据

 

2. Producer发送的是kv数据
无论Producer或KeyedMessage都是<String, String>的泛型,这里是指key和value的类型

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig; public class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random(); Properties props = new Properties();
props.put("metadata.broker.list", "host1:9092, host2:9092 "); //
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner"); //可以不设置
props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = “192.168.2.” + rnd.nextInt(255);
String msg = runtime + “,www.example.com,” + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); //指定topic,key,value
producer.send(data);
}
producer.close();
}
}

 

对于自定义partitioner也很简单,

对于partition,两个参数,key和partitions的数目

所要完成的逻辑就是,如果根据key在partitions中挑选一个合适的partition

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties; public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) { } public int partition(String key, int a_numPartitions) {
int partition = 0;
int offset = key.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
}
return partition;
} }

最新文章

  1. 学用MVC4做网站六后台管理:6.1.1管理员登录、6.1.2退出
  2. XSS 跨站脚本攻击之构造剖析(二)
  3. java应用死循环排查方法或查找程序消耗资源的线程方法(面试)
  4. c语言学习上的思考与心得
  5. Machine Learning for hackers读书笔记(一)使用R语言
  6. PAT 1001
  7. IntelliJ Idea 常用快捷键列表(精简版)
  8. BZOJ1572: [Usaco2009 Open]工作安排Job
  9. Java 实现享元(Flyweight)模式
  10. WCF中传递对象的成员变量行不通?
  11. python全栈学习--day4
  12. js实现接口隔离
  13. 常见js面试题
  14. android笔试题二
  15. SpringBoot整合Mybatis完整详细版
  16. 深入剖析GPU Early Z优化
  17. Gym - 101911B Glider(前缀和+二分)
  18. @Resource、@Autowired跟default-autowire区别联系
  19. 数据分析与挖掘 - R语言:K-means聚类算法
  20. ispostback的使用

热门文章

  1. (转)一种开源的跨平台视频开发框架:VideoLAN - VLC media player
  2. R语言中的标准输入,输出, 错误流
  3. cake build使用:
  4. asp.net mvc清空指定cookies
  5. 【java 类加载的深入研究1】loadClass()的研究
  6. 怎么输入MathType不等号
  7. ubuntu下使用sublime text进行C编程开发尝鲜
  8. PHPCMS v9设置文章的审核功能
  9. sublime text 2 破解
  10. Java精选笔记_Servlet事件监听器