ホームページ >バックエンド開発 >Golang >gRPC ストリーミング: ベスト プラクティスとパフォーマンスに関する洞察

gRPC ストリーミング: ベスト プラクティスとパフォーマンスに関する洞察

Patricia Arquette
Patricia Arquetteオリジナル
2024-12-06 11:32:12755ブラウズ

gRPC Streaming: Best Practices and Performance Insights

導入

gRPC ストリーミングを使用すると、protobuf メッセージをクライアントからサーバー、サーバーからクライアント、または双方向にストリーミングできます。
この強力な機能を使用して、チャット アプリケーション、リアルタイム監視ダッシュボードなどのリアルタイム アプリケーションを構築できます。

この記事では、gRPC ストリーミングを正しく使用する方法を検討します。

前提条件

  • gRPC の基礎知識
  • Go プログラミング言語の基礎知識 (サンプルコードは Go で書かれていますが、概念は他の言語にも適用できます)
  • コード例は GitHub で入手できます

良い実践方法

gRPC ストリーミングを使用するためのグッド プラクティスを確認してみましょう:

単項リクエストには単項リクエストを使用する

よくある間違いの 1 つは、単項リクエストにストリーミングを使用することです。
たとえば、次の gRPC サービス定義を考えてみましょう:

service MyService {
  rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {}
}

クライアントが 1 つのリクエストを送信して 1 つの応答を受信するだけでよい場合、
ストリーミングを使用する必要はありません。代わりに、次のようにサービスを定義できます:

service MyService {
  rpc GetSomething (SomethingRequest) returns (SomethingResponse) {}
}

単項リクエストにストリーミングを使用することで、不必要な複雑さが追加されます
これにより、理解と保守が困難になる可能性があり、
ストリーミングを使用することであらゆるメリットが得られます。

単項リクエストとストリーミングリクエストを比較する Golang コード例:

単項リクエスト:

type somethingUnary struct {
    pb.UnimplementedSomethingUnaryServer
}

func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) {
    return &pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }, nil
}

func TestSomethingUnary(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingUnaryServer(s, &somethingUnary{})
    })

    client := pb.NewSomethingUnaryClient(conn)

    response, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

ストリーミング単項リクエスト:

type somethingStream struct {
    pb.UnimplementedSomethingStreamServer
}

func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error {
    if err := stream.Send(&pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }); err != nil {
        return err
    }

    return nil
}

func TestSomethingStream(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingStreamServer(s, &somethingStream{})
    })

    client := pb.NewSomethingStreamClient(conn)

    stream, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something stream: %v", err)
    }

    response, err := stream.Recv()
    if err != nil {
        t.Fatalf("failed to receive response: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

ご覧のとおり、単項リクエストのコードはよりシンプルで理解しやすくなっています
ストリーミングリクエストのコードよりも。

可能であれば複数の書類を一度に送信する

これら 2 つのサービス定義を比較してみましょう:

service BookStore {
  rpc ListBooks(ListBooksRequest) returns (stream Book) {}
}

service BookStoreBatch {
  rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {}
}

message ListBooksResponse {
  repeated Book books = 1;
}

BookStore は一度に 1 冊の本をストリーミングしますが、BookStoreBatch は複数の本を同時にストリーミングします。

クライアントがすべての書籍をリストする必要がある場合は、BookStoreBatch を使用する方が効率的です
クライアントとサーバー間の往復回数が減るためです。

BookStore と BookStoreBatch の Golang コード例を見てみましょう:

書店:

type bookStore struct {
    pb.UnimplementedBookStoreServer
}

func (s *bookStore) ListBooks(req *pb.ListBooksRequest, stream pb.BookStore_ListBooksServer) error {
    for _, b := range bookStoreData {
        if b.Author == req.Author {
            if err := stream.Send(&pb.Book{
                Title:           b.Title,
                Author:          b.Author,
                PublicationYear: int32(b.PublicationYear),
                Genre:           b.Genre,
            }); err != nil {
                return err
            }
        }
    }
    return nil
}

func TestBookStore_ListBooks(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreServer(s, &bookStore{})
    })

    client := pb.NewBookStoreClient(conn)

    stream, err := client.ListBooks(
        context.Background(),
        &pb.ListBooksRequest{
            Author: charlesDickens,
        },
    )
    if err != nil {
        t.Fatalf("failed to list books: %v", err)
    }

    books := []*pb.Book{}
    for {
        book, err := stream.Recv()
        if err != nil {
            break
        }
        books = append(books, book)
    }

    if len(books) != charlesDickensBooks {
        t.Errorf("unexpected number of books: %d", len(books))
    }
}

ブックストアバッチ:

type bookStoreBatch struct {
    pb.UnimplementedBookStoreBatchServer
}

func (s *bookStoreBatch) ListBooks(req *pb.ListBooksRequest, stream pb.BookStoreBatch_ListBooksServer) error {
    const batchSize = 10
    books := make([]*pb.Book, 0, batchSize)
    for _, b := range bookStoreData {
        if b.Author == req.Author {
            books = append(books, &pb.Book{
                Title:           b.Title,
                Author:          b.Author,
                PublicationYear: int32(b.PublicationYear),
                Genre:           b.Genre,
            })

            if len(books) == batchSize {
                if err := stream.Send(&pb.ListBooksResponse{
                    Books: books,
                }); err != nil {
                    return err
                }
                books = books[:0]
            }
        }
    }

    if len(books) > 0 {
        if err := stream.Send(&pb.ListBooksResponse{
            Books: books,
        }); err != nil {
            return nil
        }
    }

    return nil
}

func TestBookStoreBatch_ListBooks(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
    })

    client := pb.NewBookStoreBatchClient(conn)

    stream, err := client.ListBooks(
        context.Background(),
        &pb.ListBooksRequest{
            Author: charlesDickens,
        },
    )
    if err != nil {
        t.Fatalf("failed to list books: %v", err)
    }

    books := []*pb.Book{}
    for {
        response, err := stream.Recv()
        if err != nil {
            break
        }
        books = append(books, response.Books...)
    }

    if len(books) != charlesDickensBooks {
        t.Errorf("unexpected number of books: %d", len(books))
    }
}

上記のコードから、どちらが優れているかを明確にする必要があります。
ベンチマークを実行して違いを確認してみましょう:

BookStore ベンチマーク:

func BenchmarkBookStore_ListBooks(b *testing.B) {
    conn := newServer(b, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreServer(s, &bookStore{})
    })

    client := pb.NewBookStoreClient(conn)

    var benchInnerBooks []*pb.Book
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        stream, err := client.ListBooks(
            context.Background(),
            &pb.ListBooksRequest{
                Author: charlesDickens,
            },
        )
        if err != nil {
            b.Fatalf("failed to list books: %v", err)
        }

        books := []*pb.Book{}
        for {
            book, err := stream.Recv()
            if err != nil {
                break
            }
            books = append(books, book)
        }

        benchInnerBooks = books
    }

    benchBooks = benchInnerBooks
}

BookStoreBatch ベンチマーク:

func BenchmarkBookStoreBatch_ListBooks(b *testing.B) {
    conn := newServer(b, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
    })

    client := pb.NewBookStoreBatchClient(conn)

    var benchInnerBooks []*pb.Book
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        stream, err := client.ListBooks(
            context.Background(),
            &pb.ListBooksRequest{
                Author: charlesDickens,
            },
        )
        if err != nil {
            b.Fatalf("failed to list books: %v", err)
        }

        books := []*pb.Book{}
        for {
            response, err := stream.Recv()
            if err != nil {
                break
            }
            books = append(books, response.Books...)
        }

        benchInnerBooks = books
    }

    benchBooks = benchInnerBooks
}

ベンチマーク結果:

BenchmarkBookStore_ListBooks
BenchmarkBookStore_ListBooks-12                      732           1647454 ns/op           85974 B/op       1989 allocs/op
BenchmarkBookStoreBatch_ListBooks
BenchmarkBookStoreBatch_ListBooks-12                1202            937491 ns/op           61098 B/op        853 allocs/op

なんという改善でしょう! BookStoreBatch は BookStore よりも 1.75 倍高速です。

しかし、なぜ BookStoreBatch は BookStore よりも速いのでしょうか?

サーバーがメッセージ stream.Send() をクライアントに送信するたびに、
メッセージをエンコードしてネットワーク経由で送信します。複数の書類を送信する場合
サーバーがエンコードして送信する必要がある回数を一度に削減します
このメッセージにより、サーバーだけでなくパフォーマンスも向上します
メッセージをデコードする必要があるクライアント用。

上記の例では、バッチ サイズは 10 に設定されていますが、クライアントはネットワークの状態とドキュメントのサイズに基づいて調整できます。

双方向ストリーミングを使用してフローを制御する

書店の例ではすべての本を返してストリームを終了しますが、クライアント
リアルタイムでイベント (センサーなど) を監視する必要があるため、双方向
を使用します。 ストリーミングは正しい選択です。

クライアントとサーバーの両方が存在するため、双方向ストリームは少し注意が必要です
メッセージの送信と受信を同時に行うことができます。 golang を使えば簡単にできるといいですね
このように同時実行で動作します。

前述したように、センサーは双方向ストリーミングの優れた例となり得ます。
watch 関数を使用すると、クライアントはどのセンサーを監視してリクエストするかを決定できます
必要に応じて現在の値。

次の protobuf 定義を見てみましょう:

service MyService {
  rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {}
}

リクエスト メッセージは、メッセージのストリームであるだけでなく、次のことができるメッセージでもあります
さまざまな種類のリクエストが含まれています。 oneof ディレクティブを使用すると、
を定義できます。 指定されたタイプのいずれか 1 つだけを含めることができるフィールド。

センサーの golang コードは無視されますが、ここで見つけることができます

serverStream は、ストリームとセンサー データをラップして、操作を容易にします。

service MyService {
  rpc GetSomething (SomethingRequest) returns (SomethingResponse) {}
}

前に述べたように、サーバーは同時にメッセージを送受信できます。
関数は受信メッセージを処理し、別の関数は
を処理します。 送信メッセージ。

メッセージの受信:

type somethingUnary struct {
    pb.UnimplementedSomethingUnaryServer
}

func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) {
    return &pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }, nil
}

func TestSomethingUnary(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingUnaryServer(s, &somethingUnary{})
    })

    client := pb.NewSomethingUnaryClient(conn)

    response, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

switch ステートメントは、さまざまな種類のリクエストを処理し、決定するために使用されます
それぞれのリクエストに対して何をするか。 recvLoop 関数のみを残すことが重要です
このため、メッセージを読み取り、クライアントに送信しないようにするには、sendLoop
を使用します。 これにより、制御チャネルからメッセージが読み取られ、クライアントに送信されます。

メッセージの送信:

type somethingStream struct {
    pb.UnimplementedSomethingStreamServer
}

func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error {
    if err := stream.Send(&pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }); err != nil {
        return err
    }

    return nil
}

func TestSomethingStream(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingStreamServer(s, &somethingStream{})
    })

    client := pb.NewSomethingStreamClient(conn)

    stream, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something stream: %v", err)
    }

    response, err := stream.Recv()
    if err != nil {
        t.Fatalf("failed to receive response: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

sendLoop 関数は、制御チャネルとデータ チャネルの両方を読み取り、送信します
クライアントへのメッセージ。ストリームが閉じている場合、関数は戻ります。

最後に、センサー サービスのハッピー パス テスト:

service BookStore {
  rpc ListBooks(ListBooksRequest) returns (stream Book) {}
}

service BookStoreBatch {
  rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {}
}

message ListBooksResponse {
  repeated Book books = 1;
}

上記のテストから、クライアントは現在のものを作成、キャンセル、取得できることがわかります
センサーの値。クライアントは複数のセンサーを同時に監視することもできます。

自分自身に挑戦してください

  • gRPC ストリーミングを使用してチャット アプリケーションを実装します。
  • ラウンドトリップを節約するために、複数の値を一度に送信するようにセンサー サービスを変更します。
  • ネットワーク トラフィックをスニッフィングして、単項リクエストとストリーミング リクエストの違いを確認します。

結論

gRPC ストリーミングは、リアルタイム アプリケーションを構築するための多用途かつ強力なツールです。
必要な場合にのみストリーミングを使用する、データを効率的にバッチ処理する、双方向ストリーミングを賢く活用するなどのベスト プラクティスに従うことで、開発者はパフォーマンスを最大化できます
コードの単純さを維持します。
gRPC ストリーミングは複雑さをもたらしますが、その利点は課題をはるかに上回ります
慎重に適用した場合。

連絡を取り合う

ご質問やフィードバックがございましたら、LinkedIn でお気軽にご連絡ください。

以上がgRPC ストリーミング: ベスト プラクティスとパフォーマンスに関する洞察の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。