이 글에서는 Non-Blocking Communication을 구성하는 주요 카테고리를 자세히 소개합니다: Buffer, Channel, Selector, SelectionKey
ServerSocketChannel은 open 방식으로 ServerSocketChannel을 획득하고, 이를 통해 Non-Blocking 모드로 설정합니다. ServerSocketChannel은 ServerSocketChannel을 통해 소켓을 획득하고 바인딩하여 서비스 프로세스의 수신 포트를 결정합니다. 서비스가 성공적으로 시작되었습니다.
그럼 Non-Blocking 통신의 핵심이 있습니다. Selector는 정적 open() 메소드를 통해 Selector를 획득한 후 ServerSocketChannel이 Selection.OP_ACCEPT 이벤트를 Selector에 등록합니다.
Selector는 이벤트 발생을 모니터링합니다. Selector는 select()를 통해 발생한 SelectionKey 개체 수를 모니터링하고, selectKeys() 메서드를 통해 해당 SelectionKey 개체 컬렉션을 반환합니다. 컬렉션을 탐색하여 해당 SelectionKey 객체를 얻고, 객체의 Channel() 메서드를 통해 연관된 ServerSocketChannel 객체를 얻고, selector() 메서드를 통해 연관된 Selector 객체를 얻습니다.
위에서 얻은 ServerSocketChannel을 통해 accept() 메소드를 실행하여 SocketChannel을 얻은 후 SocketChannel을 Non-Blocking 모드로 설정하고 위에서 생성한 Selector에 SocketChannel을 등록한 후 SelectionKey.OP_READ |SelectionKey.OP_WRITE
이벤트 .
Selector는 위에 바인딩된 해당 이벤트를 모니터링하고, 해당 이벤트가 모니터링되면 읽기 및 쓰기 작업을 수행합니다.
위는 서버 측 Non-Blocking 통신 프로세스를 설명하며 특정 코드를 통해 구현됩니다. 🎜>
/** * 非阻塞模式 * */public class EchoServer2 { private Selector selector = null; private ServerSocketChannel serverSocketChannel = null; private int port = 8001; private Charset charset = Charset.forName("UTF-8"); public EchoServer2() throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); //服务器重启的时候,重用端口 serverSocketChannel.socket().setReuseAddress(true); //设置非阻塞模式 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); System.out.println("服务器启动成功"); } /** * 服务方法 */ public void service() throws IOException { serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Set readyKes = selector.selectedKeys(); Iterator it = readyKes.iterator(); while (it.hasNext()) { SelectionKey key = null; try { key = (SelectionKey) it.next(); it.remove(); if (key.isAcceptable()) { System.out.println("连接事件"); //连接事件 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = ssc.accept(); System.out.println("接收到客户连接,来自:" + socketChannel.socket().getInetAddress() + " : " + socketChannel.socket().getPort()); socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } else if (key.isReadable()) { //接收数据 receive(key); } else if (key.isWritable()) { //发送数据 send(key); } } catch (IOException e) { e.printStackTrace(); try { if (key != null) { key.cancel(); key.channel().close(); } }catch (IOException ex){ ex.printStackTrace(); } } } } } private void send(SelectionKey key) throws IOException { ByteBuffer buffer = (ByteBuffer) key.attachment(); SocketChannel channel = (SocketChannel) key.channel(); buffer.flip(); //把极限设置为位置,把位置设置为0 String data = decode(buffer); if (data.indexOf("\r\n") == -1) { return; } String outputData = data.substring(0, data.indexOf("\n") + 1); System.out.println("请求数据:" + outputData); ByteBuffer outputBuffer = encode("echo:" + outputData); while (outputBuffer.hasRemaining()) { channel.write(outputBuffer); } ByteBuffer temp = encode(outputData); buffer.position(temp.limit()); buffer.compact(); if (outputData.equals("bye\r\n")) { key.cancel(); channel.close(); System.out.println("关闭与客户的连接"); } } private String decode(ByteBuffer buffer) { CharBuffer charBuffer = charset.decode(buffer); return charBuffer.toString(); } private ByteBuffer encode(String s) { return charset.encode(s); } private void receive(SelectionKey key) throws IOException { ByteBuffer buffer = (ByteBuffer) key.attachment(); SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); buffer.limit(buffer.capacity()); buffer.put(readBuff); } public static void main(String[] args) throws IOException { new EchoServer2().service(); } }/** * 创建非阻塞客户端 * */public class EchoClient2 { private SocketChannel socketChannel; private int port = 8001; private Selector selector; private ByteBuffer sendBuffer = ByteBuffer.allocate(1024); private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024); private Charset charset = Charset.forName("UTF-8"); public EchoClient2() throws IOException { socketChannel = SocketChannel.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); socketChannel.connect(inetSocketAddress);// socketChannel.configureBlocking(false);//设置为非阻塞模式 System.out.println("与服务器连接成功"); selector = Selector.open(); } public static void main(String[] args) throws IOException { final EchoClient2 client = new EchoClient2(); Thread receiver = new Thread(new Runnable() { @Override public void run() { client.receiveFromUser(); } }); receiver.start(); client.talk(); } private void receiveFromUser() { try { System.out.println("请输入数据:"); BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in)); String msg = null; while ((msg = localReader.readLine()) != null) { System.out.println("用户输入的数据:" + msg); synchronized (sendBuffer) { sendBuffer.put(encode(msg + "\r\n")); } if (msg.equalsIgnoreCase("bye")) { break; } } } catch (IOException e) { e.printStackTrace(); } } private ByteBuffer encode(String s) { return charset.encode(s); } private void talk() throws IOException { socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = null; try { key = it.next(); it.remove(); if (key.isReadable()) { //System.out.println("读事件"); //读事件 receive(key); } if (key.isWritable()) { // System.out.println("写事件"); //写事件 send(key); } } catch (IOException e) { e.printStackTrace(); if (key != null) { key.cancel(); key.channel().close(); } } } } } private void send(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); synchronized (sendBuffer) { sendBuffer.flip();//把极限设为位置,把位置设为零 channel.write(sendBuffer); sendBuffer.compact();//删除已经发送的数据。 } } private void receive(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); channel.read(receiveBuffer); receiveBuffer.flip();//将limit的值设置为position的值,将position的值设置为0 String receiveData = decode(receiveBuffer); if (receiveData.indexOf("\n") == -1) { return; } String outputData = receiveData.substring(0, receiveData.indexOf("\n") + 1); System.out.println("响应数据:" + outputData); if (outputData.equalsIgnoreCase("echo:bye\r\n")) { key.cancel(); socketChannel.close(); ; System.out.println("关闭与服务器的连接"); selector.close(); System.exit(0); } ByteBuffer temp = encode(outputData); receiveBuffer.position(temp.limit()); receiveBuffer.compact();//删除已经打印的数据 } private String decode(ByteBuffer receiveBuffer) { CharBuffer buffer = charset.decode(receiveBuffer); return buffer.toString(); } }
기능: 물리적 읽기 및 쓰기 횟수를 줄이고, 메모리 생성 및 파괴 횟수. 버퍼 속성: 용량(최대 용량), 제한(실제 용량), 위치(현재 위치).그래픽 관계는 다음과 같습니다.PS: 다른 곳은 용량(capacity), 한도(limit), 위치(position)로 번역하면 위의 Why로 번역하면 이해하는 것이 더 낫다고 생각합니다. 다음 방법을 사용하여 분석과 도표를 통해 이해할 수 있습니다. 물론 가장 명확하게 영어로 표현하는 것이 가장 좋습니다. 세 가지 속성 간의 관계는 다음과 같습니다.capacity≥limit≥position≥0
버퍼 클래스 구조:
java.nio .ByteBuffer 클래스 추상 클래스이므로 인스턴스화할 수 없습니다. 그러나 8개의 특정 구현 클래스가 제공되며, 그 중 가장 기본적인 버퍼는 데이터 단위를 바이트 단위로 저장하는 ByteBuffer입니다.
위 메소드 테스트:일반적인 방법:
clear(): 용량 제한을 설정하고 위치를 0으로 설정합니다.Flip(): 위치에 제한을 설정한 다음 위치를 0으로 설정합니다.
rewind(): 제한을 변경하지 않고 위치를 0으로 설정합니다.
할당(): 버퍼를 생성합니다. 메소드 매개변수는 버퍼 크기를 지정합니다.
Compact(): 버퍼의 현재 위치와 버퍼의 시작 부분 사이의 바이트(있는 경우)를 복사합니다.
clear() 메소드 테스트
@Test public void testClear() { //创建一个10chars大小的缓冲区,默认情况下limit和capacity是相等的 CharBuffer buffer = CharBuffer.allocate(10); System.out.println("创建默认情况"); printBufferInfo(buffer); buffer.limit(8);//修改limit的值 System.out.println("修改limit后"); printBufferInfo(buffer); // clear():把limit设置为capacity,再把位置设为0 buffer.clear(); System.out.println("执行clear()方法后"); printBufferInfo(buffer); }실행 결과는 다음과 같습니다.
flip() 메서드 테스트:
@Test public void testFlip() { CharBuffer buffer = CharBuffer.allocate(10); System.out.println("创建默认情况"); printBufferInfo(buffer); //put的方法会修改position的值 buffer.put('H'); buffer.put('E'); buffer.put('L'); buffer.put('L'); buffer.put('O'); System.out.println("调用put方法后:"); printBufferInfo(buffer); //flip():把limit设置为position,再把位置设置为0。 buffer.flip(); System.out.println("调用flip方法后:"); printBufferInfo(buffer); }실행 결과는 다음과 같습니다.
되감기 테스트 () 메소드
@Test public void testRewind() { CharBuffer buffer = CharBuffer.allocate(10); System.out.println("创建默认情况"); printBufferInfo(buffer); //put的方法会修改position的值 buffer.put('H'); buffer.put('E'); buffer.put('L'); buffer.put('L'); buffer.put('O'); buffer.limit(8); System.out.println("调用put、limit方法后:"); printBufferInfo(buffer); //rewind():不改变limit,把位置设为0。 buffer.rewind(); System.out.println("调用rewind方法后:"); printBufferInfo(buffer); }실행 결과는 다음과 같습니다.
compact() 메소드 테스트
@Test public void testCompact(){ CharBuffer buffer = CharBuffer.allocate(10); System.out.println("创建默认情况"); printBufferInfo(buffer); //put的方法会修改position的值 buffer.put('H'); buffer.put('E'); buffer.put('L'); buffer.put('L'); buffer.put('O'); buffer.limit(8);//修改limit的值 System.out.println("调用put和limit方法后:"); printBufferInfo(buffer); System.out.println("调用compact方法后:"); //将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。 buffer.compact(); printBufferInfo(buffer); }
이것은 JDK에 도입된 이 메소드의 기능입니다:
버퍼의 현재 위치와 버퍼 사이에 바이트(있는 경우)를 복사합니다. 버퍼의 시작 부분으로 제한됩니다. 즉, 인덱스 p = position()의 바이트를 인덱스 0에 복사하고, 인덱스 p + 1의 바이트를 인덱스 1에 복사하는 식으로 인덱스 Limit() - 1의 바이트가 Index n =limit(에 복사될 때까지 계속됩니다. ) - 1 - p. 그런 다음 버퍼의 위치를 n+1로 설정하고 범위를 용량으로 설정합니다. 태그가 이미 정의된 경우 해당 태그는 삭제됩니다.공식 입장이 너무 이해하기 어렵습니다.
将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。并将limit(实际容量)设置为 capacity(最大容量)。执行compact()方法前,limit的值是:8,position的值是:5。按照上面描述的执行完compact()后,position的值计算方式是:n+1;n=limit-1-p;所有n=8-1-5=2,最后position的值为:2+1=3。和程序运行的结果一致。
可以在这种情况:从缓冲区写入数据之后调用此方法,以防写入不完整。
buf.clear(); // Prepare buffer for use while (in.read(buf) >= 0 || buf.position != 0) { buf.flip(); out.write(buf); buf.compact(); // In case of partial write }
如果out.write()方法没有将缓存中的数据读取完,这个时候的position位置指向的是剩余数据的位置。达到防止写入不完整。
作用: 连接缓冲区与数据源或数据目的地。
常用类:
Channel
接口有下面两个子接口ReadableByteChannel和WritableByteChannel和一个抽象实现类SelectableChannel。
在ReadableByteChannel接口中申明了read(ByteBuffer
dst)方法。在WritableByteChannel接口中申明了write(ByteBuffer[]
srcs):方法。SelectableChannel抽象类中主要方法,configureBlocking(boolean
block)、register();方法。 ByteChannel
接口继承了ReadableChannel和WritableChannel。所以ByteChannel具有读和写的功能。ServerSocketChannel继承了SelectableChannel类抽象类,所以SocketChannel具有设置是否是阻塞模式、向selector注册事件功能。
SocketChannel也继承了SelectableChannel类还实现ByteChannel接口,所以SocketChannel具有设置是否是阻塞模式、向selector注册事件、从缓冲区读写数据的功能。
通过类图展现:
作用:只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监听这些事件的发生。
流程:
Selector通过静态的open()方法创建一个Selector对象,SelectableChannel类向Selector注册了特定的事件。Selector就会监控这些事件发生,Selector通过select()监控已发生的SelectionKey对象的数目,通过selectKeys()方法返回对应的selectionKey对象集合。遍历该集合得到相应的selectionKey对象,通过该对象的channel()方法获取关联的SelectableChannel对象,
通过selector()方法就可以获取关联的Selector对象。
Note:
当Selector的select()方法还有一个重载方式:select(long timeout)。并且该方法采用阻塞的工作方式,如果相关事件的selectionKey对象的数目一个也没有,就进入阻塞状态。知道出现以下情况之一,才从select()方法中返回。
至少有一个SelectionKey的相关事件已经发生。
其他线程调用了Selector的wakeup()方法,导致执行select()方法的线程立即返回。
当前执行的select()方法的线程被中断。
超出了等待时间。仅限调用select(long timeout)方法时出现。如果没有设置超时时间,则永远不会超时。
Selector类有两个非常重要的方法: 静态方法open(),这是Selector的静态工厂方法,创建一个Selector对象。
selectedKeys()方法返回被Selector捕获的SelectionKey的集合。
作用:
ServerSocketChannel或SocketChannel通过register()方法向Selector注册事件时,register()方法会创建一个SelectionKey对象,该对象是用来跟踪注册事件的句柄。在SelectionKey对象的有效期间,Selector会一直监控与SelectionKey对象相关的事件,如果事件发生,就会把SelectionKey对象添加到Selected-keys集合中。SelectionKey中定义的事件: 定义了4种事件:
1、SelectionKey.OP_ACCEPT:接收连接就绪事件,表示服务器监听到了客户连接,服务器可以接收这个连接了。常量值为16.
2、SelectionKey.OP_CONNECT:连接就绪事件,表示客户与服务器的连接已经建立成功。常量值为8.
3、SelectionKey.OP_READ:读就绪事件,表示通道中已经有了可读数据可以执行读操作。常量值为1.
4、SelectionKey.OP_WRITE:写就绪事件,表示已经可以向通道写数据了。常量值为4.일반적인 방법:
채널() 메서드: 연결된 SelectedChannel(ServerSocketChannel 및 SocketChannel 포함)을 반환합니다.
selector() 메소드: 연관된 Selector 객체를 반환합니다.
이들 사이의 관계는 다음과 같습니다.
위는 얕은 것부터 깊은 것까지 Java 네트워크 프로그래밍의 비차단 통신에 대한 그래픽 및 텍스트 코드 예제에 대한 자세한 설명입니다. 더 많은 관련 내용 PHP 중국어 홈페이지(www.php.cn)를 주목해주세요!