前情提要
本系列的第一篇文章 通過(guò)一個(gè)例子介紹了go語(yǔ)言實(shí)現(xiàn)gRPC雙向數(shù)據(jù)流的交互控制,第二篇文章介紹了如何通過(guò)Websocket與gRPC交互。通過(guò)這兩篇文章学辱,我們可以一窺gRPC雙向數(shù)據(jù)流的開(kāi)發(fā)方式,但是在生產(chǎn)環(huán)境當(dāng)中一臺(tái)服務(wù)器(一個(gè)服務(wù)端程序)是不夠的,我們往往會(huì)面臨各種復(fù)雜情況:訪問(wèn)量上來(lái)了一臺(tái)服務(wù)器不夠怎么辦话告?服務(wù)器掛了怎么辦?有實(shí)戰(zhàn)經(jīng)驗(yàn)的讀者肯定知道答案:上負(fù)載均衡(Load Balancing)奥盐俊沙郭!
gRPC服務(wù)如何做負(fù)載均衡?
gRPC官方博客上有一篇文章《gRPC Load Balancing》(https://grpc.io/blog/loadbalancing)裳朋,詳細(xì)介紹了幾種方案病线,并分析了幾種方案各自的優(yōu)劣。并附了一張解決方案表:
在gRPC的Github上還有一篇文章叫《Load Balancing in gRPC》(https://github.com/grpc/grpc/blob/master/doc/load-balancing.md),如果英文看著費(fèi)勁可以看一篇中文的《gRPC服務(wù)發(fā)現(xiàn)&負(fù)載均衡》(https://segmentfault.com/a/1190000008672912)送挑。
測(cè)試Nginx對(duì)gRPC服務(wù)的支持
因?yàn)樯厦鎺灼恼陆榻B的很詳細(xì)了绑莺,所以本文不再展開(kāi)討論。我們可以注意到上表中被紅框圈起來(lái)的部分寫(xiě)著“Nginx coming soon”惕耕,現(xiàn)在這個(gè)Nginx的解決方案已經(jīng)來(lái)了——2018年3月17日纺裁,Nginx官方宣布nginx 1.13.10支持gRPC (https://www.nginx.com/blog/nginx-1-13-10-grpc/)
第一步:下載nginx最新的stable版(本文發(fā)稿時(shí)是1.14.0,如果會(huì)用docker的也可以下載其alpine版本)赡突。
第二步:配置nginx的config文件如下
server {
# ?? nginx的監(jiān)聽(tīng)端口按你的實(shí)際情況設(shè)置
listen 80 http2;
access_log /var/log/nginx/access.log main;
location / {
# ?? 把下面的 grpc://127.0.0.1:3000換成你自己的grpc服務(wù)器地址
grpc_pass grpc://127.0.0.1:3000;
}
}
第三步:把go語(yǔ)言實(shí)現(xiàn)gRPC雙向數(shù)據(jù)流的交互控制 一文中的client.go 中的服務(wù)端地址改為nginx服務(wù)的地址(比如:127.0.0.1:80)
第四步:
(1)運(yùn)行server.go
(2)運(yùn)行nginx服務(wù)
(3)運(yùn)行client.go
如果沒(méi)什么意外对扶,gRPC客戶(hù)端發(fā)出的消息可以通過(guò)nginx后被gRPC服務(wù)端收到。
我們可以通過(guò)nginx日志觀察到相應(yīng)的信息惭缰。
一個(gè)小坑
上述連接雖然已經(jīng)實(shí)現(xiàn)浪南,但是如果我們的客戶(hù)端有連續(xù)一分鐘沒(méi)有輸入信息,會(huì)出現(xiàn)接收信息出錯(cuò)的情況漱受。
這種情形在沒(méi)有使用nginx的時(shí)候不會(huì)出現(xiàn)络凿,由于以前使用nginx給websocket做反向代理時(shí)也出現(xiàn)過(guò)類(lèi)似情況,故而推斷是nginx對(duì)超過(guò)一段時(shí)間的連接進(jìn)行了斷開(kāi)昂羡。
添加心跳
解決上述問(wèn)題可以采取的一個(gè)方法是增加心跳(如果您發(fā)現(xiàn)了什么別的好辦法可以解決這個(gè)問(wèn)題絮记,比如在nginx里配置一些參數(shù),請(qǐng)留言告訴我??)
client.go
添加一段隔40秒發(fā)送心跳的代碼
package main
import (
"bufio"
"context"
"flag"
"io"
"log"
"os"
"time"
"google.golang.org/grpc"
proto "chat" // 根據(jù)proto文件自動(dòng)生成的代碼
)
var 服務(wù)器地址 string
func init() {
flag.StringVar(&服務(wù)器地址, "server", "127.0.0.1:80", "服務(wù)器地址")
}
func main() {
// 創(chuàng)建連接
conn, err := grpc.Dial(服務(wù)器地址, grpc.WithInsecure())
if err != nil {
log.Printf("連接失敗: [%v]\n", err)
return
}
defer conn.Close()
client := proto.NewChatClient(conn)
// 聲明 context
ctx := context.Background()
// 創(chuàng)建雙向數(shù)據(jù)流
stream, err := client.BidStream(ctx)
if err != nil {
log.Printf("創(chuàng)建數(shù)據(jù)流失敗: [%v]\n", err)
return
}
// 啟動(dòng)一個(gè) goroutine 接收命令行輸入的指令
go func() {
log.Println("請(qǐng)輸入消息...")
輸入 := bufio.NewReader(os.Stdin)
for {
// 獲取 命令行輸入的字符串虐先, 以回車(chē) \n 作為結(jié)束標(biāo)志
命令行輸入的字符串, _ := 輸入.ReadString('\n')
// 向服務(wù)端發(fā)送 指令
if err := stream.Send(&proto.Request{Input: 命令行輸入的字符串}); err != nil {
return
}
}
}()
//?? 新添加的部分: 啟動(dòng)一個(gè) goroutine 每隔40秒發(fā)送心跳包
go func() {
for {
// 每隔 40 秒發(fā)送一次
time.Sleep(40 * time.Second)
log.Println("發(fā)送心跳包")
// 心跳字符用"\n"
if err := stream.Send(&proto.Request{Input: "\n"}); err != nil {
return
}
}
}()
for {
// 接收從 服務(wù)端返回的數(shù)據(jù)流
響應(yīng), err := stream.Recv()
if err == io.EOF {
log.Println("?? 收到服務(wù)端的結(jié)束信號(hào)")
break
}
if err != nil {
// TODO: 處理接收錯(cuò)誤
log.Println("接收數(shù)據(jù)出錯(cuò):", err)
break
}
log.Printf("[客戶(hù)端收到]: %s", 響應(yīng).Output)
}
}
server.go
添加一段檢測(cè)心跳的代碼
package main
import (
"flag"
"io"
"log"
"net"
"strconv"
"google.golang.org/grpc"
proto "chat" // 根據(jù)proto文件自動(dòng)生成的代碼
)
// Streamer 服務(wù)端
type Streamer struct{}
// BidStream 實(shí)現(xiàn)了 ChatServer 接口中定義的 BidStream 方法
func (s *Streamer) BidStream(stream proto.Chat_BidStreamServer) error {
ctx := stream.Context()
for {
select {
case <-ctx.Done():
log.Println("收到客戶(hù)端通過(guò)context發(fā)出的終止信號(hào)")
return ctx.Err()
default:
// 接收從客戶(hù)端發(fā)來(lái)的消息
輸入, err := stream.Recv()
if err == io.EOF {
log.Println("客戶(hù)端發(fā)送的數(shù)據(jù)流結(jié)束")
return nil
}
if err != nil {
log.Println("接收數(shù)據(jù)出錯(cuò):", err)
return err
}
// 如果接收正常怨愤,則根據(jù)接收到的 字符串 執(zhí)行相應(yīng)的指令
switch 輸入.Input {
case "結(jié)束對(duì)話(huà)\n", "結(jié)束對(duì)話(huà)":
log.Println("收到'結(jié)束對(duì)話(huà)'指令")
if err := stream.Send(&proto.Response{Output: "收到結(jié)束指令"}); err != nil {
return err
}
// 收到結(jié)束指令時(shí),通過(guò) return nil 終止雙向數(shù)據(jù)流
return nil
case "返回?cái)?shù)據(jù)流\n", "返回?cái)?shù)據(jù)流":
log.Println("收到'返回?cái)?shù)據(jù)流'指令")
// 收到 收到'返回?cái)?shù)據(jù)流'指令蛹批, 連續(xù)返回 10 條數(shù)據(jù)
for i := 0; i < 10; i++ {
if err := stream.Send(&proto.Response{Output: "數(shù)據(jù)流 #" + strconv.Itoa(i)}); err != nil {
return err
}
}
// ?? 攔截心跳字符"\n"
case "\n":
log.Println("收到心跳包")
// 只接收心跳不回發(fā)數(shù)據(jù)也可以
default:
// 缺省情況下撰洗, 返回 '服務(wù)端返回: ' + 輸入信息
log.Printf("[收到消息]: %s", 輸入.Input)
if err := stream.Send(&proto.Response{Output: "服務(wù)端返回: " + 輸入.Input}); err != nil {
return err
}
}
}
}
}
var 服務(wù)端口 string
func init() {
flag.StringVar(&服務(wù)端口, "port", "3000", "服務(wù)端口")
}
func main() {
log.Println("啟動(dòng)服務(wù)端...")
server := grpc.NewServer()
// 注冊(cè) ChatServer
proto.RegisterChatServer(server, &Streamer{})
address, err := net.Listen("tcp", ":"+服務(wù)端口)
if err != nil {
panic(err)
}
if err := server.Serve(address); err != nil {
panic(err)
}
}
添加完成后再度測(cè)試,連接不會(huì)再被nginx打斷腐芍。
Nginx實(shí)現(xiàn)服務(wù)端負(fù)載均衡的配置文件
心跳的坑趟過(guò)去之后差导,剩下的其實(shí)就簡(jiǎn)單了,我們修改nginx的配置文件:
upstream backend {
# ?? 把下面的服務(wù)端地址和端口改成你自己的
server 127.0.0.1:3000;
server 127.0.0.1:3001;
}
server {
listen 80 http2;
access_log /var/log/nginx/access.log main;
location / {
grpc_pass grpc://backend;
}
}
按如下順序啟動(dòng)
(1)運(yùn)行多個(gè) server.go 猪勇,按照nginx配置文件輸入端口參數(shù)(如 server.go -port 3001)
(2)運(yùn)行nginx服務(wù)
(3)運(yùn)行多個(gè)client.go设褐, (也可以運(yùn)行websocket的那個(gè)程序,記得把心跳代碼加上泣刹,多開(kāi)幾個(gè)瀏覽器窗口)
我們可以觀察到開(kāi)啟的多個(gè)server都在進(jìn)行g(shù)RPC數(shù)據(jù)流服務(wù)助析,至此大功告成??!
總結(jié)
gRPC服務(wù)端的負(fù)載均衡有很多種方案椅您,也各有優(yōu)劣外冀,但是用Nginx似乎是最簡(jiǎn)單的一種〗缶冢總之,我們還得根據(jù)具體的業(yè)務(wù)場(chǎng)景來(lái)選擇具體的實(shí)現(xiàn)方案。
gRPC雙向數(shù)據(jù)流系列
(之一): gRPC雙向數(shù)據(jù)流的交互控制
(之二): 通過(guò)Websocket與gRPC交互