事先必备:

kafka已安装完成

1.目录结构

2.父pom

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>KafkaAndSpringBoot</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<modules>
<module>KafkaProducer</module>
<module>KafkaConsumer</module>
</modules>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
</project>

3.producer模块

A.application.properties

server.port=8081

#kafka节点
spring.kafka.bootstrap-servers=192.168.204.139:9092
#kafka发送消息失败后的重试次数
spring.kafka.producer.retries=0
#当消息达到该值后再批量发送消息.16kb
spring.kafka.producer.batch-size=16384
#设置kafka producer内存缓冲区大小.32MB
spring.kafka.producer.buffer-memory=33554432
#kafka消息的序列化配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。??
#acks=1 : 只要集群的leader节点收到消息,生产者就会收到一个来自服务器成功响应。
#acks=-1: 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为producer请求成功。
# 这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
spring.kafka.producer.acks=1

B.producer代码

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback; @Component
@Slf4j
@Data
public class KafkaProducerDemo {
private final KafkaTemplate<String, Object> kafkaTemplate; public void sendMsg(String topic, Object object) {
ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topic, object);
send.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
log.error("消息发送失败:{}", ex.toString());
} @Override
public void onSuccess(SendResult<String, Object> result) {
log.info("消息发送成功:{}", result.toString());
}
});
}
}

C.启动类

 略

D.producerTest

import com.sakura.producer.KafkaProducerDemo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class)
@SpringBootTest
public class ProducerTest {
@Autowired
private KafkaProducerDemo kafkaProducerDemo; @Test
public void send() throws InterruptedException {
String topic = "firstTopic";
for (int i = 0; i < 6; i++) {
kafkaProducerDemo.sendMsg(topic, "Hello kafka," + i);
}
Thread.sleep(Integer.MAX_VALUE);
}
}

4.consumer模块

A.application.properties

server.port=8082
#kafka节点
spring.kafka.bootstrap-servers=192.168.204.139:9092
#consumer消息签收机制
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
#如果没有设置offset或者设置的offset不存在时(例如数据被删除)采取的策略:
#earliest:使用最早的offset
#latest:使用最新的offset
#none:使用前一个offset,如果没有就向consumer抛异常
#anything else:直接向consumer抛出异常
spring.kafka.consumer.auto-offset-reset=earliest
## 序列化配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#监听消息消费的线程数,值范围在[1,partitionCounts]之间.
#假如有3个partition,concurrency的值为4,@KafkaListener的数量为2.
#其中一个@KafkaListener会启动两个线程分配到两个partition
#另一个@KafkaListener会启动一个线程分配到另一个partition
#当有一个@KafkaListener挂掉之后会触发broker的再均衡,由剩余的@KafkaListener启动线程重新分配至partition.
#@KafkaListener就像是消费者一样的存在,当值为1时broker会认为只有一个消费者在消费topic.
spring.kafka.listener.concurrency=1

B.consumer代码

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component; @Slf4j
@Component
public class KafkaConsumerDemo {
@KafkaListener(topics = "firstTopic",groupId = "groupDemo")
public void receiveMsg(ConsumerRecord<String, Object> record,
Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
log.info("消费消息:{}", record.value());
//手动ack
acknowledgment.acknowledge();
consumer.commitAsync();
}
}

C.启动类

最新文章

  1. Intellij Idea上传本地项目到Git
  2. MVC code first数据迁移 转
  3. [20150925]Linux之文件系统与SHELL
  4. 1069 Nim游戏
  5. handler以及AnyscTask处理机制
  6. SqlBulkCopy 插入100W条数据时 属性BatchSize的作用
  7. OpenCV Manager package was not found
  8. 关于&quot;cin&gt;&gt;&quot;输入成功或失败时的“返回值”(转载)
  9. 事半功倍之StyleCop(一)
  10. Windows 更快捷方便的安装软件,命令提示符上安装 Chocolatey
  11. DirectX11 With Windows SDK--16 流输出阶段
  12. APP,H5测试要点
  13. Windows Server 2012设置VMWare以服务方式启动(注销后也可以运行,开机也可以自动运行)
  14. 安装python的第三方Pillow库
  15. Django:管理站点
  16. MongDB篇,第一章:数据库知识2
  17. 《Linux内核分析》 第一节 计算机是如何工作的
  18. One-hot 编码/TF-IDF 值来提取特征,LAD/梯度下降法(Gradient Descent),Sigmoid
  19. 风景区的面积及道路状况分析问题 test
  20. 设置UINavigationController标题的属性

热门文章

  1. docker 在centos7中设置 DOCKER_OPTS
  2. c语言学习笔记第三章———数据和C
  3. HBase2.0 meta信息丢失的修复方法
  4. Numerical Sequence (Hard vision) 题解
  5. P5676 [GZOI2017]小z玩游戏 Tarjan+优化建图
  6. 蓝桥杯大学B组省赛2020模拟赛(一)题解与总结
  7. 隐写工具outguess安装使用介绍
  8. php 修改图片大小
  9. 重学 Java 设计模式:实战访问者模式「模拟家长与校长,对学生和老师的不同视角信息的访问场景」
  10. requests接口自动化8-传递数据为xml形式的post请求:data