Creating a pub/sub with goroutines
I am trying to use goroutines to execute tasks, so I wrote this code. Tasks without dependencies, such as A, B, and C, are easy to implement and run fine. I have run into problems implementing the dependent tasks D and E, each of which depends on two other tasks.
Only one connecting piece is missing: creating a channel for each task and passing a message that the dependent task reads, so that it can reduce its number of dependencies once a task it depends on has finished. See the "checkpoint 1" comment in the code.
Can anyone help me solve this? I am stuck only on how to implement the goroutine for this case.
Code:
    package main

    import (
        "fmt"
        "sync"
    )

    type Task struct {
        isDone           bool
        dependencies     []*Task
        subscribers      []*Task
        doneChan         chan bool
        numDependencies  int
        taskName         string
        informSubChannel chan bool
    }

    func (t *Task) executeTask() {
        fmt.Printf("Task %s is getting executed...\n", t.taskName)
        // <-time.After(5 * time.Second)
        fmt.Printf("Task %s is done!! <-------\n", t.taskName)
    }

    func (t *Task) updateDependency() {
        var updatedDependencies []*Task
        for _, t := range t.dependencies {
            if !t.isDone {
                updatedDependencies = append(updatedDependencies, t)
            }
        }
        t.numDependencies = len(updatedDependencies)
        fmt.Printf("Updating dependency for task: %s to %d\n", t.taskName, t.numDependencies)
        t.dependencies = updatedDependencies
    }

    // If a task has dependencies, it subscribes to the tasks it depends on.
    // When a dependency is done, it informs the subscriber and reduces its number of dependencies.
    // A --> D (D depends on A): A has finished, so inform its subscriber D and reduce D's dependencies.
    func (t *Task) informSubscriber() {
        if len(t.subscribers) > 0 {
            for _, sub := range t.subscribers {
                fmt.Printf("task %s has informed subscriber %s\n", t.taskName, sub.taskName)
                sub.updateDependency()
            }
        }
    }

    // A task is subscribed to the task it depends on. D has been subscribed to A, so D watches A's activity.
    func (t *Task) setSubscriber(sub *Task) {
        fmt.Printf("Set subscriber %s to task %s\n", sub.taskName, t.taskName)
        t.subscribers = append(t.subscribers, sub)
    }

    // goroutine - background task execution
    // mark it as completed
    func (t *Task) markCompleted() {
        for {
            select {
            case <-t.doneChan:
                t.isDone = true
                t.executeTask()
                // inform all the subscribers that the task is completed and adjust their dependencies
                t.informSubscriber()
                close(t.doneChan)
                return
            default:
            }
        }
    }

    func (t *Task) setDependency(tasks []*Task) {
        t.dependencies = tasks
        t.numDependencies = len(t.dependencies)
    }

    // This will be used if the dependent tasks are already done. Will be used in checkpoint 1.
    func (t *Task) trackDependency() {
        t.numDependencies -= 1
        fmt.Printf("No of dependencies for task %s is: %d\n", t.taskName, t.numDependencies)
        if t.numDependencies == 0 {
            // execute task
            t.doneChan <- true
        }
    }

    func (t *Task) start() {
        fmt.Printf("Running Task %s\n", t.taskName)
        t.updateDependency()
        go t.markCompleted()
        if t.numDependencies > 0 {
            // for every dependent task
            for _, dep := range t.dependencies {
                // create subscribers
                dep.setSubscriber(t)
                // What if all dependencies are already executed? The subscriber won't help,
                // as they won't be marked completed again once already done.
                // Say A and C are already done: then D can't complete itself since it is still waiting for them.
                // If dependencies are already finished, mark it as completed too.
                // CODE: handle the dependent case here (unable to implement)
                // background function for tracking dependency
                // checkpoint 1: read dependent task channel value & reduce dependencies if done
                go t.trackDependency()
            }
            fmt.Printf("Task %s has %d dependencies and waiting for them to get finished\n", t.taskName, t.numDependencies)
        } else {
            // if no dependencies, mark it as finished
            t.doneChan <- true
        }
    }

    func createTask(taskName string) *Task {
        return &Task{
            isDone:          false,
            taskName:        taskName,
            dependencies:    nil,
            subscribers:     nil,
            numDependencies: 0,
            doneChan:        make(chan bool),
        }
    }

    func main() {
        taskA := createTask("A")
        taskB := createTask("B")
        taskC := createTask("C")
        taskD := createTask("D")
        taskE := createTask("E")
        taskD.setDependency([]*Task{taskA, taskB})
        taskE.setDependency([]*Task{taskC, taskD})
        allTasks := []*Task{taskA, taskB, taskC, taskD, taskE}
        var wg sync.WaitGroup
        for _, t := range allTasks {
            wg.Add(1)
            go func(t *Task) {
                defer wg.Done()
                t.start()
            }(t)
        }
        wg.Wait()
    }
Sample output:
    (base) ninjakx@Kritis-MacBook-Pro Practice % go run task.go
    Running Task D
    Running Task B
    Running Task C
    Updating dependency for task: B to 0
    Running Task E
    Task B is getting executed...
    Updating dependency for task: C to 0
    Running Task A
    Task C is getting executed...
    Task C is done!! <-------
    Updating dependency for task: D to 2
    Set subscriber D to task A
    Set subscriber D to task B
    Task D has 2 dependencies and waiting for them to get finished
    Task B is done!! <-------
    No of dependencies for task D is: 2
    Updating dependency for task: E to 2
    Set subscriber E to task C
    Set subscriber E to task D
    Task E has 2 dependencies and waiting for them to get finished
    No of dependencies for task E is: 2
    No of dependencies for task D is: 2
    No of dependencies for task E is: 2
    Updating dependency for task: A to 0
    task B has informed subscriber D
    Updating dependency for task: D to 0
    Task A is getting executed...
    Task A is done!! <-------
Because of the missing implementation described above, the run currently ends with 5 data races found.
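The race reports are plausibly explained by how the code shares state: a task's dependency count and done flag are read and written from several goroutines (the tracking goroutines started per dependency, and the subscriber notifications run by other tasks) without any synchronization. Below is a minimal sketch of counting down dependencies with sync/atomic so that exactly one goroutine observes zero. The depCounter type and the countLast helper are hypothetical names introduced for illustration; they are not part of the code above.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// depCounter is a hypothetical helper (not in the original code) that
// tracks how many dependencies of a task are still pending.
type depCounter struct {
	pending int32
}

// done records one finished dependency and reports whether it was the last.
// atomic.AddInt32 returns the new value, so only one caller can see zero.
func (c *depCounter) done() bool {
	return atomic.AddInt32(&c.pending, -1) == 0
}

// countLast runs n goroutines that each call done once and returns how
// many of them observed the counter reaching zero.
func countLast(n int) int32 {
	c := &depCounter{pending: int32(n)}
	var last int32
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			if c.done() {
				atomic.AddInt32(&last, 1)
			}
		}()
	}
	wg.Wait()
	return last
}

func main() {
	fmt.Println("goroutines that saw zero:", countLast(100))
}
```

With a counter like this, only the goroutine that receives true from done would go on to start the waiting task, so the start signal is sent once.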
I think you can achieve this with a smaller task struct and a WaitGroup for synchronization. Here is an example; I have added some comments to explain it.
    package main

    import (
        "fmt"
        "math/rand"
        "sync"
        "time"
    )

    // Task holds an id (for ease of debugging),
    // a buffered channel that is only used for signaling when the task is executed,
    // and finally a list of dependency tasks.
    type Task struct {
        id           string
        done         chan struct{}
        dependencies []*Task
    }

    // Run is where all the logic happens.
    //
    // We create a WaitGroup that will be the size of the dependencies for the current task
    // and we will wait until all tasks have signaled that they have executed.
    //
    // When all the dependencies have signaled through their channel that they are done,
    // the current task is free to execute and then signal any potential waiting task.
    func (t *Task) Run(done func()) {
        wg := sync.WaitGroup{}
        wg.Add(len(t.dependencies))
        for _, task := range t.dependencies {
            go func(dep *Task) {
                fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
                <-dep.done
                wg.Done()
            }(task)
        }
        wg.Wait()
        // emulate work
        time.Sleep(time.Duration(rand.Intn(5-1)+1) * time.Second)
        fmt.Printf("job %s ran\n", t.id)
        t.done <- struct{}{}
        done()
    }

    func NewTask(id string) *Task {
        return &Task{
            id: id,
            // we need a buffered channel here, else the task would block until someone reads the channel in Run
            done: make(chan struct{}, 1),
        }
    }

    func (t *Task) SetDeps(deps ...*Task) {
        t.dependencies = append(t.dependencies, deps...)
    }

    // ExecuteTasks simply runs all the tasks concurrently and waits until every task is completed.
    func ExecuteTasks(tasks ...*Task) {
        fmt.Println("starting execution")
        wg := sync.WaitGroup{}
        wg.Add(len(tasks))
        for _, task := range tasks {
            go task.Run(wg.Done)
        }
        wg.Wait()
        fmt.Println("end of execution")
    }

    func main() {
        // initialise the tasks
        a := NewTask("a")
        b := NewTask("b")
        c := NewTask("c")
        d := NewTask("d")
        e := NewTask("e")
        // and set dependencies
        // a.SetDeps(d)
        d.SetDeps(a, b)
        e.SetDeps(d, c)
        // then we "try" to execute all the tasks.
        ExecuteTasks(a, b, c, d, e)
    }

Of course, this is not a perfect solution; I can already see many cases it does not handle. For example, circular dependencies such as a => d together with d => a end in a deadlock, and it breaks when several tasks depend on the same task, because a value can be read from a channel only once.
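For the circular case, one option is to reject the task graph before starting any goroutine. The following is a minimal sketch, not part of the original answer, of a depth-first cycle check. The hasCycle function and the visit-state constants are hypothetical names; the Task type here repeats only the id and dependencies fields from the example.

```go
package main

import "fmt"

// Task repeats only the fields the check needs.
type Task struct {
	id           string
	dependencies []*Task
}

// Visit states for the depth-first search (hypothetical names).
const (
	unvisited = iota
	inProgress
	finished
)

// hasCycle reports whether the dependency graph reachable from tasks
// contains a cycle. It is an illustrative helper, not an existing API.
func hasCycle(tasks ...*Task) bool {
	state := make(map[*Task]int) // missing keys read as unvisited
	var visit func(t *Task) bool
	visit = func(t *Task) bool {
		switch state[t] {
		case inProgress:
			return true // reached a task that is still on the current path
		case finished:
			return false // already fully explored, no cycle through it
		}
		state[t] = inProgress
		for _, dep := range t.dependencies {
			if visit(dep) {
				return true
			}
		}
		state[t] = finished
		return false
	}
	for _, t := range tasks {
		if visit(t) {
			return true
		}
	}
	return false
}

func main() {
	a, d := &Task{id: "a"}, &Task{id: "d"}
	d.dependencies = []*Task{a}
	fmt.Println(hasCycle(a, d)) // false: d depends on a only

	a.dependencies = []*Task{d}
	fmt.Println(hasCycle(a, d)) // true: a => d and d => a
}
```

Calling such a check before executing the tasks would turn the deadlock into an error that can be reported up front.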
A hacky workaround for the second case is to put the value back into the channel after reading it:

    go func(dep *Task) {
        fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
        <-dep.done
        // put the value back if anyone else is also dependent
        dep.done <- struct{}{}
        wg.Done()
    }(task)
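An alternative to putting the value back, offered as a sketch rather than as part of the original answer, is to close the done channel instead of sending on it. A receive on a closed channel returns immediately for every reader, so any number of dependents can wait on the same task, and a dependency that finished earlier does not block anyone. The runAll helper, the variadic NewTask signature, and the recorded order slice are assumptions added for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

type Task struct {
	id           string
	done         chan struct{} // closed (never sent on) when the task finishes
	dependencies []*Task
}

// NewTask here takes the dependencies directly; this signature is an
// assumption made to keep the sketch short.
func NewTask(id string, deps ...*Task) *Task {
	return &Task{id: id, done: make(chan struct{}), dependencies: deps}
}

// runAll starts every task concurrently and returns the order in which
// the tasks finished.
func runAll(tasks ...*Task) []string {
	var (
		mu    sync.Mutex
		order []string
		wg    sync.WaitGroup
	)
	wg.Add(len(tasks))
	for _, t := range tasks {
		go func(t *Task) {
			defer wg.Done()
			for _, dep := range t.dependencies {
				<-dep.done // returns at once if dep has already finished
			}
			mu.Lock()
			order = append(order, t.id) // stands in for the real work
			mu.Unlock()
			close(t.done) // wakes every goroutine waiting on this task
		}(t)
	}
	wg.Wait()
	return order
}

func main() {
	a, b, c := NewTask("a"), NewTask("b"), NewTask("c")
	d := NewTask("d", a, b)
	e := NewTask("e", c, d, a) // a now has two dependents: d and e
	fmt.Println(runAll(a, b, c, d, e))
}
```

The printed order varies between runs, but every task appears after all of its dependencies. The channel no longer needs a buffer, because close never blocks.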