查询订阅某topic的所有consumer group(Java API)
2024-08-20 01:22:36
在网上碰到的问题,想了下使用现有的API还是可以实现的。
首先,需要引入Kafka服务器端代码,比如加入Kafka 1.0.0依赖:
Maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
</dependency>
Gradle
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '1.0.0'
然后编写获取订阅某topic的所有group的方法,代码如下:
/**
* get all subscribing consumer group names for a given topic
* @param brokerListUrl localhost:9092 for instance
* @param topic topic name
* @return
*/
public static Set<String> getAllGroupsForTopic(String brokerListUrl, String topic) {
AdminClient client = AdminClient.createSimplePlaintext(brokerListUrl); try {
List<GroupOverview> allGroups = scala.collection.JavaConversions.seqAsJavaList(client.listAllGroupsFlattened().toSeq());
Set<String> groups = new HashSet<>();
for (GroupOverview overview: allGroups) {
String groupID = overview.groupId();
Map<TopicPartition, Object> offsets = scala.collection.JavaConversions.mapAsJavaMap(client.listGroupOffsets(groupID));
Set<TopicPartition> partitions = offsets.keySet();
for (TopicPartition tp: partitions) {
if (tp.topic().equals(topic)) {
groups.add(groupID);
}
}
}
return groups;
} finally {
client.close();
}
}
最新文章
- SQL注入的分类
- 几种循环语句 ,break,continue语句用法
- 【转】apache与tomcat的区别
- Web Uploader文件上传&;&;使用webupload有感(黄色部分)
- AVL的旋转
- Qt中gb2312/GBK的URL编解码函数
- Windows Azure应用系列:微软的云部署VPN
- DBCP连接池的使用
- 关于数据结构的10个面试题(c语言实现)
- 使用Git将本地项目或代码上传到GitHub上
- OLAP工作的基本概念(结合个人工作)
- C语言实现万年历
- How tomcat works 读书笔记十二 StandardContext 下
- PHP删除目录及目录下所有文件
- 解决STM32 I2C接口死锁在BUSY状态的方法讨论
- Python面向对象5:类的常用魔术方法
- maven构建SSM框架中pom.xml文件配置
- 大杂烩 -- equals、hashCode联系与区别
- 关于运行python脚本产生__pycache__
- linux通过sendmail发送邮件