Contents:
I. Custom monitoring via JMX
1. jconsole
2. Java monitoring code
II. Comparison of three Kafka monitoring tools (reposted)
1. Kafka Web Console
2. Kafka Manager
3. KafkaOffsetMonitor
I. Custom monitoring via JMX
The data visible through JMX monitoring includes:
broker metrics
topic metrics
per-partition metrics
consumer lag, etc.
1. jconsole
Use the jconsole tool (in jconsole, locate the MBean corresponding to a metric; hovering over the metric reveals the ObjectName needed for programmatic queries):
connect directly to a local Kafka process, or
connect to a remote process: service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi (the JMX port must be opened when starting Kafka).
The Kafka versions discussed here are 0.8.1.x and 0.8.2.x. They expose different ObjectNames over JMX, so the program below distinguishes between them with a boolean flag, newKafkaVersion.
To determine the ObjectName for your own version, use jconsole: find the MBean for the metric of interest and hover over it to see the ObjectName the query code needs.
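Before pointing the query code at a broker, the same attribute lookup can be exercised against the local JVM's platform MBean server, which needs no Kafka at all. This is a minimal sketch (the class name `JmxAttributeDemo` is invented for illustration); in real use you would substitute a remote `MBeanServerConnection` and a Kafka ObjectName found via jconsole.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Minimal sketch: read one MBean attribute the same way the Kafka monitor does,
// but against the local JVM's platform MBean server so it runs without a broker.
// Swap in a remote connection and a Kafka ObjectName
// (e.g. kafka.server:type=BrokerTopicMetrics,...) to query an actual broker.
public class JmxAttributeDemo {
    public static Object readAttribute(MBeanServerConnection conn,
                                       String objectName, String attribute) throws Exception {
        return conn.getAttribute(new ObjectName(objectName), attribute);
    }

    public static void main(String[] args) throws Exception {
        // The platform MBean server implements MBeanServerConnection.
        MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
        // java.lang:type=Runtime is always registered in a running JVM.
        Object uptime = readAttribute(conn, "java.lang:type=Runtime", "Uptime");
        System.out.println("JVM uptime (ms): " + uptime);
    }
}
```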
2. Java code
(The code below is credited to the author of 《深入理解Kafka：核心設計與實踐原理》 and 《RabbitMQ實戰指南》; you are welcome to follow the author's WeChat public account, 朱小厮的博客.)
When starting Kafka, open the JMX port: JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
JmxMgr.java
package monitor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by hidden on 2016/12/8.
 */
public class JmxMgr {
    private static Logger log = LoggerFactory.getLogger(JmxMgr.class);
    private static List<JmxConnection> conns = new ArrayList<>();

    public static boolean init(List<String> ipPortList, boolean newKafkaVersion) {
        for (String ipPort : ipPortList) {
            log.info("init jmxConnection [{}]", ipPort);
            JmxConnection conn = new JmxConnection(newKafkaVersion, ipPort);
            boolean bRet = conn.init();
            if (!bRet) {
                log.error("init jmxConnection error");
                return false;
            }
            conns.add(conn);
        }
        return true;
    }

    // Total message-in count for a topic: the sum over every broker in the cluster.
    public static long getMsgInCountPerSec(String topicName) {
        long val = 0;
        for (JmxConnection conn : conns) {
            val += conn.getMsgInCountPerSec(topicName);
        }
        return val;
    }

    // Total message-in TPS for a topic: the sum of each broker's one-minute rate.
    public static double getMsgInTpsPerSec(String topicName) {
        double val = 0;
        for (JmxConnection conn : conns) {
            val += conn.getMsgInTpsPerSec(topicName);
        }
        return val;
    }

    // End offset of each partition: for every partition, keep the largest
    // value reported by any broker.
    public static Map<Integer, Long> getEndOffset(String topicName) {
        Map<Integer, Long> map = new HashMap<>();
        for (JmxConnection conn : conns) {
            Map<Integer, Long> tmp = conn.getTopicEndOffset(topicName);
            if (tmp == null) {
                log.warn("get topic endoffset return null, topic {}", topicName);
                continue;
            }
            for (Map.Entry<Integer, Long> entry : tmp.entrySet()) { // keep the bigger value
                Integer parId = entry.getKey();
                if (!map.containsKey(parId) || entry.getValue() > map.get(parId)) {
                    map.put(parId, entry.getValue());
                }
            }
        }
        return map;
    }

    public static void main(String[] args) {
        List<String> ipPortList = new ArrayList<>();
        ipPortList.add("localhost:9999");
        //ipPortList.add("xx.101.130.2:9999");
        JmxMgr.init(ipPortList, true);
        String topicName = "demo2";
        System.out.println(getMsgInCountPerSec(topicName));
        System.out.println(getMsgInTpsPerSec(topicName));
        System.out.println(getEndOffset(topicName));
    }
}
JmxConnection.java
package monitor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Created by hidden on 2016/12/8.
 */
public class JmxConnection {
    private static Logger log = LoggerFactory.getLogger(JmxConnection.class);
    private MBeanServerConnection conn;
    private String jmxURL;
    private String ipAndPort = "localhost:9999";
    private boolean newKafkaVersion = false;

    public JmxConnection(Boolean newKafkaVersion, String ipAndPort) {
        this.newKafkaVersion = newKafkaVersion;
        this.ipAndPort = ipAndPort;
    }

    public boolean init() {
        jmxURL = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
        log.info("init jmx, jmxUrl: {}, and begin to connect it", jmxURL);
        try {
            JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
            JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
            conn = connector.getMBeanServerConnection();
            if (conn == null) {
                log.error("get connection return null!");
                return false;
            }
        } catch (MalformedURLException e) {
            e.printStackTrace();
            return false;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    // ObjectName of the MessagesInPerSec metric; the format differs between
    // Kafka 0.8.2.x (newKafkaVersion = true) and 0.8.1.x.
    public String getTopicName(String topicName) {
        String s;
        if (newKafkaVersion) {
            s = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=" + topicName;
        } else {
            s = "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"" + topicName + "-MessagesInPerSec\"";
        }
        return s;
    }

    /**
     * @param topicName topic name, e.g. default_channel_kafka_zzh_demo
     * @return the message-in count of this broker (to get a topic's total,
     *         sum the counts of every broker in the cluster)
     */
    public long getMsgInCountPerSec(String topicName) {
        String objectName = getTopicName(topicName);
        Object val = getAttribute(objectName, "Count");
        String debugInfo = "jmxUrl:" + jmxURL + ",objectName=" + objectName;
        if (val != null) {
            log.info("{}, Count:{}", debugInfo, (long) val);
            return (long) val;
        }
        return 0;
    }

    /**
     * @param topicName topic name, e.g. default_channel_kafka_zzh_demo
     * @return the message-in TPS of this broker (as with the count, a topic's
     *         total is the sum of its TPS over every broker in the cluster)
     */
    public double getMsgInTpsPerSec(String topicName) {
        String objectName = getTopicName(topicName);
        Object val = getAttribute(objectName, "OneMinuteRate");
        if (val != null) {
            return ((Double) val).doubleValue();
        }
        return 0;
    }

    private Object getAttribute(String objName, String objAttr) {
        ObjectName objectName;
        try {
            objectName = new ObjectName(objName);
        } catch (MalformedObjectNameException e) {
            e.printStackTrace();
            return null;
        }
        return getAttribute(objectName, objAttr);
    }

    private Object getAttribute(ObjectName objName, String objAttr) {
        if (conn == null) {
            log.error("jmx connection is null");
            return null;
        }
        try {
            return conn.getAttribute(objName, objAttr);
        } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException
                | ReflectionException | IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * @param topicName topic name
     * @return the logSize (i.e. end offset) of each partition of topicName
     */
    public Map<Integer, Long> getTopicEndOffset(String topicName) {
        Set<ObjectName> objs = getEndOffsetObjects(topicName);
        if (objs == null) {
            return null;
        }
        Map<Integer, Long> map = new HashMap<>();
        for (ObjectName objName : objs) {
            int partId = getParId(objName);
            Object val = getAttribute(objName, "Value");
            if (val != null) {
                map.put(partId, (Long) val);
            }
        }
        return map;
    }

    // Extract the partition id from a LogEndOffset ObjectName; the old format
    // embeds it in the "name" property as "<topic>-<partition>-LogEndOffset".
    private int getParId(ObjectName objName) {
        if (newKafkaVersion) {
            String s = objName.getKeyProperty("partition");
            return Integer.parseInt(s);
        } else {
            String s = objName.getKeyProperty("name");
            int to = s.lastIndexOf("-LogEndOffset");
            String s1 = s.substring(0, to);
            int from = s1.lastIndexOf("-") + 1;
            return Integer.parseInt(s.substring(from, to));
        }
    }

    // Query all LogEndOffset MBeans of the topic (one per partition) with a
    // wildcard ObjectName pattern.
    private Set<ObjectName> getEndOffsetObjects(String topicName) {
        String objectName;
        if (newKafkaVersion) {
            objectName = "kafka.log:type=Log,name=LogEndOffset,topic=" + topicName + ",partition=*";
        } else {
            objectName = "\"kafka.log\":type=\"Log\",name=\"" + topicName + "-*-LogEndOffset\"";
        }
        try {
            ObjectName objName = new ObjectName(objectName);
            return conn.queryNames(objName, null);
        } catch (MalformedObjectNameException | IOException e) {
            e.printStackTrace();
            return null;
        }
    }
}
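The end offsets returned above are only half of a lag calculation: consumer lag is the difference between each partition's end offset and the consumer group's committed offset, which this code does not fetch (in 0.8.x it would typically come from ZooKeeper). Assuming both maps are available, the arithmetic is a simple per-partition subtraction; the `LagCalc` class below is a hypothetical helper, not part of the original program.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: given the per-partition end offsets from JmxMgr.getEndOffset and the
// consumer group's committed offsets (obtained elsewhere, e.g. from ZooKeeper),
// compute per-partition lag. A partition with no committed offset is treated
// as fully lagging (consumed offset 0).
public class LagCalc {
    public static Map<Integer, Long> lag(Map<Integer, Long> endOffsets,
                                         Map<Integer, Long> consumedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long consumed = consumedOffsets.getOrDefault(e.getKey(), 0L);
            // Guard against a stale end offset reading below the committed offset.
            lag.put(e.getKey(), Math.max(0L, e.getValue() - consumed));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = new HashMap<>();
        end.put(0, 120L);
        end.put(1, 45L);
        Map<Integer, Long> consumed = new HashMap<>();
        consumed.put(0, 100L);
        System.out.println(lag(end, consumed)); // partition 0 lags 20, partition 1 lags 45
    }
}
```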
II. Comparison of three Kafka monitoring tools
(Reposted from the 藍色天堂 blog; original link: http://hadoop1989.com/2015/09/22/Kafka-Monitor_Compare)
In an earlier post I introduced the Kafka Web Console monitoring tool. After running it in production for a while, I found that it opens a large number of connections to Kafka producers, consumers, and ZooKeeper, eventually congesting the network. Other users have hit this bug too, so open-source tools should be adopted with care! Since the bug has not been fixed, I had no choice but to look into other Kafka monitoring software of the same kind.
My research found three mainstream Kafka monitoring programs:
Kafka Web Console
Kafka Manager
KafkaOffsetMonitor
They are introduced in turn below.
1. Kafka Web Console
With Kafka Web Console you can monitor:
the list of brokers, and the topics in the Kafka cluster along with their partitions, log sizes, and related information;
per-topic views of the corresponding consumer groups, offsets, lag, and so on;
produce/consume throughput graphs, message previews, etc.
Once running, the program periodically reads the log size of each partition in the cluster but fails to release the connections properly afterwards; over time this accumulates a large number of sockets and congests the network.
2. Kafka Manager
Yahoo's open-source Kafka cluster management tool can:
manage several different clusters
monitor cluster state (topics, brokers, replica distribution, partition distribution)
generate partition assignments based on the current state of the cluster
reassign partitions
3. KafkaOffsetMonitor
KafkaOffsetMonitor monitors, in real time:
Kafka cluster state
the lists of topics and consumer groups
a graphical view of the relationships between topics and consumers
a graphical view of consumer offsets, lag, and related information
總結(jié):
通過使用女揭,個(gè)人總結(jié)以上三種監(jiān)控程序的優(yōu)缺點(diǎn):
Kafka Web Console:監(jiān)控功能較為全面蚤假,可以預(yù)覽消息,監(jiān)控Offset吧兔、Lag等信息磷仰,但存在bug,不建議在生產(chǎn)環(huán)境中使用境蔼。
Kafka Manager:偏向Kafka集群管理灶平,若操作不當(dāng),容易導(dǎo)致集群出現(xiàn)故障箍土。對(duì)Kafka實(shí)時(shí)生產(chǎn)和消費(fèi)消息是通過JMX實(shí)現(xiàn)的逢享。沒有記錄Offset、Lag等信息吴藻。
KafkaOffsetMonitor:程序一個(gè)jar包的形式運(yùn)行瞒爬,部署較為方便。只有監(jiān)控功能沟堡,使用起來也較為安全侧但。
若只需要監(jiān)控功能,推薦使用KafkaOffsetMonito航罗,若偏重Kafka集群管理禀横,推薦使用Kafka Manager。
因?yàn)槎际情_源程序粥血,穩(wěn)定性欠缺柏锄。故需先了解清楚目前已存在哪些Bug,多測試一下立莉,避免出現(xiàn)類似于Kafka Web Console的問題绢彤。
References:
How to monitor Kafka with JMX:
https://blog.csdn.net/u013256816/article/details/53524884
Comparison of three Kafka monitoring tools:
http://www.reibang.com/p/97ad87a933e1