任何应用功能再强大、性能再优越,如果没有与之匹配的监控,那么一切都是虚无缥缈的。监控不仅可以为应用提供运行时的数据作为依据参考,还可以迅速定位问题,提供预防及告警等功能,很大程度上增强了整体服务的鲁棒性。

一、Kafka监控指标与获取

Kafka监控的4个维度:

  • 集群信息
  • broker信息
  • topic信息
  • consumer group信息

使用JConsole访问JMX

(1)终端输入jconsole,启动Java监视和管理控制台。

(2)修改kafka-run-class.sh,使JConsole可以通过远程连接。

KAFKA_JMX_OPTS="

-Dcom.sun.management.jmxremote 

-Dcom.sun.management.jmxremote.authenticate=false  

-Dcom.sun.management.jmxremote.ssl=false 

-Djava.rmi.server.hostname=服务器的IP地址或者域名"

(3)修改kafka-server-start.sh,增加export JMX_PORT="9999"

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
fi

(4)检查是否开启JMX

broker在启动过程中,始终会将JMX端口信息写入Kafka对应的位置.

(5)连接

(6)查看MBean

MBean的名称,xxx.type=yyy,{attr} = zzz

其中xxx指的是组件名,如xxx = kafka.server

zzz 和 attr 指的是MBean的范围,例如topic = test,表示该MBean的作用范围是名为test的topic。

指标分类:

  • kafka.server 服务器端JMX指标
  • kafka.network 网络相关JMX指标
  • kafka.log 分区日志相关JMX指标
  • kafka.controller controller相关指标

使用Java程序访问JMX

(1)监控broker一分钟消息流入的速度

kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec

OneMinuteRate 表示某个broker一分钟消息流入的速度(messages/s)。


public class KafkaJmxDemo { private MBeanServerConnection conn; private String jmxUrl; private String ipAndPort; public KafkaJmxDemo(String ipAndPort) {
this.ipAndPort = ipAndPort;
} /**
* 初始化JMX连接
*
* @return
*/
public boolean init() {
jmxUrl = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
try {
JMXServiceURL serviceURL = new JMXServiceURL(jmxUrl);
JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
conn = connector.getMBeanServerConnection();
if (conn == null) {
return false;
}
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return true;
} public double getMsgInPerSec() {
String objectName = "kafka.server:type=BrokerTopicMetrics," +
"name=MessagesInPerSec";
Object val = getAttribute(objectName, "OneMinuteRate");
if (val != null) {
return (double) (Double) val;
}
return 0.0;
} private Object getAttribute(String objName, String objAttr) {
ObjectName objectName;
try {
objectName = new ObjectName(objName);
return conn.getAttribute(objectName, objAttr);
} catch (Exception e) {
e.printStackTrace();
}
return null;
} public static void main(String[] args) {
KafkaJmxDemo kafkaJmxDemo = new KafkaJmxDemo("127.0.0.1:9999");
kafkaJmxDemo.init();
System.out.println(kafkaJmxDemo.getMsgInPerSec());
} }

(2)获取指定Topic、指定分区的LEO值


public long getTopicPatitionLeo(String topic, int partition) {
String objectName = "kafka.log:type=Log,name=LogEndOffset,topic=" + topic + ",partition=" + partition;
Object val = getAttribute(objectName, "Value");
if (val != null) {
return (long) (Long) val;
}
return 0L;
}

(3)监控指定Topic的消息流入的速度


public double getBrokerTopicMetrics(String topic) {
String objectName = "kafka.server:type=BrokerTopicMetrics," +
"name=BytesInPerSec,topic=" + topic;
Object val = getAttribute(objectName, "OneMinuteRate");
if (val != null) {
return (double) (Double) val;
}
return 0.0;
}

输出

Kafka重要监控参数

(1)消息入站、出站速率

## 入站速率
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec ## 出站速率
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
属性名 含义
Count broker处理过的总消息字节数
OneMinuteRate 统计过去1分钟内的消息速率
MeanRate 统计平均消息速率

二、监控系统kafka-manager

注意每一行后面不要留空格。

[repositories]
local
aliyun: http://maven.aliyun.com/nexus/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots

Add Cluster时,会提示异常。

Yikes! Ask timed out on [ActorSelection[Anchor(akka://kafka-manager-system/), Path(/user/kafka-manager)]] after [5000 ms]. Message of type [kafka.manager.model.ActorModel$KMAddCluster]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

三、监控系统kafka-eagle

安装参考

官方参考文档:https://docs.kafka-eagle.org/

https://www.cnblogs.com/yinzhengjie/p/9957389.html

下载


wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.2.0.tar.gz

效果图

创建Topic

监控Broker的消息出入站速率

参考文档

kafka-manager Github

Kafka集群管理工具kafka-manager的安装使用

kafka manager的使用,kafka manager页面参数说明

Kafka Manager几个指标含义

最新文章

  1. CSS 3学习——transition 过渡
  2. json_encode 中文乱码
  3. 直接拿来用!最火的Android开源项目(完结篇)(转)
  4. 最完整PHP.INI中文版
  5. 差之毫厘谬之千里!带你认识CPU后缀含义
  6. How do I place a group of functions or variables in a specific section?
  7. Java这点事
  8. sublimetext3官网安装
  9. PHP+jQuery实现翻板抽奖
  10. Codeforces Round #198 (Div. 2) C. Tourist Problem
  11. 利用python完成大学刷课(从0到完成的思路)
  12. django补充和form组件
  13. navicat有数据额结构同步
  14. [转]如何在 Git 里撤销(几乎)任何操作
  15. s11d27 算法
  16. Delphi开发单机瘦数据库程序要点(后缀cds)
  17. 已知圆上三个点坐标,求圆半径 r 和 圆心坐标
  18. RK3288 添加WiFi&BT模块AP6212
  19. JAVA MAC 配置
  20. 【题解】Digit Tree

热门文章

  1. weui中的picker使用js进行动态绑定数据
  2. Node.js学习之(第三章:简易小demo)
  3. cscope安装
  4. robot framework笔记(二):在RF中自定义chrome启动参数
  5. jenkins报错 Upgrading Jenkins. Failed to update the default Update Site 'default'. Plugi
  6. js javascirpt 数学库、 算法库 (转载)
  7. linux各种服务的搭建
  8. mysql FORMAT() 格式化后的数字运算出错
  9. 二进制安装docker-18.06.3-ce
  10. QtCreator常用快捷键