简介:

Apache Kafka 是一个 Scala 语言编写的可扩展、分布式、高性能的容错消息发布、订阅系统。

官网地址:http://kafka.apache.org

中文教程:http://www.orchome.com/kafka/index

下载地址:http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz ( Scala 2.11 is recommended )

Java版本:jdk-8u111-linux-x64.rpm

一、单机部署

shell > vim /etc/hosts

192.168.10.23  zk-node01 kb-node01

# 必须配置,否则无法启动

shell > rpm -ivh jdk-8u111-linux-x64.rpm

shell > java -version
java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) -Bit Server VM (build 25.111-b14, mixed mode) shell > cd /usr/local/src; tar zxf kafka_2.-1.0..tgz -C /usr/local shell > cd /usr/local/kafka_2.-1.0.

1、zookeeper

shell > vim config/zookeeper.properties

# 数据目录
dataDir=/data/zookeeper_data
# 监听端口
clientPort=
# 最大连接数 不限制
maxClientCnxns= shell > sh bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # 启动 zookeeper

2、kafka

shell > vim config/server.properties

# 唯一ID
broker.id=
# 监听地址
listeners=PLAINTEXT://0.0.0.0:9092
# 向 Zookeeper 注册的地址,这里如果需要同时内外网访问需要注册 hostname,否则只能注册外网IP地址,会导致所有流量都走外网
advertised.listeners=PLAINTEXT://kb-node01:9092
# 数据目录
log.dirs=/data/kafka_data
# 允许删除topic
delete.topic.enable=true
# 不允许自动创建topic
auto.create.topics.enable=false
# 磁盘IO不足的时候,可以适当调大该值 ( 当内存足够时 )
#log.flush.interval.messages=
#log.flush.interval.ms=
# kafka 数据保留时间 默认 -> 天
log.retention.hours=
# zookeeper
zookeeper.connect=zk-node01: # 其余都使用默认配置 # 这样配置,外网访问 kafka 时,需要配置 hosts ( IP kb-node01 ) 否则提示主机名未知! shell > sh bin/kafka-server-start.sh -daemon config/server.properties # 启动 kafka

二、kafka 指令

1、topic

shell > sh bin/kafka-topics.sh --zookeeper zk-node01: --create --topic kafka-test --partitions  --replication-factor
Created topic "kafka-test". shell > sh bin/kafka-topics.sh --zookeeper zk-node01: --list
kafka-test

# 创建一个 topic kafka-test,它有一个分区、一个副本

shell > sh bin/kafka-topics.sh --zookeeper zk-node01: --describe --topic kafka-test
Topic:kafka-test PartitionCount: ReplicationFactor: Configs:
Topic: kafka-test Partition: Leader: Replicas: Isr:

# 查看这个 topic 的属性,总共一个分区,一个副本;当前分区为 0,leader 为 broker.id=1 的 broker,
# 副本所在 broker,活跃的 broker ( 需要同步副本的broker )

2、consumer

shell > sh bin/kafka-console-consumer.sh --bootstrap-server kb-node01: --topic kafka-test --from-beginning

# 启动一个消费者,消费 kafka-test 这个 topic,从头读取

3、producer

shell > sh bin/kafka-console-producer.sh --broker-list kb-node01: --topic kafka-test
> hello world

# 重启一个终端,启动一个生产者,输入 hello world,这是消费者终端会看到消息 hello world

4、alter topic

shell > sh bin/kafka-topics.sh --zookeeper zk-node01: --alter --topic kafka-test --partitions 

shell > sh bin/kafka-topics.sh --zookeeper zk-node01: --describe --topic kafka-test
Topic:kafka-test PartitionCount: ReplicationFactor: Configs:
Topic: kafka-test Partition: Leader: Replicas: Isr:
Topic: kafka-test Partition: Leader: Replicas: Isr:

# 这样就将 topic kafka-test 的分区修改成了 2 个

5、delete topic

shell > sh bin/kafka-topics.sh --zookeeper zk-node01: --delete --topic kafka-test

shell > sh bin/kafka-topics.sh --zookeeper zk-node01: --list
__consumer_offsets

# 这样就删除了 topic kafka-test了,__consumer_offsets 这个 topic 为系统自动生成,用来存放消费者 offset 信息的。

三、伪分布式部署

shell > sh bin/kafka-server-stop.sh

# 停止原来的 kafka 不停也行...

shell > cp config/server.properties config/server-.properties
shell > cp config/server.properties config/server-.properties
shell > cp config/server.properties config/server-.properties

# 没办法,强迫症

shell > vim config/server-.properties

broker.id=
listeners=PLAINTEXT://0.0.0.0:9091
advertised.listeners=PLAINTEXT://kb-node01:9091
log.dirs=/data/kafka_data- shell > vim config/server-.properties broker.id=
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kb-node01:9092
log.dirs=/data/kafka_data- shell > vim config/server-.properties broker.id=
listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://kb-node01:9093
log.dirs=/data/kafka_data-

# 这几项不能重复 注意:防火墙要开启相应的端口

shell > sh bin/kafka-server-start.sh -daemon config/server-.properties
shell > sh bin/kafka-server-start.sh -daemon config/server-.properties
shell > sh bin/kafka-server-start.sh -daemon config/server-.properties

# 启动这些 broker

shell > sh bin/kafka-topics.sh --zookeeper zk-node01: --create --topic kafka-all --partitions  --replication-factor
Created topic "kafka-all". shell > sh bin/kafka-topics.sh --zookeeper zk-node01: --list
__consumer_offsets
kafka-all

# 新创建的 topic

shell > sh bin/kafka-topics.sh --zookeeper zk-node01: --describe --topic kafka-all
Topic:kafka-all PartitionCount: ReplicationFactor: Configs:
Topic: kafka-all Partition: Leader: Replicas: ,, Isr: ,,
Topic: kafka-all Partition: Leader: Replicas: ,, Isr: ,,
Topic: kafka-all Partition: Leader: Replicas: ,, Isr: ,,
Topic: kafka-all Partition: Leader: Replicas: ,, Isr: ,,
Topic: kafka-all Partition: Leader: Replicas: ,, Isr: ,,
Topic: kafka-all Partition: Leader: Replicas: ,, Isr: ,,

# 一个 6 分区、3 副本的 topic 诞生了,0 分区的 leader 是 3,
# 副本分布在 3、1、2 上,活跃的 broker 为 3、1、2 ( 需要同步副本的broker )

shell > sh bin/kafka-console-producer.sh --broker-list kb-node01:,kb-node01: --topic kafka-all
> hello kafka-all

# 启动一个生产者,注意:只写了两个 broker 向 kafka-all topic 中发送了一条消息

shell > sh bin/kafka-console-consumer.sh --bootstrap-server kb-node01: --topic kafka-all --from-beginning
hello kafka-all

# 启动一个消费者,注意:只写了生产者没填写的 broker,还是消费到了消息

# 说明:broker 不需要全部填写,会自动发现,即使有机器宕机数据也不会丢失!

# 注意:集群部署时,zookeeper 要么为 1,要么为 3,不要是两台,否则其中一台宕机,集群则无法提供服务!

# 部署 zookeeper 集群请参照其余博文

最新文章

  1. tomcat在linux下自启动
  2. centos7安装mplayer的方法
  3. HTML5 之Canvas绘制太阳系
  4. HDU 5742 It's All In The Mind (贪心) 2016杭电多校联合第二场
  5. 分布式拒绝服务攻击(DDoS)原理及防范
  6. 024-ActionResult解说
  7. 6/7 Sprint2 看板和燃尽图
  8. java编程acm基础
  9. 採用Hexo 搭建Team Blog
  10. html5中的postMessage解决跨域问题
  11. C# 6 与 .NET Core 1.0 高级编程 - 38 章 实体框架核心(上)
  12. Clang-Format: Visual Studio Style
  13. 用IDEA生成javadoc文档
  14. JAVA通过注解处理器重构代码,遵循单一职责
  15. Springboot+ActiveMQ(ActiveMQ消息持久化,保证JMS的可靠性,消费者幂等性)
  16. Dubbo x Cloud Native 服务架构长文总结(很全)
  17. Loadrunner脚本开发-基于HTTP协议的流媒体视频在线播放服务器性能测试
  18. python -u 启动python文件的作用,PYTHONUNBUFFERED环境变量的作用
  19. (转)2018几大主流的UI/JS框架——前端框架 [Vue.js(目前市场上的主流)]
  20. Docker学习笔记二 使用镜像

热门文章

  1. MLCC Y5V 和 X7R 电容记录
  2. pysonar
  3. 后台取IE的相关信息
  4. GitHub10岁之际HanLP自然语言处理包用户量跃居榜首
  5. 【Oracle学习笔记-5--】集合操作之union,intersect和minus操作
  6. bzoj2002 弹飞绵羊
  7. -Java-Runoob-高级教程-实例-数组:09. Java 实例 – 数组扩容
  8. 杂项-Java:jar 包与 war 包介绍与区别
  9. ZooKeeper 集群的安装、配置---Dubbo 注册中心
  10. 使用XML-RPC进行远程文件共享