Kafka監(jiān)控-JMX自定義監(jiān)控以及常用監(jiān)控工具比較

目錄:
一视事、通過JMX自定義監(jiān)控
1.jconsole
2.Java監(jiān)控代碼:
二、Kafka三款監(jiān)控工具比較(轉(zhuǎn)載)
1.Kafka Web Conslole
2.Kafka Manager
3.KafkaOffsetMonitor

一类少、通過JMX自定義監(jiān)控

通過JMX監(jiān)控可以看到的數(shù)據(jù)有:
broker數(shù)據(jù)指標(biāo)
topic數(shù)據(jù)指標(biāo)
每個(gè)partition的數(shù)據(jù)指標(biāo)
consumer消費(fèi)滯后情況等壁涎。

1锅锨、jconsole

利用jconsole 工具:(可通過jconsole搪柑,找到Mbean對(duì)應(yīng)的指標(biāo)姻几,鼠標(biāo)懸浮指標(biāo)上方就能找到代碼查詢所需的ObjectName掸驱。)
本地直接連接kafka進(jìn)程
通過遠(yuǎn)程連接進(jìn)程:service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi(啟動(dòng)kafka時(shí)需開通JMX端口)
這里討論的kafka版本是0.8.1.x和0.8.2.x陨收,這兩者在使用jmx監(jiān)控時(shí)會(huì)有差異饭豹,差異體現(xiàn)在ObjectName之中 。
所以在本程序中通過Boolean類型的newKafkaVersion來區(qū)別對(duì)待务漩。
為確定使用者的objectName拄衰,可以利用jconsole工具,找到Mbean對(duì)應(yīng)的指標(biāo)饵骨,鼠標(biāo)懸浮指標(biāo)上方就能找到代碼查詢所需的objectName翘悉。

2、Java代碼

(推薦代碼原創(chuàng)《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》居触,同時(shí)歡迎關(guān)注作者的微信公眾號(hào):朱小廝的博客妖混。)
啟動(dòng)kafka時(shí)老赤,需啟動(dòng)jmx端口:JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &

JmxMgr.class
package monitor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Created by hidden on 2016/12/8.
*/
public class JmxMgr {
private static Logger log = LoggerFactory.getLogger(JmxMgr.class);
private static List<JmxConnection> conns = new ArrayList<>();

public static boolean init(List<String> ipPortList, boolean newKafkaVersion){
for(String ipPort:ipPortList){
log.info("init jmxConnection [{}]",ipPort);
JmxConnection conn = new JmxConnection(newKafkaVersion, ipPort);
boolean bRet = conn.init();
if(!bRet){
log.error("init jmxConnection error");
return false;
}
conns.add(conn);
}
return true;
}

public static long getMsgInCountPerSec(String topicName){
long val = 0;
for(JmxConnection conn:conns){
long temp = conn.getMsgInCountPerSec(topicName);
val += temp;
}
return val;
}

public static double getMsgInTpsPerSec(String topicName){
double val = 0;
for(JmxConnection conn:conns){
double temp = conn.getMsgInTpsPerSec(topicName);
val += temp;
}
return val;
}

public static Map<Integer, Long> getEndOffset(String topicName){
Map<Integer,Long> map = new HashMap<>();
for(JmxConnection conn:conns){
Map<Integer,Long> tmp = conn.getTopicEndOffset(topicName);
if(tmp == null){
log.warn("get topic endoffset return null, topic {}", topicName);
continue;
}
for(Integer parId:tmp.keySet()){//change if bigger
if(!map.containsKey(parId) || (map.containsKey(parId) && (tmp.get(parId)>map.get(parId))) ){
map.put(parId, tmp.get(parId));
}
}
}
return map;
}

public static void main(String[] args) {
List<String> ipPortList = new ArrayList<>();
ipPortList.add("localhost:9999");
//ipPortList.add("xx.101.130.2:9999");
JmxMgr.init(ipPortList,true);

String topicName = "demo2";
System.out.println(getMsgInCountPerSec(topicName));
System.out.println(getMsgInTpsPerSec(topicName));
System.out.println(getEndOffset(topicName));
}
}
JmxConnection.class
package monitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
* Created by hidden on 2016/12/8.
*/
public class JmxConnection {
private static Logger log = LoggerFactory.getLogger(JmxConnection.class);

private MBeanServerConnection conn;
private String jmxURL;
private String ipAndPort = "localhost:9999";
private int port = 9999;
private boolean newKafkaVersion = false;

public JmxConnection(Boolean newKafkaVersion, String ipAndPort){
this.newKafkaVersion = newKafkaVersion;
this.ipAndPort = ipAndPort;
}

public boolean init(){
jmxURL = "service:jmx:rmi:///jndi/rmi://" +ipAndPort+ "/jmxrmi";
log.info("init jmx, jmxUrl: {}, and begin to connect it",jmxURL);
try {
JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
JMXConnector connector = JMXConnectorFactory.connect(serviceURL,null);
conn = connector.getMBeanServerConnection();
if(conn == null){
log.error("get connection return null!");
return false;
}
} catch (MalformedURLException e) {
e.printStackTrace();
return false;
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}

public String getTopicName(String topicName){
String s;
if (newKafkaVersion) {
s = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=" + topicName;
} else {
s = "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"" + topicName + "-MessagesInPerSec\"";
}
return s;
}

/**
* @param topicName: topic name, default_channel_kafka_zzh_demo
* @return 獲取發(fā)送量(單個(gè)broker的,要計(jì)算某個(gè)topic的總的發(fā)送量就要計(jì)算集群中每一個(gè)broker之和)
*/
public long getMsgInCountPerSec(String topicName){
String objectName = getTopicName(topicName);
Object val = getAttribute(objectName,"Count");
String debugInfo = "jmxUrl:"+jmxURL+",objectName="+objectName;
if(val !=null){
log.info("{}, Count:{}",debugInfo,(long)val);
return (long)val;
}
return 0;
}

/**
* @param topicName: topic name, default_channel_kafka_zzh_demo
* @return 獲取發(fā)送的tps制市,和發(fā)送量一樣如果要計(jì)算某個(gè)topic的發(fā)送量就需要計(jì)算集群中每一個(gè)broker中此topic的tps之和抬旺。
*/
public double getMsgInTpsPerSec(String topicName){
String objectName = getTopicName(topicName);
Object val = getAttribute(objectName,"OneMinuteRate");
if(val !=null){
double dVal = ((Double)val).doubleValue();
return dVal;
}
return 0;
}

private Object getAttribute(String objName, String objAttr)
{
ObjectName objectName =null;
try {
objectName = new ObjectName(objName);
} catch (MalformedObjectNameException e) {
e.printStackTrace();
return null;
}
return getAttribute(objectName,objAttr);
}

private Object getAttribute(ObjectName objName, String objAttr){
if(conn== null)
{
log.error("jmx connection is null");
return null;
}

try {
return conn.getAttribute(objName,objAttr);
} catch (MBeanException e) {
e.printStackTrace();
return null;
} catch (AttributeNotFoundException e) {
e.printStackTrace();
return null;
} catch (InstanceNotFoundException e) {
e.printStackTrace();
return null;
} catch (ReflectionException e) {
e.printStackTrace();
return null;
} catch (IOException e) {
e.printStackTrace();
return null;
}
}

/**
* @param topicName
* @return 獲取topicName中每個(gè)partition所對(duì)應(yīng)的logSize(即offset)
*/
public Map<Integer,Long> getTopicEndOffset(String topicName){
Set<ObjectName> objs = getEndOffsetObjects(topicName);
if(objs == null){
return null;
}
Map<Integer, Long> map = new HashMap<>();
for(ObjectName objName:objs){
int partId = getParId(objName);
Object val = getAttribute(objName,"Value");
if(val !=null){
map.put(partId,(Long)val);
}
}
return map;
}

private int getParId(ObjectName objName){
if(newKafkaVersion){
String s = objName.getKeyProperty("partition");
return Integer.parseInt(s);
}else {
String s = objName.getKeyProperty("name");

int to = s.lastIndexOf("-LogEndOffset");
String s1 = s.substring(0, to);
int from = s1.lastIndexOf("-") + 1;

String ss = s.substring(from, to);
return Integer.parseInt(ss);
}
}

private Set<ObjectName> getEndOffsetObjects(String topicName){
String objectName;
if (newKafkaVersion) {
objectName = "kafka.log:type=Log,name=LogEndOffset,topic="+topicName+",partition=*";
}else{
objectName = "\"kafka.log\":type=\"Log\",name=\"" + topicName + "-*-LogEndOffset\"";
}
ObjectName objName = null;
Set<ObjectName> objectNames = null;
try {
objName = new ObjectName(objectName);
objectNames = conn.queryNames(objName,null);
} catch (MalformedObjectNameException e) {
e.printStackTrace();
return null;
} catch (IOException e) {
e.printStackTrace();
return null;
}

return objectNames;
}
}

二、Kafka三款監(jiān)控工具比較

(轉(zhuǎn)載自藍(lán)色天堂博客息堂,本文鏈接地址:http://hadoop1989.com/2015/09/22/Kafka-Monitor_Compare)
之前的博客中嚷狞,介紹了Kafka Web Console這個(gè)監(jiān)控工具,在生產(chǎn)環(huán)境中使用荣堰,運(yùn)行一段時(shí)間后床未,發(fā)現(xiàn)該工具會(huì)和Kafka生產(chǎn)者、消費(fèi)者振坚、ZooKeeper建立大量連接薇搁,從而導(dǎo)致網(wǎng)絡(luò)阻塞。并且這個(gè)Bug也在其他使用者中出現(xiàn)過渡八,看來使用開源工具要慎重啃洋!該Bug暫未得到修復(fù),不得已屎鳍,只能研究下其他同類的Kafka監(jiān)控軟件宏娄。
通過研究,發(fā)現(xiàn)主流的三種kafka監(jiān)控程序分別為:
Kafka Web Conslole
Kafka Manager
KafkaOffsetMonitor
現(xiàn)在依次介紹以上三種工具:

1逮壁、Kafka Web Conslole

使用Kafka Web Console孵坚,可以監(jiān)控:
Brokers列表、Kafka 集群中 Topic列表窥淆,及對(duì)應(yīng)的Partition卖宠、LogSize等信息
點(diǎn)擊Topic,可以瀏覽對(duì)應(yīng)的Consumer Groups忧饭、Offset扛伍、Lag等信息
生產(chǎn)和消費(fèi)流量圖、消息預(yù)覽…

程序運(yùn)行后词裤,會(huì)定時(shí)去讀取kafka集群分區(qū)的日志長度刺洒,讀取完畢后,連接沒有正常釋放吼砂,一段時(shí)間后產(chǎn)生大量的socket連接逆航,導(dǎo)致網(wǎng)絡(luò)堵塞。

2帅刊、Kafka Manager

雅虎開源的Kafka集群管理工具:
管理幾個(gè)不同的集群
監(jiān)控集群的狀態(tài)(topics, brokers, 副本分布, 分區(qū)分布)
產(chǎn)生分區(qū)分配(Generate partition assignments)基于集群的當(dāng)前狀態(tài)
重新分配分區(qū)

3纸泡、KafkaOffsetMonitor

KafkaOffsetMonitor可以實(shí)時(shí)監(jiān)控:
Kafka集群狀態(tài)
Topic、Consumer Group列表
圖形化展示topic和consumer之間的關(guān)系
圖形化展示consumer的Offset、Lag等信息

總結(jié):

通過使用女揭,個(gè)人總結(jié)以上三種監(jiān)控程序的優(yōu)缺點(diǎn):
Kafka Web Console:監(jiān)控功能較為全面蚤假,可以預(yù)覽消息,監(jiān)控Offset吧兔、Lag等信息磷仰,但存在bug,不建議在生產(chǎn)環(huán)境中使用境蔼。
Kafka Manager:偏向Kafka集群管理灶平,若操作不當(dāng),容易導(dǎo)致集群出現(xiàn)故障箍土。對(duì)Kafka實(shí)時(shí)生產(chǎn)和消費(fèi)消息是通過JMX實(shí)現(xiàn)的逢享。沒有記錄Offset、Lag等信息吴藻。
KafkaOffsetMonitor:程序一個(gè)jar包的形式運(yùn)行瞒爬,部署較為方便。只有監(jiān)控功能沟堡,使用起來也較為安全侧但。

若只需要監(jiān)控功能,推薦使用KafkaOffsetMonito航罗,若偏重Kafka集群管理禀横,推薦使用Kafka Manager。
因?yàn)槎际情_源程序粥血,穩(wěn)定性欠缺柏锄。故需先了解清楚目前已存在哪些Bug,多測試一下立莉,避免出現(xiàn)類似于Kafka Web Console的問題绢彤。

重度引用:
如何使用JMX監(jiān)控Kafka:
https://blog.csdn.net/u013256816/article/details/53524884https://blog.csdn.net/u013256816/article/details/53524884
Kafka三款監(jiān)控工具比較:
http://www.reibang.com/p/97ad87a933e1

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末七问,一起剝皮案震驚了整個(gè)濱河市蜓耻,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌械巡,老刑警劉巖刹淌,帶你破解...
    沈念sama閱讀 212,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異讥耗,居然都是意外死亡有勾,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門古程,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蔼卡,“玉大人,你說我怎么就攤上這事挣磨」统眩” “怎么了荤懂?”我有些...
    開封第一講書人閱讀 158,369評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長塘砸。 經(jīng)常有香客問我节仿,道長,這世上最難降的妖魔是什么掉蔬? 我笑而不...
    開封第一講書人閱讀 56,799評(píng)論 1 285
  • 正文 為了忘掉前任廊宪,我火速辦了婚禮,結(jié)果婚禮上女轿,老公的妹妹穿的比我還像新娘箭启。我一直安慰自己,他們只是感情好蛉迹,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,910評(píng)論 6 386
  • 文/花漫 我一把揭開白布册烈。 她就那樣靜靜地躺著,像睡著了一般婿禽。 火紅的嫁衣襯著肌膚如雪赏僧。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,096評(píng)論 1 291
  • 那天扭倾,我揣著相機(jī)與錄音淀零,去河邊找鬼。 笑死膛壹,一個(gè)胖子當(dāng)著我的面吹牛驾中,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播模聋,決...
    沈念sama閱讀 39,159評(píng)論 3 411
  • 文/蒼蘭香墨 我猛地睜開眼肩民,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了链方?” 一聲冷哼從身側(cè)響起持痰,我...
    開封第一講書人閱讀 37,917評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎祟蚀,沒想到半個(gè)月后工窍,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,360評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡前酿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,673評(píng)論 2 327
  • 正文 我和宋清朗相戀三年患雏,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片罢维。...
    茶點(diǎn)故事閱讀 38,814評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡淹仑,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情匀借,我是刑警寧澤取试,帶...
    沈念sama閱讀 34,509評(píng)論 4 334
  • 正文 年R本政府宣布,位于F島的核電站怀吻,受9級(jí)特大地震影響瞬浓,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蓬坡,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,156評(píng)論 3 317
  • 文/蒙蒙 一猿棉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧屑咳,春花似錦萨赁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至紫皇,卻和暖如春慰安,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背聪铺。 一陣腳步聲響...
    開封第一講書人閱讀 32,123評(píng)論 1 267
  • 我被黑心中介騙來泰國打工化焕, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人铃剔。 一個(gè)月前我還...
    沈念sama閱讀 46,641評(píng)論 2 362
  • 正文 我出身青樓撒桨,卻偏偏與公主長得像,于是被迫代替她去往敵國和親键兜。 傳聞我的和親對(duì)象是個(gè)殘疾皇子凤类,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,728評(píng)論 2 351