接口
type Client interface {
Init(...Option) error
Options() Options
NewPublication(topic string, msg interface{}) Publication
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error)
StreamRemote(ctx context.Context, addr string, req Request, opts ...CallOption) (Streamer, error)
Publish(ctx context.Context, p Publication, opts ...PublishOption) error
String() string
}
用例
// Create new request to service go.micro.srv.example, method Example.Call
req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{
Name: "John",
})
// create context with metadata
ctx := metadata.NewContext(context.Background(), map[string]string{
"X-User-Id": "john",
"X-From-Id": "script",
})
rsp := &example.Response{}
// Call service
if err := client.Call(ctx, req, rsp); err != nil {
fmt.Println("call err: ", err, rsp)
return
}
fmt.Println("Call:", i, "rsp:", rsp.Msg)
操作流程
大致流程分三步库倘,實(shí)例化巴元,構(gòu)建Request舵盈,發(fā)起請(qǐng)求 Call
- 1陋率、client := NewClient() // 實(shí)例化,可以直接用client秽晚,默認(rèn)客戶端(上例中使用)
-- 1.1瓦糟、client.Init(...opts) // 初始化client , 可以在NewClient時(shí)配置,也可以在init配置赴蝇,可配置項(xiàng)見默認(rèn)配置 - 2菩浙、構(gòu)建Request,其實(shí)就是構(gòu)建一個(gè)用戶發(fā)送請(qǐng)求的結(jié)構(gòu)
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, r.opts.ContentType, reqOpts...)
}
func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/octet-stream", reqOpts...)
}
func (r *rpcClient) NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/json", reqOpts...)
}
func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
var opts RequestOptions
for _, o := range reqOpts {
o(&opts)
}
return &rpcRequest{
service: service,
method: method,
request: request,
contentType: contentType,
opts: opts,
}
}
- 3、發(fā)起請(qǐng)求 Call劲蜻,重點(diǎn)陆淀!看看客戶端怎樣發(fā)起請(qǐng)求的
-- 3.1 opts.Selector.Select(request.Service(), callOpts.SelectOptions...) 選擇服務(wù)端服務(wù)器地址
---- 3.1.1 services, err := r.so.Registry.GetService(service) 從服務(wù)發(fā)現(xiàn)服務(wù)中取對(duì)應(yīng)所有服務(wù)器列表
---- 3.1.2 sopts.Strategy(services) 從列表中按選擇策略選出一個(gè)服務(wù)器地址,默認(rèn)策略:Random
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
-- 3.2 構(gòu)建context
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
// no deadline so we create a new one
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
}
// should we noop right here?
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
default:
}
-- 3.3 請(qǐng)求外圍調(diào)用斋竞,類似于請(qǐng)求中間件機(jī)制
// make copy of call method
rcall := r.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
rcall = callOpts.CallWrappers[i-1](rcall)
}
-- 3.4 完整的調(diào)用方法
// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
// call backoff first. Someone may want an initial start delay
// delay 機(jī)制,出錯(cuò)的時(shí)候可以指定下一次執(zhí)行時(shí)間
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
// select next node 取服務(wù)端節(jié)點(diǎn)
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the address
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
// make the call 發(fā)送請(qǐng)求
err = rcall(ctx, address, request, response, callOpts)
r.opts.Selector.Mark(request.Service(), node, err) // 標(biāo)記,用于記錄錯(cuò)誤秃殉,優(yōu)化選擇器
return err
}
-- 3.4 重試機(jī)制
ch := make(chan error, callOpts.Retries)
var gerr error
for i := 0; i < callOpts.Retries; i++ {
go func() {
ch <- call(i)
}()
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()), 408)
case err := <-ch:
// if the call succeeded lets bail early
if err == nil {
return nil
}
retry, rerr := callOpts.Retry(ctx, request, i, err)
if rerr != nil {
return rerr
}
if !retry {
return err
}
gerr = err
}
}
- 4 真正的call
-- 4.1 構(gòu)建 請(qǐng)求header
msg := &transport.Message{
Header: make(map[string]string),
}
md, ok := metadata.FromContext(ctx)
if ok {
for k, v := range md {
msg.Header[k] = v
}
}
// set timeout in nanoseconds
msg.Header["Timeout"] = fmt.Sprintf("%d", opts.RequestTimeout)
// set the content type for the request
msg.Header["Content-Type"] = req.ContentType()
// set the accept header
msg.Header["Accept"] = req.ContentType()
-- 4.2 newCodec , 根據(jù) contentType 初始化encode方式坝初,json or protobuf
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
cf, err := r.newCodec(req.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
-- 4.3 從連接池中取連接實(shí)例
var grr error
c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout))
if err != nil {
return errors.InternalServerError("go.micro.client", "connection error: %v", err)
}
defer func() {
// defer execution of release
r.pool.release(address, c, grr)
}()
-- 4.4 發(fā)送請(qǐng)求
stream := &rpcStream{
context: ctx,
request: req,
closed: make(chan bool),
codec: newRpcPlusCodec(msg, c, cf),
}
defer stream.Close()
ch := make(chan error, 1)
go func() {
defer func() {
if r := recover(); r != nil {
ch <- errors.InternalServerError("go.micro.client", "panic recovered: %v", r)
}
}()
// send request
if err := stream.Send(req.Request()); err != nil {
ch <- err
return
}
// recv request
if err := stream.Recv(resp); err != nil {
ch <- err
return
}
// success
ch <- nil
}()
select {
case err := <-ch:
grr = err
return err
case <-ctx.Done():
grr = ctx.Err()
return errors.New("go.micro.client", fmt.Sprintf("request timeout: %v", ctx.Err()), 408)
}
stream.Send 調(diào)用codec.WriteRequest
stream.Recv 調(diào)用codec.ReadResponseHeader,codec.ReadResponseBody