转自https://blog.csdn.net/tianlan996/article/details/80495208

1. 类

public class KafkaProducer<K,V>
extends java.lang.Object
implements Producer<K,V>

2. producer是线程安全的(这点不同于consumer),多线程共享producer可以提高效率。

3. 使用示例:

 Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", );---重试次数
props.put("batch.size", );
props.put("linger.ms", );
props.put("buffer.memory", );
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = ; i < ; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();

producer包含一个缓存空间来存放未发送的记录,一个后台i/O进程负责缓存中记录的请求和传送。使用后,如果不关闭producer,那么将存在资源泄漏。

send()方法是异步的。

batch.size:producer维护了每个分区未发送记录的缓存,该缓存的大小由batch.size设定。

linger.ms:一般情况下,记录会被立即发送出去,而不会等待缓存的填充。用户可以通过配置linger.ms来让producer等待一段时间再发送消息。

buffer.memory:缓存的大小。消息填满缓存后,后续的消息就会阻塞。阻塞超过max.block.ms设定的时间,就会抛出TimeoutException。

key.serializer and value.serializer:如何将key和value组合成对象,可以自定义类。使用 StringSerializer默认组合成字符串。

3. idempotent producer

enable.idempotence true
retries Integer.MAX_VALUE(不设置,默认即为此值)
acks all
通过上述配置开启idempotent,可以保证exactly语意。prouducer java api 不变。只有在同一个session中才能保证produder的idempotent。
4. transactional producer
transactional.id <one value>
replication.factor
min.insync.replicas
通过上述配置开启transactional。transactional.id被设置后,idempotent也会自动开启。
consumer端需要配置为 只消费committed的消息。

在分区应用中,每个producer的transactional.id须唯一。

 Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");// 一旦被设置,发送代码就需要使用事务
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try {
producer.beginTransaction();
for (int i = ; i < ; i++) //100条消息组成一个单独的事务
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();

最新文章

  1. [2016.01.18]文本替换专家 v5.3
  2. context.Request.Files为NULL问题 在实现图片上传功能的时候出现在ashx等处理页面出现context.Request.Files为NULL异常,有几点需要注意:
  3. Java——新IO 缓冲区与Buffer
  4. Shell编程中括号判断中赋值语句和判断语句
  5. 326.Power of Three
  6. git plumbing 更加底层命令解析-深入理解GIT
  7. POJ 3164 Command Network (最小树形图)
  8. [ ArcGIS Server技术版]如何得到本机上的所有的REST服务?
  9. 搞定python多线程和多进程
  10. java--整理下关于static关键字的知识
  11. windows转mac-开发环境搭建(一):需要搭建的环境及安装的工具
  12. linux新建用户登录不了
  13. ORACLE复杂查询之连接查询
  14. 英特尔神经棒使用入门-NCS2 &amp; NCS1 -OpenVino
  15. Ubuntu16.04安装Python3.6 和pip(python3 各版本切换)
  16. [js]nodejs初探http/url/fs模块
  17. java学习笔记整理
  18. 利用NATAPP隧道解决微信公众号开发之本地调试难题
  19. source的简单操作
  20. windows10下使用source insight出现&quot;source insight program editor已停止工作&quot;的问题

热门文章

  1. 8.View类
  2. 去掉utf-8的Bom头:使用java以及jdbc不使用第三方库执行sql文件脚本
  3. Referenced file contains errors (http://www.springframework.org/schema/beans/spring-beans-3.1.xsd)
  4. ps 常用命令
  5. 数组谓词查询法 NSPredicate
  6. js判断页面从何种浏览器打开
  7. ASP SQLDATASOURCE
  8. Python 之 装饰器
  9. [比赛|考试]nowcoder NOIP提高组组第二场
  10. ArcGIS api for javascript-图层控制(图层树)