Heim >Backend-Entwicklung >Golang >gRPC-Streaming: Best Practices und Leistungseinblicke

gRPC-Streaming: Best Practices und Leistungseinblicke

Patricia Arquette
Patricia ArquetteOriginal
2024-12-06 11:32:12759Durchsuche

gRPC Streaming: Best Practices and Performance Insights

Einführung

gRPC-Streaming ermöglicht das Streamen von Protobuf-Nachrichten von Client zu Server, von Server zu Client oder bidirektional.
Mit dieser leistungsstarken Funktion können Echtzeitanwendungen wie Chat-Anwendungen, Echtzeitüberwachungs-Dashboards und mehr erstellt werden.

In diesem Artikel erfahren Sie, wie Sie gRPC-Streaming richtig nutzen.

Voraussetzungen

  • Grundkenntnisse von gRPC
  • Grundkenntnisse der Programmiersprache Go (Der Beispielcode ist in Go geschrieben, das Konzept kann jedoch auch auf andere Sprachen angewendet werden)
  • Die Codebeispiele sind auf GitHub verfügbar

Gute Praktiken

Sehen wir uns die bewährten Methoden für die Verwendung von gRPC-Streaming an:

Verwenden Sie eine unäre Anfrage für eine unäre Anfrage

Ein häufiger Fehler besteht darin, Streaming für unäre Anfragen zu verwenden.
Betrachten Sie beispielsweise die folgende gRPC-Dienstdefinition:

service MyService {
  rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {}
}

Wenn der Kunde nur eine Anfrage senden und eine Antwort erhalten muss,
Sie müssen kein Streaming verwenden. Stattdessen können wir den Dienst wie folgt definieren:

service MyService {
  rpc GetSomething (SomethingRequest) returns (SomethingResponse) {}
}

Durch die Verwendung von Streaming für unäre Anfragen erhöhen wir unnötige Komplexität
zum Code, was das Verständnis und die Wartung erschweren kann und nicht
Vorteile aus der Nutzung von Streaming ziehen.

Golang-Codebeispiel zum Vergleich einer unären Anfrage und einer Streaming-Anfrage:

Unäre Anfrage:

type somethingUnary struct {
    pb.UnimplementedSomethingUnaryServer
}

func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) {
    return &pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }, nil
}

func TestSomethingUnary(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingUnaryServer(s, &somethingUnary{})
    })

    client := pb.NewSomethingUnaryClient(conn)

    response, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

Streaming unärer Anfrage:

type somethingStream struct {
    pb.UnimplementedSomethingStreamServer
}

func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error {
    if err := stream.Send(&pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }); err != nil {
        return err
    }

    return nil
}

func TestSomethingStream(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingStreamServer(s, &somethingStream{})
    })

    client := pb.NewSomethingStreamClient(conn)

    stream, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something stream: %v", err)
    }

    response, err := stream.Recv()
    if err != nil {
        t.Fatalf("failed to receive response: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

Wie wir sehen können, ist der Code für unäre Anfragen einfacher und leichter zu verstehen
als der Code für Streaming-Anfragen.

Wenn möglich, senden Sie mehrere Dokumente gleichzeitig

Vergleichen wir diese beiden Servicedefinitionen:

service BookStore {
  rpc ListBooks(ListBooksRequest) returns (stream Book) {}
}

service BookStoreBatch {
  rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {}
}

message ListBooksResponse {
  repeated Book books = 1;
}

BookStore streamt jeweils ein Buch, während BookStoreBatch mehrere Bücher gleichzeitig streamt.

Wenn der Client alle Bücher auflisten muss, ist es effizienter, BookStoreBatch
zu verwenden weil es die Anzahl der Roundtrips zwischen dem Client und dem Server reduziert.

Sehen wir uns das Golang-Codebeispiel für BookStore und BookStoreBatch an:

Buchladen:

type bookStore struct {
    pb.UnimplementedBookStoreServer
}

func (s *bookStore) ListBooks(req *pb.ListBooksRequest, stream pb.BookStore_ListBooksServer) error {
    for _, b := range bookStoreData {
        if b.Author == req.Author {
            if err := stream.Send(&pb.Book{
                Title:           b.Title,
                Author:          b.Author,
                PublicationYear: int32(b.PublicationYear),
                Genre:           b.Genre,
            }); err != nil {
                return err
            }
        }
    }
    return nil
}

func TestBookStore_ListBooks(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreServer(s, &bookStore{})
    })

    client := pb.NewBookStoreClient(conn)

    stream, err := client.ListBooks(
        context.Background(),
        &pb.ListBooksRequest{
            Author: charlesDickens,
        },
    )
    if err != nil {
        t.Fatalf("failed to list books: %v", err)
    }

    books := []*pb.Book{}
    for {
        book, err := stream.Recv()
        if err != nil {
            break
        }
        books = append(books, book)
    }

    if len(books) != charlesDickensBooks {
        t.Errorf("unexpected number of books: %d", len(books))
    }
}

BookStoreBatch:

type bookStoreBatch struct {
    pb.UnimplementedBookStoreBatchServer
}

func (s *bookStoreBatch) ListBooks(req *pb.ListBooksRequest, stream pb.BookStoreBatch_ListBooksServer) error {
    const batchSize = 10
    books := make([]*pb.Book, 0, batchSize)
    for _, b := range bookStoreData {
        if b.Author == req.Author {
            books = append(books, &pb.Book{
                Title:           b.Title,
                Author:          b.Author,
                PublicationYear: int32(b.PublicationYear),
                Genre:           b.Genre,
            })

            if len(books) == batchSize {
                if err := stream.Send(&pb.ListBooksResponse{
                    Books: books,
                }); err != nil {
                    return err
                }
                books = books[:0]
            }
        }
    }

    if len(books) > 0 {
        if err := stream.Send(&pb.ListBooksResponse{
            Books: books,
        }); err != nil {
            return nil
        }
    }

    return nil
}

func TestBookStoreBatch_ListBooks(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
    })

    client := pb.NewBookStoreBatchClient(conn)

    stream, err := client.ListBooks(
        context.Background(),
        &pb.ListBooksRequest{
            Author: charlesDickens,
        },
    )
    if err != nil {
        t.Fatalf("failed to list books: %v", err)
    }

    books := []*pb.Book{}
    for {
        response, err := stream.Recv()
        if err != nil {
            break
        }
        books = append(books, response.Books...)
    }

    if len(books) != charlesDickensBooks {
        t.Errorf("unexpected number of books: %d", len(books))
    }
}

Anhand des obigen Codes muss geklärt werden, welcher besser ist.
Lassen Sie uns einen Benchmark durchführen, um den Unterschied zu sehen:

BookStore-Benchmark:

func BenchmarkBookStore_ListBooks(b *testing.B) {
    conn := newServer(b, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreServer(s, &bookStore{})
    })

    client := pb.NewBookStoreClient(conn)

    var benchInnerBooks []*pb.Book
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        stream, err := client.ListBooks(
            context.Background(),
            &pb.ListBooksRequest{
                Author: charlesDickens,
            },
        )
        if err != nil {
            b.Fatalf("failed to list books: %v", err)
        }

        books := []*pb.Book{}
        for {
            book, err := stream.Recv()
            if err != nil {
                break
            }
            books = append(books, book)
        }

        benchInnerBooks = books
    }

    benchBooks = benchInnerBooks
}

BookStoreBatch-Benchmark:

func BenchmarkBookStoreBatch_ListBooks(b *testing.B) {
    conn := newServer(b, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
    })

    client := pb.NewBookStoreBatchClient(conn)

    var benchInnerBooks []*pb.Book
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        stream, err := client.ListBooks(
            context.Background(),
            &pb.ListBooksRequest{
                Author: charlesDickens,
            },
        )
        if err != nil {
            b.Fatalf("failed to list books: %v", err)
        }

        books := []*pb.Book{}
        for {
            response, err := stream.Recv()
            if err != nil {
                break
            }
            books = append(books, response.Books...)
        }

        benchInnerBooks = books
    }

    benchBooks = benchInnerBooks
}

Benchmark-Ergebnisse:

BenchmarkBookStore_ListBooks
BenchmarkBookStore_ListBooks-12                      732           1647454 ns/op           85974 B/op       1989 allocs/op
BenchmarkBookStoreBatch_ListBooks
BenchmarkBookStoreBatch_ListBooks-12                1202            937491 ns/op           61098 B/op        853 allocs/op

Was für eine Verbesserung! BookStoreBatch ist um den Faktor 1,75x schneller als BookStore.

Aber warum ist BookStoreBatch schneller als BookStore?

Jedes Mal, wenn der Server eine Nachricht stream.Send() an den Client sendet, muss
Verschlüsseln Sie die Nachricht und senden Sie sie über das Netzwerk. Durch das Versenden mehrerer Dokumente
Auf einmal reduzieren wir die Häufigkeit, die der Server zum Kodieren und Senden von
benötigt die Nachricht, die nicht nur die Leistung des Servers verbessert, sondern auch
für den Client, der die Nachricht dekodieren muss.

Im obigen Beispiel ist die Stapelgröße auf 10 eingestellt, aber der Client kann sie basierend auf den Netzwerkbedingungen und der Größe der Dokumente anpassen.

Verwenden Sie bidirektionales Streaming, um den Fluss zu steuern

Das Buchhandlungsbeispiel gibt alle Bücher zurück und beendet den Stream, aber wenn der Client
muss in Echtzeit auf Ereignisse achten (z. B. Sensoren), die Verwendung von bidirektional
Streaming ist die richtige Wahl.

Bidirektionale Streams sind etwas knifflig, da sowohl der Client als auch der Server
kann gleichzeitig Nachrichten senden und empfangen. Hoffentlich macht Golang es einfach
um mit Parallelität wie dieser zu arbeiten.

Wie bereits erwähnt, kann ein Sensor ein hervorragendes Beispiel für bidirektionales Streaming sein.
Mit der Überwachungsfunktion kann der Kunde entscheiden, welche Sensoren überwacht und angefordert werden sollen
den aktuellen Wert, falls erforderlich.

Werfen wir einen Blick auf die folgende Protobuf-Definition:

service MyService {
  rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {}
}

Die Anforderungsnachricht ist nicht nur ein Nachrichtenstrom, sondern auch eine Nachricht, die dies kann
enthalten verschiedene Arten von Anfragen. Mit der oneof-Direktive können wir ein
definieren Feld, das nur einen der angegebenen Typen enthalten kann.

Der Golang-Code für den Sensor wird ignoriert, aber Sie können ihn hier finden

serverStream verpackt den Stream und die Sensordaten, um die Arbeit damit zu erleichtern.

service MyService {
  rpc GetSomething (SomethingRequest) returns (SomethingResponse) {}
}

Wie bereits erwähnt, kann der Server gleichzeitig Nachrichten senden und empfangen
Die Funktion verarbeitet die eingehenden Nachrichten und eine andere Funktion verarbeitet die
ausgehende Nachrichten.

Nachrichten empfangen:

type somethingUnary struct {
    pb.UnimplementedSomethingUnaryServer
}

func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) {
    return &pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }, nil
}

func TestSomethingUnary(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingUnaryServer(s, &somethingUnary{})
    })

    client := pb.NewSomethingUnaryClient(conn)

    response, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

Die switch-Anweisung wird verwendet, um die verschiedenen Arten von Anforderungen zu verarbeiten und zu entscheiden
was mit jeder Anfrage zu tun ist. Es ist wichtig, nur die recvLoop-Funktion zu belassen
Um Nachrichten zu lesen und nicht an den Client zu senden, haben wir aus diesem Grund die sendLoop
Dadurch werden die Nachrichten vom Steuerkanal gelesen und an den Client gesendet.

Nachrichten senden:

type somethingStream struct {
    pb.UnimplementedSomethingStreamServer
}

func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error {
    if err := stream.Send(&pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }); err != nil {
        return err
    }

    return nil
}

func TestSomethingStream(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingStreamServer(s, &somethingStream{})
    })

    client := pb.NewSomethingStreamClient(conn)

    stream, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something stream: %v", err)
    }

    response, err := stream.Recv()
    if err != nil {
        t.Fatalf("failed to receive response: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

Die sendLoop-Funktion liest sowohl den Steuerkanal als auch den Datenkanal und sendet
die Nachrichten an den Kunden. Wenn der Stream geschlossen ist, kehrt die Funktion zurück.

Zum Schluss noch ein Happy-Path-Test für den Sensorservice:

service BookStore {
  rpc ListBooks(ListBooksRequest) returns (stream Book) {}
}

service BookStoreBatch {
  rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {}
}

message ListBooksResponse {
  repeated Book books = 1;
}

Aus dem obigen Test können wir ersehen, dass der Kunde die aktuelle Version erstellen, abbrechen und abrufen kann
Wert eines Sensors. Der Kunde kann auch mehrere Sensoren gleichzeitig beobachten.

Fordern Sie sich selbst heraus

  • Implementieren Sie eine Chat-Anwendung mit gRPC-Streaming.
  • Ändern Sie den Sensordienst so, dass mehrere Werte gleichzeitig gesendet werden, um Roundtrips zu vermeiden.
  • Schnüffeln Sie am Netzwerkverkehr, um den Unterschied zwischen unärer Anfrage und Streaming-Anfrage zu erkennen.

Abschluss

gRPC-Streaming ist ein vielseitiges und leistungsstarkes Tool zum Erstellen von Echtzeitanwendungen.
Durch die Befolgung von Best Practices wie der Verwendung von Streaming nur bei Bedarf, der effizienten Stapelung von Daten und der sinnvollen Nutzung von bidirektionalem Streaming können Entwickler die Leistung maximieren
und die Einfachheit des Codes beibehalten.
Während gRPC-Streaming Komplexität mit sich bringt, überwiegen seine Vorteile bei weitem die Herausforderungen
wenn es mit Bedacht angewendet wird.

Bleiben Sie in Kontakt

Wenn Sie Fragen oder Feedback haben, können Sie mich gerne auf LinkedIn kontaktieren.

Das obige ist der detaillierte Inhalt vongRPC-Streaming: Best Practices und Leistungseinblicke. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn