Rumah >pembangunan bahagian belakang >Golang >Acara yang dihantar oleh pelayan Golang kepada setiap pengguna

Acara yang dihantar oleh pelayan Golang kepada setiap pengguna

PHPz
PHPzke hadapan
2024-02-06 11:39:141118semak imbas

Golang 服务器向每个用户发送的事件

Kandungan soalan

Saya telah lama menggunakan Go tetapi tidak pernah menggunakan SSE sebelum ini. Saya mempunyai masalah, bolehkah seseorang memberikan contoh berfungsi bagi acara yang dihantar pelayan yang hanya dihantar kepada pengguna tertentu (sambungan)?

Saya menggunakan gorila - sesi untuk pengesahan dan saya mahu menggunakan ID Pengguna untuk memisahkan sambungan.

Atau patutkah saya menggunakan pengundian 5 saat melalui Ajax?

Terima kasih banyak-banyak

Ini yang saya temui dan cuba:

  1. https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c Ia tidak akan dihantar kepada pengguna individu dan fungsi go tidak akan berhenti jika sambungan ditutup

  2. https://github.com/striversity/gotr/blob/master/010-server-sent-event-part-2/main.go Inilah yang saya perlukan, tetapi setelah sambungan dipadamkan. Jadi sekarang, sebaik sahaja anda menutup dan membuka penyemak imbas dalam tetingkap peribadi, ia tidak berfungsi sama sekali. Selain itu, seperti yang dinyatakan di atas, rutin Pergi diteruskan.


Jawapan betul


Buat "broker" untuk mengedarkan mesej kepada pengguna yang berkaitan:

type broker struct {
    // users is a map where the key is the user id
    // and the value is a slice of channels to connections
    // for that user id
    users map[string][]chan []byte

    // actions is a channel of functions to call
    // in the broker's goroutine. the broker executes
    // everything in that single goroutine to avoid
    // data races.
    actions chan func()
}

// run executes in a goroutine. it simply gets and 
// calls functions.
func (b *broker) run() {
    for a := range b.actions {
        a()
    }
}

func newbroker() *broker {
    b := &broker{
        users:   make(map[string][]chan []byte),
        actions: make(chan func()),
    }
    go b.run()
    return b
}

// adduserchan adds a channel for user with given id.
func (b *broker) adduserchan(id string, ch chan []byte) {
    b.actions <- func() {
        b.users[id] = append(b.users[id], ch)
    }
}

// removeuserchan removes a channel for a user with the given id.
func (b *broker) removeuserchan(id string, ch chan []byte) {
    // the broker may be trying to send to 
    // ch, but nothing is receiving. pump ch
    // to prevent broker from getting stuck.
    go func() { for range ch {} }()

    b.actions <- func() {
        chs := b.users[id]
        i := 0
        for _, c := range chs {
            if c != ch {
                chs[i] = c
                i = i + 1
            }
        }
        if i == 0 {
            delete(b.users, id)
        } else {
            b.users[id] = chs[:i]
        }
        // close channel to break loop at beginning
        // of removeuserchan.
        // this must be done in broker goroutine
        // to ensure that broker does not send to
        // closed goroutine.
        close(ch)
    }
}

// sendtouser sends a message to all channels for the given user id.
func (b *broker) sendtouser(id string, data []byte) {
    b.actions <- func() {
        for _, ch := range b.users[id] {
            ch <- data
        }
    }
}

Isytiharkan pembolehubah pada peringkat pakej menggunakan proksi:

var broker = newbroker()

Tulis titik akhir sse menggunakan proksi:

func sseEndpoint(w http.ResponseWriter, r *http.Request) {
    // I assume that user id is in query string for this example,
    // You should use your authentication code to get the id.
    id := r.FormValue("id")

    // Do the usual SSE setup.
    flusher := w.(http.Flusher)
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // Create channel to receive messages for this connection.  
    // Register that channel with the broker.
    // On return from the function, remove the channel
    // from the broker.
    ch := make(chan []byte)
    broker.addUserChan(id, ch)
    defer broker.removeUserChan(id, ch)
    for {
        select {
        case <-r.Context().Done():
            // User closed the connection. We are out of here.
            return
        case m := <-ch:
            // We got a message. Do the usual SSE stuff.
            fmt.Fprintf(w, "data: %s\n\n", m)
            flusher.Flush()
        }
    }
}

Tambahkan kod pada aplikasi anda untuk menghubungi broker.sendtouser.

Atas ialah kandungan terperinci Acara yang dihantar oleh pelayan Golang kepada setiap pengguna. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:stackoverflow.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam