一、使用Kfaka自带的zookeeper服务。

1、下载Kafka,下载地址:

http://kafka.apache.org/downloads

2、区分Kfaka版本:

  Scala 2.12 - kafka_2.12-2.2.1.tgzascsha512

Kadka后面的 2.12是对应的scala版本,2.2.1是kafka版本。

3、安装

#tar zxvf kafka_2.-2.1..tgz -C /data/kafka/
#ln -s kafka_2.12-2.1.0 kafka
#cd /data/kafka

4、启动服务

启动zookeeper服务
#./bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka服务
#./bin/kafka-server-start.sh config/server.properties

5、查看kafka是否启动成功,kafka默认启动在9092端口

[root@test kafka ::]#netstat -anltp|grep
tcp 0.0.0.0: 0.0.0.0:* LISTEN /java
tcp 127.0.1.1: 127.0.0.1: ESTABLISHED /java
tcp 127.0.0.1: 127.0.1.1: ESTABLISHED /java
tcp 127.0.1.1: 127.0.0.1: ESTABLISHED /java
tcp 127.0.0.1: 127.0.1.1: ESTABLISHED /java
tcp 127.0.1.1: 127.0.0.1: ESTABLISHED /java
tcp 127.0.0.1: 127.0.1.1: ESTABLISHED /java
tcp 127.0.0.1: 127.0.1.1: CLOSE_WAIT /java
tcp 127.0.0.1: 127.0.1.1: ESTABLISHED /java
tcp 127.0.1.1: 127.0.0.1: ESTABLISHED /java

6、创建topic主题

1>>>  创建topic
#./bin/kafka-topics.sh --create --zookeeper localhost: --replication-factor --partitions --topic yjt
Created topic "yjt".
参数解释:
--create:创建。
--zookeeper: 指定zookeeper服务器,这里使用的是kafka自带的zookeeper服务器,所以使用localhost+端口的方式,如果是自己配置的zookeeper服务,这里需要使用主机+端口的方式。
--replication-factor:指定副本数量,主要是为了数据的冗余。
--partitions:指定主题的分区数量。
--topic:指定主题的名字,这里是yjt。 2>>> 查看当前以及存在的主题:

#./bin/kafka-topics.sh --list --zookeeper localhost:2181    #这里使用了--list选项。列出服务器上存在的topic。

__consumer_offsets
  connect-test
  my-topic
  test
  yjt

 3>>> 查看某一个topic的信息。

#./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic yjt   #这里使用了--describe选项。
  Topic:yjt PartitionCount:1 ReplicationFactor:1 Configs:
  Topic: yjt Partition: 0 Leader: 0 Replicas: 0 Isr: 0

解释:
第一行显示的是所有分区的摘要,其次,每一行显示一个分区,这里分区为1,所有只显示了一行(既第二行):
Topic:主题名字
PartitionCount:当前主题所有分区数量,刚刚在创建主题的时候,只指定了一个分区,所以,这里显示1。
ReplicationFactor:副本数量。同上。
第二行:
Leader:该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的(注:针对于分区,也就是说,每一个分区都有一个leader,并且leader可能都不一样)。
Replicas:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
Isr(a set of in-sync replicas)也叫同步状态的副本集合:“同步备份”的节点列表,也就是活着的节点并且正在同步leader。

7、发布一些消息到yjt这个topic上

#./bin/kafka-console-producer.sh --broker-list localhost: --topic yjt   #使用--broker-list指定kafaka服务器的地址,这里的9092是kafka默认的端口,如果在配置文件里面有改动,这里修改成对应的端口就行。
>My name is yjt.
>I am xxx years old.

8、消费上面发布的消息。

#./bin/kafka-console-consumer.sh --bootstrap-server localhost: --topic yjt --from-beginning #使用--bootstrap-server指定服务器,同时使用--from-beginning选项指定从头开始读取。
My name is yjt.
I am xxx years old.

9、如何在一台服务器上面配置多个kafka实例?也就是创建多个broker(broker可以理解为Kafka Server)。这里在创建两个broker。

  1)、复制配置文件,修改配置文件。

  复制配置文件:

#cp config/server.properties config/server-.properties
#cp config/server.properties config/server-.properties

  修改配置文件:

config/server-.properties:
broker.id=         # broker Id,全局唯一,不能重复。
listeners=PLAINTEXT://:9093 #监听地址,也可以是listeners=PLAINTEXT://主机或者IP:9093这种方式。
log.dir=/tmp/kafka-logs- #日志文件
config/server-.properties: 
  broker.id=
  listeners=PLAINTEXT://:9094
  log.dir=/tmp/kafka-logs-

10、启动多实例的broker。

#./bin/kafka-server-start.sh config/server-.properties &  #以后台启动,但是好像没什么卵用,控制台还是啪啪啪的输出一大堆信息,害的我开了好几个终端。
#./bin/kafka-server-start.sh config/server-.properties & > /dev/null 2>&1 #这种方式启动好像也是一样,不过没试nohup。

启动多实例以后就可以在创建主题的时候指定多个分区了,当然,单个实例也可以启动多个分区。然而副本数量一定要<=broker数量。

例如创建四个分区的topic

#./bin/kafka-topics.sh --create --zookeeper localhost: --replication-factor  --partitions  --topic yjt1
#./bin/kafka-topics.sh --describe --zookeeper localhost: --topic yjt1
Topic:yjt1 PartitionCount: ReplicationFactor: Configs:
Topic: yjt1 Partition: Leader: Replicas: Isr:
Topic: yjt1 Partition: Leader: Replicas: Isr:
Topic: yjt1 Partition: Leader: Replicas: Isr:
Topic: yjt1 Partition: Leader: Replicas: Isr: 2
上述信息具体参考上面的解释。

11、使用Kafka Connect 来导入导出数据。

  从控制台写入和写回数据是一个方便的开始,但你可能想要从其他来源导入或导出数据到其他系统。对于大多数系统,可以使用kafka Connect,而不需要编写自定义集成代码。

  kafka Connect是导入和导出数据的一个工具。它是一个可扩展的工具,运行连接器,实现与自定义的逻辑的外部系统交互。

1)、首先,创建一些种子数据用来测试。

#pwd
/data/kafka/kafka
#echo -e "foo\nbar" > test.txt

2)、接下来,我们开始2个连接器运行在独立的模式,这意味着它们运行在一个单一的,本地的,专用的进程。我们提供3个配置文件作为参数。首先是Kafka Connect处理的配置,包含常见的配置,例如要连接的Kafka broker和数据的序列化格式。其余的配置文件都指定了要创建的连接器。包括连接器唯一名称,和要实例化的连接器类。以及连接器所需的任何其他配置。

#./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

kafka附带了这些示例的配置文件,并且使用了刚才我们搭建的本地集群配置并创建了2个连接器:第一个是源连接器,从输入文件中读取并发布到Kafka主题中,第二个是接收连接器,从kafka主题读取消息输出到外部文件。

在启动过程中,你会看到一些日志消息,包括一些连接器实例化的说明。一旦kafka Connect进程已经开始。流程如下:如下是默认值,如果想要改变,请修改(config/connect-file-source.properties config/connect-file-sink.properties这两个文件)

1>>> 导入连接器应该读取从

test.txt

2>>> 创建和写入到topic:(默认情况下,当主题不存在的时候,可以自动创建)

connect-test

3>>> 从主题导导出连接器

connect-test

4>>> 写入到文件

test.sink.txt

3)查看文件信息:

#cat test.sink.txt
foo
bar

4)消费信息

#./bin/kafka-console-consumer.sh --bootstrap-server localhost: --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

二、使用外置的zookeeper服务。

外置的zookeeper服务搭建过程就不说了,只是在搭建过程中需要注意zookeeper服务器的奇数性(数量为奇数),和myid文件的配置。

zookeeper配置文件信息如下:

# egrep -v "(^#|^$)" zoo.cfg
tickTime=
initLimit=
syncLimit=
dataDir=/data1/hadoop/data/zookeeper/data
dataLogDir=/data1/hadoop/data/zookeeper/logs
clientPort=
server.=master::
server.=slave1::
server.=slave2::

解释:

tickTime:zookeeper客户端连接服务端心跳超时时间,默认是2s。
initLimit:zookeeper  flower启动初始化,从leader同步数据的超时时间,这个时间是tickTime的倍数,既这里是2*2000
syncLimit:在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。如果L发出心跳包在syncLimit之后,还没有从Flower那里收到响应,那么就认为这个Flower已经不在线了。注意:不要把这个参数设置得过大,否则可能会掩盖一些问题。

配置了外置的zookeeper以后,在启动Kafka集群时,需要在server的配置文件里面添加连接信息,例如
#sed -i "s/zookeeper.connect=localhost:2181/zookeeper.connect=master:2181,slave1:2181,slave2:2181/g" config/server.properties

三、完全分布式部署

所谓的完全分布式模式,就是在每一台服务器上启动一个实例。

例如:

我有master、slave1、slave2三台机器,这三台机器已经安装好zookeeper,免密等操作。

在每一台服务器配置server文件

config/server.properties:
  broker.id=0 #必须修改这个值,全局唯一。其他的值像端口,日志存储等,都可以使用默认的配置,像上面的在一台服务器启动多个实例的时候,需要修改下面的两个配置(端口和日志路径,防止冲突),但是在完全分布式模式下,也可以不用修改。
  listeners=PLAINTEXT://:9092
  log.dir=/tmp/kafka-logs
host.name=master #主机名,每个server都不同,需要修改。

当然,可以自己编写服务的启动脚本,配置环境变量,编写拷贝安装包到其他节点的脚本。这里推荐使用pssh或者pdsh,当然也可以使用大型的自动化运维工具,像puppt、ansible、saltstack等工具。需要免密。

四、配置文件参数

1、server.properties

//当前机器在集群中的唯一标识,和zookeeper的myid性质一样
broker.id=
//当前kafka对外提供服务的端口默认是9092
port=
//这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
host.name=master
//这个是borker进行网络处理的线程数
num.network.threads=
//这个是borker进行I/O处理的线程数
num.io.threads=
//发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=
//kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=
//这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
socket.request.max.bytes=
//消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,
//如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
log.dirs=/tmp/kafka-logs
//默认的分区数,一个topic默认1个分区数
num.partitions=
//每个数据目录用来日志恢复的线程数目
num.recovery.threads.per.data.dir=
//默认消息的最大持久化时间,168小时,7天
log.retention.hours=
//这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.segment.bytes=
//每隔300000毫秒去检查上面配置的log失效时间
log.retention.check.interval.ms=
//是否启用log压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false
//设置zookeeper的连接端口
zookeeper.connect=master:,slave1:,slave2:
//设置zookeeper的连接超时时间
zookeeper.connection.timeout.ms=

2、producer.properties

metadata.broker.list=master:9092,slave1:9092,slave2:9092

3、consumer.properties

zookeeper.connect=master:2181,slave1:2181,slave2:2181
 

推荐一些关于Kafka的博客:

https://www.orchome.com/6

https://www.cnblogs.com/qingyunzong/p/9004509.html(如果英文不好,该博客可以很好的学习kafka)

当然,要是可以看懂英文文档,直接浏览官网学习是最好的。

最新文章

  1. swift 学习笔记
  2. Unity MonoDevelop一打开未响应
  3. MongoDB,客户端工具备份数据库
  4. 【前端也要学点算法】 归并排序的JavaScript实现
  5. 跟着百度学PHP[4]OOP面对对象编程-13-魔术方法__set(),__get(),__isset(),__unset()
  6. 使用Yii框架自带的CActiveForm实现ajax提交表单
  7. 01-08-01【Nhibernate (版本3.3.1.4000) 出入江湖】NHibernate中的三种状态
  8. wpf之Popup弹出自定义输入&quot;键盘&quot;
  9. thrift概述
  10. Spring自动化装配bean
  11. CentOS上安装GitBlit服务
  12. mac上如何解压和压缩rar文件
  13. Git通过密钥对远程仓库上传和更新详细操作
  14. 180714、JRebel插件安装配置与破解激活(多方案)详细教程
  15. u-boot的内存分布
  16. 爬取ofo共享单车信息
  17. Customizing docker
  18. screen命令记录
  19. Git初级使用教程
  20. Android中的Surface, SurfaceHolder, SurfaceHolder.Callback, SurfaceView

热门文章

  1. php上传文件报错以及对应代号信息-转载http://jewel-m.iteye.com/blog/1210344
  2. vue的data里面的值是数组时,在更改其某一项的时候,怎么触发视图的重新渲染?
  3. Yarn介绍(设计理念与基本架构)
  4. Python——hashlib(加密模块)
  5. Win10开启蓝屏信息记录及文件查看位置的方法
  6. 记录java+testng运行selenium(一)
  7. Bash基础——printf
  8. C++——调用优化
  9. Django:ContentType组件
  10. HDU-5728-PowMod-求phi(i*n)前缀和+指数循环节