Maison  >  Article  >  développement back-end  >  Comment empêcher les producteurs et les consommateurs de lire les messages ?

Comment empêcher les producteurs et les consommateurs de lire les messages ?

WBOY
WBOYavant
2024-02-11 18:00:11843parcourir

Comment empêcher les producteurs et les consommateurs de lire les messages ?

Éditeur PHP Zimo Dans le processus de développement logiciel, la file d'attente de messages est un mécanisme de communication courant utilisé pour réaliser une communication asynchrone entre les producteurs et les consommateurs. Cependant, nous souhaitons parfois contrôler la lecture des messages par les producteurs et les consommateurs afin de mieux gérer les ressources du système et traiter les demandes pendant les heures de pointe. Cet article présentera certaines méthodes pour empêcher les producteurs et les consommateurs de lire les messages afin d'aider les développeurs à optimiser les performances du système et à améliorer la stabilité des applications.

Contenu de la question

Je souhaite obtenir une application producteur-consommateur (fermée via un signal) en utilisant go.

Le producteur génère en continu des messages dans la file d'attente, avec une limite de 10. Certains consommateurs lisent et traitent la chaîne. Si le nombre de messages dans la file d'attente est 0, le producteur génère à nouveau 10 messages. Lorsqu'un signal d'arrêt est reçu, le producteur arrête de générer de nouveaux messages et le consommateur traite tout ce qui se passe dans le canal.

J'ai trouvé un morceau de code mais je n'arrive pas à comprendre s'il fonctionne correctement car j'ai trouvé quelque chose de bizarre :

  1. Pourquoi après l'arrêt du programme, tous les messages de la file d'attente ne sont pas traités et certaines données semblent être perdues. (Dans la capture d'écran, 15 messages ont été envoyés mais 5 ont été traités)
  2. Comment limiter correctement la file d'attente à 10 messages, c'est-à-dire devoir écrire 10 messages, attendre le traitement lorsque le compteur de file d'attente atteint 0, puis en écrire 10 de plus ?
  3. Est-il possible de prévenir le producteur après un signal d'arrêt pour qu'il ne génère plus de nouveaux messages à la chaîne ? (Dans la capture d'écran, le producteur écrit avec succès dans la file d'attente - 12,13,14,15)

Résultat :

Exemple de code :

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(wg, i)
    }
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    <-termChan

    cancelFunc()
    wg.Wait()
}

type Consumer struct {
    in   *chan int
    jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.jobs {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
    for {
        select {
        case job := <-*c.in:
            c.jobs <- job
        case <-ctx.Done():
            close(c.jobs)
            fmt.Println("Consumer close channel")
            return
        }
    }
}

type Producer struct {
    in *chan int
}

func (p Producer) Produce() {
    task := 1
    for {
        *p.in <- task
        fmt.Printf("Send value %d\n", task)
        task++
        time.Sleep(time.Millisecond * 500)
    }
}

Solution

Pourquoi après l'arrêt du programme, tous les messages de la file d'attente ne sont pas traités et certaines données semblent être perdues.

C'est parce que quand ctx 完成后,(consumer).consume 停止从 in 通道读取,但 go p.produce() 创建的 goroutine 仍然写入 in ch.

La démo ci-dessous résout ce problème et simplifie le code source.

Remarques :

  1. producectx 完成后停止。并且它关闭了 in Chaîne.

  2. Champ jobs 已从 consumer 中删除,工作人员直接从 in Lecture de la chaîne.

  3. La demande suivante est ignorée car elle est bizarre. Le comportement courant est celui lorsqu'une tâche est générée, si le canal in 通道未满,则作业会立即发送到 in 通道;当它已满时,发送操作将阻塞,直到从 in lit la tâche d'ici là.

    Si le nombre de messages dans la file d'attente est 0, le producteur génère à nouveau 10 messages

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    in := make(chan int, 10)
    p := Producer{in}
    c := Consumer{in}
    go p.Produce(ctx)

    var wg sync.WaitGroup
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(&wg, i)
    }

    <-ctx.Done()
    fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
    wg.Wait()
}

type Consumer struct {
    in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.in {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

type Producer struct {
    in chan int
}

func (p *Producer) Produce(ctx context.Context) {
    task := 1
    for {
        select {
        case p.in <- task:
            fmt.Printf("Send value %d\n", task)
            task++
            time.Sleep(time.Millisecond * 500)
        case <-ctx.Done():
            close(p.in)
            return
        }
    }
}

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer