首页 >后端开发 >Golang >gRPC 流:最佳实践和性能见解

gRPC 流:最佳实践和性能见解

Patricia Arquette
Patricia Arquette原创
2024-12-06 11:32:12755浏览

gRPC Streaming: Best Practices and Performance Insights

介绍

gRPC 流允许 protobuf 消息从客户端流式传输到服务器、从服务器流式传输到客户端,或者双向流式传输。
这一强大的功能可用于构建实时应用程序,例如聊天应用程序、实时监控仪表板等。

在本文中,我们将探讨如何正确使用 gRPC 流。

先决条件

  • gRPC基础知识
  • Go 编程语言的基础知识(示例代码是用 Go 编写的,但这个概念也可以应用于其他语言)
  • 代码示例可在 GitHub 上获取

良好实践

让我们检查一下使用 gRPC 流的良好实践:

使用一元请求进行一元请求

一个常见的错误是对一元请求使用流式传输。
例如,考虑以下 gRPC 服务定义:

service MyService {
  rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {}
}

如果客户端只需要发送一个请求并接收一个响应,
您不需要使用流式传输。相反,我们可以按如下方式定义服务:

service MyService {
  rpc GetSomething (SomethingRequest) returns (SomethingResponse) {}
}

通过对一元请求使用流式传输,我们增加了不必要的复杂性
到代码,这可能会使其更难理解和维护,而不是
从使用流媒体中获得任何好处。

比较一元请求和流请求的 Golang 代码示例:

一元请求:

type somethingUnary struct {
    pb.UnimplementedSomethingUnaryServer
}

func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) {
    return &pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }, nil
}

func TestSomethingUnary(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingUnaryServer(s, &somethingUnary{})
    })

    client := pb.NewSomethingUnaryClient(conn)

    response, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

流式一元请求:

type somethingStream struct {
    pb.UnimplementedSomethingStreamServer
}

func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error {
    if err := stream.Send(&pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }); err != nil {
        return err
    }

    return nil
}

func TestSomethingStream(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingStreamServer(s, &somethingStream{})
    })

    client := pb.NewSomethingStreamClient(conn)

    stream, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something stream: %v", err)
    }

    response, err := stream.Recv()
    if err != nil {
        t.Fatalf("failed to receive response: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

我们可以看到,一元请求的代码更简单,更容易理解
比流请求的代码。

如果可以的话,一次发送多个文档

让我们比较一下这两个服务定义:

service BookStore {
  rpc ListBooks(ListBooksRequest) returns (stream Book) {}
}

service BookStoreBatch {
  rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {}
}

message ListBooksResponse {
  repeated Book books = 1;
}

BookStore 一次流式传输一本书,而 BookStoreBatch 同时流式传输多本书。

如果客户端需要列出所有书籍,使用BookStoreBatch 效率更高
因为它减少了客户端和服务器之间的往返次数。

让我们看看 BookStore 和 BookStoreBatch 的 Golang 代码示例:

书店:

type bookStore struct {
    pb.UnimplementedBookStoreServer
}

func (s *bookStore) ListBooks(req *pb.ListBooksRequest, stream pb.BookStore_ListBooksServer) error {
    for _, b := range bookStoreData {
        if b.Author == req.Author {
            if err := stream.Send(&pb.Book{
                Title:           b.Title,
                Author:          b.Author,
                PublicationYear: int32(b.PublicationYear),
                Genre:           b.Genre,
            }); err != nil {
                return err
            }
        }
    }
    return nil
}

func TestBookStore_ListBooks(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreServer(s, &bookStore{})
    })

    client := pb.NewBookStoreClient(conn)

    stream, err := client.ListBooks(
        context.Background(),
        &pb.ListBooksRequest{
            Author: charlesDickens,
        },
    )
    if err != nil {
        t.Fatalf("failed to list books: %v", err)
    }

    books := []*pb.Book{}
    for {
        book, err := stream.Recv()
        if err != nil {
            break
        }
        books = append(books, book)
    }

    if len(books) != charlesDickensBooks {
        t.Errorf("unexpected number of books: %d", len(books))
    }
}

书店批次:

type bookStoreBatch struct {
    pb.UnimplementedBookStoreBatchServer
}

func (s *bookStoreBatch) ListBooks(req *pb.ListBooksRequest, stream pb.BookStoreBatch_ListBooksServer) error {
    const batchSize = 10
    books := make([]*pb.Book, 0, batchSize)
    for _, b := range bookStoreData {
        if b.Author == req.Author {
            books = append(books, &pb.Book{
                Title:           b.Title,
                Author:          b.Author,
                PublicationYear: int32(b.PublicationYear),
                Genre:           b.Genre,
            })

            if len(books) == batchSize {
                if err := stream.Send(&pb.ListBooksResponse{
                    Books: books,
                }); err != nil {
                    return err
                }
                books = books[:0]
            }
        }
    }

    if len(books) > 0 {
        if err := stream.Send(&pb.ListBooksResponse{
            Books: books,
        }); err != nil {
            return nil
        }
    }

    return nil
}

func TestBookStoreBatch_ListBooks(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
    })

    client := pb.NewBookStoreBatchClient(conn)

    stream, err := client.ListBooks(
        context.Background(),
        &pb.ListBooksRequest{
            Author: charlesDickens,
        },
    )
    if err != nil {
        t.Fatalf("failed to list books: %v", err)
    }

    books := []*pb.Book{}
    for {
        response, err := stream.Recv()
        if err != nil {
            break
        }
        books = append(books, response.Books...)
    }

    if len(books) != charlesDickensBooks {
        t.Errorf("unexpected number of books: %d", len(books))
    }
}

从上面的代码来看,需要明确哪一个更好。
让我们运行一个基准测试来看看差异:

书店基准:

func BenchmarkBookStore_ListBooks(b *testing.B) {
    conn := newServer(b, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreServer(s, &bookStore{})
    })

    client := pb.NewBookStoreClient(conn)

    var benchInnerBooks []*pb.Book
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        stream, err := client.ListBooks(
            context.Background(),
            &pb.ListBooksRequest{
                Author: charlesDickens,
            },
        )
        if err != nil {
            b.Fatalf("failed to list books: %v", err)
        }

        books := []*pb.Book{}
        for {
            book, err := stream.Recv()
            if err != nil {
                break
            }
            books = append(books, book)
        }

        benchInnerBooks = books
    }

    benchBooks = benchInnerBooks
}

BookStoreBatch 基准:

func BenchmarkBookStoreBatch_ListBooks(b *testing.B) {
    conn := newServer(b, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
    })

    client := pb.NewBookStoreBatchClient(conn)

    var benchInnerBooks []*pb.Book
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        stream, err := client.ListBooks(
            context.Background(),
            &pb.ListBooksRequest{
                Author: charlesDickens,
            },
        )
        if err != nil {
            b.Fatalf("failed to list books: %v", err)
        }

        books := []*pb.Book{}
        for {
            response, err := stream.Recv()
            if err != nil {
                break
            }
            books = append(books, response.Books...)
        }

        benchInnerBooks = books
    }

    benchBooks = benchInnerBooks
}

基准测试结果:

BenchmarkBookStore_ListBooks
BenchmarkBookStore_ListBooks-12                      732           1647454 ns/op           85974 B/op       1989 allocs/op
BenchmarkBookStoreBatch_ListBooks
BenchmarkBookStoreBatch_ListBooks-12                1202            937491 ns/op           61098 B/op        853 allocs/op

多么大的进步啊! BookStoreBatch 比 BookStore 快 1.75 倍。

但是为什么 BookStoreBatch 比 BookStore 快?

服务器每次向客户端发送消息流.Send(),都需要
对消息进行编码并通过网络发送。通过发送多个文件
我们立即减少了服务器需要编码和发送的次数
消息,不仅提高了服务器的性能,还提高了
对于需要解码消息的客户端。

在上面的例子中,批量大小设置为10,但客户端可以根据网络情况和文档大小进行调整。

使用双向流来控制流量

书店示例返回所有书籍并完成流,但如果客户端
需要实时观察事件(例如传感器),使用双向
直播是正确的选择。

双向流有点棘手,因为客户端和服务器都
可以同时发送和接收消息。希望 golang 能让这一切变得简单
像这样处理并发。

如前所述,传感器是双向流的一个很好的例子。
监视功能允许客户端决定监视和请求哪些传感器
如果需要的话,当前值。

让我们看一下下面的protobuf定义:

service MyService {
  rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {}
}

请求消息不仅仅是消息流,更是一条可以
包含不同类型的请求。 oneof 指令允许我们定义一个
只能包含指定类型之一的字段。

传感器的 golang 代码将被忽略,但您可以在这里找到它

serverStream 包装流和传感器数据,使其更易于使用。

service MyService {
  rpc GetSomething (SomethingRequest) returns (SomethingResponse) {}
}

如前所述,服务器可以同时发送和接收消息,一个
函数将处理传入的消息,另一个函数将处理
传出消息。

接收消息:

type somethingUnary struct {
    pb.UnimplementedSomethingUnaryServer
}

func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) {
    return &pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }, nil
}

func TestSomethingUnary(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingUnaryServer(s, &somethingUnary{})
    })

    client := pb.NewSomethingUnaryClient(conn)

    response, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

switch语句用于处理不同类型的请求并做出决定
如何处理每个请求。只保留r​​ecvLoop 函数很重要
读取消息但不向客户端发送消息,因此我们有 sendLoop
将从控制通道读取消息并将其发送到客户端。

发送消息:

type somethingStream struct {
    pb.UnimplementedSomethingStreamServer
}

func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error {
    if err := stream.Send(&pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }); err != nil {
        return err
    }

    return nil
}

func TestSomethingStream(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingStreamServer(s, &somethingStream{})
    })

    client := pb.NewSomethingStreamClient(conn)

    stream, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something stream: %v", err)
    }

    response, err := stream.Recv()
    if err != nil {
        t.Fatalf("failed to receive response: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

sendLoop函数读取控制通道和数据通道并发送
发送给客户端的消息。如果流关闭,该函数将返回。

最后,传感器服务的快乐路径测试:

service BookStore {
  rpc ListBooks(ListBooksRequest) returns (stream Book) {}
}

service BookStoreBatch {
  rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {}
}

message ListBooksResponse {
  repeated Book books = 1;
}

从上面的测试中我们可以看到客户端可以创建、取消、获取当前
传感器的值。客户端还可以同时观看多个传感器。

挑战自己

  • 使用 gRPC 流实现聊天应用程序。
  • 修改传感器服务以一次发送多个值以节省往返次数。
  • 嗅探网络流量以查看一元请求和流请求之间的区别。

结论

gRPC 流是用于构建实时应用程序的多功能且强大的工具。
通过遵循最佳实践,例如仅在必要时使用流式传输、有效地批处理数据以及明智地利用双向流式传输,开发人员可以最大限度地提高性能
并保持代码简单性。
虽然 gRPC 流式传输带来了复杂性,但其好处远远超过了挑战
当深思熟虑地应用时。

保持联系

如果您有任何问题或反馈,请随时在 LinkedIn 上与我联系。

以上是gRPC 流:最佳实践和性能见解的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn