聊聊skywalking的jvm-receiver-plugin

本文主要研究一下skywalking的jvm-receiver-plugin

JVMModuleProvider

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/JVMModuleProvider.java

public class JVMModuleProvider extends ModuleProvider {

    @Override public String name() {
        return "default";
    }

    @Override public Class<? extends ModuleDefine> module() {
        return JVMModule.class;
    }

    @Override public ModuleConfig createConfigBeanIfAbsent() {
        return null;
    }

    @Override public void prepare() {
    }

    @Override public void start() {
        GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
        grpcHandlerRegister.addHandler(new JVMMetricsServiceHandler(getManager()));
        grpcHandlerRegister.addHandler(new JVMMetricReportServiceHandler(getManager()));
    }

    @Override public void notifyAfterCompleted() {

    }

    @Override public String[] requiredModules() {
        return new String[] {CoreModule.NAME, SharingServerModule.NAME};
    }
}
  • JVMModuleProvider繼承了ModuleProvider颜及,其start方法獲取grpcHandlerRegister然后添加JVMMetricsServiceHandler好芭、JVMMetricReportServiceHandler

JVMMetricsService.proto

skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/language-agent/JVMMetricsService.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.language.agent";
option csharp_namespace = "SkyWalking.NetworkProtocol";

import "language-agent/Downstream.proto";
import "common/JVM.proto";

service JVMMetricsService {
    rpc collect (JVMMetrics) returns (Downstream) {
    }
}

message JVMMetrics {
    repeated JVMMetric metrics = 1;
    int32 applicationInstanceId = 2;
}
  • JVMMetricsService.proto定義了JVMMetricsService服務(wù)倍啥,它有一個collect方法接收JVMMetrics類型的參數(shù)

JVMMetricsServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMMetricsServiceHandler.java

public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {

    private static final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);

    private final JVMSourceDispatcher jvmSourceDispatcher;

    public JVMMetricsServiceHandler(ModuleManager moduleManager) {
        this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager);
    }

    @Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
        int serviceInstanceId = request.getApplicationInstanceId();

        if (logger.isDebugEnabled()) {
            logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);
        }

        request.getMetricsList().forEach(metrics -> {
            long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
            jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
        });

        responseObserver.onNext(Downstream.newBuilder().build());
        responseObserver.onCompleted();
    }

}
  • JVMMetricsServiceHandler繼承了JVMMetricsServiceGrpc.JVMMetricsServiceImplBase帜慢,其構(gòu)造器創(chuàng)建JVMSourceDispatcher裙盾;其collect方法遍歷request.getMetricsList()挨個執(zhí)行jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics)

JVMMetric.proto

skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/language-agent-v2/JVMMetric.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.language.agent.v2";
option csharp_namespace = "SkyWalking.NetworkProtocol";

import "common/common.proto";
import "common/JVM.proto";

service JVMMetricReportService {
    rpc collect (JVMMetricCollection) returns (Commands) {
    }
}

message JVMMetricCollection {
    repeated JVMMetric metrics = 1;
    int32 serviceInstanceId = 2;
}
  • JVMMetric.proto定義了JVMMetricReportService服務(wù),它有一個collect方法接收JVMMetricCollection類型的參數(shù)

JVMMetricReportServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMMetricReportServiceHandler.java

public class JVMMetricReportServiceHandler extends JVMMetricReportServiceGrpc.JVMMetricReportServiceImplBase implements GRPCHandler {

    private static final Logger logger = LoggerFactory.getLogger(JVMMetricReportServiceHandler.class);

    private final JVMSourceDispatcher jvmSourceDispatcher;

    public JVMMetricReportServiceHandler(ModuleManager moduleManager) {
        this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager);
    }

    @Override public void collect(JVMMetricCollection request, StreamObserver<Commands> responseObserver) {
        int serviceInstanceId = request.getServiceInstanceId();

        if (logger.isDebugEnabled()) {
            logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);
        }

        request.getMetricsList().forEach(metrics -> {
            long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
            jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
        });

        responseObserver.onNext(Commands.newBuilder().build());
        responseObserver.onCompleted();
    }

}
  • JVMMetricReportServiceHandler繼承了JVMMetricReportServiceGrpc.JVMMetricReportServiceImplBase隘弊,其構(gòu)造器創(chuàng)建JVMSourceDispatcher拱镐;其collect方法遍歷request.getMetricsList()挨個執(zhí)行jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics)

JVMSourceDispatcher

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMSourceDispatcher.java

public class JVMSourceDispatcher {
    private static final Logger logger = LoggerFactory.getLogger(JVMSourceDispatcher.class);
    private final SourceReceiver sourceReceiver;
    private final ServiceInstanceInventoryCache instanceInventoryCache;

    public JVMSourceDispatcher(ModuleManager moduleManager) {
        this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
        this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
    }

    void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) {
        ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCache.get(serviceInstanceId);
        int serviceId;
        if (Objects.nonNull(serviceInstanceInventory)) {
            serviceId = serviceInstanceInventory.getServiceId();
        } else {
            logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId);
            return;
        }

        this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu());
        this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList());
        this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList());
        this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList());
    }

    private void sendToCpuMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, CPU cpu) {
        ServiceInstanceJVMCPU serviceInstanceJVMCPU = new ServiceInstanceJVMCPU();
        serviceInstanceJVMCPU.setId(serviceInstanceId);
        serviceInstanceJVMCPU.setName(Const.EMPTY_STRING);
        serviceInstanceJVMCPU.setServiceId(serviceId);
        serviceInstanceJVMCPU.setServiceName(Const.EMPTY_STRING);
        serviceInstanceJVMCPU.setUsePercent(cpu.getUsagePercent());
        serviceInstanceJVMCPU.setTimeBucket(timeBucket);
        sourceReceiver.receive(serviceInstanceJVMCPU);
    }

    private void sendToGCMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, List<GC> gcs) {
        gcs.forEach(gc -> {
            ServiceInstanceJVMGC serviceInstanceJVMGC = new ServiceInstanceJVMGC();
            serviceInstanceJVMGC.setId(serviceInstanceId);
            serviceInstanceJVMGC.setName(Const.EMPTY_STRING);
            serviceInstanceJVMGC.setServiceId(serviceId);
            serviceInstanceJVMGC.setServiceName(Const.EMPTY_STRING);

            switch (gc.getPhrase()) {
                case NEW:
                    serviceInstanceJVMGC.setPhrase(GCPhrase.NEW);
                    break;
                case OLD:
                    serviceInstanceJVMGC.setPhrase(GCPhrase.OLD);
                    break;
            }

            serviceInstanceJVMGC.setTime(gc.getTime());
            serviceInstanceJVMGC.setCount(gc.getCount());
            serviceInstanceJVMGC.setTimeBucket(timeBucket);
            sourceReceiver.receive(serviceInstanceJVMGC);
        });
    }

    private void sendToMemoryMetricProcess(int serviceId, int serviceInstanceId, long timeBucket,
        List<Memory> memories) {
        memories.forEach(memory -> {
            ServiceInstanceJVMMemory serviceInstanceJVMMemory = new ServiceInstanceJVMMemory();
            serviceInstanceJVMMemory.setId(serviceInstanceId);
            serviceInstanceJVMMemory.setName(Const.EMPTY_STRING);
            serviceInstanceJVMMemory.setServiceId(serviceId);
            serviceInstanceJVMMemory.setServiceName(Const.EMPTY_STRING);
            serviceInstanceJVMMemory.setHeapStatus(memory.getIsHeap());
            serviceInstanceJVMMemory.setInit(memory.getInit());
            serviceInstanceJVMMemory.setMax(memory.getMax());
            serviceInstanceJVMMemory.setUsed(memory.getUsed());
            serviceInstanceJVMMemory.setCommitted(memory.getCommitted());
            serviceInstanceJVMMemory.setTimeBucket(timeBucket);
            sourceReceiver.receive(serviceInstanceJVMMemory);
        });
    }

    private void sendToMemoryPoolMetricProcess(int serviceId, int serviceInstanceId, long timeBucket,
        List<MemoryPool> memoryPools) {

        memoryPools.forEach(memoryPool -> {
            ServiceInstanceJVMMemoryPool serviceInstanceJVMMemoryPool = new ServiceInstanceJVMMemoryPool();
            serviceInstanceJVMMemoryPool.setId(serviceInstanceId);
            serviceInstanceJVMMemoryPool.setName(Const.EMPTY_STRING);
            serviceInstanceJVMMemoryPool.setServiceId(serviceId);
            serviceInstanceJVMMemoryPool.setServiceName(Const.EMPTY_STRING);

            switch (memoryPool.getType()) {
                case NEWGEN_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.NEWGEN_USAGE);
                    break;
                case OLDGEN_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.OLDGEN_USAGE);
                    break;
                case PERMGEN_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.PERMGEN_USAGE);
                    break;
                case SURVIVOR_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.SURVIVOR_USAGE);
                    break;
                case METASPACE_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.METASPACE_USAGE);
                    break;
                case CODE_CACHE_USAGE:
                    serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.CODE_CACHE_USAGE);
                    break;
            }

            serviceInstanceJVMMemoryPool.setInit(memoryPool.getInit());
            serviceInstanceJVMMemoryPool.setMax(memoryPool.getMax());
            serviceInstanceJVMMemoryPool.setUsed(memoryPool.getUsed());
            serviceInstanceJVMMemoryPool.setCommitted(memoryPool.getCommited());
            serviceInstanceJVMMemoryPool.setTimeBucket(timeBucket);
            sourceReceiver.receive(serviceInstanceJVMMemoryPool);
        });
    }
}
  • JVMSourceDispatcher主要是提供了sendMetric方法,該方法執(zhí)行sendToCpuMetricProcess饰豺、sendToMemoryMetricProcess亿鲜、sendToMemoryPoolMetricProcess、sendToGCMetricProcess方法冤吨;sendToCpuMetricProcess方法執(zhí)行sourceReceiver.receive(serviceInstanceJVMCPU)蒿柳;sendToMemoryMetricProcess方法執(zhí)行sourceReceiver.receive(serviceInstanceJVMMemory);sendToMemoryPoolMetricProcess方法執(zhí)行sourceReceiver.receive(serviceInstanceJVMMemoryPool)漩蟆;sendToGCMetricProcess方法執(zhí)行sourceReceiver.receive(serviceInstanceJVMGC)

小結(jié)

JVMModuleProvider繼承了ModuleProvider垒探,其start方法獲取grpcHandlerRegister然后添加JVMMetricsServiceHandler、JVMMetricReportServiceHandler怠李;前者使用的是JVMMetricsService.proto圾叼,后者使用的是agent-v2的JVMMetric.proto

doc

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市捺癞,隨后出現(xiàn)的幾起案子夷蚊,更是在濱河造成了極大的恐慌,老刑警劉巖髓介,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件惕鼓,死亡現(xiàn)場離奇詭異,居然都是意外死亡唐础,警方通過查閱死者的電腦和手機箱歧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來一膨,“玉大人呀邢,你說我怎么就攤上這事”鳎” “怎么了价淌?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我输钩,道長豺型,這世上最難降的妖魔是什么仲智? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任买乃,我火速辦了婚禮,結(jié)果婚禮上钓辆,老公的妹妹穿的比我還像新娘剪验。我一直安慰自己,他們只是感情好前联,可當(dāng)我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布功戚。 她就那樣靜靜地躺著,像睡著了一般似嗤。 火紅的嫁衣襯著肌膚如雪啸臀。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天烁落,我揣著相機與錄音乘粒,去河邊找鬼。 笑死伤塌,一個胖子當(dāng)著我的面吹牛灯萍,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播每聪,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼旦棉,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了药薯?” 一聲冷哼從身側(cè)響起绑洛,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎童本,沒想到半個月后真屯,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡巾陕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年讨跟,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鄙煤。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡晾匠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出梯刚,到底是詐尸還是另有隱情凉馆,我是刑警寧澤,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站澜共,受9級特大地震影響向叉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜嗦董,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一母谎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧京革,春花似錦奇唤、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至廊勃,卻和暖如春懈贺,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背坡垫。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工梭灿, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人葛虐。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓胎源,卻偏偏與公主長得像,于是被迫代替她去往敵國和親屿脐。 傳聞我的和親對象是個殘疾皇子涕蚤,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容