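Kafka's metrics are something most people find both familiar and unfamiliar.
They are familiar because anyone reading the source inevitably runs into calls like metrics.add(). They are unfamiliar because metrics are only an auxiliary feature rather than part of Kafka's core logic, so they seldom draw much attention.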
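First, let me clear up a common misunderstanding. Many articles say that Kafka uses the Yammer framework for performance monitoring. That is not wrong as such: Kafka does expose its metrics through Yammer, so you can watch them via JMX or Graphite. However, Kafka's internal monitoring, such as the quota limits for producers and consumers, is not implemented with Yammer but with Kafka's own metrics framework.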
In fact, Kafka has two metrics packages, which are easy to mix up when reading the source:
package kafka.metrics
and
package org.apache.kafka.common.metrics
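Both packages are named metrics, but they have different responsibilities, and neither package references any class in the other, so they can be treated as two fully independent packages. kafka.metrics mainly wraps the Yammer API and exposes Kafka's performance metrics to outside clients for monitoring. org.apache.kafka.common.metrics, the subject of this article, does not serve clients at all: it is called by other Kafka components, such as ReplicaManager, PartitionManager and QuotaManager, so that they can see how Kafka is currently running and make decisions accordingly.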
Metrics is first instantiated in KafkaServer's startup() method:
metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
quotaManagers = QuotaFactory.instantiate(config, metrics, time)
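This creates a Metrics instance and passes it to QuotaFactory.instantiate(), which builds quotaManagers. A quick word on quotaManagers: they are how Kafka limits the transfer rate of clients such as producers and consumers. For example, if the configuration caps a producer at 5 MB/s, that limit is enforced by a quota manager.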
Back to Metrics; let's follow the code.
public class Metrics implements Closeable {
....
....
private final ConcurrentMap<MetricName, KafkaMetric> metrics;
private final ConcurrentMap<String, Sensor> sensors;
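metrics and sensors, both ConcurrentMaps, are the two key fields of Metrics. So what are KafkaMetric and Sensor?
Let's start with KafkaMetric.
KafkaMetric implements the Metric interface, whose core method, value(), returns the value of the parameter being monitored.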
public interface Metric {
/**
* A name for this metric
*/
public MetricName metricName();
/**
* The value of the metric
*/
public double value();
}
How, then, does KafkaMetric implement value()?
@Override
public double value() {
synchronized (this.lock) {
return value(time.milliseconds());
}
}
double value(long timeMs) {
return this.measurable.measure(config, timeMs);
}
So value() delegates to another KafkaMetric field, measurable, of type Measurable:
public interface Measurable {
/**
* Measure this quantity and return the result as a double
* @param config The configuration for this metric
* @param now The POSIX time in milliseconds the measurement is being taken
* @return The measured value
*/
public double measure(MetricConfig config, long now);
}
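This layering is admittedly a bit convoluted: Metrics holds KafkaMetric instances, and each KafkaMetric obtains its value through a Measurable. An analogy: Metrics is a car's dashboard, a KafkaMetric is one gauge on that dashboard, and a Measurable wraps the component actually being measured. Here is a simple Measurable implementation from Sender.java: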
metrics.addMetric(m, new Measurable() {
public double measure(MetricConfig config, long now) {
return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
}
});
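The measure() implementation simply computes and returns the value to report, here the number of seconds since the last successful metadata update. Because the anonymous class is defined inside the class being monitored, it has direct access to the variables it needs.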
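To make the dashboard analogy concrete, here is a minimal sketch of the three layers: a registry (Metrics), gauges (KafkaMetric) and the thing being measured (Measurable). It is a simplified model, not Kafka's API: SimpleMeasurable, SimpleMetric and SimpleRegistry are hypothetical names, MetricConfig and MetricName are dropped, and metrics are keyed by plain strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DashboardDemo {
    public static void main(String[] args) {
        SimpleRegistry registry = new SimpleRegistry();
        // Like the Sender example: seconds since a (hypothetical) last successful update.
        long lastSuccessfulUpdateMs = 1_000L;
        registry.addMetric("metadata-age", now -> (now - lastSuccessfulUpdateMs) / 1000.0);
        System.out.println(registry.read("metadata-age", 4_500L)); // 3.5
    }
}

// The measured component: knows how to produce its current value.
interface SimpleMeasurable {
    double measure(long nowMs);
}

// One gauge on the dashboard: delegates to its Measurable under a lock, like KafkaMetric.value().
final class SimpleMetric {
    private final String name;
    private final SimpleMeasurable measurable;
    private final Object lock = new Object();

    SimpleMetric(String name, SimpleMeasurable measurable) {
        this.name = name;
        this.measurable = measurable;
    }

    String name() { return name; }

    double value(long nowMs) {
        synchronized (lock) {
            return measurable.measure(nowMs);
        }
    }
}

// The dashboard: a thread-safe registry of named gauges, like the metrics map in Metrics.
final class SimpleRegistry {
    private final Map<String, SimpleMetric> metrics = new ConcurrentHashMap<>();

    void addMetric(String name, SimpleMeasurable measurable) {
        metrics.put(name, new SimpleMetric(name, measurable));
    }

    double read(String name, long nowMs) {
        SimpleMetric metric = metrics.get(name);
        if (metric == null) throw new IllegalArgumentException("unknown metric: " + name);
        return metric.value(nowMs);
    }
}
```

Because SimpleMeasurable has a single abstract method, a lambda can play the role of the anonymous Measurable in Sender.java; it captures what it needs from the enclosing scope, for the same reason the original anonymous class can read metadata directly.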
With KafkaMetric covered, let's move on to Sensor, the value type of this ConcurrentMap:
private final ConcurrentMap<String, Sensor> sensors;
Here is the source of the Sensor class:
/**
* A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
* message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
* of metrics about request sizes such as the average or max.
*/
public final class Sensor {
// There is only one Metrics instance per Kafka process; registry is a reference to it
private final Metrics registry;
private final String name;
private final Sensor[] parents;
private final List<Stat> stats;
private final List<KafkaMetric> metrics;
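The class comment is worth reading closely, because it shows how a Sensor differs from a KafkaMetric: a KafkaMetric only returns the value of a single parameter, whereas a Sensor records a time series of values and maintains statistics over it, such as the average, maximum, or minimum. How are those statistics computed? Through the List<Stat> stats field.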
public interface Stat {
/**
* Record the given value
* @param config The configuration to use for this metric
* @param value The value to record
* @param timeMs The POSIX time in milliseconds this value occurred
*/
public void record(MetricConfig config, double value, long timeMs);
}
Stat is an interface whose single method, record(), records one sampled value. As an example, here is how Max is implemented on top of it:
public final class Max extends SampledStat {
public Max() {
super(Double.NEGATIVE_INFINITY);
}
@Override
protected void update(Sample sample, MetricConfig config, double value, long now) {
sample.value = Math.max(sample.value, value);
}
@Override
public double combine(List<Sample> samples, MetricConfig config, long now) {
double max = Double.NEGATIVE_INFINITY;
for (int i = 0; i < samples.size(); i++)
max = Math.max(max, samples.get(i).value);
return max;
}
}
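Simple, right? update() works like a single bubble step: it compares the incoming value with the largest value seen so far in the current sample. combine() then makes one linear pass over all samples to find the overall maximum. Note that Max extends SampledStat, which implements Stat (through the MeasurableStat interface). Now back to the Sensor class.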
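The role of samples is easier to see once time windows are involved. The sketch below is loosely modeled on SampledStat under simplifying assumptions: a fixed number of samples, each covering windowMs milliseconds, reused as a ring; samples older than numSamples * windowMs are reset when the stat is read. WindowedStat and WindowedMax are hypothetical names, and MetricConfig is replaced by constructor parameters.

```java
import java.util.ArrayList;
import java.util.List;

class WindowedMaxDemo {
    public static void main(String[] args) {
        WindowedMax max = new WindowedMax(2, 1_000L); // 2 samples of 1 s each
        max.record(5, 0L);
        max.record(9, 500L);   // same window as 5
        max.record(3, 1_200L); // first window is full: moves to the second sample
        System.out.println(max.measure(1_500L)); // 9.0: both samples are still fresh
        System.out.println(max.measure(2_100L)); // 3.0: the sample holding 9 has aged out
    }
}

// Simplified take on SampledStat: values go into a ring of fixed-length time windows.
abstract class WindowedStat {
    static final class Sample {
        double value;
        long windowStartMs;
        Sample(double initial, long startMs) { value = initial; windowStartMs = startMs; }
    }

    private final double initialValue;
    private final int numSamples;
    private final long windowMs;
    private final List<Sample> samples = new ArrayList<>();
    private int current = 0;

    WindowedStat(double initialValue, int numSamples, long windowMs) {
        this.initialValue = initialValue;
        this.numSamples = numSamples;
        this.windowMs = windowMs;
    }

    void record(double value, long nowMs) {
        if (samples.isEmpty()) samples.add(new Sample(initialValue, nowMs));
        Sample sample = samples.get(current);
        if (nowMs - sample.windowStartMs >= windowMs) {
            sample = advance(nowMs); // current window is complete: move to the next slot
        }
        update(sample, value);
    }

    double measure(long nowMs) {
        // Reset samples older than the whole ring, then combine what is left.
        for (Sample s : samples) {
            if (nowMs - s.windowStartMs >= numSamples * windowMs) {
                s.value = initialValue;
                s.windowStartMs = nowMs;
            }
        }
        return combine(samples);
    }

    private Sample advance(long nowMs) {
        current = (current + 1) % numSamples;
        if (current >= samples.size()) {
            samples.add(new Sample(initialValue, nowMs)); // ring not full yet
        } else {
            Sample reused = samples.get(current);         // overwrite the oldest slot
            reused.value = initialValue;
            reused.windowStartMs = nowMs;
        }
        return samples.get(current);
    }

    protected abstract void update(Sample sample, double value);
    protected abstract double combine(List<Sample> samples);
}

// Same idea as Kafka's Max: running max per sample, then max across samples.
final class WindowedMax extends WindowedStat {
    WindowedMax(int numSamples, long windowMs) {
        super(Double.NEGATIVE_INFINITY, numSamples, windowMs);
    }

    @Override
    protected void update(Sample sample, double value) {
        sample.value = Math.max(sample.value, value);
    }

    @Override
    protected double combine(List<Sample> samples) {
        double max = Double.NEGATIVE_INFINITY;
        for (Sample s : samples) max = Math.max(max, s.value);
        return max;
    }
}
```

The second read shows why combine() works over samples rather than keeping a single running max: once the window that held 9 ages out, the reported max drops to 3.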
public void record(double value, long timeMs) {
this.lastRecordTime = timeMs;
synchronized (this) {
// increment all the stats
for (int i = 0; i < this.stats.size(); i++)
this.stats.get(i).record(config, value, timeMs);
checkQuotas(timeMs);
}
for (int i = 0; i < parents.length; i++)
parents[i].record(value, timeMs);
}
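record() hands the value to every Stat registered on the sensor and then calls checkQuotas(); if the sensor has parent sensors, it also forwards the value to each of them.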
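A minimal sketch of that fan-out, assuming a stripped-down Stat interface without MetricConfig and leaving out the quota check. SimpleSensor, SimpleStat and Total are hypothetical names, and the sensor names (bytes-in, topic-a, topic-b) are made up. A per-topic sensor with a shared parent yields per-topic totals and an aggregate total from the same record() calls.

```java
import java.util.ArrayList;
import java.util.List;

class SensorTreeDemo {
    public static void main(String[] args) {
        // Parent sensor aggregating all topics.
        SimpleSensor allTopics = new SimpleSensor("bytes-in");
        Total allTotal = new Total();
        allTopics.add(allTotal);

        SimpleSensor topicA = new SimpleSensor("bytes-in.topic-a", allTopics);
        Total topicATotal = new Total();
        topicA.add(topicATotal);
        SimpleSensor topicB = new SimpleSensor("bytes-in.topic-b", allTopics);

        topicA.record(100, 1L);
        topicB.record(50, 2L);
        System.out.println(topicATotal.total + " " + allTotal.total); // 100.0 150.0
    }
}

// Simplified Stat: no MetricConfig parameter.
interface SimpleStat {
    void record(double value, long timeMs);
}

// Running sum of every recorded value.
final class Total implements SimpleStat {
    double total = 0.0;

    @Override
    public void record(double value, long timeMs) {
        total += value;
    }
}

final class SimpleSensor {
    private final String name;
    private final SimpleSensor[] parents;
    private final List<SimpleStat> stats = new ArrayList<>();
    private volatile long lastRecordTime;

    SimpleSensor(String name, SimpleSensor... parents) {
        this.name = name;
        this.parents = parents;
    }

    String name() { return name; }

    long lastRecordTime() { return lastRecordTime; }

    synchronized void add(SimpleStat stat) { stats.add(stat); }

    void record(double value, long timeMs) {
        lastRecordTime = timeMs;
        synchronized (this) {
            // Feed the value to every stat registered on this sensor.
            for (SimpleStat stat : stats) stat.record(value, timeMs);
        }
        // Then propagate to the parents, outside this sensor's lock, as in the excerpt.
        for (SimpleSensor parent : parents) parent.record(value, timeMs);
    }
}
```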
public void checkQuotas(long timeMs) {
for (int i = 0; i < this.metrics.size(); i++) {
KafkaMetric metric = this.metrics.get(i);
MetricConfig config = metric.config();
if (config != null) {
Quota quota = config.quota();
if (quota != null) {
double value = metric.value(timeMs);
if (!quota.acceptable(value)) {
throw new QuotaViolationException(
metric.metricName(),
value,
quota.bound());
}
}
}
}
}
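checkQuotas() iterates over every KafkaMetric registered on the sensor and checks whether its current value stays within the quota set in that metric's MetricConfig. Note the QuotaViolationException: it should look familiar. In QuotaManager, when a client's upload or download rate exceeds its quota, recording the value throws this exception, and the handler computes a throttle delay and queues the response: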
try {
clientSensors.quotaSensor.record(value)
// trigger the callback immediately if quota is not violated
callback(0)
} catch {
case qve: QuotaViolationException =>
// Compute the delay
val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
clientSensors.throttleTimeSensor.record(throttleTimeMs)
// If delayed, add the element to the delayQueue
delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
delayQueueSensor.record()
logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
}
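The record-then-throttle flow above can be condensed into a small sketch. Assumptions: the delay follows the rule documented in Kafka's ClientQuotaManager (KIP-13), X = (O - T) / T * W, where O is the observed rate, T the quota and W the window, chosen so that O * W / (W + X) = T; the sensor uses a single fixed window instead of rolling samples; RateQuotaSensor, QuotaViolation and ThrottleDemo are hypothetical names, not Kafka classes.

```java
class ThrottleDemo {
    public static void main(String[] args) {
        double mb = 1024 * 1024;
        // Hypothetical numbers: 5 MB/s quota measured over a 10 s window.
        RateQuotaSensor sensor = new RateQuotaSensor(5 * mb, 10_000L);
        long delay1 = handleRequest(sensor, 40 * mb); // 4 MB/s: under quota
        long delay2 = handleRequest(sensor, 20 * mb); // 6 MB/s: over quota
        System.out.println(delay1 + " " + delay2);    // 0 2000
    }

    // Same shape as the Scala handler: record, and on violation compute a delay
    // instead of failing the request.
    static long handleRequest(RateQuotaSensor sensor, double bytes) {
        try {
            sensor.record(bytes);
            return 0L; // within quota: respond immediately
        } catch (QuotaViolation qv) {
            return throttleTimeMs(qv.observedRate, qv.bound, sensor.windowMs);
        }
    }

    // X = (O - T) / T * W, rounded to whole milliseconds.
    static long throttleTimeMs(double observedRate, double quotaBound, long windowMs) {
        return Math.round((observedRate - quotaBound) / quotaBound * windowMs);
    }
}

final class QuotaViolation extends RuntimeException {
    final double observedRate;
    final double bound;

    QuotaViolation(double observedRate, double bound) {
        super("rate " + observedRate + " exceeds quota " + bound);
        this.observedRate = observedRate;
        this.bound = bound;
    }
}

// A single fixed window instead of Kafka's rolling samples (simplification).
final class RateQuotaSensor {
    final double quotaBytesPerSec;
    final long windowMs;
    private double bytesInWindow = 0.0;

    RateQuotaSensor(double quotaBytesPerSec, long windowMs) {
        this.quotaBytesPerSec = quotaBytesPerSec;
        this.windowMs = windowMs;
    }

    double rate() {
        return bytesInWindow / (windowMs / 1000.0);
    }

    // Like Sensor.record(): record the value first, then check the upper-bound quota.
    synchronized void record(double bytes) {
        bytesInWindow += bytes;
        double value = rate();
        if (value > quotaBytesPerSec) throw new QuotaViolation(value, quotaBytesPerSec);
    }
}
```

As in the Sensor.record() excerpt, the offending value is recorded before the check fails, so the observed rate already includes it; the delay is sized to bring that rate back down to the quota rather than to reject the request.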
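Finally, Metrics starts a dedicated thread, named "SensorExpiryThread", that removes sensors which have not been used for a long time; this is what the final true (enableExpiration) argument of the Metrics constructor in KafkaServer.startup() turns on. The thread runs ExpireSensorTask: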
class ExpireSensorTask implements Runnable {
public void run() {
for (Map.Entry<String, Sensor> sensorEntry : sensors.entrySet()) {
// removeSensor also locks the sensor object. This is fine because synchronized is reentrant
// There is however a minor race condition here. Assume we have a parent sensor P and child sensor C.
// Calling record on C would cause a record on P as well.
// So expiration time for P == expiration time for C. If the record on P happens via C just after P is removed,
// that will cause C to also get removed.
// Since the expiration time is typically high it is not expected to be a significant concern
// and thus not necessary to optimize
synchronized (sensorEntry.getValue()) {
if (sensorEntry.getValue().hasExpired()) {
log.debug("Removing expired sensor {}", sensorEntry.getKey());
removeSensor(sensorEntry.getKey());
}
}
}
}
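The expiry logic can be sketched as follows, assuming each sensor is reduced to the timestamp of its last record() call and that a sensor expires once it has been idle for strictly longer than a configured inactivity period. SensorRegistry and its methods are hypothetical, and the period passed to startExpiryThread is an assumption. ConcurrentHashMap.computeIfPresent makes the expiry check and the removal atomic per key, which plays the role of the synchronized block in ExpireSensorTask.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SensorExpiryDemo {
    public static void main(String[] args) {
        SensorRegistry registry = new SensorRegistry(60_000L); // expire after 60 s idle
        registry.record("client-a", 0L);
        registry.record("client-b", 50_000L);
        registry.expireSensors(70_000L); // client-a idle for 70 s: removed
        System.out.println(registry.names()); // [client-b]
    }
}

// Each "sensor" is reduced to the timestamp of its last record() call.
final class SensorRegistry {
    private final Map<String, Long> lastRecordMs = new ConcurrentHashMap<>();
    private final long inactiveExpirationMs;

    SensorRegistry(long inactiveExpirationMs) {
        this.inactiveExpirationMs = inactiveExpirationMs;
    }

    void record(String sensor, long nowMs) {
        lastRecordMs.put(sensor, nowMs);
    }

    // Counterpart of ExpireSensorTask.run(): drop sensors idle for longer than the limit.
    void expireSensors(long nowMs) {
        for (String name : lastRecordMs.keySet()) {
            // Returning null from the remapping function removes the entry atomically.
            lastRecordMs.computeIfPresent(name,
                    (key, last) -> nowMs - last > inactiveExpirationMs ? null : last);
        }
    }

    Set<String> names() {
        return new TreeSet<>(lastRecordMs.keySet());
    }

    // Runs expireSensors periodically on a daemon thread, using wall-clock time.
    ScheduledExecutorService startExpiryThread(long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "SensorExpiryThread");
            t.setDaemon(true); // do not keep the JVM alive just for expiry
            return t;
        });
        scheduler.scheduleAtFixedRate(() -> expireSensors(System.currentTimeMillis()),
                periodMs, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

In a long-running process, startExpiryThread would be called once with record() fed wall-clock timestamps; the demo calls expireSensors directly with logical timestamps so its output is deterministic.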