Heim  >  Artikel  >  Java  >  Analyse und grundlegende Verwendung des Java NIO-Prinzips

Analyse und grundlegende Verwendung des Java NIO-Prinzips

零下一度
零下一度Original
2017-06-27 10:18:371679Durchsuche

Analyse der Java NIO-Prinzipien

Dieser Artikel konzentriert sich hauptsächlich auf Java NIO, von der grundlegenden Verwendung von Java NIO über die Einführung der NIO-API unter Linux bis hin zu Java Selectorden zugrunde liegenden Implementierungsprinzipien.

  • Grundlegende Verwendung von Java NIO

  • Einführung in NIO-Systemaufrufe unter Linux

  • Selektorprinzip

  • Off-Heap-Speicher zwischen Kanal und Puffer

Grundlegende Verwendung von Java NIO

Sie finden es im JDK NIO-Dokumentation, Java unterteilt sie in drei Hauptblöcke: Channel, Buffer und Multiplexing Selector. Das Vorhandensein von Channel kapselt den Verbindungskanal zu jeder Entität (z. B. Netzwerk/Datei); Buffer kapselt die Pufferspeicherung von Daten. Schließlich bietet Selector eine nicht blockierende Single-Thread-Methode zur Verarbeitung mehrerer Verbindungen.

Grundlegendes Anwendungsbeispiel

Die grundlegenden Schritte von NIO bestehen darin, Selector und ServerSocketChannel zu erstellen, dann das ACCEPT-Ereignis des Kanals zu registrieren, die Select-Methode aufzurufen, auf das Eintreffen der Verbindung zu warten und sich zu registrieren es zum Selector. Das Folgende ist ein Beispiel für Echo Server:

public class SelectorDemo {

    public static void main(String[] args) throws IOException {


        Selector selector = Selector.open();
        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        socketChannel.bind(new InetSocketAddress(8080));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            int ready = selector.select();
            if (ready == 0) {
                continue;
            } else if (ready < 0) {
                break;
            }

            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {

                SelectionKey key = iterator.next();
                if (key.isAcceptable()) {

                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel accept = channel.accept();
                    if (accept == null) {
                        continue;
                    }
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // 读事件
                    deal((SocketChannel) key.channel(), key);
                } else if (key.isWritable()) {
                    // 写事件
                    resp((SocketChannel) key.channel(), key);
                }
                // 注:处理完成后要从中移除掉
                iterator.remove();
            }
        }
        selector.close();
        socketChannel.close();
    }

    private static void deal(SocketChannel channel, SelectionKey key) throws IOException {

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024);

        int read = channel.read(buffer);

        if (read > 0) {
            buffer.flip();
            responseBuffer.put(buffer);
        } else if (read == -1) {
            System.out.println("socket close");
            channel.close();
            return;
        }

        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        key.attach(responseBuffer);
    }

    private static void resp(SocketChannel channel, SelectionKey key) throws IOException {

        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.flip();

        channel.write(buffer);
        if (!buffer.hasRemaining()) {
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        }
    }
}

Einführung in NIO-Systemaufrufe unter Linux

In der Linux-Umgebung stehen verschiedene Möglichkeiten zur Implementierung von NIO zur Verfügung, z. B. Epoll, Poll, und wählen Sie „Warten“. Bei Select/Poll werden bei jedem Aufruf FD- und Überwachungsereignisse von außen übergeben. Dies bedeutet, dass diese Daten bei jedem Aufruf vom Benutzerstatus in den Kernelstatus kopiert werden müssen, was zu einem Vergleich führt Die Kosten für jeden Anruf sind groß und jedes Mal, wenn er von „select/poll“ zurückgegeben wird, muss er selbst durchlaufen werden, um zu überprüfen, welche Daten BEREIT sind. Für epoll ist es inkrementell. Wenn Sie sich registrieren möchten, müssen Sie epoll_ctl nicht mehr übergeben. Wenn Sie zurückkehren, geben Sie nur READY zurück Hörereignisse und FD. Hier ist ein einfacher Pseudocode:
Einzelheiten finden Sie in den vorherigen Artikeln:

// 1. 创建server socket
// 2. 绑定地址
// 3. 监听端口
// 4. 创建epoll
int epollFd = epoll_create(1024);
// 5. 注册监听事件
struct epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
event.data.fd = serverFd;
epoll_ctl(epollFd, EPOLL_CTL_ADD, serverFd, &event);

while(true) {
    readyNums = epoll_wait( epollFd, events, 1024, -1 );
    
    if ( readyNums < 0 )
     {
         printf("epoll_wait error\n");
         exit(-1);
     }

     for ( i = 0; i <  readyNums; ++i)
     {
         if ( events[i].data.fd == serverFd )
         {
             clientFd = accept( serverFd, NULL, NULL );
             // 注册监听事件
             ...
         }else if ( events[i].events & EPOLLIN )
         {
            // 处理读事件
         }else if ( events[i].events & EPOLLRDHUP )
         {
            // 关闭连接事件
            close( events[i].data.fd );
         }
}

Selektorprinzip

SelectionKey

Aus der Sicht von Java-Top-Level-Benutzern Sehen Sie, der Kanal gibt durch die Registrierung SelectionKey zurück, und die Selector.select-Methode wird auch durch die Rückgabe von SelectionKey verwendet. Warum wird diese Klasse hier benötigt? Was macht diese Klasse? Unabhängig von der Sprache ist es untrennbar mit der Unterstützung des zugrunde liegenden Systems verbunden. Durch die oben genannten Basisanwendungen können wir erkennen, dass durch Systemaufrufe Parameter wie FD und Ereignisse an dieses zurückgegeben werden Wenn Sie die Parameter des READY-Ereignisses eingeben, ist eine Zuordnungsbeziehung erforderlich Speichern Sie den Verweis auf den Kanal und einige Ereignisinformationen. Anschließend findet der Selektor den SelectionKey zur Zuordnung. Innerhalb des zugrunde liegenden EP gibt es ein Attribut: Map<Integer,SelectionKeyImpl> fdToKey. <h3>EPollSelectorImpl</h3><p>In der Linux-Version 2.6+ verwendet Java NIO Epoll (d. h. <code>EPollSelectorImpl-Klasse), für 2.4.x verwenden Sie poll (d. h. PollSelectorImpl-Klasse), hier nehmen Sie epoll als ein Beispiel.

select-Methode

Der Selector der obersten Ebene ruft durch Aufrufen der select-Methode schließlich die EPollSelectorImpl.doSelect-Methode auf. Über diese Methode können Sie sehen, dass er zunächst einige Ereignisse verarbeitet sind nicht mehr registriert. Rufen Sie pollWrapper.poll(timeout); auf und bereinigen Sie es erneut. Schließlich können Sie sehen, dass die Zuordnungsbeziehung verarbeitet werden muss.

protected int doSelect(long timeout)
    throws IOException
{
    if (closed)
        throw new ClosedSelectorException();
    // 处理一些不再注册的事件
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    // 再进行一次清理
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}


private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        // 获取FD
        int nextFD = pollWrapper.getDescriptor(i);
        // 根据FD找到对应的SelectionKey
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            // 找到该FD的READY事件
            int rOps = pollWrapper.getEventOps(i);
            if (selectedKeys.contains(ski)) {
                // 将底层的事件转换为Java封装的事件,SelectionKey.OP_READ等
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                // 没有在原有的SelectedKey里面,说明是在等待过程中加入的
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    // 需要更新selectedKeys集合
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    // 返回Ready的Channel个数
    return numKeysUpdated;
}

EpollArrayWrapper

EpollArrayWrapper kapselt die zugrunde liegenden Aufrufe , die mehrere native Methoden enthält, zum Beispiel:

private native int epollCreate();
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout,
                             int epfd) throws IOException;

Die entsprechende Implementierung EPollArrayWrapper.c befindet sich im nativen Verzeichnis von openjdk (native/sun/nio/ch).
(Übrigens können Sie zum Implementieren der nativen Methode das native Schlüsselwort zur Methode in der Klasse hinzufügen, es dann in eine Klassendatei kompilieren und dann in Ausgabe .h konvertieren. Die Methode zum Implementieren des Headers Datei am Ende von c/c++, in die so-Bibliothek kompilieren, einfach im entsprechenden Verzeichnis ablegen)
In der Initialisierungsdateimethode können Sie sehen, dass sie durch dynamisches Parsen sowie epoll_create und andere Methoden geladen wird werden endlich aufgerufen.

JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_init(JNIEnv *env, jclass this)
{
    epoll_create_func = (epoll_create_t) dlsym(RTLD_DEFAULT, "epoll_create");
    epoll_ctl_func    = (epoll_ctl_t)    dlsym(RTLD_DEFAULT, "epoll_ctl");
    epoll_wait_func   = (epoll_wait_t)   dlsym(RTLD_DEFAULT, "epoll_wait");

    if ((epoll_create_func == NULL) || (epoll_ctl_func == NULL) ||
        (epoll_wait_func == NULL)) {
        JNU_ThrowInternalError(env, "unable to get address of epoll functions, pre-2.6 kernel?");
    }
}

Off-Heap-Speicher zwischen Kanal und Puffer

Ich höre oft Leute sagen, dass Off-Heap-Speicher leicht verloren gehen kann und das Netty-Framework Off-Heap-Speicher verwendet, um Kopien zu reduzieren und die Leistung verbessern. Worauf bezieht sich der Off-Heap-Speicher hier? Aus Neugier habe ich ihn schließlich über die Lesemethode in SocketChannelImpl zurückverfolgt, die die Lesemethode von IOUtil aufrief. Zunächst wird ermittelt, ob der eingehende Puffer ein DirectBuffer ist. Wenn nicht (es handelt sich um einen HeapByteBuffer), wird ein temporärer DirectBuffer erstellt und dann auf den Heap kopiert. IOUtil.read-Methode:

static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4, Object var5) throws IOException {
    if(var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    } else if(var1 instanceof DirectBuffer) {
        // 为堆外内存,则直接读取
        return readIntoNativeBuffer(var0, var1, var2, var4, var5);
    } else {
        // 为堆内内存,先获取临时堆外内存
        ByteBuffer var6 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var8;
        try {
            // 读取到堆外内存
            int var7 = readIntoNativeBuffer(var0, var6, var2, var4, var5);
            var6.flip();
            if(var7 > 0) {
                // 复制到堆内
                var1.put(var6);
            }

            var8 = var7;
        } finally {
            // 释放临时堆外内存
            Util.offerFirstTemporaryDirectBuffer(var6);
        }

        return var8;
    }
}

这里有一个问题就是,为什么会需要DirectBuffer以及堆外内存?通过对DirectByteBuffer的创建来分析,可以知道,通过unsafe.allocateMemory(size);来分配内存的,而对于该方法来说,可以说是直接调用malloc返回,这一块内存是不受GC管理的,也就是所说的:堆外内存容易泄漏。但是对于使用DirectByteBuffer来说,会创建一个Deallocator,注册到Cleaner里面,当对象被回收的时候,则会被直接,从而释放掉内存,减少内存泄漏。要用堆外内存,从上面的创建来看,堆外内存创建后,以long型地址保存的,而堆内内存会受到GC影响,对象会被移动,如果采用堆内内存,进行系统调用的时候,那么GC就需要停止,否则就会有问题,基于这一点,采用了堆外内存(这一块参考了R大的理解:)。

注:堆外内存的创建(unsafe.cpp):

// 仅仅作了对齐以及将长度放在数组前方就返回了
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
  UnsafeWrapper("Unsafe_AllocateMemory");
  size_t sz = (size_t)size;
  if (sz != (julong)size || size < 0) {
    THROW_0(vmSymbols::java_lang_IllegalArgumentException());
  }
  if (sz == 0) {
    return 0;
  }
  sz = round_to(sz, HeapWordSize);
  void* x = os::malloc(sz);
  if (x == NULL) {
    THROW_0(vmSymbols::java_lang_OutOfMemoryError());
  }
  //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
  return addr_to_java(x);
UNSAFE_END

Das obige ist der detaillierte Inhalt vonAnalyse und grundlegende Verwendung des Java NIO-Prinzips. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn