Heim >Backend-Entwicklung >Golang >Goroutinen und Kanäle: Parallelitätsmuster in Go

Goroutinen und Kanäle: Parallelitätsmuster in Go

Patricia Arquette
Patricia ArquetteOriginal
2024-12-13 05:55:11732Durchsuche

Parallelität ermöglicht es uns, mehrere Aufgaben unabhängig voneinander zu bearbeiten. Goroutinen sind eine einfache Möglichkeit, mehrere Aufgaben unabhängig voneinander zu bearbeiten. In diesem Beitrag verbessern wir schrittweise einen HTTP-Handler, der Dateien akzeptiert und verschiedene Parallelitätsmuster in Go mithilfe von Kanälen und dem Synchronisierungspaket untersucht.

Aufstellen

Bevor wir uns mit Parallelitätsmustern befassen, bereiten wir die Bühne. Stellen Sie sich vor, wir haben einen HTTP-Handler, der mehrere Dateien über ein Formular akzeptiert und die Dateien auf irgendeine Weise verarbeitet.

func processFile(file multipart.File) {
   // do something with the file
   fmt.Println("Processing file...")
   time.Sleep(100 * time.Millisecond) // Simulating file processing time
}
func UploadHandler(w http.ResponseWriter, r *http.Request) {
   // limit to 10mb 
   if err := r.ParseMultipartForm(10 << 20); err != nil {
       http.Error(w, "Unable to parse form", http.StatusBadRequest)
       return 
   }
   // iterate through all files and process them sequentially 
   for _, file := range r.MultipartForm.File["files"] {
       f, err := file.Open()
       if err != nil {
          http.Error(w, "Unable to read file", http.StatusInternalServerError)
          return
       }
       processFile(f)
       f.Close()
   }
}

Im obigen Beispiel empfangen wir Dateien aus einem Formular und verarbeiten sie nacheinander. Wenn 10 Dateien hochgeladen werden, würde es 1 Sekunde dauern, bis der Vorgang abgeschlossen ist und eine Antwort an den Client gesendet wird.
Bei der Verarbeitung vieler Dateien kann dies zu einem Engpass werden. Mit der Parallelitätsunterstützung von Go können wir dieses Problem jedoch leicht lösen.

Wartegruppen

Um dieses Problem zu lösen, können wir Dateien gleichzeitig verarbeiten. Um eine neue Goroutine zu erzeugen, können wir einem Funktionsaufruf das Schlüsselwort go voranstellen, z. B. Gehen Sie zu ProcessFile(f). Da Goroutinen jedoch nicht blockieren, kehrt der Handler möglicherweise zurück, bevor der Prozess abgeschlossen ist, wodurch Dateien möglicherweise unverarbeitet bleiben oder einen falschen Status zurückgeben. Um auf die Verarbeitung aller Dateien zu warten, können wir sync.WaitGroup.
verwenden Eine WaitGroup wartet darauf, dass eine Reihe von Goroutinen abgeschlossen werden. Für jede Goroutine, die wir erzeugen, sollten wir zusätzlich den Zähler in der WaitGroup erhöhen. Dies kann mit der Add-Funktion erfolgen. Wenn eine Goroutine fertig ist, sollte Done aufgerufen werden, damit der Zähler um eins verringert wird. Vor der Rückkehr von der Funktion sollte Wait aufgerufen werden, das blockiert, bis der Zähler der WaitGroup 0 ist.

func UploadHandler(w http.ResponseWriter, r *http.Request) {
   if err := r.ParseMultipartForm(10 << 20); err != nil {
       http.Error(w, "Unable to parse form", http.StatusBadRequest)
       return
   }

   // create WaitGroup 
   var wg sync.WaitGroup 
   for _, file := range r.MultipartForm.File["files"] {
       f, err := file.Open()
       if err != nil {
          http.Error(w, "Unable to read file", http.StatusInternalServerError)
          return
       }

       wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine
       // Process file concurrently
       go func(file multipart.File) {
           defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. 
           defer file.Close()
           processFile(f)
       }(f)
   }

   // Wait for all goroutines to complete
   wg.Wait()
   fmt.Fprintln(w, "All files processed successfully!")
}

Jetzt wird für jede hochgeladene Datei eine neue Goroutine erzeugt, was das System überfordern könnte. Eine Lösung besteht darin, die Anzahl der erzeugten Goroutinen zu begrenzen.

Begrenzung der Parallelität mit einem Semaphor

Ein Semaphor ist einfach eine Variable, die wir verwenden können, um den Zugriff auf gemeinsame Ressourcen durch mehrere Threads oder in diesem Fall Goroutinen zu steuern.

In Go können wir gepufferte Kanäle nutzen, um ein Semaphor zu implementieren.

Kanäle

Bevor wir mit der Implementierung beginnen, schauen wir uns an, was Kanäle sind und welchen Unterschied zwischen gepufferten und ungepufferten Kanälen besteht.

Kanäle sind eine Leitung, über die wir Daten senden und empfangen können, um sicher zwischen Go-Routinen zu kommunizieren.
Kanäle müssen mit der Make-Funktion erstellt werden.

func processFile(file multipart.File) {
   // do something with the file
   fmt.Println("Processing file...")
   time.Sleep(100 * time.Millisecond) // Simulating file processing time
}
func UploadHandler(w http.ResponseWriter, r *http.Request) {
   // limit to 10mb 
   if err := r.ParseMultipartForm(10 << 20); err != nil {
       http.Error(w, "Unable to parse form", http.StatusBadRequest)
       return 
   }
   // iterate through all files and process them sequentially 
   for _, file := range r.MultipartForm.File["files"] {
       f, err := file.Open()
       if err != nil {
          http.Error(w, "Unable to read file", http.StatusInternalServerError)
          return
       }
       processFile(f)
       f.Close()
   }
}

Kanäle haben einen speziellen Operator <-, der zum Senden oder Lesen von einem Kanal verwendet wird.
Wenn der Operator auf den Kanal ch <-1 zeigt, werden Daten an den Kanal gesendet. Zeigt der Pfeil vom Kanal <-ch weg, wird der Wert empfangen. Sende- und Empfangsvorgänge blockieren standardmäßig. Dies bedeutet, dass jeder Vorgang wartet, bis die andere Seite bereit ist.

Goroutines and Channels: Concurrency Patterns in Go
Die Animation visualisiert, wie ein Produzent den Wert 1 über einen ungepufferten Kanal sendet und der Verbraucher aus dem Kanal liest.

Wenn der Produzent Ereignisse schneller senden kann, als der Verbraucher verarbeiten kann, haben wir die Möglichkeit, einen gepufferten Kanal zu verwenden, um mehrere Nachrichten in die Warteschlange zu stellen, ohne den Produzenten zu blockieren, bis der Puffer voll ist. Gleichzeitig kann der Verbraucher die Nachrichten in seinem eigenen Tempo verarbeiten.

func UploadHandler(w http.ResponseWriter, r *http.Request) {
   if err := r.ParseMultipartForm(10 << 20); err != nil {
       http.Error(w, "Unable to parse form", http.StatusBadRequest)
       return
   }

   // create WaitGroup 
   var wg sync.WaitGroup 
   for _, file := range r.MultipartForm.File["files"] {
       f, err := file.Open()
       if err != nil {
          http.Error(w, "Unable to read file", http.StatusInternalServerError)
          return
       }

       wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine
       // Process file concurrently
       go func(file multipart.File) {
           defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. 
           defer file.Close()
           processFile(f)
       }(f)
   }

   // Wait for all goroutines to complete
   wg.Wait()
   fmt.Fprintln(w, "All files processed successfully!")
}

In diesem Beispiel kann der Produzent bis zu zwei Artikel ohne Blockierung versenden. Wenn die Kapazität des Puffers erreicht ist, blockiert der Produzent, bis der Verbraucher mindestens eine Nachricht verarbeitet hat.

Goroutines and Channels: Concurrency Patterns in Go

Zurück zum ursprünglichen Problem: Wir möchten die Anzahl der Goroutinen begrenzen, die Dateien gleichzeitig verarbeiten. Dazu können wir gepufferte Kanäle nutzen.

ch := make(chan int)

In diesem Beispiel haben wir einen gepufferten Kanal mit einer Kapazität von 5 hinzugefügt. Dadurch können wir 5 Dateien gleichzeitig verarbeiten und die Belastung des Systems begrenzen.

Aber was ist, wenn nicht alle Dateien gleich sind? Wir können möglicherweise zuverlässig vorhersagen, dass unterschiedliche Dateitypen oder Dateigrößen mehr Ressourcen für die Verarbeitung erfordern. In diesem Fall können wir ein gewichtetes Semaphor verwenden.

Gewichtetes Semaphor

Einfach ausgedrückt können wir mit einem gewichteten Semaphor einer einzelnen Aufgabe mehr Ressourcen zuweisen. Go bietet bereits eine Implementierung für ein gewichtetes Semaphor innerhalb des Extend-Sync-Pakets.

ch := make(chan int, 2)

In dieser Version haben wir ein gewichtetes Semaphor mit 5 Slots erstellt. Wenn beispielsweise nur Bilder hochgeladen werden, verarbeitet der Prozess 5 Bilder gleichzeitig. Wenn jedoch ein PDF hochgeladen wird, werden 2 Slots erfasst, was die Anzahl der zu verarbeitenden Dateien verringern würde gleichzeitig.

Abschluss

Wir haben einige Parallelitätsmuster in Go untersucht und dabei sync.WaitGroup und Semaphoren verwendet, um die Anzahl gleichzeitiger Aufgaben zu steuern. Es stehen jedoch weitere Tools zur Verfügung. Wir könnten Kanäle nutzen, um einen Worker-Pool zu erstellen, Zeitüberschreitungen hinzuzufügen oder Fan-In/Out-Muster zu verwenden.
Darüber hinaus ist die Fehlerbehandlung ein wichtiger Aspekt, der der Einfachheit halber größtenteils weggelassen wurde.
Eine Möglichkeit, mit Fehlern umzugehen, wäre die Nutzung von Kanälen, um Fehler zu aggregieren und zu behandeln, nachdem alle Goroutinen abgeschlossen sind.

Go bietet auch eine errgroup.Group, die mit sync.WaitGroups zusammenhängt, aber die Behandlung von Aufgaben hinzufügt, die Fehler zurückgeben.
Das Paket finden Sie im Paket „Extend Sync“.

Das obige ist der detaillierte Inhalt vonGoroutinen und Kanäle: Parallelitätsmuster in Go. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn