kafka示例
2024-08-29 04:15:38
1. 引入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
</dependency>
2. 生产者
package org.study.kafka; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import java.util.HashMap;
import java.util.Map; /**
* 生产者
*/
public class ProducerSample { public static void main(String[] args) {
Map<String, Object> props = new HashMap<>();
//zookeeper的地址
props.put("zk.connect", "127.0.0.1:2181");
//用于建立与 kafka 集群连接的 host/port 组
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); String topic = "test-topic";
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 1"));
producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 2"));
producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 3")); producer.close();
}
}
3. 消费者
package org.study.kafka; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays;
import java.util.Properties; /**
* 消费者
*/
public class ConsumerSample { public static void main(String[] args) {
String topic = "test-topic";// topic name Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");//用于建立与 kafka 集群连接的 host/port 组。
props.put("group.id", "testGroup1");// Consumer Group Name
props.put("enable.auto.commit", "true");// Consumer 的 offset 是否自动提交
props.put("auto.commit.interval.ms", "1000");// 自动提交 offset 到 zookeeper 的时间间隔,时间是毫秒
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
} }
}
最新文章
- 压力测试之TCPP
- 删除 Mac OS X 中“打开方式”里重复或无用的程序列表
- JQUERY1.9学习笔记 之基本过滤器(十一) 奇数选择器
- FPGA知识大梳理(二)verilogHDL语法入门(1)
- WPF之TabControl控件用法
- Java虚拟机学习 - 垃圾收集器
- CDIF: 基于JSON的SOA软件框架
- 201621123062《Java程序设计》第一周学习总结
- OpenCV-Python : 直方图
- IDEA Maven项目默认编译器使用JDK1.5的解决办法
- 使用hadoop平台运行Apriori算法
- Linux下使用pv监控进度
- aspx页面使用ajax遇到try catch中使用Response.End()报错
- AT91RM9200---SMC简介
- IDEA导入springboot项目不能启动
- C++17尝鲜:编译期 if 语句
- 【自动化测试】selenium之 chromedriver与chrome版本映射表
- 四.jQuery源码解析之jQuery.fn.init()的参数解析
- [翻译] HTKDragAndDropCollectionViewLayout
- 树状数组怒刷sum!!!(前缀和应用)