kafka安装使用
2024-09-04 17:50:29
版本:kafka_2.11-0.10.1.0 (之前安装2.10-0.10.0.0,一直出问题)
安装
- 下载并解压代码
wget http://mirrors.cnnic.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
#http://kafka.apache.org/downloads tar -zxvf kafka_2.10-0.10.0.0.tgzcd kafka_2.-0.10.0.0
- 修改每个broker安装目录下的配置文件
# $targetID默认是0,每个broker的broker.id必须要唯一
broker.id=$targetID #默认是注释的,$IP改成当前节点的IP即可。如果不改该配置项,在节点通过命令行可以收发消息,而在其他机器是无法通过IP去访问队列的
#在之前的版本不叫listeners,而是advertised.host.name和host.name
listeners=PLAINTEXT://$IP:9092 - 启动服务
#kafka依赖于zookeeper
#如果没有的话,可以通过kafka提供的脚本快速创建一个单节点zookeeper实例:
#bin/zookeeper-server-start.sh config/zookeeper.properties #确认zookeeper服务已经启动后,启动kafka服务
nohup bin/kafka-server-start.sh config/server.properties & - 创建一个名为test,有一份备份,一个分区的topic
bin/kafka-topics.sh --create --zookeeper localhost: --replication-factor --partitions --topic test
#查看所有topic bin/kafka-topics.sh --list --zookeeper localhost: - 发送消息
bin/kafka-console-producer.sh --broker-list localhost: --topic test
- 开启一个消费者接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost: --topic test --from-beginning
- 查看topic信息
bin/kafka-topics.sh --describe --zookeeper localhost: --topic test
Springboot结合Kafka的使用
1.在pom文件添加依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.在application.properties中添加配置
# APACHE KAFKA (KafkaProperties)
spring.kafka.bootstrap-servers=192.168.0.155:,192.168.0.156:
spring.kafka.client-id=K1
spring.kafka.consumer.auto-offset-reset= earliest
spring.kafka.consumer.enable-auto-commit= true
spring.kafka.consumer.group-id= test-consumer-group
spring.kafka.producer.batch-size=
spring.kafka.producer.bootstrap-servers= 192.168.0.155:,192.168.0.156:
spring.kafka.producer.client-id= P1
spring.kafka.producer.retries=
spring.kafka.template.default-topic= test
3.创建消费者类(订阅消息的对象)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component; @Component
public class ListenerBean { @KafkaListener(topics = "myTopic")
public void processMessage(String content) {
System.out.println("you have a new message:" + content);
// ...
}
}
4.创建生产者类(发布消息的对象)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController; @Component
@RestController
@RequestMapping("/send")
@EnableAutoConfiguration
public class SendMsgBean {
private final KafkaTemplate<String,String> kafkaTemplate; @Autowired
public SendMsgBean(KafkaTemplate<String,String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
} @RequestMapping(path="/{msg}",method=RequestMethod.GET)
public String send(@PathVariable("msg") String msg) {
System.out.println("==sending msg:" + msg);
kafkaTemplate.send("test","test-"+msg);
return "message has been sent!";
}
}
只需这4步,就可以在springboot中使用kafka了,现在我们访问 http://localhost:8080/send/mymessage 就可以在控制台看到信息了。
参考:
最新文章
- 转: 深入理解Linux修改hostname
- URAL 1549 Another Japanese Puzzle(构造)
- Effective Java 78 Consider serialization proxies instead of serialized instances
- java5、java6、java7、java8的新特性
- iPhone 6/iPhone 6 Plus硬件性能
- eclipse调试总结(转)
- iostat的深入理解
- 【多媒体封装格式详解】---MKV
- android layout 属性大全
- linux 管道命令 小记
- 小说mvvm
- HTML/CSS font-family对应的中英文名称 宋体 微软雅黑
- 用js,css3 做的一个球
- 交作业啊,python爬取58的页面
- SLF4J - 借助SLF4J, 统一适配所有日志实现为logback日志实现的实践
- android galley实现画廊效果
- BeanPostProcessors (for example: not eligible for auto-proxying),报错解决
- vue设置背景图片
- Leetcode#832. Flipping an Image(翻转图像)
- 在模拟器上运行Android项目时报错:DELETE_FAILED_INTERNAL_ERROR Error while Installing APKs
热门文章
- 3673: 可持久化并查集 by zky
- MVC action 执行两次 background url()
- 【BZOJ3073】[Pa2011]Journeys 线段树+堆优化Dijkstra
- FZU 2099 魔法阵(计算几何)
- QL 获取当前日期,年、月、日、周、时、分、秒
- StartCom免费ssl证书申请以及在Tomcat环境中的配置
- 段合并 segments merge 被删除的文档的删除时间
- ./bin/console server:run Surprise! There are no commands defined in the ";server"; namespace.
- centos中screen的使用 创建 退出
- 面向对象之继承(Day24)