Maison >développement back-end >Golang >Explication détaillée du code : utiliser Go pour créer un système de barrage vidéo en direct basé sur WebSocket

Explication détaillée du code : utiliser Go pour créer un système de barrage vidéo en direct basé sur WebSocket

coldplay.xixi
coldplay.xixiavant
2020-07-21 17:27:034561parcourir

Explication détaillée du code : utiliser Go pour créer un système de barrage vidéo en direct basé sur WebSocket

(1) Introduction à la complexité des affaires

Allons droit au but Supposons que 500 000 personnes soient en ligne dans une salle de diffusion en direct en même temps, et là. Il y a 1 000 barrages par seconde, alors le système de barrage La fréquence de poussée est : 500W * 1000条/秒=50亿条/秒 Pensez à quel point le système de barrage de la fête du Nouvel An 2019 de la Station B était génial. De plus, un grand site Web ne peut pas avoir une seule salle de diffusion en direct !

Utiliser Go pour le développement WebSocket n'est rien de plus que trois situations :

  • Utiliser la bibliothèque native de Go, c'est-à-dire golang.org/x/net, mais ceci la bibliothèque officielle est vraiment pleine de bugs
  • En utilisant la bibliothèque GitHub boss gorilla/websocket, elle peut être combinée avec certains frameworks de développement web, comme Gin, iris, etc., à condition que le framework utilisé soit basé sur golang.org/net, Ensuite, cette bibliothèque peut être combinée avec ce framework
  • Sélectionnez manuellement un framework WebSocket

Recommandations d'apprentissage associées : Tutoriel de langage Go

Selon les résultats de l'estimation, lorsque la quantité de poussée de barrage est importante, le noyau Linux aura un goulot d'étranglement, car la fréquence maximale d'envoi de paquets lorsque le noyau Linux envoie des paquets TCP est de 100 W. Par conséquent, les messages de barrage au cours de la même seconde peuvent être fusionnés en une seule poussée, réduisant ainsi la transmission de petits paquets de données sur le réseau, réduisant ainsi la fréquence de poussée.

Le système de barrage doit maintenir des connexions à long terme pour les utilisateurs en ligne afin d'obtenir une diffusion ciblée vers les utilisateurs en ligne. Il utilise généralement une structure de dictionnaire de hachage, et les messages push traversent généralement le dictionnaire de hachage utilisé en ligne. Pendant la période de poussée du barrage, les utilisateurs sont constamment en ligne et hors ligne. Afin de maintenir les utilisateurs en ligne, le dictionnaire de hachage doit être constamment modifié et les opérations de verrouillage doivent être effectuées en continu. Un volume d'utilisateurs excessif entraîne des goulots d'étranglement de verrouillage. Par conséquent, la structure de hachage entière peut être divisée en plusieurs structures de hachage, ajouter différents verrous aux multiples structures de hachage et utiliser des verrous en lecture-écriture au lieu de verrous mutex.

Habituellement, le serveur interagit avec le client en utilisant une structure JSON, ce qui nécessite un encodage et un décodage constants des données JSON, ce qui provoquera un goulot d'étranglement du processeur. Les messages sont d'abord fusionnés, puis codés, et enfin la structure de hachage est interrogée pour le push.

Les problèmes ci-dessus concernent une architecture unique. Afin de prendre en charge davantage de charges d'utilisateurs, le système de barrage adopte généralement une architecture distribuée pour l'expansion et la contraction élastiques.

(2) Pousser ou tirer ?

Si le client extrait les données côté serveur, il y aura les problèmes suivants :

  • Le grand nombre de personnes en ligne lors de la diffusion en direct signifie que les données du message sont fréquemment mises à jour , donc tirer le message Cela signifie que le barrage ne peut pas respecter les délais
  • Si plusieurs clients le tirent en même temps, la pression côté serveur est la même que celle du DDOS
  • Un système de barrage devrait être universel, donc pour les bombardements de salles de diffusion en direct, les scénarios avec moins de scènes signifient que les demandes d'extraction de données de message ne sont pas valides

Par conséquent, nous considérons le mode push : lorsque les données sont mises à jour, le serveur les pousse activement au client, ce qui peut réduire efficacement le nombre de demandes des clients. Si le message push doit être implémenté, cela signifie que le serveur maintient un grand nombre de connexions longues.

(3) Pourquoi utiliser WebSocket ?

Pour réaliser des mises à jour en temps réel des messages de barrage, Socket doit être utilisé, alors pourquoi utiliser WebSocket ? De nos jours, la plupart des développements d'applications de diffusion en direct sont multiplateformes. Cependant, l'essence du cadre de développement multiplateforme est le développement Web, il doit donc être indissociable de WebSocket, et certains utilisateurs choisiront de regarder des vidéos sur le Web, comme Bilibili. De nos jours, certaines applications de bureau sont également développées à l'aide de frameworks multiplateformes tels qu'Electron, tels que Lark et Feishu, donc la meilleure façon d'implémenter le push de messages est d'utiliser WebSocket.

En utilisant WebSocket, vous pouvez facilement maintenir une longue connexion côté serveur. Deuxièmement, WebSocket est construit sur le protocole HTTP et peut également utiliser HTTPS. Par conséquent, WebSocket est une transmission fiable et ne nécessite pas de paiement pour les développeurs. attention aux détails sous-jacents.

Pourquoi utiliser Go pour WebSocket ? Tout d'abord, lorsque vous parlez de WebSocket, vous pensez peut-être à Node.js, mais Node.js est un modèle monothread. Si vous souhaitez obtenir une concurrence élevée, vous devez créer plusieurs processus Node.js, mais ce n'est pas le cas. il est facile pour le serveur de parcourir l'ensemble de la collection de connexions ; si vous utilisez Java, cela semble fastidieux. Le déploiement de projets Java et l'écriture de fichiers Docker ne sont pas aussi concis que le binaire cible de Go, et les coroutines Go peuvent facilement atteindre une concurrence élevée, comme mentionné dans le précédent. chapitre, le langage Go dispose actuellement également d'une roue WebSocket mature.

(4) Démo de base du serveur

Construisez d'abord un framework :

package main

import (
  "fmt"
  "net/http"
)

func main() {
 fmt.Println("Listen localhost:8080")
   // 注册一个用于WebSocket的路由,实际业务中不可能只有一个路由
  http.HandleFunc("/messages", messageHandler)
  // 监听8080端口,没有实现服务异常处理器,因此第二个参数是nil
  http.ListenAndServe("localhost:8080", nil)
}

func messageHandler(response http.ResponseWriter, request *http.Request) {
  // TODO: 实现消息处理
  response.Write([]byte("HelloWorld"))
}

Ensuite, améliorez la fonction messageHandler :

func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // 允许跨域
    CheckOrigin: func(resquest *http.Request) bool {
      return true
    },
  }

  // 建立连接
  conn, err := upgrader.Upgrade(response, request, nil)
  if err != nil {
    return
  }

  // 收发消息
  for {
    // 读取消息
    _, bytes, err := conn.ReadMessage()
    if err != nil {
      _ = conn.Close()
    }
    // 写入消息
    err = conn.WriteMessage(websocket.TextMessage, bytes)
    if err != nil {
      _ = conn.Close()
    }
  }
}

Maintenant, implémentez essentiellement la fonction WebSocket , mais l'API native de websocket n'est pas thread-safe (la méthode Close est thread-safe et réentrante), et les autres modules ne peuvent pas réutiliser la logique métier, elle est donc encapsulée :

  • encapsulation L'objet Connection décrit une connexion WebSocket
  • Fournit des API de fermeture, de réception et d'envoi thread-safe pour l'objet Connection
// main.go
package main

import (
  "bluemiaomiao.github.io/websocket-go/service"
  "fmt"
  "net/http"

  "github.com/gorilla/websocket"
)

func main() {
  fmt.Println("Listen localhost:8080")
  http.HandleFunc("/messages", messageHandler)
  _ = http.ListenAndServe("localhost:8080", nil)
}

func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // 允许跨域
    CheckOrigin: func(resquest *http.Request) bool {
      return true
    },
  }

  // 建立连接
  conn, err := upgrader.Upgrade(response, request, nil)
  wsConn, err := service.Create(conn)
  if err != nil {
    return
  }

  // 收发消息
  for {
    // 读取消息
    msg, err := wsConn.ReadOne()
    if err != nil {
      wsConn.Close()
    }
    // 写入消息
    err = wsConn.WriteOne(msg)
    if err != nil {
      _ = conn.Close()
    }
  }
}
// service/messsage_service.go
package service

import (
  "errors"
  "github.com/gorilla/websocket"
  "sync"
)

// 封装的连接对象
// 
// 由于websocket的Close()方法是可重入的,所以可以多次调用,但是关闭Channel的close()
// 方法不是可重入的,因此通过isClosed进行判断
// isClosed可能发生资源竞争,因此通过互斥锁避免
// 关闭websocket连接后,也要自动关闭输入输出消息流,因此通过signalCloseLoopChan实现
type Connection struct {
  conn                  *websocket.Conn  // 具体的连接对象
  inputStream             chan []byte       // 输入流,使用Channel模拟
  outputStream           chan []byte       // 输出流,使用chaneel模拟
  signalCloseLoopChan     chan byte       // 关闭信号
  isClosed               bool            // 是否调用过close()方法
  lock                   sync.Mutex      // 简单的锁
}

// 用于初始化一个连接对象
func Create(conn *websocket.Conn) (connection *Connection, err error) {
  connection = &Connection{
    conn:              conn,
    inputStream:        make(chan []byte, 1000),
    outputStream:       make(chan []byte, 1000),
    signalCloseLoopChan: make(chan byte, 1),
    isClosed:            false,
  }

  // 启动读写循环
  go connection.readLoop()
  go connection.writeLoop()
  return
}

// 读取一条消息
func (c *Connection) ReadOne() (msg []byte, err error) {
  select {
  case msg = <-(*c).inputStream:
  case <-(*c).signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}

// 写入一条消息
func (c *Connection) WriteOne(msg []byte) (err error) {
  select {
  case (*c).outputStream <- msg:
  case <-(*c).signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}

// 关闭连接对象
func (c *Connection) Close() {
  _ = (*c).conn.Close()
  (*c).lock.Lock()
  if !(*c).isClosed {
    close((*c).signalCloseLoopChan)
  }
  (*c).lock.Unlock()

}

// 读取循环
func (c *Connection) readLoop() {
  // 不停的读取长连接中的消息,只要存在消息就将其放到队列中
  for {
    _, bytes, err := (*c).conn.ReadMessage()
    if err != nil {
      (*c).Close()
    }
    select {
    case <-(*c).signalCloseLoopChan:
      (*c).Close()
    case (*c).inputStream <- bytes:
    }
  }
}

// 写入循环
func (c *Connection) writeLoop() {
  // 只要队列中存在消息,就将其写入
  var data []byte
  for {
    select {
    case data = <-(*c).outputStream:
    case <-(*c).signalCloseLoopChan:
      (*c).Close()
    }
    err := (*c).conn.WriteMessage(websocket.TextMessage, data)
    if err != nil {
      _ = (*c).conn.Close()
    }
  }
}

Jusqu'à présent, vous avez appris à utiliser Go pour créer un service WebSocket .

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer