How to implement a Redis protocol parser in Golang

WBOY
2023-05-28 19:13:21

The RESP protocol

RESP is the protocol that Redis clients and servers use to communicate. It defines five formats:

Simple string (status reply): a string that starts with "+" and ends with "\r\n"

Error reply: a string that starts with "-" and ends with "\r\n"

Integer: a string that starts with ":" and ends with "\r\n"

Bulk string: starts with "$", followed by the number of bytes actually sent and "\r\n", then the data itself, terminated by "\r\n"

$3\r\nabc\r\n

Array: starts with "*", followed by the number of elements and "\r\n", then the elements themselves

SET key value
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

Commands and data sent by both client and server always use \r\n (CRLF) as the line terminator.

When we send a command such as *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n, the server receives the following lines:
*3\r\n
$3\r\n
SET\r\n
$3\r\n
key\r\n
$5\r\n
value\r\n
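
To make the array format concrete, here is a minimal sketch of the encoding step in Go. The helper name encodeCommand is hypothetical and is not part of the code shown later in this article; it only illustrates how a command becomes an array of bulk strings.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// encodeCommand is a hypothetical helper (not from the article's code base)
// that encodes a command and its arguments as a RESP array of bulk strings.
func encodeCommand(args ...string) string {
	var sb strings.Builder
	// Array header: "*" + number of elements + CRLF.
	sb.WriteString("*" + strconv.Itoa(len(args)) + "\r\n")
	for _, arg := range args {
		// Bulk string: "$" + byte length + CRLF + data + CRLF.
		// len(arg) counts bytes, which is what RESP expects.
		sb.WriteString("$" + strconv.Itoa(len(arg)) + "\r\n" + arg + "\r\n")
	}
	return sb.String()
}

func main() {
	// %q makes the \r\n terminators visible in the output.
	fmt.Printf("%q\n", encodeCommand("SET", "key", "value"))
}
```

Running it prints the same byte sequence as the SET example above, with the CRLF pairs shown as escapes.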

interface/resp/conn.go

type Connection interface {
   Write([]byte) error
   GetDBIndex() int
   SelectDB(int)
}

interface/resp/reply.go
type Reply interface {
	ToBytes() []byte
}
  • Connection interface: the connection to a Redis client

  • Write: sends a reply message to the client

  • GetDBIndex: returns the index of the DB the client is using (Redis has 16 DBs by default)

  • Reply interface: the response interface
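
Because Connection is an interface, the protocol layer can be exercised without a real socket. The following is a sketch under that assumption: fakeConn is a hypothetical in-memory implementation, useful mainly in tests, and does not appear in the article's code.

```go
package main

import (
	"bytes"
	"fmt"
)

// Connection mirrors the interface from interface/resp/conn.go.
type Connection interface {
	Write([]byte) error
	GetDBIndex() int
	SelectDB(int)
}

// fakeConn is a hypothetical in-memory Connection: it records everything
// written to it instead of sending bytes over the network.
type fakeConn struct {
	out bytes.Buffer
	db  int
}

func (c *fakeConn) Write(b []byte) error {
	_, err := c.out.Write(b)
	return err
}

func (c *fakeConn) GetDBIndex() int { return c.db }

func (c *fakeConn) SelectDB(i int) { c.db = i }

// Compile-time check that fakeConn satisfies Connection.
var _ Connection = (*fakeConn)(nil)

func main() {
	c := &fakeConn{}
	c.SelectDB(3)
	_ = c.Write([]byte("+OK\r\n"))
	fmt.Printf("db=%d out=%q\n", c.GetDBIndex(), c.out.String())
}
```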

resp/reply/consts.go

type PongReply struct{}

var pongBytes = []byte("+PONG\r\n")

func (r *PongReply) ToBytes() []byte {
    return pongBytes
}

var thePongReply = new(PongReply)

func MakePongReply() *PongReply {
    return thePongReply
}

type OkReply struct{}

var okBytes = []byte("+OK\r\n")

func (r *OkReply) ToBytes() []byte {
    return okBytes
}

var theOkReply = new(OkReply)

func MakeOkReply() *OkReply {
    return theOkReply
}

var nullBulkBytes = []byte("$-1\r\n")

type NullBulkReply struct{}

func (r *NullBulkReply) ToBytes() []byte {
    return nullBulkBytes
}

func MakeNullBulkReply() *NullBulkReply {
    return &NullBulkReply{}
}

var emptyMultiBulkBytes = []byte("*0\r\n")

type EmptyMultiBulkReply struct{}

func (r *EmptyMultiBulkReply) ToBytes() []byte {
    return emptyMultiBulkBytes
}

type NoReply struct{}

var noBytes = []byte("")

func (r *NoReply) ToBytes() []byte {
    return noBytes
}

This file defines five fixed replies: PONG, OK, null bulk, empty array (empty multi bulk), and empty (no reply)

resp/reply/reply.go

type ErrorReply interface {
   Error() string
   ToBytes() []byte
}

ErrorReply: defines the interface for error replies

resp/reply/errors.go

type UnknownErrReply struct{}

var unknownErrBytes = []byte("-Err unknown\r\n")

func (r *UnknownErrReply) ToBytes() []byte {
   return unknownErrBytes
}

func (r *UnknownErrReply) Error() string {
   return "Err unknown"
}

type ArgNumErrReply struct {
   Cmd string
}

func (r *ArgNumErrReply) ToBytes() []byte {
   return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n")
}

func (r *ArgNumErrReply) Error() string {
   return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}

func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
   return &ArgNumErrReply{
      Cmd: cmd,
   }
}

type SyntaxErrReply struct{}

var syntaxErrBytes = []byte("-Err syntax error\r\n")
var theSyntaxErrReply = &SyntaxErrReply{}

func MakeSyntaxErrReply() *SyntaxErrReply {
   return theSyntaxErrReply
}

func (r *SyntaxErrReply) ToBytes() []byte {
   return syntaxErrBytes
}

func (r *SyntaxErrReply) Error() string {
   return "Err syntax error"
}

type WrongTypeErrReply struct{}

var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")

func (r *WrongTypeErrReply) ToBytes() []byte {
   return wrongTypeErrBytes
}

func (r *WrongTypeErrReply) Error() string {
   return "WRONGTYPE Operation against a key holding the wrong kind of value"
}

type ProtocolErrReply struct {
   Msg string
}

func (r *ProtocolErrReply) ToBytes() []byte {
   return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n")
}

func (r *ProtocolErrReply) Error() string {
   // Close the quote so the message matches what ToBytes sends
   return "ERR Protocol error: '" + r.Msg + "'"
}

errors.go defines five error types: the unknown error UnknownErrReply, the wrong-number-of-arguments error ArgNumErrReply, the syntax error SyntaxErrReply, the wrong-data-type error WrongTypeErrReply, and the protocol error ProtocolErrReply

resp/reply/reply.go

  • IntReply: integer reply

  • StandardErrReply: standard error reply

  • IsErrorReply: reports whether a reply is an error reply

  • ToBytes: converts the reply into the byte format defined by the RESP protocol

    var (
       // RESP encoding of a null bulk string, including the trailing CRLF
       nullBulkReplyBytes = []byte("$-1\r\n")
       // CRLF is the line terminator of the protocol
       CRLF = "\r\n"
    )
    
    type BulkReply struct {
       Arg []byte
    }
    
    func MakeBulkReply(arg []byte) *BulkReply {
       return &BulkReply{
          Arg: arg,
       }
    }
    
    func (r *BulkReply) ToBytes() []byte {
       if len(r.Arg) == 0 {
          return nullBulkReplyBytes
       }
       return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)
    }
    
    type MultiBulkReply struct {
       Args [][]byte
    }
    
    func (r *MultiBulkReply) ToBytes() []byte {
       argLen := len(r.Args)
       var buf bytes.Buffer
       buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
       for _, arg := range r.Args {
          if arg == nil {
             buf.WriteString("$-1" + CRLF)
          } else {
             buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
          }
       }
       return buf.Bytes()
    }
    
    func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
       return &MultiBulkReply{
          Args: args,
       }
    }
    
    type StatusReply struct {
       Status string
    }
    
    func MakeStatusReply(status string) *StatusReply {
       return &StatusReply{
          Status: status,
       }
    }
    
    func (r *StatusReply) ToBytes() []byte {
       return []byte("+" + r.Status + CRLF)
    }
    
    type IntReply struct {
       Code int64
    }
    
    func MakeIntReply(code int64) *IntReply {
       return &IntReply{
          Code: code,
       }
    }
    
    func (r *IntReply) ToBytes() []byte {
       return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
    }
    
    type StandardErrReply struct {
       Status string
    }
    
    func (r *StandardErrReply) ToBytes() []byte {
       return []byte("-" + r.Status + CRLF)
    }
    
    func (r *StandardErrReply) Error() string {
       return r.Status
    }
    
    func MakeErrReply(status string) *StandardErrReply {
       return &StandardErrReply{
          Status: status,
       }
    }
    
    func IsErrorReply(reply resp.Reply) bool {
       return reply.ToBytes()[0] == '-'
    }
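
One detail worth noting: RESP distinguishes a null bulk string ($-1) from an empty one ($0), while BulkReply.ToBytes above returns the null form for any zero-length Arg. The sketch below shows a variant that keeps the two apart; encodeBulk is a hypothetical helper, not the article's implementation.

```go
package main

import (
	"fmt"
	"strconv"
)

const crlf = "\r\n"

// encodeBulk is a hypothetical variant of BulkReply.ToBytes that treats
// a nil slice as "null" and an empty, non-nil slice as an empty string.
func encodeBulk(arg []byte) []byte {
	if arg == nil {
		return []byte("$-1" + crlf)
	}
	return []byte("$" + strconv.Itoa(len(arg)) + crlf + string(arg) + crlf)
}

func main() {
	fmt.Printf("%q\n", encodeBulk(nil))           // null bulk string
	fmt.Printf("%q\n", encodeBulk([]byte{}))      // empty bulk string
	fmt.Printf("%q\n", encodeBulk([]byte("abc"))) // ordinary bulk string
}
```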
resp/parser/parser.go

  • Payload struct: the data the client sends to us

  • Reply: the data that client and server send to each other is called a Reply

  • readState struct:

  • readingMultiLine: whether we are parsing single-line or multi-line data

  • expectedArgsCount: the number of arguments that should be read

  • msgType: the message type

  • args: the message content

  • bulkLen: the length of the data

  • finished method: reports whether parsing is complete

  • ParseStream method: parses the data asynchronously, puts the results into a channel, and returns that channel

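    // Payload carries one parsed reply, or the error hit while parsing it
    type Payload struct {
       Data resp.Reply
       Err  error
    }
    
    type readState struct {
       readingMultiLine  bool     // true while the body of a multi-line message is being read
       expectedArgsCount int      // number of arguments the message should contain
       msgType           byte     // first byte of the header: '*' or '$'
       args              [][]byte // arguments collected so far
       bulkLen           int64    // byte length of the next bulk string body
    }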
    
    func (s *readState) finished() bool {
       return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
    }
    
    func ParseStream(reader io.Reader) <-chan *Payload {
       ch := make(chan *Payload)
       go parse0(reader, ch)
       return ch
    }
    
    func parse0(reader io.Reader, ch chan<- *Payload) {
    	 ......
    }
  • readLine: reads one line at a time. An ordinary line is read up to the \n delimiter. A bulk string body may itself contain \r\n, so it is read by length instead: state.bulkLen bytes plus the trailing \r\n (state.bulkLen+2)

    func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
       var msg []byte
       var err error
       if state.bulkLen == 0 {
          msg, err = bufReader.ReadBytes('\n')
          if err != nil {
             return nil, true, err
          }
          // A valid line has at least "\r\n"; checking len < 2 avoids indexing out of range
          if len(msg) < 2 || msg[len(msg)-2] != '\r' {
             return nil, false, errors.New("protocol error: " + string(msg))
          }
       } else {
          msg = make([]byte, state.bulkLen+2)
          _, err = io.ReadFull(bufReader, msg)
          if err != nil {
             return nil, true, err
          }
          if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
             return nil, false, errors.New("protocol error: " + string(msg))
          }
          state.bulkLen = 0
       }
       return msg, false, nil
    }

  • parseMultiBulkHeader: parses the array header and sets the expected number of arguments and the related state.
  • parseBulkHeader: parses the bulk string header.

    func parseMultiBulkHeader(msg []byte, state *readState) error {
       var err error
       var expectedLine uint64
       expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
       if err != nil {
          return errors.New("protocol error: " + string(msg))
       }
       if expectedLine == 0 {
          state.expectedArgsCount = 0
          return nil
       } else if expectedLine > 0 {
          state.msgType = msg[0]
          state.readingMultiLine = true
          state.expectedArgsCount = int(expectedLine)
          state.args = make([][]byte, 0, expectedLine)
          return nil
       } else {
          return errors.New("protocol error: " + string(msg))
       }
    }
    
    func parseBulkHeader(msg []byte, state *readState) error {
       var err error
       state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
       if err != nil {
          return errors.New("protocol error: " + string(msg))
       }
       if state.bulkLen == -1 { // null bulk
          return nil
       } else if state.bulkLen > 0 {
          state.msgType = msg[0]
          state.readingMultiLine = true
          state.expectedArgsCount = 1
          state.args = make([][]byte, 0, 1)
          return nil
       } else {
          return errors.New("protocol error: " + string(msg))
       }
    }
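
Both header parsers share the same core step: strip the type byte and the trailing CRLF, then parse what is left as a number. A minimal sketch of just that step, using a hypothetical helper named parseHeader (it does not check that the type byte is '*' or '$'):

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"strconv"
)

// parseHeader is a hypothetical helper that splits a header line such as
// "*3\r\n" or "$5\r\n" into its type byte and its numeric part.
func parseHeader(line []byte) (byte, int64, error) {
	// The shortest valid header is 4 bytes, for example "*0\r\n".
	if len(line) < 4 || !bytes.HasSuffix(line, []byte("\r\n")) {
		return 0, 0, errors.New("protocol error: " + string(line))
	}
	n, err := strconv.ParseInt(string(line[1:len(line)-2]), 10, 64)
	if err != nil {
		return 0, 0, errors.New("protocol error: " + string(line))
	}
	return line[0], n, nil
}

func main() {
	for _, in := range []string{"*3\r\n", "$-1\r\n", "$abc\r\n"} {
		kind, n, err := parseHeader([]byte(in))
		fmt.Printf("%q -> kind=%q n=%d err=%v\n", in, kind, n, err)
	}
}
```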

  • parseSingleLineReply: parses a single-line reply
  • readBody: reads the body of a multi-line command. If the line starts with $, it sets bulkLen so that the next read takes that many bytes; if the line does not start with $, it is appended directly to args

func parseSingleLineReply(msg []byte) (resp.Reply, error) {
   str := strings.TrimSuffix(string(msg), "\r\n")
   var result resp.Reply
   switch msg[0] {
   case '+': // status reply
      result = reply.MakeStatusReply(str[1:])
   case '-': // err reply
      result = reply.MakeErrReply(str[1:])
   case ':': // int reply
      val, err := strconv.ParseInt(str[1:], 10, 64)
      if err != nil {
         return nil, errors.New("protocol error: " + string(msg))
      }
      result = reply.MakeIntReply(val)
   }
   return result, nil
}

func readBody(msg []byte, state *readState) error {
   line := msg[0 : len(msg)-2]
   var err error
   if line[0] == '$' {
      // bulk reply
      state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
      if err != nil {
         return errors.New("protocol error: " + string(msg))
      }
      if state.bulkLen <= 0 { // null bulk in multi bulks
         state.args = append(state.args, []byte{})
         state.bulkLen = 0
      }
   } else {
      state.args = append(state.args, line)
   }
   return nil
}

  • parse0: parses the incoming commands and sends each parsed result through the channel

func parse0(reader io.Reader, ch chan<- *Payload) {
    defer func() {
       if err := recover(); err != nil {
          logger.Error(string(debug.Stack()))
      }
   }()
    bufReader := bufio.NewReader(reader)
    var state readState
    var err error
    var msg []byte
    for {
       var ioErr bool
       msg, ioErr, err = readLine(bufReader, &state)
       if err != nil {
          if ioErr {
             ch <- &Payload{
                Err: err,
            }
             close(ch)
             return
         }
          ch <- &Payload{
             Err: err,
         }
          state = readState{}
          continue
      }

       if !state.readingMultiLine {
          if msg[0] == '*' {
             // multi bulk reply
             err = parseMultiBulkHeader(msg, &state)
             if err != nil {
                ch <- &Payload{
                   Err: errors.New("protocol error: " + string(msg)),
               }
                state = readState{}
                continue
            }
             if state.expectedArgsCount == 0 {
                ch <- &Payload{
                   Data: &reply.EmptyMultiBulkReply{},
               }
                state = readState{}
                continue
            }
         } else if msg[0] == '$' { // bulk reply
             err = parseBulkHeader(msg, &state)
             if err != nil {
                ch <- &Payload{
                   Err: errors.New("protocol error: " + string(msg)),
               }
                state = readState{} // reset state
                continue
            }
             if state.bulkLen == -1 { // null bulk reply
                ch <- &Payload{
                   Data: &reply.NullBulkReply{},
               }
                state = readState{} // reset state
                continue
            }
         } else {
             // single line reply
             result, err := parseSingleLineReply(msg)
             ch <- &Payload{
                Data: result,
                Err:  err,
            }
             state = readState{} // reset state
             continue
         }
      } else {
          // read bulk reply
          err = readBody(msg, &state)
          if err != nil {
             ch <- &Payload{
                Err: errors.New("protocol error: " + string(msg)),
            }
             state = readState{} // reset state
             continue
         }
          // if sending finished
          if state.finished() {
             var result resp.Reply
             if state.msgType == '*' {
                result = reply.MakeMultiBulkReply(state.args)
            } else if state.msgType == '$' {
                result = reply.MakeBulkReply(state.args[0])
            }
             ch <- &Payload{
                Data: result,
                Err:  err,
            }
             state = readState{}
         }
      }
   }
}

The EchoHandler we wrote earlier accepted user input and returned it unchanged. Now we need a RespHandler to replace it and let the parser do the parsing. RespHandler relies on a Connection struct that manages each client connection.

Connection: the client connection, used by the handler in the protocol layer

resp/connection/conn.go

type Connection struct {
    conn net.Conn
    waitingReply wait.Wait
    mu sync.Mutex // prevents multiple goroutines from writing to the client at the same time
    selectedDB int
}

func NewConn(conn net.Conn) *Connection {
    return &Connection{
        conn: conn,
    }
}

func (c *Connection) RemoteAddr() net.Addr {
    return c.conn.RemoteAddr()
}

func (c *Connection) Close() error {
    c.waitingReply.WaitWithTimeout(10 * time.Second)
    _ = c.conn.Close()
    return nil
}

func (c *Connection) Write(b []byte) error {
    if len(b) == 0 {
        return nil
    }
    c.mu.Lock()
    c.waitingReply.Add(1)
    defer func() {
        c.waitingReply.Done()
        c.mu.Unlock()
    }()

    _, err := c.conn.Write(b)
    return err
}

func (c *Connection) GetDBIndex() int {
    return c.selectedDB
}

func (c *Connection) SelectDB(dbNum int) {
    c.selectedDB = dbNum
}

RespHandler: similar to the earlier EchoHandler, with the addition of a call into the core layer, db.Exec, which executes the parsed command

resp/handler/handler.go

var (
   unknownErrReplyBytes = []byte("-ERR unknown\r\n")
)

type RespHandler struct {
   activeConn sync.Map
   db         databaseface.Database
   closing    atomic.Boolean
}

func MakeHandler() *RespHandler {
   var db databaseface.Database
   db = database.NewEchoDatabase()
   return &RespHandler{
      db: db,
   }
}

func (h *RespHandler) closeClient(client *connection.Connection) {
   _ = client.Close()
   h.db.AfterClientClose(client)
   h.activeConn.Delete(client)
}

func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
   if h.closing.Get() {
      // the handler is closing: refuse the new connection and stop here
      _ = conn.Close()
      return
   }

   client := connection.NewConn(conn)
   h.activeConn.Store(client, 1)

   ch := parser.ParseStream(conn)
   for payload := range ch {
      if payload.Err != nil {
         if payload.Err == io.EOF ||
            payload.Err == io.ErrUnexpectedEOF ||
            strings.Contains(payload.Err.Error(), "use of closed network connection") {
            // connection closed
            h.closeClient(client)
            logger.Info("connection closed: " + client.RemoteAddr().String())
            return
         }
         // protocol err
         errReply := reply.MakeErrReply(payload.Err.Error())
         err := client.Write(errReply.ToBytes())
         if err != nil {
            h.closeClient(client)
            logger.Info("connection closed: " + client.RemoteAddr().String())
            return
         }
         continue
      }
      if payload.Data == nil {
         logger.Error("empty payload")
         continue
      }
      r, ok := payload.Data.(*reply.MultiBulkReply)
      if !ok {
         logger.Error("require multi bulk reply")
         continue
      }
      result := h.db.Exec(client, r.Args)
      if result != nil {
         _ = client.Write(result.ToBytes())
      } else {
         _ = client.Write(unknownErrReplyBytes)
      }
   }
}

func (h *RespHandler) Close() error {
   logger.Info("handler shutting down...")
   h.closing.Set(true)
   // TODO: concurrent wait
   h.activeConn.Range(func(key interface{}, val interface{}) bool {
      client := key.(*connection.Connection)
      _ = client.Close()
      return true
   })
   h.db.Close()
   return nil
}

Exec: executes a command in the core layer

AfterClientClose: clean-up hook called after a client connection closes

CmdLine: alias for a two-dimensional byte slice ([][]byte) that holds one command line

DataEntity: represents a piece of Redis data, such as a string, list or set

interface/database/database.go

type CmdLine = [][]byte

type Database interface {
	Exec(client resp.Connection, args [][]byte) resp.Reply
	AfterClientClose(c resp.Connection)
	Close()
}

type DataEntity struct {
	Data interface{}
}

echo_database: used to test the protocol layer

Exec: after a command has been parsed, wraps its arguments with MakeMultiBulkReply and returns them

database/echo_database.go

type EchoDatabase struct {
}

func NewEchoDatabase() *EchoDatabase {
   return &EchoDatabase{}
}

func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
   return reply.MakeMultiBulkReply(args)
}

func (e EchoDatabase) AfterClientClose(c resp.Connection) {
   logger.Info("EchoDatabase AfterClientClose")
}

func (e EchoDatabase) Close() {
   logger.Info("EchoDatabase Close")
}
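
EchoDatabase only reflects its input, which is enough to test the protocol layer. As a rough idea of what a later Exec might do with the parsed arguments, here is a sketch that dispatches on the command name. The function exec and its replies are assumptions made for illustration; it returns raw RESP bytes instead of a resp.Reply to stay self-contained.

```go
package main

import (
	"fmt"
	"strings"
)

// exec is a hypothetical stand-in for Database.Exec that looks at the
// command name (args[0]) and returns the reply as raw RESP bytes.
func exec(args [][]byte) []byte {
	if len(args) == 0 {
		return []byte("-ERR empty command\r\n")
	}
	switch strings.ToUpper(string(args[0])) {
	case "PING":
		return []byte("+PONG\r\n")
	case "ECHO":
		if len(args) != 2 {
			return []byte("-ERR wrong number of arguments for 'echo' command\r\n")
		}
		// Reply with the argument as a bulk string.
		return []byte(fmt.Sprintf("$%d\r\n%s\r\n", len(args[1]), args[1]))
	default:
		return []byte("-ERR unknown command '" + string(args[0]) + "'\r\n")
	}
}

func main() {
	fmt.Printf("%q\n", exec([][]byte{[]byte("ping")}))
	fmt.Printf("%q\n", exec([][]byte{[]byte("ECHO"), []byte("hello")}))
}
```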

main.go: switch main over to the handler you just wrote, handler.MakeHandler()

Statement: this article is reproduced from yisu.com. In case of infringement, contact admin@php.cn for removal.