Kafka 2.3 Producer (0.9以后版本适用)
2024-09-21 03:07:11
kafka0.9版本以后用java重新编写了producer,废除了原来scala编写的版本。
这里直接使用最新2.3版本,0.9以后的版本都适用。
注意引用的包为:org.apache.kafka.clients.producer
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerDemo {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
kafkaProducer.send(new ProducerRecord<>("topic", "value"));
kafkaProducer.close();
}
}
0.11.0以后增加了事务,事务producer的示例代码如下,需要适用于0.11.0以后的版本:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class TransactionsProducerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
}
}
更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算
最新文章
- Docker如何为企业产生价值?
- IO流的练习 —— 创建用户注册、登陆案例
- Android 动态加载 (三) PAK 详解
- oracle学习 五 使用存储过程创建一个重置密码为123456的功能(持续更新中)
- ECSHOP 数据库结构说明 (适用版本v2.7.3)
- MSSQL效率优化随记
- Web平台开发流程以及规范
- [译]Stairway to Integration Services Level 5 - 增量删除数据
- maven本地jar
- 菜鸟互啄:WINFORM如何实现无聚焦框的Button按钮
- [编织消息框架][传输协议]stcp简单开发
- Spring的IOC分析(一)
- R画图的颜色搭配
- mysql 安装不了的问题解决
- 转发对python装饰器的理解
- Android Studio运行项目报错:Error:null value in entry: annotationProcessorOutputFolder=null的解决方案
- 移动端开发demo—移动端web相册(一)
- openerp7 时区问题解决--改成本地时区
- mysql查看进程
- js for form