首頁 >Java >java教程 >Java中關於NIO核心元件的詳細介紹

Java中關於NIO核心元件的詳細介紹

黄舟
黄舟原創
2017-07-18 09:43:041771瀏覽

背景知識

同步、非同步、阻塞、非阻塞

首先,這幾個概念非常容易搞混淆,但NIO中又有涉及,所以總結一下[1]。

  • 同步:API呼叫返回時呼叫者就知道操作的結果如何了(實際讀取/寫入了多少位元組)。

  • 非同步:相對於同步,API呼叫返回時呼叫者不知道操作的結果,後面才會回呼通知結果。

  • 阻塞:當無資料可讀,或無法寫入所有資料時,掛起目前執行緒等待。

  • 非阻塞:讀取時,可以讀多少資料就讀多少然後返回,寫入時,可以寫入多少資料就寫入多少然後返回。

對於I/O操作,根據Oracle官網的文檔,同步非同步的劃分標準是“呼叫者是否需要等待I/O操作完成”,這個“等待I/O操作完成」的意思不是指一定要讀取到數據或說寫入所有數據,而是指真正進行I/O操作時,例如數據在TCP/IP協定棧緩衝區和JVM緩衝區之間傳輸的這段時間,呼叫者是否要等待。

所以,我們常用的read() 和write() 方法都是同步I/O,同步I/O又分為阻塞和非阻塞兩種模式,如果是非阻塞模式,偵測到無數據可讀時,直接就返回了,並沒有真正執行I/O操作。

總結就是,Java中實際上只有同步阻塞I/O、同步非阻塞I/O 與非同步I/O 三種機制,我們下文所說的是前兩種,JDK 1.7才開始引入非同步I/O,那稱為NIO.2。

傳統IO

我們知道,一個新技術的出現總是伴隨著改進和提升,Java NIO的出現也是如此。

傳統 I/O 是阻塞式I/O,主要問題是系統資源的浪費。例如我們為了讀取一個TCP連接的數據,呼叫InputStream 的read() 方法,這會使當前線程被掛起,直到有數據到達才被喚醒,那該線程在數據到達這段時間內,佔用著內存資源(儲存線程棧)卻無所作為,也就是俗話說的佔著茅坑不拉屎,為了讀取其他連接的數據,我們不得不啟動另外的線程。在並發連線數量不多的時候,這可能沒什麼問題,然而當連線數量達到一定規模,記憶體資源會被大量執行緒消耗殆盡。另一方面,執行緒切換需要更改處理器的狀態,例如程式計數器、暫存器的值,因此非常頻繁的在大量執行緒之間切換,同樣是一種資源浪費。

隨著科技的發展,現代作業系統提供了新的I/O機制,可以避免這種資源浪費。基於此,誕生了Java NIO,NIO的代表性特徵就是非阻塞I/O。緊接著我們發現,簡單的使用非阻塞I/O並不能解決問題,因為在非阻塞模式下,read()方法在沒有讀取到資料時就會立即返回,不知道資料何時到達的我們,只能不停的呼叫read()方法進行重試,這顯然太浪費CPU資源了,從下文可以知道,Selector元件正是為解決此問題而生。

Java NIO 核心元件

1.Channel

概念

Java NIO中的所有I/O操作都基於Channel對象,就像串流操作都要基於Stream物件一樣,因此很有必要先了解Channel是什麼。以下內容摘自JDK 1.8的文檔

A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one / / 花! O operations, for example reading or writing.

從上述內容可知,一個Channel(通道)代表和某一實體的連接,這個實體可以是文件、網絡套接字等。也就是說,通道是Java NIO提供的一座橋樑,用於我們的程式和作業系統底層I/O服務進行互動。

通道是一種很基本很抽象的描述,和不同的I/O服務交互,執行不同的I/O操作,實現不一樣,因此具體的有FileChannel、SocketChannel等。

通道使用起來跟Stream比較像,可以讀取資料到Buffer中,也可以把Buffer中的資料寫入通道。

當然,也有區別,主要體現在如下兩點:

  • 一個通道,既可以讀又可以寫,而一個Stream是單向的(所以分InputStream 和OutputStream)

  • 」通道有非阻塞I/O模式

實作

Java NIO中最常用的通道實作是如下幾個,可以看出跟傳統的I/O 操作類別是一一對應的。

  • FileChannel:讀寫檔案

  • DatagramChannel: UDP協定網路通訊

  • SocketChannel:TCP協定網路通訊

  • ServerSocketChannel:監聽TCP連線

    #

2.Buffer

NIO中所使用的緩衝區不是一個簡單的byte數組,而是封裝過的Buffer類,透過它提供的API,我們可以靈活的操縱數據,下面細細道來。

與Java基本類型相對應,NIO提供了多種Buffer 類型,如ByteBuffer、CharBuffer、IntBuffer等,差異就是讀寫緩衝區時的單位長度不一樣(以對應類型的變數為單位進行讀寫)。

Buffer中有3個很重要的變量,它們是理解Buffer工作機制的關鍵,分別是

  • capacity (總容量)

  • position(指標目前位置)

  • limit (讀/寫邊界位置)

Buffer的工作方式跟C語言裡的字元數組非常的像,類比一下,capacity就是數組的總長度,position就是我們讀/寫字元的下標變量,limit就是結束符的位置。 Buffer初始時3個變數的情況如下圖

在對Buffer進行讀取/寫入的過程中,position會往後移動,而 limit 就是 position 移動的邊界。由此不難想像,在對Buffer進行寫入操作時,limit應設定為capacity的大小,而對Buffer進行讀取操作時,limit應設定為資料的實際結束位置。 (註:將Buffer資料 寫入 頻道是Buffer 讀取 操作,從頻道 讀取 資料到Buffer是Buffer 寫入#操作)

在對Buffer進行讀取/寫入操作前,我們可以呼叫Buffer類別提供的一些輔助方法來正確設定position 和limit 的值,主要有以下幾個

  • flip(): 設定limit 為position 的值,然後position 置為0。對Buffer進行讀取操作前呼叫。

  • rewind(): 僅將 position 置0。一般是在重新讀取Buffer資料前調用,例如要讀取同一個Buffer的資料寫入多個通道時會用到。

  • clear(): 回到初始狀態,即 limit 等於 capacity,position 置0。重新對Buffer進行寫入操作前呼叫。

  • compact(): 將未讀取的資料(position 與limit 之間的資料)移到緩衝區開頭,並將position 設定為這段資料末尾的下一個位置。其實就等價於重新向緩衝區寫入了這麼一段資料。

然後,看一個實例,使用FileChannel 讀寫文字文件,透過這個例子驗證通道可讀可寫的特性以及Buffer的基本用法(注意FileChannel 不能設定為非阻塞模式)。

    FileChannel channel = new RandomAccessFile("test.txt", "rw").getChannel();
    channel.position(channel.size());  // 移动文件指针到末尾(追加写入)

    ByteBuffer byteBuffer = ByteBuffer.allocate(20);

    // 数据写入Buffer
    byteBuffer.put("你好,世界!\n".getBytes(StandardCharsets.UTF_8));

    // Buffer -> Channel
    byteBuffer.flip();
    while (byteBuffer.hasRemaining()) {
        channel.write(byteBuffer);
    }

    channel.position(0); // 移动文件指针到开头(从头读取)
    CharBuffer charBuffer = CharBuffer.allocate(10);
    CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();

    // 读出所有数据
    byteBuffer.clear();
    while (channel.read(byteBuffer) != -1 || byteBuffer.position() > 0) {
        byteBuffer.flip();

        // 使用UTF-8解码器解码
        charBuffer.clear();
        decoder.decode(byteBuffer, charBuffer, false);
        System.out.print(charBuffer.flip().toString());

        byteBuffer.compact(); // 数据可能有剩余
    }

    channel.close();

這個例子中使用了兩個Buffer,其中 byteBuffer 作為通道讀寫的資料緩衝區,charBuffer 用於儲存解碼後的字元。 clear() 和flip() 的用法正如上文所述,需要注意的是最後那個compact() 方法,即使charBuffer 的大小完全足以容納byteBuffer 解碼後的數據,這個compact() 也必不可少,這是因為常用中文字元的UTF-8編碼佔3個字節,因此有很大機率出現在中間截斷的情況,請看下圖:

當Decoder 讀取到緩衝區末端的0xe4 時,無法將其對應到一個Unicode,decode()方法第三個參數false 的作用就是讓Decoder 把無法對應的位元組及其後面的資料都視為附加數據,因此decode () 方法會在此停止,且position 會回退到0xe4 的位置。如此一來, 緩衝區中就遺留了「中」字編碼的第一個位元組,必須將其 compact 到前面,以正確的和後序資料拼接起來。 (關於字元編碼,可以參考我的前一篇文章:http://www.cnblogs.com/coderjun/p/5117590.html)

BTW,範例中的CharsetDecoder 也是Java NIO 的一個新特性,所以大家應該發現了一點哈,NIO的操作是面向緩衝區的(傳統I/O是面向流的)。

至此,我們了解了 Channel 與 Buffer 的基本用法。接下來要說的是讓一個執行緒管理多個Channel的重要元件。

3.Selector

Selector 是什麼

Selector(選擇器)是一個特殊的元件,用於採集各個通道的狀態(或者說事件)。我們先將通道註冊到選擇器,並設定好關心的事件,然後就可以透過呼叫select()方法,靜靜地等待事件發生。

通道有以下4個事件可供我們監聽:

  • Accept:有可以接受的連線

  • Connect:連線成功

  • Read:有資料可讀

  • Write:可以寫入資料了

为什么要用Selector

前文说了,如果用阻塞I/O,需要多线程(浪费内存),如果用非阻塞I/O,需要不断重试(耗费CPU)。Selector的出现解决了这尴尬的问题,非阻塞模式下,通过Selector,我们的线程只为已就绪的通道工作,不用盲目的重试了。比如,当所有通道都没有数据到达时,也就没有Read事件发生,我们的线程会在select()方法处被挂起,从而让出了CPU资源。

使用方法

如下所示,创建一个Selector,并注册一个Channel。

注意:要将 Channel 注册到 Selector,首先需要将 Channel 设置为非阻塞模式,否则会抛异常。

Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

register()方法的第二个参数名叫“interest set”,也就是你所关心的事件集合。如果你关心多个事件,用一个“按位或运算符”分隔,比如

SelectionKey.OP_READ | SelectionKey.OP_WRITE

这种写法一点都不陌生,支持位运算的编程语言里都这么玩,用一个整型变量可以标识多种状态,它是怎么做到的呢,其实很简单,举个例子,首先预定义一些常量,它们的值(二进制)如下

可以发现,它们值为1的位都是错开的,因此对它们进行按位或运算之后得出的值就没有二义性,可以反推出是由哪些变量运算而来。怎么判断呢,没错,就是“按位与”运算。比如,现在有一个状态集合变量值为 0011,我们只需要判断 “0011 & OP_READ” 的值是 1 还是 0 就能确定集合是否包含 OP_READ 状态。

然后,注意 register() 方法返回了一个SelectionKey的对象,这个对象包含了本次注册的信息,我们也可以通过它修改注册信息。从下面完整的例子中可以看到,select()之后,我们也是通过获取一个 SelectionKey 的集合来获取到那些状态就绪了的通道。

一个完整实例

概念和理论的东西阐述完了(其实写到这里,我发现没写出多少东西,好尴尬(⊙ˍ⊙)),看一个完整的例子吧。

这个例子使用Java NIO实现了一个单线程的服务端,功能很简单,监听客户端连接,当连接建立后,读取客户端的消息,并向客户端响应一条消息。

需要注意的是,我用字符 ‘\0′(一个值为0的字节) 来标识消息结束。

单线程Server

public class NioServer {

    public static void main(String[] args) throws IOException {
        // 创建一个selector
        Selector selector = Selector.open();

        // 初始化TCP连接监听通道
        ServerSocketChannel listenChannel = ServerSocketChannel.open();
        listenChannel.bind(new InetSocketAddress(9999));
        listenChannel.configureBlocking(false);
        // 注册到selector(监听其ACCEPT事件)
        listenChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 创建一个缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(100);

        while (true) {
            selector.select(); //阻塞,直到有监听的事件发生
            Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();

            // 通过迭代器依次访问select出来的Channel事件
            while (keyIter.hasNext()) {
                SelectionKey key = keyIter.next();

                if (key.isAcceptable()) { // 有连接可以接受
                    SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);

                    System.out.println("与【" + channel.getRemoteAddress() + "】建立了连接!");

                } else if (key.isReadable()) { // 有数据可以读取
                    buffer.clear();

                    // 读取到流末尾说明TCP连接已断开,
                    // 因此需要关闭通道或者取消监听READ事件
                    // 否则会无限循环
                    if (((SocketChannel) key.channel()).read(buffer) == -1) {
                        key.channel().close();
                        continue;
                    } 

                    // 按字节遍历数据
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        byte b = buffer.get();

                        if (b == 0) { // 客户端消息末尾的\0
                            System.out.println();

                            // 响应客户端
                            buffer.clear();
                            buffer.put("Hello, Client!\0".getBytes());
                            buffer.flip();
                            while (buffer.hasRemaining()) {
                                ((SocketChannel) key.channel()).write(buffer);
                            }
                        } else {
                            System.out.print((char) b);
                        }
                    }
                }

                // 已经处理的事件一定要手动移除
                keyIter.remove();
            }
        }
    }
}

Client

这个客户端纯粹测试用,为了看起来不那么费劲,就用传统的写法了,代码很简短。

要严谨一点测试的话,应该并发运行大量Client,统计服务端的响应时间,而且连接建立后不要立刻发送数据,这样才能发挥出服务端非阻塞I/O的优势。

public class Client {

    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("localhost", 9999);
        InputStream is = socket.getInputStream();
        OutputStream os = socket.getOutputStream();

        // 先向服务端发送数据
        os.write("Hello, Server!\0".getBytes());

        // 读取服务端发来的数据
        int b;
        while ((b = is.read()) != 0) {
            System.out.print((char) b);
        }
        System.out.println();

        socket.close();
    }
}

以上是Java中關於NIO核心元件的詳細介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn