针对golang的 kafka client 有很多开源package,例如sarama, confluent等等。在使用sarama 包时,高并发中偶尔遇到crash。于是改用confluent-kafka-go,其简单易用,并且表现稳定。

本文主要介绍confluent-kafka-go的使用方法。

confluent-kafka-go,是kafka官网推荐的golang package。

confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform.

编译环境搭建

安装librdkafka

下载

$ git clone https://github.com/edenhill/librdkafka.git
$ cd librdkafka

配置、编译、安装

$ ./configure --prefix /usr
$ make
$ sudo make install

配置PKG_CONFIG_PATH

在文件~/.bashrc 末尾添加

export PKG_CONFIG_PATH=/usr/lib/pkgconfig

下载go client

$ go get -u github.com/confluentinc/confluent-kafka-go/kafka

自动下载到GOPATH目录下,也可到github上自行下载,然后放到GOPATH中。

Example

// Example function-based Apache Kafka producer
package main /**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
) func main() { if len(os.Args) != 3 {
fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n",
os.Args[0])
os.Exit(1)
} broker := os.Args[1]
topic := os.Args[2] p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil {
fmt.Printf("Failed to create producer: %s\n", err)
os.Exit(1)
} fmt.Printf("Created Producer %v\n", p) // Optional delivery channel, if not specified the Producer object's
// .Events channel is used.
deliveryChan := make(chan kafka.Event) value := "Hello Go!"
err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}, deliveryChan) e := <-deliveryChan
m := e.(*kafka.Message) if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
} close(deliveryChan)
}

注意:

如果需要链接静态库,可删除/usr/lib/下面关于rdkafka的动态库文件(.so文件)。然后,go build编译时加上选项 –tags static

例如:

go build -tags static produer.go

更多example,可参考

https://github.com/confluentinc/confluent-kafka-go/tree/master/examples

参考

https://github.com/confluentinc/confluent-kafka-go

最新文章

  1. Hibernate 系列 07 - Hibernate中Java对象的三种状态
  2. windows8建立局域网的方法
  3. javascript定时函数
  4. Jenkins_获取源码编译并启动服务(一)
  5. eclipse内存溢出报错:java.lang.OutOfMemoryError:Java heap space
  6. Klist
  7. 【WEB-INF】WEB-INF是Java的WEB应用的安全目录
  8. reactjs 入门
  9. hibernate generator class=&quot;&quot; id详解
  10. 2D丛林逃生
  11. Object-c学习之路八(NSArray(数组)遍历和排序)
  12. HTML5基本知识点
  13. cookieUtil
  14. Java课后练习
  15. 支持向量机(SVM)举例
  16. SVN:This client is too old to work with working copy…解决的方法
  17. opencv的安装及填坑
  18. 1014 2018 使用FLAG counter
  19. python sys.stdin、sys.stdout和sys.stderr
  20. DevExpress v17.2新版亮点—DevExtreme篇(三)

热门文章

  1. maven分别打包开发、生产配置文件
  2. PHP:第四章——PHP数组array_diff计算数组差集
  3. UVALIve 5987 素数
  4. css控制编辑器内容自动换行
  5. Sqoop2安装
  6. DevExpress v18.1最新版帮助文档下载大全
  7. JSONField解决序列化与反序列化字段匹配问题
  8. avalonJS-源码阅读(3) VMODEL
  9. idea中看不到项目结构该怎么办
  10. Ajax请求数据的两种方式