首頁  >  文章  >  後端開發  >  如何限制生產者和消費者讀取訊息?

如何限制生產者和消費者讀取訊息?

WBOY
WBOY轉載
2024-02-11 18:00:11843瀏覽

如何限制生產者和消費者讀取訊息?

php小編子墨在軟體開發過程中,訊息佇列是一種常見的通訊機制,用於實現生產者和消費者之間的非同步通訊。然而,有時我們希望控制生產者和消費者對訊息的讀取,以便更好地管理系統資源和處理高峰時段的請求。本文將介紹一些限制生產者和消費者讀取訊息的方法,幫助開發者優化系統效能和提高應用的穩定性。

問題內容

我想用 go 獲得應用程式生產者-消費者(透過訊號關閉)。

生產者不斷在佇列中產生訊息,限制為 10 個。 一些消費者閱讀並處理該頻道。 如果佇列中的訊息數為0,生產者再次產生10個訊息。 當收到停止訊號時,生產者停止產生新訊息,消費者處理通道中的所有內容。

我找到了一段程式碼,但無法理解它是否正常工作,因為發現了奇怪的東西:

  1. 為什麼停止程式後,佇列中的消息並沒有全部處理完,好像遺失了部分資料。 (在螢幕截圖中,發送了 15 則訊息,但處理了 5 則訊息)
  2. 如何正確地將佇列限制為10個訊息,也就是必須寫入10個訊息,等待佇列計數器變成0時處理,然後再寫入10個訊息?
  3. 是否可以在停止訊號後通知生產者,以便他不再向通道產生新訊息? (在螢幕截圖中,生產者成功寫入佇列 - 12,13,14,15)

結果:

程式碼範例:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(wg, i)
    }
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    <-termChan

    cancelFunc()
    wg.Wait()
}

type Consumer struct {
    in   *chan int
    jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.jobs {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
    for {
        select {
        case job := <-*c.in:
            c.jobs <- job
        case <-ctx.Done():
            close(c.jobs)
            fmt.Println("Consumer close channel")
            return
        }
    }
}

type Producer struct {
    in *chan int
}

func (p Producer) Produce() {
    task := 1
    for {
        *p.in <- task
        fmt.Printf("Send value %d\n", task)
        task++
        time.Sleep(time.Millisecond * 500)
    }
}

解決方法

為什麼停止程式後,佇列中的消息並沒有全部處理完,好像遺失了部分資料。

這是因為當ctx 完成後,(consumer).consume 停止從in 通道讀取,但go p.produce( ) 創建的goroutine 仍然寫入in 通道。

下面的演示解決了這個問題並簡化了原始程式碼。

註解

  1. producectx 完成後停止。並且它關閉了 in 通道。

  2. 欄位 jobs 已從 consumer 中刪除,工作人員直接從 in 通道讀取。

  3. 以下要求被忽略,因為它很奇怪。常見的行為是,當作業產生時,如果in 通道未滿,則作業會立即發送到in 通道;當它已滿時,發送操作將阻塞,直到從in 通道讀取作業為止。

    如果佇列中的訊息數為0,生產者再次產生10個訊息

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    in := make(chan int, 10)
    p := Producer{in}
    c := Consumer{in}
    go p.Produce(ctx)

    var wg sync.WaitGroup
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(&wg, i)
    }

    <-ctx.Done()
    fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
    wg.Wait()
}

type Consumer struct {
    in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.in {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

type Producer struct {
    in chan int
}

func (p *Producer) Produce(ctx context.Context) {
    task := 1
    for {
        select {
        case p.in <- task:
            fmt.Printf("Send value %d\n", task)
            task++
            time.Sleep(time.Millisecond * 500)
        case <-ctx.Done():
            close(p.in)
            return
        }
    }
}

以上是如何限制生產者和消費者讀取訊息?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除