<!--
  Maven dependency for the Kafka examples: Apache Kafka 2.7.0, Scala 2.13 build
  (artifact kafka_2.13).
  NOTE(review): the Java examples in this file import only
  org.apache.kafka.clients.* and org.apache.kafka.common.serialization.*;
  presumably the lighter kafka-clients artifact would be enough - confirm
  before switching.
-->
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>

生产者

package com.test.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Minimal Kafka producer example: sends two keyed string records
 * ("key-0"/"value--0" and "key-1"/"value--1") to the "test" topic on the
 * broker at localhost:9092.
 */
public class MyProducer {

    /**
     * Builds the producer configuration, sends the records and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        // Acknowledgement mode
        properties.put("acks", "all");
        // Number of retries
        properties.put("retries", 1);
        // Batch size
        properties.put("batch.size", 16384);
        // Linger (wait) time in milliseconds
        properties.put("linger.ms", 1);
        // Send buffer size
        properties.put("buffer.memory", 33554432);
        // Serializer classes for the record key and value
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);

        // try-with-resources guarantees close() runs even if send() throws,
        // so the producer's resources are always released.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 2; i++) {
                producer.send(new ProducerRecord<>("test", "key-" + i, "value--" + i));
            }
        }
    }
}

消费者

package com.test.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal Kafka consumer example: subscribes to the "test" topic as part of
 * the "bigdata" consumer group and prints the key and value of every record
 * it receives, polling forever.
 */
public class MyConsumer {

    /**
     * Builds the consumer configuration, subscribes and runs the poll loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Enable automatic offset commits
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Interval between automatic commits
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Deserializers for the record key and value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata");
        // Where to start when there is no usable offset (new group, or the offset has expired)
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // The loop below only ends by an exception; try-with-resources makes
        // sure the consumer is still closed in that case.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Subscribe to the topic
            consumer.subscribe(Arrays.asList("test"));

            // Fetch and print records
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("key:" + record.key() + ",value:" + record.value());
                }
            }
        }
    }
}

带有callback的生产者

package com.test.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Kafka producer example with a send callback: sends ten records with key "1"
 * to the "test" topic and, for each one, prints the partition and offset on
 * success or the stack trace on failure.
 */
public class CallbackProducer {

    /**
     * Builds the producer configuration, sends the records and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        // All keys use ProducerConfig constants so a typo fails at compile time.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // try-with-resources guarantees close() runs even if send() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test", "1", "tt--" + i), (metadata, exception) -> {
                    if (exception == null) {
                        System.out.println(metadata.partition() + "--" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }
}

最新文章

  1. Android Button的基本使用
  2. angular2 问题请教
  3. 学习 opencv---(6)玩转opencv源代码:生成opencv 工程解决方案与opencv 源码编译
  4. strange error encountered today in ROS
  5. Struts2 和 spring mvc的 迭代标签常用属性对比
  6. html+css 知识整理
  7. hihocoder 1186
  8. SQL获取数据库中表的列名和列类型
  9. Hadoop WritableComparable接口
  10. gulp+browserfy模块化工具环境搭建
  11. oracle 之数据字典屣履造门。
  12. hdu 5532(最长上升子序列)
  13. 《剑指offer(第二版)》面试题64——求1+2+...+n
  14. Linux下1号进程的前世(kernel_init)今生(init进程)----Linux进程的管理与调度(六)
  15. 从 0 到 1 实现 React 系列 —— 5.PureComponent 实现 && HOC 探幽
  16. Scrapy爬虫框架中的两个流程
  17. virtualenv的使用及pip常用命令
  18. 使用 GCD 实现倒计时效果
  19. mysql 中 max_allowed_packet 查询和修改
  20. Java 中类的初始化过程

热门文章

  1. Cannot add middleware after an application has started
  2. 2022年JMUCTF WP
  3. Redis后端面试题
  4. CAD中如何将图形对象快速转换成三维曲面?
  5. ssh 登陆 Host key verification failed.
  6. 在Vue中实现app拍照-选取本地图库-图片上传成功后预览
  7. C++future promise
  8. SFINAE几种实现方式
  9. spring 任务执行器实例
  10. 1.CD冷却效果