ホームページ  >  記事  >  Java  >  Java NIO リアクター パターン

Java NIO リアクター パターン

巴扎黑
巴扎黑オリジナル
2016-12-19 11:46:011675ブラウズ

Java NIO リアクター モードのシンプル モデル

一般に、NIO のリアクター モードは次のようになります: 1 つのアクセプター (もちろん複数でも機能しますが、一般的なシナリオでは 1 つで十分です) が受け入れイベントを担当し、受信したイベントを登録します。 Reactor プールから取り出された Reactor では、特定のアルゴリズムに従ってサーバーにソケット チャネルが送信され、登録されたイベントの読み取り、書き込みなどが行われます。その後、このソケット チャネルのすべての IO イベントはアクセプターとは関係がなく、登録された Reactor によってすべて処理されます。

各アクセプターと各リアクターはそれぞれセレクターを保持します

もちろん、各アクセプターとリアクターはスレッドである必要があります(少なくとも論理的にはスレッドである必要があります)

簡単な実装、3 つのクラス NioAcceptor、NioReactor、および ReactorPool :

rrreerrree

ReactorPool は Reactor:

package cc.lixiaohui.demo.dp.reator;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Acceptor负责处理SelectionKey.OP_ACCEPT事件, 将接收到的SocketChannel注册到Reactor上去
 */
public class NioAcceptor {
private int port;
private String host;
private Selector selector; // Java NIO Selector
private final ServerSocketChannel serverChannel; // Java NIO ServerSocketChannel
private ReactorPool reactorPool; // NioReactor池
private Thread thread; // 工作线程
private volatile boolean stop = false;
private static final Logger logger = LoggerFactory.getLogger(NioAcceptor.class);
public NioAcceptor(int port, String host, int reactorPoolSize) throws IOException {
this.port = port;
this.host = Objects.requireNonNull(host);
this.reactorPool = new ReactorPool(reactorPoolSize);
selector = Selector.open(); // 创建selector
serverChannel = ServerSocketChannel.open(); // new server socket channel
serverChannel.configureBlocking(false); // in non-blocking mode
serverChannel.bind(new InetSocketAddress(host, port)); // bind
serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 
}
public void stop() throws InterruptedException {
stop = true;
thread.join();
}
public void start() {
thread = new Thread(new AcceptTask(this));
thread.start();
}
private static class AcceptTask implements Runnable {
NioAcceptor acceptor;
AcceptTask(NioAcceptor acceptor) {
this.acceptor = acceptor;
}
public void run() {
final Selector selector = acceptor.selector;
Set<SelectionKey> keys = null;
while (!acceptor.stop) { // 运行中
try {
selector.select(1000L); // select, 最多等1秒
keys = selector.selectedKeys();
try {
for (SelectionKey key : keys) {
if (key.isValid() && key.isAcceptable()) { // 可accept
SocketChannel channel = acceptor.serverChannel.accept();
channel.configureBlocking(false);
// 取下一个Reactor并把SocketChannel加入到Reactor的注册队列
acceptor.reactorPool.nextReactor().postRegistry(channel);
} else {
key.cancel();
}
}
} finally {
keys.clear();
}
} catch (IOException e) {
logger.error("", e);
}
}
}
}
}


を管理するために使用されます
声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。