kafka性能監(jiān)控之KafkaMetrics Sensor

說起kafka的metrics,很多人應該是即陌生又熟悉,

熟悉是因為閱讀源碼的過程中,不可避免地會看到metrics.add()的代碼.而陌生是因為metrics僅僅只是輔助功能,并不是kafka主要邏輯的一部分,并不會引起讀者太多的關(guān)注.

在這里首先說明一個容易產(chǎn)生誤解的地方,不少文章說kafka使用yammers框架來實現(xiàn)性能監(jiān)控.這么說其實沒有問題,因為kafka確實通過yammers向外暴露了接口,可以通過jmx或者grahite來監(jiān)視各個性能參數(shù).但是kafka內(nèi)的性能監(jiān)控比如producer,consumer的配額限制,并不是通過yammer實現(xiàn)的.而是通過自己的一套metrics框架來實現(xiàn)的.

事實上,kafka有兩個metrics包,在看源碼的時候很容易混淆

package kafka.metrics

以及

package org.apache.kafka.common.metrics

可以看到這兩個包的包名都是metrics,但是他們負責的任務并不相同,而且兩個包中的類并沒有任何的互相引用關(guān)系.可以看作是兩個完全獨立的包.kafka.mtrics這個包,主要調(diào)用yammer的Api,并進行封裝,提供給client監(jiān)測kafka的各個性能參數(shù).而commons.metrics這個包是我這篇文章主要要介紹的,這個包并不是面向client提供服務的,他是為了給kafka中的其他組件,比如replicaManager,PartitionManager,QuatoManager提供調(diào)用,讓這些Manager了解kafka現(xiàn)在的運行狀況,以便作出相應決策的.

首先metrics第一次被初始化,在kafkaServer的startup()方法中

metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
quotaManagers = QuotaFactory.instantiate(config, metrics, time)

初始化了一個Metrics,并將這個實例傳到quotaManagers的構(gòu)造函數(shù)中,這里簡單介紹一下quotaManagers.這是kafka中用來限制kafka,producer的傳輸速度的,比如在config文件下設置producer不能以超過5MB/S的速度傳輸數(shù)據(jù),那么這個限制就是通過quotaManager來實現(xiàn)的.

回到metrics上,跟進代碼.

public class Metrics implements Closeable {
 ....
 ....
    private final ConcurrentMap<MetricName, KafkaMetric> metrics;
    private final ConcurrentMap<String, Sensor> sensors;

metrics與sensors這兩個concurrentMap是Metrics中兩個重要的成員屬性.那么什么是KafkaMetric,什么是Sensor呢?

首先分析KafkaMetric

KafkaMetric實現(xiàn)了Metric接口,可以看到它的核心方法value()返回要監(jiān)控的參數(shù)的值.

public interface Metric {

    /**
     * A name for this metric
     */
    public MetricName metricName();

    /**
     * The value of the metric
     */
    public double value();

}

那么KafkaMetric又是如何實現(xiàn)value()方法的呢?

@Override
public double value() {
    synchronized (this.lock) {
        return value(time.milliseconds());
    }
}

double value(long timeMs) {
    return this.measurable.measure(config, timeMs);
}

原來value()是通過kafkaMetric中的另一個成員屬性measurable完成

public interface Measurable {

    /**
     * Measure this quantity and return the result as a double
     * @param config The configuration for this metric
     * @param now The POSIX time in milliseconds the measurement is being taken
     * @return The measured value
     */
    public double measure(MetricConfig config, long now);

}

其實這邊挺繞的,Metrics有kafkaMetric的成員變量,而kafkaMetric又通過Measurable返回要檢測的值.打個比方,Metrics好比是汽車的儀表盤,kafkaMetric就是儀表盤上的一個儀表,Measurable就是對真正要檢測的組件的一個封裝.來看看一個Measrable的簡單實現(xiàn),在sender.java類中.

metrics.addMetric(m, new Measurable() {
    public double measure(MetricConfig config, long now) {
        return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
    }
});

可以看到measure的實現(xiàn)就是簡單地返回要返回的值,因為是直接在目標類中定義的,所以可以直接獲得相應變量的引用.

介紹完KafkaMetric,接下來介紹Sensor,也就是下面的ConcurrentMap中的Sensor

private final ConcurrentMap<String, Sensor> sensors;

以下是Sensor類的源碼

/**
 * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
 * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
 * of metrics about request sizes such as the average or max.
 */
public final class Sensor {
    //一個kafka就只有一個Metrics實例,這個registry就是對這個Metrics的引用
    private final Metrics registry;
    private final String name;
    private final Sensor[] parents;
    private final List<Stat> stats;
    private final List<KafkaMetric> metrics;

這一段的注釋很有意義,從注釋中可以看到Sensor的作用不同KafkaMetric. KafkaMetric僅僅是返回某一個參數(shù)的值,而Sensor有基于某一參數(shù)時間序列進行統(tǒng)計的功能,比如平均值,最大值,最小值.那這些統(tǒng)計又是如何實現(xiàn)的呢?答案是List<Stat> stats這個屬性成員.

public interface Stat {

    /**
     * Record the given value
     * @param config The configuration to use for this metric
     * @param value The value to record
     * @param timeMs The POSIX time in milliseconds this value occurred
     */
    public void record(MetricConfig config, double value, long timeMs);

}

可以看到Stat是一個接口,其中有一個record方法可以記錄一個采樣數(shù)值,下面看一個例子,max這個功能如何用Stat來實現(xiàn)?

public final class Max extends SampledStat {

    public Max() {
        super(Double.NEGATIVE_INFINITY);
    }

    @Override
    protected void update(Sample sample, MetricConfig config, double value, long now) {
        sample.value = Math.max(sample.value, value);
    }

    @Override
    public double combine(List<Sample> samples, MetricConfig config, long now) {
        double max = Double.NEGATIVE_INFINITY;
        for (int i = 0; i < samples.size(); i++)
            max = Math.max(max, samples.get(i).value);
        return max;
    }

}

是不是很簡單,update相當于冒一次泡,把當前的值與歷史的最大值比較.combine相當于用一次完整的冒泡排序找出最大值,需要注意的是,max是繼承SampleStat的,而SampleStat是Stat接口的實現(xiàn)類.那我們回到Sensor類上來.

public void record(double value, long timeMs) {
    this.lastRecordTime = timeMs;
    synchronized (this) {
        // increment all the stats
        for (int i = 0; i < this.stats.size(); i++)
            this.stats.get(i).record(config, value, timeMs);
        checkQuotas(timeMs);
    }
    for (int i = 0; i < parents.length; i++)
        parents[i].record(value, timeMs);
}

record方法,每個注冊于其中的stats提交值,同時如果自己有父sensor的話,向父sensor提交.

public void checkQuotas(long timeMs) {
    for (int i = 0; i < this.metrics.size(); i++) {
        KafkaMetric metric = this.metrics.get(i);
        MetricConfig config = metric.config();
        if (config != null) {
            Quota quota = config.quota();
            if (quota != null) {
                double value = metric.value(timeMs);
                if (!quota.acceptable(value)) {
                    throw new QuotaViolationException(
                        metric.metricName(),
                        value,
                        quota.bound());
                }
            }
        }
    }
}

checkQuotas,通過這里其實是遍歷注冊在sensor上的每一個KafkaMetric來檢查他們的值有沒有超過config文件中設置的配額.注意這里的QuotaVioLationException,是不是很熟悉.在QuatoManager中,如果有一個client的上傳/下載速度超過指定配額.那么就會拋出這個異常

try {
  clientSensors.quotaSensor.record(value)
  // trigger the callback immediately if quota is not violated
  callback(0)
} catch {
  case qve: QuotaViolationException =>
    // Compute the delay
    val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
    throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
    clientSensors.throttleTimeSensor.record(throttleTimeMs)
    // If delayed, add the element to the delayQueue
    delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
    delayQueueSensor.record()
    logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
}

最后,Sensor會初始化一個線程專門用來清除長時間沒有使用的Sensor.這個線程名為"SensorExpiryThread"

class ExpireSensorTask implements Runnable {
    public void run() {
        for (Map.Entry<String, Sensor> sensorEntry : sensors.entrySet()) {
            // removeSensor also locks the sensor object. This is fine because synchronized is reentrant
            // There is however a minor race condition here. Assume we have a parent sensor P and child sensor C.
            // Calling record on C would cause a record on P as well.
            // So expiration time for P == expiration time for C. If the record on P happens via C just after P is removed,
            // that will cause C to also get removed.
            // Since the expiration time is typically high it is not expected to be a significant concern
            // and thus not necessary to optimize
            synchronized (sensorEntry.getValue()) {
                if (sensorEntry.getValue().hasExpired()) {
                    log.debug("Removing expired sensor {}", sensorEntry.getKey());
                    removeSensor(sensorEntry.getKey());
                }
            }
        }
    }

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子绊率,更是在濱河造成了極大的恐慌困肩,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件膨报,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機术奖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來轻绞,“玉大人采记,你說我怎么就攤上這事≌” “怎么了唧龄?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長奸远。 經(jīng)常有香客問我既棺,道長,這世上最難降的妖魔是什么懒叛? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任丸冕,我火速辦了婚禮,結(jié)果婚禮上薛窥,老公的妹妹穿的比我還像新娘胖烛。我一直安慰自己,他們只是感情好拆檬,可當我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布洪己。 她就那樣靜靜地躺著,像睡著了一般竟贯。 火紅的嫁衣襯著肌膚如雪答捕。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天屑那,我揣著相機與錄音拱镐,去河邊找鬼艘款。 笑死,一個胖子當著我的面吹牛沃琅,可吹牛的內(nèi)容都是我干的哗咆。 我是一名探鬼主播,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼益眉,長吁一口氣:“原來是場噩夢啊……” “哼晌柬!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起郭脂,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤年碘,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后展鸡,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體屿衅,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年莹弊,在試婚紗的時候發(fā)現(xiàn)自己被綠了涤久。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡忍弛,死狀恐怖响迂,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情剧罩,我是刑警寧澤栓拜,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站惠昔,受9級特大地震影響幕与,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜镇防,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一啦鸣、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧来氧,春花似錦诫给、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至扑毡,卻和暖如春胃榕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背瞄摊。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工勋又, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留苦掘,地道東北人。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓楔壤,卻偏偏與公主長得像鹤啡,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子蹲嚣,可洞房花燭夜當晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容