ホームページ >バックエンド開発 >Golang >Go 言語で単純な WebSocket プッシュ サービスを作成する

Go 言語で単純な WebSocket プッシュ サービスを作成する

尚
転載
2019-11-25 16:13:232533ブラウズ

Go 言語で単純な WebSocket プッシュ サービスを作成する

プッシュ サービスの実装

基本原理

サーバーが起動すると、 Handler を 2 つ登録します。

websocketHandler は、アップグレード リクエストを送信し、WebSocket 接続にアップグレードするためのブラウザを提供するために使用されます。

pushHandler は、外部プッシュ端末にプッシュ データを送信するリクエストを提供するために使用されます。

ブラウザは最初に websocketHandler (デフォルトのアドレスは ws://ip:port/ws) に接続し、アップグレード要求は WebSocket 接続です。接続が確立されたら、登録情報を送信する必要があります。登録。ここでの登録情報にはトークン情報が含まれます。

サーバーは提供されたトークンを検証し、対応する userId を取得し (一般に、userId は同時に多くのトークンに関連付けられる可能性があります)、トークン、userId、conn (接続) の間の関係を保存して維持します。 )。

プッシュ エンドは、データをプッシュするリクエストを PushHandler に送信します (デフォルトのアドレスは ws://ip:port/push)。リクエストには userId フィールドとメッセージ フィールドが含まれます。サーバーは、userId に基づいて、現時点でサーバーに接続されているすべての接続を取得し、メッセージを 1 つずつプッシュします。

プッシュ サービスのリアルタイム性により、プッシュされたデータはキャッシュされず、キャッシュする必要もありません。

コードの詳細な説明

ここではコードの基本構造を簡単に説明し、Go 言語で一般的に使用されるいくつかの記述方法とパターンについても説明します (I他の言語も Go 言語に変わります。結局のところ、Go 言語も非常に若いです。そのため、何か提案があれば、ぜひ提案してください。)

Go 言語

の発明者と主要なメンテナーの一部はほとんどが C/C 言語出身であるため、Go 言語のコードも C/C システムに偏っています。 最初にサーバーの構造を見てみましょう:

// Server defines parameters for running websocket server.
type Server struct {
    // Address for server to listen on
    Addr string

    // Path for websocket request, default "/ws".
    WSPath string

    // Path for push message, default "/push".
    PushPath string

    // Upgrader is for upgrade connection to websocket connection using
    // "github.com/gorilla/websocket".
    //
    // If Upgrader is nil, default upgrader will be used. Default upgrader is
    // set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always
    // returns true.
    Upgrader *websocket.Upgrader

    // Check token if it's valid and return userID. If token is valid, userID
    // must be returned and ok should be true. Otherwise ok should be false.
    AuthToken func(token string) (userID string, ok bool)

    // Authorize push request. Message will be sent if it returns true,
    // otherwise the request will be discarded. Default nil and push request
    // will always be accepted.
    PushAuth func(r *http.Request) bool

    wh *websocketHandler
    ph *pushHandler
}

ここに Upgrader *websocket.Upgrader があります。これは、HTTP 要求をアップグレードするために使用される、ゴリラ/websocket パッケージのオブジェクトです。

構造体にパラメータが多すぎる場合は、通常、構造体を直接初期化することは推奨されず、構造体が提供する New メソッドを使用することをお勧めします。

// NewServer creates a new Server.func NewServer(addr string) *Server {    return &Server{
        Addr:     addr,
        WSPath:   serverDefaultWSPath,
        PushPath: serverDefaultPushPath,
    }
}

これは、初期化メソッドを外部に提供するための Go 言語の一般的な使用法でもあります。

次に、サーバーは、http パッケージの使用と同様に、ListenAndServe メソッドを使用してポートを開始し、リッスンします。

// ListenAndServe listens on the TCP network address and handle websocket
// request.
func (s *Server) ListenAndServe() error {
    b := &binder{
        userID2EventConnMap: make(map[string]*[]eventConn),
        connID2UserIDMap:    make(map[string]string),
    }

    // websocket request handler
    wh := websocketHandler{
        upgrader: defaultUpgrader,
        binder:   b,
    }
    if s.Upgrader != nil {
        wh.upgrader = s.Upgrader
    }
    if s.AuthToken != nil {
        wh.calcUserIDFunc = s.AuthToken
    }
    s.wh = &wh
    http.Handle(s.WSPath, s.wh)

    // push request handler
    ph := pushHandler{
        binder: b,
    }
    if s.PushAuth != nil {
        ph.authFunc = s.PushAuth
    }
    s.ph = &ph
    http.Handle(s.PushPath, s.ph)

    return http.ListenAndServe(s.Addr, nil)
}

ここでは、websocketHandler と PushHandler という 2 つのハンドラーを生成します。 websocketHandler はブラウザとの接続を確立してデータを送信する役割を果たし、pushHandler はプッシュ側のリクエストを処理します。

ご覧のとおり、ここでは両方のハンドラーがバインダー オブジェクトをカプセル化しています。このバインダーは、トークン <-> userID <-> Conn:

// binder is defined to store the relation of userID and eventConn
type binder struct {
    mu sync.RWMutex

    // map stores key: userID and value of related slice of eventConn
    userID2EventConnMap map[string]*[]eventConn

    // map stores key: connID and value: userID
    connID2UserIDMap map[string]string
}

websocketHandler

の関係を維持するために使用されます。websocketHandler の実装を詳しく見てみましょう。

// websocketHandler defines to handle websocket upgrade request.
type websocketHandler struct {
    // upgrader is used to upgrade request.
    upgrader *websocket.Upgrader

    // binder stores relations about websocket connection and userID.
    binder *binder

    // calcUserIDFunc defines to calculate userID by token. The userID will
    // be equal to token if this function is nil.
    calcUserIDFunc func(token string) (userID string, ok bool)
}

非常にシンプルな構造です。 websocketHandler は http.Handler インターフェイスを実装します。

// First try to upgrade connection to websocket. If success, connection will
// be kept until client send close message or server drop them.
func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    wsConn, err := wh.upgrader.Upgrade(w, r, nil)
    if err != nil {
        return
    }
    defer wsConn.Close()

    // handle Websocket request
    conn := NewConn(wsConn)
    conn.AfterReadFunc = func(messageType int, r io.Reader) {
        var rm RegisterMessage
        decoder := json.NewDecoder(r)
        if err := decoder.Decode(&rm); err != nil {
            return
        }

        // calculate userID by token
        userID := rm.Token
        if wh.calcUserIDFunc != nil {
            uID, ok := wh.calcUserIDFunc(rm.Token)
            if !ok {
                return
            }
            userID = uID
        }

        // bind
        wh.binder.Bind(userID, rm.Event, conn)
    }
    conn.BeforeCloseFunc = func() {
        // unbind
        wh.binder.Unbind(conn)
    }

    conn.Listen()
}

まず、受信した http.Request を websocket.Conn に変換し、それからカスタマイズされた wserver.Conn にパッケージ化します (カプセル化、または組み合わせは、Go 言語の一般的な使用法です) Go 言語には継承がなく、合成のみがあることを覚えておいてください。

次に、Conn の AfterReadFunc メソッドと BeforeCloseFunc メソッドが設定され、conn.Listen() が開始されます。 AfterReadFunc は、Conn がデータを読み取った後、トークンに基づいて userID の検証と計算を試み、その後、bind がバインディングを登録することを意味します。 BeforeCloseFunc は、Conn が閉じられる前にバインド解除操作を実行します。

pushHandler

pushHandler は理解しやすいです。リクエストを解析し、データをプッシュします。

// Authorize if needed. Then decode the request and push message to each
// realted websocket connection.
func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // authorize
    if s.authFunc != nil {
        if ok := s.authFunc(r); !ok {
            w.WriteHeader(http.StatusUnauthorized)
            return
        }
    }

    // read request
    var pm PushMessage
    decoder := json.NewDecoder(r.Body)
    if err := decoder.Decode(&pm); err != nil {
        w.WriteHeader(http.StatusBadRequest)
        w.Write([]byte(ErrRequestIllegal.Error()))
        return
    }

    // validate the data
    if pm.UserID == "" || pm.Event == "" || pm.Message == "" {
        w.WriteHeader(http.StatusBadRequest)
        w.Write([]byte(ErrRequestIllegal.Error()))
        return
    }

    cnt, err := s.push(pm.UserID, pm.Event, pm.Message)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        w.Write([]byte(err.Error()))
        return
    }

    result := strings.NewReader(fmt.Sprintf("message sent to %d clients", cnt))
    io.Copy(w, result)
}
Conn
Conn (此处指 wserver.Conn) 为 websocket.Conn 的包装。

// Conn wraps websocket.Conn with Conn. It defines to listen and read
// data from Conn.
type Conn struct {
    Conn *websocket.Conn

    AfterReadFunc   func(messageType int, r io.Reader)
    BeforeCloseFunc func()

    once   sync.Once
    id     string
    stopCh chan struct{}
}

メイン メソッドは Listen() です。

// Listen listens for receive data from websocket connection. It blocks
// until websocket connection is closed.
func (c *Conn) Listen() {
    c.Conn.SetCloseHandler(func(code int, text string) error {
        if c.BeforeCloseFunc != nil {
            c.BeforeCloseFunc()
        }

        if err := c.Close(); err != nil {
            log.Println(err)
        }

        message := websocket.FormatCloseMessage(code, "")
        c.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
        return nil
    })

    // Keeps reading from Conn util get error.
ReadLoop:
    for {
        select {
        case <-c.stopCh:
            break ReadLoop
        default:
            messageType, r, err := c.Conn.NextReader()
            if err != nil {
                // TODO: handle read error maybe
                break ReadLoop
            }

            if c.AfterReadFunc != nil {
                c.AfterReadFunc(messageType, r)
            }
        }
    }
}

主に、WebSocket 接続が閉じられたときのデータの処理と連続読み取りを設定します。

推奨: g

olang チュートリアル

以上がGo 言語で単純な WebSocket プッシュ サービスを作成するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はcnblogs.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。