一、概述

生产环境中,有一个topic的数据量非常大。这些数据不是非常重要,需要定期清理。

要求:默认保持24小时,某些topic 需要保留2小时或者6小时

二、清除方式

主要有3个:

1. 基于时间

2. 基于日志大小

3. 基于日志起始偏移量

详情,请参考链接:

https://blog.csdn.net/u013256816/article/details/80418297

接下来,主要介绍基于时间的清除!

kafka版本为:  2.11-1.1.0

zk版本为:  3.4.13

三、kafka配置

# 启用删除主题
delete.topic.enable=true
# 检查日志段文件的间隔时间,以确定是否文件属性是否到达删除要求。
log.retention.check.interval.ms=

注意:这2行配置必须存在,否则清除策略失效!

log.retention.check.interval.ms 参数的单位是毫秒,这里表示间隔1秒钟

四、清除策略

全局topic

在 server.properties 文件中配置的是全局策略,针对每一个topic

比如:

log.retention.hours=

表示保留3个小时

单个topic

针对单个topic策略,需要使用脚本kafka-configs.sh

此脚本不需要重启kafka就会生效!

首先来查看一下,当前的topic策略,比如test

bin/kafka-configs.sh --zookeeper zookeeper-.default.svc.cluster.local: --describe --entity-type topics --entity-name test

参数解释:

--describe  详细信息

--entity-type 实体类型

--entity-name 指定topic名

输出:

Configs for topic 'test' are

这个表示为策略为空

删除topic数据

如果需要删除topic所有数据,使用命令

bin/kafka-topics.sh --delete --topic test --zookeeper zookeeper-.default.svc.cluster.local:

这个命令,请谨慎执行!!!

如果想保留主题,只删除主题现有数据(log)。可以通过修改数据保留时间实现

bin/kafka-configs.sh --zookeeper zookeeper-.default.svc.cluster.local: --entity-type topics --entity-name test --alter --add-config retention.ms=

执行输出:

Completed Updating config for entity: topic 'test'.

注意:修改保留时间为10秒钟,并不是10秒钟就马上删掉。kafka是采用轮询的方式,轮询到这个topic时,删除10秒钟前的数据。

时间由server.properties里面的log.retention.check.interval.ms选项为主

假设说 log.retention.check.interval.ms 值为1分钟,那么等待70秒,这个topic的数据就会自动被删除!

再次查看topic策略

bin/kafka-configs.sh --zookeeper zookeeper-.default.svc.cluster.local: --describe --entity-type topics --entity-name test

输出:

Configs for topic 'test' are retention.ms=

发现目前的删除策略为 retention.ms=10000

删除策略

如果需要删除上面的10秒策略,使用以下命令:

bin/kafka-configs.sh --zookeeper zookeeper-.default.svc.cluster.local: --entity-type topics --entity-name test --alter --delete-config retention.ms

输出:

Completed Updating config for entity: topic 'test'.

再次查看topic策略

bin/kafka-configs.sh --zookeeper zookeeper-.default.svc.cluster.local: --describe --entity-type topics --entity-name test

输出:

Configs for topic 'test' are

发现策略为空,说明删除成功了!

五、测试清除策略

测试思路

说明:

第一步,设置清除策略为保留10秒

第二步,进入生产者模式,输入消息 a

第三步,等待5秒,再次进入生产者模式,输入消息 b

第四部,进入消费者模式,看输出的消息是a还是b

判断标准:

在进行第三步时,a这条消息,应该已经被删除了。所以在第15秒进入消费者模式时,应该输出 b,这样的话,策略才是成功的!

设置策略

topic 为test的数据保留10秒

bin/kafka-configs.sh --zookeeper zookeeper-.default.svc.cluster.local: --entity-type topics --entity-name test --alter --add-config retention.ms=

生产模式

进入生产模式,输入a

bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test
> a

等待5秒后,再次进入生产模式,输入b

bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test
> b

消费者模式

等待5秒后,进入 消费者模式

bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-beginning

b

如果消费者输出为b,表示策略成功!

备注:

如果生产环境中,正在不断的进行生产和消费,执行kafka-configs.sh 脚本,是否会有影响呢?

答案是不会的,它是动态策略!

本文参考链接:

https://blog.csdn.net/forrest_ou/article/details/78999983

最新文章

  1. map<虽然一直不喜欢map>但突然觉得挺好用的
  2. 关于TFS地址改变后,项目迁移的问题。
  3. BZOJ 2654: tree
  4. OSPF(Open Shortest Path First开放式最短路径优先 -链路状态路由协议
  5. jsoncpp用法通俗易懂之解析
  6. left join 关联条件位置
  7. CSS3动画之旋转魔方盒
  8. 17.1 Replication Configuration 复制:
  9. 【原创】Python第二章——标识符命名规则
  10. 697. Degree of an Array
  11. 为什么有时候NSData转换成NSString的时候返回nil
  12. BZOJ2730:[HNOI2012]矿场搭建——题解
  13. bzoj 4773: 负环——倍增
  14. 战火魔兽CJQ圣印问题
  15. makefile之if函数
  16. 【转】C#中的combobox里DropDownStyle
  17. zabbix-proxy 层级制监控
  18. HDU 5695 Gym Class && 百度之星 初赛 1006
  19. python -- Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": http://landinghub.visualstudio.com/visual-cpp-build-tools
  20. 线程同步-CountDownLatch

热门文章

  1. 使用sed替换指定文件指定行的指定文本
  2. 学习windows编程 day6 之处理鼠标移动
  3. docker下载镜像与替换默认源
  4. POJ 2970 The lazy programmer
  5. windows Zookeeper本地服务化
  6. PHP7 学习笔记(七)如何使用zephir编译一个扩展记录
  7. Spring面向切面编程AOP(around)实战
  8. C# print2flash3文件转化
  9. mysql 架构~多写模式MGR
  10. Django学习手册 - cookie / session