Heim >Backend-Entwicklung >Golang >Schließen Sie Goroutinen mithilfe von Kanälen schneller

Schließen Sie Goroutinen mithilfe von Kanälen schneller

WBOY
WBOYnach vorne
2024-02-06 09:12:04794Durchsuche

使用通道更快地关闭 goroutine

Frageninhalt

Ich bin neu bei GO und habe eine Frage zum Stoppen von Goroutine mithilfe des Kanalsignals.

Ich habe eine langjährige Goroutine (über 1000 davon) und einen Manager, der sie verwaltet:

func myThreadFunc(stop chan bool) {
    for {
        select {
        case <- stop:
            log.Debug("Stopping thread")
            return
        default:
            callClientTask() 
        }
    }
}

func callClientTask() {
    // This can take long time up to 30 seconds - this is external HTTP API call
    time.Sleep(5 * time.Second)
}


func manager() {
    var cancelChannelSlice []chan bool
    for i := 0; i < 1000; i++ {
        cancelChannel := make(chan bool)
        cancelChannelSlice = append(cancelChannelSlice, cancelChannel)

        go myThreadFunc(cancelChannel)
    }

    var stopTest = func() {
        for _, c := range cancelChannelSlice {
            c <- true
        }
    }

    timeout := time.After(time.Duration(300) * time.Second)
    for {
        select {
        case <-timeout:
            stopTest()
        default:
            time.Sleep(time.Second)
        }
    }
}

In diesem Fall jedes Mal, wenn ich anrufe c <- true 管理器都会等待 callClientTask() 完成,然后转到下一个 cancelChannel Ich möchte, dass alle Goroutinen in einer Iteration von callClientTask() (nicht länger als 30 Sekunden)

anhalten

Die einzige Möglichkeit, die ich versucht habe, bestand darin, die neue Goroutine so zu besetzen:

var stopTest = func() {
        for _, c := range cancelChannelSlice {
            go func(c chan bool) {
                c <- true
                close(c)
            }(c)
        }
    }

Mache ich das richtig?


Richtige Antwort


Soweit ich Ihre Frage verstehe: „Sie möchten, dass alle Goroutinen innerhalb einer Iteration von callClientTask() (nicht länger als 30 Sekunden) anhalten“ und die Arbeitsthreads gleichzeitig ohne Synchronisierungsfrage ausgeführt werden.

Ich habe den Code so umorganisiert, dass er gleichzeitig mit der Wartegruppe ausgeführt wird.

Beispielcode:

package main

import (
    "log"
    "sync"
    "time"
)

func worker(stop <-chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        select {
        case <-stop:
            log.Println("Stopping thread")
            return
        default:
            callClientTask()
        }
    }
}

func callClientTask() {
    time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes)
}

func main() {
    var wg sync.WaitGroup
    stop := make(chan struct{})

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go worker(stop, &wg)
    }

    time.Sleep(5 * time.Second) // allow workers to run for a while
    close(stop)                 // stop all workers, close channel
    wg.Wait()                   // wait for all workers
}

Ausgabe:

2023/10/26 10:40:44 Stopping thread
2023/10/26 10:40:44 Stopping thread
....
2023/10/26 10:40:49 Stopping thread
2023/10/26 10:40:49 Stopping thread

Herausgeber:

Wenn Sie einige Arbeiter stoppen möchten, müssen Sie die Arbeiter aktualisieren. Der folgende Code enthält einen Worker mit „Stopp“- und „Stopp“-Kanälen und einer Start/Stopp-Funktion.

Beispielcode:

package main

import (
    "log"
    "sync"
    "time"
)

type Worker struct {
    stop    chan struct{}
    stopped chan struct{}
}

func NewWorker() *Worker {
    return &Worker{
        stop:    make(chan struct{}),
        stopped: make(chan struct{}),
    }
}

func (w *Worker) Start(wg *sync.WaitGroup) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case <-w.stop:
                log.Println("Stopping thread")
                close(w.stopped)
                return
            default:
                callClientTask()
            }
        }
    }()
}

func (w *Worker) Stop() {
    close(w.stop)
    <-w.stopped
}

func callClientTask() {
    time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes)
}

func main() {
    var wg sync.WaitGroup
    workers := make([]*Worker, 1000)

    for i := 0; i < 1000; i++ {
        workers[i] = NewWorker()
        workers[i].Start(&wg)
    }

    time.Sleep(5 * time.Second) // allow workers to run for a while 
    for i := 0; i < 100; i++ { // stop  first 100 workers
        workers[i].Stop()
    }  
    for i := 100; i < 1000; i++ { // wait other workers to finish
        workers[i].Stop()
    }
    wg.Wait()
}

Ausgabe:

2023/10/26 12:51:26 Stopping thread
2023/10/26 12:51:28 Stopping thread
2023/10/26 12:51:30 Stopping thread
....

Das obige ist der detaillierte Inhalt vonSchließen Sie Goroutinen mithilfe von Kanälen schneller. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:stackoverflow.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen