>  기사  >  백엔드 개발  >  자세한 코드 설명: Go를 사용하여 WebSocket 기반 라이브 비디오 사격 시스템 구축

자세한 코드 설명: Go를 사용하여 WebSocket 기반 라이브 비디오 사격 시스템 구축

coldplay.xixi
coldplay.xixi앞으로
2020-07-21 17:27:034506검색

자세한 코드 설명: Go를 사용하여 WebSocket 기반 라이브 비디오 사격 시스템 구축

(1) 비즈니스 복잡성 소개

본론으로 들어가겠습니다. 500만 명이 동시에 라이브 방송실에 접속하고 초당 1,000번의 공격이 발생한다고 가정해 보겠습니다. 포격 시스템은 다음과 같습니다: 500W * 1000/초 = 50억 메시지/초 스테이션 B의 2019년 새해 전야 파티의 포격 시스템이 얼마나 훌륭했는지 생각해 보십시오. 게다가 대형 웹사이트에는 생방송 룸이 하나만 있을 수 없습니다! <code>500W * 1000条/秒=50亿条/秒 ,想想B站2019跨年晚会那次弹幕系统得是多么的NB,况且一个大型网站不可能只有一个直播间!

使用Go做WebSocket开发无非就是三种情况:

  • 使用Go原生自带的库,也就是 golang.org/x/net ,但是这个官方库真是出了奇Bug多
  • 使用GitHub大佬 gorilla/websocket 库,可以结合到某些Web开发框架,比如Gin、iris等,只要使用的框架式基于 golang.org/net
  • 사용방법은 3가지밖에 없습니다 WebSocket 개발 상황:

Go의 기본 라이브러리인 golang.org/x/net을 사용하지만 이 공식 라이브러리는 정말 놀라울 정도로 버그가 많습니다. GitHub 상사 gorilla /websocket 사용 라이브러리는 Gin, iris 등과 같은 특정 웹 개발 프레임워크와 결합될 수 있습니다. 사용되는 프레임워크가 golang.org/net을 기반으로 하는 한 이 라이브러리는 결합될 수 있습니다. 이 프레임 조합으로

WebSocket 프레임워크 살펴보기

관련 학습 권장사항:

Go 언어 튜토리얼

추정 결과에 따르면, 탄막 푸시량이 클 경우 Linux 커널에 병목 현상이 발생합니다. Linux 커널은 TCP를 보내기 때문에 패킷을 보낼 때 최대 패킷 전송 빈도는 100W입니다. 따라서 동일한 초 내의 공격 메시지를 하나의 푸시로 병합할 수 있어 네트워크에서 작은 데이터 패킷의 전송을 줄여 푸시 빈도를 줄일 수 있습니다.

공격 시스템은 온라인 사용자에 대한 타겟 푸시를 달성하기 위해 장기적인 연결을 유지해야 합니다. 일반적으로 푸시 메시지는 온라인 해시 사전을 통해 이동됩니다. Barrage Push 기간 동안 사용자는 온라인과 오프라인을 계속해서 유지해야 하며, 해시 사전을 지속적으로 수정해야 하며 과도한 사용자 볼륨으로 인해 잠금 병목 현상이 발생합니다. 따라서 전체 해시 구조를 여러 해시 구조로 분할하고, 여러 해시 구조에 서로 다른 잠금을 추가하고, 뮤텍스 잠금 대신 읽기-쓰기 잠금을 사용할 수 있습니다.

    일반적으로 서버와 클라이언트는 JSON 구조를 사용하여 상호 작용하는데, JSON 데이터의 지속적인 인코딩 및 디코딩이 필요하므로 CPU 병목 현상이 발생합니다. 메시지는 먼저 병합된 다음 인코딩되고 마지막으로 푸시를 위해 해시 구조가 폴링됩니다.
  • 위 내용은 단일 아키텍처에 존재하는 문제입니다. 더 많은 사용자 부하를 지원하기 위해 사격 시스템은 일반적으로 탄력적인 확장 및 수축을 위한 분산 아키텍처를 채택합니다.
  • (2) 밀까, 당길까?
  • 클라이언트가 서버 측 데이터를 가져오면 다음과 같은 문제가 발생합니다.

라이브 방송에서 온라인에 사람이 많다는 것은 메시지 데이터가 자주 업데이트된다는 것을 의미하고, 메시지를 가져오면 공세를 맞출 수 없다는 것을 의미합니다. 적시성

많은 클라이언트가 동시에 당기면 서버에 가해지는 압력은 DDOS와 다르지 않습니다

사격 시스템은 보편적이어야 하므로 생방송실에 사격이 거의 없는 시나리오의 경우 메시지 데이터 풀 요청이 유효하지 않습니다

그래서 우리는 푸시 모드를 고려합니다. 데이터가 업데이트되면 서버가 이를 클라이언트에 적극적으로 푸시하므로 클라이언트 요청 수를 효과적으로 줄일 수 있습니다. 메시지 푸시를 구현해야 하는 경우 이는 서버가 많은 수의 긴 연결을 유지한다는 의미입니다.

(3) WebSocket을 사용하는 이유는 무엇입니까?

연발 메시지의 실시간 업데이트를 달성하려면 소켓을 사용해야 하는데 왜 WebSocket을 사용합니까? 요즘 대부분의 라이브 방송 애플리케이션 개발은 크로스 플랫폼입니다. 그러나 크로스 플랫폼 개발 프레임워크의 본질은 웹 개발이므로 WebSocket과 분리될 수 없어야 하며 일부 사용자는 Bilibili와 같은 웹에서 비디오를 시청하기로 선택합니다. 요즘에는 Lark, Feishu 등 Electron과 같은 크로스 플랫폼 프레임워크를 사용하여 일부 데스크톱 애플리케이션이 개발되므로 메시지 푸시를 구현하는 가장 좋은 방법은 WebSocket을 사용하는 것입니다.

WebSocket을 사용하면 서버 측에서 긴 연결을 쉽게 유지할 수 있습니다. 둘째, WebSocket은 HTTP 프로토콜을 기반으로 구축되었으며 HTTPS도 사용할 수 있습니다. 따라서 WebSocket은 안정적인 전송이며 개발자가 기본 사항에 주의를 기울일 필요가 없습니다. 세부.

왜 WebSocket에 Go를 사용해야 하나요? 우선 WebSocket 하면 Node.js를 떠올릴 수 있는데 Node.js는 높은 동시성을 달성하려면 여러 개의 Node.js 프로세스를 만들어야 하는데 그렇지 않습니다. 서버가 전체 연결 컬렉션을 쉽게 탐색할 수 있습니다. Java를 사용하는 경우에는 Java 프로젝트 배포 및 Dockerfile 작성이 Go의 대상 바이너리만큼 간결하지 않으며 앞서 언급한 것처럼 Go 코루틴은 쉽게 높은 동시성을 달성할 수 있습니다. 장에서 Go 언어에는 현재 성숙한 WebSocket 휠도 있습니다.

    (4) 서버 측 기본 데모
  • 먼저 프레임워크를 구축하세요:
  • package main
    
    import (
      "fmt"
      "net/http"
    )
    
    func main() {
     fmt.Println("Listen localhost:8080")
       // 注册一个用于WebSocket的路由,实际业务中不可能只有一个路由
      http.HandleFunc("/messages", messageHandler)
      // 监听8080端口,没有实现服务异常处理器,因此第二个参数是nil
      http.ListenAndServe("localhost:8080", nil)
    }
    
    func messageHandler(response http.ResponseWriter, request *http.Request) {
      // TODO: 实现消息处理
      response.Write([]byte("HelloWorld"))
    }
  • 그런 다음 messageHandler 기능을 개선하세요:
func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // 允许跨域
    CheckOrigin: func(resquest *http.Request) bool {
      return true
    },
  }

  // 建立连接
  conn, err := upgrader.Upgrade(response, request, nil)
  if err != nil {
    return
  }

  // 收发消息
  for {
    // 读取消息
    _, bytes, err := conn.ReadMessage()
    if err != nil {
      _ = conn.Close()
    }
    // 写入消息
    err = conn.WriteMessage(websocket.TextMessage, bytes)
    if err != nil {
      _ = conn.Close()
    }
  }
}

이제 WebSocket 기능은 기본적으로 구현되지만 websocket의 기본 API는 스레드로부터 안전하지 않습니다(닫기 메서드는 스레드로부터 안전하고 재진입 가능), 다른 모듈은 비즈니스 로직을 재사용할 수 없으므로 캡슐화됩니다.

🎜🎜WebSocket 연결을 설명하기 위해 Connection 개체를 캡슐화합니다. 🎜🎜 스레드로부터 안전한 닫기, 수신 및 전송 API를 제공합니다. the Connection 객체 🎜 🎜
// main.go
package main

import (
  "bluemiaomiao.github.io/websocket-go/service"
  "fmt"
  "net/http"

  "github.com/gorilla/websocket"
)

func main() {
  fmt.Println("Listen localhost:8080")
  http.HandleFunc("/messages", messageHandler)
  _ = http.ListenAndServe("localhost:8080", nil)
}

func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // 允许跨域
    CheckOrigin: func(resquest *http.Request) bool {
      return true
    },
  }

  // 建立连接
  conn, err := upgrader.Upgrade(response, request, nil)
  wsConn, err := service.Create(conn)
  if err != nil {
    return
  }

  // 收发消息
  for {
    // 读取消息
    msg, err := wsConn.ReadOne()
    if err != nil {
      wsConn.Close()
    }
    // 写入消息
    err = wsConn.WriteOne(msg)
    if err != nil {
      _ = conn.Close()
    }
  }
}
// service/messsage_service.go
package service

import (
  "errors"
  "github.com/gorilla/websocket"
  "sync"
)

// 封装的连接对象
// 
// 由于websocket的Close()方法是可重入的,所以可以多次调用,但是关闭Channel的close()
// 方法不是可重入的,因此通过isClosed进行判断
// isClosed可能发生资源竞争,因此通过互斥锁避免
// 关闭websocket连接后,也要自动关闭输入输出消息流,因此通过signalCloseLoopChan实现
type Connection struct {
  conn                  *websocket.Conn  // 具体的连接对象
  inputStream             chan []byte       // 输入流,使用Channel模拟
  outputStream           chan []byte       // 输出流,使用chaneel模拟
  signalCloseLoopChan     chan byte       // 关闭信号
  isClosed               bool            // 是否调用过close()方法
  lock                   sync.Mutex      // 简单的锁
}

// 用于初始化一个连接对象
func Create(conn *websocket.Conn) (connection *Connection, err error) {
  connection = &Connection{
    conn:              conn,
    inputStream:        make(chan []byte, 1000),
    outputStream:       make(chan []byte, 1000),
    signalCloseLoopChan: make(chan byte, 1),
    isClosed:            false,
  }

  // 启动读写循环
  go connection.readLoop()
  go connection.writeLoop()
  return
}

// 读取一条消息
func (c *Connection) ReadOne() (msg []byte, err error) {
  select {
  case msg = <-(*c).inputStream:
  case <-(*c).signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}

// 写入一条消息
func (c *Connection) WriteOne(msg []byte) (err error) {
  select {
  case (*c).outputStream <- msg:
  case <-(*c).signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}

// 关闭连接对象
func (c *Connection) Close() {
  _ = (*c).conn.Close()
  (*c).lock.Lock()
  if !(*c).isClosed {
    close((*c).signalCloseLoopChan)
  }
  (*c).lock.Unlock()

}

// 读取循环
func (c *Connection) readLoop() {
  // 不停的读取长连接中的消息,只要存在消息就将其放到队列中
  for {
    _, bytes, err := (*c).conn.ReadMessage()
    if err != nil {
      (*c).Close()
    }
    select {
    case <-(*c).signalCloseLoopChan:
      (*c).Close()
    case (*c).inputStream <- bytes:
    }
  }
}

// 写入循环
func (c *Connection) writeLoop() {
  // 只要队列中存在消息,就将其写入
  var data []byte
  for {
    select {
    case data = <-(*c).outputStream:
    case <-(*c).signalCloseLoopChan:
      (*c).Close()
    }
    err := (*c).conn.WriteMessage(websocket.TextMessage, data)
    if err != nil {
      _ = (*c).conn.Close()
    }
  }
}
🎜 지금까지 Go를 사용하여 WebSocket 서비스를 구축하는 방법을 배웠습니다. 🎜

위 내용은 자세한 코드 설명: Go를 사용하여 WebSocket 기반 라이브 비디오 사격 시스템 구축의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 jb51.net에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제