Maison >développement back-end >Golang >Modèle de diffusion
Jetons un coup d'œil rapide au modèle de diffusion dans Go. En général, le fanout est utilisé pour effectuer un certain nombre de tâches simultanément.
Par exemple, disons que vous disposez d'un pipeline de données et que vous souhaitez traiter les éléments individuels. Nous pouvons utiliser des routines et des canaux go pour diviser les éléments au fur et à mesure que nous les recevons, puis traiter les éléments individuels (mis en dB par exemple).
C'est un modèle simple à mettre en œuvre ; mais vous devez gérer les canaux pour éviter les impasses.
// produce is simulating our single input as a channel func produce(id int) chan int { ch := make(chan int) go func() { for i := 0; i < 10; i++ { ch <- rand.Intn(20) } fmt.Printf("producer %d done\n", id) close(ch) // this is important!!! }() return ch } func worker(id int, jobs chan int, wg *sync.WaitGroup) { for value := range jobs { odd := "even" if (value & 1) == 1 { odd = "odd" } fmt.Printf("worker: %d, got %d is %s\n", id, value, odd) } wg.Done() } func main() { inputCh := produce(1) numWorkers := 3 jobs := make(chan int) // split input into individual jobs go func() { for value := range inputCh { jobs <- value } close(jobs) // this is important!!! }() // fan-out var wg sync.WaitGroup for i := 0; i < numWorkers; i++ { wg.Add(1) go worker(i, jobs, &wg) } wg.Wait() fmt.Println("done") }
L'idée principale ici est qu'il existe une séquence de données qui doivent être exploitées par un nombre fixe de travailleurs.
Pour la saisie, nous créons une séquence de nombres aléatoires et les plaçons dans un canal. Nous les transférons vers un autre canal d'où les travailleurs retireront leur « emploi ».
Dans cet exemple, il n'est pas strictement nécessaire de déplacer l'entrée vers le canal des tâches. Nous pourrions tout aussi bien demander aux travailleurs de s'extraire du canal d'entrée ; c'est juste fait pour plus de clarté ici.
Nous envoyons ensuite lancer le nombre fixe de travailleurs sous forme de goroutines. Chaque travailleur sera retiré du canal des tâches jusqu'à ce qu'il n'y ait plus de données à traiter, moment auquel il signale à un WaitGroup que c'est terminé.
Le thread principal utilise un WaitGroup pour s'assurer qu'il ne se termine pas tant que tous les travailleurs n'ont pas terminé, c'est-à-dire que toutes les tâches ont été traitées.
Un point clé à mentionner : ce modèle n'apporte aucune garantie sur l'ordre de traitement de la séquence d'entrée. Cela peut convenir dans de nombreuses circonstances. Par exemple, la séquence d'entrée est constituée d'enregistrements de données contenant leur propre horodatage et l'objectif est de stocker les enregistrements dans un dB. Dans ce cas, une répartition serait acceptable.
Une dernière remarque, vous verrez quelques commentaires sur la fermeture des chaînes une fois que toutes les données de la séquence auront été envoyées. C’est crucial. L'opérateur de portée qui extrait les canaux se mettra en veille une fois qu'il n'y aura plus de données. Vous pouvez le vérifier en commentant une des instructions close(), ce qui provoquera une condition de blocage. Les goroutines et les chaînes sont très puissantes mais il faut les utiliser à bon escient.
Que feriez-vous de différent ? Comment pouvons-nous améliorer cet exemple ? Laissez vos commentaires ci-dessous.
Merci !
Le code de cet article et de tous les articles de cette série peut être trouvé ici
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!