kafka环境搭建:

    一、安装配置java-jdk

    (1)kafka需要java环境,安装java-jdk,下载地址:https://www.oracle.com/technetwork/java/javase/downloads/index.html

   (2)安装目录如下:

    (3)环境变量配置:

    

    

    二、下载kafka

    (1)下载kafka2.10-0.9.0.1版本,自带了zookeeper jar包,不用再次下载zookeeper。kafka代理无状态,zookeeper维持集群状态。下载地址:http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka-2.2.0-src.tgz

    (2)安装目录(不要带空格)如下:

    

    (3)修改zookeeper和kafka配置文件:

    (4)按照配置创建这两个目录

    (5)cmd启动zookeeper:  

cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

    (6)再开cmd启动kafka:

cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1

bin\windows\kafka-server-start.bat config\server.properties

    (7)再开cmd创建topic发送消息:

cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1

# 创建topic
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kjTest # 列出topic
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181 # 创建生产者
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic kjTest # 发送消息
this is a test
hello

    (8)再开cmd接收消息:

cd D:\KAFKA\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1\kafka_2.10-0.9.0.1

# 创建消费者
bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kjTest --from-beginning # 消费
this is a test
hello


  golang实现Kfaka消息发送:

      创建main.go:

package main

import (
"fmt" "github.com/Shopify/sarama"
"time"
) //消息写入kafka
func main() {
//初始化配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
//生产者
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer close,err:", err)
return
} defer client.Close()
var n int = 0 for n < 20 {
n++
//创建消息
msg := &sarama.ProducerMessage{}
msg.Topic = "kjTest"
msg.Value = sarama.StringEncoder("this is a good test,hello nola!")
//发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\n,", pid, offset)
time.Sleep(10 * time.Millisecond) } }

      消费消息效果:

  参考博友:

    kafka环境搭建:https://www.cnblogs.com/UniqueColor/p/8657319.html

    golang发送消息到kafka:https://www.cnblogs.com/pyyu/p/8371649.html

    kafka入门,概念功能理解:https://blog.csdn.net/tflasd1157/article/details/81985722

最新文章

  1. 04.ubuntu下kvm 命令行安装64位ubuntu报&quot;Couldn&#39;t find hvm kernel for Ubuntu tree.&quot;的问题
  2. 用遗传算法GA改进CloudSim自带的资源调度策略(2)
  3. 同个项目写webservice引用EF出现的问题
  4. mac_snailSVN
  5. 数据结构算法C语言实现(六)---2.4一元多项式的表示及相加
  6. datagrid点击标题进行排序
  7. FMS直播流发布时 Microphone Speex 编码设置注意事项
  8. 编写高质量代码改善C#程序的157个建议[避免finaly内的无效代码、避免嵌套异常、避免吃掉异常、注意循环异常处理]
  9. 数据结构(Splay平衡树):HDU 1890 Robotic Sort
  10. Inno Setup 网页显示插件 webctrl (V2.1 版本)
  11. JAVA知识的相关积累--用于自己以后查找
  12. 基于visual Studio2013解决C语言竞赛题之1029二元数组平均值
  13. UI 基本控件使用
  14. GWAS基因芯片数据预处理:质量控制(quality control)
  15. set 转 enumeration
  16. eclipse + maven + com.sun.jersey 创建 restful api
  17. jzoj3519
  18. Ajax-01 Ajax概述
  19. SVM 之 MATLAB 实现代码
  20. nodejs模块之fs&amp;&amp;stream

热门文章

  1. Nginx 自定义添加Response Headers 修改server
  2. flink连接hbase方法及遇到的问题
  3. xss 加载远程第三方JS
  4. vue笔记-条件渲染
  5. 精读《Function VS Class 组件》
  6. DWM1000 帧过滤代码实现
  7. async与defer
  8. vue 源码学习(一) 目录结构和构建过程简介
  9. mysql数据库内容相关操作
  10. Nginx服务器 配置 https