gRPC 스트리밍을 사용하면 protobuf 메시지를 클라이언트에서 서버로, 서버에서 클라이언트로 또는 양방향으로 스트리밍할 수 있습니다.
이 강력한 기능을 사용하면 채팅 애플리케이션, 실시간 모니터링 대시보드 등과 같은 실시간 애플리케이션을 구축할 수 있습니다.
이 글에서는 gRPC 스트리밍을 올바르게 사용하는 방법을 살펴보겠습니다.
gRPC 스트리밍 사용에 대한 모범 사례를 확인해 보겠습니다.
일반적인 실수 중 하나는 단항 요청에 스트리밍을 사용하는 것입니다.
예를 들어 다음 gRPC 서비스 정의를 고려해 보세요.
service MyService { rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {} }
클라이언트가 하나의 요청만 보내고 하나의 응답만 받으면
스트리밍을 사용할 필요는 없습니다. 대신 다음과 같이 서비스를 정의할 수 있습니다.
service MyService { rpc GetSomething (SomethingRequest) returns (SomethingResponse) {} }
단항 요청에 스트리밍을 사용하여 불필요한 복잡성을 추가하고 있습니다
코드에 추가하면 이해하고 유지 관리하기가 더 어려워질 수 있습니다.
스트리밍을 사용하여 이익을 얻습니다.
단항 요청과 스트리밍 요청을 비교하는 Golang 코드 예:
단항 요청:
type somethingUnary struct { pb.UnimplementedSomethingUnaryServer } func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) { return &pb.SomethingResponse{ Message: "Hello " + req.Name, }, nil } func TestSomethingUnary(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingUnaryServer(s, &somethingUnary{}) }) client := pb.NewSomethingUnaryClient(conn) response, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
단항 요청 스트리밍:
type somethingStream struct { pb.UnimplementedSomethingStreamServer } func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error { if err := stream.Send(&pb.SomethingResponse{ Message: "Hello " + req.Name, }); err != nil { return err } return nil } func TestSomethingStream(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingStreamServer(s, &somethingStream{}) }) client := pb.NewSomethingStreamClient(conn) stream, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something stream: %v", err) } response, err := stream.Recv() if err != nil { t.Fatalf("failed to receive response: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
보시다시피 단항 요청의 코드는 더 간단하고 이해하기 쉽습니다
스트리밍 요청 코드보다
이 두 가지 서비스 정의를 비교해 보겠습니다.
service BookStore { rpc ListBooks(ListBooksRequest) returns (stream Book) {} } service BookStoreBatch { rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {} } message ListBooksResponse { repeated Book books = 1; }
BookStore는 한 번에 한 권의 책을 스트리밍하는 반면, BookStoreBatch는 여러 권의 책을 동시에 스트리밍합니다.
클라이언트가 모든 도서를 나열해야 하는 경우 BookStoreBatch를 사용하는 것이 더 효율적입니다.
클라이언트와 서버 간의 왕복 횟수가 줄어들기 때문입니다.
BookStore 및 BookStoreBatch에 대한 Golang 코드 예제를 살펴보겠습니다.
서점:
type bookStore struct { pb.UnimplementedBookStoreServer } func (s *bookStore) ListBooks(req *pb.ListBooksRequest, stream pb.BookStore_ListBooksServer) error { for _, b := range bookStoreData { if b.Author == req.Author { if err := stream.Send(&pb.Book{ Title: b.Title, Author: b.Author, PublicationYear: int32(b.PublicationYear), Genre: b.Genre, }); err != nil { return err } } } return nil } func TestBookStore_ListBooks(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreServer(s, &bookStore{}) }) client := pb.NewBookStoreClient(conn) stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { t.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { book, err := stream.Recv() if err != nil { break } books = append(books, book) } if len(books) != charlesDickensBooks { t.Errorf("unexpected number of books: %d", len(books)) } }
서점 배치:
type bookStoreBatch struct { pb.UnimplementedBookStoreBatchServer } func (s *bookStoreBatch) ListBooks(req *pb.ListBooksRequest, stream pb.BookStoreBatch_ListBooksServer) error { const batchSize = 10 books := make([]*pb.Book, 0, batchSize) for _, b := range bookStoreData { if b.Author == req.Author { books = append(books, &pb.Book{ Title: b.Title, Author: b.Author, PublicationYear: int32(b.PublicationYear), Genre: b.Genre, }) if len(books) == batchSize { if err := stream.Send(&pb.ListBooksResponse{ Books: books, }); err != nil { return err } books = books[:0] } } } if len(books) > 0 { if err := stream.Send(&pb.ListBooksResponse{ Books: books, }); err != nil { return nil } } return nil } func TestBookStoreBatch_ListBooks(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{}) }) client := pb.NewBookStoreBatchClient(conn) stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { t.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { response, err := stream.Recv() if err != nil { break } books = append(books, response.Books...) } if len(books) != charlesDickensBooks { t.Errorf("unexpected number of books: %d", len(books)) } }
위의 코드에서 어느 것이 더 나은지 명확히 해야 합니다.
차이점을 확인하기 위해 벤치마크를 실행해 보겠습니다.
북스토어 벤치마크:
func BenchmarkBookStore_ListBooks(b *testing.B) { conn := newServer(b, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreServer(s, &bookStore{}) }) client := pb.NewBookStoreClient(conn) var benchInnerBooks []*pb.Book b.ResetTimer() for i := 0; i < b.N; i++ { stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { b.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { book, err := stream.Recv() if err != nil { break } books = append(books, book) } benchInnerBooks = books } benchBooks = benchInnerBooks }
BookStoreBatch 벤치마크:
func BenchmarkBookStoreBatch_ListBooks(b *testing.B) { conn := newServer(b, func(s grpc.ServiceRegistrar) { pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{}) }) client := pb.NewBookStoreBatchClient(conn) var benchInnerBooks []*pb.Book b.ResetTimer() for i := 0; i < b.N; i++ { stream, err := client.ListBooks( context.Background(), &pb.ListBooksRequest{ Author: charlesDickens, }, ) if err != nil { b.Fatalf("failed to list books: %v", err) } books := []*pb.Book{} for { response, err := stream.Recv() if err != nil { break } books = append(books, response.Books...) } benchInnerBooks = books } benchBooks = benchInnerBooks }
벤치마크 결과:
BenchmarkBookStore_ListBooks BenchmarkBookStore_ListBooks-12 732 1647454 ns/op 85974 B/op 1989 allocs/op BenchmarkBookStoreBatch_ListBooks BenchmarkBookStoreBatch_ListBooks-12 1202 937491 ns/op 61098 B/op 853 allocs/op
정말 개선됐네요! BookStoreBatch는 BookStore보다 1.75배 빠릅니다.
그런데 왜 BookStoreBatch가 BookStore보다 빠른가요?
서버가 클라이언트에 메시지 스트림을 보낼 때마다 Send()를 수행해야 합니다
메시지를 인코딩하여 네트워크를 통해 보냅니다. 여러 문서를 전송하여
서버가 인코딩하고 전송해야 하는 횟수를 한 번에 줄입니다
서버뿐만 아니라 성능도 향상시키는 메시지
메시지를 디코딩해야 하는 클라이언트를 위해.
위 예시에서는 배치 크기를 10으로 설정했지만, 클라이언트는 네트워크 상황과 문서 크기에 따라 조정할 수 있습니다.
서점 예시에서는 책을 모두 반납하고 스트리밍을 종료하지만, 클라이언트가
실시간으로 이벤트(예: 센서)를 감시해야 하며 양방향 사용
스트리밍이 올바른 선택입니다.
양방향 스트림은 클라이언트와 서버가 모두 필요하기 때문에 약간 까다롭습니다
동시에 메시지를 보내고 받을 수 있습니다. golang이 이를 쉽게 만들어주기를 바랍니다
이런 동시성을 사용하여 작업합니다.
앞서 언급했듯이 센서는 양방향 스트리밍의 훌륭한 예가 될 수 있습니다.
감시 기능을 통해 클라이언트는 감시하고 요청할 센서를 결정할 수 있습니다
필요한 경우 현재 값.
다음 protobuf 정의를 살펴보겠습니다.
service MyService { rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {} }
요청 메시지는 메시지의 흐름일 뿐만 아니라 다음과 같은 메시지도 가능합니다
다양한 유형의 요청을 포함합니다. oneof 지시어를 사용하면
지정된 유형 중 하나만 포함할 수 있는 필드입니다.
센서의 golang 코드는 무시되지만 여기에서 찾을 수 있습니다
serverStream은 스트림과 센서 데이터를 래핑하여 작업을 더 쉽게 만듭니다.
service MyService { rpc GetSomething (SomethingRequest) returns (SomethingResponse) {} }
앞서 언급했듯이 서버는 메시지를 동시에 보내고 받을 수 있습니다.
함수는 들어오는 메시지를 처리하고 다른 함수는
보내는 메시지.
메시지 수신:
type somethingUnary struct { pb.UnimplementedSomethingUnaryServer } func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) { return &pb.SomethingResponse{ Message: "Hello " + req.Name, }, nil } func TestSomethingUnary(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingUnaryServer(s, &somethingUnary{}) }) client := pb.NewSomethingUnaryClient(conn) response, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
switch 문은 다양한 유형의 요청을 처리하고 결정하는 데 사용됩니다
각 요청에 대해 무엇을 해야 할지. RecvLoop 기능만 남겨두는 것이 중요합니다
이러한 이유로 클라이언트에게 메시지를 읽고 보내지 않기 위해 sendLoop를 사용합니다
그러면 제어 채널에서 메시지를 읽고 클라이언트에 보냅니다.
메시지 보내기:
type somethingStream struct { pb.UnimplementedSomethingStreamServer } func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error { if err := stream.Send(&pb.SomethingResponse{ Message: "Hello " + req.Name, }); err != nil { return err } return nil } func TestSomethingStream(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSomethingStreamServer(s, &somethingStream{}) }) client := pb.NewSomethingStreamClient(conn) stream, err := client.GetSomething( context.Background(), &pb.SomethingRequest{ Name: "test", }, ) if err != nil { t.Fatalf("failed to get something stream: %v", err) } response, err := stream.Recv() if err != nil { t.Fatalf("failed to receive response: %v", err) } if response.Message != "Hello test" { t.Errorf("unexpected response: %v", response.Message) } }
sendLoop 함수는 제어 채널과 데이터 채널을 모두 읽고 전송합니다
클라이언트에게 보내는 메시지. 스트림이 닫히면 함수가 반환됩니다.
마지막으로 센서 서비스에 대한 행복한 경로 테스트:
service BookStore { rpc ListBooks(ListBooksRequest) returns (stream Book) {} } service BookStoreBatch { rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {} } message ListBooksResponse { repeated Book books = 1; }
위의 테스트에서 클라이언트가 현재 항목을 생성, 취소 및 가져올 수 있음을 알 수 있습니다
센서의 가치. 클라이언트는 동시에 여러 센서를 감시할 수도 있습니다.
gRPC 스트리밍은 실시간 애플리케이션 구축을 위한 다양하고 강력한 도구입니다.
필요할 때만 스트리밍을 사용하고, 데이터를 효율적으로 일괄 처리하고, 양방향 스트리밍을 현명하게 활용하는 등의 모범 사례를 따르면 개발자는 성능을 극대화할 수 있습니다
코드 단순성을 유지합니다.
gRPC 스트리밍으로 인해 복잡성이 발생하지만 그 이점은 문제보다 훨씬 큽니다.
신중하게 적용했을 때.
질문이나 의견이 있으면 언제든지 LinkedIn을 통해 연락해 주세요.
위 내용은 gRPC 스트리밍: 모범 사례 및 성능 통찰력의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!