kafka0.9版本以后用java重新编写了producer,废除了原来scala编写的版本。

这里直接使用最新2.3版本,0.9以后的版本都适用。

注意引用的包为:org.apache.kafka.clients.producer

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; public class ProducerDemo { public static void main(String[] args) { Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
kafkaProducer.send(new ProducerRecord<>("topic", "value"));
kafkaProducer.close(); } }

0.11.0以后增加了事务,事务producer的示例代码如下,需要适用于0.11.0以后的版本:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class TransactionsProducerDemo { public static void main(String[] args) { Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close(); } }

更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算

最新文章

  1. Docker如何为企业产生价值?
  2. IO流的练习 —— 创建用户注册、登陆案例
  3. Android 动态加载 (三) PAK 详解
  4. oracle学习 五 使用存储过程创建一个重置密码为123456的功能(持续更新中)
  5. ECSHOP 数据库结构说明 (适用版本v2.7.3)
  6. MSSQL效率优化随记
  7. Web平台开发流程以及规范
  8. [译]Stairway to Integration Services Level 5 - 增量删除数据
  9. maven本地jar
  10. 菜鸟互啄:WINFORM如何实现无聚焦框的Button按钮
  11. [编织消息框架][传输协议]stcp简单开发
  12. Spring的IOC分析(一)
  13. R画图的颜色搭配
  14. mysql 安装不了的问题解决
  15. 转发对python装饰器的理解
  16. Android Studio运行项目报错:Error:null value in entry: annotationProcessorOutputFolder=null的解决方案
  17. 移动端开发demo—移动端web相册(一)
  18. openerp7 时区问题解决--改成本地时区
  19. mysql查看进程
  20. js for form

热门文章

  1. 【Ajax】Ajax入门总结
  2. JAVA笔记 -- this关键字
  3. 如何访问到静态的文件,如jpg,js,css.
  4. Add a Class from the Business Class Library 从业务类库添加类 (XPO)
  5. export default和export的使用
  6. 微信小程序上拉加载——分页
  7. 安装Ubuntu系统后的配置工作
  8. 7.JavaCC官方入门指南-例2
  9. Linux文件传输协议2019-7-9
  10. SpringBoot与SpringMVC的区别是什么?