ホームページ  >  記事  >  バックエンド開発  >  コードの詳細説明: Go を使用して WebSocket に基づいたライブ ビデオ弾幕システムを構築する

コードの詳細説明: Go を使用して WebSocket に基づいたライブ ビデオ弾幕システムを構築する

coldplay.xixi
coldplay.xixi転載
2020-07-21 17:27:034412ブラウズ

コードの詳細説明: Go を使用して WebSocket に基づいたライブ ビデオ弾幕システムを構築する

(1) ビジネスの複雑さの概要

早速本題に入りますが、500 万人が同時にライブ ブロードキャスト ルームにオンラインで接続していると仮定します。 1 秒あたり 1,000 の弾幕があります。次に、弾幕システムです。プッシュ周波数は次のとおりです: 500W * 1000 アイテム/秒 = 50 億アイテム/秒 ステーション B の 2019 年大晦日パーティーの弾幕システムがどれほど素晴らしいかを考えてください。さらに、大規模な Web サイトには Studio が 1 つだけあるわけがありません。

Go を WebSocket 開発に使用する場合は、次の 3 つの状況にすぎません:

  • Go のネイティブ ライブラリ (golang.org/ を使用する) を使用するx/net ですが、この公式ライブラリは本当に驚くほどバグが多いです。
  • GitHub Bossgorilla/websocket ライブラリを使用してください。これは、Gin などの特定の Web 開発フレームワークと組み合わせることができます。 iris など、使用されているフレームワークが golang.org/net に基づいている限り、このライブラリはこのフレームワークと組み合わせることができます
  • WebSocket フレームワークの実践

関連する学習の推奨事項: Go 言語チュートリアル

推定結果によると、弾幕プッシュの量が大きい場合、Linux カーネルはLinux カーネルが TCP パケットを送信するときの最大パケット送信周波数は 100W であるため、ボトルネックになります。したがって、同じ秒内の集中メッセージを 1 つのプッシュにマージして、ネットワーク上の小さなデータ パケットの送信を減らし、プッシュの頻度を減らすことができます。

バラージ システムは、オンライン ユーザーへのターゲットを絞ったプッシュを実現するために、オンライン ユーザーの長期接続を維持する必要があります。通常、ハッシュ ディクショナリ構造を使用します。通常、プッシュ メッセージは、オンラインで使用されるハッシュ ディクショナリをトラバースします。集中プッシュ期間中、ユーザーは常にオンラインとオフラインを繰り返します。オンライン ユーザーを維持するには、ハッシュ ディクショナリを常に変更し、ロック操作を継続的に実行する必要があります。過剰なユーザー ボリュームは、ロックのボトルネックにつながります。したがって、ハッシュ構造全体を複数のハッシュ構造に分割し、複数のハッシュ構造に異なるロックを追加し、ミューテックス ロックの代わりに読み取り/書き込みロックを使用することができます。

通常、サーバーとクライアントは JSON 構造を使用して対話します。これには、JSON データの継続的なエンコードとデコードが必要であり、CPU ボトルネックが発生します。メッセージは最初にマージされ、次にエンコードされ、最後にプッシュのためにハッシュ構造がポーリングされます。

上記は単一アーキテクチャに存在する問題ですが、弾幕システムは通常、より多くのユーザー負荷をサポートするために、柔軟な拡張と縮小を行う分散アーキテクチャを採用します。

(2) 押すか引くか?

クライアントがサーバー側のデータを取得すると、次の問題が発生します:

  • ライブ ブロードキャストでオンラインに参加する人が多いということは、メッセージ データが頻繁に更新されることを意味します。 、したがってメッセージをプルします これは、集中砲火が適時性を満たすことができないことを意味します
  • 多くのクライアントが同時にプルする場合、サーバー側の圧力はDDOSと同じです
  • 集中砲火システムは次のようになります。普遍的であるため、ライブ ブロードキャスト ルームの箇条書きに使用します。 シーンが少ないシナリオは、メッセージ データのプル リクエストが無効であることを意味します。

そこで、プッシュ モードを検討します。データが更新されると、サーバーはそれをアクティブにプッシュします。これにより、クライアント要求の数を効果的に削減できます。メッセージ プッシュを実装する必要がある場合は、サーバーが多数の長い接続を維持することを意味します。

(3) WebSocket を使用する理由

集中砲火メッセージのリアルタイム更新を実現するには、Socket を使用する必要がありますが、なぜ WebSocket を使用するのでしょうか?現在、ライブ ブロードキャスト アプリケーション開発のほとんどはクロスプラットフォームですが、クロスプラットフォーム開発フレームワークの本質は Web 開発であるため、WebSocket と不可分である必要があり、Bilibili などの Web 上のビデオを視聴することを選択するユーザーもいます。最近では、Lark や Feishu などの Electron などのクロスプラットフォーム フレームワークを使用して開発されたデスクトップ アプリケーションもいくつかあるため、メッセージ プッシュを実装する最良の方法は WebSocket を使用することです。

WebSocket を使用すると、サーバー側で長時間の接続を簡単に維持できます。第 2 に、WebSocket は HTTP プロトコルに基づいて構築されており、HTTPS も使用できます。したがって、WebSocket は信頼性の高い伝送であり、開発者に料金を支払う必要はありません根底にある細部に注意を払います。

WebSocket に Go を使用する必要があるのはなぜですか?まず、WebSocket というと Node.js を思い浮かべるかもしれませんが、Node.js はシングルスレッドモデルであり、高い同時実行性を実現したい場合は複数の Node.js プロセスを作成する必要がありますが、そうではありません。サーバーが接続コレクション全体を横断するのは簡単ですが、Java を使用する場合は面倒に見えます。Java プロジェクトのデプロイメントと Dockerfile の作成は Go のターゲット バイナリほど簡潔ではなく、Go コルーチンは簡単に高い同時実行性を達成できます。この章で説明したように、Go 言語には現在、成熟した WebSocket ホイールもあります。

(4) サーバーサイドの基本デモ

最初にフレームワークを構築します:

package main

import (
  "fmt"
  "net/http"
)

func main() {
 fmt.Println("Listen localhost:8080")
   // 注册一个用于WebSocket的路由,实际业务中不可能只有一个路由
  http.HandleFunc("/messages", messageHandler)
  // 监听8080端口,没有实现服务异常处理器,因此第二个参数是nil
  http.ListenAndServe("localhost:8080", nil)
}

func messageHandler(response http.ResponseWriter, request *http.Request) {
  // TODO: 实现消息处理
  response.Write([]byte("HelloWorld"))
}

次に、messageHandler 関数を改善します:

func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // 允许跨域
    CheckOrigin: func(resquest *http.Request) bool {
      return true
    },
  }

  // 建立连接
  conn, err := upgrader.Upgrade(response, request, nil)
  if err != nil {
    return
  }

  // 收发消息
  for {
    // 读取消息
    _, bytes, err := conn.ReadMessage()
    if err != nil {
      _ = conn.Close()
    }
    // 写入消息
    err = conn.WriteMessage(websocket.TextMessage, bytes)
    if err != nil {
      _ = conn.Close()
    }
  }
}

次に、基本的に WebSocket 関数を実装しますただし、WebSocket のネイティブ API はスレッドセーフではなく (Close メソッドはスレッドセーフで再入可能です)、他のモジュールはビジネス ロジックを再利用できないため、カプセル化されます。

  • Encapsulation The Connection オブジェクトWebSocket 接続について説明します
  • Connection オブジェクトのスレッドセーフな終了、受信、および送信 API を提供します
// main.go
package main

import (
  "bluemiaomiao.github.io/websocket-go/service"
  "fmt"
  "net/http"

  "github.com/gorilla/websocket"
)

func main() {
  fmt.Println("Listen localhost:8080")
  http.HandleFunc("/messages", messageHandler)
  _ = http.ListenAndServe("localhost:8080", nil)
}

func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // 允许跨域
    CheckOrigin: func(resquest *http.Request) bool {
      return true
    },
  }

  // 建立连接
  conn, err := upgrader.Upgrade(response, request, nil)
  wsConn, err := service.Create(conn)
  if err != nil {
    return
  }

  // 收发消息
  for {
    // 读取消息
    msg, err := wsConn.ReadOne()
    if err != nil {
      wsConn.Close()
    }
    // 写入消息
    err = wsConn.WriteOne(msg)
    if err != nil {
      _ = conn.Close()
    }
  }
}
// service/messsage_service.go
package service

import (
  "errors"
  "github.com/gorilla/websocket"
  "sync"
)

// 封装的连接对象
// 
// 由于websocket的Close()方法是可重入的,所以可以多次调用,但是关闭Channel的close()
// 方法不是可重入的,因此通过isClosed进行判断
// isClosed可能发生资源竞争,因此通过互斥锁避免
// 关闭websocket连接后,也要自动关闭输入输出消息流,因此通过signalCloseLoopChan实现
type Connection struct {
  conn                  *websocket.Conn  // 具体的连接对象
  inputStream             chan []byte       // 输入流,使用Channel模拟
  outputStream           chan []byte       // 输出流,使用chaneel模拟
  signalCloseLoopChan     chan byte       // 关闭信号
  isClosed               bool            // 是否调用过close()方法
  lock                   sync.Mutex      // 简单的锁
}

// 用于初始化一个连接对象
func Create(conn *websocket.Conn) (connection *Connection, err error) {
  connection = &Connection{
    conn:              conn,
    inputStream:        make(chan []byte, 1000),
    outputStream:       make(chan []byte, 1000),
    signalCloseLoopChan: make(chan byte, 1),
    isClosed:            false,
  }

  // 启动读写循环
  go connection.readLoop()
  go connection.writeLoop()
  return
}

// 读取一条消息
func (c *Connection) ReadOne() (msg []byte, err error) {
  select {
  case msg = <-(*c).inputStream:
  case <-(*c).signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}

// 写入一条消息
func (c *Connection) WriteOne(msg []byte) (err error) {
  select {
  case (*c).outputStream <- msg:
  case <-(*c).signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}

// 关闭连接对象
func (c *Connection) Close() {
  _ = (*c).conn.Close()
  (*c).lock.Lock()
  if !(*c).isClosed {
    close((*c).signalCloseLoopChan)
  }
  (*c).lock.Unlock()

}

// 读取循环
func (c *Connection) readLoop() {
  // 不停的读取长连接中的消息,只要存在消息就将其放到队列中
  for {
    _, bytes, err := (*c).conn.ReadMessage()
    if err != nil {
      (*c).Close()
    }
    select {
    case <-(*c).signalCloseLoopChan:
      (*c).Close()
    case (*c).inputStream <- bytes:
    }
  }
}

// 写入循环
func (c *Connection) writeLoop() {
  // 只要队列中存在消息,就将其写入
  var data []byte
  for {
    select {
    case data = <-(*c).outputStream:
    case <-(*c).signalCloseLoopChan:
      (*c).Close()
    }
    err := (*c).conn.WriteMessage(websocket.TextMessage, data)
    if err != nil {
      _ = (*c).conn.Close()
    }
  }
}

これまで、Go を使用して WebSocket サービスを構築する方法を学習しました。 。

以上がコードの詳細説明: Go を使用して WebSocket に基づいたライブ ビデオ弾幕システムを構築するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はjb51.netで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。