ホームページ >Java >&#&チュートリアル >Java NIO リアクター パターン
Java NIO リアクター モードのシンプル モデル
一般に、NIO のリアクター モードは次のようになります: 1 つのアクセプター (もちろん複数でも機能しますが、一般的なシナリオでは 1 つで十分です) が受け入れイベントを担当し、受信したイベントを登録します。 Reactor プールから取り出された Reactor では、特定のアルゴリズムに従ってサーバーにソケット チャネルが送信され、登録されたイベントの読み取り、書き込みなどが行われます。その後、このソケット チャネルのすべての IO イベントはアクセプターとは関係がなく、登録された Reactor によってすべて処理されます。
各アクセプターと各リアクターはそれぞれセレクターを保持します
もちろん、各アクセプターとリアクターはスレッドである必要があります(少なくとも論理的にはスレッドである必要があります)
簡単な実装、3 つのクラス NioAcceptor、NioReactor、および ReactorPool :
rrreerrreeReactorPool は Reactor:
package cc.lixiaohui.demo.dp.reator; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Objects; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Acceptor负责处理SelectionKey.OP_ACCEPT事件, 将接收到的SocketChannel注册到Reactor上去 */ public class NioAcceptor { private int port; private String host; private Selector selector; // Java NIO Selector private final ServerSocketChannel serverChannel; // Java NIO ServerSocketChannel private ReactorPool reactorPool; // NioReactor池 private Thread thread; // 工作线程 private volatile boolean stop = false; private static final Logger logger = LoggerFactory.getLogger(NioAcceptor.class); public NioAcceptor(int port, String host, int reactorPoolSize) throws IOException { this.port = port; this.host = Objects.requireNonNull(host); this.reactorPool = new ReactorPool(reactorPoolSize); selector = Selector.open(); // 创建selector serverChannel = ServerSocketChannel.open(); // new server socket channel serverChannel.configureBlocking(false); // in non-blocking mode serverChannel.bind(new InetSocketAddress(host, port)); // bind serverChannel.register(selector, SelectionKey.OP_ACCEPT); // } public void stop() throws InterruptedException { stop = true; thread.join(); } public void start() { thread = new Thread(new AcceptTask(this)); thread.start(); } private static class AcceptTask implements Runnable { NioAcceptor acceptor; AcceptTask(NioAcceptor acceptor) { this.acceptor = acceptor; } public void run() { final Selector selector = acceptor.selector; Set<SelectionKey> keys = null; while (!acceptor.stop) { // 运行中 try { selector.select(1000L); // select, 最多等1秒 keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { // 可accept SocketChannel channel = acceptor.serverChannel.accept(); channel.configureBlocking(false); // 取下一个Reactor并把SocketChannel加入到Reactor的注册队列 acceptor.reactorPool.nextReactor().postRegistry(channel); } else { key.cancel(); } } } finally { keys.clear(); } } catch (IOException e) { logger.error("", e); } } } } }