之前写过如何用服务器端的API代码来获取订阅某topic的所有consumer group,参见这里。使用服务器端的API需要用到kafka.admin.AdminClient类,但是这个类在0.11.0.0版本已经被标记为不推荐使用了,故目前最合适的方式还是通过客户端API:org.apache.kafka.clients.admin.AdminClient。今天碰到有人问这个问题,我就尝试写了一个。使用之前你需要引入kafka client包依赖(以2.2.0版本为例)

Maven:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>

Gradle:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.2.0'

下面是代码:

 private static List<String> getGroupsForTopic(String brokerServers, String topic)
throws ExecutionException, InterruptedException, TimeoutException {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers); try (AdminClient client = AdminClient.create(props)) {
List<String> allGroups = client.listConsumerGroups()
.valid()
.get(10, TimeUnit.SECONDS)
.stream()
.map(ConsumerGroupListing::groupId)
.collect(Collectors.toList()); Map<String, ConsumerGroupDescription> allGroupDetails =
client.describeConsumerGroups(allGroups).all().get(10, TimeUnit.SECONDS); final List<String> filteredGroups = new ArrayList<>();
allGroupDetails.entrySet().forEach(entry -> {
String groupId = entry.getKey();
ConsumerGroupDescription description = entry.getValue();
boolean topicSubscribed = description.members().stream().map(MemberDescription::assignment)
.map(MemberAssignment::topicPartitions)
.map(tps -> tps.stream().map(TopicPartition::topic).collect(Collectors.toSet()))
.anyMatch(tps -> tps.contains(topic));
if (topicSubscribed)
filteredGroups.add(groupId);
});
return filteredGroups;
}
}

我会假设你的集群中没有配置安全认证和授权机制或者发起此AdminClient的用户是合法用户且有CLUSTER以及GROUP的DESCRIBE权限。

另外值得注意的是,上面这个函数无法获取非运行中的consumer group,即虽然一个group订阅了某topic,但是若它所有的consumer成员都关闭的话这个函数是不会返回该group的。

最新文章

  1. Codeforces Round #342 (Div. 2) D. Finals in arithmetic(想法题/构造题)
  2. [Unity3d]向量的过度方法以及拖尾效果
  3. Linux 挂载 NFS
  4. Linux(Red Hat 6 32位) 下安装Mysql5.6.30
  5. Unity键值(KeyCode)
  6. 如何制作prezi swf格式字体(prezi 中文字体)
  7. Interlocked.Increment 方法 和Interlocked.Decrement 方法作用
  8. Oracle的硬解析和软解析
  9. ASP.NET MVC4.0 部署
  10. 【Beta】Daily Scrum Meeting——Day4
  11. IdentityServer(12)- 使用 ASP.NET Core Identity
  12. .net 配置swagger
  13. Java PDF转图片
  14. (转)使用Flexible实现手淘H5页面的终端适配
  15. windows 2012安装不了KB2919355
  16. ios Mac 地址获取
  17. PHP_GET后门,躲避任何安全软件
  18. Sqli-labs less 2
  19. uva 10808 - Rational Resistors(基尔霍夫定律+高斯消元)
  20. 2017年--10年java大神告诉你开发最常用的百分之二十的技术有哪些?

热门文章

  1. js中call和apply的作用和用法
  2. sql注入--基于报错的注入
  3. AIX使用命令修改网卡IP地址,永久生效
  4. bootstrap常见的面试题
  5. JAVA自学笔记10
  6. Springboot中Aspect实现切面(以记录日志为例)
  7. 使用Canvas制作画图工具
  8. 创建MySQL用户 赋予某指定库表的权限
  9. 一些Android手机的平台信息
  10. K-means算法原理