Detailed code explanation: Using Go to build a live video barrage system based on WebSocket

coldplay.xixi
2020-07-21 17:27:03

(1) Introduction to business complexity

Let's get straight to the point. Assume that 5 million people are online in one live room at the same time and that 1,000 barrages (bullet comments) are sent per second. Because every barrage has to be delivered to every viewer, the push rate is 5,000,000 viewers × 1,000 barrages/second = 5 billion pushes/second. Think about how impressive the barrage system behind Bilibili's 2019 New Year's Eve gala must have been. Moreover, a large site never has just one live room!

There are essentially three ways to do WebSocket development in Go:

  • Use the official extension package under golang.org/x/net (golang.org/x/net/websocket), although in the author's experience it is surprisingly buggy
  • Use gorilla/websocket, the dominant WebSocket library on GitHub. It can be combined with web frameworks such as Gin and iris: any framework built on the standard net/http package works with it
  • Write a WebSocket framework by hand

According to this estimate, when the push volume is large the Linux kernel becomes a bottleneck, because the kernel can send at most about 1 million TCP packets per second (the figure the author cites). Barrage messages that arrive within the same second can therefore be merged into a single push, which reduces the number of small packets on the network and lowers the push frequency.
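
The merging step described above can be sketched as follows. This is only an illustration under stated assumptions, not the author's implementation: `mergeLoop`, `runDemo`, and the use of plain strings as messages are hypothetical names and choices. Messages are collected until a tick arrives, and everything gathered in that window goes out as one push.

```go
package main

import (
	"fmt"
	"time"
)

// mergeLoop collects messages from in and, on every tick, hands the
// messages gathered since the previous tick to push as a single batch.
// It returns when in is closed, after flushing whatever is left.
func mergeLoop(in <-chan string, tick <-chan time.Time, push func(batch []string)) {
	var batch []string
	flush := func() {
		if len(batch) > 0 {
			push(batch)
			batch = nil // start a fresh slice; push now owns the old one
		}
	}
	for {
		select {
		case msg, ok := <-in:
			if !ok {
				flush()
				return
			}
			batch = append(batch, msg)
		case <-tick:
			flush()
		}
	}
}

// runDemo drives mergeLoop with a hand-operated tick channel so that the
// result is deterministic, and returns the batches that were pushed.
func runDemo() [][]string {
	in := make(chan string)
	tick := make(chan time.Time) // a real server could pass time.NewTicker(time.Second).C
	out := make(chan []string, 4)

	go func() {
		mergeLoop(in, tick, func(b []string) { out <- b })
		close(out)
	}()

	in <- "hello"
	in <- "world"
	tick <- time.Now() // end of the first window: one push carrying two barrages
	in <- "again"
	close(in) // shutting down flushes the remainder

	var batches [][]string
	for b := range out {
		batches = append(batches, b)
	}
	return batches
}

func main() {
	for _, b := range runDemo() {
		fmt.Println(len(b), b)
	}
}
```

With a one-second ticker, the number of pushes per connection is bounded by one per second no matter how many barrages arrive, at the cost of up to one second of added latency.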

The barrage system has to keep a long-lived connection for every online user so that it can push to them, and these connections are usually stored in a hash map. Pushing a message means traversing the map of online users. While pushes are in progress, users keep going online and offline, so the map is modified constantly and has to be locked for every change. With too many users the lock becomes the bottleneck. The map can therefore be split into several smaller maps, each protected by its own lock, and read-write locks can be used instead of mutexes.
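
One way to picture the split is the sketch below. It is a minimal illustration, not the author's code: `Registry`, `shard`, `shardCount`, and the use of a channel as a stand-in for a connection are all assumptions made for the example. Each user ID is hashed to one shard, writers lock only that shard, and broadcasts take read locks.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const shardCount = 16 // hypothetical value; tune to cores and user count

// shard is one slice of the online-user table with its own lock.
type shard struct {
	mu    sync.RWMutex
	conns map[string]chan<- []byte // user ID -> outbound queue (stand-in for a connection)
}

// Registry spreads users over several shards so that going online or
// offline contends only on one shard's lock, not on a global lock.
type Registry struct {
	shards [shardCount]*shard
}

func NewRegistry() *Registry {
	r := &Registry{}
	for i := range r.shards {
		r.shards[i] = &shard{conns: make(map[string]chan<- []byte)}
	}
	return r
}

// shardFor picks the shard for a user by hashing the user ID.
func (r *Registry) shardFor(userID string) *shard {
	h := fnv.New32a()
	h.Write([]byte(userID))
	return r.shards[h.Sum32()%shardCount]
}

// Add registers a user; it takes the write lock of one shard only.
func (r *Registry) Add(userID string, out chan<- []byte) {
	s := r.shardFor(userID)
	s.mu.Lock()
	s.conns[userID] = out
	s.mu.Unlock()
}

// Remove unregisters a user; it takes the write lock of one shard only.
func (r *Registry) Remove(userID string) {
	s := r.shardFor(userID)
	s.mu.Lock()
	delete(s.conns, userID)
	s.mu.Unlock()
}

// Broadcast takes a read lock per shard, so concurrent broadcasts do not
// block each other. It returns how many users the message was queued for.
func (r *Registry) Broadcast(msg []byte) int {
	sent := 0
	for _, s := range r.shards {
		s.mu.RLock()
		for _, out := range s.conns {
			select {
			case out <- msg:
				sent++
			default: // queue full: drop instead of blocking while holding the lock
			}
		}
		s.mu.RUnlock()
	}
	return sent
}

func main() {
	r := NewRegistry()
	a, b := make(chan []byte, 1), make(chan []byte, 1)
	r.Add("alice", a)
	r.Add("bob", b)
	r.Remove("bob")
	fmt.Println(r.Broadcast([]byte("hi")), string(<-a)) // prints "1 hi"
}
```

Dropping a message when a user's queue is full is a design choice of this sketch; a production system might instead disconnect slow consumers.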

The server and client usually exchange JSON, and encoding and decoding it for every message and every connection becomes a CPU bottleneck. The fix is to merge the messages first, encode the merged batch once, and only then traverse the hash maps and push the encoded bytes.
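
The order "merge, encode once, then push" can be illustrated with the standard encoding/json package. The `Barrage` struct, `encodeBatch`, and `fanOut` are hypothetical names for this sketch, and buffered channels again stand in for connections. The point is that `json.Marshal` runs once per batch, not once per recipient.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Barrage is a hypothetical message shape used only for this example.
type Barrage struct {
	User string `json:"user"`
	Text string `json:"text"`
}

// encodeBatch marshals the whole merged batch exactly once.
func encodeBatch(batch []Barrage) ([]byte, error) {
	return json.Marshal(batch)
}

// fanOut queues the same encoded bytes for every recipient and returns
// how many recipients accepted them. No per-recipient encoding happens.
func fanOut(payload []byte, recipients []chan []byte) int {
	sent := 0
	for _, out := range recipients {
		select {
		case out <- payload:
			sent++
		default: // recipient queue is full; skip it in this sketch
		}
	}
	return sent
}

func main() {
	batch := []Barrage{{User: "a", Text: "hi"}, {User: "b", Text: "yo"}}
	payload, err := encodeBatch(batch)
	if err != nil {
		panic(err)
	}
	recipients := []chan []byte{make(chan []byte, 1), make(chan []byte, 1)}
	fmt.Println(fanOut(payload, recipients), string(payload))
}
```

Because all recipients share the same byte slice, the payload must be treated as read-only after encoding.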

The problems above all arise on a single node. To carry more users, a barrage system usually adopts a distributed architecture that can scale out and in elastically.

(2) Push or pull?

If the client pulls server-side data, then there will be the following problems:

  • A live room with many viewers produces new messages constantly, so periodic pulling cannot deliver barrages in a timely manner
  • If many clients pull at the same time, the load on the server is comparable to a DDoS attack
  • A barrage system should be general-purpose, so in live rooms with few barrages most pull requests return nothing and are wasted

So we consider push mode: when the data is updated, the server actively pushes it to the clients, which greatly reduces the number of client requests. Implementing push means that the server has to maintain a large number of long-lived connections.

(3) Why use WebSocket?

To update barrages in real time a socket must be used, so why WebSocket in particular? Most live-streaming applications today are developed cross-platform, and cross-platform frameworks are essentially web technology, so WebSocket is the natural fit. Some users also watch in the browser, as on Bilibili, and some desktop applications, such as Lark (Feishu), are built with cross-platform frameworks like Electron. The best way to implement message push is therefore WebSocket.

With WebSocket it is easy to keep a long-lived connection on the server side. A WebSocket connection also starts with an HTTP handshake and can run over TLS, just as HTTPS does. Because it runs on top of TCP, delivery is reliable and developers do not need to deal with the low-level details.

Why use Go for WebSocket? WebSocket may make you think of Node.js first, but Node.js runs JavaScript on a single thread. To achieve high concurrency you have to start several Node.js processes, and then it is hard for the server to traverse the complete set of connections. Java, on the other hand, is cumbersome: deploying a Java project and writing its Dockerfile is not as simple as shipping Go's single binary, and goroutines make high concurrency easy. As mentioned earlier, Go also has a mature WebSocket library.

(4) Server-side Basic Demo

First build a framework:

package main

import (
  "fmt"
  "log"
  "net/http"
)

func main() {
  fmt.Println("Listen localhost:8080")
  // Register a route for WebSocket; a real service would have more than one route
  http.HandleFunc("/messages", messageHandler)
  // Listen on port 8080; passing nil as the handler uses http.DefaultServeMux
  log.Fatal(http.ListenAndServe("localhost:8080", nil))
}

func messageHandler(response http.ResponseWriter, request *http.Request) {
  // TODO: implement message handling
  _, _ = response.Write([]byte("HelloWorld"))
}

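Then flesh out the messageHandler function (this requires importing github.com/gorilla/websocket):

func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // Allow cross-origin requests (this accepts every origin)
    CheckOrigin: func(r *http.Request) bool {
      return true
    },
  }

  // Establish the connection
  conn, err := upgrader.Upgrade(response, request, nil)
  if err != nil {
    return
  }
  // Close the connection once when the handler exits
  defer conn.Close()

  // Receive and send messages
  for {
    // Read a message; stop on error instead of looping on a dead connection
    _, data, err := conn.ReadMessage()
    if err != nil {
      return
    }
    // Write the message back
    if err = conn.WriteMessage(websocket.TextMessage, data); err != nil {
      return
    }
  }
}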

The basic WebSocket functionality now works, but the websocket API is not fully thread-safe: a connection supports only one concurrent reader and one concurrent writer (the Close method, however, is thread-safe and can be called repeatedly). Other modules also cannot reuse the business logic, so the connection is wrapped:

  • A Connection object encapsulates one WebSocket connection
  • The Connection object provides thread-safe close, receive, and send APIs

// main.go
package main

import (
  "fmt"
  "log"
  "net/http"

  "bluemiaomiao.github.io/websocket-go/service"
  "github.com/gorilla/websocket"
)

func main() {
  fmt.Println("Listen localhost:8080")
  http.HandleFunc("/messages", messageHandler)
  log.Fatal(http.ListenAndServe("localhost:8080", nil))
}

func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // Allow cross-origin requests
    CheckOrigin: func(r *http.Request) bool {
      return true
    },
  }

  // Establish the connection; check the error before using conn
  conn, err := upgrader.Upgrade(response, request, nil)
  if err != nil {
    return
  }
  wsConn, err := service.Create(conn)
  if err != nil {
    _ = conn.Close()
    return
  }

  // Receive and send messages
  for {
    // Read a message
    msg, err := wsConn.ReadOne()
    if err != nil {
      wsConn.Close()
      return
    }
    // Write the message back
    if err = wsConn.WriteOne(msg); err != nil {
      wsConn.Close()
      return
    }
  }
}

// service/messsage_service.go
package service

import (
  "errors"
  "sync"

  "github.com/gorilla/websocket"
)

// Connection wraps a WebSocket connection.
//
// websocket's Close() can be called more than once, but calling close() on
// an already closed channel panics, so isClosed records whether the signal
// channel has been closed.
// isClosed can be accessed from several goroutines, so a mutex guards it.
// Closing the WebSocket connection must also stop the input and output
// message streams; signalCloseLoopChan is used to signal that.
type Connection struct {
  conn                *websocket.Conn // the underlying connection
  inputStream         chan []byte     // input stream, modeled as a channel
  outputStream        chan []byte     // output stream, modeled as a channel
  signalCloseLoopChan chan byte       // closed to signal shutdown
  isClosed            bool            // whether signalCloseLoopChan has been closed
  lock                sync.Mutex      // guards isClosed
}

// Create initializes a Connection and starts its read and write loops.
func Create(conn *websocket.Conn) (connection *Connection, err error) {
  connection = &Connection{
    conn:                conn,
    inputStream:         make(chan []byte, 1000),
    outputStream:        make(chan []byte, 1000),
    signalCloseLoopChan: make(chan byte, 1),
    isClosed:            false,
  }

  // Start the read and write loops
  go connection.readLoop()
  go connection.writeLoop()
  return
}

// ReadOne returns the next received message, or an error once closed.
func (c *Connection) ReadOne() (msg []byte, err error) {
  select {
  case msg = <-c.inputStream:
  case <-c.signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}

// WriteOne queues one message for sending, or returns an error once closed.
func (c *Connection) WriteOne(msg []byte) (err error) {
  select {
  case c.outputStream <- msg:
  case <-c.signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}

// Close closes the connection; it is safe to call more than once.
func (c *Connection) Close() {
  _ = c.conn.Close()
  c.lock.Lock()
  defer c.lock.Unlock()
  if !c.isClosed {
    c.isClosed = true // without this, a second call would close the channel again and panic
    close(c.signalCloseLoopChan)
  }
}

// readLoop keeps reading messages from the connection and queues them.
func (c *Connection) readLoop() {
  for {
    _, data, err := c.conn.ReadMessage()
    if err != nil {
      c.Close()
      return // stop reading from a failed connection
    }
    select {
    case c.inputStream <- data:
    case <-c.signalCloseLoopChan:
      return
    }
  }
}

// writeLoop sends every queued message until the connection is closed.
func (c *Connection) writeLoop() {
  for {
    select {
    case data := <-c.outputStream:
      if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
        c.Close()
        return
      }
    case <-c.signalCloseLoopChan:
      return
    }
  }
}
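
The Connection type above uses an isClosed flag plus a mutex so that the signal channel is closed only once. As an aside, the standard library's sync.Once expresses the same "close exactly once" guarantee more compactly. The sketch below is an alternative technique, not what the article's code does, and the `closer` type with its `newCloser` and `Closed` helpers is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical stand-in for the close-signalling part of a
// connection wrapper.
type closer struct {
	done chan struct{}
	once sync.Once
}

func newCloser() *closer {
	return &closer{done: make(chan struct{})}
}

// Close may be called any number of times from any goroutine;
// sync.Once guarantees that close(c.done) runs exactly once.
func (c *closer) Close() {
	c.once.Do(func() { close(c.done) })
}

// Closed reports whether Close has been called.
func (c *closer) Closed() bool {
	select {
	case <-c.done:
		return true
	default:
		return false
	}
}

func main() {
	c := newCloser()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close() // ten concurrent calls, no panic
		}()
	}
	wg.Wait()
	fmt.Println(c.Closed()) // prints "true"
}
```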

So far, you have learned how to use Go to build a WebSocket service.

Statement:
This article is reproduced from jb51.net. If there is any infringement, please contact admin@php.cn for removal.