Cet article se concentre principalement sur Java NIO, de l'utilisation de base de Java NIO, à l'introduction de l'API NIO sous Linux, en passant par Java Selector
ses principes d'implémentation sous-jacents.
Utilisation de base de Java NIO
Introduction aux appels système NIO sous Linux
Principe du sélecteur
Mémoire hors tas entre Channel et Buffer
Vous pouvez la trouver dans le JDK Documentation NIO, Java la divise en trois blocs principaux : Channel
, Buffer
et multiplexage Selector
. L'existence de Channel encapsule le canal de connexion à n'importe quelle entité (telle qu'un réseau/fichier) ; Buffer encapsule le stockage tampon des données. Enfin, Selector fournit un moyen non bloquant à thread unique pour traiter plusieurs connexions.
Les étapes de base de NIO sont de créer Selector et ServerSocketChannel, puis d'enregistrer l'événement ACCEPT du canal, d'appeler la méthode select, d'attendre l'arrivée de la connexion et enregistrez-le avec Selector. Voici un exemple d'Echo Server :
public class SelectorDemo { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel socketChannel = ServerSocketChannel.open(); socketChannel.bind(new InetSocketAddress(8080)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { int ready = selector.select(); if (ready == 0) { continue; } else if (ready < 0) { break; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel accept = channel.accept(); if (accept == null) { continue; } accept.configureBlocking(false); accept.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { // 读事件 deal((SocketChannel) key.channel(), key); } else if (key.isWritable()) { // 写事件 resp((SocketChannel) key.channel(), key); } // 注:处理完成后要从中移除掉 iterator.remove(); } } selector.close(); socketChannel.close(); } private static void deal(SocketChannel channel, SelectionKey key) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(1024); ByteBuffer responseBuffer = ByteBuffer.allocate(1024); int read = channel.read(buffer); if (read > 0) { buffer.flip(); responseBuffer.put(buffer); } else if (read == -1) { System.out.println("socket close"); channel.close(); return; } key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); key.attach(responseBuffer); } private static void resp(SocketChannel channel, SelectionKey key) throws IOException { ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.flip(); channel.write(buffer); if (!buffer.hasRemaining()) { key.attach(null); key.interestOps(SelectionKey.OP_READ); } } }
Dans l'environnement Linux, plusieurs manières sont proposées pour implémenter NIO, telles que epoll, poll, et sélectionnez attendre. Pour select/poll, chaque fois qu'il est appelé, les événements FD et de surveillance sont transmis de l'extérieur. Cela signifie que chaque fois qu'il est appelé, ces données doivent être copiées de l'état utilisateur vers l'état du noyau, ce qui entraîne une comparaison. du coût de chaque appel. Il est important, et chaque fois qu'il est renvoyé par select/poll, il s'agit de la quantité totale de données. Vous devez le parcourir vous-même pour vérifier lesquelles sont PRÊTES. Pour epoll, c'est incrémentiel. Le système maintient les événements FD et de surveillance requis en interne. Lorsque vous souhaitez vous inscrire, appelez simplement epoll_ctl à chaque fois que vous appelez, vous n'avez plus besoin de le transmettre. Lorsque vous revenez, vous renvoyez uniquement READY. événements d'écoute et FD. Voici un pseudocode simple :
Pour plus de détails, veuillez consulter les articles précédents :
// 1. 创建server socket // 2. 绑定地址 // 3. 监听端口 // 4. 创建epoll int epollFd = epoll_create(1024); // 5. 注册监听事件 struct epoll_event event; event.events = EPOLLIN | EPOLLRDHUP | EPOLLET; event.data.fd = serverFd; epoll_ctl(epollFd, EPOLL_CTL_ADD, serverFd, &event); while(true) { readyNums = epoll_wait( epollFd, events, 1024, -1 ); if ( readyNums < 0 ) { printf("epoll_wait error\n"); exit(-1); } for ( i = 0; i < readyNums; ++i) { if ( events[i].data.fd == serverFd ) { clientFd = accept( serverFd, NULL, NULL ); // 注册监听事件 ... }else if ( events[i].events & EPOLLIN ) { // 处理读事件 }else if ( events[i].events & EPOLLRDHUP ) { // 关闭连接事件 close( events[i].data.fd ); } }
Du point de vue des utilisateurs Java de haut niveau Voir, le canal renvoie SelectionKey via l'enregistrement, et la méthode Selector.select est également utilisée en renvoyant SelectionKey. Alors pourquoi ce cours est-il nécessaire ici ? A quoi sert cette classe ? Quel que soit le langage, il est indissociable du support du système sous-jacent. Grâce aux applications de base mentionnées ci-dessus sous Linux, nous pouvons savoir que via les appels système, des paramètres tels que FD et les événements lui sont transmis et renvoyés à partir d'une conception. Dans la perspective, il doit y avoir une relation de mappage pour qu'elle puisse être associée. L'encapsulation du canal ici est passée. Si vous placez les paramètres de l'événement READY à l'intérieur, ce n'est pas approprié pour le moment, SelectionKey apparaît. , enregistrez la référence au canal et quelques informations sur l'événement, puis le sélecteur trouve la SelectionKey via FD à associer. À l’intérieur du EP
sous-jacent, il y a un attribut : Map<Integer,SelectionKeyImpl> fdToKey. <h3>EPollSelectorImpl</h3><p>Dans la version Linux 2.6+, Java NIO utilise epoll (c'est-à-dire la classe <code>EPollSelectorImpl
), pour 2.4.x, utilisez poll (c'est-à-dire la classe PollSelectorImpl
), ici Prenez epoll comme un exemple.
Le sélecteur de niveau supérieur, en appelant la méthode select, finira par appeler la méthode EPollSelectorImpl.doSelect. Grâce à cette méthode, vous pouvez voir qu'il traitera d'abord certains événements qui. ne sont plus enregistrés. Appelez pollWrapper.poll(timeout);
, puis nettoyez-le à nouveau. Enfin, vous pouvez voir que la relation de mappage doit être traitée
protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); // 处理一些不再注册的事件 processDeregisterQueue(); try { begin(); pollWrapper.poll(timeout); } finally { end(); } // 再进行一次清理 processDeregisterQueue(); int numKeysUpdated = updateSelectedKeys(); if (pollWrapper.interrupted()) { // Clear the wakeup pipe pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; } private int updateSelectedKeys() { int entries = pollWrapper.updated; int numKeysUpdated = 0; for (int i=0; i<entries; i++) { // 获取FD int nextFD = pollWrapper.getDescriptor(i); // 根据FD找到对应的SelectionKey SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD)); // ski is null in the case of an interrupt if (ski != null) { // 找到该FD的READY事件 int rOps = pollWrapper.getEventOps(i); if (selectedKeys.contains(ski)) { // 将底层的事件转换为Java封装的事件,SelectionKey.OP_READ等 if (ski.channel.translateAndSetReadyOps(rOps, ski)) { numKeysUpdated++; } } else { // 没有在原有的SelectedKey里面,说明是在等待过程中加入的 ski.channel.translateAndSetReadyOps(rOps, ski); if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { // 需要更新selectedKeys集合 selectedKeys.add(ski); numKeysUpdated++; } } } } // 返回Ready的Channel个数 return numKeysUpdated; }
EpollArrayWrapper encapsule les appels sous-jacents. , qui contient plusieurs méthodes natives. Par exemple :
private native int epollCreate(); private native void epollCtl(int epfd, int opcode, int fd, int events); private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
L'implémentation correspondante EPollArrayWrapper.c se trouve dans le répertoire natif d'openjdk (native/sun/nio/ch).
(Au fait, pour implémenter la méthode native, vous pouvez ajouter le mot-clé natif à la méthode dans la classe, puis le compiler dans un fichier de classe, puis convertir et afficher .h, la méthode d'implémentation du fichier d'en-tête en bas de c/c++, compilez dans la bibliothèque so, placez-le simplement dans le répertoire correspondant)
Dans la méthode du fichier d'initialisation, vous pouvez voir qu'il est chargé via l'analyse dynamique, et epoll_create et d'autres méthodes sont finalement appelé.
JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_init(JNIEnv *env, jclass this) { epoll_create_func = (epoll_create_t) dlsym(RTLD_DEFAULT, "epoll_create"); epoll_ctl_func = (epoll_ctl_t) dlsym(RTLD_DEFAULT, "epoll_ctl"); epoll_wait_func = (epoll_wait_t) dlsym(RTLD_DEFAULT, "epoll_wait"); if ((epoll_create_func == NULL) || (epoll_ctl_func == NULL) || (epoll_wait_func == NULL)) { JNU_ThrowInternalError(env, "unable to get address of epoll functions, pre-2.6 kernel?"); } }
J'entends souvent les gens dire que la mémoire hors tas est facile à fuir, et le framework Netty utilise la mémoire hors tas pour réduire les copies et améliorer les performances. Alors, à quoi fait référence la mémoire hors tas ici ? Avec curiosité, via la méthode read, je l'ai finalement retracé jusqu'à la méthode read dans SocketChannelImpl, qui a appelé la méthode read de IOUtil. Il déterminera d'abord si le Buffer entrant est un DirectBuffer. Sinon (il s'agit d'un HeapByteBuffer), un DirectBuffer temporaire sera créé puis copié dans le tas. Méthode IOUtil.read :
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4, Object var5) throws IOException { if(var1.isReadOnly()) { throw new IllegalArgumentException("Read-only buffer"); } else if(var1 instanceof DirectBuffer) { // 为堆外内存,则直接读取 return readIntoNativeBuffer(var0, var1, var2, var4, var5); } else { // 为堆内内存,先获取临时堆外内存 ByteBuffer var6 = Util.getTemporaryDirectBuffer(var1.remaining()); int var8; try { // 读取到堆外内存 int var7 = readIntoNativeBuffer(var0, var6, var2, var4, var5); var6.flip(); if(var7 > 0) { // 复制到堆内 var1.put(var6); } var8 = var7; } finally { // 释放临时堆外内存 Util.offerFirstTemporaryDirectBuffer(var6); } return var8; } }
这里有一个问题就是,为什么会需要DirectBuffer以及堆外内存?通过对DirectByteBuffer的创建来分析,可以知道,通过unsafe.allocateMemory(size);来分配内存的,而对于该方法来说,可以说是直接调用malloc返回,这一块内存是不受GC管理的,也就是所说的:堆外内存容易泄漏。但是对于使用DirectByteBuffer来说,会创建一个Deallocator,注册到Cleaner里面,当对象被回收的时候,则会被直接,从而释放掉内存,减少内存泄漏。要用堆外内存,从上面的创建来看,堆外内存创建后,以long型地址保存的,而堆内内存会受到GC影响,对象会被移动,如果采用堆内内存,进行系统调用的时候,那么GC就需要停止,否则就会有问题,基于这一点,采用了堆外内存(这一块参考了R大的理解:)。
注:堆外内存的创建(unsafe.cpp):
// 仅仅作了对齐以及将长度放在数组前方就返回了 UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size)) UnsafeWrapper("Unsafe_AllocateMemory"); size_t sz = (size_t)size; if (sz != (julong)size || size < 0) { THROW_0(vmSymbols::java_lang_IllegalArgumentException()); } if (sz == 0) { return 0; } sz = round_to(sz, HeapWordSize); void* x = os::malloc(sz); if (x == NULL) { THROW_0(vmSymbols::java_lang_OutOfMemoryError()); } //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize); return addr_to_java(x); UNSAFE_END
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!