Spark metrics實現(xiàn)KafkaSink

背景

監(jiān)控是Spark非常重要的一部分鹉究。Spark的運行情況是由ListenerBus以及MetricsSystem 來完成的。通過Spark的Metrics系統(tǒng)亏拉,我們可以把Spark Metrics的收集到的信息發(fā)送到各種各樣的Sink耗溜,比如HTTP昆著、JMX以及CSV文件。
目前支持的Sink包括:

  • ConsoleSink
  • CSVSink
  • JmxSink
  • MetricsServlet
  • GraphiteSink
  • GangliaSink

有時我們需要實時獲取metrics數(shù)據(jù)通過spark分析展示等需求缸夹,這個時候若有個KafkaSink將metrics指標數(shù)據(jù)實時往kafka發(fā)送那就太方便了痪寻,故有了這篇博文。

實踐

所有的Sink都需要繼承Sink這個特質(zhì):

private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}

當該Sink注冊到metrics系統(tǒng)中時虽惭,會調(diào)用start方法進行一些初始化操作橡类,再通過report方式進行真正的輸出操作,stop方法可以進行一些連接關(guān)閉等操作芽唇。直接上代碼:

package org.apache.spark.metrics.sink

import java.util.concurrent.TimeUnit
import java.util.{Locale, Properties}

import com.codahale.metrics.MetricRegistry
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.spark.SecurityManager
import org.apache.spark.internal.Logging

private[spark] class KafkaSink(val property: Properties, val registry: MetricRegistry,
                               securityMgr: SecurityManager) extends Sink with Logging{

    val KAFKA_KEY_PERIOD = "period"
    val KAFKA_DEFAULT_PERIOD = 10

    val KAFKA_KEY_UNIT = "unit"
    val KAFKA_DEFAULT_UNIT = "SECONDS"

    val KAFKA_TOPIC = "topic"
    val KAFKA_DEFAULT_TOPIC = "kafka-sink-topic"

    val KAFAK_BROKERS = "kafka-brokers"
    val KAFAK_DEFAULT_BROKERS = "XXX:9092"

    val TOPIC = Option(property.getProperty(KAFKA_TOPIC)).getOrElse(KAFKA_DEFAULT_TOPIC)
    val BROKERS = Option(property.getProperty(KAFAK_BROKERS)).getOrElse(throw new IllegalStateException("kafka-brokers is null!"))

    private val kafkaProducerConfig = new Properties()
    kafkaProducerConfig.put("bootstrap.servers",BROKERS)
    kafkaProducerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProducerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    private val producer = new KafkaProducer[String, String](kafkaProducerConfig)

    private val reporter: KafkaReporter = KafkaReporter.forRegistry(registry)
        .topic(TOPIC)
        .build(producer)


    val pollPeriod = Option(property.getProperty(KAFKA_KEY_PERIOD)) match {
        case Some(s) => s.toInt
        case None => KAFKA_DEFAULT_PERIOD
    }

    val pollUnit: TimeUnit = Option(property.getProperty(KAFKA_KEY_UNIT)) match {
        case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
        case None => TimeUnit.valueOf(KAFKA_DEFAULT_UNIT)
    }

    override def start(): Unit = {
        log.info("I4 Metrics System KafkaSink Start ......")
        reporter.start(pollPeriod, pollUnit)
    }

    override def stop(): Unit = {
        log.info("I4 Metrics System KafkaSink Stop ......")
        reporter.stop()
        producer.close()
    }

    override def report(): Unit = {
        log.info("I4 Metrics System KafkaSink Report ......")
        reporter.report()
    }
}

KafkaReporter類:

package org.apache.spark.metrics.sink;

import com.alibaba.fastjson.JSONObject;
import com.codahale.metrics.*;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;

public class KafkaReporter  extends ScheduledReporter  {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReporter.class);

    public static KafkaReporter.Builder forRegistry(MetricRegistry registry) {
        return new KafkaReporter.Builder(registry);
    }

    private KafkaProducer producer;
    private Clock clock;
    private String topic;

    private KafkaReporter(MetricRegistry registry,
                        TimeUnit rateUnit,
                        TimeUnit durationUnit,
                        MetricFilter filter,
                        Clock clock,
                        String topic,
                        KafkaProducer producer) {
        super(registry, "kafka-reporter", filter, rateUnit, durationUnit);
        this.producer = producer;
        this.topic = topic;
        this.clock = clock;
    }

    @Override
    public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
        final long timestamp = TimeUnit.MILLISECONDS.toSeconds(clock.getTime());

        // Gauge
        for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
            reportGauge(timestamp,entry.getKey(), entry.getValue());
        }
        // Histogram
//        for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
//            reportHistogram(timestamp, entry.getKey(), entry.getValue());
//        }
    }


    private void reportGauge(long timestamp, String name, Gauge gauge) {
        report(timestamp, name, gauge.getValue());
    }

    private void reportHistogram(long timestamp, String name, Histogram histogram) {
        final Snapshot snapshot = histogram.getSnapshot();
        report(timestamp, name, snapshot.getMax());
    }

    private void report(long timestamp, String name,  Object values) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("name",name);
        jsonObject.put("timestamp",timestamp);
        jsonObject.put("value",values);
        producer.send(new ProducerRecord(topic,name, jsonObject.toJSONString()));
    }


    public static class Builder {

        private final MetricRegistry registry;
        private TimeUnit rateUnit;
        private TimeUnit durationUnit;
        private MetricFilter filter;
        private Clock clock;
        private String topic;

        private Builder(MetricRegistry registry) {
            this.registry = registry;
            this.rateUnit = TimeUnit.SECONDS;
            this.durationUnit = TimeUnit.MILLISECONDS;
            this.filter = MetricFilter.ALL;
            this.clock = Clock.defaultClock();
        }

        /**
         * Convert rates to the given time unit.
         *
         * @param rateUnit a unit of time
         * @return {@code this}
         */
        public KafkaReporter.Builder convertRatesTo(TimeUnit rateUnit) {
            this.rateUnit = rateUnit;
            return this;
        }

        /**
         * Convert durations to the given time unit.
         *
         * @param durationUnit a unit of time
         * @return {@code this}
         */
        public KafkaReporter.Builder convertDurationsTo(TimeUnit durationUnit) {
            this.durationUnit = durationUnit;
            return this;
        }

        /**
         * Use the given {@link Clock} instance for the time.
         *
         * @param clock a {@link Clock} instance
         * @return {@code this}
         */
        public Builder withClock(Clock clock) {
            this.clock = clock;
            return this;
        }

        /**
         * Only report metrics which match the given filter.
         *
         * @param filter a {@link MetricFilter}
         * @return {@code this}
         */
        public KafkaReporter.Builder filter(MetricFilter filter) {
            this.filter = filter;
            return this;
        }

        /**
         * Only report metrics which match the given filter.
         *
         * @param topic a
         * @return {@code this}
         */
        public KafkaReporter.Builder topic(String topic) {
            this.topic = topic;
            return this;
        }

        /**
         * Builds a {@link KafkaReporter} with the given properties, writing {@code .csv} files to the
         * given directory.
         *
         * @return a {@link KafkaReporter}
         */
        public KafkaReporter build(KafkaProducer producer) {
            return new KafkaReporter(registry,
                    rateUnit,
                    durationUnit,
                    filter,
                    clock,
                    topic,
                    producer);
        }
    }
}

其中的report方法就是獲取各種類型指標顾画,并進行對應(yīng)的輸出操作的時機。

如何使用

可在配置文件或者程序中設(shè)定需要注冊的sink,并帶上對應(yīng)的參數(shù)即可:

spark.metrics.conf.*.sink.kafka.class=org.apache.spark.metrics.sink.KafkaSink
spark.metrics.conf.*.sink.kafka.kafka-brokers=XXX:9092

我的GitHub

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末亲雪,一起剝皮案震驚了整個濱河市勇凭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌义辕,老刑警劉巖虾标,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異灌砖,居然都是意外死亡璧函,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門基显,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蘸吓,“玉大人,你說我怎么就攤上這事撩幽】饧蹋” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵窜醉,是天一觀的道長宪萄。 經(jīng)常有香客問我,道長榨惰,這世上最難降的妖魔是什么拜英? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮琅催,結(jié)果婚禮上居凶,老公的妹妹穿的比我還像新娘。我一直安慰自己藤抡,他們只是感情好侠碧,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著缠黍,像睡著了一般舆床。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上嫁佳,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天挨队,我揣著相機與錄音,去河邊找鬼蒿往。 笑死盛垦,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的瓤漏。 我是一名探鬼主播腾夯,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼颊埃,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蝶俱?” 一聲冷哼從身側(cè)響起班利,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎榨呆,沒想到半個月后罗标,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡积蜻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年闯割,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片竿拆。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡宙拉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出丙笋,到底是詐尸還是另有隱情谢澈,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布御板,位于F島的核電站锥忿,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏稳吮。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一井濒、第九天 我趴在偏房一處隱蔽的房頂上張望灶似。 院中可真熱鬧,春花似錦瑞你、人聲如沸酪惭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽春感。三九已至,卻和暖如春虏缸,著一層夾襖步出監(jiān)牢的瞬間鲫懒,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工刽辙, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留窥岩,地道東北人。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓宰缤,卻偏偏與公主長得像颂翼,于是被迫代替她去往敵國和親晃洒。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容