ホームページ  >  記事  >  バックエンド開発  >  プロデューサーとコンシューマーがメッセージを読むことを制限するにはどうすればよいですか?

プロデューサーとコンシューマーがメッセージを読むことを制限するにはどうすればよいですか?

WBOY
WBOY転載
2024-02-11 18:00:11843ブラウズ

プロデューサーとコンシューマーがメッセージを読むことを制限するにはどうすればよいですか?

php エディタ Zimo ソフトウェア開発プロセスにおいて、メッセージ キューは、プロデューサーとコンシューマー間の非同期通信を実現するために使用される一般的な通信メカニズムです。ただし、システム リソースをより適切に管理し、ピーク時のリクエストを処理するために、プロデューサーとコンシューマーによるメッセージの読み取りを制御したい場合があります。この記事では、開発者がシステムのパフォーマンスを最適化し、アプリケーションの安定性を向上させるために、プロデューサーとコンシューマーによるメッセージの読み取りを制限するいくつかの方法を紹介します。

質問内容

goを使ってアプリケーションのプロデューサー-コンシューマー(シグナルでクローズ)を取得したいです。

プロデューサーはキュー内にメッセージを継続的に生成しますが、上限は 10 です。 一部の消費者はチャネルを読み取って処理します。 キュー内のメッセージの数が 0 の場合、プロデューサーは再度 10 個のメッセージを生成します。 停止信号を受信すると、プロデューサーは新しいメッセージの生成を停止し、コンシューマーはチャネル内のすべてを処理します。

コードを見つけましたが、何か奇妙なものを見つけたので、それが正しく動作するかどうか理解できません:

  1. プログラムを停止した後、キュー内のすべてのメッセージが処理されず、一部のデータが失われたように見えるのはなぜですか。 (スクリーンショットでは、15 個のメッセージが送信されましたが、5 個が処理されました)
  2. キューを 10 メッセージに正しく制限するにはどうすればよいですか。つまり、10 メッセージを書き込む必要があり、キュー カウンターが 0 に達したら処理を待機し、さらに 10 メッセージを書き込む必要があります。
  3. 停止信号の後にプロデューサーに通知して、チャンネルに新しいメッセージを生成しないようにすることはできますか? (スクリーンショットでは、プロデューサーはキューへの書き込みに成功しています - 12、13、14、15)
###結果:###

コード例:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(wg, i)
    }
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    <-termChan

    cancelFunc()
    wg.Wait()
}

type Consumer struct {
    in   *chan int
    jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.jobs {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
    for {
        select {
        case job := <-*c.in:
            c.jobs <- job
        case <-ctx.Done():
            close(c.jobs)
            fmt.Println("Consumer close channel")
            return
        }
    }
}

type Producer struct {
    in *chan int
}

func (p Producer) Produce() {
    task := 1
    for {
        *p.in <- task
        fmt.Printf("Send value %d\n", task)
        task++
        time.Sleep(time.Millisecond * 500)
    }
}

解決策

プログラムを停止した後、キュー内のすべてのメッセージが処理されず、一部のデータが失われたように見えるのはなぜですか。

これは、

ctx

が完了すると、(consumer).consumein チャネルからの読み取りを停止しますが、go p.Produce( ) が実行されるためです。 作成されたゴルーチンは引き続き in チャネルに書き込みます。 以下のデモはこの問題を解決し、ソース コードを簡素化します。

コメント

:

  1. Produce

    ctx が完了すると停止します。そして、in チャネルを閉じます。

  2. フィールド
  3. jobs

    consumer から削除され、ワー​​カーは in チャネルから直接読み取られます。

  4. 次のリクエストは奇妙であるため無視されます。一般的な動作は、ジョブが生成されるときに、
  5. in

    チャネルがいっぱいでない場合、ジョブはすぐに in チャネルに送信され、いっぱいになると送信操作が行われます。 in チャネルが埋まるまでブロック in チャネル読み取り操作まで。

    キュー内のメッセージの数が 0 の場合、プロデューサーは再度 10 個のメッセージを生成します

リーリー

以上がプロデューサーとコンシューマーがメッセージを読むことを制限するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はstackoverflow.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。