RESP協定
RESP是客戶端與服務端通訊的協議,格式有五種:
正常回覆:以「 」開頭,以「\ r\n」結尾的字串形式
錯誤回覆:以「-」開頭,以「\r\n」結尾的字串形式
整數:以「:」開頭,以「\r\n」結尾的字串形式
多行字串:以「$」開頭,後面跟著實際發送位元組數,再以「\r\n」開頭和結尾
$3\r\nabc\r\n
#陣列:以「*」開頭,後面跟著成員個數
SET key value
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\ nvalue\r\n
客戶端和伺服器發送的命令或資料一律以\r\n (CRLF)作為換行符。
當我們輸入*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n $5\r\nvalue\r\n這樣一串指令,服務端接收到的是如下的指令:
*3\r\n
$3\r\n
SET\r\n
$3\r\n
key\r\n
$5\r\n
value\r\n
## interface/resp/conn.go
type Connection interface { Write([]byte) error GetDBIndex() int SelectDB(int) } interface/resp/reply.go type Reply interface { ToBytes() []byte }
- Connection介面:Redis客戶端的一個連線 ##Write:回覆客戶端訊息
- GetDBIndex:Redis有16個DB
- Reply介面:回應介面
type PongReply struct{}
var pongBytes = []byte("+PONG\r\n")
func (r *PongReply) ToBytes() []byte {
return pongBytes
}
var thePongReply = new(PongReply)
func MakePongReply() *PongReply {
return thePongReply
}
type OkReply struct{}
var okBytes = []byte("+OK\r\n")
func (r *OkReply) ToBytes() []byte {
return okBytes
}
var theOkReply = new(OkReply)
func MakeOkReply() *OkReply {
return theOkReply
}
var nullBulkBytes = []byte("$-1\r\n")
type NullBulkReply struct{}
func (r *NullBulkReply) ToBytes() []byte {
return nullBulkBytes
}
func MakeNullBulkReply() *NullBulkReply {
return &NullBulkReply{}
}
var emptyMultiBulkBytes = []byte("*0\r\n")
type EmptyMultiBulkReply struct{}
func (r *EmptyMultiBulkReply) ToBytes() []byte {
return emptyMultiBulkBytes
}
type NoReply struct{}
var noBytes = []byte("")
func (r *NoReply) ToBytes() []byte {
return noBytes
}
定義五個回覆:回覆pong,ok,null,空數組,空
type ErrorReply interface {
Error() string
ToBytes() []byte
}
ErrorReply:定義錯誤介面
type UnknownErrReply struct{}
var unknownErrBytes = []byte("-Err unknown\r\n")
func (r *UnknownErrReply) ToBytes() []byte {
return unknownErrBytes
}
func (r *UnknownErrReply) Error() string {
return "Err unknown"
}
type ArgNumErrReply struct {
Cmd string
}
func (r *ArgNumErrReply) ToBytes() []byte {
return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n")
}
func (r *ArgNumErrReply) Error() string {
return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
return &ArgNumErrReply{
Cmd: cmd,
}
}
type SyntaxErrReply struct{}
var syntaxErrBytes = []byte("-Err syntax error\r\n")
var theSyntaxErrReply = &SyntaxErrReply{}
func MakeSyntaxErrReply() *SyntaxErrReply {
return theSyntaxErrReply
}
func (r *SyntaxErrReply) ToBytes() []byte {
return syntaxErrBytes
}
func (r *SyntaxErrReply) Error() string {
return "Err syntax error"
}
type WrongTypeErrReply struct{}
var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")
func (r *WrongTypeErrReply) ToBytes() []byte {
return wrongTypeErrBytes
}
func (r *WrongTypeErrReply) Error() string {
return "WRONGTYPE Operation against a key holding the wrong kind of value"
}
type ProtocolErrReply struct {
Msg string
}
func (r *ProtocolErrReply) ToBytes() []byte {
return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n")
}
func (r *ProtocolErrReply) Error() string {
return "ERR Protocol error: '" + r.Msg
}
errors定義5種錯誤:UnknownErrReply 未知錯誤,ArgNumErrReply 參數個數錯誤,SyntaxErrReply 語法錯誤,WrongTypeErrReply 資料類型錯誤,ProtocolErrReply 協定錯誤
##
var ( nullBulkReplyBytes = []byte("$-1") // 协议的结尾 CRLF = "\r\n" ) type BulkReply struct { Arg []byte } func MakeBulkReply(arg []byte) *BulkReply { return &BulkReply{ Arg: arg, } } func (r *BulkReply) ToBytes() []byte { if len(r.Arg) == 0 { return nullBulkReplyBytes } return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF) } type MultiBulkReply struct { Args [][]byte } func (r *MultiBulkReply) ToBytes() []byte { argLen := len(r.Args) var buf bytes.Buffer buf.WriteString("*" + strconv.Itoa(argLen) + CRLF) for _, arg := range r.Args { if arg == nil { buf.WriteString("$-1" + CRLF) } else { buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF) } } return buf.Bytes() } func MakeMultiBulkReply(args [][]byte) *MultiBulkReply { return &MultiBulkReply{ Args: args, } } type StatusReply struct { Status string } func MakeStatusReply(status string) *StatusReply { return &StatusReply{ Status: status, } } func (r *StatusReply) ToBytes() []byte { return []byte("+" + r.Status + CRLF) } type IntReply struct { Code int64 } func MakeIntReply(code int64) *IntReply { return &IntReply{ Code: code, } } func (r *IntReply) ToBytes() []byte { return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF) } type StandardErrReply struct { Status string } func (r *StandardErrReply) ToBytes() []byte { return []byte("-" + r.Status + CRLF) } func (r *StandardErrReply) Error() string { return r.Status } func MakeErrReply(status string) *StandardErrReply { return &StandardErrReply{ Status: status, } } func IsErrorReply(reply resp.Reply) bool { return reply.ToBytes()[0] == '-' }#BulkReply:回覆一個字串
- MultiBulkReply:回覆字串陣列
- StatusReply:狀態回覆
- #IntReply:數字回覆
- StandardErrReply:標準錯誤回覆
- IsErrorReply:判斷是否為錯誤回覆
- ToBytes:將字串轉換成RESP協定規定的格式
- resp/parser/parser.go
type Payload struct { Data resp.Reply Err error } type readState struct { readingMultiLine bool expectedArgsCount int msgType byte args [][]byte bulkLen int64 } func (s *readState) finished() bool { return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount } func ParseStream(reader io.Reader) <-chan *Payload { ch := make(chan *Payload) go parse0(reader, ch) return ch } func parse0(reader io.Reader, ch chan<- *Payload) { ...... }
##Payload結構體:客服端給我們發送的資料
Reply:客戶端與服務端互相發送的資料都稱為ReplyreadState結構體:
- ##readingMultiLine:解析單行還是多行資料
- expectedArgsCount:應該讀取的參數數量
- msgType:訊息類型
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) { var msg []byte var err error if state.bulkLen == 0 { msg, err = bufReader.ReadBytes('\n') if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' { return nil, false, errors.New("protocol error: " + string(msg)) } } else { msg = make([]byte, state.bulkLen+2) _, err = io.ReadFull(bufReader, msg) if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' { return nil, false, errors.New("protocol error: " + string(msg)) } state.bulkLen = 0 } return msg, false, nil }readLine:一行一行的讀取。讀取正常的行,以\n分隔。讀取正文中包含\r\n字元的行時,state.bulkLen加上換行符號\r\n(state.bulkLen 2)
func parseMultiBulkHeader(msg []byte, state *readState) error { var err error var expectedLine uint64 expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { return errors.New("protocol error: " + string(msg)) } if expectedLine == 0 { state.expectedArgsCount = 0 return nil } else if expectedLine > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = int(expectedLine) state.args = make([][]byte, 0, expectedLine) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseBulkHeader(msg []byte, state *readState) error { var err error state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen == -1 { // null bulk return nil } else if state.bulkLen > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = 1 state.args = make([][]byte, 0, 1) return nil } else { return errors.New("protocol error: " + string(msg)) } }parseMultiBulkHeader:解析陣列的頭部,設定期望的行數和相關參數。 parseBulkHeader:解析多行字串的頭部。
func parseSingleLineReply(msg []byte) (resp.Reply, error) { str := strings.TrimSuffix(string(msg), "\r\n") var result resp.Reply switch msg[0] { case '+': // status reply result = reply.MakeStatusReply(str[1:]) case '-': // err reply result = reply.MakeErrReply(str[1:]) case ':': // int reply val, err := strconv.ParseInt(str[1:], 10, 64) if err != nil { return nil, errors.New("protocol error: " + string(msg)) } result = reply.MakeIntReply(val) } return result, nil } func readBody(msg []byte, state *readState) error { line := msg[0 : len(msg)-2] var err error if line[0] == '$' { // bulk reply state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen <= 0 { // null bulk in multi bulks state.args = append(state.args, []byte{}) state.bulkLen = 0 } } else { state.args = append(state.args, line) } return nil }parseSingleLineReply:解析單行命令readBody:讀取多行的命令,如果是$開頭,設定bulkLen,在讀取下一行時根據這個2,不是$開頭則直接添加到args
func parse0(reader io.Reader, ch chan<- *Payload) { defer func() { if err := recover(); err != nil { logger.Error(string(debug.Stack())) } }() bufReader := bufio.NewReader(reader) var state readState var err error var msg []byte for { var ioErr bool msg, ioErr, err = readLine(bufReader, &state) if err != nil { if ioErr { ch <- &Payload{ Err: err, } close(ch) return } ch <- &Payload{ Err: err, } state = readState{} continue } if !state.readingMultiLine { if msg[0] == '*' { // multi bulk reply err = parseMultiBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} continue } if state.expectedArgsCount == 0 { ch <- &Payload{ Data: &reply.EmptyMultiBulkReply{}, } state = readState{} continue } } else if msg[0] == '$' { // bulk reply err = parseBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } if state.bulkLen == -1 { // null bulk reply ch <- &Payload{ Data: &reply.NullBulkReply{}, } state = readState{} // reset state continue } } else { // single line reply result, err := parseSingleLineReply(msg) ch <- &Payload{ Data: result, Err: err, } state = readState{} // reset state continue } } else { // read bulk reply err = readBody(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } // if sending finished if state.finished() { var result resp.Reply if state.msgType == '*' { result = reply.MakeMultiBulkReply(state.args) } else if state.msgType == '$' { result = reply.MakeBulkReply(state.args[0]) } ch <- &Payload{ Data: result, Err: err, } state = readState{} } } } }parse0:解析指令,解析完成後透過channel發出去
resp/connection/conn.go
type Connection struct { conn net.Conn waitingReply wait.Wait mu sync.Mutex // 避免多个协程往客户端中写 selectedDB int } func NewConn(conn net.Conn) *Connection { return &Connection{ conn: conn, } } func (c *Connection) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } func (c *Connection) Close() error { c.waitingReply.WaitWithTimeout(10 * time.Second) _ = c.conn.Close() return nil } func (c *Connection) Write(b []byte) error { if len(b) == 0 { return nil } c.mu.Lock() c.waitingReply.Add(1) defer func() { c.waitingReply.Done() c.mu.Unlock() }() _, err := c.conn.Write(b) return err } func (c *Connection) GetDBIndex() int { return c.selectedDB } func (c *Connection) SelectDB(dbNum int) { c.selectedDB = dbNum }我們之前編寫的EchoHandler用於接收用戶的輸入並將其原封不動地返回。現在需要寫一個RespHandler來取代EchoHandler,由解析器來進行解析。一個管理客戶端連線的結構體Connection需要存在於RespHandler中。 Connection:客戶端連接,在協定層的handler中會用到
##resp/handler/handler.go
var ( unknownErrReplyBytes = []byte("-ERR unknown\r\n") ) type RespHandler struct { activeConn sync.Map db databaseface.Database closing atomic.Boolean } func MakeHandler() *RespHandler { var db databaseface.Database db = database.NewEchoDatabase() return &RespHandler{ db: db, } } func (h *RespHandler) closeClient(client *connection.Connection) { _ = client.Close() h.db.AfterClientClose(client) h.activeConn.Delete(client) } func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { // closing handler refuse new connection _ = conn.Close() } client := connection.NewConn(conn) h.activeConn.Store(client, 1) ch := parser.ParseStream(conn) for payload := range ch { if payload.Err != nil { if payload.Err == io.EOF || payload.Err == io.ErrUnexpectedEOF || strings.Contains(payload.Err.Error(), "use of closed network connection") { // connection closed h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } // protocol err errReply := reply.MakeErrReply(payload.Err.Error()) err := client.Write(errReply.ToBytes()) if err != nil { h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } continue } if payload.Data == nil { logger.Error("empty payload") continue } r, ok := payload.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } result := h.db.Exec(client, r.Args) if result != nil { _ = client.Write(result.ToBytes()) } else { _ = client.Write(unknownErrReplyBytes) } } } func (h *RespHandler) Close() error { logger.Info("handler shutting down...") h.closing.Set(true) // TODO: concurrent wait h.activeConn.Range(func(key interface{}, val interface{}) bool { client := key.(*connection.Connection) _ = client.Close() return true }) h.db.Close() return nil }
RespHandler:和之前的echo類似,加了核心層的db.exec執行解析的指令
interface/database/database.gotype CmdLine = [][]byte type Database interface { Exec(client resp.Connection, args [][]byte) resp.Reply AfterClientClose(c resp.Connection) Close() } type DataEntity struct { Data interface{} }
Exec:核心層的執行
AfterClientClose:關閉之後的善後方法CmdLine:二維位元組數組的指令別名
DataEntity:表示Redis的數據,包括string, list, set等等
database/echo_database.gotype EchoDatabase struct {
}
func NewEchoDatabase() *EchoDatabase {
return &EchoDatabase{}
}
func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
return reply.MakeMultiBulkReply(args)
}
func (e EchoDatabase) AfterClientClose(c resp.Connection) {
logger.Info("EchoDatabase AfterClientClose")
}
func (e EchoDatabase) Close() {
logger.Info("EchoDatabase Close")
}
echo_database:測試協定層
Exec:指令解析後,再使用MakeMultiBulkReply包裝一下返回去
######main.go######err := tcp.ListenAndServeWithSignal( &tcp.Config{ Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port), }, handler.MakeHandler()) if err != nil { logger.Error(err) }###main改成剛才寫的:handler.MakeHandler()###
以上是基於Golang如何實作Redis協定解析器的詳細內容。更多資訊請關注PHP中文網其他相關文章!

Redis和SQL數據庫的主要區別在於:Redis是內存數據庫,適用於高性能和靈活性需求;SQL數據庫是關係型數據庫,適用於復雜查詢和數據一致性需求。具體來說,1)Redis提供高速數據訪問和緩存服務,支持多種數據類型,適用於緩存和實時數據處理;2)SQL數據庫通過表格結構管理數據,支持複雜查詢和事務處理,適用於電商和金融系統等需要數據一致性的場景。

REDISACTSASBOTHADATASTOREANDASERVICE.1)ASADATASTORE,ITUSESIN-MEMORYSTOOGATOFORFOFFASTESITION,支持VariousDatharptructuresLikeKey-valuepairsandsortedsetsetsetsetsetsetsets.2)asaservice,ItprovidespunctionslikeItionitionslikepunikeLikePublikePublikePlikePlikePlikeAndluikeAndluAascriptingiationsmpleplepleclexplectiations

Redis與其他數據庫相比,具有以下獨特優勢:1)速度極快,讀寫操作通常在微秒級別;2)支持豐富的數據結構和操作;3)靈活的使用場景,如緩存、計數器和發布訂閱。選擇Redis還是其他數據庫需根據具體需求和場景,Redis在高性能、低延遲應用中表現出色。

Redis在數據存儲和管理中扮演著關鍵角色,通過其多種數據結構和持久化機製成為現代應用的核心。 1)Redis支持字符串、列表、集合、有序集合和哈希表等數據結構,適用於緩存和復雜業務邏輯。 2)通過RDB和AOF兩種持久化方式,Redis確保數據的可靠存儲和快速恢復。

Redis是一種NoSQL數據庫,適用於大規模數據的高效存儲和訪問。 1.Redis是開源的內存數據結構存儲系統,支持多種數據結構。 2.它提供極快的讀寫速度,適合緩存、會話管理等。 3.Redis支持持久化,通過RDB和AOF方式確保數據安全。 4.使用示例包括基本的鍵值對操作和高級的集合去重功能。 5.常見錯誤包括連接問題、數據類型不匹配和內存溢出,需注意調試。 6.性能優化建議包括選擇合適的數據結構和設置內存淘汰策略。

Redis在現實世界中的應用包括:1.作為緩存系統加速數據庫查詢,2.存儲Web應用的會話數據,3.實現實時排行榜,4.作為消息隊列簡化消息傳遞。 Redis的多功能性和高性能使其在這些場景中大放異彩。

Redis脫穎而出是因為其高速、多功能性和豐富的數據結構。 1)Redis支持字符串、列表、集合、散列和有序集合等數據結構。 2)它通過內存存儲數據,支持RDB和AOF持久化。 3)從Redis6.0開始引入多線程處理I/O操作,提升了高並發場景下的性能。

RedisisclassifiedasaNoSQLdatabasebecauseitusesakey-valuedatamodelinsteadofthetraditionalrelationaldatabasemodel.Itoffersspeedandflexibility,makingitidealforreal-timeapplicationsandcaching,butitmaynotbesuitableforscenariosrequiringstrictdataintegrityo


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

EditPlus 中文破解版
體積小,語法高亮,不支援程式碼提示功能

Dreamweaver Mac版
視覺化網頁開發工具

記事本++7.3.1
好用且免費的程式碼編輯器

MantisBT
Mantis是一個易於部署的基於Web的缺陷追蹤工具,用於幫助產品缺陷追蹤。它需要PHP、MySQL和一個Web伺服器。請查看我們的演示和託管服務。

Dreamweaver CS6
視覺化網頁開發工具