Kafka介绍

Kafka是Apache软件基金会开发的一个开源流处理平台,由Java和Scala编写;Kafka是一种高吞吐、分布式、基于订阅发布的消息系统。

Kafka名称解释

  • Producer:生产者
  • Consumer:消费者
  • Topic:消息主题,每一类的消息称之为一个主题
  • Broker:Kafka以集群的方式运行,可以由一个或多个服务器组成,每个服务器叫做一个broker
  • Partition:物理概念上的分区,为了提供系统吞吐量,在物理上每个Topic会分为一个或多个Partition

Kafka架构图

一个典型的Kafka集群中包含若干Producer,若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。

Kafka通过Zookeeper管理集群配置及服务协同,Producer使用push模式将消息发布到broker,Consumer通过监听使用pull模式从broker订阅并消费消息。

图上有个细节需要注意,producer给broker的过程是push,也就是有数据就推送给broker,而consumer给broker的过程是pull,是通过consumer主动去拉数据的,而不是broker把数据主动发送给consumer端的。

Kafka与RabbitMQ比较

  • Kafka比RabbitMQ性能要高
  • RabbitMQ比Kafka可靠性要高
  • 因此在金融支付领域使用RabbitMQ居多,而在日志处理、大数据等方面Kafka使用居多。

Kafka安装

第一步 下载Kafka:

  地址 http://kafka.apache.org/downloads

第二步 解压Kafka:

  tar -zxvf kafka.tgz -C  /usr/local/kafka

第三步 运行Zookeeper:

以后台方式运行 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &  zookeeper端口 2181

第四步 运行Kafka:

以后台方式运行 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties  kafka端口 9092

Kafka图形管理工具

http://www.kafkatool.com/download.html

Go语言中使用Kafka

Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later).

安装sarama

  go get github.com/Shopify/sarama

Producer

package main

import (
"fmt"
"github.com/Shopify/sarama"
) func main() {
// 新建一个arama配置实例
config := sarama.NewConfig() // WaitForAll waits for all in-sync replicas to commit before responding.
config.Producer.RequiredAcks = sarama.WaitForAll // NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true // 新建一个同步生产者
client, err := sarama.NewSyncProducer([]string{"172.16.65.210:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
defer client.Close() // 定义一个生产消息,包括Topic、消息内容、
msg := &sarama.ProducerMessage{}
msg.Topic = "revolution"
msg.Key = sarama.StringEncoder("miles")
msg.Value = sarama.StringEncoder("hello world...") // 发送消息
pid, offset, err := client.SendMessage(msg) msg2 := &sarama.ProducerMessage{}
msg2.Topic = "revolution"
msg2.Key = sarama.StringEncoder("monroe")
msg2.Value = sarama.StringEncoder("hello world2...")
pid2, offset2, err := client.SendMessage(msg2) if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
fmt.Printf("pid2:%v offset2:%v\n", pid2, offset2)
}

Consumer

package main

import (
"sync"
"github.com/Shopify/sarama"
"fmt"
) var wg sync.WaitGroup func main() {
consumer, err := sarama.NewConsumer([]string{"172.16.65.210:9092"}, nil)
if err != nil {
fmt.Println("consumer connect error:", err)
return
}
fmt.Println("connnect success...")
defer consumer.Close()
partitions, err := consumer.Partitions("revolution")
if err != nil {
fmt.Println("geet partitions failed, err:", err)
return
} for _, p := range partitions {
partitionConsumer, err := consumer.ConsumePartition("revolution", p, sarama.OffsetOldest)
if err != nil {
fmt.Println("partitionConsumer err:", err)
continue
}
wg.Add()
go func(){
for m := range partitionConsumer.Messages() {
fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)
}
wg.Done()
}()
}
wg.Wait()
}

最新文章

  1. 设置type为file的input标签选择图片类型
  2. 数据结构--线段树--lazy延迟操作
  3. jQuery EasyUI DataGrid Checkbox 数据设定与取值
  4. python抓取中文网页乱码通用解决方法
  5. Linq to Xml示例
  6. mysql open files
  7. css3学习笔记之文本效果
  8. svn-代码回滚
  9. 思考 Swift 中的 MirrorType 协议
  10. PHP导出MySQL数据到Excel
  11. pragma指令简介
  12. 使用Spring简化JDBC操作数据库
  13. spring 加载配置文件的相关配置总结
  14. HBase作为存储方案
  15. jsp篇 之 基本概念
  16. Java 本周四、五的相关研究——Excel 的文件管理(数据库初步)
  17. Latex 公式居中
  18. cordova 跨平台APP版本升级
  19. 解决windows7笔记本下玩游戏的显示问题
  20. python海龟绘图

热门文章

  1. js intanceof 或 typeof
  2. asp.net 2.0里也可以用JSON的使用方法
  3. sourcenav安装
  4. 2、easyUI-创建 CRUD可编辑dataGrid(表格)
  5. 字符设备驱动程序--LED驱动
  6. 【BZOJ4849】[Neerc2016]Mole Tunnels 模拟费用流
  7. docker-compose安装confluence
  8. maven 打包报错
  9. 使用 mock 测试
  10. 批量索引以提高索引速度 -d --data-binary