2.gRPC流

gRPC流

github

Simple RPC

一個(gè)簡(jiǎn)單的RPC,客戶端使用存根將請(qǐng)求發(fā)送到服務(wù)器弛针,然后等待響應(yīng)返回,就像正常的函數(shù)調(diào)用一樣胎食。

使用Simple RPC的問(wèn)題

  • 數(shù)據(jù)包過(guò)大造成壓力
  • 接收數(shù)據(jù)包時(shí)逗宜,需要所有數(shù)據(jù)包都接受成功且正確后,才能夠回調(diào)響應(yīng)剃诅,進(jìn)行業(yè)務(wù)處理(無(wú)法客戶端邊發(fā)送巷送,服務(wù)端邊處理)

為什么使用流

  • 大規(guī)模數(shù)據(jù)包
  • 實(shí)時(shí)場(chǎng)景
image-20210519220417828.png

讓我們來(lái)模擬一個(gè)場(chǎng)景,服務(wù)端存儲(chǔ)了<id,studentInfo>综苔。id是編號(hào)惩系,studentInfo是學(xué)生的一些信息,包括name,age兩個(gè)屬性

服務(wù)端流式RPC(server-side streaming RPC)

服務(wù)器端流式RPC,客戶端在其中向服務(wù)器發(fā)送請(qǐng)求如筛,并獲取流以讀取回一系列消息堡牡。客戶端從返回的流中讀取杨刨,直到?jīng)]有更多消息為止晤柄。如我們的示例所示,您可以通過(guò)將stream關(guān)鍵字放在響應(yīng)類型之前來(lái)指定服務(wù)器端流方法妖胀。

客戶端流(client-side streaming RPC

客戶端流式RPC芥颈,客戶端在其中編寫(xiě)一系列消息惠勒,然后再次使用提供的流將它們發(fā)送到服務(wù)器∨揽樱客戶端寫(xiě)完消息后纠屋,它將等待服務(wù)器讀取所有消息并返回其響應(yīng)。通過(guò)將stream關(guān)鍵字放在請(qǐng)求類型之前盾计,可以指定客戶端流方法售担。

雙向流式RPC(bidirectional streaming RPC

雙向流式RPC,雙方都使用讀寫(xiě)流發(fā)送一系列消息署辉。這兩個(gè)流獨(dú)立運(yùn)行族铆,因此客戶端和服務(wù)器可以按照自己喜歡的順序進(jìn)行讀寫(xiě):例如,服務(wù)器可以在寫(xiě)響應(yīng)之前等待接收所有客戶端消息哭尝,或者可以先讀取消息再寫(xiě)入消息哥攘,或讀寫(xiě)的其他組合。每個(gè)流中的消息順序都會(huì)保留材鹦。您可以通過(guò)在請(qǐng)求和響應(yīng)之前都放置stream關(guān)鍵字來(lái)指定這種類型的方法逝淹。

IDL

寫(xiě)入如下的stream.proto文件

syntax = "proto3";

package streamProto;

//option go_package = "./";

service StreamService{
  // 服務(wù)端流式RPC, request: StreamRangeRequest. response: StreamStuResponse.
  // List 返回所有查詢范圍內(nèi)的 student 信息
  rpc List(StreamRangeRequest)returns (stream StreamStuResponse){};

  // 客戶端流式RPC,request: StreamUpdateRequest. response: StreamOKResponse.
  // Update 更新服務(wù)端學(xué)生信息,根據(jù)id更新age.
  rpc Update(stream StreamUpdateRequest) returns(StreamOKResponse){};

  // 雙向流式RPC, request: StreamRangeRequest. response: StreamStuResponse.
  // Check 根據(jù)請(qǐng)求的范圍返回信息
  rpc Check(stream StreamRangeRequest)returns(stream StreamStuResponse){};

}

message StreamRangeRequest{
  // 請(qǐng)求從begin-end的信息
  int32 begin = 1;
  int32 end = 2;
}

message StreamStuResponse{
  // 學(xué)生信息
  string name = 1;
  int32 age = 2;
}

message StreamUpdateRequest{
  // 更新服務(wù)端學(xué)生信息,根據(jù)id更新age
  int32 id = 1;
  int32 age = 2;
}

message StreamOKResponse{
  // 返回更新成功個(gè)數(shù)
  int32 OK = 1;
}

服務(wù)端流式RPC

服務(wù)器端流式 RPC侠姑,顯然是單向流创橄,并代指 Server 為 Stream 而 Client 為普通 RPC 請(qǐng)求

簡(jiǎn)單來(lái)講就是客戶端發(fā)起一次普通的 RPC 請(qǐng)求,服務(wù)端通過(guò)流式響應(yīng)多次發(fā)送數(shù)據(jù)集莽红,客戶端 Recv 接收數(shù)據(jù)集妥畏。大致如圖:

image-20210519211458599.png

Server

func (s *StreamService) List(r *pb.StreamRangeRequest, stream pb.StreamService_ListServer) error {
    begin := r.GetBegin()
    end := r.GetEnd()

    // 如果有這個(gè)id,將信息發(fā)送給客戶端
    for i := begin; i <= end; i++ {
        if info, ok := studentInfo[i]; ok {
            err := stream.Send(&pb.StreamStuResponse{
                Name: info.name,
                Age:  int32(info.age),
            })
            if err != nil {
                return err
            }
        }
    }
    return nil
}
  • 消息體(對(duì)象)序列化
  • 壓縮序列化后的消息體
  • 對(duì)正在傳輸?shù)南Ⅲw增加 5 個(gè)字節(jié)的 header
  • 判斷壓縮+序列化后的消息體總字節(jié)長(zhǎng)度是否大于預(yù)設(shè)的 maxSendMessageSize(預(yù)設(shè)值為 math.MaxInt32),若超出則提示錯(cuò)誤
  • 寫(xiě)入給流的數(shù)據(jù)集

Client

func List(client pb.StreamServiceClient, begin, end int32) {
    stream, err := client.List(context.Background(), &pb.StreamRangeRequest{
        Begin: begin,
        End:   end,
    })
    if err != nil {
        log.Fatalf("grpc.List err: %v", err)
    }
    for {
        stuInfo, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("%v.List(_) = _, %v", client, err)
        }
        log.Println(stuInfo.Name, stuInfo.Age)
    }
}

(1)RecvMsg 是阻塞等待的

(2)RecvMsg 當(dāng)流成功/結(jié)束(調(diào)用了 Close)時(shí)安吁,會(huì)返回 io.EOF

(3)RecvMsg 當(dāng)流出現(xiàn)任何錯(cuò)誤時(shí)醉蚁,流會(huì)被中止,錯(cuò)誤信息會(huì)包含 RPC 錯(cuò)誤碼鬼店。而在 RecvMsg 中可能出現(xiàn)如下錯(cuò)誤:

  • io.EOF
  • io.ErrUnexpectedEOF
  • transport.ConnectionError
  • google.golang.org/grpc/codes

同時(shí)需要注意网棍,默認(rèn)的 MaxReceiveMessageSize 值為 1024 _ 1024 _ 4,建議不要超出

運(yùn)行

服務(wù)端預(yù)先寫(xiě)入了一些內(nèi)容

// 存儲(chǔ)學(xué)生信息
var studentInfo = map[int32]*stuInfo{}

func init() {
    // 初始化一些數(shù)據(jù)
    studentInfo[1] = &stuInfo{
        name: "張三",
        age:  23,
    }
    studentInfo[2] = &stuInfo{
        name: "李四",
        age:  30,
    }
}

type stuInfo struct {
    name string
    age  int
}

客戶端請(qǐng)求數(shù)據(jù)

2021/05/19 21:13:19 張三 23
2021/05/19 21:13:19 李四 30

客戶端流式RPC

客戶端流式 RPC妇智,單向流滥玷,客戶端通過(guò)流式發(fā)起多次 RPC 請(qǐng)求給服務(wù)端,服務(wù)端發(fā)起一次響應(yīng)給客戶端巍棱,大致如圖:

image-20210519214226806.png

Server

func (s *StreamService) Update(stream pb.StreamService_UpdateServer) error {
    var okCount int32
    for {
        stuInfo, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.StreamOKResponse{OK: int32(okCount)})
        }
        if err != nil {
            return err
        }
        id := stuInfo.GetId()
        if _, ok := studentInfo[id]; ok {
            studentInfo[id].age = int(stuInfo.GetAge())
            okCount++
        }
    }
    return nil
}

多了一個(gè)從未見(jiàn)過(guò)的方法 stream.SendAndClose惑畴,它是做什么用的呢?

在這段程序中航徙,我們對(duì)每一個(gè) Recv 都進(jìn)行了處理如贷,當(dāng)發(fā)現(xiàn) io.EOF (流關(guān)閉) 后,需要將最終的響應(yīng)結(jié)果發(fā)送給客戶端,同時(shí)關(guān)閉正在另外一側(cè)等待的 Recv

Client

func List(client pb.StreamServiceClient, begin, end int32) {
    stream, err := client.List(context.Background(), &pb.StreamRangeRequest{
        Begin: begin,
        End:   end,
    })
    if err != nil {
        log.Fatalf("grpc.List err: %v", err)
    }
    for {
        stuInfo, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("%v.List(_) = _, %v", client, err)
        }
        log.Println(stuInfo.Name, stuInfo.Age)
    }
}

stream.CloseAndRecvstream.SendAndClose 是配套使用的流方法杠袱,相信聰明的你已經(jīng)秒懂它的作用了

運(yùn)行

2021/05/19 21:41:50 更改成功數(shù)量 2

雙向流式RPC

雙向流式 RPC尚猿,顧名思義是雙向流。由客戶端以流式的方式發(fā)起請(qǐng)求楣富,服務(wù)端同樣以流式的方式響應(yīng)請(qǐng)求

首個(gè)請(qǐng)求一定是 Client 發(fā)起凿掂,但具體交互方式(誰(shuí)先誰(shuí)后、一次發(fā)多少菩彬、響應(yīng)多少缠劝、什么時(shí)候關(guān)閉)根據(jù)程序編寫(xiě)的方式來(lái)確定(可以結(jié)合協(xié)程)

假設(shè)該雙向流是按順序發(fā)送的話潮梯,大致如圖:

image-20210519214420251.png

因程序編寫(xiě)的不同而不同骗灶。雙向流圖示無(wú)法適用不同的場(chǎng)景

server

func (s *StreamService) Check(stream pb.StreamService_CheckServer) error {
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        begin, end := req.GetBegin(), req.GetEnd()
        for i := begin; i <= end; i++ {
            if info, ok := studentInfo[i]; ok {
                err := stream.Send(&pb.StreamStuResponse{
                    Name: info.name,
                    Age:  int32(info.age),
                })
                if err != nil {
                    return err
                }
            }
        }

    }
    return nil
}

client

func Check(client pb.StreamServiceClient) {
    stream, err := client.Check(context.Background())
    if err != nil {
        log.Fatalf("%v.Update(_) = _, %v", client, err)
    }

    waitc := make(chan interface{})

    go func() {
        // 接收消息
        for {
            info, err := stream.Recv()
            if err == io.EOF {
                // read done.
                close(waitc)
                return
            }
            if err != nil {
                log.Fatalf("Failed to receive a note : %v", err)
            }
            log.Printf("Got message of Student(%s, %d)", info.Name, info.Age)
        }
    }()

    for i := 0; i < 1; i++ {
        if err := stream.Send(&pb.StreamRangeRequest{Begin: int32(i + 1), End: int32(i + 2)}); err != nil {
            log.Fatalf("Failed to send a message : %v", err)
        }
    }
    stream.CloseSend()
    fmt.Printf("here")
    <-waitc
}

運(yùn)行

here2021/05/19 22:01:34 Got message of Student(張三, 10)
2021/05/19 22:01:34 Got message of Student(李四, 20)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市秉馏,隨后出現(xiàn)的幾起案子耙旦,更是在濱河造成了極大的恐慌,老刑警劉巖萝究,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件免都,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡帆竹,警方通過(guò)查閱死者的電腦和手機(jī)绕娘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)栽连,“玉大人险领,你說(shuō)我怎么就攤上這事∶虢簦” “怎么了绢陌?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)熔恢。 經(jīng)常有香客問(wèn)我脐湾,道長(zhǎng),這世上最難降的妖魔是什么叙淌? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任秤掌,我火速辦了婚禮,結(jié)果婚禮上鹰霍,老公的妹妹穿的比我還像新娘闻鉴。我一直安慰自己,他們只是感情好衅谷,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布椒拗。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蚀苛。 梳的紋絲不亂的頭發(fā)上在验,一...
    開(kāi)封第一講書(shū)人閱讀 49,764評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音堵未,去河邊找鬼腋舌。 笑死,一個(gè)胖子當(dāng)著我的面吹牛渗蟹,可吹牛的內(nèi)容都是我干的块饺。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼雌芽,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼授艰!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起世落,我...
    開(kāi)封第一講書(shū)人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤淮腾,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后屉佳,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體谷朝,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年武花,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了圆凰。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡体箕,死狀恐怖专钉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情干旁,我是刑警寧澤驶沼,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站争群,受9級(jí)特大地震影響回怜,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜换薄,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一玉雾、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧轻要,春花似錦复旬、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)壁涎。三九已至,卻和暖如春志秃,著一層夾襖步出監(jiān)牢的瞬間怔球,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工浮还, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留竟坛,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓钧舌,卻偏偏與公主長(zhǎng)得像担汤,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子洼冻,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容