首頁 >後端開發 >Golang >gRPC 串流:最佳實踐和效能見解

gRPC 串流:最佳實踐和效能見解

Patricia Arquette
Patricia Arquette原創
2024-12-06 11:32:12789瀏覽

gRPC Streaming: Best Practices and Performance Insights

介紹

gRPC 流允許 protobuf 訊息從客戶端串流傳輸到伺服器、從伺服器串流傳輸到客戶端,或雙向串流。
這項強大的功能可用於建立即時應用程序,例如聊天應用程式、即時監控儀表板等。

在本文中,我們將探討如何正確使用 gRPC 流。

先決條件

  • gRPC基礎知識
  • Go 程式語言的基礎知識(範例程式碼是用 Go 寫的,但這個概念也可以應用於其他語言)
  • 程式碼範例可在 GitHub 上取得

良好實踐

讓我們檢查一下使用 gRPC 流的良好實踐:

使用一元請求進行一元請求

一個常見的錯誤是對一元請求使用串流。
例如,考慮以下 gRPC 服務定義:

service MyService {
  rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {}
}

如果客戶端只需要發送一個請求並接收一個回應,
您不需要使用串流。相反,我們可以如下定義服務:

service MyService {
  rpc GetSomething (SomethingRequest) returns (SomethingResponse) {}
}

透過對一元請求使用串流傳輸,我們增加了不必要的複雜性
到程式碼,這可能會使其更難理解和維護,而不是
從使用串流媒體中獲得任何好處。

比較一元請求和流請求的 Golang 程式碼範例:

一元請求:

type somethingUnary struct {
    pb.UnimplementedSomethingUnaryServer
}

func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) {
    return &pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }, nil
}

func TestSomethingUnary(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingUnaryServer(s, &somethingUnary{})
    })

    client := pb.NewSomethingUnaryClient(conn)

    response, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

流式一元請求:

type somethingStream struct {
    pb.UnimplementedSomethingStreamServer
}

func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error {
    if err := stream.Send(&pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }); err != nil {
        return err
    }

    return nil
}

func TestSomethingStream(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingStreamServer(s, &somethingStream{})
    })

    client := pb.NewSomethingStreamClient(conn)

    stream, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something stream: %v", err)
    }

    response, err := stream.Recv()
    if err != nil {
        t.Fatalf("failed to receive response: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

我們可以看到,一元請求的程式碼更簡單,更容易理解
比流請求的代碼。

如果可以的話,一次發送多個文檔

讓我們來比較一下這兩個服務定義:

service BookStore {
  rpc ListBooks(ListBooksRequest) returns (stream Book) {}
}

service BookStoreBatch {
  rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {}
}

message ListBooksResponse {
  repeated Book books = 1;
}

BookStore 一次串流一本書,而 BookStoreBatch 同時串流多本書。

如果客戶端需要列出所有書籍,使用BookStoreBatch 效率更高
因為它減少了客戶端和伺服器之間的往返次數。

讓我們來看看 BookStore 和 BookStoreBatch 的 Golang 程式碼範例:

書店:

type bookStore struct {
    pb.UnimplementedBookStoreServer
}

func (s *bookStore) ListBooks(req *pb.ListBooksRequest, stream pb.BookStore_ListBooksServer) error {
    for _, b := range bookStoreData {
        if b.Author == req.Author {
            if err := stream.Send(&pb.Book{
                Title:           b.Title,
                Author:          b.Author,
                PublicationYear: int32(b.PublicationYear),
                Genre:           b.Genre,
            }); err != nil {
                return err
            }
        }
    }
    return nil
}

func TestBookStore_ListBooks(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreServer(s, &bookStore{})
    })

    client := pb.NewBookStoreClient(conn)

    stream, err := client.ListBooks(
        context.Background(),
        &pb.ListBooksRequest{
            Author: charlesDickens,
        },
    )
    if err != nil {
        t.Fatalf("failed to list books: %v", err)
    }

    books := []*pb.Book{}
    for {
        book, err := stream.Recv()
        if err != nil {
            break
        }
        books = append(books, book)
    }

    if len(books) != charlesDickensBooks {
        t.Errorf("unexpected number of books: %d", len(books))
    }
}

書店批次:

type bookStoreBatch struct {
    pb.UnimplementedBookStoreBatchServer
}

func (s *bookStoreBatch) ListBooks(req *pb.ListBooksRequest, stream pb.BookStoreBatch_ListBooksServer) error {
    const batchSize = 10
    books := make([]*pb.Book, 0, batchSize)
    for _, b := range bookStoreData {
        if b.Author == req.Author {
            books = append(books, &pb.Book{
                Title:           b.Title,
                Author:          b.Author,
                PublicationYear: int32(b.PublicationYear),
                Genre:           b.Genre,
            })

            if len(books) == batchSize {
                if err := stream.Send(&pb.ListBooksResponse{
                    Books: books,
                }); err != nil {
                    return err
                }
                books = books[:0]
            }
        }
    }

    if len(books) > 0 {
        if err := stream.Send(&pb.ListBooksResponse{
            Books: books,
        }); err != nil {
            return nil
        }
    }

    return nil
}

func TestBookStoreBatch_ListBooks(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
    })

    client := pb.NewBookStoreBatchClient(conn)

    stream, err := client.ListBooks(
        context.Background(),
        &pb.ListBooksRequest{
            Author: charlesDickens,
        },
    )
    if err != nil {
        t.Fatalf("failed to list books: %v", err)
    }

    books := []*pb.Book{}
    for {
        response, err := stream.Recv()
        if err != nil {
            break
        }
        books = append(books, response.Books...)
    }

    if len(books) != charlesDickensBooks {
        t.Errorf("unexpected number of books: %d", len(books))
    }
}

從上面的程式碼來看,需要明確哪一個比較好。
讓我們執行一個基準測試來看看差異:

書店基準:

func BenchmarkBookStore_ListBooks(b *testing.B) {
    conn := newServer(b, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreServer(s, &bookStore{})
    })

    client := pb.NewBookStoreClient(conn)

    var benchInnerBooks []*pb.Book
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        stream, err := client.ListBooks(
            context.Background(),
            &pb.ListBooksRequest{
                Author: charlesDickens,
            },
        )
        if err != nil {
            b.Fatalf("failed to list books: %v", err)
        }

        books := []*pb.Book{}
        for {
            book, err := stream.Recv()
            if err != nil {
                break
            }
            books = append(books, book)
        }

        benchInnerBooks = books
    }

    benchBooks = benchInnerBooks
}

BookStoreBatch 基準:

func BenchmarkBookStoreBatch_ListBooks(b *testing.B) {
    conn := newServer(b, func(s grpc.ServiceRegistrar) {
        pb.RegisterBookStoreBatchServer(s, &bookStoreBatch{})
    })

    client := pb.NewBookStoreBatchClient(conn)

    var benchInnerBooks []*pb.Book
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        stream, err := client.ListBooks(
            context.Background(),
            &pb.ListBooksRequest{
                Author: charlesDickens,
            },
        )
        if err != nil {
            b.Fatalf("failed to list books: %v", err)
        }

        books := []*pb.Book{}
        for {
            response, err := stream.Recv()
            if err != nil {
                break
            }
            books = append(books, response.Books...)
        }

        benchInnerBooks = books
    }

    benchBooks = benchInnerBooks
}

基準測試結果:

BenchmarkBookStore_ListBooks
BenchmarkBookStore_ListBooks-12                      732           1647454 ns/op           85974 B/op       1989 allocs/op
BenchmarkBookStoreBatch_ListBooks
BenchmarkBookStoreBatch_ListBooks-12                1202            937491 ns/op           61098 B/op        853 allocs/op

多麼大的進步啊! BookStoreBatch 比 BookStore 快 1.75 倍。

但是為什麼 BookStoreBatch 比 BookStore 快?

伺服器每次傳送訊息流至客戶端.Send(),都需要
對訊息進行編碼並透過網路發送。透過發送多個檔案
我們立即減少了伺服器需要編碼和發送的次數
訊息,不僅提高了伺服器的效能,還提高了
對於需要解碼訊息的客戶端。

在上面的範例中,批量大小設定為10,但客戶端可以根據網路情況和文件大小進行調整。

使用雙向流來控制流量

書店範例傳回所有書籍並完成串流,但如果客戶端
需要即時觀察事件(例如感測器),使用雙向
直播是正確的選擇。

雙向流有點棘手,因為客戶端和伺服器都
可以同時發送和接收訊息。希望 golang 能讓這一切變得簡單
像這樣處理並發。

如前所述,感測器是雙向流的一個很好的例子。
監視功能允許客戶端決定監視和請求哪些感測器
如果需要的話,當前值。

讓我們來看看下面的protobuf定義:

service MyService {
  rpc GetSomething (SomethingRequest) returns (stream SomethingResponse) {}
}

請求訊息不只是訊息流,更是一條可以
包含不同類型的請求。 oneof 指令允許我們定義一個
只能包含指定類型之一的欄位。

感測器的 golang 程式碼將被忽略,但您可以在這裡找到它

serverStream 包裝流和感測器數據,使其更易於使用。

service MyService {
  rpc GetSomething (SomethingRequest) returns (SomethingResponse) {}
}

如前所述,伺服器可以同時發送和接收訊息,一個
函數將處理傳入的訊息,另一個函數將處理
傳出消息。

接收訊息:

type somethingUnary struct {
    pb.UnimplementedSomethingUnaryServer
}

func (s *somethingUnary) GetSomething(ctx context.Context, req *pb.SomethingRequest) (*pb.SomethingResponse, error) {
    return &pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }, nil
}

func TestSomethingUnary(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingUnaryServer(s, &somethingUnary{})
    })

    client := pb.NewSomethingUnaryClient(conn)

    response, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

switch語句用來處理不同類型的請求並做出決定
如何處理每個請求。只保留 SecvLoop 函數很重要
讀取訊息但不向客戶端發送訊息,因此我們有 sendLoop
將從控制通道讀取訊息並將其發送到客戶端。

發送訊息:

type somethingStream struct {
    pb.UnimplementedSomethingStreamServer
}

func (s *somethingStream) GetSomething(req *pb.SomethingRequest, stream pb.SomethingStream_GetSomethingServer) error {
    if err := stream.Send(&pb.SomethingResponse{
        Message: "Hello " + req.Name,
    }); err != nil {
        return err
    }

    return nil
}

func TestSomethingStream(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSomethingStreamServer(s, &somethingStream{})
    })

    client := pb.NewSomethingStreamClient(conn)

    stream, err := client.GetSomething(
        context.Background(),
        &pb.SomethingRequest{
            Name: "test",
        },
    )
    if err != nil {
        t.Fatalf("failed to get something stream: %v", err)
    }

    response, err := stream.Recv()
    if err != nil {
        t.Fatalf("failed to receive response: %v", err)
    }

    if response.Message != "Hello test" {
        t.Errorf("unexpected response: %v", response.Message)
    }
}

sendLoop函數讀取控制通道和資料通道並發送
發送給客戶端的訊息。如果流關閉,函數將傳回。

最後,感測器服務的快樂路徑測試:

service BookStore {
  rpc ListBooks(ListBooksRequest) returns (stream Book) {}
}

service BookStoreBatch {
  rpc ListBooks(ListBooksRequest) returns (stream ListBooksResponse) {}
}

message ListBooksResponse {
  repeated Book books = 1;
}

從上面的測試我們可以看到客戶端可以建立、取消、取得當前
感測器的值。客戶端還可以同時觀看多個感測器。

挑戰自己

  • 使用 gRPC 流實作聊天應用程式。
  • 修改感測器服務以一次發送多個值以節省往返次數。
  • 嗅探網路流量以查看一元請求和串流請求之間的差異。

結論

gRPC 串流是用於建立即時應用程式的多功能且強大的工具。
透過遵循最佳實踐,例如僅在必要時使用串流、有效地批次資料以及明智地利用雙向串流傳輸,開發人員可以最大限度地提高效能
並保持程式碼簡單性。
雖然 gRPC 串流帶來了複雜性,但其好處遠遠超過了挑戰
當深思熟慮地應用時。

保持聯繫

如果您有任何問題或回饋,請隨時在 LinkedIn 上與我聯繫。

以上是gRPC 串流:最佳實踐和效能見解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn