一、kafka集群搭建  

  至于kafka是什么我都不多做介绍了,网上写的已经非常详尽了。

(没安装java环境的需要先安装 yum -y install java-1.8.0-openjdk*)

1. 下载zookeeper  https://zookeeper.apache.org/releases.html

2. 下载kafka http://kafka.apache.org/downloads

3. 启动zookeeper集群(我的示例是3台机器,后面的kafka也一样,这里就以1台代指3台,当然你也可以只开1台)

  1)配置zookeeper。 修改复制一份 zookeeper-3.4.13/conf/zoo_sample.cfg 改名成zoo.cfg。修改以下几个参数,改成适合自己机器的。

  dataDir=/home/test/zookeeper/data
  dataLogDir=/home/test/zookeeper/log
  server.=10.22.1.1::
  server.=10.22.1.2::
  server.=10.22.1.3::

  2) 创建myid文件,确定机器编号。分别在3台机器的/home/test/zookeeper/data目录执行分别执行命令 echo 1 > myid(注意ip为10.22.1.2把1改成2,见上面的配置)

  3) 启动zookeeper集群。分别进入目录zookeeper-3.4.13/bin 执行 sh zkServer.sh start

4. 启动kafka集群

  1) 配置kafka。进入kafka_2.11-2.2.0/config。复制3份,分别为server1.properties,server2.properties,server3.properties。修改以下几项(注意对应的机器id)

log.dirs和zookeeper.connect 是一样的。broker.id和listeners分别填对应的id和ip
broker.id=
listeners=PLAINTEXT://10.22.1.1:9092
log.dirs=/home/test/kafka/log
zookeeper.connect=10.22.1.1:,10.22.1.2:,10.22.1.3:

  2) 启动kafka集群。分别进入kafka_2.11-2.2.0/bin目录,分别执行sh kafka-server-start.sh ../config/server1.properties (第2台用server2.properties配置文件)

 

二、Golang生产者和消费者

  目前比较流行的golang版的kafka客户端库有两个:

  1. https://github.com/Shopify/sarama

  2. https://github.com/confluentinc/confluent-kafka-go

  至于谁好谁坏自己去分辨,我用的是第1个,star比较多的。

1. kafka生产者代码

  这里有2点要说明:

  1)  config.Producer.Partitioner = sarama.NewRandomPartitioner,我分partition用的是随机,如果你想稳定分paritition的话可以自定义,还有轮询和hash方式

  2) 我的topic是走的外部配置,可以根据自己的需求修改

// Package kafka_producer kafka 生产者的包装
package kafka_producer import (
"github.com/Shopify/sarama"
"strings"
"sync"
"time" "github.com/alecthomas/log4go"
) // Config 配置
type Config struct {
Topic string `xml:"topic"`
Broker string `xml:"broker"`
Frequency int `xml:"frequency"`
MaxMessage int `xml:"max_message"`
} type Producer struct {
producer sarama.AsyncProducer topic string
msgQ chan *sarama.ProducerMessage
wg sync.WaitGroup
closeChan chan struct{}
} // NewProducer 构造KafkaProducer
func NewProducer(cfg *Config) (*Producer, error) { config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.NoResponse // Only wait for the leader to ack
config.Producer.Compression = sarama.CompressionSnappy // Compress messages
config.Producer.Flush.Frequency = time.Duration(cfg.Frequency) * time.Millisecond // Flush batches every 500ms
config.Producer.Partitioner = sarama.NewRandomPartitioner p, err := sarama.NewAsyncProducer(strings.Split(cfg.Broker, ","), config)
if err != nil {
return nil, err
}
ret := &Producer{
producer: p,
topic: cfg.Topic,
msgQ: make(chan *sarama.ProducerMessage, cfg.MaxMessage),
closeChan: make(chan struct{}),
} return ret, nil
} // Run 运行
func (p *Producer) Run() { p.wg.Add()
go func() {
defer p.wg.Done() LOOP:
for {
select {
case m := <-p.msgQ:
p.producer.Input() <- m
case err := <-p.producer.Errors():
if nil != err && nil != err.Msg {
l4g.Error("[producer] err=[%s] topic=[%s] key=[%s] val=[%s]", err.Error(), err.Msg.Topic, err.Msg.Key, err.Msg.Value)
}
case <-p.closeChan:
break LOOP
} }
}() for hasTask := true; hasTask; {
select {
case m := <-p.msgQ:
p.producer.Input() <- m
default:
hasTask = false
}
} } // Close 关闭
func (p *Producer) Close() error {
close(p.closeChan)
l4g.Warn("[producer] is quiting")
p.wg.Wait()
l4g.Warn("[producer] quit over") return p.producer.Close()
} // Log 发送log
func (p *Producer) Log(key string, val string) {
msg := &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.StringEncoder(key),
Value: sarama.StringEncoder(val),
} select {
case p.msgQ <- msg:
return
default:
l4g.Error("[producer] err=[msgQ is full] key=[%s] val=[%s]", msg.Key, msg.Value)
}
}

2. kafka消费者

  几点说明:

  1) kafka一定要选用支持集群的版本

  2) 里面带了创建topic,删除topic,打印topic的工具

  3) replication是外面配置的

  4) 开多个consumer需要在创建topic时设置多个partition。官方的示例当开多个consumer的时候会崩溃,我这个版本不会,我给官方提交了一个PR,还不知道有没有采用

// Package main Kafka消费者
package main import (
"context"
"encoding/xml"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time" "github.com/Shopify/sarama"
"github.com/alecthomas/log4go"
) // Consumer Consumer配置
type ConsumerConfig struct {
Topic []string `xml:"topic"`
Broker string `xml:"broker"`
Partition int32 `xml:"partition"`
Replication int16 `xml:"replication"`
Group string `xml:"group"`
Version string `xml:"version"`
} var (
configFile = "" // 配置路径
initTopic = false
listTopic = false
delTopic = ""
cfg = &Config{}
) // Config 配置
type Config struct {
Consumer ConsumerConfig `xml:"consumer"`
} func init() {
flag.StringVar(&configFile, "config", "../config/consumer.xml", "config file ")
flag.BoolVar(&initTopic, "init", initTopic, "create topic")
flag.BoolVar(&listTopic, "list", listTopic, "list topic")
flag.StringVar(&delTopic, "del", delTopic, "delete topic") } func main() { runtime.GOMAXPROCS(runtime.NumCPU()) defer func() {
time.Sleep(time.Second)
log4go.Warn("[main] consumer quit over!")
log4go.Global.Close()
}() contents, _ := ioutil.ReadFile(configFile)
xml.Unmarshal(contents, cfg) // sarama的logger
sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[%s]", "consumer"), log.LstdFlags) // 指定kafka版本,一定要支持kafka集群
version, err := sarama.ParseKafkaVersion(cfg.Consumer.Version)
if err != nil {
panic(err)
}
config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest // 工具
if tool(cfg, config) {
return
} // kafka consumer client
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(cfg.Consumer.Broker, ","), cfg.Consumer.Group, config)
if err != nil {
panic(err)
} consumer := Consumer{}
go func() {
for {
err := client.Consume(ctx, cfg.Consumer.Topic, &consumer)
if err != nil {
log4go.Error("[main] client.Consume error=[%s]", err.Error())
// 5秒后重试
time.Sleep(time.Second * )
}
}
}() // os signal
sigterm := make(chan os.Signal, )
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) <-sigterm cancel()
err = client.Close()
if err != nil {
panic(err)
} log4go.Info("[main] consumer is quiting")
} func tool(cfg *Config, config *sarama.Config) bool {
if initTopic || listTopic || len(delTopic) > {
ca, err := sarama.NewClusterAdmin(strings.Split(cfg.Consumer.Broker, ","), config)
if nil != err {
panic(err)
} if len(delTopic) > { // 删除Topic
if err := ca.DeleteTopic(delTopic); nil != err {
panic(err)
}
log4go.Info("delete ok topic=[%s]\n", delTopic)
} else if initTopic { // 初始化Topic
if detail, err := ca.ListTopics(); nil != err {
panic(err)
} else {
for _, v := range cfg.Consumer.Topic {
if d, ok := detail[v]; ok {
if cfg.Consumer.Partition > d.NumPartitions {
if err := ca.CreatePartitions(v, cfg.Consumer.Partition, nil, false); nil != err {
panic(err)
}
log4go.Info("alter topic ok", v, cfg.Consumer.Partition)
} } else {
if err := ca.CreateTopic(v, &sarama.TopicDetail{NumPartitions: cfg.Consumer.Partition, ReplicationFactor: cfg.Consumer.Replication}, false); nil != err {
panic(err)
}
log4go.Info("create topic ok", v)
}
}
}
} // 显示Topic列表
if detail, err := ca.ListTopics(); nil != err {
log4go.Info("ListTopics error", err)
} else {
for k := range detail {
log4go.Info("[%s] %+v", k, detail[k])
}
} if err := ca.Close(); nil != err {
panic(err)
} return true
}
return false
} type Consumer struct {
} func (consumer *Consumer) Setup(s sarama.ConsumerGroupSession) error {
return nil
} func (consumer *Consumer) Cleanup(s sarama.ConsumerGroupSession) error {
return nil
} func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
key := string(message.Key)
val := string(message.Value)
log4go.Info("%s-%s", key, val)
session.MarkMessage(message, "")
} return nil
}

最新文章

  1. 【Net跨平台第一步】逆天带你零基础Linux入门【更新完毕】
  2. Test,Nginx Hello World Module
  3. 普通委托到泛型委托到Linq
  4. 有关npm rum的3个简洁技巧
  5. 分享一个圆角自定义的漂亮AlertDialog
  6. npoi批量
  7. View not attached to window manager
  8. Linux下SVN配置
  9. Java泛型集合
  10. DependencyInjection源码解读之ServiceProvider
  11. Ubuntu 下超简单的安装指定版本的nodejs
  12. 慢慢啃css
  13. html页面转jsp后 乱码问题。
  14. 2017-11-11 Sa Oct How to open a browser in Python
  15. PHP 浮点型转整型的一个奇怪现象
  16. vue install后出现的问题
  17. &lt;记录&gt;TP5 关联模型使用(嵌套关联、动态排序以及隐藏字段)
  18. 解决UITableView上的cell的重用
  19. 《ASP.NET MVC 5 破境之道》:第一境 ASP.Net MVC5项目初探 — 第二节:MVC5项目结构
  20. IntelliJ IDEA详细配置和使用教程-字体、编码和基本设置

热门文章

  1. ES6_函数方法
  2. javaScript系列 [01]-javaScript函数基础
  3. C# GDI+之Graphics类 z
  4. Netty 中ChannelOption的含义以及使用的场景
  5. WPF双向数据绑定总结
  6. boost::filesystem经常使用使用方法具体解释
  7. easyui combox 手动添加项
  8. Mac下的Chrome或Safari访问跨域设置,MBP上使用模拟器Simulator.app或iphone+Safari调试网页
  9. solr+zookeeper集群配置
  10. 【C++】C++中的基本内置类型