# Write a simple WebSocket push service in Go
## Push service implementation
### Basic principle

When the server starts, it registers two handlers. `websocketHandler` accepts Upgrade requests from browsers and upgrades them to WebSocket connections. `pushHandler` accepts requests from external push clients that want to send data.

The browser first connects to `websocketHandler` (default address `ws://ip:port/ws`), and the request is upgraded to a WebSocket connection. Once the connection is established, the browser must send a registration message containing a token. The server verifies the token, obtains the corresponding userId (in general, one userId may be associated with many tokens at the same time), and maintains the relationship between token, userId, and conn (the connection).

The push client sends a request to `pushHandler` (default address `http://ip:port/push`; the handler accepts only HTTP POST). The request contains a userId field and a message field (the handler code shown later also requires an event field). The server looks up every conn currently registered for that userId and pushes the message to each of them in turn. Because the push service is real-time, pushed data is not cached and does not need to be.

### Detailed code explanation
I will briefly describe the basic structure of the code here and point out some common Go idioms and patterns along the way. (I came to Go from other languages myself, and Go is still a fairly young language, so suggestions are welcome.) Since Go's inventors and several of its main maintainers come mostly from a C/C++ background, Go code also leans toward the C/C++ family in style.
First, take a look at the `Server` struct:

```go
// Server defines parameters for running websocket server.
type Server struct {
	// Address for server to listen on
	Addr string

	// Path for websocket request, default "/ws".
	WSPath string

	// Path for push message, default "/push".
	PushPath string

	// Upgrader is for upgrade connection to websocket connection using
	// "github.com/gorilla/websocket".
	//
	// If Upgrader is nil, default upgrader will be used. Default upgrader is
	// set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always
	// returns true.
	Upgrader *websocket.Upgrader

	// Check token if it's valid and return userID. If token is valid, userID
	// must be returned and ok should be true. Otherwise ok should be false.
	AuthToken func(token string) (userID string, ok bool)

	// Authorize push request. Message will be sent if it returns true,
	// otherwise the request will be discarded. Default nil and push request
	// will always be accepted.
	PushAuth func(r *http.Request) bool

	wh *websocketHandler
	ph *pushHandler
}
```

The `Upgrader *websocket.Upgrader` field is a type from the gorilla/websocket package and is used to upgrade HTTP requests. When a struct has many fields, initializing it directly is usually discouraged in favor of a constructor function provided by the package. Here that is `NewServer`:
```go
// NewServer creates a new Server.
func NewServer(addr string) *Server {
	return &Server{
		Addr:     addr,
		WSPath:   serverDefaultWSPath,
		PushPath: serverDefaultPushPath,
	}
}
```

Exposing a constructor like this is a common Go idiom. The `Server` is then started with its `ListenAndServe` method, which listens on the configured address, much like the function of the same name in the `http` package.
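To make the constructor pattern concrete, here is a minimal, self-contained sketch of how a caller might use it. The `Server` type below is a pared-down stand-in, not the package's real type; the default paths come from the field comments above, while `demoTokens` and `lookupToken` are hypothetical names invented for this example.

```go
package main

import "fmt"

// Server is a pared-down stand-in for the article's Server type, keeping
// only the fields this sketch needs.
type Server struct {
	Addr      string
	WSPath    string
	PushPath  string
	AuthToken func(token string) (userID string, ok bool)
}

// Defaults taken from the field comments on Server ("/ws" and "/push").
const (
	serverDefaultWSPath   = "/ws"
	serverDefaultPushPath = "/push"
)

// NewServer fills in the defaults so callers only override what they need.
func NewServer(addr string) *Server {
	return &Server{
		Addr:     addr,
		WSPath:   serverDefaultWSPath,
		PushPath: serverDefaultPushPath,
	}
}

// demoTokens is a hypothetical in-memory token table used only in this sketch.
var demoTokens = map[string]string{"token-abc": "user-1"}

// lookupToken has the shape AuthToken expects: token in, (userID, ok) out.
func lookupToken(token string) (string, bool) {
	userID, ok := demoTokens[token]
	return userID, ok
}

func main() {
	s := NewServer(":12345")
	s.WSPath = "/socket"      // override one default
	s.AuthToken = lookupToken // plug in token validation

	fmt.Println(s.Addr, s.WSPath, s.PushPath) // :12345 /socket /push
	// With the real type, the next step would be s.ListenAndServe().
}
```

The benefit of this shape is that a zero-configuration server works out of the box, and callers touch only the fields they care about.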
`ListenAndServe` creates the shared state and registers both handlers:

```go
// ListenAndServe listens on the TCP network address and handles websocket
// requests.
func (s *Server) ListenAndServe() error {
	b := &binder{
		userID2EventConnMap: make(map[string]*[]eventConn),
		connID2UserIDMap:    make(map[string]string),
	}

	// websocket request handler
	wh := websocketHandler{
		upgrader: defaultUpgrader,
		binder:   b,
	}
	if s.Upgrader != nil {
		wh.upgrader = s.Upgrader
	}
	if s.AuthToken != nil {
		wh.calcUserIDFunc = s.AuthToken
	}
	s.wh = &wh
	http.Handle(s.WSPath, s.wh)

	// push request handler
	ph := pushHandler{
		binder: b,
	}
	if s.PushAuth != nil {
		ph.authFunc = s.PushAuth
	}
	s.ph = &ph
	http.Handle(s.PushPath, s.ph)

	return http.ListenAndServe(s.Addr, nil)
}
```

Two handlers are created here: `websocketHandler` establishes the connection with the browser and transfers data over it, while `pushHandler` handles requests from the push side. Both handlers hold the same `binder` object, which maintains the token <-> userID <-> Conn relationship:
```go
// binder is defined to store the relation of userID and eventConn
type binder struct {
	mu sync.RWMutex

	// map stores key: userID and value of related slice of eventConn
	userID2EventConnMap map[string]*[]eventConn

	// map stores key: connID and value: userID
	connID2UserIDMap map[string]string
}
```

### websocketHandler

Let's take a closer look at the implementation of `websocketHandler`.
The handler type itself is very simple:

```go
// websocketHandler defines to handle websocket upgrade request.
type websocketHandler struct {
	// upgrader is used to upgrade request.
	upgrader *websocket.Upgrader

	// binder stores relations about websocket connection and userID.
	binder *binder

	// calcUserIDFunc defines to calculate userID by token. The userID will
	// be equal to token if this function is nil.
	calcUserIDFunc func(token string) (userID string, ok bool)
}
```

`websocketHandler` implements the `http.Handler` interface:
```go
// First try to upgrade connection to websocket. If success, connection will
// be kept until client send close message or server drop them.
func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	wsConn, err := wh.upgrader.Upgrade(w, r, nil)
	if err != nil {
		return
	}
	defer wsConn.Close()

	// handle Websocket request
	conn := NewConn(wsConn)
	conn.AfterReadFunc = func(messageType int, r io.Reader) {
		var rm RegisterMessage
		decoder := json.NewDecoder(r)
		if err := decoder.Decode(&rm); err != nil {
			return
		}

		// calculate userID by token
		userID := rm.Token
		if wh.calcUserIDFunc != nil {
			uID, ok := wh.calcUserIDFunc(rm.Token)
			if !ok {
				return
			}
			userID = uID
		}

		// bind
		wh.binder.Bind(userID, rm.Event, conn)
	}
	conn.BeforeCloseFunc = func() {
		// unbind
		wh.binder.Unbind(conn)
	}

	conn.Listen()
}
```

The handler first upgrades the incoming HTTP request to a `websocket.Conn` and then wraps it in our own `wserver.Conn`. (Wrapping, or composition, is a typical Go idiom. Remember that Go has no inheritance, only composition.) It then sets the `AfterReadFunc` and `BeforeCloseFunc` callbacks on the `Conn` and calls `conn.Listen()`. `AfterReadFunc` runs after `Conn` reads data: it decodes the registration message, validates the token to compute the userID, and registers the binding via `Bind`. `BeforeCloseFunc` performs the unbinding before `Conn` is closed.

### pushHandler

`pushHandler` is easy to understand: it parses the request and then pushes the data.
The handler's `ServeHTTP` method:

```go
// Authorize if needed. Then decode the request and push message to each
// related websocket connection.
func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// authorize
	if s.authFunc != nil {
		if ok := s.authFunc(r); !ok {
			w.WriteHeader(http.StatusUnauthorized)
			return
		}
	}

	// read request
	var pm PushMessage
	decoder := json.NewDecoder(r.Body)
	if err := decoder.Decode(&pm); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte(ErrRequestIllegal.Error()))
		return
	}

	// validate the data
	if pm.UserID == "" || pm.Event == "" || pm.Message == "" {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte(ErrRequestIllegal.Error()))
		return
	}

	cnt, err := s.push(pm.UserID, pm.Event, pm.Message)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		return
	}

	result := strings.NewReader(fmt.Sprintf("message sent to %d clients", cnt))
	io.Copy(w, result)
}
```

### Conn

`Conn` (here meaning `wserver.Conn`) is a wrapper around `websocket.Conn`.

```go
// Conn wraps websocket.Conn with Conn. It defines to listen and read
// data from Conn.
type Conn struct {
	Conn *websocket.Conn

	AfterReadFunc   func(messageType int, r io.Reader)
	BeforeCloseFunc func()

	once   sync.Once
	id     string
	stopCh chan struct{}
}
```

Its main method is `Listen()`.
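The `once` and `stopCh` fields hint at how a connection is shut down, although the article does not show `Close`. One common way to use such a pair, sketched here with a hypothetical `stopper` type rather than the real `Conn`, is to close the channel exactly once so that a read loop can notice it. Treat this as an illustration of the pattern, not as the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper holds just the two Conn fields that matter for shutdown.
type stopper struct {
	once   sync.Once
	stopCh chan struct{}
}

func newStopper() *stopper {
	return &stopper{stopCh: make(chan struct{})}
}

// Close signals the read loop to stop. sync.Once makes repeated calls safe:
// closing an already closed channel would otherwise panic.
func (s *stopper) Close() {
	s.once.Do(func() { close(s.stopCh) })
}

// stopped reports whether Close has been called, without blocking.
func (s *stopper) stopped() bool {
	select {
	case <-s.stopCh:
		return true
	default:
		return false
	}
}

func main() {
	s := newStopper()
	fmt.Println(s.stopped()) // false

	s.Close()
	s.Close() // the second call is a no-op

	fmt.Println(s.stopped()) // true
}
```

A closed channel is readable by any number of goroutines at once, which is why it works well as a broadcast stop signal.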
`Listen()` installs a handler for the WebSocket close message and then keeps reading data from the connection until it is stopped or a read fails:

```go
// Listen listens for receive data from websocket connection. It blocks
// until websocket connection is closed.
func (c *Conn) Listen() {
	c.Conn.SetCloseHandler(func(code int, text string) error {
		if c.BeforeCloseFunc != nil {
			c.BeforeCloseFunc()
		}

		if err := c.Close(); err != nil {
			log.Println(err)
		}

		message := websocket.FormatCloseMessage(code, "")
		c.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
		return nil
	})

	// Keeps reading from Conn until an error occurs.
ReadLoop:
	for {
		select {
		case <-c.stopCh:
			break ReadLoop
		default:
			messageType, r, err := c.Conn.NextReader()
			if err != nil {
				// TODO: handle read error maybe
				break ReadLoop
			}

			if c.AfterReadFunc != nil {
				c.AfterReadFunc(messageType, r)
			}
		}
	}
}
```

Because `NextReader` blocks while waiting for the next message, the `stopCh` case is only checked between reads.
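One piece the walkthrough relies on but does not show is how `binder` implements `Bind` and `Unbind`. The following is a simplified sketch of that bookkeeping, not the package's code: it identifies connections by a plain string ID, ignores the event name, and the `newBinder` and `connsFor` helpers are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// binder maps each userID to the IDs of its live connections, and each
// connection ID back to its userID, so both lookups are cheap.
type binder struct {
	mu          sync.RWMutex
	userToConns map[string][]string
	connToUser  map[string]string
}

func newBinder() *binder {
	return &binder{
		userToConns: make(map[string][]string),
		connToUser:  make(map[string]string),
	}
}

// Bind records that connID belongs to userID. Rebinding a known connection
// is ignored, which keeps the slices free of duplicates.
func (b *binder) Bind(userID, connID string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, ok := b.connToUser[connID]; ok {
		return
	}
	b.connToUser[connID] = userID
	b.userToConns[userID] = append(b.userToConns[userID], connID)
}

// Unbind removes connID from both maps. Unknown IDs are a no-op.
func (b *binder) Unbind(connID string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	userID, ok := b.connToUser[connID]
	if !ok {
		return
	}
	delete(b.connToUser, connID)

	conns := b.userToConns[userID]
	for i, id := range conns {
		if id == connID {
			conns = append(conns[:i], conns[i+1:]...)
			break
		}
	}
	if len(conns) == 0 {
		delete(b.userToConns, userID)
	} else {
		b.userToConns[userID] = conns
	}
}

// connsFor returns a copy of the connection IDs bound to userID.
func (b *binder) connsFor(userID string) []string {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return append([]string(nil), b.userToConns[userID]...)
}

func main() {
	b := newBinder()
	b.Bind("user-1", "conn-a")
	b.Bind("user-1", "conn-b")
	b.Unbind("conn-a")
	fmt.Println(b.connsFor("user-1")) // [conn-b]
}
```

Keeping the reverse map is what lets a closing connection be unbound without knowing which user it belonged to, matching the way `Unbind(conn)` is called from `BeforeCloseFunc`.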