php小編子墨在軟體開發過程中,訊息佇列是一種常見的通訊機制,用於實現生產者和消費者之間的非同步通訊。然而,有時我們希望控制生產者和消費者對訊息的讀取,以便更好地管理系統資源和處理高峰時段的請求。本文將介紹一些限制生產者和消費者讀取訊息的方法,幫助開發者優化系統效能和提高應用的穩定性。
我想用 go 獲得應用程式生產者-消費者(透過訊號關閉)。
生產者不斷在佇列中產生訊息,限制為 10 個。 一些消費者閱讀並處理該頻道。 如果佇列中的訊息數為0,生產者再次產生10個訊息。 當收到停止訊號時,生產者停止產生新訊息,消費者處理通道中的所有內容。
我找到了一段程式碼,但無法理解它是否正常工作,因為發現了奇怪的東西:
結果:
程式碼範例:
package main import ( "context" "fmt" "math/rand" "os" "os/signal" "sync" "syscall" "time" ) func main() { const nConsumers = 2 in := make(chan int, 10) p := Producer{&in} c := Consumer{&in, make(chan int, nConsumers)} go p.Produce() ctx, cancelFunc := context.WithCancel(context.Background()) go c.Consume(ctx) wg := &sync.WaitGroup{} wg.Add(nConsumers) for i := 1; i <= nConsumers; i++ { go c.Work(wg, i) } termChan := make(chan os.Signal, 1) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <-termChan cancelFunc() wg.Wait() } type Consumer struct { in *chan int jobs chan int } func (c Consumer) Work(wg *sync.WaitGroup, i int) { defer wg.Done() for job := range c.jobs { fmt.Printf("Worker #%d start job %d\n", i, job) time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000))) fmt.Printf("Worker #%d finish job %d\n", i, job) } fmt.Printf("Worker #%d interrupted\n", i) } func (c Consumer) Consume(ctx context.Context) { for { select { case job := <-*c.in: c.jobs <- job case <-ctx.Done(): close(c.jobs) fmt.Println("Consumer close channel") return } } } type Producer struct { in *chan int } func (p Producer) Produce() { task := 1 for { *p.in <- task fmt.Printf("Send value %d\n", task) task++ time.Sleep(time.Millisecond * 500) } }
為什麼停止程式後,佇列中的消息並沒有全部處理完,好像遺失了部分資料。
這是因為當ctx
完成後,(consumer).consume
停止從in
通道讀取,但go p.produce( )
創建的goroutine 仍然寫入in
通道。
下面的演示解決了這個問題並簡化了原始程式碼。
註解:
produce
在 ctx
完成後停止。並且它關閉了 in
通道。
欄位 jobs
已從 consumer
中刪除,工作人員直接從 in
通道讀取。
以下要求被忽略,因為它很奇怪。常見的行為是,當作業產生時,如果in
通道未滿,則作業會立即發送到in
通道;當它已滿時,發送操作將阻塞,直到從in
通道讀取作業為止。
如果佇列中的訊息數為0,生產者再次產生10個訊息
package main import ( "context" "fmt" "math/rand" "os/signal" "sync" "syscall" "time" ) func main() { const nConsumers = 2 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() in := make(chan int, 10) p := Producer{in} c := Consumer{in} go p.Produce(ctx) var wg sync.WaitGroup wg.Add(nConsumers) for i := 1; i <= nConsumers; i++ { go c.Work(&wg, i) } <-ctx.Done() fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in)) wg.Wait() } type Consumer struct { in chan int } func (c *Consumer) Work(wg *sync.WaitGroup, i int) { defer wg.Done() for job := range c.in { fmt.Printf("Worker #%d start job %d\n", i, job) time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000))) fmt.Printf("Worker #%d finish job %d\n", i, job) } fmt.Printf("Worker #%d interrupted\n", i) } type Producer struct { in chan int } func (p *Producer) Produce(ctx context.Context) { task := 1 for { select { case p.in <- task: fmt.Printf("Send value %d\n", task) task++ time.Sleep(time.Millisecond * 500) case <-ctx.Done(): close(p.in) return } } }
以上是如何限制生產者和消費者讀取訊息?的詳細內容。更多資訊請關注PHP中文網其他相關文章!