gRPC-Go基础(4)metadata和超时设置

0. 简介

Go在多个go routine之间传递数据使用的是Go SDK提供的context包,而context的作用范围在进程内,而gRPC使用的是跨进程的网络传输,那如何实现跨进程的元数据传输呢?

1. metadata

1.1 metadata结构

metadata的简单理解,就是http 的 Header 中的 key-value 对

gRPC使用metadata在服务之间传输全局数据,metadata形式为键值对(k-v)列表,如下:

type MD map[string][]string
  • k一般为字符串,v可以是字符串,也可以是二进制数据,当v是二进制数据时,k必须以-bin结尾,二进制数据会被base64编码然后传输;
  • 同时,k不能以grpc-开头,因为这是为gRPC所保留;
  • k中大写字符会被转化为小写;
  • 如上,一个k可以对应多个v。

1.2 metadata创建

New方法

md := metadata.New(map[string]string{
   "k1":"v1","k2":"v2"})

Pair方法(相同的key自动合并)

md := metadata.Pairs(
   "k1", "v1",
   "k1", "v1.2",
   "k2-bin", string([]byte{
   1, 2}),
)

k1对应的值会被自动合并为map[string][]string{k1: {v1, v1.2}}。k2-bin对应的值会被进行base64编码。

1.3 客户端处理metadata

1.3.1 发送metadata

NewOutgoingContext

md := metadata.Pairs(
   "k1", "v1",
   "k1", "v1.2",
   "k2-bin", string([]byte{
   1, 2}),
)
ctx := metadata.NewOutgoingContext(context.Background(), md)

将新创建的 Metadata 添加到 context 中,这样会 覆盖 掉原来已有的 metadata。其实际上就是调用了context.WithValue 方法,生成了一个子context而已,这个子context中包含了传入的metadata。

// NewOutgoingContext creates a new context with outgoing md attached. If used
// in conjunction with AppendToOutgoingContext, NewOutgoingContext will
// overwrite any previously-appended metadata.
func NewOutgoingContext(ctx context.Context, md MD) context.Context {
   
   return context.WithValue(ctx, mdOutgoingKey{
   }, rawMD{
   md: md})
}

AppendToOutgoingContext

ctx = metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v1.2", "k2-bin", string([]byte{
   1, 2}))

AppendToOutgoingContext方法将k-v对添加到已有的context中。如果对应的context没有metadata,那么就会创建一个;如果已有metadata了,那么就将数据添加到原来的metadata(推荐使用 AppendToOutgoingContext)。

1.3.2 接收metadata
目前,gRPC的客户端支持接收的metadata包括header和trailer。
一元RPC
header和trailer可以通过Header和Trailer方法,在调用gRPC方法的CallOption时传入,在函数调用结束后取出。

var header, trailer metadata.MD
res, err := grpcClient.SimpleRoute(ctx, &simplepb.SimpleRequest{
   Data: "I am iguochan"}, grpc.Header(&header), grpc.Trailer(&trailer))

值得注意的是,虽然流式RPC的方法调用中也有CallOption,但是这两个方法**明确仅用于一元RPC。

// Header returns a CallOptions that retrieves the header metadata
// for a unary RPC.
func Header(md *metadata.MD) CallOption {
   
   return HeaderCallOption{
   HeaderAddr: md}
}

// Trailer returns a CallOptions that retrieves the trailer metadata
// for a unary RPC.
func Trailer(md *metadata.MD) CallOption {
   
   return TrailerCallOption{
   TrailerAddr: md}
}

流式RPC
所有的流式RPC都将使用以下形式接收服务端发送的header和trailer。通过接口ClientStream的Header() (metadata.MD, error)和Trailer() metadata.MD方法取出。

stream, err := grpcClient.RouteList(context.Background())

// 从流中取header
header, err := stream.Header()

// 从流中取出trailer
trailer := stream.Trailer()

1.4 服务端处理metadata

1.4.1 发送metadata
和客户端接收metadata一样,服务端只能发送header和trailer这两类metadata。

一元RPC
有关这些方法的具体用法可以参考go文档。

func (r *Server) SimpleRoute(ctx context.Context, request *simplepb.SimpleRequest) (*simplepb.SimpleResponse, error) {
   
   // ...
   md := metadata.Pairs(/* ... */)
   err := grpc.SendHeader(ctx, md)
   // ...
   md := metadata.Pairs(/* ... */)
   err = grpc.SetTrailer(ctx, md)
}

流式RPC

func (r *Server) RouteList(server simplepb.Route_RouteListServer) error {
   
   // ...
   md := metadata.Pairs(/* ... */)
   err := server.SendHeader(md)
   // ...
   md := metadata.Pairs(/* ... */)
   server.SetTrailer(md)
   // ...
}

1.4.2 接收metadata
要读取客户端发送的元数据,服务器需要从 RPC 上下文中检索它。如果是一元调用,则可以使用 RPC 处理程序的上下文。对于流式调用,服务器需要从流中获取上下文。

一元RPC

func (r *Server) SimpleRoute(ctx context.Context, request *simplepb.SimpleRequest) (*simplepb.SimpleResponse, error) {
   
   md, ok := metadata.FromIncomingContext(ctx)
   if ok {
   
      // do something with metadata
   }
   // ...
}

流式RPC

func (r *Server) ListValue(request *simplepb.SimpleRequest, server simplepb.Route_ListValueServer) error {
   
   md, ok := metadata.FromIncomingContext(server.Context())
   if ok {
   
      // do something with metadata
   }
   // ...
}

1.5 metadata的传输

以上介绍了metadata的用法,本节将介绍一下metadata在gRPC各端之间的传输机制。上面说过,在Go中,metadata就是实际上就是调用了context.WithValue 方法,生成了一个子context,那其实不管是在客户端还是服务端,都属于context的上下文传递。
1.5.1 客户端输出metadata
根据前面的文章,我们知道,用户定义好protobuf后通过protoc生成服务端的interface和客户端的桩代码(stub),在桩代码中,已经包含了客户端的实现:

func (c *routeClient) SimpleRoute(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (*SimpleResponse, error) {
   
   out := new(SimpleResponse)
   err := c.cc.Invoke(ctx, "/simplepb.Route/SimpleRoute", in, out, opts...)
   if err != nil {
   
      return nil, err
   }
   return out, nil
}

当发起gRPC请求后,会调用Invoke方法,最终会调用invoke方法:

func invoke(ctx context.Context, method string, req, reply interface{
   }, cc *ClientConn, opts ...CallOption) error {
   
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
   
        return err
    }
    if err := cs.SendMsg(req); err != nil {
   
        return err
    }
    return cs.RecvMsg(reply)
}

invoke方法会调用newClientStream,newClientStream方法比较长,在里面会生成一个函数newStream,并调用newClientStreamWithParams:

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
   
    // ...
    var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
   
        return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
    }
    // ...
}

newClientStreamWithParams最红会选择一个transport去传输:

func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
   
    // ...
    // Pick the transport to use and create a new stream on the transport.
    // Assign cs.attempt upon success.
    op := func(a *csAttempt) error {
   
        if err := a.getTransport(); err != nil {
   
                return err
        }
        if err := a.newStream(); err != nil {
   
                return err
        }
        // Because this operation is always called either here (while creating
        // the clientStream) or by the retry code while locked when replaying
        // the operation, it is safe to access cs.attempt directly.
        cs.attempt = a
        return nil
    }
    // ...
}

而gRPC选择的是HTTP2作为传输协议,所以最终a.newStream()会最终调用到HTTP2的NewStream:

// NewStream creates a stream and registers it into the transport as "active"
// streams.  All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
   
    ctx = peer.NewContext(ctx, t.getPeer())
    headerFields, err := t.createHeaderFields(ctx, callHdr)
    if err != nil {
   
        return nil, &NewStreamError{
   Err: err, AllowTransparentRetry: false}
    }
}

createHeaderFields,顾名思义,就是构建HTTP请求时的头(HEADERS Frame),其有关metadata从context到HTTP Header的转换实现如下:

func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
   
    // ...
    if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
   
        var k string
        for k, vv := range md {
   
            // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
            if isReservedHeader(k) {
   
                continue
            }
            for _, v := range vv {
   
                headerFields = append(headerFields, hpack.HeaderField{
   Name: k, Value: encodeMetadataHeader(k, v)})
            }
        }
        for _, vv := range added {
   
            for i, v := range vv {
   
                if i%2 == 0 {
   
                        k = strings.ToLower(v)
                        continue
                }
                // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
                if isReservedHeader(k) {
   
                        continue
                }
                headerFields = append(headerFields, hpack.HeaderField{
   Name: k, Value: encodeMetadataHeader(k, v)})
            }
        }
    }
    // ...
}

可以看到,所谓metadata,其实最终是通过HTTP2协议的头帧带入到网络的,主要存储在Binary-HeaderASCII-Header,感兴趣的同学可以参考gRPC over HTTP2。
1.5.2 服务端接收metadata
服务端通过Serve方法来启动,监听来自客户端的连接:

err = grpcServer.Serve(listener)
if err != nil {
   
    log.Fatalf("grpcServer.Serve err: %v", err)
}

Serve方法最终会调用到handleRawConn,handleRawConn会通过HTTP2去接收消息。

func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
   
    // ...
    st := s.newHTTP2Transport(rawConn)
    // ...
    go func() {
   
        s.serveStreams(st)
        s.removeConn(lisAddr, st)
    }()
}

接下来的调用流程涉及的方法是:serveStreams——>HandleStreams——>operateHeaders,最终在operateHeaders,会将所有的头文件中的传入的Header(除去默认的一些字段等)读入到mdata中,这就是服务端拿到的metadata。

func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
   
    // ...
    for _, hf := range frame.Fields {
   
        switch hf.Name {
   
        // ...
        default:
            if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
   
                break
            }
            v, err := decodeMetadataHeader(hf.Name, hf.Value)
            if err != nil {
   
                headerError = true
                logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
                break
            }
            mdata[hf.Name] = append(mdata[hf.Name], v)
        }
    }
    // ...
}

2. 超时设置

除了一些元数据的传输,在gRPC中,我们还支持将context中的超时传递到接下来的每个服务中,甚至可以跨语言,而这是怎么做到的呢?首先我们想到是通过metadata传输,实际上并不是的。

2.1 客户端输出超时信息

其实从前面1.5.1 客户端输出metadata的最后一步,createHeaderFields方法中,就有如下处理,超时的数值会被作为key是grpc-timeout的值放在HTTP2的头中(正如备注中所言,其实在网络上的传输时间是没有被计入的,会有一定误差)。

func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
   
    // ...
    if dl, ok := ctx.Deadline(); ok {
   
        // Send out timeout regardless its value. The server can detect timeout context by itself.
        // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
        timeout := time.Until(dl)
        headerFields = append(headerFields, hpack.HeaderField{
   Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
    }
    // ...
}

2.2 服务端端接收超时信息

同理,在HTTP2的server端的头处理函数operateHeaders中,也会对grpc-timeout对应的值进行处理,将其值读入到timeout中,并置上timeoutSet标志,用于后续处理。

func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
   
    // ...
    var (
        // ...
        timeoutSet bool
        timeout    time.Duration
    )
    // ...
    for _, hf := range frame.Fields {
   
        switch hf.Name {
   
        // ...
        case "grpc-timeout":
            timeoutSet = true
            var err error
            if timeout, err = decodeTimeout(hf.Value); err != nil {
   
                headerError = true
            }
        }
    }
    // ...
    if timeoutSet {
   
        s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
    } else {
   
        s.ctx, s.cancel = context.WithCancel(t.ctx)
    }
    // ...
}

3. 小结

根据以上分析,我们可以发现,不管是metadata还是超时,都是通过HTTP2的头帧传入到对侧的。

相关推荐

  1. gRPC-Go基础4metadata超时设置

    2023-12-29 10:30:08       28 阅读
  2. gRPC-Go基础(3)基础gRPC服务

    2023-12-29 10:30:08       40 阅读
  3. gRPC-Go基础(1)基础知识

    2023-12-29 10:30:08       40 阅读
  4. gRPC-Go基础(2)protobuf基础

    2023-12-29 10:30:08       26 阅读
  5. gRPC-Go基础(1)protoc的使用

    2023-12-29 10:30:08       40 阅读
  6. go grpc安装protobuf

    2023-12-29 10:30:08       8 阅读

最近更新

  1. TCP协议是安全的吗?

    2023-12-29 10:30:08       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-29 10:30:08       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-29 10:30:08       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-29 10:30:08       20 阅读

热门阅读

  1. volatile关键字详解

    2023-12-29 10:30:08       36 阅读
  2. SQL面试题挑战11:访问会话切割

    2023-12-29 10:30:08       34 阅读
  3. HarmonyOS(Stage模型)app-json5

    2023-12-29 10:30:08       41 阅读
  4. 限制哪些IP能连接postgre

    2023-12-29 10:30:08       37 阅读
  5. STL 之 vector 通俗理解

    2023-12-29 10:30:08       32 阅读
  6. 基于matlab的一维多节数组排序

    2023-12-29 10:30:08       30 阅读
  7. 用VSCode Remote-SSH做Docker环境中的开发

    2023-12-29 10:30:08       36 阅读
  8. centos 编译安装 icu

    2023-12-29 10:30:08       45 阅读