首頁 >Java >java教程 >Java NIO核心元件的深入理解

Java NIO核心元件的深入理解

不言
不言轉載
2018-11-16 17:42:222557瀏覽

這篇文章帶給大家的內容是關於Java NIO核心元件的深入理解,有一定的參考價值,有需要的朋友可以參考一下,希望對你有幫助。

同步、非同步、阻塞、非阻塞

首先,這幾個概念非常容易搞混淆,但NIO中又有涉及,所以總結一下[1]。

  • 同步:API呼叫返回時呼叫者就知道操作的結果如何了(實際讀取/寫入了多少位元組)。

  • 非同步:相對於同步,API呼叫返回時呼叫者不知道操作的結果,後面才會回呼通知結果。

  • 阻塞:當無資料可讀,或無法寫入所有資料時,掛起目前執行緒等待。

  • 非阻塞:讀取時,可以讀多少資料就讀多少然後返回,寫入時,可以寫入多少資料就寫入多少然後返回。

對於I/O操作,根據Oracle官網的文檔,同步非同步的劃分標準是“呼叫者是否需要等待I/O操作完成”,這個“等待I/O操作完成」的意思不是指一定要讀取到數據或說寫入所有數據,而是指真正進行I/O操作時,例如數據在TCP/IP協定棧緩衝區和JVM緩衝區之間傳輸的這段時間,呼叫者是否要等待。

所以,我們常用的read() 和write() 方法都是同步I/O,同步I/O又分為阻塞和非阻塞兩種模式,如果是非阻塞模式,偵測到無數據可讀時,直接就返回了,並沒有真正執行I/O操作。

總結就是,Java中實際上只有同步阻塞I/O、同步非阻塞I/O 與非同步I/O 三種機制,我們下文所說的是前兩種,JDK 1.7才開始引入非同步I/O,那稱為NIO.2。

傳統IO

我們知道,一個新技術的出現總是伴隨著改進和提升,Java NIO的出現也是如此。

傳統 I/O 是阻塞式I/O,主要問題是系統資源的浪費。例如我們為了讀取一個TCP連接的數據,呼叫InputStream 的read() 方法,這會使當前線程被掛起,直到有數據到達才被喚醒,那該線程在數據到達這段時間內,佔用著內存資源(儲存線程棧)卻無所作為,也就是俗話說的佔著茅坑不拉屎,為了讀取其他連接的數據,我們不得不啟動另外的線程。在並發連線數量不多的時候,這可能沒什麼問題,然而當連線數量達到一定規模,記憶體資源會被大量執行緒消耗殆盡。另一方面,執行緒切換需要更改處理器的狀態,例如程式計數器、暫存器的值,因此非常頻繁的在大量執行緒之間切換,同樣是一種資源浪費。

隨著科技的發展,現代作業系統提供了新的I/O機制,可以避免這種資源浪費。基於此,誕生了Java NIO,NIO的代表性特徵就是非阻塞I/O。緊接著我們發現,簡單的使用非阻塞I/O並不能解決問題,因為在非阻塞模式下,read()方法在沒有讀取到資料時就會立即返回,不知道資料何時到達的我們,只能不停的呼叫read()方法進行重試,這顯然太浪費CPU資源了,從下文可以知道,Selector元件正是為解決此問題而生。

Java NIO 核心元件

1.Channel

概念

Java NIO中的所有I/O操作都基於Channel對象,就像串流操作都要基於Stream物件一樣,因此很有必要先了解Channel是什麼。以下內容摘自JDK 1.8的文檔

A channel represents an open connection to an entity such as a
hardware device, a file, a network socket, or a program component that##is capableone of performing 書或 more distinct I/O operations, for
example reading or writing.

從上述內容可知,一個Channel(通道)代表和某一實體的連接,這個實體可以是文件、網路套接字等。也就是說,通道是Java NIO提供的一座橋樑,用於我們的程式和作業系統底層I/O服務進行互動。

通道是一種很基本很抽象的描述,和不同的I/O服務交互,執行不同的I/O操作,實現不一樣,因此具體的有FileChannel、SocketChannel等。

通道使用起來跟Stream比較像,可以讀取資料到Buffer中,也可以把Buffer中的資料寫入通道。

Java NIO核心元件的深入理解

當然,也有區別,主要體現在如下兩點:

  • 一個通道,既可以讀取又可以寫,而一個Stream是單向的(所以分InputStream 和OutputStream)

  • #通道有非阻塞I/O模式

實作

Java NIO中最常用的通道實作是如下幾個,可以看出跟傳統的I/O 操作類別是一一對應的。

  • FileChannel:讀寫檔案

  • DatagramChannel: UDP協定網路通訊

  • SocketChannel:TCP協定網路通訊

  • ServerSocketChannel:監聽TCP連線

2.Buffer

NIO中所使用的緩衝區不是一個簡單的byte數組,而是封裝過的Buffer類,透過它提供的API,我們可以靈活的操縱數據,下面細細道來。

與Java基本類型相對應,NIO提供了多種Buffer 類型,如ByteBuffer、CharBuffer、IntBuffer等,差異就是讀寫緩衝區時的單位長度不一樣(以對應類型的變數為單位進行讀寫)。

Buffer中有3個很重要的變量,它們是理解Buffer工作機制的關鍵,分別是

  • capacity (總容量)

  • position(指標目前位置)

  • limit (讀/寫邊界位置)

Buffer的工作方式跟C語言裡的字元數組非常的像,類比一下,capacity就是數組的總長度,position就是我們讀/寫字元的下標變量,limit就是結束符的位置。 Buffer初始時3個變數的情況如下圖

Java NIO核心元件的深入理解

#在對Buffer進行讀取/寫入的過程中,position會往後移動,而limit就是position 移動的邊界。由此不難想像,在對Buffer進行寫入操作時,limit應設定為capacity的大小,而對Buffer進行讀取操作時,limit應設定為資料的實際結束位置。 (注意:將Buffer資料寫入通道是Buffer 讀取操作,從通道讀取資料到Buffer是Buffer 寫入操作)

在對Buffer進行讀/寫操作前,我們可以呼叫Buffer類別提供的一些輔助方法來正確設定position 和limit 的值,主要有以下幾個

  • flip(): 設定limit 為position 的值,然後position 置為0。對Buffer進行讀取操作前呼叫。

  • rewind(): 僅將 position
     置0。一般是在重新讀取Buffer資料前調用,例如要讀取同一個Buffer的資料寫入多個通道時會用到。

  • clear(): 回到初始狀態,即 limit 等於 capacity,position 置0。重新對Buffer進行寫入操作前呼叫。

  • compact(): 將未讀取的資料(position 與limit 之間的資料)移到緩衝區開頭,並將position
     設定為這段資料末尾的下一個位置。其實就等價於重新向緩衝區寫入了這麼一段資料。

然後,看一個實例,使用FileChannel 讀寫文字文件,透過這個例子驗證通道可讀可寫的特性以及Buffer的基本用法(注意FileChannel 不能設定為非阻塞模式)。

FileChannel channel = new RandomAccessFile("test.txt", "rw").getChannel();
channel.position(channel.size());  // 移动文件指针到末尾(追加写入)

ByteBuffer byteBuffer = ByteBuffer.allocate(20);

// 数据写入Buffer
byteBuffer.put("你好,世界!\n".getBytes(StandardCharsets.UTF_8));

// Buffer -> Channel
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
    channel.write(byteBuffer);
}

channel.position(0); // 移动文件指针到开头(从头读取)
CharBuffer charBuffer = CharBuffer.allocate(10);
CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();

// 读出所有数据
byteBuffer.clear();
while (channel.read(byteBuffer) != -1 || byteBuffer.position() > 0) {
    byteBuffer.flip();

    // 使用UTF-8解码器解码
    charBuffer.clear();
    decoder.decode(byteBuffer, charBuffer, false);
    System.out.print(charBuffer.flip().toString());

    byteBuffer.compact(); // 数据可能有剩余
}

channel.close();

這個例子中使用了兩個Buffer,其中 byteBuffer 作為通道讀寫的資料緩衝區,charBuffer 用於儲存解碼後的字元。 clear() 和flip() 的用法正如上文所述,需要注意的是最後那個compact() 方法,即使charBuffer 的大小完全足以容納byteBuffer 解碼後的數據,這個compact() 也必不可少,這是因為常用中文字元的UTF-8編碼佔3個字節,因此有很大機率出現在中間截斷的情況,請看下圖:

Java NIO核心元件的深入理解

當Decoder 讀取到緩衝區末端的0xe4 時,無法將其對應到一個Unicode,decode()方法第三個參數false 的作用就是讓Decoder 把無法對應的位元組及其後面的資料都視作附加數據,因此decode() 方法會在此處停止,並且position 會回退到0xe4 的位置。如此一來, 緩衝區中就遺留了「中」字編碼的第一個位元組,必須將其 compact 到前面,以正確的和後序資料拼接起來。

BTW,範例中的 CharsetDecoder 也是 Java NIO 的新特性,所以大家應該發現了一點哈,NIO的操作是面向緩衝區的(傳統I/O是面向流的)。

至此,我們了解了 Channel 與 Buffer 的基本用法。接下來要說的是讓一個執行緒管理多個Channel的重要元件。

3.Selector

Selector 是什麼

Selector(選擇器)是一個特殊的元件,用於採集各個通道的狀態(或者說事件)。我們先將通道註冊到選擇器,並設定好關心的事件,然後就可以透過呼叫select()方法,靜靜地等待事件發生。

通道有以下4個事件可供我們監聽:

  • Accept:有可以接受的連線

  • Connect:連線成功

  • Read:有資料可讀

  • Write:可以寫入資料了

为什么要用Selector

前文说了,如果用阻塞I/O,需要多线程(浪费内存),如果用非阻塞I/O,需要不断重试(耗费CPU)。Selector的出现解决了这尴尬的问题,非阻塞模式下,通过Selector,我们的线程只为已就绪的通道工作,不用盲目的重试了。比如,当所有通道都没有数据到达时,也就没有Read事件发生,我们的线程会在select()方法处被挂起,从而让出了CPU资源。

使用方法

如下所示,创建一个Selector,并注册一个Channel。

注意:要将 Channel 注册到 Selector,首先需要将 Channel 设置为非阻塞模式,否则会抛异常。

Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

register()方法的第二个参数名叫“interest set”,也就是你所关心的事件集合。如果你关心多个事件,用一个“按位或运算符”分隔,比如

SelectionKey.OP_READ | SelectionKey.OP_WRITE复制代码

这种写法一点都不陌生,支持位运算的编程语言里都这么玩,用一个整型变量可以标识多种状态,它是怎么做到的呢,其实很简单,举个例子,首先预定义一些常量,它们的值(二进制)如下

Java NIO核心元件的深入理解

可以发现,它们值为1的位都是错开的,因此对它们进行按位或运算之后得出的值就没有二义性,可以反推出是由哪些变量运算而来。怎么判断呢,没错,就是“按位与”运算。比如,现在有一个状态集合变量值为 0011,我们只需要判断 “0011 & OP_READ” 的值是 1 还是 0 就能确定集合是否包含 OP_READ 状态。

然后,注意 register() 方法返回了一个SelectionKey的对象,这个对象包含了本次注册的信息,我们也可以通过它修改注册信息。从下面完整的例子中可以看到,select()之后,我们也是通过获取一个 SelectionKey 的集合来获取到那些状态就绪了的通道。

一个完整实例

概念和理论的东西阐述完了(其实写到这里,我发现没写出多少东西,好尴尬(⊙ˍ⊙)),看一个完整的例子吧。

这个例子使用Java NIO实现了一个单线程的服务端,功能很简单,监听客户端连接,当连接建立后,读取客户端的消息,并向客户端响应一条消息。

需要注意的是,我用字符 ‘0′(一个值为0的字节) 来标识消息结束。

单线程Server

public class NioServer {

public static void main(String[] args) throws IOException {
    // 创建一个selector
    Selector selector = Selector.open();

    // 初始化TCP连接监听通道
    ServerSocketChannel listenChannel = ServerSocketChannel.open();
    listenChannel.bind(new InetSocketAddress(9999));
    listenChannel.configureBlocking(false);
    // 注册到selector(监听其ACCEPT事件)
    listenChannel.register(selector, SelectionKey.OP_ACCEPT);

    // 创建一个缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(100);

    while (true) {
        selector.select(); //阻塞,直到有监听的事件发生
        Iterator<selectionkey> keyIter = selector.selectedKeys().iterator();

        // 通过迭代器依次访问select出来的Channel事件
        while (keyIter.hasNext()) {
            SelectionKey key = keyIter.next();

            if (key.isAcceptable()) { // 有连接可以接受
                SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);

                System.out.println("与【" + channel.getRemoteAddress() + "】建立了连接!");

            } else if (key.isReadable()) { // 有数据可以读取
                buffer.clear();

                // 读取到流末尾说明TCP连接已断开,
                // 因此需要关闭通道或者取消监听READ事件
                // 否则会无限循环
                if (((SocketChannel) key.channel()).read(buffer) == -1) {
                    key.channel().close();
                    continue;
                } 

                // 按字节遍历数据
                buffer.flip();
                while (buffer.hasRemaining()) {
                    byte b = buffer.get();

                    if (b == 0) { // 客户端消息末尾的\0
                        System.out.println();

                        // 响应客户端
                        buffer.clear();
                        buffer.put("Hello, Client!\0".getBytes());
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            ((SocketChannel) key.channel()).write(buffer);
                        }
                    } else {
                        System.out.print((char) b);
                    }
                }
            }

            // 已经处理的事件一定要手动移除
            keyIter.remove();
        }
    }
}
}</selectionkey>

Client

这个客户端纯粹测试用,为了看起来不那么费劲,就用传统的写法了,代码很简短。

要严谨一点测试的话,应该并发运行大量Client,统计服务端的响应时间,而且连接建立后不要立刻发送数据,这样才能发挥出服务端非阻塞I/O的优势。

public class Client {

public static void main(String[] args) throws Exception {
    Socket socket = new Socket("localhost", 9999);
    InputStream is = socket.getInputStream();
    OutputStream os = socket.getOutputStream();

    // 先向服务端发送数据
    os.write("Hello, Server!\0".getBytes());

    // 读取服务端发来的数据
    int b;
    while ((b = is.read()) != 0) {
        System.out.print((char) b);
    }
    System.out.println();

    socket.close();
}
}

NIO vs IO

学习了NIO之后我们都会有这样一个疑问:到底什么时候该用NIO,什么时候该用传统的I/O呢?

其实了解他们的特性后,答案还是比较明确的,NIO擅长1个线程管理多条连接,节约系统资源,但是如果每条连接要传输的数据量很大的话,因为是同步I/O,会导致整体的响应速度很慢;而传统I/O为每一条连接创建一个线程,能充分利用处理器并行处理的能力,但是如果连接数量太多,内存资源会很紧张。

总结就是:连接数多数据量小用NIO,连接数少用I/O(写起来也简单- -)。

Next

经过NIO核心组件的学习,了解了非阻塞服务端实现的基本方法。然而,细心的你们肯定也发现了,上面那个完整的例子,实际上就隐藏了很多问题。比如,例子中只是简单的将读取到的每个字节输出,实际环境中肯定是要读取到完整的消息后才能进行下一步处理,由于NIO的非阻塞特性,一次可能只读取到消息的一部分,这已经很糟糕了,如果同一条连接会连续发来多条消息,那不仅要对消息进行拼接,还需要切割,同理,例子中给客户端响应的时候,用了个while()循环,保证数据全部write完成再做其它工作,实际应用中为了性能,肯定不会这么写。另外,为了充分利用现代处理器多核心并行处理的能力,应该用一个线程组来管理这些连接的事件。

要解决这些问题,需要一个严谨而繁琐的设计,不过幸运的是,我们有开源的框架可用,那就是优雅而强大的Netty,Netty基于Java NIO,提供异步调用接口,开发高性能服务器的一个很好的选择,之前在项目中使用过,但没有深入学习,打算下一步好好学学它,到时候再写一篇笔记。

Java NIO設計的目標是為程式設計師提供API以享受現代作業系統最新的I/O機制,所以覆蓋面較廣,除了文中所涉及的組件與特性,還有很多其它的,例如Pipe(管道)、Path(路徑)、Files(檔案) 等,有的是用來提升I/O效能的新元件,有的是簡化I/O操作的工具,具體用法可以參考最後References 裡的連結。


以上是Java NIO核心元件的深入理解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:segmentfault.com。如有侵權,請聯絡admin@php.cn刪除