How to cancel channel job based on ID in golang

PHPz
2024-02-08 23:42:11

Cancelling a job that has already been handed to a worker through a channel is a common requirement in Go: when tasks run concurrently, you sometimes need to stop one of them by its ID. A channel cannot take back a job once a worker has received it, so the approach discussed here gives each job its own context.Context created with context.WithCancel, stores the returned cancel function in a map keyed by job ID, and calls that function when the job must stop. The job only learns about the cancellation at the points where it checks ctx.Done(), so every long wait or slow call inside the job has to watch the context.

Question content

I have a POST endpoint that creates jobs and adds them to a channel: workerjobschan = make(chan job, maxqueuesize)

This is how I execute the job in the channel (main.go):

for i := 1; i <= maxworkers; i++ {
    go func(i int) {
        for job := range workerjobschan {
            ctx, cancel := context.WithCancel(context.Background())
            storejob(job.search.id, cancel)
            job.execute(ctx, c.db, i)
        }
    }(i)
}

I store the cancellation function in a map: canceljobfuncs = make(map[int]context.CancelFunc).

This is the job function:

func (j *job) execute(ctx context.Context, db *sql.DB, workerid int) error {
    for {
        select {
        // check for cancellation signal
        case <-ctx.Done():
            if err := ctx.Err(); err != nil {
                fmt.Println("worker", workerid, "error", err)
            }
            fmt.Println("worker", workerid, "cancelled")
            return nil

        default:
            fmt.Printf("worker%d: processing %s\n", workerid, j.search.query)
            time.Sleep(2 * time.Second)
            fmt.Printf("worker%d: active %s\n", workerid, j.search.query)
            time.Sleep(5 * time.Second)
            fmt.Printf("worker%d: completed %s!\n", workerid, j.search.query)
        }
    }
}

I cancel the context (in the http handler) like this:

cancelJob(search.ID)

But the job continues to run. I've tried a lot of things but can't seem to get it to work.

Solution

To state the obvious: if your code doesn't check ctx.Done(), it has no way of knowing that it has been cancelled.
(BTW, this is just a paraphrase of what @jimb wrote in the comments on your question.)

So when the code in the .execute(...) method starts executing this block:

    fmt.Printf("worker%d: processing %s\n", workerid, j.search.query)
    time.Sleep(2 * time.Second)
    fmt.Printf("worker%d: active %s\n", workerid, j.search.query)
    time.Sleep(5 * time.Second)
    fmt.Printf("worker%d: completed %s!\n", workerid, j.search.query)

It will run to the end of the block, which takes 7 seconds. Nothing inside the block tells it to stop on cancellation, and the select only looks at ctx.Done() again on the next loop iteration.

If you want your function to be able to detect cancellation during a "sleep" instruction, you must change your code.

Here's an example of how to do this using your example:

func (j *job) execute(ctx context.Context, db *sql.DB, workerid int) error {
    for {
        fmt.Printf("worker%d: processing %s\n", workerid, j.search.query)

        // replace time.Sleep() with time.After() so that it can be composed
        // in a select statement:
        select {
        case <-ctx.Done():
            fmt.Println("worker", workerid, "cancelled")
            return nil
        case <-time.After(2 * time.Second):
            // keep going
        }

        fmt.Printf("worker%d: active %s\n", workerid, j.search.query)

        select {
        case <-ctx.Done():
            fmt.Println("worker", workerid, "cancelled")
            return nil
        case <-time.After(5 * time.Second):
            // keep going
        }

        fmt.Printf("worker%d: completed %s!\n", workerid, j.search.query)
    }
}

https://www.php.cn/link/3bc31a430954d8326605fc690ed22f4d

I guess your actual code doesn't call time.Sleep() but rather something like processSearch(...) or doQuery(...).

If you need these functions to be cancelable during execution, you need to pass the cancellation context to them somehow and have them check for cancellation in some way.

One way to "pass context" is obviously to add it to the parameters of said function:

processSearch(ctx, ...)
doQuery(ctx, ...)

Depending on your existing code, though, some of the parameters may already carry a context of their own.
For example:

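// an http.Request carries a context (available via req.Context()):
func doQuery(req *http.Request, ...) {
    ...
}

// at call site:
    ...
    // NewRequestWithContext returns a request and an error
    req, err := http.NewRequestWithContext(ctx, "GET", "https://some.other.service/", nil)
    if err != nil {
        return err
    }
    doQuery(req, ...)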

Statement:
This article is reproduced from stackoverflow.com. If there is any infringement, please contact admin@php.cn to have it removed.