首頁 >後端開發 >Golang >使用 go 例程建立 pub sub

使用 go 例程建立 pub sub

王林
王林轉載
2024-02-06 09:35:07 · 761 次瀏覽

使用 go 例程创建 pub sub

問題內容

我正在嘗試建立 goroutine 來完成任務

所以我寫了這段程式碼。 像 a、b、c 這樣沒有依賴關係的任務很容易實現並且運作良好。 只是在實現依賴任務 d 和 e 時遇到一些問題,每個任務都有 2 個任務的依賴關係。

只剩下一個連接點,它為每個任務建立一個通道,然後傳遞訊息,該訊息將由依賴任務讀取,以減少依賴任務完成後的依賴數量。請參閱程式碼中的 checkpoint 1 註解。

有人可以幫我解決這個問題嗎?我只是停留在如何在這種情況下實現 goroutine 的部分。

程式碼:

package main

import (
    "fmt"
    "sync"
)

// task is one node of the dependency graph: it keeps its completion state,
// the tasks it still waits on and the tasks that are waiting on it.
type task struct {
    // isdone is set to true by markcompleted once donechan has fired.
    isdone           bool
    // dependencies holds the tasks this task waits on; updatedependency
    // rebuilds it so that it only contains tasks that are not done yet.
    dependencies     []*task
    // subscribers are the tasks that informsubscriber notifies.
    subscribers      []*task
    // donechan carries the "run now" signal that markcompleted waits for.
    donechan         chan bool
    // numdependencies is the number of dependencies still pending.
    numdependencies  int
    // taskname identifies the task in the printed log lines.
    taskname         string
    informsubchannel chan bool // NOTE(review): never read or written in the visible code
}

// executetask stands in for the task's real work: it only prints a start
// message and a completion message for the task.
func (t *task) executetask() {
    fmt.Printf("task %s is getting executed...\n", t.taskname)
    // <-time.After(5 * time.Second)
    fmt.Printf("task %s is done!! <-------\n", t.taskname)
}

// updatedependency drops every dependency that is already done and sets
// numdependencies to the number of dependencies that are still pending.
func (t *task) updatedependency() {
    var pending []*task
    // The loop variable is called dep so that it does not shadow the receiver t.
    for _, dep := range t.dependencies {
        if !dep.isdone {
            pending = append(pending, dep)
        }
    }
    t.numdependencies = len(pending)
    fmt.Printf("updating dependency for task: %s to %d\n", t.taskname, t.numdependencies)
    t.dependencies = pending
}

// informsubscriber tells every subscriber of t to recompute its pending
// dependencies.
//
// A task with dependencies subscribes to each of them. When a dependency is
// done it informs its subscribers so that they can reduce their dependency
// count. Example: a --> d (d depends on a); once a has finished it informs its
// subscriber d, and d recomputes its dependencies.
func (t *task) informsubscriber() {
    // Ranging over an empty or nil slice is a no-op, so no length check is needed.
    for _, sub := range t.subscribers {
        fmt.Printf("task %s has informed subscriber %s\n", t.taskname, sub.taskname)
        sub.updatedependency()
    }
}

// setsubscriber registers sub as a subscriber of t, so that informsubscriber
// notifies it later. Example: d subscribes to a in order to watch a's progress.
func (t *task) setsubscriber(sub *task) {
    fmt.Printf("set subscriber %s to task %s\n", sub.taskname, t.taskname)
    t.subscribers = append(t.subscribers, sub)
}

// markcompleted is started as a background goroutine by start. It waits for a
// signal on donechan, then marks the task as done, executes it and informs
// all subscribers so that they can adjust their dependencies.
func (t *task) markcompleted() {
    // A plain blocking receive is used instead of a select with an empty
    // default branch inside a for loop: that form never blocks and keeps a
    // CPU core spinning until the signal arrives.
    <-t.donechan

    t.isdone = true
    t.executetask()
    // inform all the subscribers that the task is completed and adjust their dependencies
    t.informsubscriber()
    // The channel is closed after the single expected signal; any later send
    // on it would panic.
    close(t.donechan)
}

// setdependency records tasks as the dependencies of t and initialises the
// pending-dependency counter to their number.
func (t *task) setdependency(tasks []*task) {
    t.numdependencies = len(tasks)
    t.dependencies = tasks
}

// trackdependency decrements the pending-dependency counter and, when it
// reaches zero, sends on donechan so that markcompleted executes the task.
// It is the hook intended for checkpoint 1 in start (dependencies that are
// already done).
//
// NOTE(review): the counter is changed without any synchronisation and the
// method does not wait for a dependency to finish before decrementing.
func (t *task) trackdependency() {
    t.numdependencies--
    fmt.Printf("no of dependencies for task %s is: %d\n", t.taskname, t.numdependencies)
    if t.numdependencies == 0 { // execute task
        t.donechan <- true
    }
}

// start launches the task: it refreshes the pending dependencies, starts
// markcompleted in the background and then either subscribes to every pending
// dependency or, when there is none, signals donechan right away.
func (t *task) start() {
    fmt.Printf("running task %s\n", t.taskname)
    t.updatedependency()
    go t.markcompleted()

    if t.numdependencies > 0 {

        // for every dependent task
        for _, dep := range t.dependencies {
            // create subscribers
            dep.setsubscriber(t)
            // What if all dependencies have already been executed? Subscribing
            // does not help then, because a finished task never informs again.
            // Say a and c are already done: d would wait for them forever.
            // If the dependencies are already finished, mark this task as completed too.

            // code: handle the dependent case here (unable to implement)
            // background function for tracking dependency
            // checkpoint 1: read dependent task channel value & reduce dependencies if done
            go t.trackdependency()
        }
        fmt.Printf("task %s has %d dependencies and waiting for them to get finished\n", t.taskname, t.numdependencies)
    } else {
        // No dependencies: signal markcompleted so the task runs immediately.
        // donechan is unbuffered, so this send waits until markcompleted receives.
        t.donechan <- true
    }

}

// createtask returns a new, not-yet-done task called taskname with no
// dependencies, no subscribers and an unbuffered donechan.
func createtask(taskname string) *task {
    // Every field that is not set here keeps its zero value
    // (false, nil or 0).
    t := new(task)
    t.taskname = taskname
    t.donechan = make(chan bool)
    return t
}

// main builds five tasks (d depends on a and b, e depends on c and d) and
// starts all of them concurrently.
func main() {

    taska := createtask("a")
    taskb := createtask("b")
    taskc := createtask("c")
    taskd := createtask("d")
    taske := createtask("e")

    taskd.setdependency([]*task{taska, taskb})
    taske.setdependency([]*task{taskc, taskd})

    alltasks := []*task{taska, taskb, taskc, taskd, taske}
    // The WaitGroup only waits for the start calls to return; the goroutines
    // that start launches itself are not tracked by it.
    var wg sync.WaitGroup
    for _, t := range alltasks {
        wg.Add(1)
        go func(t *task) {
            defer wg.Done()
            t.start()
        }(t)

    }
    wg.Wait()

}

範例輸出:

#
(base) ninjakx@Kritis-MacBook-Pro Practice % go run task.go
Running Task D
Running Task B
Running Task C
Updating dependency for task: B to 0
Running Task E
Task B is getting executed...
Updating dependency for task: C to 0
Running Task A
Task C is getting executed...
Task C is done!! <-------
Updating dependency for task: D to 2
Set subscriber D to task A
Set subscriber D to task B
Task D has 2 dependencies and waiting for them to get finished
Task B is done!! <-------
No of dependencies for task D is: 2
Updating dependency for task: E to 2
Set subscriber E to task C
Set subscriber E to task D
Task E has 2 dependencies and waiting for them to get finished
No of dependencies for task E is: 2
No of dependencies for task D is: 2
No of dependencies for task E is: 2
Updating dependency for task: A to 0
task B has informed subscriber D
Updating dependency for task: D to 0
Task A is getting executed...
Task A is done!! <-------

由於上述尚未完成的實作,目前執行時會偵測到 5 個資料競爭(data race)。


正確答案


我認為您可以使用較小的任務結構和 waitgroup 的一些幫助來實現上述場景進行同步。

這是我將一些註解放在一起進行解釋的範例。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// task holds an id (for ease of debugging), a channel that is used only to
// signal that the task has been executed (newtask creates it with a buffer
// of one), and the list of tasks it depends on.
type task struct {
    id           string
    done         chan struct{}
    dependencies []*task
}

// run is where all the logic happens.
//
// It creates a WaitGroup sized to the number of dependencies of the current
// task and waits until every dependency has signalled on its done channel.
// The task then executes (emulated by a random sleep of 1 to 4 seconds),
// signals on its own done channel and finally calls the done callback.
//
// Note that the signal is a single value sent on the channel, so only one
// waiting goroutine can receive it.
func (t *task) run(done func()) {
    wg := sync.WaitGroup{}
    wg.Add(len(t.dependencies))

    // The loop variable must not be named task: it would shadow the task type
    // and the parameter type `*task` below would no longer compile.
    for _, d := range t.dependencies {
        go func(dep *task) {
            fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
            <-dep.done
            wg.Done()
        }(d)
    }

    wg.Wait()

    // emulate work
    time.Sleep(time.Duration(rand.Intn(5-1)+1) * time.Second)

    fmt.Printf("job %s ran\n", t.id)
    t.done <- struct{}{}
    done()
}

// newtask returns a task named id that has no dependencies yet.
func newtask(id string) *task {
    // The channel needs a buffer of one; otherwise the send in `run` would
    // block until someone reads from the channel.
    signal := make(chan struct{}, 1)
    return &task{id: id, done: signal}
}

// setdeps appends deps to the tasks that t has to wait for; dependencies set
// by earlier calls are kept.
func (t *task) setdeps(deps ...*task) {
    combined := append(t.dependencies, deps...)
    t.dependencies = combined
}

// executetasks runs all the given tasks concurrently, one goroutine per task,
// and returns once every task has called its done callback.
func executetasks(tasks ...*task) {
    fmt.Println("starting execution")

    wg := sync.WaitGroup{}
    wg.Add(len(tasks))

    // The loop variable is not named task so that it does not shadow the type.
    for _, tk := range tasks {
        go tk.run(wg.Done)
    }

    wg.Wait()

    fmt.Println("end of execution")
}

// main wires up five tasks (d waits for a and b, e waits for d and c) and
// executes them.
func main() {
    // create the tasks
    a, b, c := newtask("a"), newtask("b"), newtask("c")
    d, e := newtask("d"), newtask("e")

    // declare who waits for whom
    // a.setdeps(d)
    d.setdeps(a, b)
    e.setdeps(d, c)

    // finally "try" to execute every task
    executetasks(a, b, c, d, e)
}

當然這不是完美的解決方案,我可以認為已經有很多情況沒有得到處理

例如

  • 循環依賴最終會陷入死鎖,例如 a => d 且 d => a
  • 或者多個任務依賴同一個任務的情況:送進通道的值只能被讀取一次,因此只有其中一個等待者會收到完成訊號。

為了解決第一個問題,您可能需要建立依賴圖並檢查它是否是循環的。對於第二個, hacky 方式可能是

// Replacement for the goroutine inside run; the parameter type is spelled
// task to match the type declared in the program above.
go func(dep *task) {
        fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id)
        <-dep.done
        // put the value back if anyone else is also dependent
        dep.done <- struct{}{}
        wg.Done()
}(task)

以上是使用 go 例程建立 pub sub的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:stackoverflow.com。如有侵權,請聯絡admin@php.cn刪除