Maison >développement back-end >Golang >Streaming gRPC : meilleures pratiques et informations sur les performances
Le streaming gRPC permet de diffuser les messages protobuf du client au serveur, du serveur au client ou de manière bidirectionnelle.
Cette fonctionnalité puissante peut être utilisée pour créer des applications en temps réel telles que des applications de chat, des tableaux de bord de surveillance en temps réel, etc.
Dans cet article, nous explorerons comment utiliser correctement le streaming gRPC.
Vérifions les bonnes pratiques d'utilisation du streaming gRPC :
Une erreur courante consiste à utiliser le streaming pour les requêtes unaires.
Par exemple, considérons la définition de service gRPC suivante :
service MyService { rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {} }
Si le client n'a besoin d'envoyer qu'une seule demande et de recevoir une seule réponse,
Vous n'avez pas besoin d'utiliser le streaming. Au lieu de cela, nous pouvons définir le service comme suit :
service MyService { rpc GetSomething (SomethingRequest) returns (SomethingResponse) {} }
En utilisant le streaming pour les requêtes unaires, nous ajoutons une complexité inutile
au code, ce qui peut le rendre plus difficile à comprendre et à maintenir et non
tirer tous les avantages de l’utilisation du streaming.
Exemple de code Golang comparant la requête unaire et la requête de streaming :
Requête unaire :
type somethingUnary struct { pb.UnimplementedSomethingUnaryServer } func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) { return &pb.SomethingResponse{ Message: "Hello " + req.Name, }, nil } func TestSomethingUnary(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingUnaryServer(s, &somethingUnary{}) }) client := pb.NewSomethingUnaryClient(conn) response, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
Requête unaire de streaming :
type somethingStream struct { pb.UnimplementedSomethingStreamServer } func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error { if err := stream.Send(&pb.SomethingResponse{ Message: "Hello " + req.Name, }); err != nil { return err } return nil } func TestSomethingStream(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingStreamServer(s, &somethingStream{}) }) client := pb.NewSomethingStreamClient(conn) stream, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something stream: %v", err) } response, err := stream.Recv() if err != nil { t.Fatalf("failed to receive response: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
Comme on peut le voir, le code des requêtes unaires est plus simple et plus facile à comprendre
que le code pour les demandes de streaming.
Comparons ces deux définitions de service :
service BookStore { rpc ListBooks(ListBooksRequest) returns (stream Book) {} } service BookStoreBatch { rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {} } message ListBooksResponse { repeated Book books = 1; }
BookStore diffuse un livre à la fois, tandis que BookStoreBatch diffuse plusieurs livres simultanément.
Si le client a besoin de lister tous les livres, il est plus efficace d'utiliser BookStoreBatch
car cela réduit le nombre d'allers-retours entre le client et le serveur.
Voyons l'exemple de code Golang pour BookStore et BookStoreBatch :
Librairie :
type bookStore struct { pb.UnimplementedBookStoreServer } func (s *bookStore) ListBooks(req *pb.ListBooksRequest, stream pb.BookStore_ListBooksServer) error { for _, b := range bookStoreData { if b.Author == req.Author { if err := stream.Send(&pb.Book{ Title: b.Title, Author: b.Author, PublicationYear: int32(b.PublicationYear), Genre: b.Genre, }); err != nil { return err } } } return nil } func TestBookStore_ListBooks(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreServer(s, &bookStore{}) }) client := pb.NewBookStoreClient(conn) stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { t.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { book, err := stream.Recv() if err != nil { break } books = append(books, book) } if len(books) != charlesDickensBooks { t.Errorf("unexpected number of books: %d", len(books)) } }
BookStoreBatch :
type bookStoreBatch struct { pb.UnimplementedBookStoreBatchServer } func (s *bookStoreBatch) ListBooks(req *pb.ListBooksRequest, stream pb.BookStoreBatch_ListBooksServer) error { const batchSize = 10 books := make([]*pb.Book, 0, batchSize) for _, b := range bookStoreData { if b.Author == req.Author { books = append(books, &pb.Book{ Title: b.Title, Author: b.Author, PublicationYear: int32(b.PublicationYear), Genre: b.Genre, }) if len(books) == batchSize { if err := stream.Send(&pb.ListBooksResponse{ Books: books, }); err != nil { return err } books = books[:0] } } } if len(books) > 0 { if err := stream.Send(&pb.ListBooksResponse{ Books: books, }); err != nil { return nil } } return nil } func TestBookStoreBatch_ListBooks(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{}) }) client := pb.NewBookStoreBatchClient(conn) stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { t.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { response, err := stream.Recv() if err != nil { break } books = append(books, response.Books...) } if len(books) != charlesDickensBooks { t.Errorf("unexpected number of books: %d", len(books)) } }
D'après le code ci-dessus, il faut clarifier lequel est le meilleur.
Faisons un benchmark pour voir la différence :
Benchmark Librairie :
func BenchmarkBookStore_ListBooks(b *testing.B) { conn := newServer(b, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreServer(s, &bookStore{}) }) client := pb.NewBookStoreClient(conn) var benchInnerBooks []*pb.Book b.ResetTimer() for i := 0; i < b.N; i++ { stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { b.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { book, err := stream.Recv() if err != nil { break } books = append(books, book) } benchInnerBooks = books } benchBooks = benchInnerBooks }
Benchmark BookStoreBatch :
func BenchmarkBookStoreBatch_ListBooks(b *testing.B) { conn := newServer(b, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{}) }) client := pb.NewBookStoreBatchClient(conn) var benchInnerBooks []*pb.Book b.ResetTimer() for i := 0; i < b.N; i++ { stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { b.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { response, err := stream.Recv() if err != nil { break } books = append(books, response.Books...) } benchInnerBooks = books } benchBooks = benchInnerBooks }
Résultats de référence :
BenchmarkBookStore_ListBooks BenchmarkBookStore_ListBooks-12 732 1647454 ns/op 85974 B/op 1989 allocs/op BenchmarkBookStoreBatch_ListBooks BenchmarkBookStoreBatch_ListBooks-12 1202 937491 ns/op 61098 B/op 853 allocs/op
Quelle amélioration ! BookStoreBatch est 1,75x plus rapide que BookStore.
Mais pourquoi BookStoreBatch est-il plus rapide que BookStore ?
Chaque fois que le serveur envoie un message stream.Send() au client, il doit
encoder le message et l'envoyer sur le réseau. En envoyant plusieurs documents
d'un coup, nous réduisons le nombre de fois que le serveur doit encoder et envoyer
le message, ce qui améliore les performances non seulement pour le serveur mais aussi
pour le client qui a besoin de décoder le message.
Dans l'exemple ci-dessus, la taille du lot est définie sur 10, mais le client peut l'ajuster en fonction des conditions du réseau et de la taille des documents.
L'exemple de librairie renvoie tous les livres et termine le flux, mais si le client
doit surveiller les événements en temps réel (par exemple, les capteurs), l'utilisation du bidirectionnel
le streaming est le bon choix.
Les flux bidirectionnels sont un peu délicats car à la fois le client et le serveur
peut envoyer et recevoir des messages en même temps. Espérons que Golang facilitera les choses
travailler avec la concurrence comme celle-ci.
Comme mentionné, un capteur peut être un excellent exemple de streaming bidirectionnel.
La fonction montre permet au client de décider quels capteurs surveiller et demander
la valeur actuelle si nécessaire.
Jetons un coup d'œil à la définition de protobuf suivante :
service MyService { rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {} }
Le message de requête n'est pas seulement un flux de messages mais aussi un message qui peut
contiennent différents types de demandes. La directive oneof nous permet de définir un
champ qui ne peut contenir qu'un seul des types spécifiés.
Le code Golang du capteur sera ignoré, mais vous pouvez le trouver ici
serverStream encapsule le flux et les données du capteur pour faciliter son utilisation.
service MyService { rpc GetSomething (SomethingRequest) returns (SomethingResponse) {} }
Comme indiqué précédemment, le serveur peut envoyer et recevoir des messages en même temps, un
la fonction gérera les messages entrants et une autre fonction gérera le
messages sortants.
Réception de messages :
type somethingUnary struct { pb.UnimplementedSomethingUnaryServer } func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) { return &pb.SomethingResponse{ Message: "Hello " + req.Name, }, nil } func TestSomethingUnary(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingUnaryServer(s, &somethingUnary{}) }) client := pb.NewSomethingUnaryClient(conn) response, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
L'instruction switch est utilisée pour gérer les différents types de requêtes et décider
que faire de chaque demande. Il est important de laisser uniquement la fonction recvLoop
pour lire et ne pas envoyer de messages au client pour cette raison nous avons le sendLoop
qui lira les messages du canal de contrôle et les enverra au client.
Envoi de messages :
type somethingStream struct { pb.UnimplementedSomethingStreamServer } func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error { if err := stream.Send(&pb.SomethingResponse{ Message: "Hello " + req.Name, }); err != nil { return err } return nil } func TestSomethingStream(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingStreamServer(s, &somethingStream{}) }) client := pb.NewSomethingStreamClient(conn) stream, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something stream: %v", err) } response, err := stream.Recv() if err != nil { t.Fatalf("failed to receive response: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
La fonction sendLoop lit à la fois le canal de contrôle et le canal de données et envoie
les messages au client. Si le flux est fermé, la fonction reviendra.
Enfin, un joyeux test de chemin pour le service capteurs :
service BookStore { rpc ListBooks(ListBooksRequest) returns (stream Book) {} } service BookStoreBatch { rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {} } message ListBooksResponse { repeated Book books = 1; }
D'après le test ci-dessus, nous pouvons voir que le client peut créer, annuler et obtenir le courant
valeur d'un capteur. Le client peut également regarder plusieurs capteurs en même temps.
Le streaming gRPC est un outil polyvalent et puissant pour créer des applications en temps réel.
En suivant les meilleures pratiques telles que l'utilisation du streaming uniquement lorsque cela est nécessaire, le regroupement efficace des données et l'exploitation judicieuse du streaming bidirectionnel, les développeurs peuvent maximiser les performances
et maintenir la simplicité du code.
Même si le streaming gRPC introduit de la complexité, ses avantages dépassent de loin les défis
lorsqu'il est appliqué de manière réfléchie.
Si vous avez des questions ou des commentaires, n'hésitez pas à me contacter sur LinkedIn.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!