Heim >Backend-Entwicklung >Golang >Detaillierte Code-Erklärung: Verwenden von Go zum Aufbau eines Live-Video-Sperrsystems basierend auf WebSocket
(1) Einführung in die Geschäftskomplexität
Gehen wir gleich zur Sache: Nehmen wir an, dass 500.000 Menschen gleichzeitig in einem Live-Übertragungsraum online sind Sind 1.000 Schüsse pro Sekunde, dann beträgt die Push-Frequenz: 500W * 1000条/秒=50亿条/秒
Denken Sie darüber nach, wie großartig das Schwallsystem der Silvesterparty 2019 war. Außerdem kann eine große Website nicht nur einen Live-Übertragungsraum haben!
Die Verwendung von Go für die WebSocket-Entwicklung besteht aus nichts weiter als drei Situationen:
golang.org/x/net
, aber Dies Die offizielle Bibliothek ist wirklich voller Fehler. gorilla/websocket
-Bibliothek kann sie mit bestimmten Webentwicklungs-Frameworks wie Gin, Iris usw. kombiniert werden, sofern das verwendete Framework auf Dann kann diese Bibliothek mit diesem Framework kombiniert werdengolang.org/net
Verwandte Lernempfehlungen:Den Schätzungsergebnissen zufolge kommt es bei einem großen Barrage-Push zu einem Engpass im Linux-Kernel, da die maximale Paketsendefrequenz beim Senden von TCP-Paketen durch den Linux-Kernel 100 W beträgt. Daher können Sperrnachrichten innerhalb derselben Sekunde zu einem Push zusammengeführt werden, wodurch die Übertragung kleiner Datenpakete im Netzwerk reduziert und dadurch die Push-Frequenz verringert wird. Das Sperrsystem muss langfristige Verbindungen für Online-Benutzer aufrechterhalten, um einen gezielten Push an Online-Benutzer zu erreichen. Es verwendet normalerweise eine Hash-Wörterbuchstruktur, und Push-Nachrichten durchlaufen normalerweise das online verwendete Hash-Wörterbuch. Während der Barrage-Push-Phase sind Benutzer ständig online und offline. Um Online-Benutzer zu halten, muss das Hash-Wörterbuch ständig geändert werden und ein übermäßiges Benutzervolumen führt zu Sperrengpässen. Daher kann die gesamte Hash-Struktur in mehrere Hash-Strukturen aufgeteilt werden, den mehreren Hash-Strukturen unterschiedliche Sperren hinzugefügt werden und Lese-/Schreibsperren anstelle von Mutex-Sperren verwendet werden. Normalerweise interagiert der Server mit dem Client über eine JSON-Struktur, die eine ständige Kodierung und Dekodierung von JSON-Daten erfordert, was zu einem CPU-Engpass führt. Die Nachrichten werden zunächst zusammengeführt, dann codiert und schließlich wird die Hash-Struktur für Push abgefragt. Bei den oben genannten Problemen handelt es sich um Probleme mit einer einzelnen Architektur. Um mehr Benutzerlasten zu unterstützen, verwendet das Sperrsystem normalerweise eine verteilte Architektur für elastische Expansion und Kontraktion. (2) Drücken oder ziehen? Wenn der Client die serverseitigen Daten abruft, treten die folgenden Probleme auf:
package main import ( "fmt" "net/http" ) func main() { fmt.Println("Listen localhost:8080") // 注册一个用于WebSocket的路由,实际业务中不可能只有一个路由 http.HandleFunc("/messages", messageHandler) // 监听8080端口,没有实现服务异常处理器,因此第二个参数是nil http.ListenAndServe("localhost:8080", nil) } func messageHandler(response http.ResponseWriter, request *http.Request) { // TODO: 实现消息处理 response.Write([]byte("HelloWorld")) }Verbessern Sie dann die messageHandler-Funktion:
func messageHandler(response http.ResponseWriter, request *http.Request) { var upgrader = websocket.Upgrader{ // 允许跨域 CheckOrigin: func(resquest *http.Request) bool { return true }, } // 建立连接 conn, err := upgrader.Upgrade(response, request, nil) if err != nil { return } // 收发消息 for { // 读取消息 _, bytes, err := conn.ReadMessage() if err != nil { _ = conn.Close() } // 写入消息 err = conn.WriteMessage(websocket.TextMessage, bytes) if err != nil { _ = conn.Close() } } }Implementieren Sie nun grundsätzlich die WebSocket-Funktion , aber die native API von Websocket ist nicht threadsicher (die Close-Methode ist threadsicher und wiedereintrittsfähig), und andere Module können die Geschäftslogik nicht wiederverwenden. Daher ist sie gekapselt:
// main.go package main import ( "bluemiaomiao.github.io/websocket-go/service" "fmt" "net/http" "github.com/gorilla/websocket" ) func main() { fmt.Println("Listen localhost:8080") http.HandleFunc("/messages", messageHandler) _ = http.ListenAndServe("localhost:8080", nil) } func messageHandler(response http.ResponseWriter, request *http.Request) { var upgrader = websocket.Upgrader{ // 允许跨域 CheckOrigin: func(resquest *http.Request) bool { return true }, } // 建立连接 conn, err := upgrader.Upgrade(response, request, nil) wsConn, err := service.Create(conn) if err != nil { return } // 收发消息 for { // 读取消息 msg, err := wsConn.ReadOne() if err != nil { wsConn.Close() } // 写入消息 err = wsConn.WriteOne(msg) if err != nil { _ = conn.Close() } } }
// service/messsage_service.go package service import ( "errors" "github.com/gorilla/websocket" "sync" ) // 封装的连接对象 // // 由于websocket的Close()方法是可重入的,所以可以多次调用,但是关闭Channel的close() // 方法不是可重入的,因此通过isClosed进行判断 // isClosed可能发生资源竞争,因此通过互斥锁避免 // 关闭websocket连接后,也要自动关闭输入输出消息流,因此通过signalCloseLoopChan实现 type Connection struct { conn *websocket.Conn // 具体的连接对象 inputStream chan []byte // 输入流,使用Channel模拟 outputStream chan []byte // 输出流,使用chaneel模拟 signalCloseLoopChan chan byte // 关闭信号 isClosed bool // 是否调用过close()方法 lock sync.Mutex // 简单的锁 } // 用于初始化一个连接对象 func Create(conn *websocket.Conn) (connection *Connection, err error) { connection = &Connection{ conn: conn, inputStream: make(chan []byte, 1000), outputStream: make(chan []byte, 1000), signalCloseLoopChan: make(chan byte, 1), isClosed: false, } // 启动读写循环 go connection.readLoop() go connection.writeLoop() return } // 读取一条消息 func (c *Connection) ReadOne() (msg []byte, err error) { select { case msg = <-(*c).inputStream: case <-(*c).signalCloseLoopChan: err = errors.New("connection is closed") } return } // 写入一条消息 func (c *Connection) WriteOne(msg []byte) (err error) { select { case (*c).outputStream <- msg: case <-(*c).signalCloseLoopChan: err = errors.New("connection is closed") } return } // 关闭连接对象 func (c *Connection) Close() { _ = (*c).conn.Close() (*c).lock.Lock() if !(*c).isClosed { close((*c).signalCloseLoopChan) } (*c).lock.Unlock() } // 读取循环 func (c *Connection) readLoop() { // 不停的读取长连接中的消息,只要存在消息就将其放到队列中 for { _, bytes, err := (*c).conn.ReadMessage() if err != nil { (*c).Close() } select { case <-(*c).signalCloseLoopChan: (*c).Close() case (*c).inputStream <- bytes: } } } // 写入循环 func (c *Connection) writeLoop() { // 只要队列中存在消息,就将其写入 var data []byte for { select { case data = <-(*c).outputStream: case <-(*c).signalCloseLoopChan: (*c).Close() } err := (*c).conn.WriteMessage(websocket.TextMessage, data) if err != nil { _ = (*c).conn.Close() } } }
Das obige ist der detaillierte Inhalt vonDetaillierte Code-Erklärung: Verwenden von Go zum Aufbau eines Live-Video-Sperrsystems basierend auf WebSocket. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!