Home >Backend Development >Golang >Events sent by the Golang server to each user

Events sent by the Golang server to each user

PHPz
PHPzforward
2024-02-06 11:39:141132browse

Golang 服务器向每个用户发送的事件

Question content

I have been using Go for a while but have never used SSE before. I have a problem, can someone provide a working example of a server sent event that is only sent to a specific user (connection)?

I'm using gorilla - session for authentication and I want to use UserID to separate the connections.

Or should I use 5 second polling via Ajax?

Thank you so much

Here's what I found and tried:

  1. https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c It will not be sent to individual users, and the go func will not stop if the connection is closed

  2. https://github.com/striversity/gotr/blob/master/010-server-sent-event-part-2/main.go This is exactly what I need, but once the connection is deleted . So now, once you close and open the browser in a private window, it doesn't work at all. Also, as mentioned above, the Go routine continues.


Correct answer


Create a "broker" to distribute messages to connected users:

type broker struct {
    // users is a map where the key is the user id
    // and the value is a slice of channels to connections
    // for that user id
    users map[string][]chan []byte

    // actions is a channel of functions to call
    // in the broker's goroutine. the broker executes
    // everything in that single goroutine to avoid
    // data races.
    actions chan func()
}

// run executes in a goroutine. it simply gets and 
// calls functions.
func (b *broker) run() {
    for a := range b.actions {
        a()
    }
}

func newbroker() *broker {
    b := &broker{
        users:   make(map[string][]chan []byte),
        actions: make(chan func()),
    }
    go b.run()
    return b
}

// adduserchan adds a channel for user with given id.
func (b *broker) adduserchan(id string, ch chan []byte) {
    b.actions <- func() {
        b.users[id] = append(b.users[id], ch)
    }
}

// removeuserchan removes a channel for a user with the given id.
func (b *broker) removeuserchan(id string, ch chan []byte) {
    // the broker may be trying to send to 
    // ch, but nothing is receiving. pump ch
    // to prevent broker from getting stuck.
    go func() { for range ch {} }()

    b.actions <- func() {
        chs := b.users[id]
        i := 0
        for _, c := range chs {
            if c != ch {
                chs[i] = c
                i = i + 1
            }
        }
        if i == 0 {
            delete(b.users, id)
        } else {
            b.users[id] = chs[:i]
        }
        // close channel to break loop at beginning
        // of removeuserchan.
        // this must be done in broker goroutine
        // to ensure that broker does not send to
        // closed goroutine.
        close(ch)
    }
}

// sendtouser sends a message to all channels for the given user id.
func (b *broker) sendtouser(id string, data []byte) {
    b.actions <- func() {
        for _, ch := range b.users[id] {
            ch <- data
        }
    }
}

Declare variables at the package level using proxies:

var broker = newbroker()

Writing sse endpoint using proxy:

func sseEndpoint(w http.ResponseWriter, r *http.Request) {
    // I assume that user id is in query string for this example,
    // You should use your authentication code to get the id.
    id := r.FormValue("id")

    // Do the usual SSE setup.
    flusher := w.(http.Flusher)
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // Create channel to receive messages for this connection.  
    // Register that channel with the broker.
    // On return from the function, remove the channel
    // from the broker.
    ch := make(chan []byte)
    broker.addUserChan(id, ch)
    defer broker.removeUserChan(id, ch)
    for {
        select {
        case <-r.Context().Done():
            // User closed the connection. We are out of here.
            return
        case m := <-ch:
            // We got a message. Do the usual SSE stuff.
            fmt.Fprintf(w, "data: %s\n\n", m)
            flusher.Flush()
        }
    }
}

Add code to your application to call broker.sendtouser.

The above is the detailed content of Events sent by the Golang server to each user. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:stackoverflow.com. If there is any infringement, please contact admin@php.cn delete