プッシュ サービスの実装
基本原理サーバーが起動すると、 Handler を 2 つ登録します。 websocketHandler は、アップグレード リクエストを送信し、WebSocket 接続にアップグレードするためのブラウザを提供するために使用されます。 pushHandler は、外部プッシュ端末にプッシュ データを送信するリクエストを提供するために使用されます。 ブラウザは最初に websocketHandler (デフォルトのアドレスは ws://ip:port/ws) に接続し、アップグレード要求は WebSocket 接続です。接続が確立されたら、登録情報を送信する必要があります。登録。ここでの登録情報にはトークン情報が含まれます。 サーバーは提供されたトークンを検証し、対応する userId を取得し (一般に、userId は同時に多くのトークンに関連付けられる可能性があります)、トークン、userId、conn (接続) の間の関係を保存して維持します。 )。 プッシュ エンドは、データをプッシュするリクエストを PushHandler に送信します (デフォルトのアドレスは ws://ip:port/push)。リクエストには userId フィールドとメッセージ フィールドが含まれます。サーバーは、userId に基づいて、現時点でサーバーに接続されているすべての接続を取得し、メッセージを 1 つずつプッシュします。 プッシュ サービスのリアルタイム性により、プッシュされたデータはキャッシュされず、キャッシュする必要もありません。コードの詳細な説明
ここではコードの基本構造を簡単に説明し、Go 言語で一般的に使用されるいくつかの記述方法とパターンについても説明します (I他の言語も Go 言語に変わります。結局のところ、Go 言語も非常に若いです。そのため、何か提案があれば、ぜひ提案してください。) Go 言語の発明者と主要なメンテナーの一部はほとんどが C/C 言語出身であるため、Go 言語のコードも C/C システムに偏っています。 最初にサーバーの構造を見てみましょう:
// Server defines parameters for running websocket server. type Server struct { // Address for server to listen on Addr string // Path for websocket request, default "/ws". WSPath string // Path for push message, default "/push". PushPath string // Upgrader is for upgrade connection to websocket connection using // "github.com/gorilla/websocket". // // If Upgrader is nil, default upgrader will be used. Default upgrader is // set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always // returns true. Upgrader *websocket.Upgrader // Check token if it's valid and return userID. If token is valid, userID // must be returned and ok should be true. Otherwise ok should be false. AuthToken func(token string) (userID string, ok bool) // Authorize push request. Message will be sent if it returns true, // otherwise the request will be discarded. Default nil and push request // will always be accepted. PushAuth func(r *http.Request) bool wh *websocketHandler ph *pushHandler }
ここに Upgrader *websocket.Upgrader があります。これは、HTTP 要求をアップグレードするために使用される、ゴリラ/websocket パッケージのオブジェクトです。
構造体にパラメータが多すぎる場合は、通常、構造体を直接初期化することは推奨されず、構造体が提供する New メソッドを使用することをお勧めします。
// NewServer creates a new Server.func NewServer(addr string) *Server { return &Server{ Addr: addr, WSPath: serverDefaultWSPath, PushPath: serverDefaultPushPath, } }
これは、初期化メソッドを外部に提供するための Go 言語の一般的な使用法でもあります。
次に、サーバーは、http パッケージの使用と同様に、ListenAndServe メソッドを使用してポートを開始し、リッスンします。
// ListenAndServe listens on the TCP network address and handle websocket // request. func (s *Server) ListenAndServe() error { b := &binder{ userID2EventConnMap: make(map[string]*[]eventConn), connID2UserIDMap: make(map[string]string), } // websocket request handler wh := websocketHandler{ upgrader: defaultUpgrader, binder: b, } if s.Upgrader != nil { wh.upgrader = s.Upgrader } if s.AuthToken != nil { wh.calcUserIDFunc = s.AuthToken } s.wh = &wh http.Handle(s.WSPath, s.wh) // push request handler ph := pushHandler{ binder: b, } if s.PushAuth != nil { ph.authFunc = s.PushAuth } s.ph = &ph http.Handle(s.PushPath, s.ph) return http.ListenAndServe(s.Addr, nil) }
ここでは、websocketHandler と PushHandler という 2 つのハンドラーを生成します。 websocketHandler はブラウザとの接続を確立してデータを送信する役割を果たし、pushHandler はプッシュ側のリクエストを処理します。
ご覧のとおり、ここでは両方のハンドラーがバインダー オブジェクトをカプセル化しています。このバインダーは、トークン <-> userID <-> Conn:
// binder is defined to store the relation of userID and eventConn type binder struct { mu sync.RWMutex // map stores key: userID and value of related slice of eventConn userID2EventConnMap map[string]*[]eventConn // map stores key: connID and value: userID connID2UserIDMap map[string]string }
websocketHandler
の関係を維持するために使用されます。websocketHandler の実装を詳しく見てみましょう。
// websocketHandler defines to handle websocket upgrade request. type websocketHandler struct { // upgrader is used to upgrade request. upgrader *websocket.Upgrader // binder stores relations about websocket connection and userID. binder *binder // calcUserIDFunc defines to calculate userID by token. The userID will // be equal to token if this function is nil. calcUserIDFunc func(token string) (userID string, ok bool) }
非常にシンプルな構造です。 websocketHandler は http.Handler インターフェイスを実装します。
// First try to upgrade connection to websocket. If success, connection will // be kept until client send close message or server drop them. func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { wsConn, err := wh.upgrader.Upgrade(w, r, nil) if err != nil { return } defer wsConn.Close() // handle Websocket request conn := NewConn(wsConn) conn.AfterReadFunc = func(messageType int, r io.Reader) { var rm RegisterMessage decoder := json.NewDecoder(r) if err := decoder.Decode(&rm); err != nil { return } // calculate userID by token userID := rm.Token if wh.calcUserIDFunc != nil { uID, ok := wh.calcUserIDFunc(rm.Token) if !ok { return } userID = uID } // bind wh.binder.Bind(userID, rm.Event, conn) } conn.BeforeCloseFunc = func() { // unbind wh.binder.Unbind(conn) } conn.Listen() }
まず、受信した http.Request を websocket.Conn に変換し、それからカスタマイズされた wserver.Conn にパッケージ化します (カプセル化、または組み合わせは、Go 言語の一般的な使用法です) Go 言語には継承がなく、合成のみがあることを覚えておいてください。
次に、Conn の AfterReadFunc メソッドと BeforeCloseFunc メソッドが設定され、conn.Listen() が開始されます。 AfterReadFunc は、Conn がデータを読み取った後、トークンに基づいて userID の検証と計算を試み、その後、bind がバインディングを登録することを意味します。 BeforeCloseFunc は、Conn が閉じられる前にバインド解除操作を実行します。
pushHandler
pushHandler は理解しやすいです。リクエストを解析し、データをプッシュします。
// Authorize if needed. Then decode the request and push message to each // realted websocket connection. func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } // authorize if s.authFunc != nil { if ok := s.authFunc(r); !ok { w.WriteHeader(http.StatusUnauthorized) return } } // read request var pm PushMessage decoder := json.NewDecoder(r.Body) if err := decoder.Decode(&pm); err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(ErrRequestIllegal.Error())) return } // validate the data if pm.UserID == "" || pm.Event == "" || pm.Message == "" { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(ErrRequestIllegal.Error())) return } cnt, err := s.push(pm.UserID, pm.Event, pm.Message) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return } result := strings.NewReader(fmt.Sprintf("message sent to %d clients", cnt)) io.Copy(w, result) } Conn Conn (此处指 wserver.Conn) 为 websocket.Conn 的包装。 // Conn wraps websocket.Conn with Conn. It defines to listen and read // data from Conn. type Conn struct { Conn *websocket.Conn AfterReadFunc func(messageType int, r io.Reader) BeforeCloseFunc func() once sync.Once id string stopCh chan struct{} }
メイン メソッドは Listen() です。
// Listen listens for receive data from websocket connection. It blocks // until websocket connection is closed. func (c *Conn) Listen() { c.Conn.SetCloseHandler(func(code int, text string) error { if c.BeforeCloseFunc != nil { c.BeforeCloseFunc() } if err := c.Close(); err != nil { log.Println(err) } message := websocket.FormatCloseMessage(code, "") c.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second)) return nil }) // Keeps reading from Conn util get error. ReadLoop: for { select { case <-c.stopCh: break ReadLoop default: messageType, r, err := c.Conn.NextReader() if err != nil { // TODO: handle read error maybe break ReadLoop } if c.AfterReadFunc != nil { c.AfterReadFunc(messageType, r) } } } }
主に、WebSocket 接続が閉じられたときのデータの処理と連続読み取りを設定します。
推奨: g
olang チュートリアル以上がGo 言語で単純な WebSocket プッシュ サービスを作成するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。