跟我一起學Go系列:gRPC 全局數據傳輸和超時處理

gRPC 在多個 GoRoutine 之間傳遞數據使用的是 Go SDK 提供的 Context 包。關於 Context 的使用可以看我之前的一篇文章:Context 使用

但是 Context 的使用場景是同一個進程內,gRPC 使用都是跨進程的網路傳輸,如果在某個調用鏈上 A 服務當前要調用 B 服務傳遞一些上下文參數並且也希望 B 服務繼續往下傳遞該如何實現呢?

跨進程的全局數據傳輸

再次回憶一下 gRPC 是基於 HTTP/2 協議的。那我們是不是可以再請求頭中將這一部分數據 set 進去,而不是放在數據包裡面。

gRPC 也是如此實現的。進程間傳輸定義了一個 metadata 對象,該對象放在 Request-Headers 內:

Requests
Request → Request-Headers *Length-Prefixed-Message EOS
Request-Headers are delivered as HTTP2 headers in HEADERS + CONTINUATION frames.

Request-Headers → Call-Definition *Custom-Metadata
Call-Definition → Method Scheme Path TE [Authority] [Timeout] Content-Type [Message-Type] [Message-Encoding] [Message-Accept-Encoding] [User-Agent]
Method → ":method POST"
Scheme → ":scheme " ("http" / "https")
Path → ":path" "/" Service-Name "/" {method name} # But see note below.
Service-Name → {IDL-specific service name}
......
......
......
Custom-Metadata → Binary-Header / ASCII-Header
......

Custom-Metadata 欄位內即為我們要傳輸的全局對象。具體文檔可以看這裡:PROTOCOL-HTTP2

所以通過 metadata 我們可以將上一個進程中的全局對象透傳到下一個被調用的進程。查看源碼可以發現 metadata 內部實際上是通過一個 map 對象存儲數據:

type MD map[string][]string

metadata 和 Context 一起連用的使用方式如下:

發送方如果想發送一些全局欄位給接收方,首先從自己端的 metadata set 數據:

//set 數據到 metadata
md := metadata.Pairs("key", "val")
// 新建一個有 metadata 的 context
ctx := metadata.NewOutgoingContext(context.Background(), md)

注意上面的 NewOutgoingContext() 方法,命名很形象,向外輸出 Context。那麼對端接收的時候肯定有一個對應的方法,我們繼續往下看。這個新的 Context 就可以用來發送出去,比如還是我們上文中的示例方法:

//set 數據到 metadata
md := metadata.Pairs("key", "val")
// 新建一個有 metadata 的 context
ctx := metadata.NewOutgoingContext(context.Background(), md)

c = NewTokenServiceClient(conn)
hello, err := c.SayHello(ctx, &PingMessage{Greeting: "hahah"})
if err != nil {
  fmt.Printf("could not greet: %v", err)
}

對於接收方來說,無非就是解析 metadata 中的數據。gRPC 已經幫我們將數據解析到 context 中,所以需要從 Context 中取出 MD 對象。

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
  fmt.Printf("get metadata error")
}
if t, ok := md["key"]; ok {
  fmt.Printf("key from metadata:\n")
  for i, e := range t {
    fmt.Printf(" %d. %s\n", i, e)
  }
}

這裡取數的邏輯使用了 metadata 的 FromIncomingContext() 方法。跟存數據的 NewOutgoingContext() 方法遙相呼應。

跨進程的超時停止

同進程下跨 Goroutine 我們還是可以使用 Context 來設置當前 Context 管理下子 Goroutine 的有效期:

//超時截止
context.WithTimeout(context.Background(), 100*time.Millisecond)
//限制截止
deadline, c2 := context.WithDeadline(context.Background(), deadline time.Time)

gRPC 中同樣實現了這個功能,即跨進程間的 Context 傳遞實現進程間的 Context 生命周期管理。我們看一個簡單的例子:

服務端:

package normal

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
	pb "gorm-demo/models/pb"
	"net"
	"testing"
	"time"
)

type server struct{}

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	time.Sleep(3 * time.Second)
	return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

//攔截器 - 列印日誌
func LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler) (interface{}, error) {
	fmt.Printf("gRPC method: %s, %v", info.FullMethod, req)
	resp, err := handler(ctx, req)
	fmt.Printf("gRPC method: %s, %v", info.FullMethod, resp)
	return resp, err
}

func TestGrpcServer(t *testing.T) {
	// 監聽本地的8972埠
	lis, err := net.Listen("tcp", ":8972")
	if err != nil {
		fmt.Printf("failed to listen: %v", err)
		return
	}
	//註冊攔截器
	s := grpc.NewServer(grpc.UnaryInterceptor(LoggingInterceptor)) // 創建gRPC伺服器
	pb.RegisterGreeterServer(s, &server{})                         // 在gRPC服務端註冊服務

	reflection.Register(s) //在給定的gRPC伺服器上註冊伺服器反射服務
	// Serve方法在lis上接受傳入連接,為每個連接創建一個ServerTransport和server的goroutine。
	// 該goroutine讀取gRPC請求,然後調用已註冊的處理程式來響應它們。
	err = s.Serve(lis)
	if err != nil {
		fmt.Printf("failed to serve: %v", err)
		return
	}

}

服務端程式碼我們在 SayHello() 方法中增加了 3s 的sleep。客戶端程式碼如下:

package normal

import (
	"fmt"
	"testing"
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	pb "gorm-demo/models/pb"
)

func TestGrpcClient(t *testing.T) {
	// 連接伺服器
	conn, err := grpc.Dial(":8972", grpc.WithInsecure())
	if err != nil {
		fmt.Printf("faild to connect: %v", err)
	}
	defer conn.Close()

	c := pb.NewGreeterClient(conn)

	//timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second*2)
	//defer cancelFunc()

	m, _ := time.ParseDuration("1s")
	result := time.Now().Add(m)
	deadline, c2 := context.WithDeadline(context.Background(), result)
	defer c2()

	// 調用服務端的SayHello
	r, err := c.SayHello(deadline, &pb.HelloRequest{Name: "CN"})
	if err != nil {
		fmt.Printf("could not greet: %v", err)
	}

	fmt.Printf("Greeting: %s !\n", r.Message)
}

針對兩種場景的超時:

//timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second*2)
//defer cancelFunc()

m, _ := time.ParseDuration("1s")
result := time.Now().Add(m)
deadline, c2 := context.WithDeadline(context.Background(), result)
defer c2()

分別做了測試,大家可以運行一下程式碼看看效果。都會看到報錯資訊:

code = DeadlineExceeded desc = context deadline exceeded

所以超時控制可以通過 Context 來操作,不必你自己再去額外寫程式碼。