Setting Up a Distributed Kafka Cluster

This article uses kafka_2.11-0.10.1.0, the latest release at the time of writing, to walk through setting up a distributed Kafka cluster. Server list:

    172.31.10.1
    172.31.10.2
    172.31.10.3

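1. Download the Kafka package

Go to the Kafka website, http://kafka.apache.org/, then:

  • Click the "Download" button on the left
  • Choose the version you need: in the package name, 2.11 is the Scala version (Kafka is written in Scala) and 0.10.1.0 is the Kafka version
  • Pick a download link in the window that opens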
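
The naming scheme described above can be checked mechanically. The following is a minimal sketch that splits a package name of the form kafka_<scala>-<kafka> into its two version numbers with plain shell parameter expansion. The function names scala_version and kafka_version are made up for this illustration and are not part of Kafka.

```shell
#!/bin/sh
# Split a Kafka package name (kafka_<scala>-<kafka>) into its two versions.
# The function names are illustrative only, not part of any Kafka tooling.

scala_version() {
  _name="${1#kafka_}"    # drop the "kafka_" prefix, e.g. 2.11-0.10.1.0
  echo "${_name%%-*}"    # keep everything before the first "-"
}

kafka_version() {
  _name="${1#kafka_}"    # drop the "kafka_" prefix
  echo "${_name#*-}"     # keep everything after the first "-"
}

pkg="kafka_2.11-0.10.1.0"
echo "Scala version: $(scala_version "$pkg")"
echo "Kafka version: $(kafka_version "$pkg")"
```

For the package used in this article, the script reports Scala 2.11 and Kafka 0.10.1.0.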

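2. Download the ZooKeeper package

(Figure: overall Kafka architecture.)

A Kafka cluster normally depends on ZooKeeper as its naming service. A single-node setup can use the ZooKeeper bundled in the Kafka package, but production environments usually run a separate ZooKeeper cluster to keep the naming service available. If servers are scarce, ZooKeeper can share machines with the Kafka brokers, because it needs few resources.

Go to the ZooKeeper download page, http://www.apache.org/dyn/closer.cgi/zookeeper/, and follow the download links. This article uses the stable release zookeeper-3.4.8.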
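
On a server without a browser it may be easier to fetch both packages from the command line. The sketch below only builds the download URLs. It assumes the directory layout of the Apache archive at archive.apache.org/dist, which this article does not mention, so treat the base URL and the two function names as assumptions.

```shell
#!/bin/sh
# Build download URLs for the two packages used in this article.
# ARCHIVE and both function names are assumptions made for illustration.

ARCHIVE="https://archive.apache.org/dist"

# kafka_url <scala-version> <kafka-version>
kafka_url() {
  echo "${ARCHIVE}/kafka/$2/kafka_$1-$2.tgz"
}

# zookeeper_url <zookeeper-version>
zookeeper_url() {
  echo "${ARCHIVE}/zookeeper/zookeeper-$1/zookeeper-$1.tar.gz"
}

kafka_url 2.11 0.10.1.0
zookeeper_url 3.4.8
```

To download a package, pass the URL to a downloader, for example curl -fLO "$(zookeeper_url 3.4.8)" if curl is installed.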

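3. Install the ZooKeeper cluster

Upload the package zookeeper-3.4.8.tar.gz to server 172.31.10.1, then:

  • Extract it, which yields the directory /opt/zookeeper/zookeeper-3.4.8

    mkdir -p /opt/zookeeper
    tar -zxvf zookeeper-3.4.8.tar.gz -C /opt/zookeeper
  • Configure it: switch to the conf directory, create zoo.cfg from the sample file, and set dataDir and the server.x entries

    cd /opt/zookeeper/zookeeper-3.4.8/conf
    mv zoo_sample.cfg zoo.cfg

    The modified zoo.cfg is as follows: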

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/var/logs/data/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    server.1=172.31.10.1:2888:3888
    server.2=172.31.10.2:2888:3888
    server.3=172.31.10.3:2888:3888
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1

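    Here dataDir is the ZooKeeper data directory, and each server.x entry gives the address of one ZooKeeper server and its communication ports (2888 for follower connections to the leader, 3888 for leader election).

  • Copy the installation to the other two servers, then create a myid file under dataDir on each server. Its content is the number from that server's server.x entry. This article uses:

    # Run on 172.31.10.1
    mkdir -p /var/logs/data/zookeeper
    echo "1" > /var/logs/data/zookeeper/myid

    # Run on 172.31.10.2
    mkdir -p /var/logs/data/zookeeper
    echo "2" > /var/logs/data/zookeeper/myid

    # Run on 172.31.10.3
    mkdir -p /var/logs/data/zookeeper
    echo "3" > /var/logs/data/zookeeper/myid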
  • Start the ZooKeeper cluster and verify it

    # Start ZooKeeper on every server
    cd /opt/zookeeper/zookeeper-3.4.8/bin
    /opt/zookeeper/zookeeper-3.4.8/bin/zkServer.sh start

    # Check the role (leader or follower) of the ZooKeeper node on each server
    cd /opt/zookeeper/zookeeper-3.4.8/bin
    /opt/zookeeper/zookeeper-3.4.8/bin/zkServer.sh status
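
The server.x entries in zoo.cfg and the myid file on each host must agree, and keeping them in sync by hand is error-prone. As a minimal sketch, both can be derived from a single host list. ZK_HOSTS and the two function names are hypothetical helpers for this illustration, not part of ZooKeeper. The ports 2888 and 3888 are the ones used in this article's zoo.cfg.

```shell
#!/bin/sh
# Derive the zoo.cfg server list and each host's myid from one host list.
# ZK_HOSTS and both function names are illustrative, not part of ZooKeeper.

ZK_HOSTS="172.31.10.1 172.31.10.2 172.31.10.3"

# Print one "server.N=host:2888:3888" line per host, numbering from 1.
zk_server_lines() {
  _id=1
  for _host in $ZK_HOSTS; do
    echo "server.${_id}=${_host}:2888:3888"
    _id=$((_id + 1))
  done
}

# Print the myid value for one host: its 1-based position in ZK_HOSTS.
# Returns non-zero if the host is not in the list.
zk_myid_for() {
  _id=1
  for _host in $ZK_HOSTS; do
    if [ "$_host" = "$1" ]; then
      echo "$_id"
      return 0
    fi
    _id=$((_id + 1))
  done
  return 1
}

zk_server_lines
echo "myid for 172.31.10.2: $(zk_myid_for 172.31.10.2)"
```

On a given host, the output of zk_myid_for for that host's address could be redirected into the myid file under dataDir, so the number always matches the generated server.x line.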

4. Install the Kafka cluster

  • Extract it to /opt/kafka/kafka_2.11-0.10.1.0

    mkdir -p /opt/kafka
    tar -zxvf kafka_2.11-0.10.1.0.tgz -C /opt/kafka
    cd /opt/kafka/kafka_2.11-0.10.1.0
  • Edit config/server.properties. The main settings to change are:

    broker.id=1
    host.name=172.31.10.1
    log.dirs=/var/logs/data/kafka
    zookeeper.connect=172.31.10.1:2181,172.31.10.2:2181,172.31.10.3:2181/kafka

  Note that broker.id must be different on every server and unique across the whole cluster.

  The modified server.properties is as follows:

############################# Server Basics #############################
 
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
 
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=172.31.10.1
 
# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true
 
############################# Socket Server Settings #############################
 
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
 
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
 
# The number of threads handling network requests
num.network.threads=3
 
# The number of threads doing disk I/O
num.io.threads=8
 
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
 
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
 
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
 
 
############################# Log Basics #############################
 
# A comma separated list of directories under which to store log files
log.dirs=/var/logs/data/kafka
 
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
 
############################# Log Flush Policy #############################
 
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
 
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
 
############################# Log Retention Policy #############################
 
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
 
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
 
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
 
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
 
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
 
############################# Zookeeper #############################
 
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=172.31.10.1:2181,172.31.10.2:2181,172.31.10.3:2181/kafka
 
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
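  • Sync the installation to the other servers and change broker.id on each one. host.name must also be changed to each server's own address, because the broker binds to it.
  • Start Kafka and verify it

    cd /opt/kafka/kafka_2.11-0.10.1.0
    nohup bin/kafka-server-start.sh config/server.properties &

    Create a topic. If the topic is created successfully, the cluster is installed correctly. You can also use the jps command to check that the Kafka process exists.

    /opt/kafka/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper 172.31.10.1:2181,172.31.10.2:2181,172.31.10.3:2181/kafka --replication-factor 3 --partitions 1 --topic test

This completes the installation of the distributed Kafka cluster. Later articles will cover other aspects of Kafka in depth.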
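
The settings that differ between brokers, and the zookeeper.connect string they all share, are easy to mistype when edited by hand on three machines. The following sketch generates them from one server list. SERVERS, ZK_PORT, ZK_CHROOT and the function names are hypothetical names chosen for this illustration. Using a host's position in the list as its broker.id is just one possible convention, not something Kafka requires.

```shell
#!/bin/sh
# Build the shared zookeeper.connect value and the per-broker settings
# from one server list. All names below are illustrative, not Kafka tooling.

SERVERS="172.31.10.1 172.31.10.2 172.31.10.3"
ZK_PORT=2181
ZK_CHROOT="/kafka"

# Join every server as host:port with commas, then append the chroot once.
zk_connect() {
  _out=""
  for _zk in $SERVERS; do
    _out="${_out:+$_out,}${_zk}:${ZK_PORT}"
  done
  echo "${_out}${ZK_CHROOT}"
}

# Print the settings for one broker. broker.id is the host's 1-based
# position in SERVERS, so it is unique across the cluster.
# Returns non-zero if the host is not in the list.
broker_settings() {
  _id=1
  for _host in $SERVERS; do
    if [ "$_host" = "$1" ]; then
      echo "broker.id=${_id}"
      echo "host.name=${_host}"
      echo "zookeeper.connect=$(zk_connect)"
      return 0
    fi
    _id=$((_id + 1))
  done
  return 1
}

broker_settings 172.31.10.3
```

The printed lines can be compared against config/server.properties on each server before starting the broker. The value of zk_connect is also the argument that kafka-topics.sh expects after --zookeeper.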
