Heim >Backend-Entwicklung >Golang >Detaillierte Code-Erklärung: Verwenden von Go zum Aufbau eines Live-Video-Sperrsystems basierend auf WebSocket

Detaillierte Code-Erklärung: Verwenden von Go zum Aufbau eines Live-Video-Sperrsystems basierend auf WebSocket

coldplay.xixi
coldplay.xixinach vorne
2020-07-21 17:27:034606Durchsuche

Detaillierte Code-Erklärung: Verwenden von Go zum Aufbau eines Live-Video-Sperrsystems basierend auf WebSocket

(1) Einführung in die Geschäftskomplexität

Gehen wir gleich zur Sache: Nehmen wir an, dass 500.000 Menschen gleichzeitig in einem Live-Übertragungsraum online sind Sind 1.000 Schüsse pro Sekunde, dann beträgt die Push-Frequenz: 500W * 1000条/秒=50亿条/秒 Denken Sie darüber nach, wie großartig das Schwallsystem der Silvesterparty 2019 war. Außerdem kann eine große Website nicht nur einen Live-Übertragungsraum haben!

Die Verwendung von Go für die WebSocket-Entwicklung besteht aus nichts weiter als drei Situationen:

  • Verwendung der nativen Bibliothek von Go, also golang.org/x/net, aber Dies Die offizielle Bibliothek ist wirklich voller Fehler.
  • Mithilfe der GitHub-Boss-gorilla/websocket-Bibliothek kann sie mit bestimmten Webentwicklungs-Frameworks wie Gin, Iris usw. kombiniert werden, sofern das verwendete Framework auf Dann kann diese Bibliothek mit diesem Framework kombiniert werdengolang.org/net
  • Wählen Sie ein WebSocket-Framework manuell aus
Verwandte Lernempfehlungen:

Gehen Sie zum Sprachtutorial

Den Schätzungsergebnissen zufolge kommt es bei einem großen Barrage-Push zu einem Engpass im Linux-Kernel, da die maximale Paketsendefrequenz beim Senden von TCP-Paketen durch den Linux-Kernel 100 W beträgt. Daher können Sperrnachrichten innerhalb derselben Sekunde zu einem Push zusammengeführt werden, wodurch die Übertragung kleiner Datenpakete im Netzwerk reduziert und dadurch die Push-Frequenz verringert wird.

Das Sperrsystem muss langfristige Verbindungen für Online-Benutzer aufrechterhalten, um einen gezielten Push an Online-Benutzer zu erreichen. Es verwendet normalerweise eine Hash-Wörterbuchstruktur, und Push-Nachrichten durchlaufen normalerweise das online verwendete Hash-Wörterbuch. Während der Barrage-Push-Phase sind Benutzer ständig online und offline. Um Online-Benutzer zu halten, muss das Hash-Wörterbuch ständig geändert werden und ein übermäßiges Benutzervolumen führt zu Sperrengpässen. Daher kann die gesamte Hash-Struktur in mehrere Hash-Strukturen aufgeteilt werden, den mehreren Hash-Strukturen unterschiedliche Sperren hinzugefügt werden und Lese-/Schreibsperren anstelle von Mutex-Sperren verwendet werden.

Normalerweise interagiert der Server mit dem Client über eine JSON-Struktur, die eine ständige Kodierung und Dekodierung von JSON-Daten erfordert, was zu einem CPU-Engpass führt. Die Nachrichten werden zunächst zusammengeführt, dann codiert und schließlich wird die Hash-Struktur für Push abgefragt.

Bei den oben genannten Problemen handelt es sich um Probleme mit einer einzelnen Architektur. Um mehr Benutzerlasten zu unterstützen, verwendet das Sperrsystem normalerweise eine verteilte Architektur für elastische Expansion und Kontraktion.

(2) Drücken oder ziehen?

Wenn der Client die serverseitigen Daten abruft, treten die folgenden Probleme auf:

    Die große Anzahl an Personen, die in der Live-Übertragung online sind, führt dazu, dass die Nachrichtendaten häufig aktualisiert werden , also Ziehen der Nachricht Dies bedeutet, dass das Sperrfeuer die Aktualität nicht erfüllen kann
  • Wenn viele Clients gleichzeitig ziehen, ist der Druck auf der Serverseite der gleiche wie bei DDOS
  • Ein Sperrsystem sollte universell sein, also für Live-Bombing-Szenarien in Räumen mit weniger Szenen, die dazu führen, dass Nachrichtendaten-Pull-Anfragen ungültig sind
Daher betrachten wir den Push-Modus: Wenn die Daten aktualisiert werden, werden sie vom Server aktiv gepusht an den Client, wodurch die Anzahl der Clientanfragen effektiv reduziert werden kann. Wenn Message Push implementiert werden muss, bedeutet dies, dass der Server eine große Anzahl langer Verbindungen aufrechterhält.

(3) Warum WebSocket verwenden?

Um Echtzeitaktualisierungen von Sperrnachrichten zu erreichen, muss Socket verwendet werden. Warum also WebSocket verwenden? Heutzutage erfolgt die Entwicklung der meisten Live-Übertragungsanwendungen plattformübergreifend. Der Kern des plattformübergreifenden Entwicklungsframeworks ist jedoch die Webentwicklung. Daher muss es untrennbar mit WebSocket verbunden sein, und einige Benutzer entscheiden sich dafür, Videos im Web anzusehen, z. B. Bilibili. Heutzutage gibt es auch einige Desktop-Anwendungen, die mit plattformübergreifenden Frameworks wie Electron wie Lark und Feishu entwickelt werden. Daher ist die beste Möglichkeit, Message Push zu implementieren, die Verwendung von WebSocket.

Mit WebSocket können Sie problemlos eine lange Verbindung auf der Serverseite aufrechterhalten. Zweitens basiert WebSocket auf dem HTTP-Protokoll und kann auch HTTPS verwenden. Daher ist WebSocket eine zuverlässige Übertragung und erfordert keine Kosten für Entwickler Aufmerksamkeit auf die zugrunde liegenden Details.

Warum Go für WebSocket verwenden? Wenn Sie über WebSocket sprechen, denken Sie vielleicht an Node.js, aber Node.js ist ein Single-Thread-Modell. Wenn Sie eine hohe Parallelität erreichen möchten, müssen Sie mehrere Node.js-Prozesse erstellen, was jedoch nicht der Fall ist Für den Server ist es einfach, die gesamte Verbindungssammlung zu durchlaufen. Die Bereitstellung von Java-Projekten und das Schreiben von Docker-Dateien sind nicht so präzise wie die Zielbinärdatei von Go, und Go-Coroutinen können leicht eine hohe Parallelität erreichen In diesem Kapitel verfügt die Go-Sprache derzeit auch über ein ausgereiftes WebSocket-Rad.

(4) Grundlegende Demo des Servers

Erstellen Sie zunächst ein Framework:

package main

import (
  "fmt"
  "net/http"
)

func main() {
 fmt.Println("Listen localhost:8080")
   // 注册一个用于WebSocket的路由,实际业务中不可能只有一个路由
  http.HandleFunc("/messages", messageHandler)
  // 监听8080端口,没有实现服务异常处理器,因此第二个参数是nil
  http.ListenAndServe("localhost:8080", nil)
}

func messageHandler(response http.ResponseWriter, request *http.Request) {
  // TODO: 实现消息处理
  response.Write([]byte("HelloWorld"))
}

Verbessern Sie dann die messageHandler-Funktion:

func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // 允许跨域
    CheckOrigin: func(resquest *http.Request) bool {
      return true
    },
  }

  // 建立连接
  conn, err := upgrader.Upgrade(response, request, nil)
  if err != nil {
    return
  }

  // 收发消息
  for {
    // 读取消息
    _, bytes, err := conn.ReadMessage()
    if err != nil {
      _ = conn.Close()
    }
    // 写入消息
    err = conn.WriteMessage(websocket.TextMessage, bytes)
    if err != nil {
      _ = conn.Close()
    }
  }
}

Implementieren Sie nun grundsätzlich die WebSocket-Funktion , aber die native API von Websocket ist nicht threadsicher (die Close-Methode ist threadsicher und wiedereintrittsfähig), und andere Module können die Geschäftslogik nicht wiederverwenden. Daher ist sie gekapselt:

    Kapselung Das Verbindungsobjekt beschreibt eine WebSocket-Verbindung
  • Bietet threadsichere Schließ-, Empfangs- und Sende-APIs für das Connection-Objekt
  • // main.go
    package main
    
    import (
      "bluemiaomiao.github.io/websocket-go/service"
      "fmt"
      "net/http"
    
      "github.com/gorilla/websocket"
    )
    
    func main() {
      fmt.Println("Listen localhost:8080")
      http.HandleFunc("/messages", messageHandler)
      _ = http.ListenAndServe("localhost:8080", nil)
    }
    
    func messageHandler(response http.ResponseWriter, request *http.Request) {
      var upgrader = websocket.Upgrader{
        // 允许跨域
        CheckOrigin: func(resquest *http.Request) bool {
          return true
        },
      }
    
      // 建立连接
      conn, err := upgrader.Upgrade(response, request, nil)
      wsConn, err := service.Create(conn)
      if err != nil {
        return
      }
    
      // 收发消息
      for {
        // 读取消息
        msg, err := wsConn.ReadOne()
        if err != nil {
          wsConn.Close()
        }
        // 写入消息
        err = wsConn.WriteOne(msg)
        if err != nil {
          _ = conn.Close()
        }
      }
    }
    // service/messsage_service.go
    package service
    
    import (
      "errors"
      "github.com/gorilla/websocket"
      "sync"
    )
    
    // 封装的连接对象
    // 
    // 由于websocket的Close()方法是可重入的,所以可以多次调用,但是关闭Channel的close()
    // 方法不是可重入的,因此通过isClosed进行判断
    // isClosed可能发生资源竞争,因此通过互斥锁避免
    // 关闭websocket连接后,也要自动关闭输入输出消息流,因此通过signalCloseLoopChan实现
    type Connection struct {
      conn                  *websocket.Conn  // 具体的连接对象
      inputStream             chan []byte       // 输入流,使用Channel模拟
      outputStream           chan []byte       // 输出流,使用chaneel模拟
      signalCloseLoopChan     chan byte       // 关闭信号
      isClosed               bool            // 是否调用过close()方法
      lock                   sync.Mutex      // 简单的锁
    }
    
    // 用于初始化一个连接对象
    func Create(conn *websocket.Conn) (connection *Connection, err error) {
      connection = &Connection{
        conn:              conn,
        inputStream:        make(chan []byte, 1000),
        outputStream:       make(chan []byte, 1000),
        signalCloseLoopChan: make(chan byte, 1),
        isClosed:            false,
      }
    
      // 启动读写循环
      go connection.readLoop()
      go connection.writeLoop()
      return
    }
    
    // 读取一条消息
    func (c *Connection) ReadOne() (msg []byte, err error) {
      select {
      case msg = <-(*c).inputStream:
      case <-(*c).signalCloseLoopChan:
        err = errors.New("connection is closed")
      }
      return
    }
    
    // 写入一条消息
    func (c *Connection) WriteOne(msg []byte) (err error) {
      select {
      case (*c).outputStream <- msg:
      case <-(*c).signalCloseLoopChan:
        err = errors.New("connection is closed")
      }
      return
    }
    
    // 关闭连接对象
    func (c *Connection) Close() {
      _ = (*c).conn.Close()
      (*c).lock.Lock()
      if !(*c).isClosed {
        close((*c).signalCloseLoopChan)
      }
      (*c).lock.Unlock()
    
    }
    
    // 读取循环
    func (c *Connection) readLoop() {
      // 不停的读取长连接中的消息,只要存在消息就将其放到队列中
      for {
        _, bytes, err := (*c).conn.ReadMessage()
        if err != nil {
          (*c).Close()
        }
        select {
        case <-(*c).signalCloseLoopChan:
          (*c).Close()
        case (*c).inputStream <- bytes:
        }
      }
    }
    
    // 写入循环
    func (c *Connection) writeLoop() {
      // 只要队列中存在消息,就将其写入
      var data []byte
      for {
        select {
        case data = <-(*c).outputStream:
        case <-(*c).signalCloseLoopChan:
          (*c).Close()
        }
        err := (*c).conn.WriteMessage(websocket.TextMessage, data)
        if err != nil {
          _ = (*c).conn.Close()
        }
      }
    }
Bisher haben Sie gelernt, wie Sie mit Go einen WebSocket-Dienst erstellen .

Das obige ist der detaillierte Inhalt vonDetaillierte Code-Erklärung: Verwenden von Go zum Aufbau eines Live-Video-Sperrsystems basierend auf WebSocket. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:jb51.net. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen