RESP protocol
RESP is a protocol for communication between the client and the server. There are five formats:
Normal reply: starting with " " and ending with "\ r\n" in the form of a string
Error reply: starting with "-" and ending in the form of a string with "\r\n"
Integer: starting with ":", String format ending with "\r\n"
Multi-line string: starting with "$", followed by the actual number of bytes sent, and then starting and ending with "\r\n"
$3\r\nabc\r\n
Array: starts with "*", followed by the number of members
SET key value
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\ nvalue\r\n
Commands or data sent by the client and server always use \r\n (CRLF) as the newline character.
When we enter *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n $5\r\nvalue\r\nFor such a series of commands, the server receives the following commands:
*3\r\n
$3\r\n
SET\r\n
$3\r\n
key\r\n
$5\r\n
value\r\n
interface/resp/conn.go
type Connection interface { Write([]byte) error GetDBIndex() int SelectDB(int) } interface/resp/reply.go type Reply interface { ToBytes() []byte }
Connection interface: a connection for the Redis client
Write: reply to the client
GetDBIndex: Redis has 16 DBs
Reply interface: response interface
resp/reply/consts.go
type PongReply struct{} var pongBytes = []byte("+PONG\r\n") func (r *PongReply) ToBytes() []byte { return pongBytes } var thePongReply = new(PongReply) func MakePongReply() *PongReply { return thePongReply } type OkReply struct{} var okBytes = []byte("+OK\r\n") func (r *OkReply) ToBytes() []byte { return okBytes } var theOkReply = new(OkReply) func MakeOkReply() *OkReply { return theOkReply } var nullBulkBytes = []byte("$-1\r\n") type NullBulkReply struct{} func (r *NullBulkReply) ToBytes() []byte { return nullBulkBytes } func MakeNullBulkReply() *NullBulkReply { return &NullBulkReply{} } var emptyMultiBulkBytes = []byte("*0\r\n") type EmptyMultiBulkReply struct{} func (r *EmptyMultiBulkReply) ToBytes() []byte { return emptyMultiBulkBytes } type NoReply struct{} var noBytes = []byte("") func (r *NoReply) ToBytes() []byte { return noBytes }
Define five kinds of replies: reply pong, ok, null, empty array, empty
resp/reply/reply.go
type ErrorReply interface { Error() string ToBytes() []byte }
ErrorReply: Define error interface
resp/reply/errors.go
type UnknownErrReply struct{} var unknownErrBytes = []byte("-Err unknown\r\n") func (r *UnknownErrReply) ToBytes() []byte { return unknownErrBytes } func (r *UnknownErrReply) Error() string { return "Err unknown" } type ArgNumErrReply struct { Cmd string } func (r *ArgNumErrReply) ToBytes() []byte { return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n") } func (r *ArgNumErrReply) Error() string { return "ERR wrong number of arguments for '" + r.Cmd + "' command" } func MakeArgNumErrReply(cmd string) *ArgNumErrReply { return &ArgNumErrReply{ Cmd: cmd, } } type SyntaxErrReply struct{} var syntaxErrBytes = []byte("-Err syntax error\r\n") var theSyntaxErrReply = &SyntaxErrReply{} func MakeSyntaxErrReply() *SyntaxErrReply { return theSyntaxErrReply } func (r *SyntaxErrReply) ToBytes() []byte { return syntaxErrBytes } func (r *SyntaxErrReply) Error() string { return "Err syntax error" } type WrongTypeErrReply struct{} var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n") func (r *WrongTypeErrReply) ToBytes() []byte { return wrongTypeErrBytes } func (r *WrongTypeErrReply) Error() string { return "WRONGTYPE Operation against a key holding the wrong kind of value" } type ProtocolErrReply struct { Msg string } func (r *ProtocolErrReply) ToBytes() []byte { return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n") } func (r *ProtocolErrReply) Error() string { return "ERR Protocol error: '" + r.Msg }
errors defines 5 types of errors: UnknownErrReply Unknown error , ArgNumErrReply parameter number error, SyntaxErrReply syntax error, WrongTypeErrReply data type error, ProtocolErrReply protocol error
resp/reply/reply.go
var ( nullBulkReplyBytes = []byte("$-1") // 协议的结尾 CRLF = "\r\n" ) type BulkReply struct { Arg []byte } func MakeBulkReply(arg []byte) *BulkReply { return &BulkReply{ Arg: arg, } } func (r *BulkReply) ToBytes() []byte { if len(r.Arg) == 0 { return nullBulkReplyBytes } return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF) } type MultiBulkReply struct { Args [][]byte } func (r *MultiBulkReply) ToBytes() []byte { argLen := len(r.Args) var buf bytes.Buffer buf.WriteString("*" + strconv.Itoa(argLen) + CRLF) for _, arg := range r.Args { if arg == nil { buf.WriteString("$-1" + CRLF) } else { buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF) } } return buf.Bytes() } func MakeMultiBulkReply(args [][]byte) *MultiBulkReply { return &MultiBulkReply{ Args: args, } } type StatusReply struct { Status string } func MakeStatusReply(status string) *StatusReply { return &StatusReply{ Status: status, } } func (r *StatusReply) ToBytes() []byte { return []byte("+" + r.Status + CRLF) } type IntReply struct { Code int64 } func MakeIntReply(code int64) *IntReply { return &IntReply{ Code: code, } } func (r *IntReply) ToBytes() []byte { return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF) } type StandardErrReply struct { Status string } func (r *StandardErrReply) ToBytes() []byte { return []byte("-" + r.Status + CRLF) } func (r *StandardErrReply) Error() string { return r.Status } func MakeErrReply(status string) *StandardErrReply { return &StandardErrReply{ Status: status, } } func IsErrorReply(reply resp.Reply) bool { return reply.ToBytes()[0] == '-' }
BulkReply: Reply to a string
MultiBulkReply: Reply to an array of strings
StatusReply: Status reply
-
IntReply: Numeric reply
StandardErrReply: Standard error reply
IsErrorReply: Determine whether it is an error reply
ToBytes: Convert the string into the format specified by the RESP protocol
resp/parser/parser.go
type Payload struct { Data resp.Reply Err error } type readState struct { readingMultiLine bool expectedArgsCount int msgType byte args [][]byte bulkLen int64 } func (s *readState) finished() bool { return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount } func ParseStream(reader io.Reader) <-chan *Payload { ch := make(chan *Payload) go parse0(reader, ch) return ch } func parse0(reader io.Reader, ch chan<- *Payload) { ...... }
Payload Structure: The data sent to us by the customer service side
Reply: The data sent by the client and the server to each other is called Reply
readState structure:
-
readingMultiLine: parsing single or multiple lines of data
expectedArgsCount: the number of parameters that should be read
msgType: message type
args: Message content
bulkLen: Data length
finished method: Determine whether the parsing is completed
ParseStream method: Asynchronously parse the data and put it into the pipeline, and return the pipeline data
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) { var msg []byte var err error if state.bulkLen == 0 { msg, err = bufReader.ReadBytes('\n') if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' { return nil, false, errors.New("protocol error: " + string(msg)) } } else { msg = make([]byte, state.bulkLen+2) _, err = io.ReadFull(bufReader, msg) if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' { return nil, false, errors.New("protocol error: " + string(msg)) } state.bulkLen = 0 } return msg, false, nil }
readLine: Read line by line. Read normal lines, separated by \n. When reading lines containing \r\n characters in the text, state.bulkLen adds the newline character \r\n (state.bulkLen 2)
func parseMultiBulkHeader(msg []byte, state *readState) error { var err error var expectedLine uint64 expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { return errors.New("protocol error: " + string(msg)) } if expectedLine == 0 { state.expectedArgsCount = 0 return nil } else if expectedLine > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = int(expectedLine) state.args = make([][]byte, 0, expectedLine) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseBulkHeader(msg []byte, state *readState) error { var err error state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen == -1 { // null bulk return nil } else if state.bulkLen > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = 1 state.args = make([][]byte, 0, 1) return nil } else { return errors.New("protocol error: " + string(msg)) } }
parseMultiBulkHeader: Parse the header of the array and set the expected number of lines and related parameters.
parseBulkHeader: Parse the header of a multi-line string.
func parseSingleLineReply(msg []byte) (resp.Reply, error) { str := strings.TrimSuffix(string(msg), "\r\n") var result resp.Reply switch msg[0] { case '+': // status reply result = reply.MakeStatusReply(str[1:]) case '-': // err reply result = reply.MakeErrReply(str[1:]) case ':': // int reply val, err := strconv.ParseInt(str[1:], 10, 64) if err != nil { return nil, errors.New("protocol error: " + string(msg)) } result = reply.MakeIntReply(val) } return result, nil } func readBody(msg []byte, state *readState) error { line := msg[0 : len(msg)-2] var err error if line[0] == '$' { // bulk reply state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen <= 0 { // null bulk in multi bulks state.args = append(state.args, []byte{}) state.bulkLen = 0 } } else { state.args = append(state.args, line) } return nil }
parseSingleLineReply: Parse a single-line command
readBody: Read a multi-line command. If it starts with $, set bulkLen. When reading the next line, use this 2. If it does not start with $, add it directly. Go to args
func parse0(reader io.Reader, ch chan<- *Payload) { defer func() { if err := recover(); err != nil { logger.Error(string(debug.Stack())) } }() bufReader := bufio.NewReader(reader) var state readState var err error var msg []byte for { var ioErr bool msg, ioErr, err = readLine(bufReader, &state) if err != nil { if ioErr { ch <- &Payload{ Err: err, } close(ch) return } ch <- &Payload{ Err: err, } state = readState{} continue } if !state.readingMultiLine { if msg[0] == '*' { // multi bulk reply err = parseMultiBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} continue } if state.expectedArgsCount == 0 { ch <- &Payload{ Data: &reply.EmptyMultiBulkReply{}, } state = readState{} continue } } else if msg[0] == '$' { // bulk reply err = parseBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } if state.bulkLen == -1 { // null bulk reply ch <- &Payload{ Data: &reply.NullBulkReply{}, } state = readState{} // reset state continue } } else { // single line reply result, err := parseSingleLineReply(msg) ch <- &Payload{ Data: result, Err: err, } state = readState{} // reset state continue } } else { // read bulk reply err = readBody(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } // if sending finished if state.finished() { var result resp.Reply if state.msgType == '*' { result = reply.MakeMultiBulkReply(state.args) } else if state.msgType == '$' { result = reply.MakeBulkReply(state.args[0]) } ch <- &Payload{ Data: result, Err: err, } state = readState{} } } } }
parse0: Parse the command and send it through the channel after the parsing is completed
resp/connection/conn.go
type Connection struct { conn net.Conn waitingReply wait.Wait mu sync.Mutex // 避免多个协程往客户端中写 selectedDB int } func NewConn(conn net.Conn) *Connection { return &Connection{ conn: conn, } } func (c *Connection) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } func (c *Connection) Close() error { c.waitingReply.WaitWithTimeout(10 * time.Second) _ = c.conn.Close() return nil } func (c *Connection) Write(b []byte) error { if len(b) == 0 { return nil } c.mu.Lock() c.waitingReply.Add(1) defer func() { c.waitingReply.Done() c.mu.Unlock() }() _, err := c.conn.Write(b) return err } func (c *Connection) GetDBIndex() int { return c.selectedDB } func (c *Connection) SelectDB(dbNum int) { c.selectedDB = dbNum }
We wrote before The EchoHandler is used to receive user input and return it unchanged. Now you need to write a RespHandler to replace the EchoHandler and let the parser perform the parsing. A Connection structure that manages client connections needs to exist in RespHandler.
Connection: Client connection, used in the handler of the protocol layer
resp/handler/handler.go
var ( unknownErrReplyBytes = []byte("-ERR unknown\r\n") ) type RespHandler struct { activeConn sync.Map db databaseface.Database closing atomic.Boolean } func MakeHandler() *RespHandler { var db databaseface.Database db = database.NewEchoDatabase() return &RespHandler{ db: db, } } func (h *RespHandler) closeClient(client *connection.Connection) { _ = client.Close() h.db.AfterClientClose(client) h.activeConn.Delete(client) } func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { // closing handler refuse new connection _ = conn.Close() } client := connection.NewConn(conn) h.activeConn.Store(client, 1) ch := parser.ParseStream(conn) for payload := range ch { if payload.Err != nil { if payload.Err == io.EOF || payload.Err == io.ErrUnexpectedEOF || strings.Contains(payload.Err.Error(), "use of closed network connection") { // connection closed h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } // protocol err errReply := reply.MakeErrReply(payload.Err.Error()) err := client.Write(errReply.ToBytes()) if err != nil { h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } continue } if payload.Data == nil { logger.Error("empty payload") continue } r, ok := payload.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } result := h.db.Exec(client, r.Args) if result != nil { _ = client.Write(result.ToBytes()) } else { _ = client.Write(unknownErrReplyBytes) } } } func (h *RespHandler) Close() error { logger.Info("handler shutting down...") h.closing.Set(true) // TODO: concurrent wait h.activeConn.Range(func(key interface{}, val interface{}) bool { client := key.(*connection.Connection) _ = client.Close() return true }) h.db.Close() return nil }
RespHandler: and The previous echo is similar, with the addition of the core layer's db.exec execution parsing instructions
interface/database/database.go
type CmdLine = [][]byte type Database interface { Exec(client resp.Connection, args [][]byte) resp.Reply AfterClientClose(c resp.Connection) Close() } type DataEntity struct { Data interface{} }
Exec: core layer execution
AfterClientClose: Aftercare method after closing
CmdLine: Command alias for two-dimensional byte array
DataEntity: Represents Redis data, including string, list, set, etc.
database/echo_database.go
type EchoDatabase struct { } func NewEchoDatabase() *EchoDatabase { return &EchoDatabase{} } func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply { return reply.MakeMultiBulkReply(args) } func (e EchoDatabase) AfterClientClose(c resp.Connection) { logger.Info("EchoDatabase AfterClientClose") } func (e EchoDatabase) Close() { logger.Info("EchoDatabase Close") }
echo_database: Test protocol layer
Exec: After the instruction is parsed, use MakeMultiBulkReply to wrap it and return it
main.go
err := tcp.ListenAndServeWithSignal( &tcp.Config{ Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port), }, handler.MakeHandler()) if err != nil { logger.Error(err) }
Change main to what you just wrote: handler.MakeHandler()
The above is the detailed content of How to implement Redis protocol parser based on Golang. For more information, please follow other related articles on the PHP Chinese website!

Redis plays a key role in data storage and management, and has become the core of modern applications through its multiple data structures and persistence mechanisms. 1) Redis supports data structures such as strings, lists, collections, ordered collections and hash tables, and is suitable for cache and complex business logic. 2) Through two persistence methods, RDB and AOF, Redis ensures reliable storage and rapid recovery of data.

Redis is a NoSQL database suitable for efficient storage and access of large-scale data. 1.Redis is an open source memory data structure storage system that supports multiple data structures. 2. It provides extremely fast read and write speeds, suitable for caching, session management, etc. 3.Redis supports persistence and ensures data security through RDB and AOF. 4. Usage examples include basic key-value pair operations and advanced collection deduplication functions. 5. Common errors include connection problems, data type mismatch and memory overflow, so you need to pay attention to debugging. 6. Performance optimization suggestions include selecting the appropriate data structure and setting up memory elimination strategies.

The applications of Redis in the real world include: 1. As a cache system, accelerate database query, 2. To store the session data of web applications, 3. To implement real-time rankings, 4. To simplify message delivery as a message queue. Redis's versatility and high performance make it shine in these scenarios.

Redis stands out because of its high speed, versatility and rich data structure. 1) Redis supports data structures such as strings, lists, collections, hashs and ordered collections. 2) It stores data through memory and supports RDB and AOF persistence. 3) Starting from Redis 6.0, multi-threaded I/O operations have been introduced, which has improved performance in high concurrency scenarios.

RedisisclassifiedasaNoSQLdatabasebecauseitusesakey-valuedatamodelinsteadofthetraditionalrelationaldatabasemodel.Itoffersspeedandflexibility,makingitidealforreal-timeapplicationsandcaching,butitmaynotbesuitableforscenariosrequiringstrictdataintegrityo

Redis improves application performance and scalability by caching data, implementing distributed locking and data persistence. 1) Cache data: Use Redis to cache frequently accessed data to improve data access speed. 2) Distributed lock: Use Redis to implement distributed locks to ensure the security of operation in a distributed environment. 3) Data persistence: Ensure data security through RDB and AOF mechanisms to prevent data loss.

Redis's data model and structure include five main types: 1. String: used to store text or binary data, and supports atomic operations. 2. List: Ordered elements collection, suitable for queues and stacks. 3. Set: Unordered unique elements set, supporting set operation. 4. Ordered Set (SortedSet): A unique set of elements with scores, suitable for rankings. 5. Hash table (Hash): a collection of key-value pairs, suitable for storing objects.

Redis's database methods include in-memory databases and key-value storage. 1) Redis stores data in memory, and reads and writes fast. 2) It uses key-value pairs to store data, supports complex data structures such as lists, collections, hash tables and ordered collections, suitable for caches and NoSQL databases.


Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

MantisBT
Mantis is an easy-to-deploy web-based defect tracking tool designed to aid in product defect tracking. It requires PHP, MySQL and a web server. Check out our demo and hosting services.

mPDF
mPDF is a PHP library that can generate PDF files from UTF-8 encoded HTML. The original author, Ian Back, wrote mPDF to output PDF files "on the fly" from his website and handle different languages. It is slower than original scripts like HTML2FPDF and produces larger files when using Unicode fonts, but supports CSS styles etc. and has a lot of enhancements. Supports almost all languages, including RTL (Arabic and Hebrew) and CJK (Chinese, Japanese and Korean). Supports nested block-level elements (such as P, DIV),

Dreamweaver CS6
Visual web development tools

DVWA
Damn Vulnerable Web App (DVWA) is a PHP/MySQL web application that is very vulnerable. Its main goals are to be an aid for security professionals to test their skills and tools in a legal environment, to help web developers better understand the process of securing web applications, and to help teachers/students teach/learn in a classroom environment Web application security. The goal of DVWA is to practice some of the most common web vulnerabilities through a simple and straightforward interface, with varying degrees of difficulty. Please note that this software

ZendStudio 13.5.1 Mac
Powerful PHP integrated development environment