gRPC流
Simple RPC
一個(gè)簡(jiǎn)單的RPC,客戶端使用存根將請(qǐng)求發(fā)送到服務(wù)器弛针,然后等待響應(yīng)返回,就像正常的函數(shù)調(diào)用一樣胎食。
使用Simple RPC的問(wèn)題
- 數(shù)據(jù)包過(guò)大造成壓力
- 接收數(shù)據(jù)包時(shí)逗宜,需要所有數(shù)據(jù)包都接受成功且正確后,才能夠回調(diào)響應(yīng)剃诅,進(jìn)行業(yè)務(wù)處理(無(wú)法客戶端邊發(fā)送巷送,服務(wù)端邊處理)
流
為什么使用流
- 大規(guī)模數(shù)據(jù)包
- 實(shí)時(shí)場(chǎng)景
讓我們來(lái)模擬一個(gè)場(chǎng)景,服務(wù)端存儲(chǔ)了<id,studentInfo>
综苔。id
是編號(hào)惩系,studentInfo
是學(xué)生的一些信息,包括name
,age
兩個(gè)屬性
服務(wù)端流式RPC(server-side streaming RPC)
服務(wù)器端流式RPC,客戶端在其中向服務(wù)器發(fā)送請(qǐng)求如筛,并獲取流以讀取回一系列消息堡牡。客戶端從返回的流中讀取杨刨,直到?jīng)]有更多消息為止晤柄。如我們的示例所示,您可以通過(guò)將stream
關(guān)鍵字放在響應(yīng)類型之前來(lái)指定服務(wù)器端流方法妖胀。
客戶端流(client-side streaming RPC)
客戶端流式RPC芥颈,客戶端在其中編寫(xiě)一系列消息惠勒,然后再次使用提供的流將它們發(fā)送到服務(wù)器∨揽樱客戶端寫(xiě)完消息后纠屋,它將等待服務(wù)器讀取所有消息并返回其響應(yīng)。通過(guò)將stream
關(guān)鍵字放在請(qǐng)求類型之前盾计,可以指定客戶端流方法售担。
雙向流式RPC(bidirectional streaming RPC)
雙向流式RPC,雙方都使用讀寫(xiě)流發(fā)送一系列消息署辉。這兩個(gè)流獨(dú)立運(yùn)行族铆,因此客戶端和服務(wù)器可以按照自己喜歡的順序進(jìn)行讀寫(xiě):例如,服務(wù)器可以在寫(xiě)響應(yīng)之前等待接收所有客戶端消息哭尝,或者可以先讀取消息再寫(xiě)入消息哥攘,或讀寫(xiě)的其他組合。每個(gè)流中的消息順序都會(huì)保留材鹦。您可以通過(guò)在請(qǐng)求和響應(yīng)之前都放置stream
關(guān)鍵字來(lái)指定這種類型的方法逝淹。
IDL
寫(xiě)入如下的stream.proto
文件
syntax = "proto3";
package streamProto;
//option go_package = "./";
service StreamService{
// 服務(wù)端流式RPC, request: StreamRangeRequest. response: StreamStuResponse.
// List 返回所有查詢范圍內(nèi)的 student 信息
rpc List(StreamRangeRequest)returns (stream StreamStuResponse){};
// 客戶端流式RPC,request: StreamUpdateRequest. response: StreamOKResponse.
// Update 更新服務(wù)端學(xué)生信息,根據(jù)id更新age.
rpc Update(stream StreamUpdateRequest) returns(StreamOKResponse){};
// 雙向流式RPC, request: StreamRangeRequest. response: StreamStuResponse.
// Check 根據(jù)請(qǐng)求的范圍返回信息
rpc Check(stream StreamRangeRequest)returns(stream StreamStuResponse){};
}
message StreamRangeRequest{
// 請(qǐng)求從begin-end的信息
int32 begin = 1;
int32 end = 2;
}
message StreamStuResponse{
// 學(xué)生信息
string name = 1;
int32 age = 2;
}
message StreamUpdateRequest{
// 更新服務(wù)端學(xué)生信息,根據(jù)id更新age
int32 id = 1;
int32 age = 2;
}
message StreamOKResponse{
// 返回更新成功個(gè)數(shù)
int32 OK = 1;
}
服務(wù)端流式RPC
服務(wù)器端流式 RPC侠姑,顯然是單向流创橄,并代指 Server 為 Stream 而 Client 為普通 RPC 請(qǐng)求
簡(jiǎn)單來(lái)講就是客戶端發(fā)起一次普通的 RPC 請(qǐng)求,服務(wù)端通過(guò)流式響應(yīng)多次發(fā)送數(shù)據(jù)集莽红,客戶端 Recv 接收數(shù)據(jù)集妥畏。大致如圖:
Server
func (s *StreamService) List(r *pb.StreamRangeRequest, stream pb.StreamService_ListServer) error {
begin := r.GetBegin()
end := r.GetEnd()
// 如果有這個(gè)id,將信息發(fā)送給客戶端
for i := begin; i <= end; i++ {
if info, ok := studentInfo[i]; ok {
err := stream.Send(&pb.StreamStuResponse{
Name: info.name,
Age: int32(info.age),
})
if err != nil {
return err
}
}
}
return nil
}
- 消息體(對(duì)象)序列化
- 壓縮序列化后的消息體
- 對(duì)正在傳輸?shù)南Ⅲw增加 5 個(gè)字節(jié)的 header
- 判斷壓縮+序列化后的消息體總字節(jié)長(zhǎng)度是否大于預(yù)設(shè)的 maxSendMessageSize(預(yù)設(shè)值為
math.MaxInt32
),若超出則提示錯(cuò)誤 - 寫(xiě)入給流的數(shù)據(jù)集
Client
func List(client pb.StreamServiceClient, begin, end int32) {
stream, err := client.List(context.Background(), &pb.StreamRangeRequest{
Begin: begin,
End: end,
})
if err != nil {
log.Fatalf("grpc.List err: %v", err)
}
for {
stuInfo, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.List(_) = _, %v", client, err)
}
log.Println(stuInfo.Name, stuInfo.Age)
}
}
(1)RecvMsg 是阻塞等待的
(2)RecvMsg 當(dāng)流成功/結(jié)束(調(diào)用了 Close)時(shí)安吁,會(huì)返回 io.EOF
(3)RecvMsg 當(dāng)流出現(xiàn)任何錯(cuò)誤時(shí)醉蚁,流會(huì)被中止,錯(cuò)誤信息會(huì)包含 RPC 錯(cuò)誤碼鬼店。而在 RecvMsg 中可能出現(xiàn)如下錯(cuò)誤:
- io.EOF
- io.ErrUnexpectedEOF
- transport.ConnectionError
- google.golang.org/grpc/codes
同時(shí)需要注意网棍,默認(rèn)的 MaxReceiveMessageSize 值為 1024 _ 1024 _ 4,建議不要超出
運(yùn)行
服務(wù)端預(yù)先寫(xiě)入了一些內(nèi)容
// 存儲(chǔ)學(xué)生信息
var studentInfo = map[int32]*stuInfo{}
func init() {
// 初始化一些數(shù)據(jù)
studentInfo[1] = &stuInfo{
name: "張三",
age: 23,
}
studentInfo[2] = &stuInfo{
name: "李四",
age: 30,
}
}
type stuInfo struct {
name string
age int
}
客戶端請(qǐng)求數(shù)據(jù)
2021/05/19 21:13:19 張三 23
2021/05/19 21:13:19 李四 30
客戶端流式RPC
客戶端流式 RPC妇智,單向流滥玷,客戶端通過(guò)流式發(fā)起多次 RPC 請(qǐng)求給服務(wù)端,服務(wù)端發(fā)起一次響應(yīng)給客戶端巍棱,大致如圖:
Server
func (s *StreamService) Update(stream pb.StreamService_UpdateServer) error {
var okCount int32
for {
stuInfo, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.StreamOKResponse{OK: int32(okCount)})
}
if err != nil {
return err
}
id := stuInfo.GetId()
if _, ok := studentInfo[id]; ok {
studentInfo[id].age = int(stuInfo.GetAge())
okCount++
}
}
return nil
}
多了一個(gè)從未見(jiàn)過(guò)的方法 stream.SendAndClose
惑畴,它是做什么用的呢?
在這段程序中航徙,我們對(duì)每一個(gè) Recv 都進(jìn)行了處理如贷,當(dāng)發(fā)現(xiàn) io.EOF
(流關(guān)閉) 后,需要將最終的響應(yīng)結(jié)果發(fā)送給客戶端,同時(shí)關(guān)閉正在另外一側(cè)等待的 Recv
Client
func List(client pb.StreamServiceClient, begin, end int32) {
stream, err := client.List(context.Background(), &pb.StreamRangeRequest{
Begin: begin,
End: end,
})
if err != nil {
log.Fatalf("grpc.List err: %v", err)
}
for {
stuInfo, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.List(_) = _, %v", client, err)
}
log.Println(stuInfo.Name, stuInfo.Age)
}
}
stream.CloseAndRecv
和 stream.SendAndClose
是配套使用的流方法杠袱,相信聰明的你已經(jīng)秒懂它的作用了
運(yùn)行
2021/05/19 21:41:50 更改成功數(shù)量 2
雙向流式RPC
雙向流式 RPC尚猿,顧名思義是雙向流。由客戶端以流式的方式發(fā)起請(qǐng)求楣富,服務(wù)端同樣以流式的方式響應(yīng)請(qǐng)求
首個(gè)請(qǐng)求一定是 Client 發(fā)起凿掂,但具體交互方式(誰(shuí)先誰(shuí)后、一次發(fā)多少菩彬、響應(yīng)多少缠劝、什么時(shí)候關(guān)閉)根據(jù)程序編寫(xiě)的方式來(lái)確定(可以結(jié)合協(xié)程)
假設(shè)該雙向流是按順序發(fā)送的話潮梯,大致如圖:
因程序編寫(xiě)的不同而不同骗灶。雙向流圖示無(wú)法適用不同的場(chǎng)景
server
func (s *StreamService) Check(stream pb.StreamService_CheckServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
begin, end := req.GetBegin(), req.GetEnd()
for i := begin; i <= end; i++ {
if info, ok := studentInfo[i]; ok {
err := stream.Send(&pb.StreamStuResponse{
Name: info.name,
Age: int32(info.age),
})
if err != nil {
return err
}
}
}
}
return nil
}
client
func Check(client pb.StreamServiceClient) {
stream, err := client.Check(context.Background())
if err != nil {
log.Fatalf("%v.Update(_) = _, %v", client, err)
}
waitc := make(chan interface{})
go func() {
// 接收消息
for {
info, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message of Student(%s, %d)", info.Name, info.Age)
}
}()
for i := 0; i < 1; i++ {
if err := stream.Send(&pb.StreamRangeRequest{Begin: int32(i + 1), End: int32(i + 2)}); err != nil {
log.Fatalf("Failed to send a message : %v", err)
}
}
stream.CloseSend()
fmt.Printf("here")
<-waitc
}
運(yùn)行
here2021/05/19 22:01:34 Got message of Student(張三, 10)
2021/05/19 22:01:34 Got message of Student(李四, 20)