gRPC客戶端詳解

RPC框架的選擇

常見的RPC框架主要分為輕重兩種乡数。較輕的框架一般只負責通信椭蹄,如rmi、webservice净赴、restful绳矩、Thrift、gRPC等玖翅。較重的框架一般包括完整的服務發(fā)現(xiàn)翼馆、負載均衡策略等等如BAT三家的Dubbo、brpc金度、Tars之類应媚。

框架選擇時個人認為首先要考慮的是框架的歷史和項目的活躍程度。一個歷史悠久的活躍項目(大概至少可以保證每兩到三個月有一次小版本的更新)可以保證各種bug早已暴露并修復猜极,讓我們可以更專注于我們自己的項目本身中姜,而不是要擔心究竟是我們自己的代碼有問題還是框架本身就有問題。

重量級RPC框架有一個主要問題就是結構復雜跟伏,另外主語言之外的代碼質量也不太容易保證丢胚。個人認為活躍的社區(qū)以及一個活躍的開源管理團隊是這些重型RPC框架項目成功的必要前提條件翩瓜。比如我們項目組試用過騰訊的Tars趟卸,C++同學表示沒有任何問題啊鸭,然后JAVA同學表示java版本有許多bug,修復bug的pull request需要兩個多月才能得到merge,而官方jar包也將近兩年沒有更新過了骨宠。

輕量級rpc框架中,restful可以被視作標桿相满。由于restful基于http協(xié)議层亿,天然被各種框架支持,而且非常靈活立美。restful的缺點有兩方面匿又,一是過于靈活,缺少根據協(xié)議生成服務端和客戶端代碼的工具建蹄,聯(lián)調往往要花更多的時間碌更;二是大部分序列化基于json或者xml,相對來講效率不理想洞慎。和restful相比痛单,其它很多輕量級框架都有這樣或者那樣的缺點,有的缺少跨語言支持(rmi)劲腿,有的既繁瑣又缺乏效率優(yōu)勢(webservice)旭绒。個人認為其中相對理想的是gRPC和Thrift。

gRPC簡介

Protobuf是一種google推出的非常流行的跨語言序列化/反序列化框架焦人。在Protobuf2中就已經出現(xiàn)了用rpc定義服務的概念挥吵,但是一直缺少一種流行的rpc框架支持。當Http2推出之后花椭,google將Http2和protobuf3結合忽匈,推出了gRPC。gRPC繼承了Protobuf和Http2的優(yōu)點矿辽,包括:

  • 序列化反序列化性能好
  • 強類型支持
  • 向前/向后兼容
  • 有代碼生成機制脉幢,而且可以支持多語言
  • 長連接、多路復用

同時gRPC還提供了簡單地服務發(fā)現(xiàn)和負載均衡功能嗦锐。雖然這并不是gRPC框架的重點嫌松,但是開發(fā)者可以非常容易的自己擴展gRPC這些功能,實現(xiàn)自己的策略或應用最新的相關方面技術奕污,而不用像重型RPC框架一樣受制于框架本身是否支持萎羔。

gRPC與Thrift對比

Thrift是Facebook推出的一種RPC框架,從性能上來講遠優(yōu)于gRPC碳默。但是在實際調研時發(fā)現(xiàn)有一個很麻煩的問題:Thrift的客戶端是線程不安全的——這意味著在Spring中無法以單例形式注入到Bean中贾陷。解決方案有三種:

  1. 每次調用創(chuàng)建一個Thrift客戶端缘眶。這不僅意味著額外的對象創(chuàng)建和垃圾回收開銷,而且實際上相當于只使用了短鏈接髓废,這是一個開發(fā)復雜度最低但是從性能上來講最差的解決方案巷懈。
  2. 利用Pool,稍微復雜一點的解決方案慌洪,但是也非常成熟顶燕。但是問題在于一來缺少服務發(fā)現(xiàn)和負載均衡恐實現(xiàn),需要很多額外開發(fā)冈爹;二來需要創(chuàng)建Pool數量*服務端數量個客戶端涌攻,內存開銷會比較大。
  3. 使用異步框架如Netty频伤,可以成功避免創(chuàng)建過多的客戶端恳谎,但是仍要自己實現(xiàn)服務發(fā)現(xiàn)和負載均衡,相對復雜憋肖。實際上Facebook有一個基于Netty的Thrift客戶端因痛,叫Nifty,但是快四年沒更新了岸更。鸵膏。。

相比較而言gRPC就友好多了坐慰,本身有簡單而且可擴展的服務發(fā)現(xiàn)和負載均衡功能较性,底層基于Netty所以線程安全,在不需要極限壓榨性能的情況下是非常好的選擇结胀。當然如果需要極限壓榨性能Thrift也未必夠看赞咙。

gRPC入門

gRPC服務定義

gRPC中有一個特殊的關鍵字stream,表示可以以流式輸入或輸出多個protobuf對象糟港。注意只有異步非阻塞的客戶端支持以stream形式輸入攀操,同步阻塞客戶端不支持以stream形式輸入。

syntax = "proto3";  //gRPC必須使用proto3

option java_multiple_files = true;
option java_package = "cn.lmh.examples.grpc.proto";

service RouteGuide {
    // 輸入一個坐標秸抚,返回坐標和時間(1:1)
    rpc getPoint(Point) returns (LocationNote) {}
    // 輸入一個矩形速和,以stream形式返回一系列點(1:n)
    rpc listPoints(Rectangle) returns (stream Point) {}
    // 以stream形式輸入一系列點,返回點的數量和總共花費的時間(m:1)
    rpc recordRoute(stream Point) returns (RouteSummary) {}
    // 以stream形式輸入一系列點剥汤,以stream形式返回已輸入點的數量和總共花費的時間(m:n)
    rpc getPointStream(stream Point) returns (stream RouteSummary) {}
}

message Point {
    int32 latitude = 1;
    int32 longitude = 2;
}
message Rectangle {
    Point lo = 1;
    Point hi = 2;
}
message LocationNote {
    Point location = 1;
    int64 timestamp = 2;
}
message RouteSummary {
    int32 point_count = 1;
    int64 elapsed_time = 2;
}

依賴和代碼生成

由于protoc的gRPC插件需要自己編譯颠放,而且存在環(huán)境問題。推薦使用gradle或者maven的protobuf插件吭敢。入門示例項目使用了gradle碰凶,根目錄build.gradle配置如下:

plugins {
    id 'java'
    id 'idea'
    id 'wrapper'
}

ext {
    groupId = 'cn.lmh.leviathan'
    proto = [
        version : "3.9.0",
        "grpc" :[
            version : "1.23.0"
        ]
    ]
}

allprojects{
    apply plugin: 'java'
    apply plugin: 'idea'

    sourceCompatibility=JavaVersion.VERSION_1_8
    targetCompatibility=JavaVersion.VERSION_1_8

    project.group = 'cn.lmh.examples'

    compileJava.options.encoding = 'UTF-8'
}

subprojects{
    repositories {
        mavenCentral()
        mavenLocal();
    };
    configurations {
        compile
    }

    dependencies {
        compile "io.grpc:grpc-netty-shaded:${proto.grpc.version}"
        compile "io.grpc:grpc-protobuf:${proto.grpc.version}"
        compile "io.grpc:grpc-stub:${proto.grpc.version}"

        testCompile group: 'junit', name: 'junit', version: '4.12'
    }
}

子項目build.gradle如下:

plugins{
    id 'com.google.protobuf' version '0.8.10'   //引入protobuf插件
}

sourceSets{
    main{
        proto {
            srcDir 'src/main/proto' //指定.proto文件所在的位置
        }
    }
}

protobuf {
    generatedFilesBaseDir = "$projectDir/src"   //生成文件的根目錄

    protoc {
        artifact = "com.google.protobuf:protoc:${proto.version}"    //protoc的版本
    }

    plugins {
        grpc {
            artifact = "io.grpc:protoc-gen-grpc-java:${proto.grpc.version}" //gRPC的版本
        }
    }

    generateProtoTasks {
        all()*.plugins {
            grpc {
                outputSubDir = "java"   //grpc生成文件的子目錄
            }
        }
    }
}

我們的入門子項目名稱叫做starter,配置好build.gradle之后,執(zhí)行gradlew :starter:generateProto就可以在src/main/java下生成對應的文件:

gRPC生成的目錄結構

服務端

無論客戶端以異步非阻塞還是同步阻塞形式調用欲低,gRPC服務端的Response都是異步形式辕宏。對于異步的Request或者Response,都需要實現(xiàn)gRPC的io.grpc.stub.StreamObserver接口砾莱。io.grpc.stub.StreamObserver接口有三個方法:

  • onNext:表示接收/發(fā)送一個對象
  • onError:處理異常
  • onCompleted:表示Request或Response結束

當Request發(fā)送到服務端端時瑞筐,會異步調用requestObserver的onNext方法,直到結束時調用requestObserver的onCompleted方法腊瑟;服務端調用responseObserver的onNext把Response返回給客戶端聚假,直到調用responseObserver的onCompleted方法通知客戶端Response結束。服務端代碼如下:

public class RouteGuideServer {
    private final int port;
    private final Server server;

    public RouteGuideServer(int port) throws IOException {
        this.port = port;
        server = ServerBuilder.forPort(port).addService(new RouteGuideService())
                .build();
    }

    /**
     * Start server.
     */
    public void start() throws IOException {
        server.start();
        System.out.println("Server started, listening on " + port);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                RouteGuideServer.this.stop();
            }
        });
    }

    /**
     * Stop server
     */
    public void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    /**
     * Await termination on the main thread since the grpc library uses daemon threads.
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    public static void main(String[] args) throws Exception {
        RouteGuideServer server = new RouteGuideServer(8980);
        server.start();
        server.blockUntilShutdown();
    }

    private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
        @Override
        public void getPoint(Point request, StreamObserver<LocationNote> responseObserver) {
            LocationNote value = LocationNote
                .newBuilder()
                .setLocation(request)
                .setTimestamp(System.nanoTime())
                .build();
            responseObserver.onNext(value);
            responseObserver.onCompleted();
        }

        @Override
        public void listPoints(Rectangle request, StreamObserver<Point> responseObserver) {
            int left = Math.min(request.getLo().getLongitude(), request.getHi().getLongitude());
            int right = Math.max(request.getLo().getLongitude(), request.getHi().getLongitude());
            int top = Math.max(request.getLo().getLatitude(), request.getHi().getLatitude());
            int bottom = Math.max(request.getLo().getLatitude(), request.getHi().getLatitude());
            for (int x = left; x <= right; x++) {
                for (int y = top; y >= bottom; y--) {
                    Point point = Point.newBuilder().setLongitude(x).setLatitude(y).build();
                    responseObserver.onNext(point);
                }
            }
            responseObserver.onCompleted();
        }

        @Override
        public StreamObserver<Point> recordRoute(StreamObserver<RouteSummary> responseObserver) {
            return new StreamObserver<Point>() { //返回的是requestObserver
                AtomicInteger pointCount = new AtomicInteger(0);
                final long startTime = System.nanoTime();

                @Override
                public void onNext(Point value) {
                    int count = pointCount.incrementAndGet();
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onCompleted() {
                    RouteSummary result = RouteSummary.newBuilder()
                      .setElapsedTime(System.nanoTime() - startTime).setPointCount(pointCount.get()).build();
                    responseObserver.onNext(result);
                    responseObserver.onCompleted();
                }
            };
        }

        @Override
        public StreamObserver<Point> getPointStream(StreamObserver<RouteSummary> responseObserver) {
            return new StreamObserver<Point>() { //返回的是requestObserver
                AtomicInteger pointCount = new AtomicInteger(0);
                final long startTime = System.nanoTime();

                @Override
                public void onNext(Point value) {
                    int count = pointCount.incrementAndGet();
                    RouteSummary result = RouteSummary.newBuilder()
                      .setElapsedTime(System.nanoTime() - startTime).setPointCount(count).build();
                    responseObserver.onNext(result);
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onCompleted() {
                    responseObserver.onCompleted();
                }
            };
        }
    }
}

客戶端

gRPC的客戶端有同步阻塞客戶端(blockingStub)和異步非阻塞客戶端(Stub)兩種扫步。同步客戶端使用比較方便魔策,但是性能較低匈子,而且不支持stream形式的Request;異步客戶端性能較高河胎,支持stream形式的Request,但是如果想要以同步方式調用需要額外封裝虎敦。本文將主要以異步為例游岳。

異步轉同步

由于gRPC的異步客戶端性能較高且功能更完整,所以一般都會采用異步客戶端其徙。異步客戶端接收到的Response也是以io.grpc.stub.StreamObserver形式胚迫。由于客戶端的調用可能是在異步進程中但更可能是在同步進程中,所以就存在一個如何把gRPC異步Response轉為同步Response的問題唾那。

一個比較常見的思路是寫一個io.grpc.stub.StreamObserver實現(xiàn)访锻,里面有一個內置變量保存異步Response的結果,再添加一個阻塞式的get()方法闹获,直到Response結束才把所有結果返回期犬。要知道Response是否結束,需要添加一個Boolean或者AtomicBoolean變量避诽,初始化為false龟虎,調用responseObserver.onCompleted()方法時設置為true,這樣就可以通過這個變量判斷Response是否結束沙庐。

阻塞get()方法最常見的思路是get()寫一個while循環(huán)鲤妥,直到變量值改為true才退出循環(huán)并返回結果。這種方式的優(yōu)點是簡單直接拱雏,任何語言都可以簡單實現(xiàn)棉安,缺點是由于使用循環(huán)可能CPU占用較高。而對于java這種多線程比較完善的語言铸抑,另一個比較好思路是Response結束前將線程掛起贡耽,當調用responseObserver.onCompleted()方法再喚醒線程。代碼如下:

public class CallableStreamObserver<T> implements StreamObserver<T> {
    List<T> values = new ArrayList<T>();
    boolean isCompleted = false;
    Throwable t = null;

    @Override
    public void onNext(T value) {
        this.values.add(value);
    }

    @Override
    public void onError(Throwable t) {
        this.isCompleted = true;
        notifyAll();
    }

    @Override
    public synchronized void onCompleted() {
        this.isCompleted = true;
        notifyAll();
    }

    public List<T> get() throws Throwable {
        if (!this.isCompleted) {
            synchronized (this) {
                this.wait(60 * 1000);
            }
        }
        if (null != t) {
            throw this.t;
        } else {
            return this.values;
        }
    }
}

客戶端代碼

public class RouteGuideClient {

    private final ManagedChannel channel;
    private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;
    private final RouteGuideGrpc.RouteGuideStub asyncStub;

    public RouteGuideClient(String host, int port) {
        String target = "dns:///" + host + ":" + port;
        ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
                .forTarget(target)
                .usePlaintext();
        channel = channelBuilder.build();
        blockingStub = RouteGuideGrpc.newBlockingStub(channel);
        asyncStub = RouteGuideGrpc.newStub(channel);
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    public LocationNote getPoint(int lo, int lt, boolean blocking) throws Throwable {
        Point point = Point.newBuilder().setLongitude(lo).setLatitude(lt).build();
        if(blocking) {
            return blockingStub.getPoint(point);
        }else{
            CallableStreamObserver<LocationNote> responseObserver = new CallableStreamObserver<LocationNote>();
            asyncStub.getPoint(point, responseObserver);
            return responseObserver.get().get(0);
        }
    }

    public Iterator<Point> listPoints(int left, int top, int right, int bottom, boolean blocking) throws Throwable {
        Point hi = Point.newBuilder().setLongitude(left).setLatitude(top).build();
        Point lo = Point.newBuilder().setLongitude(right).setLatitude(bottom).build();
        Rectangle rec = Rectangle.newBuilder().setHi(hi).setLo(lo).build();
        if(blocking){
            return blockingStub.listPoints(rec);
        }else{
            CallableStreamObserver<Point> responseObserver = new CallableStreamObserver<Point>();
            asyncStub.listPoints(rec, responseObserver);
            return responseObserver.get().iterator();
        }
    }

    public RouteSummary recordRoute(Collection<Point> points) throws Throwable {
        CallableStreamObserver<RouteSummary> responseObserver = new CallableStreamObserver<RouteSummary>();
        StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
        points.stream().parallel().forEach(p -> requestObserver.onNext(p));
        requestObserver.onCompleted();
        return responseObserver.get().get(0);

    }

    public List<RouteSummary> getPointStream(Collection<Point> points) throws Throwable {
        CallableStreamObserver<RouteSummary> responseObserver = new CallableStreamObserver<RouteSummary>();
        StreamObserver<Point> requestObserver = asyncStub.getPointStream(responseObserver);
        points.stream().parallel().forEach(p -> requestObserver.onNext(p));
        requestObserver.onCompleted();
        return responseObserver.get();
    }
}

gRPC客戶端代碼詳解

gRPC官方將自己分為三層組件:Stub、Channel和Transport菇爪。

  • Stub層是最上層的代碼算芯,gRPC附帶的插件可以從.proto文件直接生成Stub層代碼,開發(fā)人員通過直接調用Stub層的代碼調用RPC服務
  • Channel層是對Transport層功能的抽象凳宙,同時提供了很多有用的功能熙揍,比如服務發(fā)現(xiàn)和負載均衡。氏涩。
  • Transport層承擔了將字節(jié)從網絡中取出和放入數據的工作届囚,有三種實現(xiàn)Netty、okHttp是尖、inProgress意系。Transport層是最底層的代碼。

整個grpc-java項目的代碼比較多饺汹。從風格上來講蛔添,封裝比較多,相對于interface更喜歡使用abstract class兜辞,相對于反射更喜歡使用硬編碼迎瞧,而且大量使用了單線程異步調用造成調用棧斷裂,與常見的java項目的編碼風格有很大差別逸吵,閱讀起來可能容易不習慣凶硅。

在源碼層面本文將關注下面這些方面:

  • Channel的初始化過程;
  • gRPC中的服務發(fā)現(xiàn);
  • gRPC中的負載均衡
  • Client與Server之間的數據傳輸

Channel的初始化過程

通過入門示例可以看到,Channel的初始化過程分三步:

  1. 調用forTarget方法創(chuàng)建io.grpc.ManagedChannelBuilder;
  2. 配置各種選項扫皱,不論如何配置足绅,返回的總是io.grpc.ManagedChannelBuilder對象;
  3. 調用build方法創(chuàng)建io.grpc.ManagedChannel

forTarget方法

gRPC這里設計比較繁瑣韩脑,過程比較繞氢妈。forTarget方法的實際功能就是把參數target賦值給io.grpc.ManagedChannelBuilder的內部變量target

public static ManagedChannelBuilder<?> forTarget(String target) {
    return ManagedChannelProvider.provider().builderForTarget(target);
}

io.grpc.ManagedChannelProvider.provider()會返回一個io.grpc.ManagedChannelProvider實現(xiàn)扰才。有哪些io.grpc.ManagedChannelProvider實現(xiàn)是在io.grpc.ManagedChannelProvider中以硬編碼形式確定的允懂,這里其實存在利用反射改進的空間。

private static final class HardcodedClasses implements Iterable<Class<?>> {
    @Override
    public Iterator<Class<?>> iterator() {
        List<Class<?>> list = new ArrayList<>();
        try {
            list.add(Class.forName("io.grpc.okhttp.OkHttpChannelProvider"));
        } catch (ClassNotFoundException ex) {
            // ignore
        }
        try {
            list.add(Class.forName("io.grpc.netty.NettyChannelProvider"));
        } catch (ClassNotFoundException ex) {
        // ignore
        }
        return list.iterator();
    }
}

實際上就根據依賴的jar包不同就只有兩個實現(xiàn)衩匣,一個netty的蕾总,一個okhttp的。如果像入門示例項目一樣只配置了netty實現(xiàn),那就只有netty的琅捏。io.grpc.netty.NettyChannelProvider的buildForTarget方法調用的是io.grpc.netty.NettyChannelBuilderforTarget方法生百。

public NettyChannelBuilder builderForTarget(String target) {
    return NettyChannelBuilder.forTarget(target);
}

io.grpc.netty.NettyChannelBuilder繼承自io.grpc.internal.AbstractManagedChannelImplBuilderforTarget方法實際上調用了父類的構造函數柄延。

NettyChannelBuilder(String target) {
    super(target);
}

public static NettyChannelBuilder forTarget(String target) {
    return new NettyChannelBuilder(target);
}

io.grpc.internal.AbstractManagedChannelImplBuilder的構造函數最終會是把參數賦值給target變量蚀浆。

protected AbstractManagedChannelImplBuilder(String target) {
    this.target = Preconditions.checkNotNull(target, "target");
    this.directServerAddress = null;
}

build方法

從前文可以看到缀程,實際初始化的io.grpc.ManagedChannelBuilder實際上是io.grpc.netty.NettyChannelBuilder,其build方法實現(xiàn)在其父類io.grpc.internal.AbstractManagedChannelImplBuilder中市俊。

public ManagedChannel build() {
    return new ManagedChannelOrphanWrapper(new ManagedChannelImpl(
        this,
        buildTransportFactory(),
        // TODO(carl-mastrangelo): Allow clients to pass this in
        new ExponentialBackoffPolicy.Provider(),
        SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
        GrpcUtil.STOPWATCH_SUPPLIER,
        getEffectiveInterceptors(),
        TimeProvider.SYSTEM_TIME_PROVIDER));
}

這里的io.grpc.internal.ManagedChannelOrphanWrapperio.grpc.internal.ManagedChannelImpl其實都是io.grpc.ManagedChannel的實現(xiàn)杨凑。io.grpc.internal.ManagedChannelOrphanWrapper從功能上分析沒有任何作用,io.grpc.internal.ManagedChannelOrphanWrapper會為io.grpc.ManagedChannel創(chuàng)建弱引用摆昧,并被放置到ReferenceQueue中撩满。如果Channel是單例的,那么意義不大绅你;如果客戶端被重復創(chuàng)建卻沒有被關閉伺帘,那么ReferenceQueue中會留下相應的引用記錄,可能有助于排查問題忌锯。

io.grpc.internal.ManagedChannelImpl構造方法的幾個參數中伪嫁,除了第一個參數是builder本身,第二個參數是用來創(chuàng)建Transport的Factory偶垮,第三個參數是后臺連接重試策略张咳,第四個參數是gRPC的全局線程池,第五個和第七個都是和時間相關的對象针史,主要用于日志中晶伦,第六個是客戶端調用時的interceptor碟狞。在io.grpc.netty.NettyChannelBuilder中啄枕,buildTransportFactory方法會創(chuàng)建一個io.grpc.netty.NettyChannelBuilder.NettyTransportFactory

服務發(fā)現(xiàn)

前文的入門示例中直接寫了target族沃,只能連接單個Server频祝。如果有多個可以提供服務的Server,那么就需要有一種方式通過單個target發(fā)現(xiàn)這些Server脆淹。在io.grpc.ManagedChannelBuilder中有一個nameResolverFactory方法常空,可以用來指定如何解析target地址,發(fā)現(xiàn)多個服務端盖溺。

nameResolverFactory方法

這個方法的實現(xiàn)也在io.grpc.internal.AbstractManagedChannelImplBuilder中漓糙,如果用戶有自己的io.grpc.NameResolver.Factory實現(xiàn)的話可以通過nameResolverFactory方法指定,gRPC就會使用用戶自己的io.grpc.NameResolver.Factroy實現(xiàn)代替gRPC自己的默認實現(xiàn),否則會使用io.grpc.NameResolverRegistry中的默認實現(xiàn)烘嘱。

io.grpc.NameResolverRegistry會通過硬編碼加載io.grpc.NameResolverProvider實現(xiàn)昆禽,并創(chuàng)建一個與之有關的io.grpc.NameResolver.Factory的實現(xiàn)。目前硬編碼加載的io.grpc.NameResolverProvider實現(xiàn)只有io.grpc.internal.DnsNameResolverProvider一種蝇庭。

private final NameResolver.Factory factory = new NameResolverFactory();
@GuardedBy("this")
private final LinkedHashSet<NameResolverProvider> allProviders = new LinkedHashSet<>();

private synchronized void addProvider(NameResolverProvider provider) {
    checkArgument(provider.isAvailable(), "isAvailable() returned false");
    allProviders.add(provider);
}

public static synchronized NameResolverRegistry getDefaultRegistry() {
    if (instance == null) {
        List<NameResolverProvider> providerList = ServiceProviders.loadAll(
            NameResolverProvider.class,
            getHardCodedClasses(),
            NameResolverProvider.class.getClassLoader(),
            new NameResolverPriorityAccessor());
        if (providerList.isEmpty()) {
            logger.warning("No NameResolverProviders found via ServiceLoader, including for DNS. This "
            + "is probably due to a broken build. If using ProGuard, check your configuration");
        }
        instance = new NameResolverRegistry();
        for (NameResolverProvider provider : providerList) {
            logger.fine("Service loader found " + provider);
            if (provider.isAvailable()) {
                instance.addProvider(provider);
            }
        }
        instance.refreshProviders();
    }
    return instance;
}  

public NameResolver.Factory asFactory() {
    return factory;
}

@VisibleForTesting
static List<Class<?>> getHardCodedClasses() {
    ArrayList<Class<?>> list = new ArrayList<>();
    try {
        list.add(Class.forName("io.grpc.internal.DnsNameResolverProvider"));
    } catch (ClassNotFoundException e) {
        logger.log(Level.FINE, "Unable to find DNS NameResolver", e);
    }
    return Collections.unmodifiableList(list);
}

private final class NameResolverFactory extends NameResolver.Factory {
    @Override
    @Nullable
    public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
        List<NameResolverProvider> providers = providers();
        for (NameResolverProvider provider : providers) {
            NameResolver resolver = provider.newNameResolver(targetUri, args);
            if (resolver != null) {
                return resolver;
            }
        }
        return null;
    }

    @Override
    public String getDefaultScheme() {
        List<NameResolverProvider> providers = providers();
        if (providers.isEmpty()) {
            return "unknown";
        }
        return providers.get(0).getDefaultScheme();
    }
}

getDefaultSchema會匹配target中的schema(如dns)醉鳖,如果匹配的上,就使用相應的NameResolver.Factory哮内,返回NameResolver決定真正的服務訪問地址盗棵。

io.grpc.NameResolver

我們來看io.grpc.NameResolver

public abstract class NameResolver {

    public abstract String getServiceAuthority();
    
    public void start(final Listener listener) {
    if (listener instanceof Listener2) {
            start((Listener2) listener);
        } else {
            start(new Listener2() {
                @Override
                public void onError(Status error) {
                listener.onError(error);
                }

                @Override
                public void onResult(ResolutionResult resolutionResult) {
                    listener.onAddresses(resolutionResult.getAddresses(), resolutionResult.getAttributes());
                }
            });
        }
    }

    public void start(Listener2 listener) {
        start((Listener) listener);
    }
    
    public abstract void shutdown();
    
    public void refresh() {}
    
    @ThreadSafe
    public interface Listener {

        void onAddresses(List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes);

        void onError(Status error);
    }

    public abstract static class Listener2 implements Listener {

        @Override
        public final void onAddresses(
            List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes) {
            onResult(
            ResolutionResult.newBuilder().setAddresses(servers).setAttributes(attributes).build());
        }

        public abstract void onResult(ResolutionResult resolutionResult);

        @Override
        public abstract void onError(Status error);
    }

    public static final class ResolutionResult {
        private final List<EquivalentAddressGroup> addresses;
        @ResolutionResultAttr
        private final Attributes attributes;
        @Nullable
        private final ConfigOrError serviceConfig;

        ResolutionResult(
            List<EquivalentAddressGroup> addresses,
            @ResolutionResultAttr Attributes attributes,
            ConfigOrError serviceConfig) {
            this.addresses = Collections.unmodifiableList(new ArrayList<>(addresses));
            this.attributes = checkNotNull(attributes, "attributes");
            this.serviceConfig = serviceConfig;
        }

        public static Builder newBuilder() {
          return new Builder();
        }

        public Builder toBuilder() {
          return newBuilder()
              .setAddresses(addresses)
              .setAttributes(attributes)
              .setServiceConfig(serviceConfig);
        }

        public List<EquivalentAddressGroup> getAddresses() {
          return addresses;
        }

        @ResolutionResultAttr
        public Attributes getAttributes() {
          return attributes;
        }

        @Nullable
        public ConfigOrError getServiceConfig() {
          return serviceConfig;
        }

        @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
        public static final class Builder {
            private List<EquivalentAddressGroup> addresses = Collections.emptyList();
            private Attributes attributes = Attributes.EMPTY;
            @Nullable
            private ConfigOrError serviceConfig;
            Builder() {}

            public Builder setAddresses(List<EquivalentAddressGroup> addresses) {
                this.addresses = addresses;
                return this;
            }

            public Builder setAttributes(Attributes attributes) {
                this.attributes = attributes;
                return this;
            }

            public Builder setServiceConfig(@Nullable ConfigOrError serviceConfig) {
                this.serviceConfig = serviceConfig;
                return this;
            }

            public ResolutionResult build() {
                return new ResolutionResult(addresses, attributes, serviceConfig);
            }
        }
    }
}

在客戶端首次連接服務端的時候會調用Listener2start方法,需要更新的時候會調用refresh方法。當Listener2接收到服務端地址時纹因,會調用onResult方法喷屋。

io.grpc.internal.DnsNameResolver

由于gRPC支持長連接,所以如果直連的話只會訪問一個域名下的一臺服務器瞭恰,即首次連接時通過DNS返回IP地址逼蒙。io.grpc.internal.DnsNameResolverProvider是對io.grpc.internal.DnsNameResolver的簡單封裝,只支持以dns:///開頭的地址寄疏。io.grpc.internal.DnsNameResolver會根據target獲取該host下所有關聯(lián)的IP是牢,即通過DNS解析出所有的服務端IP地址。

public final class DnsNameResolverProvider extends NameResolverProvider {

  private static final String SCHEME = "dns";

  @Override
  public DnsNameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
    if (SCHEME.equals(targetUri.getScheme())) {
      String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
      Preconditions.checkArgument(targetPath.startsWith("/"),
          "the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
      String name = targetPath.substring(1);
      return new DnsNameResolver(
          targetUri.getAuthority(),
          name,
          args,
          GrpcUtil.SHARED_CHANNEL_EXECUTOR,
          Stopwatch.createUnstarted(),
          InternalServiceProviders.isAndroid(getClass().getClassLoader()));
    } else {
      return null;
    }
  }

  @Override
  public String getDefaultScheme() {
    return SCHEME;
  }

  @Override
  protected boolean isAvailable() {
    return true;
  }

  @Override
  protected int priority() {
    return 5;
  }
}

可以看到io.grpc.internal.DnsNameResolver中的startrefresh方法都調用的是resolve方法陕截,而resolve方法是執(zhí)行了一個繼承自RunnableResolve接口驳棱。

DnsNameResolver

在有代理的情況下,ResolveresolveInternal會根據代理返回的ProxiedSocketAddress創(chuàng)建EquivalentAddressGroup作為服務端列表返回农曲,并設置空config社搅;否則會調用resolveAll方法獲取服務端列表,并調用parseServiceConfig方法設置config乳规。resolveAll方法返回的ResolutionResults有三個變量addresses形葬、txtRecordsbalancerAddresses

@VisibleForTesting
static ResolutionResults resolveAll(
    AddressResolver addressResolver,
    @Nullable ResourceResolver resourceResolver,
    boolean requestSrvRecords,
    boolean requestTxtRecords,
    String name) {
    List<? extends InetAddress> addresses = Collections.emptyList();
    Exception addressesException = null;
    List<EquivalentAddressGroup> balancerAddresses = Collections.emptyList();
    Exception balancerAddressesException = null;
    List<String> txtRecords = Collections.emptyList();
    Exception txtRecordsException = null;

    try {
        addresses = addressResolver.resolveAddress(name);
    } catch (Exception e) {
    addressesException = e;
    }
    if (resourceResolver != null) {
        if (requestSrvRecords) {
            try {
                balancerAddresses =
                    resourceResolver.resolveSrv(addressResolver, GRPCLB_NAME_PREFIX + name);
            } catch (Exception e) {
                balancerAddressesException = e;
            }
        }
        if (requestTxtRecords) {
            boolean balancerLookupFailedOrNotAttempted =
                !requestSrvRecords || balancerAddressesException != null;
            boolean dontResolveTxt =
                (addressesException != null) && balancerLookupFailedOrNotAttempted;
            if (!dontResolveTxt) {
                try {
                    txtRecords = resourceResolver.resolveTxt(SERVICE_CONFIG_NAME_PREFIX + name);
                } catch (Exception e) {
                txtRecordsException = e;
                }
            }   
        }
    }
    try {
        if (addressesException != null
            && (balancerAddressesException != null || balancerAddresses.isEmpty())) {
            Throwables.throwIfUnchecked(addressesException);
            throw new RuntimeException(addressesException);
        }
    } finally {
        if (addressesException != null) {
            logger.log(Level.FINE, "Address resolution failure", addressesException);
        }
        if (balancerAddressesException != null) {
            logger.log(Level.FINE, "Balancer resolution failure", balancerAddressesException);
        }
        if (txtRecordsException != null) {
            logger.log(Level.FINE, "ServiceConfig resolution failure", txtRecordsException);
        }
    }
    return new ResolutionResults(addresses, txtRecords, balancerAddresses);
}

addressResolverresolveAddress方法實際是調用JDK的java.net.InetAddressgetAllByName方法暮的,即根據host通過DNS返回一系列服務端列表笙以。resourceResolver根據LDAP協(xié)議獲取指定命名空間下的服務端列表地址。txtRecordsbalancerAddresses是和LDAP相關的參數冻辩,方法入參requestSrvRecordsrequestTxtRecords的默認值都是false猖腕。由于LDAP不是特別常用,這里就不深入展開了恨闪。

NameResolverListeneronResult

NameResolverListener獲取解析結果后會調用onResult方法倘感,進而會調用io.grpc.LoadBalancerhandleResolvedAddresses方法。

獲取解析結果后調用handleResolvedAddresses方法

負載均衡

io.grpc.ManagedChannel初始化的時候可以通過defaultLoadBalancingPolicy方法指定負載均衡策略咙咽,實際是根據defaultLoadBalancingPolicy創(chuàng)建了一個io.grpc.internal.AutoConfiguredLoadBalancerFactory對象老玛。io.grpc.internal.AutoConfiguredLoadBalancerFactory則通過io.grpc.LoadBalancerRegistry獲取對應名稱的負載均衡策略。io.grpc.LoadBalancerProvidergetPolicyName方法指定負載均衡策略名稱钧敞,newLoadBalancer返回負載均衡io.grpc.LoadBalancer的具體實現(xiàn)蜡豹。如果想要添加自定義負載均衡策略,需要調用io.grpc.LoadBalancerRegistryregistry方法犁享,并自己實現(xiàn)io.grpc.LoadBalancerProviderio.grpc.LoadBalancer余素,并指定負載均衡策略名稱即可。

defaultLoadBalancingPolicy方法

io.grpc.LoadBalancer.SubchannelPicker

io.grpc.LoadBalancer的核心邏輯實際在SubchannelPicker中炊昆。pickSubchannel方法會返回的PickResult中包含真正可用的subchannel桨吊,用來進行后續(xù)的數據傳輸威根。

public abstract static class SubchannelPicker {
    /**
    * Make a balancing decision for a new RPC.
    *
    * @param args the pick arguments
    * @since 1.3.0
    */
    public abstract PickResult pickSubchannel(PickSubchannelArgs args);
}

gRPC默認提供了兩種負載均衡實現(xiàn)策略:prick_firstround_robin。前者總會使用第一個可用的服務端视乐,后者則是簡單輪詢洛搀。

handleResolvedAddresses

當服務端列表更新時,會調用io.grpc.LoadBalancerhandleResolvedAddresses方法更新可用的subchannel佑淀。

@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
    List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
    if (subchannel == null) {
        final Subchannel subchannel = helper.createSubchannel(
        CreateSubchannelArgs.newBuilder()
            .setAddresses(servers)
            .build());
        subchannel.start(new SubchannelStateListener() {
            @Override
            public void onSubchannelState(ConnectivityStateInfo stateInfo) {
                processSubchannelState(subchannel, stateInfo);
            }
        });
        this.subchannel = subchannel;
        helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
        subchannel.requestConnection();
    } else {
        subchannel.updateAddresses(servers);
    }
}

如果是首次調用(subchannel == null) 會創(chuàng)建subchannel留美,其實現(xiàn)是io.grpc.internal.ManagedChannelImpl.SubchannelImpl,創(chuàng)建的過程中會創(chuàng)建io.grpc.internal.InternalSubchannel伸刃。然后調用io.grpc.internal.ManagedChannelImplupdateBalancingState方法谎砾,把subchannelPicker更新為實現(xiàn)Picker,然后開啟subchannel的連接捧颅。

開啟subchannel連接

在開啟subchannel的連接過程中景图,會調用io.grpc.internal.InternalSubchannelobtainActiveTransport方法。

這里的transportFactory就是上面提到io.grpc.ManagedChannelBuilder調用build初始化時調用buildTransportFactory方法返回的碉哑,依賴于Transport層的具體實現(xiàn)挚币。在netty實現(xiàn)中,返回的是io.grpc.netty.NettyClientTransport扣典。

傳輸

gRPC客戶端發(fā)起Request時妆毕,stub會調用ClientCallsstartCall方法,最終會調用io.grpc.internal.ManagedChannelImpl.ChannelTransportProviderget方法獲取io.grc.internal.ClientTransport贮尖。

gRPC客戶端發(fā)起Request時調用ChannelTransportProvider的get方法
public ClientTransport get(PickSubchannelArgs args) {
SubchannelPicker pickerCopy = subchannelPicker;
    if (shutdown.get()) {
        return delayedTransport;
    }
    if (pickerCopy == null) {
        final class ExitIdleModeForTransport implements Runnable {
            @Override
            public void run() {
                exitIdleMode();
            }
        }
        syncContext.execute(new ExitIdleModeForTransport());
        return delayedTransport;
    }
    PickResult pickResult = pickerCopy.pickSubchannel(args);
    ClientTransport transport = GrpcUtil.getTransportFromPickResult(
        pickResult, args.getCallOptions().isWaitForReady());
    if (transport != null) {
        return transport;
    }
    return delayedTransport;
}

如果subchannelPicker存在笛粘,會使用subchannelPicker進行選擇;如果是首次訪問服務端時subchannel肯定不存在远舅,會使用syncContext異步執(zhí)行exitIdleMode方法初始化闰蛔。syncContext是一個單線程執(zhí)行隊列,可以保證先提交的任務先執(zhí)行图柏。delayedTransport的執(zhí)行也依賴于syncContext,這就保證了delayedTransport中的方法執(zhí)行一定會在exitIdleMode方法之后任连。

首次訪問服務端時執(zhí)行exidIdleMode方法

exitIdleMode方法會初始化NameResolverLoadBalancer蚤吹,并會啟動NameResolverListener。當解析完成后會調用NameResolverListeneronResult方法随抠,進而調用LoadBalancerhandleResolvedAddresses方法創(chuàng)建subchannelPicker裁着、創(chuàng)建并連接subchannel。

@VisibleForTesting
void exitIdleMode() {
    syncContext.throwIfNotInThisSynchronizationContext();
    if (shutdown.get() || panicMode) {
        return;
    }
    if (inUseStateAggregator.isInUse()) {
        cancelIdleTimer(false);
    } else {
        rescheduleIdleTimer();
    }
    if (lbHelper != null) {
        return;
    }
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
    LbHelperImpl lbHelper = new LbHelperImpl();
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
    this.lbHelper = lbHelper;

    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
    nameResolver.start(listener);
    nameResolverStarted = true;
}

Request

發(fā)送Request時會調用ConnectionClientTransportnewStream方法返回一個io.grpc.internal.ClientStream對象,而首次調用會通過delayedTransport延遲調用newStream方法拱她。

調用newStream的調用棧

netty實現(xiàn)會返回一個io.grpc.netty.shaded.io.grpc.netty.NettyClientStream對象二驰。io.grpc.internal.ClientStream下有兩個子類,TransportState負責處理傳輸狀態(tài),Sink負責寫入數據秉沼。

在進行一系列http2相關設置后桶雀,會調用io.grpc.internal.ClientStreamstart方法矿酵,為TransportState設置監(jiān)聽并通過Sink寫入Header。

@Override
public final void start(ClientStreamListener listener) {
    transportState().setListener(listener);
    if (!useGet) {
        abstractClientStreamSink().writeHeaders(headers, null);
        headers = null;
    }
}

初始化結束后矗积,調用requestObserver的onNext方法會調用io.grpc.internal.ClientCallImplsendMessage方法全肮,將protobuf對象轉換成InputStream,并作為參數調用io.grpc.internal.ClientStreamwriteMessage方法棘捣,進而調用io.grpc.internal.MessageFramerwritePayload方法辜腺,最終調用writeToOutputStream方法將內容寫入Http的OutputStream。如果是參數是stream形式會繼續(xù)調用flush乍恐。

onNext

調用requestObserver的onCompleted方法會調用io.grpc.internal.ClientCallImplhalfClose方法评疗,進而會調用io.grpc.internal.MessageFramerendOfMessages,flush并結束發(fā)送消息茵烈。

onComplete

Response

onNext

客戶端接受到Response會調用ClientStreamListener的messagesAvailable方法壤巷,并通過同步線程池最終調用StreamObserver的onNext方法接收數據。

response-on-complete.png

當返回結束時會調用TransportState的transportReportStatus方法關閉請求瞧毙,進而調用ClientStreamListener的closed方法關閉監(jiān)聽胧华,進而調用StreamObserver的onClose方法。

gRPC通信格式

gRPC發(fā)送的請求發(fā)送方法是POST宙彪,路徑是/{serviceName}/{methodName}矩动,content-type為content-type = application/grpc+proto。

Request

HEADERS (flags = END_HEADERS)
:method = POST
:scheme = http
:path = /RouteGuide/getPoint
grpc-timeout = 1S
content-type = application/grpc+proto
grpc-encoding = gzip

DATA (flags = END_STREAM)
<Length-Prefixed Message>

Response

HEADERS (flags = END_HEADERS)
:status = 200
grpc-encoding = gzip
content-type = application/grpc+proto

DATA
<Length-Prefixed Message>

HEADERS (flags = END_STREAM, END_HEADERS)
grpc-status = 0 # OK
trace-proto-bin = jher831yy13JHy3hc

擴展gRPC

自定義基于zookeeper的NameResolver.Factory實現(xiàn)

public class CuratorNameResolver extends NameResolver {
    CuratorFramework curatorFramework;
    String basePath;
    String serviceAuthority;
    private Listener2 listener;

    public CuratorNameResolver(CuratorFramework curatorFramework, String basePath, String serviceAuthority) {
        this.curatorFramework = curatorFramework;
        this.basePath = basePath;
        this.serviceAuthority = serviceAuthority;
    }

    @Override
    public void start(Listener2 listener) {
        this.curatorFramework.start();
        this.listener = listener;
        refresh();
    }

    @Override
    public void refresh() {
        List<EquivalentAddressGroup> servers = new ArrayList<>();
        try {
            List<EquivalentAddressGroup> addresses = curatorFramework.getChildren()
                    .forPath(basePath)
                    .stream().map(address ->{
                        try {
                            URI uri = new URI("http://" + address);
                            return new EquivalentAddressGroup(
                                new InetSocketAddress(uri.getHost(), uri.getPort()));
                        }catch (Exception e){
                            listener.onError(Status.INTERNAL);
                            return null;
                        }
                    }).collect(Collectors.toList());
            listener.onResult(ResolutionResult.newBuilder().setAddresses(addresses).build());

        } catch (Exception e) {
            listener.onError(Status.INTERNAL);
        }
    }

    @Override
    public String getServiceAuthority() {
        return this.serviceAuthority;
    }

    @Override
    public void shutdown() {
        this.curatorFramework.close();
    }

    public static class Factory extends NameResolver.Factory{
        @Override
        public NameResolver newNameResolver(URI targetUri, Args args) {
            String address = targetUri.getHost() + ":" + targetUri.getPort();
            String authority = null == targetUri.getAuthority() ? address : targetUri.getAuthority();
            CuratorFramework curator = CuratorFrameworkFactory.builder()
                    .connectString(address)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 5))
                    .connectionTimeoutMs(1000)
                    .sessionTimeoutMs(60000)
                    .build();
            return new CuratorNameResolver(curator, targetUri.getPath(), authority);
        }

        @Override
        public String getDefaultScheme() {
            return "zookeeper";
        }
    }
}

自定義隨機負載均衡實現(xiàn)

public class RandomLoadBalancer extends LoadBalancer{
    LoadBalancer.Helper helper;

    private final Map<EquivalentAddressGroup, Subchannel> subchannels =
            new HashMap<>();
    static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
            Attributes.Key.create("state-info");

    public RandomLoadBalancer(LoadBalancer.Helper helper) {
        this.helper = helper;
    }
    @Override
    public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
        List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
        for(EquivalentAddressGroup server : servers){
            List<EquivalentAddressGroup> serverSingletonListt = Collections.singletonList(server);
            Subchannel exists = subchannels.getOrDefault(server, null);
            if(null != exists){
                exists.updateAddresses(serverSingletonListt);
                continue;
            }
            Attributes.Builder subchannelAttrs = Attributes.newBuilder()
                    .set(STATE_INFO,
                            new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));
            final Subchannel subchannel = helper.createSubchannel(CreateSubchannelArgs.newBuilder()
                            .setAddresses(serverSingletonListt)
                            .setAttributes(subchannelAttrs.build())
                            .build());
            subchannels.put(server, subchannel);
            subchannel.start(new SubchannelStateListener() {
                @Override
                public void onSubchannelState(ConnectivityStateInfo state) {
                    for(Map.Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()){
                        if(subchannel == entry.getValue()){
                            if (state.getState() == SHUTDOWN) {
                                subchannels.remove(entry.getKey());
                            }
                            if (state.getState() == IDLE) {
                                subchannel.requestConnection();
                            }
                            subchannel.getAttributes().get(STATE_INFO).value = state;
                            updateBalancingState();
                            return;
                        }
                    }
                }
            });
            subchannel.requestConnection();
        }
        updateBalancingState();
    }
    @Override
    public void handleNameResolutionError(Status error) {
        shutdown();
        helper.updateBalancingState(TRANSIENT_FAILURE, new SubchannelPicker() {
            @Override
            public PickResult pickSubchannel(PickSubchannelArgs args) {
                return PickResult.withError(error);
            }
        });
    }

    private  void updateBalancingState(){
        boolean ready = true;
        for(Subchannel subchannel : this.subchannels.values()){
            if(subchannel.getAttributes().get(STATE_INFO).value.getState() != READY){
                helper.updateBalancingState(CONNECTING, new RandomSubchannelPick(subchannels.values()));
                return;
            }
        }
        helper.updateBalancingState(ConnectivityState.READY, new RandomSubchannelPick(subchannels.values()));
    }

    @Override
    public void shutdown() {
        for(Iterator<Map.Entry<EquivalentAddressGroup, Subchannel>> itr = subchannels.entrySet().iterator(); itr.hasNext();){
            Map.Entry<EquivalentAddressGroup, Subchannel> e = itr.next();
            e.getValue().shutdown();
            itr.remove();
        }

    }

    class RandomSubchannelPick extends SubchannelPicker{
        Subchannel[] subchannels;
        Random random = new Random(System.currentTimeMillis());

        public RandomSubchannelPick(Collection<Subchannel> subchannels) {
            this.subchannels = subchannels.stream().toArray(Subchannel[]::new);
        }

        @Override
        public PickResult pickSubchannel(PickSubchannelArgs args) {
            int idx = random.nextInt(subchannels.length);
            return PickResult.withSubchannel(subchannels[idx]);
        }
    }

    public static class Provider extends LoadBalancerProvider{

        @Override
        public boolean isAvailable() {
            return true;
        }

        @Override
        public int getPriority() {
            return 100;
        }

        @Override
        public String getPolicyName() {
            return "random";
        }

        @Override
        public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
            return new RandomLoadBalancer(helper);
        }
    }

    static final class Ref<T> {
        T value;

        Ref(T value) {
            this.value = value;
        }
    }
}

服務端初始化

服務端需要把自己的服務地址注冊到zookeeper释漆。

private final int port;
private final Server server;
private String registryPath;
private String address;
CuratorFramework curator = CuratorFrameworkFactory.builder()
        .connectString("127.0.0.1:2181")
        .retryPolicy(new ExponentialBackoffRetry(1000, 5))
        .connectionTimeoutMs(1000)
        .sessionTimeoutMs(60000)
        .build();;

public GreetingServer(int port, String registryPath) throws IOException {
    this.port = port;
    server = ServerBuilder.forPort(port).addService(new GreetingService())
            .build();
    this.registryPath = registryPath;
    this.address =  "localhost:" + port;    //本機網卡不能正確顯示地址悲没,直接寫死localhost
}

/**
 * Start server.
 */
public void start() throws Exception {
    this.curator.start();
    server.start();;
    this.curator.create()
            .creatingParentContainersIfNeeded()
            .withMode(CreateMode.EPHEMERAL)
            .forPath(registryPath + "/" + address, ("http://" + address).getBytes());

    System.out.println("Server started, listening on " + address);
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            GreetingServer.this.stop();
        }
    });
}

客戶端初始化

客戶端需要注冊自定義的NameResolverFactory和LoadBalancer。

public GreetingClient(String host, int port, String path) {
    String target = "zookeeper://" + host + ":" + port + path;
    CuratorNameResolver.Factory factory = new CuratorNameResolver.Factory();

    LoadBalancerRegistry.getDefaultRegistry().register(new RandomLoadBalancer.Provider());
    ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
            .forTarget(target)
            .nameResolverFactory(factory)
            .defaultLoadBalancingPolicy("random")
            .usePlaintext();
    channel = channelBuilder.build();
    blockingStub = GreetingGrpc.newBlockingStub(channel);
}

參考資料

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末男图,一起剝皮案震驚了整個濱河市示姿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌逊笆,老刑警劉巖撬碟,帶你破解...
    沈念sama閱讀 218,607評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異麦向,居然都是意外死亡祷膳,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評論 3 395
  • 文/潘曉璐 我一進店門乃戈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來褂痰,“玉大人,你說我怎么就攤上這事症虑∷跬幔” “怎么了?”我有些...
    開封第一講書人閱讀 164,960評論 0 355
  • 文/不壞的土叔 我叫張陵谍憔,是天一觀的道長匪蝙。 經常有香客問我主籍,道長,這世上最難降的妖魔是什么骗污? 我笑而不...
    開封第一講書人閱讀 58,750評論 1 294
  • 正文 為了忘掉前任崇猫,我火速辦了婚禮,結果婚禮上需忿,老公的妹妹穿的比我還像新娘诅炉。我一直安慰自己,他們只是感情好屋厘,可當我...
    茶點故事閱讀 67,764評論 6 392
  • 文/花漫 我一把揭開白布涕烧。 她就那樣靜靜地躺著,像睡著了一般汗洒。 火紅的嫁衣襯著肌膚如雪议纯。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,604評論 1 305
  • 那天溢谤,我揣著相機與錄音瞻凤,去河邊找鬼。 笑死世杀,一個胖子當著我的面吹牛阀参,可吹牛的內容都是我干的。 我是一名探鬼主播瞻坝,決...
    沈念sama閱讀 40,347評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼蛛壳,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了所刀?” 一聲冷哼從身側響起衙荐,我...
    開封第一講書人閱讀 39,253評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎浮创,沒想到半個月后忧吟,有當地人在樹林里發(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,702評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡蒸矛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,893評論 3 336
  • 正文 我和宋清朗相戀三年瀑罗,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片雏掠。...
    茶點故事閱讀 40,015評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖劣像,靈堂內的尸體忽然破棺而出乡话,到底是詐尸還是另有隱情,我是刑警寧澤耳奕,帶...
    沈念sama閱讀 35,734評論 5 346
  • 正文 年R本政府宣布绑青,位于F島的核電站诬像,受9級特大地震影響,放射性物質發(fā)生泄漏闸婴。R本人自食惡果不足惜坏挠,卻給世界環(huán)境...
    茶點故事閱讀 41,352評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望邪乍。 院中可真熱鬧降狠,春花似錦、人聲如沸庇楞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,934評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吕晌。三九已至蛋褥,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間睛驳,已是汗流浹背烙心。 一陣腳步聲響...
    開封第一講書人閱讀 33,052評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留乏沸,地道東北人淫茵。 一個月前我還...
    沈念sama閱讀 48,216評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像屎蜓,于是被迫代替她去往敵國和親痘昌。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,969評論 2 355