本文詳細介紹組成非阻塞通訊的幾大類別:Buffer、Channel、Selector、SelectionKey
ServerSocketChannel透過open方法取得ServerSocketChannel,透過ServerSocketChannel設定為非阻塞模式,再透過ServerSocketChannel取得socket,綁定定服務進程監聽埠。服務啟動成功。
然後就是非阻塞通訊的精髓了,Selector透過靜態的open()方法取得到Selector,然後ServerSocketChannel註冊Selection.OP_ACCEPT事件到Selector。
Selector就會監控事件發生,Selector透過select()監控已發生的SelectionKey物件的數目,透過selectKeys()方法傳回對應的selectionKey物件集合。遍歷此集合得到對應的selectionKey對象,透過該物件的channel()方法取得關聯的ServerSocketChannel對象, 透過selector()方法就可以取得關聯的Selector物件。
透過上面取得的ServerSocketChannel執行accept()方法取得SocketChannel,再透過SocketChannel設定為非阻塞模式,在將SocketChannel註冊到上面建立的Selector上,註冊SelectionKey .OP_READ |SelectionKey.OP_WRITE
事件。
Selector將在監控對應上面綁定的事件,監控到對應的事件的話執行讀取和寫入的操作。
上面描述了服務端非阻塞方式通訊的一個流程,以下透過具體程式碼實作:
/** * 非阻塞模式 * */public class EchoServer2 { private Selector selector = null; private ServerSocketChannel serverSocketChannel = null; private int port = 8001; private Charset charset = Charset.forName("UTF-8"); public EchoServer2() throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); //服务器重启的时候,重用端口 serverSocketChannel.socket().setReuseAddress(true); //设置非阻塞模式 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); System.out.println("服务器启动成功"); } /** * 服务方法 */ public void service() throws IOException { serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Set readyKes = selector.selectedKeys(); Iterator it = readyKes.iterator(); while (it.hasNext()) { SelectionKey key = null; try { key = (SelectionKey) it.next(); it.remove(); if (key.isAcceptable()) { System.out.println("连接事件"); //连接事件 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = ssc.accept(); System.out.println("接收到客户连接,来自:" + socketChannel.socket().getInetAddress() + " : " + socketChannel.socket().getPort()); socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } else if (key.isReadable()) { //接收数据 receive(key); } else if (key.isWritable()) { //发送数据 send(key); } } catch (IOException e) { e.printStackTrace(); try { if (key != null) { key.cancel(); key.channel().close(); } }catch (IOException ex){ ex.printStackTrace(); } } } } } private void send(SelectionKey key) throws IOException { ByteBuffer buffer = (ByteBuffer) key.attachment(); SocketChannel channel = (SocketChannel) key.channel(); buffer.flip(); //把极限设置为位置,把位置设置为0 String data = decode(buffer); if (data.indexOf("\r\n") == -1) { return; } String outputData = data.substring(0, data.indexOf("\n") + 1); System.out.println("请求数据:" + outputData); ByteBuffer outputBuffer = encode("echo:" + outputData); while (outputBuffer.hasRemaining()) { channel.write(outputBuffer); } ByteBuffer temp = encode(outputData); buffer.position(temp.limit()); buffer.compact(); if (outputData.equals("bye\r\n")) { key.cancel(); channel.close(); System.out.println("关闭与客户的连接"); } } private String decode(ByteBuffer buffer) { CharBuffer charBuffer = charset.decode(buffer); return charBuffer.toString(); } private ByteBuffer encode(String s) { return charset.encode(s); } private void receive(SelectionKey key) throws IOException { ByteBuffer buffer = (ByteBuffer) key.attachment(); SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); buffer.limit(buffer.capacity()); buffer.put(readBuff); } public static void main(String[] args) throws IOException { new EchoServer2().service(); } }/** * 创建非阻塞客户端 * */public class EchoClient2 { private SocketChannel socketChannel; private int port = 8001; private Selector selector; private ByteBuffer sendBuffer = ByteBuffer.allocate(1024); private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024); private Charset charset = Charset.forName("UTF-8"); public EchoClient2() throws IOException { socketChannel = SocketChannel.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); socketChannel.connect(inetSocketAddress);// socketChannel.configureBlocking(false);//设置为非阻塞模式 System.out.println("与服务器连接成功"); selector = Selector.open(); } public static void main(String[] args) throws IOException { final EchoClient2 client = new EchoClient2(); Thread receiver = new Thread(new Runnable() { @Override public void run() { client.receiveFromUser(); } }); receiver.start(); client.talk(); } private void receiveFromUser() { try { System.out.println("请输入数据:"); BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in)); String msg = null; while ((msg = localReader.readLine()) != null) { System.out.println("用户输入的数据:" + msg); synchronized (sendBuffer) { sendBuffer.put(encode(msg + "\r\n")); } if (msg.equalsIgnoreCase("bye")) { break; } } } catch (IOException e) { e.printStackTrace(); } } private ByteBuffer encode(String s) { return charset.encode(s); } private void talk() throws IOException { socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = null; try { key = it.next(); it.remove(); if (key.isReadable()) { //System.out.println("读事件"); //读事件 receive(key); } if (key.isWritable()) { // System.out.println("写事件"); //写事件 send(key); } } catch (IOException e) { e.printStackTrace(); if (key != null) { key.cancel(); key.channel().close(); } } } } } private void send(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); synchronized (sendBuffer) { sendBuffer.flip();//把极限设为位置,把位置设为零 channel.write(sendBuffer); sendBuffer.compact();//删除已经发送的数据。 } } private void receive(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); channel.read(receiveBuffer); receiveBuffer.flip();//将limit的值设置为position的值,将position的值设置为0 String receiveData = decode(receiveBuffer); if (receiveData.indexOf("\n") == -1) { return; } String outputData = receiveData.substring(0, receiveData.indexOf("\n") + 1); System.out.println("响应数据:" + outputData); if (outputData.equalsIgnoreCase("echo:bye\r\n")) { key.cancel(); socketChannel.close(); ; System.out.println("关闭与服务器的连接"); selector.close(); System.exit(0); } ByteBuffer temp = encode(outputData); receiveBuffer.position(temp.limit()); receiveBuffer.compact();//删除已经打印的数据 } private String decode(ByteBuffer receiveBuffer) { CharBuffer buffer = charset.decode(receiveBuffer); return buffer.toString(); } }
緩衝區
通道
#Selector
#作用:減少物理讀取和寫入次數,減少記憶體建立和銷毀次數。 緩衝區的屬性:capacity(最大容量)、limit(實際容量)、position(目前位置)。 PS:其他地方是翻譯成capacity(容量)、limit(極限)、position位置),我個人覺得翻譯成上面的更好理解,為啥透過下面的方法解析和圖解就可明白。當然最好透過英文表達這樣最清楚。
三個屬性的關係為:capacity≥limit≥position≥0
圖解關係如下:
緩衝區類別結構:
java.nio.ByteBuffer類是一個抽象類,不能被實例化。但是提供了8個具體的實現類,其中最基本的緩衝區是ByteBuffer,它存放的資料單元是位元組。
常用方法:
clear():把limit設定為capacity,再把位置設為0
# flip():把limit設定為position,再把位置設為0。
rewind():不改變limit,把位置設為0。
allocate():建立一個緩衝區中,方法參數指定緩衝區大小
compact():將緩衝區的目前位置和界限之間的位元組(如果有)複製到緩衝區的開始處。
測試上述方法:
測試clear()方法
@Test public void testClear() { //创建一个10chars大小的缓冲区,默认情况下limit和capacity是相等的 CharBuffer buffer = CharBuffer.allocate(10); System.out.println("创建默认情况"); printBufferInfo(buffer); buffer.limit(8);//修改limit的值 System.out.println("修改limit后"); printBufferInfo(buffer); // clear():把limit设置为capacity,再把位置设为0 buffer.clear(); System.out.println("执行clear()方法后"); printBufferInfo(buffer); }
執行結果如下:
測試flip()方法:
@Test public void testFlip() { CharBuffer buffer = CharBuffer.allocate(10); System.out.println("创建默认情况"); printBufferInfo(buffer); //put的方法会修改position的值 buffer.put('H'); buffer.put('E'); buffer.put('L'); buffer.put('L'); buffer.put('O'); System.out.println("调用put方法后:"); printBufferInfo(buffer); //flip():把limit设置为position,再把位置设置为0。 buffer.flip(); System.out.println("调用flip方法后:"); printBufferInfo(buffer); }
執行結果如下:
#測試rewind()方法
@Test public void testRewind() { CharBuffer buffer = CharBuffer.allocate(10); System.out.println("创建默认情况"); printBufferInfo(buffer); //put的方法会修改position的值 buffer.put('H'); buffer.put('E'); buffer.put('L'); buffer.put('L'); buffer.put('O'); buffer.limit(8); System.out.println("调用put、limit方法后:"); printBufferInfo(buffer); //rewind():不改变limit,把位置设为0。 buffer.rewind(); System.out.println("调用rewind方法后:"); printBufferInfo(buffer); }
執行結果如下:
測試compact()方法
@Test public void testCompact(){ CharBuffer buffer = CharBuffer.allocate(10); System.out.println("创建默认情况"); printBufferInfo(buffer); //put的方法会修改position的值 buffer.put('H'); buffer.put('E'); buffer.put('L'); buffer.put('L'); buffer.put('O'); buffer.limit(8);//修改limit的值 System.out.println("调用put和limit方法后:"); printBufferInfo(buffer); System.out.println("调用compact方法后:"); //将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。 buffer.compact(); printBufferInfo(buffer); }
##這是JDK中介紹此方法的作用:
將緩衝區的目前位置和界限之間的位元組(如果有)複製到緩衝區的開始處。即將索引p = position() 處的位元組複製到索引0 處,將索引p + 1 處的位元組複製到索引1 處,依此類推,直到將索引limit() - 1 處的位元組複製到索引n = limit() - 1 - p 處。然後將緩衝區的位置設為 n+1,並將其界限設定為其容量。如果已定義了標記,則丟棄它。官方表示的太難理解了:#
将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。并将limit(实际容量)设置为 capacity(最大容量)。执行compact()方法前,limit的值是:8,position的值是:5。按照上面描述的执行完compact()后,position的值计算方式是:n+1;n=limit-1-p;所有n=8-1-5=2,最后position的值为:2+1=3。和程序运行的结果一致。
可以在这种情况:从缓冲区写入数据之后调用此方法,以防写入不完整。
buf.clear(); // Prepare buffer for use while (in.read(buf) >= 0 || buf.position != 0) { buf.flip(); out.write(buf); buf.compact(); // In case of partial write }
如果out.write()方法没有将缓存中的数据读取完,这个时候的position位置指向的是剩余数据的位置。达到防止写入不完整。
作用: 连接缓冲区与数据源或数据目的地。
常用类:
Channel
接口有下面两个子接口ReadableByteChannel和WritableByteChannel和一个抽象实现类SelectableChannel。
在ReadableByteChannel接口中申明了read(ByteBuffer
dst)方法。在WritableByteChannel接口中申明了write(ByteBuffer[]
srcs):方法。SelectableChannel抽象类中主要方法,configureBlocking(boolean
block)、register();方法。 ByteChannel
接口继承了ReadableChannel和WritableChannel。所以ByteChannel具有读和写的功能。ServerSocketChannel继承了SelectableChannel类抽象类,所以SocketChannel具有设置是否是阻塞模式、向selector注册事件功能。
SocketChannel也继承了SelectableChannel类还实现ByteChannel接口,所以SocketChannel具有设置是否是阻塞模式、向selector注册事件、从缓冲区读写数据的功能。
通过类图展现:
作用:只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监听这些事件的发生。
流程:
Selector通过静态的open()方法创建一个Selector对象,SelectableChannel类向Selector注册了特定的事件。Selector就会监控这些事件发生,Selector通过select()监控已发生的SelectionKey对象的数目,通过selectKeys()方法返回对应的selectionKey对象集合。遍历该集合得到相应的selectionKey对象,通过该对象的channel()方法获取关联的SelectableChannel对象,
通过selector()方法就可以获取关联的Selector对象。
Note:
当Selector的select()方法还有一个重载方式:select(long timeout)。并且该方法采用阻塞的工作方式,如果相关事件的selectionKey对象的数目一个也没有,就进入阻塞状态。知道出现以下情况之一,才从select()方法中返回。
至少有一个SelectionKey的相关事件已经发生。
其他线程调用了Selector的wakeup()方法,导致执行select()方法的线程立即返回。
当前执行的select()方法的线程被中断。
超出了等待时间。仅限调用select(long timeout)方法时出现。如果没有设置超时时间,则永远不会超时。
Selector类有两个非常重要的方法: 静态方法open(),这是Selector的静态工厂方法,创建一个Selector对象。
selectedKeys()方法返回被Selector捕获的SelectionKey的集合。
作用:
ServerSocketChannel或SocketChannel通过register()方法向Selector注册事件时,register()方法会创建一个SelectionKey对象,该对象是用来跟踪注册事件的句柄。在SelectionKey对象的有效期间,Selector会一直监控与SelectionKey对象相关的事件,如果事件发生,就会把SelectionKey对象添加到Selected-keys集合中。SelectionKey中定义的事件: 定义了4种事件:
1、SelectionKey.OP_ACCEPT:接收连接就绪事件,表示服务器监听到了客户连接,服务器可以接收这个连接了。常量值为16.
2、SelectionKey.OP_CONNECT:连接就绪事件,表示客户与服务器的连接已经建立成功。常量值为8.
3、SelectionKey.OP_READ:读就绪事件,表示通道中已经有了可读数据可以执行读操作。常量值为1.
4、SelectionKey.OP_WRITE:写就绪事件,表示已经可以向通道写数据了。常量值为4.常用方法:
channel()方法:傳回與它關聯的SelectedChannel(包括ServerSocketChannel和SocketChannel)。
selector()方法:傳回與它關聯的Selector物件。
它們之間的關係如下:
以上就是Java網路程式設計由淺入深三一文了解非阻塞通訊的圖文程式碼範例詳解的內容,更多相關內容請關注PHP中文網(www.php.cn)!