Heim >Backend-Entwicklung >Golang >Wie kann man Produzenten und Konsumenten daran hindern, Nachrichten zu lesen?

Wie kann man Produzenten und Konsumenten daran hindern, Nachrichten zu lesen?

WBOY
WBOYnach vorne
2024-02-11 18:00:11941Durchsuche

Wie kann man Produzenten und Konsumenten daran hindern, Nachrichten zu lesen?

PHP-Editor Zimo Im Softwareentwicklungsprozess ist die Nachrichtenwarteschlange ein gängiger Kommunikationsmechanismus, der verwendet wird, um eine asynchrone Kommunikation zwischen Produzenten und Verbrauchern zu erreichen. Manchmal möchten wir jedoch das Lesen von Nachrichten durch Produzenten und Verbraucher kontrollieren, um die Systemressourcen besser zu verwalten und Anfragen während der Spitzenzeiten zu bearbeiten. In diesem Artikel werden einige Methoden vorgestellt, mit denen Produzenten und Verbraucher daran gehindert werden, Nachrichten zu lesen, um Entwicklern dabei zu helfen, die Systemleistung zu optimieren und die Anwendungsstabilität zu verbessern.

Frageninhalt

Ich möchte mithilfe von go eine Producer-Consumer-Anwendung (über ein Signal geschlossen) erhalten.

Der Produzent generiert kontinuierlich Nachrichten in der Warteschlange, mit einem Limit von 10. Einige Verbraucher lesen und verarbeiten den Kanal. Wenn die Anzahl der Nachrichten in der Warteschlange 0 beträgt, generiert der Produzent erneut 10 Nachrichten. Wenn ein Stoppsignal empfangen wird, stoppt der Produzent die Generierung neuer Nachrichten und der Verbraucher verarbeitet alles im Kanal.

Ich habe einen Code gefunden, kann aber nicht verstehen, ob er richtig funktioniert, weil ich etwas Seltsames gefunden habe:

  1. Warum nach dem Stoppen des Programms nicht alle Nachrichten in der Warteschlange verarbeitet werden und einige Daten verloren zu gehen scheinen. (Im Screenshot wurden 15 Nachrichten gesendet, aber 5 wurden verarbeitet)
  2. Wie kann man die Warteschlange richtig auf 10 Nachrichten begrenzen, d. h. 10 Nachrichten schreiben, auf die Verarbeitung warten, wenn der Warteschlangenzähler 0 erreicht, und dann 10 weitere schreiben?
  3. Ist es möglich, den Produzenten nach einem Stoppsignal zu benachrichtigen, damit er keine neuen Nachrichten mehr an den Kanal generiert? (Im Screenshot schreibt der Produzent erfolgreich in die Warteschlange – 12,13,14,15)

Ergebnis:

Codebeispiel:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(wg, i)
    }
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    <-termChan

    cancelFunc()
    wg.Wait()
}

type Consumer struct {
    in   *chan int
    jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.jobs {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
    for {
        select {
        case job := <-*c.in:
            c.jobs <- job
        case <-ctx.Done():
            close(c.jobs)
            fmt.Println("Consumer close channel")
            return
        }
    }
}

type Producer struct {
    in *chan int
}

func (p Producer) Produce() {
    task := 1
    for {
        *p.in <- task
        fmt.Printf("Send value %d\n", task)
        task++
        time.Sleep(time.Millisecond * 500)
    }
}

Lösung

Warum nach dem Stoppen des Programms nicht alle Nachrichten in der Warteschlange verarbeitet werden und einige Daten verloren zu gehen scheinen.

Das liegt daran, dass wenn ctx 完成后,(consumer).consume 停止从 in 通道读取,但 go p.produce() 创建的 goroutine 仍然写入 inch.

Die folgende Demo löst dieses Problem und vereinfacht den Quellcode.

Notizen:

  1. producectx 完成后停止。并且它关闭了 in Kanal.

  2. Feld jobs 已从 consumer 中删除,工作人员直接从 in Kanal gelesen.

  3. Die folgende Anfrage wird ignoriert, weil sie seltsam ist. Ein übliches Verhalten besteht darin, dass bei der Generierung eines Jobs der in 通道未满,则作业会立即发送到 in 通道;当它已满时,发送操作将阻塞,直到从 in-Kanal den Job bis dahin liest.

    Wenn die Anzahl der Nachrichten in der Warteschlange 0 beträgt, generiert der Produzent erneut 10 Nachrichten

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    in := make(chan int, 10)
    p := Producer{in}
    c := Consumer{in}
    go p.Produce(ctx)

    var wg sync.WaitGroup
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(&wg, i)
    }

    <-ctx.Done()
    fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
    wg.Wait()
}

type Consumer struct {
    in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.in {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

type Producer struct {
    in chan int
}

func (p *Producer) Produce(ctx context.Context) {
    task := 1
    for {
        select {
        case p.in <- task:
            fmt.Printf("Send value %d\n", task)
            task++
            time.Sleep(time.Millisecond * 500)
        case <-ctx.Done():
            close(p.in)
            return
        }
    }
}

Das obige ist der detaillierte Inhalt vonWie kann man Produzenten und Konsumenten daran hindern, Nachrichten zu lesen?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:stackoverflow.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen