1. Download the installation package

2. Extract the installation package

 tar -zxvf  kafka_2.-0.9.0.1.tgz

 mv kafka_2.-0.9.0.1 kafka

3. Edit the configuration file

cp server.properties  server.properties.bak

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=
zookeeper.connect=mini1:,mini2:,mini3:

############################# Socket Server Settings #############################

#listeners=PLAINTEXT://:9092

# The port the socket server listens on
port=

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=mini1

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=192.168.74.100

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=

# The number of threads handling network requests
num.network.threads=

# The number of threads doing disk I/O
num.io.threads=

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/apps/logs/kafka

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=

Add the following to /etc/profile:

export KAFKA_HOME=/root/apps/kafka
export PATH=${KAFKA_HOME}/bin:$PATH
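
If this step is repeated on several machines, the two export lines can be appended by a small script. The following is only a sketch: the function name add_kafka_env and its two arguments are hypothetical, and it assumes that an existing "export KAFKA_HOME=" line means the profile was already updated.

```shell
#!/bin/sh
# add_kafka_env PROFILE_FILE KAFKA_DIR
# Hypothetical helper: appends the KAFKA_HOME and PATH exports to PROFILE_FILE
# unless a KAFKA_HOME export is already present.
add_kafka_env() {
    profile=$1
    kafka_dir=$2
    # Skip if already configured, so that rerunning does not duplicate lines.
    if grep -q '^export KAFKA_HOME=' "$profile" 2>/dev/null; then
        return 0
    fi
    {
        echo "export KAFKA_HOME=$kafka_dir"
        # Single quotes keep ${KAFKA_HOME} and $PATH unexpanded in the file.
        echo 'export PATH=${KAFKA_HOME}/bin:$PATH'
    } >> "$profile"
}
```

For example, `add_kafka_env /etc/profile /root/apps/kafka` followed by `. /etc/profile` makes the Kafka scripts available in the current shell.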

4. Distribute the installation package

 scp -r $PWD mini2:$PWD
scp -r $PWD mini3:$PWD
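
The two scp commands follow the same pattern, so with more brokers a loop may be easier to maintain. This is a minimal sketch, not part of the original procedure: the function name distribute and the DRY_RUN switch are hypothetical, and passwordless SSH to each host is assumed.

```shell
#!/bin/sh
# distribute DIR HOST...
# Copies DIR to the same absolute path on every HOST.
# DRY_RUN is a hypothetical switch for this sketch: when set to 1, the scp
# commands are printed instead of executed.
distribute() {
    dir=$1
    shift
    for host in "$@"; do
        if [ "${DRY_RUN:-0}" = "1" ]; then
            echo "scp -r $dir $host:$dir"
        else
            scp -r "$dir" "$host:$dir" || return 1
        fi
    done
}
```

Calling `distribute "$PWD" mini2 mini3` from the Kafka directory reproduces the two commands above. As with those commands, the target directory should not already exist on the remote host, otherwise scp places the copy inside it.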

Edit the configuration file on mini2

In server.properties:

broker.id=1
host.name=mini2
advertised.host.name=192.168.74.101

Edit the configuration file on mini3

In server.properties:

broker.id=2
host.name=mini3
advertised.host.name=192.168.74.103
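
These per-broker changes can also be scripted instead of edited by hand. The sketch below assumes that broker.id, host.name and advertised.host.name already appear as uncommented lines in server.properties, and that the new values contain no "|" or "&" characters. The helper names set_prop and configure_broker are hypothetical.

```shell
#!/bin/sh
# set_prop FILE KEY VALUE
# Hypothetical helper: replaces the value of an existing, uncommented KEY=... line.
set_prop() {
    file=$1
    key=$2
    value=$3
    tmp=$(mktemp) || return 1
    # Only lines starting with "KEY=" are rewritten, so "host.name" does not
    # touch "advertised.host.name". All other lines are copied unchanged.
    sed "s|^${key}=.*|${key}=${value}|" "$file" > "$tmp" &&
        cat "$tmp" > "$file"   # write back in place to keep the file's permissions
    status=$?
    rm -f "$tmp"
    return $status
}

# configure_broker FILE BROKER_ID HOST_NAME ADVERTISED_HOST
configure_broker() {
    set_prop "$1" broker.id "$2" &&
    set_prop "$1" host.name "$3" &&
    set_prop "$1" advertised.host.name "$4"
}
```

On mini2 this could be run as `configure_broker /root/apps/kafka/config/server.properties 1 mini2 192.168.74.101`.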

5. Start the cluster

In the /root/apps/kafka/bin directory, run:

./kafka-server-start.sh /root/apps/kafka/config/server.properties
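
The command above keeps the broker in the foreground of the terminal. kafka-server-start.sh also accepts -daemon as its first argument to run the broker in the background, which makes it possible to start every node from one machine. The following is a sketch under several assumptions: passwordless SSH to each host, the same install path on every node, and /etc/profile providing the Java environment on the remote side. The function name start_cluster and the DRY_RUN switch are hypothetical.

```shell
#!/bin/sh
# Install path from the steps above; override by exporting KAFKA_HOME first.
KAFKA_HOME=${KAFKA_HOME:-/root/apps/kafka}

# start_cluster HOST...
# Starts a broker in the background on every HOST over SSH.
# DRY_RUN is a hypothetical switch for this sketch: when set to 1, the ssh
# commands are printed instead of executed.
start_cluster() {
    for host in "$@"; do
        # Load /etc/profile first, since non-interactive SSH sessions may not read it.
        cmd=". /etc/profile; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"
        if [ "${DRY_RUN:-0}" = "1" ]; then
            echo "ssh $host $cmd"
        else
            ssh "$host" "$cmd" || return 1
        fi
    done
}
```

For example, `start_cluster mini1 mini2 mini3` would start all three brokers. ZooKeeper must already be running on the hosts named in zookeeper.connect.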

6. Verify the cluster


In the /root/apps/kafka directory on mini1:

1. Create a topic

bin/kafka-topics.sh --create --zookeeper mini1: --replication-factor  --partitions  --topic test0225
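
After creating the topic, it can be useful to confirm how its partitions and replicas were assigned. kafka-topics.sh provides a --describe option for this. The sketch below wraps it in a function; the names describe_topic, ZK_CONNECT and KAFKA_TOPICS are assumptions made for this example, and ZK_CONNECT must be set to the same ZooKeeper host:port used in the create command.

```shell
#!/bin/sh
# describe_topic TOPIC
# Prints partition, leader and replica information for TOPIC.
# ZK_CONNECT (required) is the ZooKeeper host:port, as in the create command.
# KAFKA_TOPICS (optional) overrides the script path; both names are
# assumptions of this sketch, not Kafka settings.
describe_topic() {
    # Abort with a message if ZK_CONNECT is unset or empty.
    : "${ZK_CONNECT:?set ZK_CONNECT to the ZooKeeper host:port}"
    "${KAFKA_TOPICS:-bin/kafka-topics.sh}" --describe --zookeeper "$ZK_CONNECT" --topic "$1"
}
```

Run from /root/apps/kafka, `describe_topic test0225` shows the topic created above once ZK_CONNECT is set.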

2. Produce data with the console producer

 bin/kafka-console-producer.sh --broker-list mini1: --topic test0225

In the /root/apps/kafka directory on mini3:

3. Consume data with the console consumer

 bin/kafka-console-consumer.sh --zookeeper mini1: --from-beginning --topic test0225    
