>  기사  >  백엔드 개발  >  생산자와 소비자가 메시지를 읽지 못하도록 제한하는 방법은 무엇입니까?

생산자와 소비자가 메시지를 읽지 못하도록 제한하는 방법은 무엇입니까?

WBOY
WBOY앞으로
2024-02-11 18:00:11843검색

생산자와 소비자가 메시지를 읽지 못하도록 제한하는 방법은 무엇입니까?

PHP 편집기 Zimo 소프트웨어 개발 프로세스에서 메시지 큐는 생산자와 소비자 간의 비동기 통신을 달성하는 데 사용되는 일반적인 통신 메커니즘입니다. 그러나 때로는 피크 시간 동안 시스템 리소스를 더 잘 관리하고 요청을 처리하기 위해 생산자와 소비자의 메시지 읽기를 제어하고 싶을 때도 있습니다. 이 문서에서는 개발자가 시스템 성능을 최적화하고 애플리케이션 안정성을 향상시키는 데 도움이 되도록 생산자와 소비자가 메시지를 읽지 못하도록 제한하는 몇 가지 방법을 소개합니다.

질문 내용

go를 사용하여 애플리케이션 생산자-소비자(신호를 통해 닫힘)를 가져오고 싶습니다.

생산자는 대기열에 최대 10개의 메시지를 지속적으로 생성합니다. 일부 소비자는 채널을 읽고 처리합니다. 대기열의 메시지 수가 0이면 생산자는 10개의 메시지를 다시 생성합니다. 중지 신호가 수신되면 생산자는 새 메시지 생성을 중지하고 소비자는 채널의 모든 것을 처리합니다.

코드를 찾았지만 뭔가 이상한 것을 발견했기 때문에 제대로 작동하는지 이해할 수 없습니다.

  1. 프로그램을 중지한 후 대기열의 모든 메시지가 처리되지 않고 일부 데이터가 손실되는 이유는 무엇입니까? (스크린샷에는 15개의 메시지가 전송되었지만 5개가 처리되었습니다)
  2. 큐를 10개의 메시지로 올바르게 제한하는 방법은 무엇입니까? 즉, 10개의 메시지를 작성하고 큐 카운터가 0에 도달할 때 처리를 기다린 다음 10개를 더 작성해야 합니까?
  3. 정지 신호 후에 제작자에게 더 이상 채널에 새로운 메시지를 생성하지 않도록 알릴 수 있나요? (스크린샷에서 생산자는 대기열에 성공적으로 기록합니다 - 12,13,14,15)

결과:

코드 예:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(wg, i)
    }
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    <-termChan

    cancelFunc()
    wg.Wait()
}

type Consumer struct {
    in   *chan int
    jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.jobs {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
    for {
        select {
        case job := <-*c.in:
            c.jobs <- job
        case <-ctx.Done():
            close(c.jobs)
            fmt.Println("Consumer close channel")
            return
        }
    }
}

type Producer struct {
    in *chan int
}

func (p Producer) Produce() {
    task := 1
    for {
        *p.in <- task
        fmt.Printf("Send value %d\n", task)
        task++
        time.Sleep(time.Millisecond * 500)
    }
}

Solution

프로그램을 중지한 후 대기열의 모든 메시지가 처리되지 않고 일부 데이터가 손실되는 이유는 무엇입니까?

이것은 ctx 完成后,(consumer).consume 停止从 in 通道读取,但 go p.produce() 创建的 goroutine 仍然写入 in ch.

아래 데모는 이 문제를 해결하고 소스 코드를 단순화합니다.

Notes:

  1. producectx 完成后停止。并且它关闭了 in 채널.

  2. Field jobs 已从 consumer 中删除,工作人员直接从 in 채널을 읽었습니다.

  3. 다음 요청은 이상하므로 무시합니다. 일반적인 동작은 작업이 생성될 때 in 通道未满,则作业会立即发送到 in 通道;当它已满时,发送操作将阻塞,直到从 in 채널이 그때까지 작업을 읽는 것입니다.

    큐의 메시지 수가 0이면 생산자는 다시 10개의 메시지를 생성합니다

으아아아

위 내용은 생산자와 소비자가 메시지를 읽지 못하도록 제한하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 stackoverflow.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제