gRPC 流允許 protobuf 訊息從客戶端串流傳輸到伺服器、從伺服器串流傳輸到客戶端,或雙向串流。
這項強大的功能可用於建立即時應用程序,例如聊天應用程式、即時監控儀表板等。
在本文中,我們將探討如何正確使用 gRPC 流。
讓我們檢查一下使用 gRPC 流的良好實踐:
一個常見的錯誤是對一元請求使用串流。
例如,考慮以下 gRPC 服務定義:
service MyService { rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {} }
如果客戶端只需要發送一個請求並接收一個回應,
您不需要使用串流。相反,我們可以如下定義服務:
service MyService { rpc GetSomething (SomethingRequest) returns (SomethingResponse) {} }
透過對一元請求使用串流傳輸,我們增加了不必要的複雜性
到程式碼,這可能會使其更難理解和維護,而不是
從使用串流媒體中獲得任何好處。
比較一元請求和流請求的 Golang 程式碼範例:
一元請求:
type somethingUnary struct { pb.UnimplementedSomethingUnaryServer } func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) { return &pb.SomethingResponse{ Message: "Hello " + req.Name, }, nil } func TestSomethingUnary(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingUnaryServer(s, &somethingUnary{}) }) client := pb.NewSomethingUnaryClient(conn) response, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
流式一元請求:
type somethingStream struct { pb.UnimplementedSomethingStreamServer } func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error { if err := stream.Send(&pb.SomethingResponse{ Message: "Hello " + req.Name, }); err != nil { return err } return nil } func TestSomethingStream(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingStreamServer(s, &somethingStream{}) }) client := pb.NewSomethingStreamClient(conn) stream, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something stream: %v", err) } response, err := stream.Recv() if err != nil { t.Fatalf("failed to receive response: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
我們可以看到,一元請求的程式碼更簡單,更容易理解
比流請求的代碼。
讓我們來比較一下這兩個服務定義:
service BookStore { rpc ListBooks(ListBooksRequest) returns (stream Book) {} } service BookStoreBatch { rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {} } message ListBooksResponse { repeated Book books = 1; }
BookStore 一次串流一本書,而 BookStoreBatch 同時串流多本書。
如果客戶端需要列出所有書籍,使用BookStoreBatch 效率更高
因為它減少了客戶端和伺服器之間的往返次數。
讓我們來看看 BookStore 和 BookStoreBatch 的 Golang 程式碼範例:
書店:
type bookStore struct { pb.UnimplementedBookStoreServer } func (s *bookStore) ListBooks(req *pb.ListBooksRequest, stream pb.BookStore_ListBooksServer) error { for _, b := range bookStoreData { if b.Author == req.Author { if err := stream.Send(&pb.Book{ Title: b.Title, Author: b.Author, PublicationYear: int32(b.PublicationYear), Genre: b.Genre, }); err != nil { return err } } } return nil } func TestBookStore_ListBooks(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreServer(s, &bookStore{}) }) client := pb.NewBookStoreClient(conn) stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { t.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { book, err := stream.Recv() if err != nil { break } books = append(books, book) } if len(books) != charlesDickensBooks { t.Errorf("unexpected number of books: %d", len(books)) } }
書店批次:
type bookStoreBatch struct { pb.UnimplementedBookStoreBatchServer } func (s *bookStoreBatch) ListBooks(req *pb.ListBooksRequest, stream pb.BookStoreBatch_ListBooksServer) error { const batchSize = 10 books := make([]*pb.Book, 0, batchSize) for _, b := range bookStoreData { if b.Author == req.Author { books = append(books, &pb.Book{ Title: b.Title, Author: b.Author, PublicationYear: int32(b.PublicationYear), Genre: b.Genre, }) if len(books) == batchSize { if err := stream.Send(&pb.ListBooksResponse{ Books: books, }); err != nil { return err } books = books[:0] } } } if len(books) > 0 { if err := stream.Send(&pb.ListBooksResponse{ Books: books, }); err != nil { return nil } } return nil } func TestBookStoreBatch_ListBooks(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{}) }) client := pb.NewBookStoreBatchClient(conn) stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { t.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { response, err := stream.Recv() if err != nil { break } books = append(books, response.Books...) } if len(books) != charlesDickensBooks { t.Errorf("unexpected number of books: %d", len(books)) } }
從上面的程式碼來看,需要明確哪一個比較好。
讓我們執行一個基準測試來看看差異:
書店基準:
func BenchmarkBookStore_ListBooks(b *testing.B) { conn := newServer(b, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreServer(s, &bookStore{}) }) client := pb.NewBookStoreClient(conn) var benchInnerBooks []*pb.Book b.ResetTimer() for i := 0; i < b.N; i++ { stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { b.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { book, err := stream.Recv() if err != nil { break } books = append(books, book) } benchInnerBooks = books } benchBooks = benchInnerBooks }
BookStoreBatch 基準:
func BenchmarkBookStoreBatch_ListBooks(b *testing.B) { conn := newServer(b, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{}) }) client := pb.NewBookStoreBatchClient(conn) var benchInnerBooks []*pb.Book b.ResetTimer() for i := 0; i < b.N; i++ { stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { b.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { response, err := stream.Recv() if err != nil { break } books = append(books, response.Books...) } benchInnerBooks = books } benchBooks = benchInnerBooks }
基準測試結果:
BenchmarkBookStore_ListBooks BenchmarkBookStore_ListBooks-12 732 1647454 ns/op 85974 B/op 1989 allocs/op BenchmarkBookStoreBatch_ListBooks BenchmarkBookStoreBatch_ListBooks-12 1202 937491 ns/op 61098 B/op 853 allocs/op
多麼大的進步啊! BookStoreBatch 比 BookStore 快 1.75 倍。
但是為什麼 BookStoreBatch 比 BookStore 快?
伺服器每次傳送訊息流至客戶端.Send(),都需要
對訊息進行編碼並透過網路發送。透過發送多個檔案
我們立即減少了伺服器需要編碼和發送的次數
訊息,不僅提高了伺服器的效能,還提高了
對於需要解碼訊息的客戶端。
在上面的範例中,批量大小設定為10,但客戶端可以根據網路情況和文件大小進行調整。
書店範例傳回所有書籍並完成串流,但如果客戶端
需要即時觀察事件(例如感測器),使用雙向
直播是正確的選擇。
雙向流有點棘手,因為客戶端和伺服器都
可以同時發送和接收訊息。希望 golang 能讓這一切變得簡單
像這樣處理並發。
如前所述,感測器是雙向流的一個很好的例子。
監視功能允許客戶端決定監視和請求哪些感測器
如果需要的話,當前值。
讓我們來看看下面的protobuf定義:
service MyService { rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {} }
請求訊息不只是訊息流,更是一條可以
包含不同類型的請求。 oneof 指令允許我們定義一個
只能包含指定類型之一的欄位。
感測器的 golang 程式碼將被忽略,但您可以在這裡找到它
serverStream 包裝流和感測器數據,使其更易於使用。
service MyService { rpc GetSomething (SomethingRequest) returns (SomethingResponse) {} }
如前所述,伺服器可以同時發送和接收訊息,一個
函數將處理傳入的訊息,另一個函數將處理
傳出消息。
接收訊息:
type somethingUnary struct { pb.UnimplementedSomethingUnaryServer } func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) { return &pb.SomethingResponse{ Message: "Hello " + req.Name, }, nil } func TestSomethingUnary(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingUnaryServer(s, &somethingUnary{}) }) client := pb.NewSomethingUnaryClient(conn) response, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
switch語句用來處理不同類型的請求並做出決定
如何處理每個請求。只保留 SecvLoop 函數很重要
讀取訊息但不向客戶端發送訊息,因此我們有 sendLoop
將從控制通道讀取訊息並將其發送到客戶端。
發送訊息:
type somethingStream struct { pb.UnimplementedSomethingStreamServer } func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error { if err := stream.Send(&pb.SomethingResponse{ Message: "Hello " + req.Name, }); err != nil { return err } return nil } func TestSomethingStream(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingStreamServer(s, &somethingStream{}) }) client := pb.NewSomethingStreamClient(conn) stream, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something stream: %v", err) } response, err := stream.Recv() if err != nil { t.Fatalf("failed to receive response: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
sendLoop函數讀取控制通道和資料通道並發送
發送給客戶端的訊息。如果流關閉,函數將傳回。
最後,感測器服務的快樂路徑測試:
service BookStore { rpc ListBooks(ListBooksRequest) returns (stream Book) {} } service BookStoreBatch { rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {} } message ListBooksResponse { repeated Book books = 1; }
從上面的測試我們可以看到客戶端可以建立、取消、取得當前
感測器的值。客戶端還可以同時觀看多個感測器。
gRPC 串流是用於建立即時應用程式的多功能且強大的工具。
透過遵循最佳實踐,例如僅在必要時使用串流、有效地批次資料以及明智地利用雙向串流傳輸,開發人員可以最大限度地提高效能
並保持程式碼簡單性。
雖然 gRPC 串流帶來了複雜性,但其好處遠遠超過了挑戰
當深思熟慮地應用時。
如果您有任何問題或回饋,請隨時在 LinkedIn 上與我聯繫。
以上是gRPC 串流:最佳實踐和效能見解的詳細內容。更多資訊請關注PHP中文網其他相關文章!