windows下golang实现Kfaka消息发送及kafka环境搭建
2024-08-24 01:41:43
kafka环境搭建:
一、安装配置java-jdk
(1)kafka需要java环境,安装java-jdk,下载地址:https://www.oracle.com/technetwork/java/javase/downloads/index.html
(2)安装目录如下:
(3)环境变量配置:
二、下载kafka
(1)下载kafka2.10-0.9.0.1版本,自带了zookeeper jar包,不用再次下载zookeeper。kafka代理无状态,zookeeper维持集群状态。下载地址:http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka-2.2.0-src.tgz
(2)安装目录(不要带空格)如下:
(3)修改zookeeper和kafka配置文件:
(4)按照配置创建这两个目录
(5)cmd启动zookeeper:
cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1 bin\windows\zookeeper-server-start.bat config\zookeeper.properties
(6)再开cmd启动kafka:
cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1 bin\windows\kafka-server-start.bat config\server.properties
(7)再开cmd创建topic发送消息:
cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1 # 创建topic
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kjTest # 列出topic
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181 # 创建生产者
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic kjTest # 发送消息
this is a test
hello
(8)再开cmd接收消息:
cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1 # 创建消费者
bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kjTest --from-beginning # 消费
this is a test
hello
golang实现Kfaka消息发送:
创建main.go:
package main import (
"fmt" "github.com/Shopify/sarama"
"time"
) //消息写入kafka
func main() {
//初始化配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
//生产者
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer close,err:", err)
return
} defer client.Close()
var n int = 0 for n < 20 {
n++
//创建消息
msg := &sarama.ProducerMessage{}
msg.Topic = "kjTest"
msg.Value = sarama.StringEncoder("this is a good test,hello nola!")
//发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\n,", pid, offset)
time.Sleep(10 * time.Millisecond) } }
消费消息效果:
参考博友:
kafka环境搭建:https://www.cnblogs.com/UniqueColor/p/8657319.html
golang发送消息到kafka:https://www.cnblogs.com/pyyu/p/8371649.html
kafka入门,概念功能理解:https://blog.csdn.net/tflasd1157/article/details/81985722
最新文章
- 04.ubuntu下kvm 命令行安装64位ubuntu报";Couldn&#39;t find hvm kernel for Ubuntu tree.";的问题
- 用遗传算法GA改进CloudSim自带的资源调度策略(2)
- 同个项目写webservice引用EF出现的问题
- mac_snailSVN
- 数据结构算法C语言实现(六)---2.4一元多项式的表示及相加
- datagrid点击标题进行排序
- FMS直播流发布时 Microphone Speex 编码设置注意事项
- 编写高质量代码改善C#程序的157个建议[避免finaly内的无效代码、避免嵌套异常、避免吃掉异常、注意循环异常处理]
- 数据结构(Splay平衡树):HDU 1890 Robotic Sort
- Inno Setup 网页显示插件 webctrl (V2.1 版本)
- JAVA知识的相关积累--用于自己以后查找
- 基于visual Studio2013解决C语言竞赛题之1029二元数组平均值
- UI 基本控件使用
- GWAS基因芯片数据预处理:质量控制(quality control)
- set 转 enumeration
- eclipse + maven + com.sun.jersey 创建 restful api
- jzoj3519
- Ajax-01 Ajax概述
- SVM 之 MATLAB 实现代码
- nodejs模块之fs&;&;stream