>  기사  >  Java  >  Java NIO 원리 분석 및 기본 사용법

Java NIO 원리 분석 및 기본 사용법

零下一度
零下一度원래의
2017-06-27 10:18:371659검색

Java NIO 원리 분석

이 기사는 주로 Java NIO의 기본 사용법부터 Linux에서의 NIO API 도입, Java Selector의 기본 구현 원리까지 Java NIO에 중점을 둡니다. Selector其底层的实现原理。

  • Java NIO基本使用

  • Linux下的NIO系统调用介绍

  • Selector原理

  • Channel和Buffer之间的堆外内存

Java NIO基本使用

从JDK NIO文档里面可以发现,Java将其划分成了三大块:ChannelBuffer以及多路复用Selector。Channel的存在,封装了对什么实体的连接通道(如网络/文件);Buffer封装了对数据的缓冲存储,最后对于Selector则是提供了一种可以以单线程非阻塞的方式,来处理多个连接。

基本应用示例

NIO的基本步骤是,创建Selector和ServerSocketChannel,然后注册channel的ACCEPT事件,调用select方法,等待连接的到来,以及接收连接后将其注册到Selector中。下面的为Echo Server的示例:

public class SelectorDemo {

    public static void main(String[] args) throws IOException {


        Selector selector = Selector.open();
        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        socketChannel.bind(new InetSocketAddress(8080));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            int ready = selector.select();
            if (ready == 0) {
                continue;
            } else if (ready < 0) {
                break;
            }

            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {

                SelectionKey key = iterator.next();
                if (key.isAcceptable()) {

                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel accept = channel.accept();
                    if (accept == null) {
                        continue;
                    }
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // 读事件
                    deal((SocketChannel) key.channel(), key);
                } else if (key.isWritable()) {
                    // 写事件
                    resp((SocketChannel) key.channel(), key);
                }
                // 注:处理完成后要从中移除掉
                iterator.remove();
            }
        }
        selector.close();
        socketChannel.close();
    }

    private static void deal(SocketChannel channel, SelectionKey key) throws IOException {

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024);

        int read = channel.read(buffer);

        if (read > 0) {
            buffer.flip();
            responseBuffer.put(buffer);
        } else if (read == -1) {
            System.out.println("socket close");
            channel.close();
            return;
        }

        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        key.attach(responseBuffer);
    }

    private static void resp(SocketChannel channel, SelectionKey key) throws IOException {

        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.flip();

        channel.write(buffer);
        if (!buffer.hasRemaining()) {
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        }
    }
}

Linux下的NIO系统调用介绍

在Linux环境下,提供了几种方式可以实现NIO,如epoll,poll,select等。对于select/poll,每次调用,都是从外部传入FD和监听事件,这就导致每次调用的时候,都需要将这些数据从用户态复制到内核态,就导致了每次调用代价比较大,而且每次从select/poll返回回来,都是全量的数据,需要自行去遍历检查哪些是READY的。对于epoll,则为增量式的,系统内部维护了所需要的FD和监听事件,要注册的时候,调用epoll_ctl即可,而每次调用,不再需要传入了,返回的时候,只返回READY的监听事件和FD。下面作个简单的伪代码:
具体的可以看以前的文章:

// 1. 创建server socket
// 2. 绑定地址
// 3. 监听端口
// 4. 创建epoll
int epollFd = epoll_create(1024);
// 5. 注册监听事件
struct epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
event.data.fd = serverFd;
epoll_ctl(epollFd, EPOLL_CTL_ADD, serverFd, &event);

while(true) {
    readyNums = epoll_wait( epollFd, events, 1024, -1 );
    
    if ( readyNums < 0 )
     {
         printf("epoll_wait error\n");
         exit(-1);
     }

     for ( i = 0; i <  readyNums; ++i)
     {
         if ( events[i].data.fd == serverFd )
         {
             clientFd = accept( serverFd, NULL, NULL );
             // 注册监听事件
             ...
         }else if ( events[i].events & EPOLLIN )
         {
            // 处理读事件
         }else if ( events[i].events & EPOLLRDHUP )
         {
            // 关闭连接事件
            close( events[i].data.fd );
         }
}

Selector原理

SelectionKey

从Java顶层使用者角度来看,channel通过注册,返回SelectionKey,而Selector.select方法,也是通过返回SelectionKey来使用。那么这里为什么会需要这个类呢?这个类有什么作用?无论是任何语言,其实都脱离不了系统底层的支持,通过上述Linux下的基本应用,可以知道,通过系统调用,向其传递和返回的都是FD以及事件这些参数,那么站在设计角度来看,就需要有一个映射关系,使得可以关联起来,这里有Channel封装的是通过,如果将READY事件这些参数放在里面,不太合适,这个时候,SelectionKey出现了,在SelectionKey内部,保存Channel的引用以及一些事件信息,然后Selector通过FD找到SelectionKey来进行关联。在底层EP里面,就有一个属性:Map<Integer,SelectionKeyImpl> fdToKey。<h3>EPollSelectorImpl</h3><p>在Linux 2.6+版本,Java NIO采用的epoll(即<code>EPollSelectorImpl类),对于2.4.x的,则使用poll(即PollSelectorImpl类),这里以epoll为例。

select方法

顶层Selector,通过调用select方法,最终会调用到EPollSelectorImpl.doSelect方法,通过该方法,可以看到,其首先会处理一些不再注册的事件,调用pollWrapper.poll(timeout);

  • Java NIO의 기본 사용

  • Linux에서 NIO 시스템 호출 소개

  • 선택기 원칙


  • 채널과 버퍼 사이의 오프힙 메모리
Java NIO의 기본 사용

JDK NIO 문서를 보면 Java가 이를 세 가지 주요 블록으로 나누는 것을 볼 수 있습니다. : 채널, 버퍼 및 다중화 Selector. 채널의 존재는 모든 엔터티(예: 네트워크/파일)에 대한 연결 채널을 캡슐화합니다. 버퍼는 데이터의 버퍼 저장소를 캡슐화합니다. 마지막으로 Selector는 다중 연결을 처리하는 단일 스레드 비차단 방식을 제공합니다.

기본 적용 예🎜🎜NIO의 기본 단계는 Selector와 ServerSocketChannel을 생성한 후 해당 채널의 ACCEPT 이벤트를 등록하고 select 메서드를 호출한 후 연결이 도착할 때까지 기다린 후 메시지를 받은 후 Selector에 등록하는 것입니다. 연결. 다음은 Echo Server의 예입니다. 🎜
protected int doSelect(long timeout)
    throws IOException
{
    if (closed)
        throw new ClosedSelectorException();
    // 处理一些不再注册的事件
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    // 再进行一次清理
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}


private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        // 获取FD
        int nextFD = pollWrapper.getDescriptor(i);
        // 根据FD找到对应的SelectionKey
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            // 找到该FD的READY事件
            int rOps = pollWrapper.getEventOps(i);
            if (selectedKeys.contains(ski)) {
                // 将底层的事件转换为Java封装的事件,SelectionKey.OP_READ等
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                // 没有在原有的SelectedKey里面,说明是在等待过程中加入的
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    // 需要更新selectedKeys集合
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    // 返回Ready的Channel个数
    return numKeysUpdated;
}
🎜Linux에서의 NIO 시스템 호출 소개🎜🎜Linux 환경에서는 epoll, poll, select 등 NIO를 구현하는 여러 가지 방법이 제공됩니다. 선택/폴링의 경우 호출될 때마다 외부에서 FD 및 모니터링 이벤트가 전달됩니다. 이는 호출될 때마다 이러한 데이터를 사용자 상태에서 커널 상태로 복사하여 비교해야 함을 의미합니다. 각 호출 비용이 크며 선택/폴링에서 반환될 때마다 전체 데이터 양을 직접 확인하여 어느 것이 준비되었는지 확인해야 합니다. epoll의 경우 시스템이 내부적으로 필요한 FD 및 모니터링 이벤트를 유지 관리하므로 호출할 때마다 epoll_ctl을 호출하면 됩니다. 반환하면 READY만 반환됩니다. 청취 이벤트 및 FD. 다음은 간단한 의사 코드입니다. 🎜자세한 내용은 이전 기사를 참조하세요. 🎜
private native int epollCreate();
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout,
                             int epfd) throws IOException;
🎜Principle of Selector🎜🎜SelectionKey🎜🎜Java 최상위 사용자의 관점에서 채널은 등록을 통해 SelectionKey를 반환하고 Selector.select 메서드는 또한 SelectionKey를 반환하여 사용됩니다. 그렇다면 왜 이 수업이 여기에 필요한 걸까요? 이 수업은 무엇을 하나요? 언어에 관계없이 위에서 언급한 Linux의 기본 애플리케이션을 통해 FD 및 이벤트와 같은 매개변수가 전달되고 반환된다는 것을 디자인에서 알 수 있습니다. Perspective, 연관될 수 있도록 매핑 관계가 있어야 합니다. 여기에 READY 이벤트의 매개변수를 넣으면 SelectionKey가 내부에 나타납니다. , 채널에 대한 참조 및 일부 이벤트 정보를 저장하면 Selector는 FD를 통해 연결할 SelectionKey를 찾습니다. 기본 EP에는 Map<Integer,SelectionKeyImpl> fdToKey 속성이 있습니다. 🎜🎜EPollSelectorImpl🎜🎜Linux 2.6+ 버전에서 Java NIO는 epoll(예: EPollSelectorImpl 클래스)을 사용하고, 2.4.x에서는 poll(예: PollSelectorImpl 클래스)을 사용합니다. epoll을 예로 들어보겠습니다. 🎜

select 메소드

🎜최상위 Selector는 select 메소드를 호출하여 결국 EPollSelectorImpl.doSelect 메소드를 호출하게 됩니다. 이 메소드를 통해 더 이상 존재하지 않는 일부 이벤트를 먼저 처리하는 것을 볼 수 있습니다. 등록하고 pollWrapper.poll(timeout);을 호출한 다음 다시 정리하면 마지막으로 매핑 관계를 처리해야 함을 알 수 있습니다🎜
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_init(JNIEnv *env, jclass this)
{
    epoll_create_func = (epoll_create_t) dlsym(RTLD_DEFAULT, "epoll_create");
    epoll_ctl_func    = (epoll_ctl_t)    dlsym(RTLD_DEFAULT, "epoll_ctl");
    epoll_wait_func   = (epoll_wait_t)   dlsym(RTLD_DEFAULT, "epoll_wait");

    if ((epoll_create_func == NULL) || (epoll_ctl_func == NULL) ||
        (epoll_wait_func == NULL)) {
        JNU_ThrowInternalError(env, "unable to get address of epoll functions, pre-2.6 kernel?");
    }
}
🎜EPollArrayWrapper🎜🎜EpollArrayWrapper는 기본 호출을 캡슐화합니다. 여기에는 다음과 같은 여러 기본 메서드가 포함되어 있습니다. 🎜
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4, Object var5) throws IOException {
    if(var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    } else if(var1 instanceof DirectBuffer) {
        // 为堆外内存,则直接读取
        return readIntoNativeBuffer(var0, var1, var2, var4, var5);
    } else {
        // 为堆内内存,先获取临时堆外内存
        ByteBuffer var6 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var8;
        try {
            // 读取到堆外内存
            int var7 = readIntoNativeBuffer(var0, var6, var2, var4, var5);
            var6.flip();
            if(var7 > 0) {
                // 复制到堆内
                var1.put(var6);
            }

            var8 = var7;
        } finally {
            // 释放临时堆外内存
            Util.offerFirstTemporaryDirectBuffer(var6);
        }

        return var8;
    }
}
🎜 해당 구현 EPollArrayWrapper.c는 openjdk의 기본 디렉터리(native/sun/nio/ch)에서 찾을 수 있습니다. 🎜(그런데 네이티브 메소드를 구현하려면 클래스의 메소드에 네이티브 키워드를 추가한 후 클래스 파일로 컴파일한 후 출력 .h로 변환하면 됩니다. 헤더 파일을 구현하는 방법은 에서 c/C++ 하단은 라이브러리로 컴파일되어 있으니 해당 디렉터리에 넣어주시면 됩니다) 🎜초기화 파일 메소드에서는 동적 파싱을 통해 로딩되고 최종적으로 epoll_create 및 기타 메소드가 호출되는 것을 볼 수 있습니다. 🎜
// 仅仅作了对齐以及将长度放在数组前方就返回了
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
  UnsafeWrapper("Unsafe_AllocateMemory");
  size_t sz = (size_t)size;
  if (sz != (julong)size || size < 0) {
    THROW_0(vmSymbols::java_lang_IllegalArgumentException());
  }
  if (sz == 0) {
    return 0;
  }
  sz = round_to(sz, HeapWordSize);
  void* x = os::malloc(sz);
  if (x == NULL) {
    THROW_0(vmSymbols::java_lang_OutOfMemoryError());
  }
  //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
  return addr_to_java(x);
UNSAFE_END
🎜채널과 버퍼 사이의 오프 힙 메모리🎜🎜사람들이 오프 힙 메모리는 누수되기 쉽다는 말을 자주 듣는데, Netty 프레임워크는 오프 힙 메모리를 사용하여 복사본을 줄이고 성능을 향상시킵니다. 그렇다면 여기서 오프 힙 메모리는 무엇을 말하는 걸까요? 궁금해서 read 메소드를 통해 드디어 IOUtil의 read 메소드를 호출한 SocketChannelImpl의 read 메소드를 추적해봤습니다. 먼저 들어오는 버퍼가 DirectBuffer인지 여부를 확인합니다. 그렇지 않은 경우(HeapByteBuffer) 임시 DirectBuffer가 생성된 다음 힙에 복사됩니다. IOUtil.read 방법: 🎜
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4, Object var5) throws IOException {
    if(var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    } else if(var1 instanceof DirectBuffer) {
        // 为堆外内存,则直接读取
        return readIntoNativeBuffer(var0, var1, var2, var4, var5);
    } else {
        // 为堆内内存,先获取临时堆外内存
        ByteBuffer var6 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var8;
        try {
            // 读取到堆外内存
            int var7 = readIntoNativeBuffer(var0, var6, var2, var4, var5);
            var6.flip();
            if(var7 > 0) {
                // 复制到堆内
                var1.put(var6);
            }

            var8 = var7;
        } finally {
            // 释放临时堆外内存
            Util.offerFirstTemporaryDirectBuffer(var6);
        }

        return var8;
    }
}

这里有一个问题就是,为什么会需要DirectBuffer以及堆外内存?通过对DirectByteBuffer的创建来分析,可以知道,通过unsafe.allocateMemory(size);来分配内存的,而对于该方法来说,可以说是直接调用malloc返回,这一块内存是不受GC管理的,也就是所说的:堆外内存容易泄漏。但是对于使用DirectByteBuffer来说,会创建一个Deallocator,注册到Cleaner里面,当对象被回收的时候,则会被直接,从而释放掉内存,减少内存泄漏。要用堆外内存,从上面的创建来看,堆外内存创建后,以long型地址保存的,而堆内内存会受到GC影响,对象会被移动,如果采用堆内内存,进行系统调用的时候,那么GC就需要停止,否则就会有问题,基于这一点,采用了堆外内存(这一块参考了R大的理解:)。

注:堆外内存的创建(unsafe.cpp):

// 仅仅作了对齐以及将长度放在数组前方就返回了
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
  UnsafeWrapper("Unsafe_AllocateMemory");
  size_t sz = (size_t)size;
  if (sz != (julong)size || size < 0) {
    THROW_0(vmSymbols::java_lang_IllegalArgumentException());
  }
  if (sz == 0) {
    return 0;
  }
  sz = round_to(sz, HeapWordSize);
  void* x = os::malloc(sz);
  if (x == NULL) {
    THROW_0(vmSymbols::java_lang_OutOfMemoryError());
  }
  //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
  return addr_to_java(x);
UNSAFE_END

위 내용은 Java NIO 원리 분석 및 기본 사용법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.