最近 Redis コードを見たところ、他の言語で実装 (コピー) したいという衝動に駆られました。その後、Netty を試してみようと思いました。理由は 2 つあります。
第一に、NIO と Netty の EventLoop の組み合わせは Redis ネットワーク モデルに非常に近いです。Redis モデルでもより単純です。EventLoop スレッドは 1 つだけです。2 番目: Netty アーキテクチャは非常に優れています。この機会に学んでください。 Redis Server を非常に抽象的な (単純な) 観点から見ると、これは 6379 でリッスンするプログラムです。本質的には、単一の処理を行うハッシュテーブルです。また、Redis プロトコルは、http プロトコルよりも非常に単純です。このプロトコルの一般的な形式は次のとおりです:
*<参数数量> CR LF $<参数 1 的字节数量> CR LF<参数 1 的数据> CR LF ... $<参数 N 的字节数量> CR LF<参数 N 的数据> CR LFこれは、基本的に非常に単純な有限状態マシンです。
そこで、コマンドパーサーの状態を 3 に設定しました。
public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA }
初期状態を NUMBER_OF_ARGS に設定します。これは、データが到着すると、プログラムの状態がどの状態であり、何を行うかを常に判断します。 .
while(true){ switch (state()){ case NUMBER_OF_ARGS: //从当前数据中读取参数个数 break; case NUMBER_BYTE_OF_ARGS: //从数据中读取参数长度 break; case ARGS_DATA: //按参数长度读取参数 //判断参数个数.如果到了最后一个.则跳出,否则状态转回NUMBER_BYTE_OF_ARGS break; } }次に、上記のアイデアを実装しましょう
package me.yunanw.redisinjava; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.ReplayingDecoder;import java.util.List; /** * Created by yunanw on 2016/10/15. */ public class CommandDecoder extends ReplayingDecoder { public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA } static final char CR = '\r'; static final char LF = '\n'; public CommandDecoder(){ state(State.NUMBER_OF_ARGS); } protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { RedisFrame frame = doDecode(channelHandlerContext,byteBuf,list); if (frame != null){ list.add(frame); } } private RedisFrame doDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { RedisFrame frame = null; int currentArgsLen = 0; int argsCount = 0; while(true){ switch (state()){ case NUMBER_OF_ARGS: if (byteBuf.readByte() != '*'){ throw new DecoderException("can not found *"); } argsCount = parseRedisNumber(byteBuf); frame = new RedisFrame(argsCount); checkpoint(State.NUMBER_BYTE_OF_ARGS); break; case NUMBER_BYTE_OF_ARGS: if (byteBuf.readByte() != '$'){ throw new DecoderException("can not found $"); } currentArgsLen = parseRedisNumber(byteBuf); checkpoint(State.ARGS_DATA);; break; case ARGS_DATA: frame.AppendArgs(byteBuf.readBytes(currentArgsLen).array()); if (byteBuf.readByte() != CR || byteBuf.readByte() != LF) throw new DecoderException("can not found CR OR LF"); if ((--argsCount) = 0 && digit < 10) { result = (result * 10) + digit; } else { throw new DecoderException("Invalid character in integer"); } } while ((readByte = byteBuf.readByte()) != CR); if ((readByte = byteBuf.readByte()) != LF) { throw new DecoderException("can not found LF"); } return (negative? -result:result); } }上記のコードを理解している場合、ネットワーク上の理由により、データが正しく表示されない場合があることがわかります。そして私たちのコードはこの側面をまったく考慮していませんでした? 最初の質問: そこで、私たちは比較的特殊な Decoder、ReplayingDecoder を継承しました。 ReplayingDecoder の CallDecode メソッドを見てみましょう (名前は非常に簡単です。それが何をするのか理解する必要があります)
</p><pre class="brush:java;toolbar:false"> try { decode(ctx, replayable, out); //省略} catch (Signal replay) { replay.expect(REPLAY); //省略 // Return to the checkpoint (or oldPosition) and retry. int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint); } else { // Called by cleanup() - no need to maintain the readerIndex // anymore because the buffer has been released already. } break; }シグナルリプレイは Netty で定義されたエラーです。エラーを読み取ると、Netty は次回まで待機します。データが到着したら、Decode メソッドを再度実行して、正常に解析できるかどうかを確認します。したがって、必要なデータがすべて読み込まれたと想定できます ただし、replaydecoder の decode メソッドは次のようになります。 2: CheckPoint は、Decode が繰り返し呼び出されるたびに最初から実行されないようにするための状態セットです 。さて、監視部分のコードを作成します。これらはすべてコピーします
</p><pre class="brush:java;toolbar:false"> ServerBootstrap bootstrap = new ServerBootstrap(); final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1); try { bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .localAddress(port) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new CommandDecoder()); p.addLast(new RedisServerHandler()); } }); // Start the server. ChannelFuture f = bootstrap.bind().sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. group.shutdownGracefully(); }Redis プロトコルを RedisFrame クラスに解析します
</p><pre class="brush:java;toolbar:false"> package me.yunanw.redisinjava;import java.util.ArrayList;import java.util.List; /** * Created by yunanw on 2016/10/17. */ public class RedisFrame { private int argsCount = 0; List ArgsData = null; public RedisFrame(int argsCount){ this.argsCount = argsCount; this.ArgsData = new ArrayList(argsCount); } public void AppendArgs(byte[] args){ this.ArgsData.add(new String(args)); } public int getCommandCount(){ return ArgsData.size(); } public String GetFristCommand(){ if (ArgsData.size() > 0){ return ArgsData.get(0); } return null; } public String GetCommand(int index){ if (ArgsData.size() > index){ return ArgsData.get(index); } return null; } }それでは、Redis-cli を開いて確認してみましょう。興味深いのは、Redis-cli を開くと、何を応答しても、接続されているとみなして自動的に「コマンド」コマンドを送信することです
。