Skywalking:源碼閱覽

一驳阎、Compile

編譯源碼請查看源碼中的文檔 How-to-build.md

  1. 首先確保你Maven的版本為3.6+捌议,然后源碼根目錄執(zhí)行下面命令

    mvn clean package -DskipTests
    
  1. 編譯成功后,將工程導(dǎo)入到 IDEA中昏名,因為里面用到了proto泛释,默認情況下IDE是識別不到這些代碼的衫哥,如果你使用的是 IDEA工具圃伶,請安裝下面插件。

    確保target目錄沒有被隱藏毙芜,如果隱藏了忽媒,可通過 Setting - Editor - File Types 將 target 清除

image-20210813162204334.png

二、OAP

OAP是SkyWalkingAgent 的服務(wù)端腋粥,啟動一個基于H2存儲簡單的OAP服務(wù)器晦雨,運行 oap-server/server-starter中OAPServerStartUp類。

可以看到OAPServerStartUp類又調(diào)用了一個 OAPServerBootstrap 的類隘冲,這個類才是真正的入口類闹瞧。

public class OAPServerStartUp {
    public static void main(String[] args) {
        OAPServerBootstrap.start();
    }
}

OAPServerBootstrap.start()方法主要工作就是解析application.yml文件,加載配置展辞,初始化各個模塊奥邮。

public class OAPServerBootstrap {
    public static void start() {
        String mode = System.getProperty("mode");
        RunningMode.setMode(mode);

        ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
        ModuleManager manager = new ModuleManager();
        try {
            // 加載 application.yml 中的配置
            ApplicationConfiguration applicationConfiguration = configLoader.load();
            // 加載所有 Module 的實現(xiàn)類
            manager.init(applicationConfiguration);

            // 加載 Prometheus指標收集器
            manager.find(TelemetryModule.NAME)
                   .provider()
                   .getService(MetricsCreator.class)
                   .createGauge("uptime", "oap server start up time", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE)
                   // Set uptime to second
                   .setValue(System.currentTimeMillis() / 1000d);

            if (RunningMode.isInitMode()) {
                log.info("OAP starts up in init mode successfully, exit now...");
                System.exit(0);
            }
        } catch (Throwable t) {
            log.error(t.getMessage(), t);
            System.exit(1);
        }
    }
}

上面的 manager.init() 方法,加載模塊時罗珍,采用 Java SPI 機制洽腺,讀取每個模塊的 META-INF.services 下配置,獲取實現(xiàn)類靡砌。

注意:值為 -的模塊不會被加載已脓,例如下面模塊將不會被加載珊楼,如果不注意的話會讓你抓狂通殃。詳細請看我的文章 《Skywalking:定制化》node-exporter章節(jié)。

prometheus-fetcher:
  selector: ${SW_PROMETHEUS_FETCHER:-}
public void init(
    ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
    // 獲取所有可用模塊列表
    String[] moduleNames = applicationConfiguration.moduleList();
    // 加載ModuleDefine的實現(xiàn)類
    ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
    // 加載ModuleProvider的實現(xiàn)類
    ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);

    HashSet<String> moduleSet = new HashSet<>(Arrays.asList(moduleNames));
    for (ModuleDefine module : moduleServiceLoader) {
        if (moduleSet.contains(module.name())) {
            module.prepare(this, applicationConfiguration.getModuleConfiguration(module.name()), moduleProviderLoader);
            loadedModules.put(module.name(), module);
            moduleSet.remove(module.name());
        }
    }
    // Finish prepare stage
    isInPrepareStage = false;

    if (moduleSet.size() > 0) {
        throw new ModuleNotFoundException(moduleSet.toString() + " missing.");
    }

    BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules);

    bootstrapFlow.start(this);
    bootstrapFlow.notifyAfterCompleted();
}

三厕宗、 Agent

Agent 作為代理端插件 画舌,協(xié)助 OAP 收集應(yīng)用的日志,作為一個字節(jié)碼插件 skywalking-agent.jar 完成這個工作已慢,上面提到曲聂,當業(yè)務(wù)應(yīng)用端部署時除了這個jar包,還需依賴 agent.config配置和其他插件 jar包佑惠。

  public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException {
        final PluginFinder pluginFinder;
      
        //1.  初始化配置文件 agent.config,會在和skywalking-agent.jar同目錄的config下查找
        try {
            SnifferConfigInitializer.initializeCoreConfig(agentArgs);
        } catch (Exception e) {
            // try to resolve a new logger, and use the new logger to write the error log here
            LogManager.getLogger(SkyWalkingAgent.class)
                    .error(e, "SkyWalking agent initialized failure. Shutting down.");
            return;
        } finally {
            // refresh logger again after initialization finishes
            LOGGER = LogManager.getLogger(SkyWalkingAgent.class);
        }

        //2. 第一步: 讀取agent.config的plugin.mount參數(shù)朋腋,從plugins,activations兩個目錄下加載插件
        //   第二步: 每個插件下面都有一個skywalking-plugin.def文件(SPI模式)齐疙,定義了此插件監(jiān)控的各個點的類     
        try {
            pluginFinder = new PluginFinder(new PluginBootstrap().loadPlugins());
        } catch (AgentPackageNotFoundException ape) {
            LOGGER.error(ape, "Locate agent.jar failure. Shutting down.");
            return;
        } catch (Exception e) {
            LOGGER.error(e, "SkyWalking agent initialized failure. Shutting down.");
            return;
        }

        //3. 初始化Byte Buddy字節(jié)碼工具,此工具是新一代的字節(jié)碼工具旭咽,性能優(yōu)于ASM贞奋、javasisst
        final ByteBuddy byteBuddy = new ByteBuddy().with(TypeValidation.of(Config.Agent.IS_OPEN_DEBUGGING_CLASS));
        AgentBuilder agentBuilder = new AgentBuilder.Default(byteBuddy).ignore(
                nameStartsWith("net.bytebuddy.")
                        .or(nameStartsWith("org.slf4j."))
                        .or(nameStartsWith("org.groovy."))
                        .or(nameContains("javassist"))
                        .or(nameContains(".asm."))
                        .or(nameContains(".reflectasm."))
                        .or(nameStartsWith("sun.reflect"))
                        .or(allSkyWalkingAgentExcludeToolkit())
                        .or(ElementMatchers.isSynthetic()));

        JDK9ModuleExporter.EdgeClasses edgeClasses = new JDK9ModuleExporter.EdgeClasses();
        try {
            agentBuilder = BootstrapInstrumentBoost.inject(pluginFinder, instrumentation, agentBuilder, edgeClasses);
        } catch (Exception e) {
            LOGGER.error(e, "SkyWalking agent inject bootstrap instrumentation failure. Shutting down.");
            return;
        }

        try {
            agentBuilder = JDK9ModuleExporter.openReadEdge(instrumentation, agentBuilder, edgeClasses);
        } catch (Exception e) {
            LOGGER.error(e, "SkyWalking agent open read edge in JDK 9+ failure. Shutting down.");
            return;
        }

        if (Config.Agent.IS_CACHE_ENHANCED_CLASS) {
            try {
                agentBuilder = agentBuilder.with(new CacheableTransformerDecorator(Config.Agent.CLASS_CACHE_MODE));
                LOGGER.info("SkyWalking agent class cache [{}] activated.", Config.Agent.CLASS_CACHE_MODE);
            } catch (Exception e) {
                LOGGER.error(e, "SkyWalking agent can't active class cache.");
            }
        }

        // 4. 初始化Agent工具
        agentBuilder.type(pluginFinder.buildMatch())
                    .transform(new Transformer(pluginFinder))
                    .with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
                    .with(new RedefinitionListener())
                    .with(new Listener())
                    .installOn(instrumentation);

        // 5. 啟動所有插件服務(wù),每個插件服務(wù)必須實現(xiàn)BootService接口類
        try {
            ServiceManager.INSTANCE.boot();
        } catch (Exception e) {
            LOGGER.error(e, "Skywalking agent boot failure.");
        }

        Runtime.getRuntime()
                .addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "skywalking service shutdown thread"));
    }

上面第5 步中 服務(wù)類穷绵,如果帶@DefaultImplementor注解就是缺省的實現(xiàn)轿塔;如果帶@OverrideImplementor注解,則會覆蓋注解中的指定的某個實現(xiàn)類仲墨,從而產(chǎn)生一個新的實現(xiàn)類

@OverrideImplementor(TraceSegmentServiceClient.class)
public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, KafkaConnectionStatusListener {
    private static final ILog LOGGER = LogManager.getLogger(KafkaTraceSegmentServiceClient.class);

ServiceManager類勾缭,用來管理所有插件

 private Map<Class, BootService> loadAllServices() {
        Map<Class, BootService> bootedServices = new LinkedHashMap<>();
        List<BootService> allServices = new LinkedList<>();
        load(allServices);
        for (final BootService bootService : allServices) {
            Class<? extends BootService> bootServiceClass = bootService.getClass();
            boolean isDefaultImplementor = bootServiceClass.isAnnotationPresent(DefaultImplementor.class);
            if (isDefaultImplementor) {//有@DefaultImplementor注解
                if (!bootedServices.containsKey(bootServiceClass)) {
                    bootedServices.put(bootServiceClass, bootService);
                } else {
                    //ignore the default service
                }
            } else {//有@OverrideImplementor注解
                OverrideImplementor overrideImplementor = bootServiceClass.getAnnotation(OverrideImplementor.class);
                if (overrideImplementor == null) {
                    if (!bootedServices.containsKey(bootServiceClass)) {
                        bootedServices.put(bootServiceClass, bootService);
                    } else {
                        throw new ServiceConflictException("Duplicate service define for :" + bootServiceClass);
                    }
                }

發(fā)送數(shù)據(jù)

大概分三類發(fā)送數(shù)據(jù)的客戶端:

  1. 服務(wù)類客戶端,如ServiceManagementClient

  2. 跟蹤類客戶端目养,如TraceSegmentServiceClient

  3. 指標類客戶端俩由,如MeterSender。

這三類客戶端除前面講的實現(xiàn)了BootService外混稽,還實現(xiàn)了GRPCChannelListener接口采驻,這個接口大概作用就是創(chuàng)建用于和服務(wù)端通信的通道。大概流程是在調(diào)用ServiceManager.boot()方法時匈勋,會調(diào)用每個服務(wù)的prepare()方法礼旅,收集通道監(jiān)聽器,然后遍歷每個通道的statusChanged方法洽洁,初始化一個XxxServiceStub服務(wù)存根類痘系,此類是由Protobuf序列化而成。

//實現(xiàn)了 GRPCChannelListener 類的prepare()饿自、statusChanged()方法
@DefaultImplementor
public class MeterSender implements BootService, GRPCChannelListener {
    private static final ILog LOGGER = LogManager.getLogger(MeterSender.class);
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
    private volatile MeterReportServiceGrpc.MeterReportServiceStub meterReportServiceStub;

    @Override
    public void prepare() {
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
    }

    public void send(Map<MeterId, BaseMeter> meterMap, MeterService meterService) {
        if (status == GRPCChannelStatus.CONNECTED) {
            StreamObserver<MeterData> reportStreamObserver = null;
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            try {
                reportStreamObserver = meterReportServiceStub.withDeadlineAfter(
               ......
    }
                    
    @Override
    public void statusChanged(final GRPCChannelStatus status) {
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            meterReportServiceStub = MeterReportServiceGrpc.newStub(channel);
        } else {
            meterReportServiceStub = null;
        }
        this.status = status;
    }
}

Agent Instance
什么是Agent Instance呢汰翠?每通過skywalking-agent.jar啟動一個應(yīng)用,就會在OAP服務(wù)上注冊一個服務(wù)昭雌,而這時就會在Agent端產(chǎn)生一個 Agent實例复唤,它負責(zé)向OAP上報各種信息。ServiceManagementClient類就是做這些工作的烛卧,除了發(fā)送應(yīng)用跟蹤信息外佛纫,還會向OAP發(fā)送心跳等信息。

@DefaultImplementor
public class ServiceManagementClient implements BootService, Runnable, GRPCChannelListener {
    private static final ILog LOGGER = LogManager.getLogger(ServiceManagementClient.class);
    private static List<KeyStringValuePair> SERVICE_INSTANCE_PROPERTIES;

    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
    private volatile ManagementServiceGrpc.ManagementServiceBlockingStub managementServiceBlockingStub;
    private volatile ScheduledFuture<?> heartbeatFuture;
    private volatile AtomicInteger sendPropertiesCounter = new AtomicInteger(0);

    @Override
    public void statusChanged(GRPCChannelStatus status) {
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            managementServiceBlockingStub = ManagementServiceGrpc.newBlockingStub(channel);
        } else {
            managementServiceBlockingStub = null;
        }
        this.status = status;
    }

    @Override
    public void prepare() {
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);

        SERVICE_INSTANCE_PROPERTIES = new ArrayList<>();

        for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
            SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
                                                              .setKey(key)
                                                              .setValue(Config.Agent.INSTANCE_PROPERTIES.get(key))
                                                              .build());
        }
    }

    @Override
    public void boot() {
        heartbeatFuture = Executors.newSingleThreadScheduledExecutor(
            new DefaultNamedThreadFactory("ServiceManagementClient")
        ).scheduleAtFixedRate(
            new RunnableWithExceptionProtection(
                this,
                t -> LOGGER.error("unexpected exception.", t)
            ), 0, Config.Collector.HEARTBEAT_PERIOD,
            TimeUnit.SECONDS
        );
    }

    @Override
    public void onComplete() {
    }

    @Override
    public void shutdown() {
        heartbeatFuture.cancel(true);
    }

    @Override
    public void run() {
        LOGGER.debug("ServiceManagementClient running, status:{}.", status);

        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            try {
                if (managementServiceBlockingStub != null) {
                    if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {

                        managementServiceBlockingStub
                            .withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                            .reportInstanceProperties(InstanceProperties.newBuilder()
                                                                        .setService(Config.Agent.SERVICE_NAME)
                                                                        .setServiceInstance(Config.Agent.INSTANCE_NAME)
                                                                        .addAllProperties(OSUtil.buildOSInfo(
                                                                            Config.OsInfo.IPV4_LIST_SIZE))
                                                                        .addAllProperties(SERVICE_INSTANCE_PROPERTIES)
                                                                        .build());
                    } else {
                        final Commands commands = managementServiceBlockingStub.withDeadlineAfter(
                            GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
                        ).keepAlive(InstancePingPkg.newBuilder()
                                                   .setService(Config.Agent.SERVICE_NAME)
                                                   .setServiceInstance(Config.Agent.INSTANCE_NAME)
                                                   .build());

                        ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                    }
                }
            } catch (Throwable t) {
                LOGGER.error(t, "ServiceManagementClient execute fail.");
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
            }
        }
    }
}

四总放、 Plugins

Skywalking中有大量的插件呈宇,正是這些插件幫助Skywalking建立起一個龐大的可觀察森林。每個插件都有兩個必須的類:

  • 攔截點類(XxxInstrumentation)局雄,在那些方法上干活甥啄。
  • 攔截器類(XxxInterceptor),具體干活的類炬搭。

攔截點

下面以Dubbo插件為例蜈漓,DubboInstrumentation攔截點實現(xiàn)了三個方法:enhanceClass(增強類)穆桂、getConstructorsInterceptPoints(攔截構(gòu)造方法)、getInstanceMethodsInterceptPoints(攔截實例方法)

public class DubboInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
    //被攔截的類融虽,也就是需要增強的類
    private static final String ENHANCE_CLASS = "com.alibaba.dubbo.monitor.support.MonitorFilter";
    //攔截器類
    private static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.dubbo.DubboInterceptor";

    @Override
    protected ClassMatch enhanceClass() {
        return NameMatch.byName(ENHANCE_CLASS);
    }

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return null;
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            new InstanceMethodsInterceptPoint() {
                // 獲取匹配到的攔截方法充尉,WitnessClass
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named("invoke");
                }

                // 返回攔截器
                @Override
                public String getMethodsInterceptor() {
                    return INTERCEPT_CLASS;
                }

                // 是否要對原方法參數(shù)做修改
                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }
}

如何解決多版本問題

當應(yīng)用存在多個版本時,Skywalking為應(yīng)用的每個版本都寫一個相對應(yīng)插件版本衣形,例如針對 spring mvc不同版本有以下插件:

spring3.x => mvc-annotation-3.x-plugin
spring4.x => mvc-annotation-4.x-plugin
spring5.x => mvc-annotation-5.x-plugin

那么有個問題來了驼侠,比如我用spring4.x時,這時會把3個插件都加載上谆吴,如何值加載 mvc-annotation-4.x-plugin插件呢倒源?這時用到了一個技術(shù)叫 WitnessClass,原理很簡單句狼,就是在每個插件中定義 WITHNESS_CLASSES變量笋熬,并定義一個能區(qū)分出版本獨特的類。然后在應(yīng)用啟動時腻菇,在加載類集合里是否能找到WITHNESS_CLASSES變量定義的類胳螟,如果找對了,那么也找到了對應(yīng)的版本的插件筹吐。

攔截器

DubboInterceptor用來定義對增強類做什么處理

public class DubboInterceptor implements InstanceMethodsAroundInterceptor {
    /**
     * <h2>Consumer:</h2> The serialized trace context data will
     * inject to the {@link RpcContext#attachments} for transport to provider side.
     * <p>
     * <h2>Provider:</h2> The serialized trace context data will extract from
     * {@link RpcContext#attachments}. current trace segment will ref if the serialize context data is not null.
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
        Invoker invoker = (Invoker) allArguments[0];
        Invocation invocation = (Invocation) allArguments[1];
        RpcContext rpcContext = RpcContext.getContext();
        boolean isConsumer = rpcContext.isConsumerSide();
        URL requestURL = invoker.getUrl();

        AbstractSpan span;

        final String host = requestURL.getHost();
        final int port = requestURL.getPort();
        if (isConsumer) {
            final ContextCarrier contextCarrier = new ContextCarrier();
            span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
            //invocation.getAttachments().put("contextData", contextDataStr);
            //@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
                if (invocation.getAttachments().containsKey(next.getHeadKey())) {
                    invocation.getAttachments().remove(next.getHeadKey());
                }
            }
        } else {
            ContextCarrier contextCarrier = new ContextCarrier();
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
            }

            span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
        }

        Tags.URL.set(span, generateRequestURL(requestURL, invocation));
        span.setComponent(ComponentsDefine.DUBBO);
        SpanLayer.asRPCFramework(span);
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
        Result result = (Result) ret;
        if (result != null && result.getException() != null) {
            dealException(result.getException());
        }

        ContextManager.stopSpan();
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
        dealException(t);
    }

    /**
     * Log the throwable, which occurs in Dubbo RPC service.
     */
    private void dealException(Throwable throwable) {
        AbstractSpan span = ContextManager.activeSpan();
        span.log(throwable);
    }

    /**
     * Format operation name. e.g. org.apache.skywalking.apm.plugin.test.Test.test(String)
     *
     * @return operation name.
     */
    private String generateOperationName(URL requestURL, Invocation invocation) {
        StringBuilder operationName = new StringBuilder();
        String groupStr = requestURL.getParameter(Constants.GROUP_KEY);
        groupStr = StringUtil.isEmpty(groupStr) ? "" : groupStr + "/";
        operationName.append(groupStr);
        operationName.append(requestURL.getPath());
        operationName.append("." + invocation.getMethodName() + "(");
        for (Class<?> classes : invocation.getParameterTypes()) {
            operationName.append(classes.getSimpleName() + ",");
        }

        if (invocation.getParameterTypes().length > 0) {
            operationName.delete(operationName.length() - 1, operationName.length());
        }

        operationName.append(")");

        return operationName.toString();
    }

    /**
     * Format request url. e.g. dubbo://127.0.0.1:20880/org.apache.skywalking.apm.plugin.test.Test.test(String).
     *
     * @return request url.
     */
    private String generateRequestURL(URL url, Invocation invocation) {
        StringBuilder requestURL = new StringBuilder();
        requestURL.append(url.getProtocol() + "://");
        requestURL.append(url.getHost());
        requestURL.append(":" + url.getPort() + "/");
        requestURL.append(generateOperationName(url, invocation));
        return requestURL.toString();
    }
}

五糖耸、GraphQL

GraphQL 是一種新的API 查詢語言,由Facebook開源丘薛,對前端提供少許接口嘉竟,就可以查詢整個系統(tǒng)的數(shù)據(jù),且可以實現(xiàn)按需返回洋侨。

后臺需要先定義一個 schema.graphqls的文件舍扰,下面定義了兩個接口:

  • findAuthorById 返回 Author對象,其中這個對象又關(guān)聯(lián)了 Book對象
  • saveBook 保存Book接口希坚,它的輸入?yún)?shù)由 input 來定義
#查詢接口
type Query {
    findAuthorById(id:Long!): Author
}

# 更新接口
type Mutation {
    saveBook(input: BookInput!) : Book!
}

# Author 對象
type Author {
    #作者Id
    id: Long!
    #創(chuàng)建時間
    createdTime: String
    #名
    firstName: String
    #姓
    lastName: String
    #該作者的所有書籍
    books: [Book]
}

# 輸入?yún)?shù)
input BookInput {
    title: String!
    isbn: String!
    pageCount: Int
    authorId: Long
}

# Book對象
type Book {
    id: Long!
    title: String!
    isbn: String!
    pageCount: Int
    author: Author
}

對于查詢類接口和 更新類接口繼承的父類是不一樣的边苹,比如查詢是繼承GraphQLQueryResolver,更新是繼承GraphQLMutationResolver

@Component
@AllArgsConstructor
public class Query implements GraphQLQueryResolver {
    private AuthorRepo authorRepo;

    public Author findAuthorById(Long id) {
        return authorRepo.findAuthorById(id);
    }
}
@Component
@AllArgsConstructor
public class Mutation implements GraphQLMutationResolver {
    private BookRepo bookRepo;
 
    public Book saveBook(BookInput input) {
        Book book = new Book();
        book.setTitle(input.getTitle());
        book.setIsbn(input.getIsbn());
        book.setPageCount(input.getPageCount());
        book.setAuthorId(input.getAuthorId());
        return bookRepo.save(book);
    }
}

后臺定義好了GraphQL 接口裁僧,前端請求 http://x.x.x.x/graphql 个束,輸入請求參數(shù):

{
  findAuthorById(id: 3){
     id
     firstName
     lastName
  }
}

在Skywalking中,有一個插件: query-graphql-plugin锅知,就相當于一個graphql播急,支撐整個 UI的查詢脓钾,訪問前端 Web時售睹,F(xiàn)12看下發(fā)現(xiàn)調(diào)用的都是 …/graphql 的接口。

六可训、OpenTelemeTry

在了解OpenTelemeTry之前昌妹,先說說 OpenTracing 和 OpenCensus

OpenTracing:是CNCF的第三個項目捶枢,其目的是制定一套標準的分布式追蹤協(xié)議,可謂是Tracing界的slf4j飞崖。

OpenCensus: 除了OpenTracing 鏈路跟蹤外烂叔,還有一類 Metrics監(jiān)控指標也是經(jīng)常用到的,例如cpu固歪、內(nèi)存蒜鸡、硬盤、網(wǎng)絡(luò)請求延遲牢裳、錯誤率逢防、用戶數(shù)、訪問數(shù)蒲讯、訂單數(shù)等各類指標忘朝。沒錯OpenCensus就是將鏈路跟蹤和指標監(jiān)控都囊括了。

所謂分久必合判帮,OpenTelemetry的橫空出世局嘁,結(jié)束了監(jiān)控界的紛爭亂世,并以“可觀察性”全新定義了監(jiān)控技術(shù)晦墙,重塑了監(jiān)控規(guī)范悦昵,實現(xiàn)了Metrics、Tracing晌畅、Logging 的大融合旱捧,儼然成了監(jiān)控界的大佬。

image-20210816162849338.png

Skywalking收集 VM (操作系統(tǒng))指標踩麦,沒有自己實現(xiàn)枚赡,是通過 prometheus 的 node-exporter 完成的,并借助OpenTelemeTry 將Skywalking 和 prometheus 整合到一塊谓谦。


image.png

otel-collector-config.yaml

Receiver做為采集端贫橙,Exporter作為數(shù)據(jù)的輸出端,Opentelemetry-collector充當一個中間媒介將兩者統(tǒng)一起來

receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: 'otel-collector'
          scrape_interval: 10s
          static_configs:
            - targets: [ 'vm-1:9100' ]
            - targets: [ 'vm-2:9100' ]
            - targets: [ 'vm-3:9100' ]

processors:
  batch:

exporters:
  opencensus:
    endpoint: "oap:11800" # The OAP Server address
    insecure: true
  # Exports data to the console  
  logging:
    logLevel: debug

service:
  pipelines:
    metrics:
      receivers: [prometheus]
      processors: [batch]
      exporters: [opencensus,logging]

image-20210817164057977.png

在Skywalking中是通過 proto文件方式來集成opentelemetry的反粥,當然你可以通過 集成opentelemetry sdk來接收node-exporter的數(shù)據(jù)卢肃。Skywalking重寫了 MetricsServiceGrpc.MetricsServiceImplBase的exporer方法,此方法就是專門接收collector發(fā)過來的數(shù)據(jù)才顿,只不過它轉(zhuǎn)了一次轉(zhuǎn)換莫湘,將OpenTelemetry 數(shù)據(jù)格式轉(zhuǎn)換為了Skywalking的meter格式。

@Slf4j
public class OCMetricHandler extends MetricsServiceGrpc.MetricsServiceImplBase implements Handler {

    private List<PrometheusMetricConverter> metrics;

    @Override public StreamObserver<ExportMetricsServiceRequest> export(
        StreamObserver<ExportMetricsServiceResponse> responseObserver) {
        return new StreamObserver<ExportMetricsServiceRequest>() {
            private Node node;
            private Map<String, String> nodeLabels = new HashMap<>();

            @Override
            public void onNext(ExportMetricsServiceRequest request) {
                ....
            }

            @Override public void onError(Throwable throwable) {

            }

            @Override public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

    .....

    @Override public void active(List<String> enabledRules,
        MeterSystem service, GRPCHandlerRegister grpcHandlerRegister) {
        List<Rule> rules;
        try {
            rules = Rules.loadRules("otel-oc-rules", enabledRules);
        } catch (ModuleStartException e) {
            log.warn("failed to load otel-oc-rules");
            return;
        }
        if (rules.isEmpty()) {
            return;
        }
        this.metrics = rules.stream().map(r ->
            new PrometheusMetricConverter(r, service))
            .collect(toList());
        grpcHandlerRegister.addHandler(this);
    }
}

Application

在Skywalking 源碼工程中創(chuàng)建一個模塊apm-webapp(已經(jīng)存在的有問題郑气,我就刪掉了)幅垮,然后把skywalking-ui工程下的dist目錄拷貝到 resources下面,并改名為public尾组,這樣這個工程即可以訪問 UI ,又能集成skywalking-agent.jar測試 忙芒。此工程已經(jīng)上傳到gitee上示弓,請點擊這里下載。

在工程啟動 VM 參數(shù)添加

-javaagent:skywalking-agent\skywalking-agent.jar -Dskywalking.agent.service_name=webapp -Dskywalking.agent.instance_name=webapp  -Dskywalking.collector.backend_service=127.0.0.1:11800

訪問 UI

http://localhost:8080/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末呵萨,一起剝皮案震驚了整個濱河市奏属,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌潮峦,老刑警劉巖囱皿,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異忱嘹,居然都是意外死亡铆帽,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人落蝙,你說我怎么就攤上這事版保。” “怎么了?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我组砚,道長,這世上最難降的妖魔是什么掏颊? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任糟红,我火速辦了婚禮,結(jié)果婚禮上乌叶,老公的妹妹穿的比我還像新娘盆偿。我一直安慰自己,他們只是感情好准浴,可當我...
    茶點故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布事扭。 她就那樣靜靜地躺著,像睡著了一般乐横。 火紅的嫁衣襯著肌膚如雪求橄。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天葡公,我揣著相機與錄音罐农,去河邊找鬼。 笑死催什,一個胖子當著我的面吹牛涵亏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼溯乒,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了豹爹?” 一聲冷哼從身側(cè)響起裆悄,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎臂聋,沒想到半個月后光稼,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡孩等,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年艾君,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片肄方。...
    茶點故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡冰垄,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出权她,到底是詐尸還是另有隱情虹茶,我是刑警寧澤,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布隅要,位于F島的核電站蝴罪,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏步清。R本人自食惡果不足惜要门,卻給世界環(huán)境...
    茶點故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望廓啊。 院中可真熱鬧欢搜,春花似錦、人聲如沸谴轮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽书聚。三九已至唧领,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間雌续,已是汗流浹背斩个。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留驯杜,地道東北人受啥。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親滚局。 傳聞我的和親對象是個殘疾皇子居暖,可洞房花燭夜當晚...
    茶點故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內(nèi)容