1. 配置zookeeper单机模式

选择的是zookeeper-3.4.10版本,官网下载链接:http://mirrors.hust.edu.cn/apache/zookeeper/stable/。下载之后将zookeeper-3.4.10.tar.gz解压到指定文件夹。

tar xvzf dl/zookeeper-3.4..tar.gz -C app/

解压之后,将conf/zoo_sample.cfg复制为conf/zoo.cfg。

cp $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg

然后编辑zoo.cfg,修改内容如下:

# /home/jason是我的用户主目录
dataDir=/home/jason/app/zookeeper/data

其他参数保持默认,若有特殊需求,也可进行修改。配置好后,手动创建dataDir目录。

mkdir /home/jason/app/zookeeper/data

若系统没有配置java环境变量,则需要在$ZOOKEEPER/bin/zkEnv.sh脚本中配置JAVA_HOME。在zkEnv.sh脚本的JAVA_HOME变量前面加上:

JAVA_HOME="/home/jason/app/jdk"

配置好后就可以启动zookeeper了:

./bin/zkServer.sh start

启动成功的日志如下:

ZooKeeper JMX enabled by default
Using config: /home/jason/app/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

需要结束服务时,仍在本目录下,执行如下命令即可:

./bin/zkServer.sh stop

如果需要自定义zookeeper的JVM配置,可以修改zkServer.sh脚本的JVMFLAGS参数,例如:

JVMFLAGS="-Xms128m -Xmx256m -Xmn64m -XX:PermSize=65m $JVMFLAGS"

修改后重启zookeeper,使用jmap -heap <pid>查看配置是否生效。

2.配置kafka单机模式

选择的是kafka_2.12-0.10.2.0版本,官网下载链接:http://mirrors.hust.edu.cn/apache/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz 。下载之后将kafka_2.12-0.10.2.0.tgz解压到指定文件夹。

tar xvzf dl/kafka_2.12-0.10.2.0.tgz -C app/

配置kafka时需要修改2个配置文件:server.properties、zookeeper.properties,其他配置文件保持默认选项。

2.1. 配置server.properties

需要配置4项,其余保持默认值,如果kafka是部署在IAAS平台的话,后三个参数一定要配置,否则kafka客户端无法通过IAAS公网地址找到broker:

log.dirs=/home/jason/app/kafka/kafka-logs
port=9092
host.name=阿里云或腾讯云内网地址
advertised.host.name=阿里云或腾讯云外网映射地址

然后手动创建log.dirs目录:

mkdir /home/jason/app/kafka/kafka-logs

2.2. 配置zookeeper.properties

只需要配置一项,其余保持默认值:

dataDir=/home/jason/app/kafka/zookeeper/data

然后手动创建dataDir目录:

mkdir /home/jason/app/kafka/zookeeper/data

至此环境配置已经完成,然后可以开始使用kafka了。

3. 使用kafka

若系统没有配置java环境变量,则需要在$KAFKA/bin/kafka-run-class.sh脚本中配置JAVA_HOME。在kafka-run-class.sh脚本的JAVA_HOME变量前面加上:

JAVA_HOME="/home/jason/app/jdk"

3.1. 启动kafka服务(前提是要先启动zookeeper)

./bin/kafka-server-start.sh config/server.properties

对于这种启动之后就可以忽略的服务,可以在最前面加上nohup,让其在后台自己运行,或者:

/bin/kafka-server-start.sh -daemon config/server.properties

需要结束服务时,仍在本目录下,执行如下命令即可:

./bin/kafka-server-stop.sh

3.2. 创建话题topic

新开一个命令行窗口,创建一个叫做test的topic:

./bin/kafka-topics.sh --create --zookeeper localhost: --replication-factor  --partitions  --topic test

然后可以根据地址和端口号将话题topic展示出来:

./bin/kafka-topics.sh --list --zookeeper localhost:2181

如果topic分区数需要扩展,可以通过下面命令:

./bin/kafka-topics.sh --zookeeper localhost:  --alter --topic test --partitions 

查看kafka特定topic的详情,使用--topic与--describe参数:

./bin/kafka-topics.sh --zookeeper localhost: --topic test --describe

删除指定Topic:

./bin/kafka-topics.sh --delete --zookeeper localhost: --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Note中指出该Topic并没有真正的删除,如果真删除,需要把server.properties中的delete.topic.enable置为true

3.3. 启动生产者producer

新开一个命令行窗口,执行以下命令:

./bin/kafka-console-producer.sh --broker-list localhost: --topic test

然后可以之间在本窗口输入消息,每遇到换行符就认为是一条消息输入完成。

3.4. 启动消费者consumer

./bin/kafka-console-consumer.sh --bootstrap-server localhost: --topic test --from-beginning

然后,每在producer那里输入一条,consumer这里就会显示一条,然后kafka服务那里也会产生日志记录。

在以后启动时,只需要依次启动kafka server,producer和consumer就可以了。

查看consumer group列表,使用--list参数

查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数:

./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost: --list
test
./bin/kafka-consumer-groups.sh --zookeeper localhost: --list
console-consumer-
console-consumer-

查看特定consumer group 详情,使用--group与--describe参数

同样根据新/旧版本的consumer,分别指定bootstrap-server与zookeeper参数:

./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost: --group test --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
lx_test test kafka-python-1.3.1_/127.0.0.1
./bin/kafka-consumer-groups.sh --zookeeper localhost: --group console-consumer- --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
Could not fetch offset from zookeeper for group console-consumer- partition [lx_test_topic,] due to missing offset data in zookeeper.
console-consumer- test unknown unknown console-consumer-11967_aws-lx--d3a91f05-

其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)

上面示例中console-consumer-11967是为了测试临时起的一个console consumer,缺少在zookeeper中保存的current_offset信息。

最新文章

  1. mongodb高级应用
  2. C# 构建动态树
  3. Linux下lampp详解 (转)
  4. 4s前置摄像头调用
  5. gre网络细节
  6. 苹果 iOS 8 新固件新功能特性总结汇总 (苹果 iPhone/iPad 最新移动操作系统)
  7. 运用@media实现网页自适应中的几个关键分辨率
  8. Simple Shopping Cart By AngularJS
  9. hdu 2202 最大三角形_凸包模板
  10. 纯CSS写正方形自适应宽高,且左侧高与正方形高保持一致
  11. JavaScript笔记1———js的数据类型
  12. CRM项目自定义的知识点
  13. 【JS面试向】选择排序、桶排序、冒泡排序和快速排序简介
  14. Delphi中DLL初始化和退出处理
  15. 跨域请求之jsonp的实现方式
  16. 使用JS与jQuery实现文字逐渐出现特效
  17. java同一个类中,构造器如何调用另一个重载的构造器?
  18. CStringArray序列化处理
  19. Tkinter place() 方法
  20. IO模型《四》多路复用IO

热门文章

  1. 使用 maven 创建 java web 工程
  2. 分布式时序数据库InfluxDB
  3. 牛客Wannafly挑战赛11E 白兔的刁难
  4. [HAOI2009]逆序对数列(加强)
  5. Ubuntu16.04(Linux)安装JDK
  6. mysql备份整个数据库的表结构和数据
  7. 浏览器根对象window之窗体和工具条
  8. 131.005 Unsupervised Learning - Cluster | 非监督学习 - 聚类
  9. org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Catalina].StandardHost[localhost].StandardContext[/JJ]]
  10. Navicat for mysql 导出导入的问题