参考:https://kafka.apache.org

  • 服务相关命令

1、启动/停止zk

> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/zookeeper-server-stop.sh

2、启动kafka

> bin/kafka-server-start.sh config/server.properties
> bin/kafka-server-stop.sh

3、配置多节点brokers集群

首先复制出两个配置文件:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
然后配置:
broker.id是两个broker的唯一标志,port需要修改为不同的两个
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2
启动两个节点:
> bin/kafka-server-start.sh config/server-1.properties &
> bin/kafka-server-start.sh config/server-2.properties &
  • topic命令

1、创建一个topic
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TEST_TOPIC

2、查看topic列表

> bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.

除了手动创建topic,还可以配置auto-create topics,当一个不存在的topic发布时可自动创建

3、查看某个topic信息

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic TEST_TOPIC

4、修改某个topic的partition个数(分区只能增不能减)

> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic TEST_TOPIC --partitions 10

5、删除某个topic(只会删除zookeeper内的元数据,消息文件需要手动删除)

> bin/kafka-topics.sh  --delete --zookeeper localhost:2181  --topic TEST_TOPIC
  • consumer相关

1、查看所有consumer的group列表

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

2、查看指定group(group_test)的消费情况

> bin/kafka-consumer-groups.sh --describe --group group_test --bootstrap-server localhost:9092
  • console相关

1、生产数据

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TEST_TOPIC

2、消费数据

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic TEST_TOPIC
  • 其他

1、重置指定topic的offset

背景,因为程序写的不够好,辛苦造了2亿多数据跑入库测性能,跑到中间数据库服务器撑爆了,结果都写进日志了。。。

> bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group group_test --reset-offsets -to-offset 97000000 --topic TEST_TOPIC --execute

Other supported arguments:

--shift-by [positive or negative integer] - Shifts offset forward or backward from given integer.往前或往后加减offset

--to-current and --to-latest are same as --to-offset [integer] and --to-earliest.

--to-datetime [Datetime format is yyyy-MM-ddTHH:mm:ss.xxx]

> bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group group_test --reset-offsets --to-datetime 2017-08-04T00:00:00.000 [ --all-topics or --topic <topic-name> ] --execute

how to validata:

> bin/kafka-consumer-groups --bootstrap-server locahost:9092 --group <group_id> --describe

注意:这里重置的offset是针对每个partition而言,重置offset之前,所有的consumer必须是inactive的,也就是应用消费必须暂停

最新文章

  1. linux中rz中的-e选项
  2. [翻译] AKKA笔记- ACTORSYSTEM (配置CONFIGURATION 与调度SCHEDULING) - 4(一)
  3. 移动到web整理
  4. eclipse 3.6 + tomcat 6.0 开发SSH框架学习
  5. Unity 联网小测试(WWW)
  6. POJ 2486 Apple Tree
  7. Android讯飞语音云语音听写学习
  8. iphone dev 入门实例7:How to Add Splash Screen in Your iOS App
  9. 【阿里云产品公测】在Laravel4框架中使用阿里云OCS缓存
  10. 用jQuery+easyUI遇到的几个插件与文件详解
  11. XJOI——NOIP2015提高组模拟题19-day1——观光旅行
  12. iOS8数字键盘加左下角完成button
  13. 使用java API操作hdfs--读取hdfs文件并打印
  14. Apollo阿波罗配置中心docker
  15. String [] 转 List&lt;String&gt;
  16. ES6躬行记(5)——对象字面量的扩展
  17. 完美解决safari、微信浏览器下拉回弹效果
  18. Socket网络编程--简单Web服务器(3)
  19. Webrtc源码走读(一)
  20. windows7下安装python环境和django

热门文章

  1. 推荐Windows下SVN服务器端和客户端工具软件
  2. Jmeter命令行 传递参数
  3. Mybatis中#{}与${}的使用
  4. centos7 安装python虚拟环境
  5. .Net Core身份认证:IdentityServer4实现OAuth 2.0 客户端模式
  6. ReactOS 代码更新后的编译安装
  7. Javascript高级程序设计--读书笔记之面向对象(二)
  8. Android开发 Butterknife使用方法总结
  9. 第四章 K8s部署安装
  10. Codeforces 348C Subset Sums 分块思想