How to restrict producers and consumers from reading messages?

WBOY, 2024-02-11 18:00:11

Message queues are a common mechanism for asynchronous communication between producers and consumers. Sometimes the flow of messages between them has to be controlled, both to manage system resources and to handle load during peak hours. This article looks at how to bound a producer-consumer queue in Go and shut it down cleanly, so that no queued messages are lost.

Question content

I want to build a producer-consumer application in Go that shuts down on a signal.

The producer continuously puts messages into a queue with a limit of 10. Several consumers read from the channel and process the messages. When the number of messages in the queue drops to 0, the producer generates 10 messages again. When a stop signal is received, the producer stops generating new messages and the consumers process everything that is still in the channel.

I found a piece of code, but I can't tell whether it works correctly, because I noticed some strange behavior:

  1. Why are not all messages in the queue processed after the program is stopped, so that some data seems to be lost? (In the screenshot, 15 messages were sent but only 5 were processed.)
  2. How do I correctly limit the queue to 10 messages? That is, 10 messages are written, the producer waits until the queue counter reaches 0, and then 10 more are written.
  3. Is it possible to notify the producer after a stop signal so that it no longer sends new messages to the channel? (In the screenshot, the producer still successfully writes 12, 13, 14 and 15 to the queue.)

Result: [screenshot of the program output, not reproduced]

Code example:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(wg, i)
    }
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    <-termChan

    cancelFunc()
    wg.Wait()
}

type Consumer struct {
    in   *chan int
    jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.jobs {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
    for {
        select {
        case job := <-*c.in:
            c.jobs <- job
        case <-ctx.Done():
            close(c.jobs)
            fmt.Println("Consumer close channel")
            return
        }
    }
}

type Producer struct {
    in *chan int
}

func (p Producer) Produce() {
    task := 1
    for {
        *p.in <- task
        fmt.Printf("Send value %d\n", task)
        task++
        time.Sleep(time.Millisecond * 500)
    }
}

Solution

Why after stopping the program, not all the messages in the queue are processed, and it seems that some data is lost.

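This is because when ctx is done, (Consumer).Consume stops reading from the in channel, but the goroutine started by go p.Produce() keeps writing to the in channel. Whatever is still buffered in the in channel at that moment is never forwarded to the workers: Consume closes jobs, the workers finish the jobs they already have and exit, and main returns.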

The demo below solves this problem and simplifies the source code.

Notes:

  1. Produce stops after ctx is done, and it closes the in channel.

  2. The jobs field has been removed from Consumer; the workers read directly from the in channel.

  3. The following requirement is ignored because it is unusual: "If the number of messages in the queue is 0, the producer generates 10 messages again." The usual behavior is that when a job is produced and the in channel is not full, the job is sent to the in channel immediately; when the channel is full, the send operation blocks until a job is read from the in channel.

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    in := make(chan int, 10)
    p := Producer{in}
    c := Consumer{in}
    go p.Produce(ctx)

    var wg sync.WaitGroup
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(&wg, i)
    }

    <-ctx.Done()
    fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
    wg.Wait()
}

type Consumer struct {
    in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.in {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

type Producer struct {
    in chan int
}

func (p *Producer) Produce(ctx context.Context) {
    task := 1
    for {
        select {
        case p.in <- task:
            fmt.Printf("Send value %d\n", task)
            task++
            time.Sleep(time.Millisecond * 500)
        case <-ctx.Done():
            close(p.in)
            return
        }
    }
}

Statement:
This article is reproduced from stackoverflow.com. In case of any infringement, please contact admin@php.cn to have it removed.