首頁 >类库下载 >java类库 >實作一個 Java 版的 Redis

實作一個 Java 版的 Redis

高洛峰
高洛峰原創
2018-05-30 13:56:003731瀏覽

最近看了Redis 的程式碼,感覺還是挺簡單的.有衝動想用其它語言實現(抄)一個.原來想用Python 實現來著.後來想想試試Netty.原因有二

    第一:Java的NIO 和Netty 的EventLoop 配合起來和Redis 的網路模型很接近.都是Ractor 模型.甚至 Redis的模型更簡單--只有一個EventLoop 線程.寫(抄)起來更方便

   第二:Netty 架構挺不錯.藉這個機會學習一下.

如果我們從一個很抽象(簡單)的角度看Redis Server.就是一個監聽在6379的程序, 本質上是一個處理單線線請求的Hashtable. 而Redis 的協議也是非常非常的簡單.比http 協定可簡單多了.

以下是這個協定的一般形式:

*<参数数量> CR LF
$<参数 1 的字节数量> CR LF<参数 1 的数据> CR LF
...
$<参数 N 的字节数量> CR LF<参数 N 的数据> CR LF

這基本上就是一個很簡單的有限狀態機.

實作一個 Java 版的 Redis

所以我給我們的命令解析器設定333個狀態.

public enum State {
    NUMBER_OF_ARGS,
    NUMBER_BYTE_OF_ARGS,
    ARGS_DATA
}

我們將初始狀態設定NUMBER_OF_ARGS 也就是開始那個綠色的狀態.當有資料到達時.我們不停的判斷程式的狀態.是哪個狀態,我們做啥.

while(true){    switch (state()){        case NUMBER_OF_ARGS:
            //从当前数据中读取参数个数
            break;        case NUMBER_BYTE_OF_ARGS:
            //从数据中读取参数长度
            break;        case ARGS_DATA:
            //按参数长度读取参数
            //判断参数个数.如果到了最后一个.则跳出,否则状态转回NUMBER_BYTE_OF_ARGS
            break;
    }
}

下面我們按著我們上面思路實現一下.

package me.yunanw.redisinjava;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.ReplayingDecoder;import java.util.List;
/**
 * Created by yunanw on 2016/10/15.
 */
 public class CommandDecoder extends ReplayingDecoder {    
 public enum State {
        NUMBER_OF_ARGS,
        NUMBER_BYTE_OF_ARGS,
        ARGS_DATA
    }    
    static final char CR = &#39;\r&#39;;    
    static final char LF = &#39;\n&#39;;    
    public CommandDecoder(){

        state(State.NUMBER_OF_ARGS);
    }    
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
        RedisFrame frame = doDecode(channelHandlerContext,byteBuf,list);        
        if (frame != null){
            list.add(frame);
        }
    }    
    private RedisFrame doDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
        RedisFrame frame = null;        
        int currentArgsLen = 0;        
        int argsCount = 0;        
        while(true){            
        switch (state()){                
        case NUMBER_OF_ARGS:                    
        if (byteBuf.readByte() != &#39;*&#39;){                        
        throw new DecoderException("can not found *");
                    }
                    argsCount = parseRedisNumber(byteBuf);
                    frame = new RedisFrame(argsCount);
                    checkpoint(State.NUMBER_BYTE_OF_ARGS);                    
                    break;                
                    case NUMBER_BYTE_OF_ARGS:                    
                    if (byteBuf.readByte() != &#39;$&#39;){                        
                    throw new DecoderException("can not found $");
                    }
                    currentArgsLen = parseRedisNumber(byteBuf);
                    checkpoint(State.ARGS_DATA);;                    
                    break;                
                    case ARGS_DATA:
                    frame.AppendArgs(byteBuf.readBytes(currentArgsLen).array());                    
                    if (byteBuf.readByte() != CR || byteBuf.readByte() != LF)                        
                    throw new DecoderException("can not found CR OR LF");                    
                    if ((--argsCount) = 0 && digit < 10)
                     {
                result = (result * 10) + digit;
            } else {                
            throw new DecoderException("Invalid character in integer");
            }
        } while ((readByte = byteBuf.readByte()) != CR);        
        if ((readByte = byteBuf.readByte()) != LF)
        {            
        throw new DecoderException("can not found LF");
        }        
        return (negative? -result:result);
    }

}

寫到這裡有一個小問題,如果你上面代碼看懂了,你就會發現一個小問題.如果由於網絡原因,有時數據可以並沒有接收完全.而我們的代碼完全沒有做這方面的考慮? 而Checkpoint 這是又什麼鬼?

第一個問題:

    事實上我們有考慮這個問題.所以我們繼承了一個相對比較特別Decoder--ReplayingDecoder.我們看一下ReplayingDecoder的CallDecode 方法.(這個名字起的非常的直白.你一定明白他是乾啥的)

</p><pre class="brush:java;toolbar:false">
try {
    decode(ctx, replayable, out);
    //省略} catch (Signal replay) {
    replay.expect(REPLAY);     //省略
    // Return to the checkpoint (or oldPosition) and retry.
    int checkpoint = this.checkpoint;    
    if (checkpoint >= 0) {        
    in.readerIndex(checkpoint);
    } else {        
    // Called by cleanup() - no need to maintain the readerIndex
        // anymore because the buffer has been released already.
    }    
    break;
}

Signal replay 是Netty 中定義的一個錯誤.當我們讀取錯誤時,Netty 會再等到下次有資料到達時,再試一次Decode 方法.看看能再解析成功.所以我們就可以假設定我們要的資料都已經讀取了.

但是要注意: replaydecoder 的decode 方法會被反覆調用..所以我們的程式碼中要做好這樣的準備.

二: CheckPoint 就是為了防止如果每次反复調用Decode 時從頭執行,而設置的一個狀態.讓我們這個decode 方法有狀態.

好了.現在我們建立監部分的程式碼.這都是套數,直接抄下來就行了

</p><pre class="brush:java;toolbar:false">
ServerBootstrap bootstrap = new ServerBootstrap();
final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1);
try {
    bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .localAddress(port)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childHandler(new ChannelInitializer() {
                @Override                
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new CommandDecoder());
                    p.addLast(new RedisServerHandler());
                }
            });    
            // Start the server.
    ChannelFuture f = bootstrap.bind().sync();    
    // Wait until the server socket is closed.
    f.channel().closeFuture().sync();
} finally {    
// Shut down all event loops to terminate all threads.
    group.shutdownGracefully();
}

我們把Redis 的協定解析為RedisFrame 類別

</p><pre class="brush:java;toolbar:false">
package me.yunanw.redisinjava;import java.util.ArrayList;import java.util.List;
/**
 * Created by yunanw on 2016/10/17.
 */
 public class RedisFrame {    
 private int argsCount = 0;
    List ArgsData = null;    
    public RedisFrame(int argsCount){        
    this.argsCount = argsCount;        
    this.ArgsData = new ArrayList(argsCount);

    }    public void AppendArgs(byte[] args){        
    this.ArgsData.add(new String(args));
    }    public int getCommandCount(){        
    return ArgsData.size();
    }    public String GetFristCommand(){        
    if (ArgsData.size() > 0){            
    return ArgsData.get(0);
        }        
        return null;
    }    
    public String GetCommand(int index){        
    if (ArgsData.size() > index){            
    return ArgsData.get(index);
        }       
        return null;
    }
}

好了.這時你打開Redis-cli 試試看是不是可以連上我們的"假Redis" Server.有意的是---你打開Redis-cli.他會自動發一個"Command" 命令.而你不管回復什麼,它都認為連上了

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
上一篇:Java位元組流下一篇:Java位元組流