gRPC 流允许 protobuf 消息从客户端流式传输到服务器、从服务器流式传输到客户端,或者双向流式传输。
这一强大的功能可用于构建实时应用程序,例如聊天应用程序、实时监控仪表板等。
在本文中,我们将探讨如何正确使用 gRPC 流。
让我们检查一下使用 gRPC 流的良好实践:
一个常见的错误是对一元请求使用流式传输。
例如,考虑以下 gRPC 服务定义:
service MyService { rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {} }
如果客户端只需要发送一个请求并接收一个响应,
您不需要使用流式传输。相反,我们可以按如下方式定义服务:
service MyService { rpc GetSomething (SomethingRequest) returns (SomethingResponse) {} }
通过对一元请求使用流式传输,我们增加了不必要的复杂性
到代码,这可能会使其更难理解和维护,而不是
从使用流媒体中获得任何好处。
比较一元请求和流请求的 Golang 代码示例:
一元请求:
type somethingUnary struct { pb.UnimplementedSomethingUnaryServer } func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) { return &pb.SomethingResponse{ Message: "Hello " + req.Name, }, nil } func TestSomethingUnary(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingUnaryServer(s, &somethingUnary{}) }) client := pb.NewSomethingUnaryClient(conn) response, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
流式一元请求:
type somethingStream struct { pb.UnimplementedSomethingStreamServer } func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error { if err := stream.Send(&pb.SomethingResponse{ Message: "Hello " + req.Name, }); err != nil { return err } return nil } func TestSomethingStream(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingStreamServer(s, &somethingStream{}) }) client := pb.NewSomethingStreamClient(conn) stream, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something stream: %v", err) } response, err := stream.Recv() if err != nil { t.Fatalf("failed to receive response: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
我们可以看到,一元请求的代码更简单,更容易理解
比流请求的代码。
让我们比较一下这两个服务定义:
service BookStore { rpc ListBooks(ListBooksRequest) returns (stream Book) {} } service BookStoreBatch { rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {} } message ListBooksResponse { repeated Book books = 1; }
BookStore 一次流式传输一本书,而 BookStoreBatch 同时流式传输多本书。
如果客户端需要列出所有书籍,使用BookStoreBatch 效率更高
因为它减少了客户端和服务器之间的往返次数。
让我们看看 BookStore 和 BookStoreBatch 的 Golang 代码示例:
书店:
type bookStore struct { pb.UnimplementedBookStoreServer } func (s *bookStore) ListBooks(req *pb.ListBooksRequest, stream pb.BookStore_ListBooksServer) error { for _, b := range bookStoreData { if b.Author == req.Author { if err := stream.Send(&pb.Book{ Title: b.Title, Author: b.Author, PublicationYear: int32(b.PublicationYear), Genre: b.Genre, }); err != nil { return err } } } return nil } func TestBookStore_ListBooks(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreServer(s, &bookStore{}) }) client := pb.NewBookStoreClient(conn) stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { t.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { book, err := stream.Recv() if err != nil { break } books = append(books, book) } if len(books) != charlesDickensBooks { t.Errorf("unexpected number of books: %d", len(books)) } }
书店批次:
type bookStoreBatch struct { pb.UnimplementedBookStoreBatchServer } func (s *bookStoreBatch) ListBooks(req *pb.ListBooksRequest, stream pb.BookStoreBatch_ListBooksServer) error { const batchSize = 10 books := make([]*pb.Book, 0, batchSize) for _, b := range bookStoreData { if b.Author == req.Author { books = append(books, &pb.Book{ Title: b.Title, Author: b.Author, PublicationYear: int32(b.PublicationYear), Genre: b.Genre, }) if len(books) == batchSize { if err := stream.Send(&pb.ListBooksResponse{ Books: books, }); err != nil { return err } books = books[:0] } } } if len(books) > 0 { if err := stream.Send(&pb.ListBooksResponse{ Books: books, }); err != nil { return nil } } return nil } func TestBookStoreBatch_ListBooks(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{}) }) client := pb.NewBookStoreBatchClient(conn) stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { t.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { response, err := stream.Recv() if err != nil { break } books = append(books, response.Books...) } if len(books) != charlesDickensBooks { t.Errorf("unexpected number of books: %d", len(books)) } }
从上面的代码来看,需要明确哪一个更好。
让我们运行一个基准测试来看看差异:
书店基准:
func BenchmarkBookStore_ListBooks(b *testing.B) { conn := newServer(b, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreServer(s, &bookStore{}) }) client := pb.NewBookStoreClient(conn) var benchInnerBooks []*pb.Book b.ResetTimer() for i := 0; i < b.N; i++ { stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { b.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { book, err := stream.Recv() if err != nil { break } books = append(books, book) } benchInnerBooks = books } benchBooks = benchInnerBooks }
BookStoreBatch 基准:
func BenchmarkBookStoreBatch_ListBooks(b *testing.B) { conn := newServer(b, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{}) }) client := pb.NewBookStoreBatchClient(conn) var benchInnerBooks []*pb.Book b.ResetTimer() for i := 0; i < b.N; i++ { stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { b.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { response, err := stream.Recv() if err != nil { break } books = append(books, response.Books...) } benchInnerBooks = books } benchBooks = benchInnerBooks }
基准测试结果:
BenchmarkBookStore_ListBooks BenchmarkBookStore_ListBooks-12 732 1647454 ns/op 85974 B/op 1989 allocs/op BenchmarkBookStoreBatch_ListBooks BenchmarkBookStoreBatch_ListBooks-12 1202 937491 ns/op 61098 B/op 853 allocs/op
多么大的进步啊! BookStoreBatch 比 BookStore 快 1.75 倍。
但是为什么 BookStoreBatch 比 BookStore 快?
服务器每次向客户端发送消息流.Send(),都需要
对消息进行编码并通过网络发送。通过发送多个文件
我们立即减少了服务器需要编码和发送的次数
消息,不仅提高了服务器的性能,还提高了
对于需要解码消息的客户端。
在上面的例子中,批量大小设置为10,但客户端可以根据网络情况和文档大小进行调整。
书店示例返回所有书籍并完成流,但如果客户端
需要实时观察事件(例如传感器),使用双向
直播是正确的选择。
双向流有点棘手,因为客户端和服务器都
可以同时发送和接收消息。希望 golang 能让这一切变得简单
像这样处理并发。
如前所述,传感器是双向流的一个很好的例子。
监视功能允许客户端决定监视和请求哪些传感器
如果需要的话,当前值。
让我们看一下下面的protobuf定义:
service MyService { rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {} }
请求消息不仅仅是消息流,更是一条可以
包含不同类型的请求。 oneof 指令允许我们定义一个
只能包含指定类型之一的字段。
传感器的 golang 代码将被忽略,但您可以在这里找到它
serverStream 包装流和传感器数据,使其更易于使用。
service MyService { rpc GetSomething (SomethingRequest) returns (SomethingResponse) {} }
如前所述,服务器可以同时发送和接收消息,一个
函数将处理传入的消息,另一个函数将处理
传出消息。
接收消息:
type somethingUnary struct { pb.UnimplementedSomethingUnaryServer } func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) { return &pb.SomethingResponse{ Message: "Hello " + req.Name, }, nil } func TestSomethingUnary(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingUnaryServer(s, &somethingUnary{}) }) client := pb.NewSomethingUnaryClient(conn) response, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
switch语句用于处理不同类型的请求并做出决定
如何处理每个请求。只保留recvLoop 函数很重要
读取消息但不向客户端发送消息,因此我们有 sendLoop
将从控制通道读取消息并将其发送到客户端。
发送消息:
type somethingStream struct { pb.UnimplementedSomethingStreamServer } func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error { if err := stream.Send(&pb.SomethingResponse{ Message: "Hello " + req.Name, }); err != nil { return err } return nil } func TestSomethingStream(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingStreamServer(s, &somethingStream{}) }) client := pb.NewSomethingStreamClient(conn) stream, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something stream: %v", err) } response, err := stream.Recv() if err != nil { t.Fatalf("failed to receive response: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
sendLoop函数读取控制通道和数据通道并发送
发送给客户端的消息。如果流关闭,该函数将返回。
最后,传感器服务的快乐路径测试:
service BookStore { rpc ListBooks(ListBooksRequest) returns (stream Book) {} } service BookStoreBatch { rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {} } message ListBooksResponse { repeated Book books = 1; }
从上面的测试中我们可以看到客户端可以创建、取消、获取当前
传感器的值。客户端还可以同时观看多个传感器。
gRPC 流是用于构建实时应用程序的多功能且强大的工具。
通过遵循最佳实践,例如仅在必要时使用流式传输、有效地批处理数据以及明智地利用双向流式传输,开发人员可以最大限度地提高性能
并保持代码简单性。
虽然 gRPC 流式传输带来了复杂性,但其好处远远超过了挑战
当深思熟虑地应用时。
如果您有任何问题或反馈,请随时在 LinkedIn 上与我联系。
以上是gRPC 流:最佳实践和性能见解的详细内容。更多信息请关注PHP中文网其他相关文章!