Rumah >pembangunan bahagian belakang >Golang >Bagaimana untuk menyekat pengeluar dan pengguna daripada membaca mesej?

Bagaimana untuk menyekat pengeluar dan pengguna daripada membaca mesej?

WBOY
WBOYke hadapan
2024-02-11 18:00:11944semak imbas

Bagaimana untuk menyekat pengeluar dan pengguna daripada membaca mesej?

PHP editor Zimo Dalam proses pembangunan perisian, baris gilir mesej ialah mekanisme komunikasi biasa yang digunakan untuk mencapai komunikasi tak segerak antara pengeluar dan pengguna. Walau bagaimanapun, kadangkala kami ingin mengawal pembacaan mesej oleh pengeluar dan pengguna untuk mengurus sumber sistem dengan lebih baik dan mengendalikan permintaan semasa waktu sibuk. Artikel ini akan memperkenalkan beberapa kaedah untuk menyekat pengeluar dan pengguna daripada membaca mesej untuk membantu pembangun mengoptimumkan prestasi sistem dan meningkatkan kestabilan aplikasi.

Kandungan soalan

Saya ingin mendapatkan pengeluar aplikasi-pengguna (ditutup melalui isyarat) menggunakan go.

Pengeluar terus menjana mesej dalam baris gilir, dengan had 10. Sesetengah pengguna membaca dan memproses saluran. Jika bilangan mesej dalam baris gilir ialah 0, pengeluar menjana 10 mesej sekali lagi. Apabila isyarat berhenti diterima, pengeluar berhenti menjana mesej baharu dan pengguna memproses segala-galanya dalam saluran.

Saya menjumpai sekeping kod tetapi tidak dapat memahami sama ada ia berfungsi dengan betul kerana saya menemui sesuatu yang pelik:

  1. Mengapa selepas menghentikan program, tidak semua mesej dalam baris gilir diproses, dan beberapa data kelihatan hilang. (Dalam tangkapan skrin, 15 mesej telah dihantar tetapi 5 telah diproses)
  2. Bagaimana untuk mengehadkan baris gilir dengan betul kepada 10 mesej, iaitu perlu menulis 10 mesej, tunggu pemprosesan apabila pembilang baris gilir mencapai 0, dan kemudian tulis 10 lagi?
  3. Adakah mungkin untuk memberitahu penerbit selepas isyarat berhenti supaya dia tidak lagi menjana mesej baharu kepada saluran? (Dalam tangkapan skrin, pengeluar berjaya menulis ke baris gilir - 12,13,14,15)

Hasil:

Contoh kod:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(wg, i)
    }
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    <-termChan

    cancelFunc()
    wg.Wait()
}

type Consumer struct {
    in   *chan int
    jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.jobs {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
    for {
        select {
        case job := <-*c.in:
            c.jobs <- job
        case <-ctx.Done():
            close(c.jobs)
            fmt.Println("Consumer close channel")
            return
        }
    }
}

type Producer struct {
    in *chan int
}

func (p Producer) Produce() {
    task := 1
    for {
        *p.in <- task
        fmt.Printf("Send value %d\n", task)
        task++
        time.Sleep(time.Millisecond * 500)
    }
}

Penyelesaian

Mengapa selepas menghentikan program, tidak semua mesej dalam baris gilir diproses, dan beberapa data nampaknya hilang.

Ini kerana apabila ctx 完成后,(consumer).consume 停止从 in 通道读取,但 go p.produce() 创建的 goroutine 仍然写入 in ch.

Demo di bawah menyelesaikan masalah ini dan memudahkan kod sumber.

Nota:

  1. producectx 完成后停止。并且它关闭了 in Saluran.

  2. Bidang jobs 已从 consumer 中删除,工作人员直接从 in Baca saluran.

  3. Permintaan berikut tidak diendahkan kerana pelik. Tingkah laku biasa ialah apabila kerja dijana, jika saluran in 通道未满,则作业会立即发送到 in 通道;当它已满时,发送操作将阻塞,直到从 in membaca kerja itu sehingga itu.

    Jika bilangan mesej dalam baris gilir ialah 0, pengeluar menjana 10 mesej lagi

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    in := make(chan int, 10)
    p := Producer{in}
    c := Consumer{in}
    go p.Produce(ctx)

    var wg sync.WaitGroup
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(&wg, i)
    }

    <-ctx.Done()
    fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
    wg.Wait()
}

type Consumer struct {
    in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.in {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

type Producer struct {
    in chan int
}

func (p *Producer) Produce(ctx context.Context) {
    task := 1
    for {
        select {
        case p.in <- task:
            fmt.Printf("Send value %d\n", task)
            task++
            time.Sleep(time.Millisecond * 500)
        case <-ctx.Done():
            close(p.in)
            return
        }
    }
}

Atas ialah kandungan terperinci Bagaimana untuk menyekat pengeluar dan pengguna daripada membaca mesej?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:stackoverflow.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam