1. 引入依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
</dependency>

2. 生产者

package org.study.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import java.util.HashMap;
import java.util.Map; /**
* 生产者
*/
public class ProducerSample { public static void main(String[] args) {
Map<String, Object> props = new HashMap<>();
//zookeeper的地址
props.put("zk.connect", "127.0.0.1:2181");
//用于建立与 kafka 集群连接的 host/port 组
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); String topic = "test-topic";
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 1"));
producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 2"));
producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 3")); producer.close();
}
}

3. 消费者

package org.study.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays;
import java.util.Properties; /**
* 消费者
*/
public class ConsumerSample { public static void main(String[] args) {
String topic = "test-topic";// topic name Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");//用于建立与 kafka 集群连接的 host/port 组。
props.put("group.id", "testGroup1");// Consumer Group Name
props.put("enable.auto.commit", "true");// Consumer 的 offset 是否自动提交
props.put("auto.commit.interval.ms", "1000");// 自动提交 offset 到 zookeeper 的时间间隔,时间是毫秒
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
} }
}

最新文章

  1. 压力测试之TCPP
  2. 删除 Mac OS X 中“打开方式”里重复或无用的程序列表
  3. JQUERY1.9学习笔记 之基本过滤器(十一) 奇数选择器
  4. FPGA知识大梳理(二)verilogHDL语法入门(1)
  5. WPF之TabControl控件用法
  6. Java虚拟机学习 - 垃圾收集器
  7. CDIF: 基于JSON的SOA软件框架
  8. 201621123062《Java程序设计》第一周学习总结
  9. OpenCV-Python : 直方图
  10. IDEA Maven项目默认编译器使用JDK1.5的解决办法
  11. 使用hadoop平台运行Apriori算法
  12. Linux下使用pv监控进度
  13. aspx页面使用ajax遇到try catch中使用Response.End()报错
  14. AT91RM9200---SMC简介
  15. IDEA导入springboot项目不能启动
  16. C++17尝鲜:编译期 if 语句
  17. 【自动化测试】selenium之 chromedriver与chrome版本映射表
  18. 四.jQuery源码解析之jQuery.fn.init()的参数解析
  19. [翻译] HTKDragAndDropCollectionViewLayout
  20. 树状数组怒刷sum!!!(前缀和应用)

热门文章

  1. VSFTPD匿名用户上传文件
  2. 从TEB到PEB再到SEH(二)
  3. FOREIGN KEY 外键约束; UNIQUE和PRIMARY KEY 主键约束、CREATE INDEX建立索引的使用
  4. javascript之反柯里化uncurrying
  5. 咕泡学院java架构vip课程
  6. SDN上机第四次作业
  7. Node Addon
  8. [C++] 类的成员变量和成员方法
  9. mark_rabbitMQ
  10. flowable表简要说明