主要基于官網(wǎng)介紹的文檔總結(jié)而來驼抹。
需要先了解 protocol buffers
為什么使用gRPC
通過gPRC,我們可以僅僅定義一次service 到.proto文件中啼肩,然后使用gRPC支持的任何開發(fā)語言開發(fā)客戶端或服務(wù)器塘匣。
樣例代碼和環(huán)境的建立
首先要確保golang開發(fā)環(huán)境的正確配置,go1.5+滨砍。
$ go get -u -v google.golang.org/grpc
本人在測試中遇到報錯,主要原因在于樣例需要
"golang.org/x/net"
"golang.org/x/text"
的支持,本人的解決方法如下
到
$GOPATH/src/golang.org/x/
目錄下惋戏,如果golang.org/x/ 不存在則手動創(chuàng)建一個领追。
然后
git clone https://github.com/golang/net.git
git clone https://github.com/golang/text.git
樣例測試
$ cd $GOPATH/src/google.golang.org/grpc/examples/route_guide
$ go run server/server.go
$ go run client/client.go
下面對樣例的代碼進(jìn)行分析
服務(wù)定義
gRPC使用 protocol buffers定義服務(wù)。
要定義服務(wù)日川,需要在.proto文件中做service定義如下:
service RouteGuide {
...
}
然后可以在servie的定義rpc方法,指定對應(yīng)的request和response類型矩乐。gPRC允許開發(fā)者定義4中service方法龄句,這4中方法在樣例RouteGuide 中都有用到。
- 最簡單的RPC方法散罕,客戶端通過調(diào)用該方法發(fā)送request到服務(wù)端分歇,等待服務(wù)器的response,類似正常的函數(shù)調(diào)用欧漱。
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
- 服務(wù)端單邊stream的RPC( server-side streaming RPC):客戶端調(diào)用該方法到服務(wù)端职抡,服務(wù)器返回一個stream,客戶端從這個stream中讀取數(shù)據(jù)直到?jīng)]有數(shù)據(jù)可讀误甚。從樣例代碼中可以看到該方法的主要特點(diǎn)是在response類型前加stream缚甩。
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
- 客戶端單邊stream的RPC(A client-side streaming RPC):客戶端通過使用stream將一系列的數(shù)據(jù)發(fā)送到服務(wù)端∫ぐ睿客戶端數(shù)據(jù)發(fā)送完畢后就等待服務(wù)端把數(shù)據(jù)全部讀完后發(fā)送相應(yīng)過來擅威。從樣例代碼中可以看到該方法主要特點(diǎn)是在request類型前面加stream.:
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
- 雙邊stream RPC(bidirectional streaming RPC)「郧眨客戶端和服務(wù)端都通過讀寫流(read-write stream)向?qū)Ψ桨l(fā)送一系列的消息郊丛。這兩個streams是完全獨(dú)立的,所以客戶端和服務(wù)端可以隨意的進(jìn)行讀寫操作:例如瞧筛,服務(wù)端可以等待客戶端的是數(shù)據(jù)都接收完畢后再往response里寫數(shù)據(jù)厉熟,或者可以先讀取一條消息再寫入一條信息或者是其他的一些讀寫組合方式。從樣例代碼中可以看到該方法的主要特點(diǎn)就是在request和response前面都加stream较幌。
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
樣例中的.proto文件包含了服務(wù)端方法中使用的request和response類型所使用的類型的協(xié)議池消息類型定義( protocol buffer message type definitions )揍瑟。
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客戶端和服務(wù)端代碼
根據(jù).proto文件生成客戶端和服務(wù)端所需的gRPC接口代碼
protoc -I routeguide/ routeguide/route_guide.proto --go_out=plugins=grpc:routeguide
創(chuàng)建服務(wù)端
服務(wù)端代碼主要做兩方面的工作:
- 實(shí)現(xiàn)上一步驟.proto生成的服務(wù)端接口。
- 運(yùn)行一個gRPC服務(wù)來監(jiān)聽客戶端的請求并且把請求分發(fā)到正確的服務(wù)端實(shí)現(xiàn)里乍炉。
實(shí)現(xiàn)RouteGuide
As you can see, our server has a routeGuideServer struct type that implements the generated RouteGuideServer interface:
可以看出我們的服務(wù)端有一個routeGuideServer 的結(jié)構(gòu)體類型實(shí)現(xiàn)了RouteGuideServer 的接口月培。
type routeGuideServer struct {
...
}
...
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}
...
Simple RPC
GetFeature,從客戶端獲取一個Point然后從數(shù)據(jù)庫中返回對應(yīng)的特征信息。
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}
// No feature was found, return an unnamed feature
return &pb.Feature{"", point}, nil
}
這個方法輸入?yún)?shù)是一個RPC的context對象以及客戶端發(fā)過來的點(diǎn)協(xié)議池(Point protocol buffer)請求恩急。這個方法返回一個特征協(xié)議池(Feature protocol buffer)對象杉畜,對象中包含響應(yīng)信息和錯誤。在這個方法中衷恭,我們?yōu)镕eature轉(zhuǎn)入了正確的信息然后和nil error一起返回此叠,告訴gRPC服務(wù)器已經(jīng)完成對RPC的處理,F(xiàn)eature可以返回給客戶端了随珠。
Server-side streaming RPC
ListFeatures是一個服務(wù)端stream的RPC灭袁,所以我們需要返回多個Features到客戶端猬错。
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
可以看出,該方法獲取一個request對象以及一個特殊的RouteGuide_ListFeaturesServer 來寫相應(yīng)茸歧。這個方法中我們用Send方法把所有需要返回的Feature特征寫入到RouteGuide_ListFeaturesServer 中倦炒。最后返回一個nil error告訴gRPC服務(wù)端已經(jīng)寫好相應(yīng)。如果期間有什么錯誤發(fā)生软瞎,我們返回一個非nil的error,gRPC會轉(zhuǎn)換為正確的RPC狀態(tài)發(fā)送到線路中逢唤。
Client-side streaming RPC
.
客戶端流方法RecordRoute中,我們從客戶端獲取一系列的Point然后返回一個RouteSummary 對象包含旅行信息涤浇。從代碼中可以看到該方法里面沒有任何的請求參數(shù)鳖藕,而是一個RouteGuide_RecordRouteServer 流對象。服務(wù)端可以用Rev()方法從RouteGuide_RecordRouteServer 對象中讀取消息并使用Write()方法往里面寫消息只锭。
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
在這個方法中著恩,我們使用RouteGuide_RecordRouteServer’s 的Recv方法不停的從客戶端的請求中讀取數(shù)據(jù)到requesst對象直到?jīng)]有數(shù)據(jù)可讀。服務(wù)器需要檢測每次Recv返回的error蜻展,如果是nil喉誊,表示這個stream正常可以繼續(xù)讀纵顾,如果是io.EOF表示流已經(jīng)停止了此時服務(wù)端可以返回RouteSummary裹驰。
Bidirectional streaming RPC
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
... // look for notes to be sent to client
for _, note := range s.routeNotes[key] {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
這個方法中使用RouteGuide_RouteChatServer 流對象,可以用來讀消息和寫消息片挂。然而這次我們通過流返回數(shù)據(jù)的同時客戶端仍然在往他們的消息流中寫消息幻林。
該方法中往消息流中寫消息使用的是Send() 方法而不是 SendAndClose()
官網(wǎng)中介紹原因如下:具體意思暫時沒有搞明白。
TODO:The syntax for reading and writing here is very similar to our client-streaming method, except the server uses the stream’s Send() method rather than SendAndClose() because it’s writing multiple responses. Although each side will always get the other’s messages in the order they were written, both the client and server can read and write in any order — the streams operate completely independently.
Starting the server
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterRouteGuideServer(grpcServer, &routeGuideServer{})
... // determine whether to use TLS
grpcServer.Serve(lis)
如代碼所示音念,我們創(chuàng)建和啟動一個服務(wù)器需要下面4個步驟:
- 指定端口號沪饺,用來監(jiān)聽客戶端的請求,使用
err := net.Listen("tcp", fmt.Sprintf(":%d", *port)).
- 創(chuàng)建一個gRPC服務(wù)器實(shí)例
grpc.NewServer().
注冊服務(wù)器實(shí)現(xiàn)到上一步驟創(chuàng)建的gRPC服務(wù)器實(shí)例上闷愤。
調(diào)用Serve啟動服務(wù)整葡,阻塞等待直到該進(jìn)程被殺死或服務(wù)器的stop被調(diào)用。
使用TLS
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
grpclog.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
if *tls {
creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
if err != nil {
grpclog.Fatalf("Failed to generate credentials %v", err)
}
opts = []grpc.ServerOption{grpc.Creds(creds)}
}
grpcServer := grpc.NewServer(opts...)
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)
}
Creating the client
創(chuàng)建客戶端
flag.Parse()
var opts []grpc.DialOption
if *tls {
var sn string
if *serverHostOverride != "" {
sn = *serverHostOverride
}
var creds credentials.TransportCredentials
if *caFile != "" {
var err error
creds, err = credentials.NewClientTLSFromFile(*caFile, sn)
if err != nil {
grpclog.Fatalf("Failed to create TLS credentials %v", err)
}
} else {
creds = credentials.NewClientTLSFromCert(nil, sn)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
conn, err := grpc.Dial(*serverAddr, opts...)
if err != nil {
grpclog.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
client := pb.NewRouteGuideClient(conn)
為了能夠調(diào)用服務(wù)端的方法讥脐,我們首先創(chuàng)建一個gRPC通道來和服務(wù)端溝通遭居。通過傳入服務(wù)器地址和端口號給grpc.Dial()來創(chuàng)建。如代碼旬渠,我們還可以使用DialOptions來設(shè)置grpc中的認(rèn)證方法俱萍。
一旦gRPC通道建立起來后,我們需要一個客戶端來執(zhí)行RPC告丢,通過.proto創(chuàng)建的pb包中提供的NewRouteGuideClient方法來創(chuàng)建枪蘑。
Calling service methods
對應(yīng)服務(wù)端的四種方法,客戶端也要采用不同的調(diào)用方法。
Simple RPC
feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
...
}
從代碼中看出岳颇,客戶端調(diào)用方法GetFeature(在)照捡,傳遞協(xié)議池(protocol buffer object)對象pb.Point作為參數(shù),同時傳遞一個context.Context 對象话侧,可以讓我們方便的改變RPC的行為栗精,例如超時或取消RPC。
Server-side streaming RPC
rect := &pb.Rectangle{ ... } // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
...
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
cient.ListFeaturens參見.proto生成的route_guide.pb.go
func (c *routeGuideClient) ListFeatures(ctx context.Context, in *Rectangle, opts ...grpc.CallOption) (RouteGuide_ListFeaturesClient, error) {
在這個方法中瞻鹏,同樣的傳遞一個context對象和一個請求悲立,但是返回一個RouteGuide_ListFeaturesClient實(shí)例,客戶端可以從這個實(shí)例中讀取得到服務(wù)端的響應(yīng)乙漓。
我們使用RouteGuide_ListFeaturesClient的Recv方法來從服務(wù)端的響應(yīng)中讀取到協(xié)議池對象Feature中直到?jīng)]有數(shù)據(jù)可讀级历。同樣的客戶端在讀取時需要檢測返回的err释移,如果為nil叭披,說明此時stream是正常的繼續(xù)可讀,如果為io.EOF表示數(shù)據(jù)已經(jīng)到結(jié)尾了玩讳。
Client-side streaming RPC
// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
同樣參見route_guide.pb.go中RecordRoute的定義
func (c *routeGuideClient) RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error) {
stream, err := grpc.NewClientStream(ctx, &_RouteGuide_serviceDesc.Streams[1], c.cc, "/routeguide.RouteGuide/RecordRoute", opts...)
RecordRoute方法僅僅需要傳遞一個context參數(shù)涩蜘,然后返回一個RouteGuide_RecordRouteClient流對象用于客戶端寫消息和讀消息。
RouteGuide_RecordRouteClient的Send()方法用于向客戶端發(fā)送請求熏纯,一旦完成客戶端的所有請求同诫,客戶端需要調(diào)用CloseAndRecv方法來讓gRPC知道客戶端已經(jīng)完成請求并且期望獲得一個響應(yīng)。
如果CloseAndRecv()返回的err不為nil樟澜,那么返回的第一個值就是一個有效的服務(wù)端響應(yīng)误窖。
Bidirectional streaming RPC
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
和RecordRoute類型,方法RouteChat僅需要傳遞一個context對象秩贰,返回一個RouteGuide_RouteChatClient用于客戶端讀消息和寫消息霹俺。
func (c *routeGuideClient) RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error) {
stream, err := grpc.NewClientStream(ctx, &_RouteGuide_serviceDesc.Streams[2], c.cc, "/routeguide.RouteGuide/RouteChat", opts...)
不過和RecordRoute不同的是,客戶端在往客戶端的stream里寫消息的同時毒费,服務(wù)端也在往服務(wù)端的stream中寫消息丙唧。另外,該方法中客戶端中讀和寫是分開獨(dú)立運(yùn)行的觅玻,沒有先后順序想际,還有就是客戶端寫消息完畢后使用CloseSend而不是CloseAndRecv
后記
之前一直在CSDN上寫文章,后面會逐步轉(zhuǎn)換到簡書上溪厘,還請大家多多支持胡本。