Kafka是什么

Kafka最初是由LinkedIn公司采用Scala语言开发的一个分布式、多分区、多副本且基于ZooKeeper协调的内部基础设置,现已捐献给Apache基金会。Kafka是一个流平台,主要用来发布和订阅数据流,是流式数据处理的利器。Kafka用于构建实时数据管道和流应用程序,具有水平可伸缩性,容错性,快速性。Kafka可以处理消费者的网站中的所有动作(网页浏览,搜索和其他用户的行动)流数据。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的,这是一个可行的解决方案。Kafka的目的是通过并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Kafka主要特点

主题和日志

主题是Kafka提供的核心抽象。主题是将信息记录到的某个类别或订阅源名称。Kafka中的主题始终是多用户的,也就是说,一个主题可以有零个,一个或多个消费者来订阅写入该主题的数据。

对于每个主题,Kafka集群都会维护一个分区。每个分区都是由有序的不变的记录序列构成,这些记录连续地“append”到“commit log”中。每个分区中的记录都分配有一个称为“偏移量”的顺序ID,该ID唯一地标识分区中的每个记录。

机器:8核2Ghz,16G内存,千兆网卡,千兆网络

RabbitMQ、ActiveMQ和Kafka对比

生产者测试,总共发布1000万条消息,每条消息200字节。

消费者测试,总共获取1000万条消息。

Kafka安装配置

安装并运行

下载地址

https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz

$ wget https://mirror.bit.edu.cn/apache/kafka/2.5./kafka_2.-2.5..tgz
配置

单机伪分布式,复制三份server.properites,但需要修改端口、broker.id、log.dirs。broker.id 不能重复,端口号如果在不同的服务器上可以重复

server.properties

# Kafka broker节点唯一标识
borker.id=
# 端口号(伪分布式不能冲突)
port=
# 对客户端提供的服务器地址和端口
advertised.listener=PLAINTEXT://192.168.56.105:
# Kafka日志存分路径
log.dirs=/home/hadoop/kafka/broker-

server1.properties

# Kafka broker节点唯一标识

borker.id=1

# 端口号(伪分布式不能冲突)

port=9093

# 对客户端提供的服务器地址和端口

advertised.listener=PLAINTEXT://192.168.56.105:9093

# Kafka日志存分路径

log.dirs=/home/hadoop/kafka/broker-1

server2.properties

# Kafka broker节点唯一标识

borker.id=2

# 端口号(伪分布式不能冲突)

port=9094

# 对客户端提供的服务器地址和端口

advertised.listener=PLAINTEXT://192.168.56.105:9094

# Kafka日志存分路径

log.dirs=/home/hadoop/kafka/broker-2

运行

运行(三种方式各选其一),这里注意路径。.sh文件在kafka的bin文件夹下面,properties文件在kafka的config文件夹下。

​$ kafka-server-start.sh server.properties &
$ kafka-server-start.sh server.properties1 &
$ kafka-server-start.sh server.properties2 &

配置参数

参数名

默认值

描述

broker.id

-

该服务器的broker ID。如未设置,将生成一个唯一的broker ID。为避免Zookeeper生成的broker ID与用户配置的broker ID之间发生冲突,生成的broker ID从reserved.broker.max.id +1开始。

log.dirs

-

保存日志数据的目录

zookeeper.connect

-

Kafka连接Zookeeper的地址字符串

advertised.host.name

-

仅在未设置advertised.listeners或listeners时使用。使用advertised.listeners替代。

num.network.threads

3

服务器用于接收请求并且发送响应的线程数

num.io.threads

8

服务器用于处理请求的线程数,其中可能包括磁盘IO

socket.send.buffer.bytes

102400

发送缓冲区大小。不是有数据就马上发送,而是先存储到缓冲区了等到达一定的大小后再发送,以能提高性能

socket.receive.buffer.bytes

102400

接收缓冲区大小,当数据到达一定大小后再序列化到磁盘

socket.request.max.bytes

104857600

请求的最大字节数。这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数

num.partitions

1

每个主题默认的分区数

num.recovery.threads.per.data.dir

1

每个数据目录在启动时用于日志恢复以及在关机时用于刷新的线程数

offset.metadata.max.bytes

4096

与提交偏移量关联的元数据最大字节数

offsets.commit.required.acks

-1

提交之前所需的确认写入成功的副本数

offsets.commit.timeout.ms

5000

偏移量提交超时时间

offsets.topic.num.partitions

50

偏移主题的分区数

offsets.topic.replication.factor

3

偏移量主题的复制因子(设置较高以确保可用性)

transaction.state.log.replication.factor

3

事务主题的复制因子(设置较高以确保可用性)

transaction.state.log.min.isr

2

事务主题的min.insync.replicas配置。

log.flush.interval.messages

非常大

将消息刷新到磁盘之前,在日志分区上累积的消息数

log.flush.interval.ms

null

刷新到磁盘之前,主题中的消息在内存中保留的最长时间(以毫秒为单位)。如果未设置,则使用log.flush.scheduler.interval.ms中的值

log.retention.hours

168

删除日志文件之前保留日志文件的小时数。此属性级别高于log.retention.ms

log.retention.check.interval.ms

300000

日志清除器根据保留策略检查日志是否可以将其删除的时间间隔(以毫秒为单位)

log.segment.bytes

1073741824

单个日志文件的最大大小。topic的分区下的所有日志会进行分段,达到该大小,会滚动出新的日志文件

group.initial.rebalance.delay.ms

3000

coordinator在执行rebalance之前等待更多消费者加入新组的时间。

min.insync.replicas

1

确认写入成功的最小副本数。

auto.create.topics.enable

true

在服务器上启用主题自动创建。

compression.type

producer

主题的压缩类型。此配置接受标准压缩编解码器“ gzip”,“ snappy”,“ lz4”和“ zstd”。“uncompressed”表示不进行压缩,而“producer”表示保留生产者设置的压缩编解码器。

delete.topic.enable

true

如果关闭此配置,则通过管理工具删除主题将无效

基本命令使用

创建:

# 创建一个拥有1个分区3个副本的topic
$ kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

修改:

# 修改topic
$ kafka-topics.sh --alter --zookeeper master:2181 --topic my-replicated-topic --partitions 2

查看:

# 查看topic信息
$ kafka-topics.sh --describe --zookeeper master:2181 --topic my-replicated-topic

删除:

# 删除topic
$ kafka-topics.sh -delete --zookeeper master:2181 -topic my-replicated-topic

查看主题列表

$ kafka-topics.sh --list --zookeeper master:2181
$ kafka-topics.sh --list --zookeeper master:2181,master:2182,master:2183

生产消息:

# 命令方式
$ kafka-console-producer.sh --broker-list master:9092 --topic my-replicated-topic # 调用工具类方式
$ kafka-run-class.sh kafka.tools.ConsoleProducer --broker-list master:9092,master:9093,master94 --topic my-replicated-topic

消息批量生成:

$ kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092

消费消息:

# 命令方式
$ kafka-console-consumer.sh --bootstrap-server 192.168.56.105:9092 --from-beginning -topic mytopic-02 # 调用工具类方式
$ kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list master:9092,master:9093,master:9094 --topic my-replicated-topic

最新文章

  1. thymeleaf的常见用法
  2. JavaScriptCore框架介绍
  3. jQuery-2.1.4.min.js:4 Uncaught TypeError: Illegal invocation
  4. 《Linux内核设计与实现》读书笔记(十六)- 页高速缓存和页回写
  5. SSH_框架整合5--验证用户名是否可用
  6. (剑指Offer)面试题14:调整数组顺序使奇数位于偶数前面
  7. js构造函数式编程
  8. HTTP response codes
  9. rowid结构浅析
  10. TensorFlow实现knn(k近邻)算法
  11. 20155227 实现mypwd
  12. pthon/零起点(一、集合)
  13. 环境变量方式使用 Secret - 每天5分钟玩转 Docker 容器技术(158)
  14. Hugo + github 搭建个人博客
  15. 报错:Heartbeating to master:7182 failed.
  16. 学生管理系统(javaweb版)
  17. python之旅七【第七篇】面向对象之类成员
  18. python笔记-正则表达式常用函数
  19. Eclipse运行时发生An internal error occurred during:“**************” 的解决办法
  20. 绑定域名到JavaWeb项目,由域名直接访问到网站首页

热门文章

  1. Centos7.x RPM安装ELK 7.5.0
  2. https绕过证书认证请求 Get或Post请求(证书过期,忽略证书)
  3. 【Spring注解驱动开发】使用@Import注解给容器中快速导入一个组件
  4. ODEINT 求解常微分方程(2)
  5. abp-CMS模块-广告
  6. Django 源码阅读笔记(基础视图)
  7. 通过数据库客户端界面工具DBeaver连接Hive
  8. 动作函数-web_url
  9. (二)groupId和artifactId
  10. 恕我直言你可能真的不会java第2篇:Java Stream API?