>  기사  >  Java  >  Java 기반 소켓 프로그래밍을 구현하는 방법

Java 기반 소켓 프로그래밍을 구현하는 방법

王林
王林앞으로
2023-05-17 08:37:292203검색

소켓 이해

소켓이라고도 하는 소켓은 서로 다른 프로세스 간의 네트워크 통신을 위한 프로토콜, 규칙 또는 사양입니다.

소켓 프로그래밍의 경우 TCP/UDP와 같은 프로토콜을 기반으로 하는 캡슐화 또는 추상화 계층과 유사한 경우가 많습니다. 네트워크 통신과 관련된 프로그래밍을 위해 시스템에서 제공하는 인터페이스입니다.

소켓 설정의 기본 프로세스

소켓 통신 설정의 기본 프로세스를 이해하기 위해 Linux 운영 체제에서 제공하는 기본 API를 예로 들어 보겠습니다.

Java 기반 소켓 프로그래밍을 구현하는 방법

기본적으로 소켓은 다음과 같습니다. TCP 연결(물론 UDP 및 기타 연결도 가능함) 프로토콜, 프로그래밍 수준에서의 단순화 및 추상화.

1. 가장 기본적인 소켓 데모

1.1 단방향 통신

먼저 메시지를 한 번만 보내고 받는 소켓 기본 코드부터 시작합니다.

Server:

package com.marklux.socket.base;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * The very basic socket server that only listen one single message.
 */

public class BaseSocketServer {

    private ServerSocket server;
    private Socket socket;
    private int port;
    private InputStream inputStream;
    private static final int MAX_BUFFER_SIZE = 1024;

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public BaseSocketServer(int port) {
        this.port = port;
    }

    public void runServerSingle() throws IOException {
        this.server = new ServerSocket(this.port);

        System.out.println("base socket server started.");
        // the code will block here till the request come.
        this.socket = server.accept();

        this.inputStream = this.socket.getInputStream();

        byte[] readBytes = new byte[MAX_BUFFER_SIZE];

        int msgLen;
        StringBuilder stringBuilder = new StringBuilder();

        while ((msgLen = inputStream.read(readBytes)) != -1) {
            stringBuilder.append(new String(readBytes,0,msgLen,"UTF-8"));
        }

        System.out.println("get message from client: " + stringBuilder);

        inputStream.close();
        socket.close();
        server.close();
    }

    public static void main(String[] args) {
        BaseSocketServer bs = new BaseSocketServer(9799);
        try {
            bs.runServerSingle();
        }catch (IOException e) {
            e.printStackTrace();
        }

    }
}

Client:

package com.marklux.socket.base;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.Socket;

/**
 * The very basic socket client that only send one single message.
 */

public class BaseSocketClient {
    private String serverHost;
    private int serverPort;
    private Socket socket;
    private OutputStream outputStream;

    public BaseSocketClient(String host, int port) {
        this.serverHost = host;
        this.serverPort = port;
    }

    public void connetServer() throws IOException {
        this.socket = new Socket(this.serverHost, this.serverPort);
        this.outputStream = socket.getOutputStream();
        // why the output stream?
    }

    public void sendSingle(String message) throws IOException {
        try {
            this.outputStream.write(message.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
        }
        this.outputStream.close();
        this.socket.close();
    }

    public static void main(String[] args) {
        BaseSocketClient bc = new BaseSocketClient("127.0.0.1",9799);
        try {
            bc.connetServer();
            bc.sendSingle("Hi from mark.");
        }catch (IOException e) {
            e.printStackTrace();
        }
    }
}

서비스 실행 첫 번째 클라이언트를 실행한 다음 클라이언트를 실행하면 효과를 볼 수 있습니다.

  • 여기서 IO 작업 구현에 주의하세요. MAX_BUFFER_SIZE 크기의 바이트 배열을 버퍼로 사용한 다음 입력 스트림에서 바이트를 가져와 버퍼에 넣은 다음 제거합니다. 실제로 나중에 설명할 NIO도 이 아이디어를 기반으로 구현됩니다. MAX_BUFFER_SIZE的byte数组作为缓冲区,然后从输入流中取出字节放置到缓冲区,再从缓冲区中取出字节构建到字符串中去,这在输入流文件很大时非常有用,事实上,后面要讲到的NIO也是基于这种思路实现的。

1.2 双向通信

上面的例子只实现了一次单向的通信,这显然有点浪费通道。socket连接支持全双工的双向通信(底层是tcp),下面的例子中,服务端在收到客户端的消息后,将返回给客户端一个回执。

并且我们使用了一些java.io包装好的方法,来简化整个通信的流程(因为消息长度不大,不再使用缓冲区)。

服务端:

public void runServer() throws IOException {
        this.serverSocket = new ServerSocket(port);
        this.socket = serverSocket.accept();
        this.inputStream = socket.getInputStream();

        String message = new String(inputStream.readAllBytes(), "UTF-8");

        System.out.println("received message: " + message);

        this.socket.shutdownInput(); // 告诉客户端接收已经完毕,之后只能发送

        // write the receipt.

        this.outputStream = this.socket.getOutputStream();
        String receipt = "We received your message: " + message;
        outputStream.write(receipt.getBytes("UTF-8"));

        this.outputStream.close();
        this.socket.close();
    }

客户端:

public void sendMessage(String message) throws IOException {
        this.socket = new Socket(host,port);
        this.outputStream = socket.getOutputStream();
        this.outputStream.write(message.getBytes("UTF-8"));
        this.socket.shutdownOutput(); // 告诉服务器,所有的发送动作已经结束,之后只能接收
        this.inputStream = socket.getInputStream();
        String receipt = new String(inputStream.readAllBytes(), "UTF-8");
        System.out.println("got receipt: " + receipt);
        this.inputStream.close();
        this.socket.close();
    }
  • 注意这里我们在服务端接受到消息以及客户端发送消息后,分别调用了shutdownInput()shutdownOutput()而不是直接close对应的stream,这是因为在关闭任何一个stream,都会直接导致socket的关闭,也就无法进行后面回执的发送了。

  • 但是注意,调用shutdownInput()shutdownOutput()之后,对应的流也会被关闭,不能再次向socket发送/写入了。

2. 发送更多的消息:结束的界定

刚才的两个例子中,每次打开流,都只能进行一次写入/读取操作,结束后对应流被关闭,就无法再次写入/读取了。

在这种情况下,若需发送两条消息,则必须建立两个socket,这既会耗费资源,也会带来不便。其实我们完全可以不关闭对应的流,只要分次写入消息就可以了。

但是这样的话,我们就必须面对另一个问题:如何判断一次消息发送的结束呢?

2.1 使用特殊符号

最简单的办法是使用一些特殊的符号来标记一次发送完成,服务端只要读到对应的符号就可以完成一次读取,然后进行相关的处理操作。

下面的例子中我们使用换行符n

1.2 양방향 통신

위의 예는 단방향 통신만 구현한 것으로, 이는 분명히 채널 낭비입니다. 소켓 연결은 전이중 양방향 통신을 지원합니다(하위 계층은 tcp). 다음 예에서는 클라이언트의 메시지를 받은 후 서버가 클라이언트에 영수증을 반환합니다.

그리고 우리는 전체 통신 프로세스를 단순화하기 위해 일부 java.io 패키지 방법을 사용합니다(메시지 길이가 크지 않기 때문에 버퍼는 더 이상 사용되지 않습니다).

Server:

public void runServer() throws IOException {
        this.server = new ServerSocket(this.port);

        System.out.println("base socket server started.");

        this.socket = server.accept();
        // the code will block here till the request come.

        this.inputStream = this.socket.getInputStream();
        Scanner sc = new Scanner(this.inputStream);
        while (sc.hasNextLine()) {
            System.out.println("get info from client: " + sc.nextLine());
        } // 循环接收并输出消息内容
        this.inputStream.close();
        socket.close();
    }

Client:

public void connetServer() throws IOException {
        this.socket = new Socket(this.serverHost, this.serverPort);
        this.outputStream = socket.getOutputStream();
    }

public void send(String message) throws IOException {
        String sendMsg = message + "\n"; // we mark \n as a end of line.
        try {
            this.outputStream.write(sendMsg.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
        }
//        this.outputStream.close();
//        this.socket.shutdownOutput();
    }

 public static void main(String[] args) {
        CycleSocketClient cc = new CycleSocketClient("127.0.0.1", 9799);
        try {
            cc.connetServer();
            Scanner sc = new Scanner(System.in);
            while (sc.hasNext()) {
                String line = sc.nextLine();
                cc.send(line);
            }
        }catch (IOException e) {
            e.printStackTrace();
        }
    }

여기서 서버가 메시지를 수신하고 클라이언트가 메시지를 보낸 후 shutdownInput()shutdownOutput을 각각 호출합니다. ) 해당 스트림을 직접 닫는 대신 스트림을 닫으면 소켓이 직접 닫혀 후속 영수증을 보낼 수 없기 때문입니다.
  • 그러나 shutdownInput()shutdownOutput()을 호출한 후에는 해당 스트림도 닫히고 소켓에 다시 보내거나 쓸 수 없다는 점에 유의하세요.
  • 2. 추가 메시지 보내기: end 정의
지금의 두 예에서는 스트림이 열릴 때마다 해당 스트림이 닫힌 후에는 쓰기/읽기 작업을 한 번만 수행할 수 있습니다. 다시 읽어요.

이 경우 2개의 메시지를 보내야 한다면 2개의 소켓을 구축해야 하는데, 이로 인해 리소스가 소모되어 불편을 겪게 됩니다. 실제로 메시지를 일괄적으로 작성하는 한 해당 스트림을 전혀 닫을 필요가 없습니다.

하지만 이 경우 또 다른 문제에 직면해야 합니다. 메시지 전송의 끝을 어떻게 판단해야 할까요?

2.1 특수 기호 사용

가장 간단한 방법은 특수 기호를 사용하여 전송 완료를 표시하는 것입니다. 서버는 해당 기호를 읽는 한 읽기를 완료한 다음 관련 처리 작업을 수행할 수 있습니다.

다음 예에서는 개행 문자 n를 사용하여 전송 끝을 표시합니다. 서버는 메시지를 받을 때마다 메시지를 한 번 인쇄하고 Scanner를 사용하여 작업을 단순화합니다.

Server: 🎜
public void runServer() throws IOException {
        this.serverSocket = new ServerSocket(this.port);
        this.socket = serverSocket.accept();
        this.inputStream = socket.getInputStream();
        byte[] bytes;
        while (true) {
            // 先读第一个字节
            int first = inputStream.read();
            if (first == -1) {
                // 如果是-1,说明输入流已经被关闭了,也就不需要继续监听了
                this.socket.close();
                break;
            }
            // 读取第二个字节
            int second = inputStream.read();

            int length = (first << 8) + second; // 用位运算将两个字节拼起来成为真正的长度

            bytes = new byte[length]; // 构建指定长度的字节大小来储存消息即可

            inputStream.read(bytes);

            System.out.println("receive message: " + new String(bytes,"UTF-8"));
        }
    }
🎜Client:🎜
public void connetServer() throws IOException {
        this.socket = new Socket(host,port);
        this.outputStream = socket.getOutputStream();
    }

public void sendMessage(String message) throws IOException {
        // 首先要把message转换成bytes以便处理
        byte[] bytes = message.getBytes("UTF-8");
        // 接下来传输两个字节的长度,依然使用移位实现
        int length = bytes.length;
        this.outputStream.write(length >> 8); // write默认一次只传输一个字节
        this.outputStream.write(length);
        // 传输完长度后,再正式传送消息
        this.outputStream.write(bytes);
    }

public static void main(String[] args) {
        LengthSocketClient lc = new LengthSocketClient("127.0.0.1",9799);
        try {
            lc.connetServer();
            Scanner sc = new Scanner(System.in);
            while (sc.hasNextLine()) {
                lc.sendMessage(sc.nextLine());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
🎜실행 후 효과는 클라이언트가 텍스트 줄을 입력하고 Enter 키를 누를 때마다 서버가 해당 메시지 읽기 기록을 인쇄한다는 것입니다. 🎜🎜2.2 길이에 따라 정의🎜🎜원래로 돌아가서 메시지가 끝나는 시점을 찾기 어려운 이유는 각 메시지의 길이를 결정할 수 없기 때문입니다. 🎜🎜그러면 실제로 메시지 길이를 먼저 보낼 수 있습니다. 서버가 메시지 길이를 알게 되면 메시지 수신을 완료할 수 있습니다. 🎜🎜일반적으로 메시지 보내기는 2단계로 진행됩니다🎜🎜🎜🎜보내는 메시지의 길이🎜🎜🎜🎜메시지 보내기🎜🎜🎜🎜마지막 질문은 "보내는 메시지의 길이" 입니다. 이 단계에서는 바이트 양을 고정해야 합니다. 그렇지 않으면 여전히 교착 상태에 빠지게 됩니다. 🎜🎜일반적으로 메시지 길이를 저장하기 위해 고정된 바이트 수를 사용할 수 있습니다. 예를 들어 처음 2바이트는 메시지의 길이입니다. 그러나 이렇게 하면 전송할 수 있는 메시지의 최대 길이가 됩니다. 예를 들어 2바이트로 고정되어 있으면 보내는 메시지의 최대 길이는 2^16바이트(64K)를 초과하지 않습니다. 🎜🎜일부 문자의 인코딩을 이해하면 실제로 가변 길이 공간을 사용하여 메시지 길이를 저장할 수 있다는 것을 알 수 있습니다. 예: 🎜🎜🎜첫 번째 바이트의 첫 번째 바이트는 0입니다. 즉, 0XXXXXXX는 길이가 한 문자임을 의미합니다. 섹션, 최대 128은 128B를 의미합니다🎜 첫 번째 바이트의 첫 번째 바이트는 110이고, 다음 바이트는 길이를 나타 내기 위해 첨부됩니다: 110XXXXX 10XXXXXX, 최대값은 2048, 즉 2K를 의미합니다🎜 첫 번째 바이트의 첫 번째 바이트는 1110이고 다음 두 개가 첨부됩니다. 바이트는 길이를 나타냅니다: 110XXXXX 10XXXXXX 10XXXXXX, 최대값은 131072이며 이는 128K🎜등을 의미합니다🎜

当然这样实现起来会麻烦一些,因此下面的例子里我们仍然使用固定的两个字节来记录消息的长度。

服务端:

public void runServer() throws IOException {
        this.serverSocket = new ServerSocket(this.port);
        this.socket = serverSocket.accept();
        this.inputStream = socket.getInputStream();
        byte[] bytes;
        while (true) {
            // 先读第一个字节
            int first = inputStream.read();
            if (first == -1) {
                // 如果是-1,说明输入流已经被关闭了,也就不需要继续监听了
                this.socket.close();
                break;
            }
            // 读取第二个字节
            int second = inputStream.read();

            int length = (first << 8) + second; // 用位运算将两个字节拼起来成为真正的长度

            bytes = new byte[length]; // 构建指定长度的字节大小来储存消息即可

            inputStream.read(bytes);

            System.out.println("receive message: " + new String(bytes,"UTF-8"));
        }
    }

客户端:

public void connetServer() throws IOException {
        this.socket = new Socket(host,port);
        this.outputStream = socket.getOutputStream();
    }

public void sendMessage(String message) throws IOException {
        // 首先要把message转换成bytes以便处理
        byte[] bytes = message.getBytes("UTF-8");
        // 接下来传输两个字节的长度,依然使用移位实现
        int length = bytes.length;
        this.outputStream.write(length >> 8); // write默认一次只传输一个字节
        this.outputStream.write(length);
        // 传输完长度后,再正式传送消息
        this.outputStream.write(bytes);
    }

public static void main(String[] args) {
        LengthSocketClient lc = new LengthSocketClient("127.0.0.1",9799);
        try {
            lc.connetServer();
            Scanner sc = new Scanner(System.in);
            while (sc.hasNextLine()) {
                lc.sendMessage(sc.nextLine());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

3. 处理更多的连接:多线程

3.1 同时实现消息的发送与接收

在考虑服务端处理多连接之前,我们先考虑使用多线程改造一下原有的一对一对话实例。

在原有的例子中,消息的接收方并不能主动地向对方发送消息,换句话说我们并没有实现真正的互相对话,这主要是因为消息的发送和接收这两个动作并不能同时进行,因此我们需要使用两个线程,其中一个用于监听键盘输入并将其写入socket,另一个则负责监听socket并将接受到的消息显示。

出于简单考虑,我们直接让主线程负责键盘监听和消息发送,同时另外开启一个线程用于拉取消息并显示。

消息拉取线程 ListenThread.java

public class ListenThread implements Runnable {
    private Socket socket;
    private InputStream inputStream;

    public ListenThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() throws RuntimeException{
        try {
            this.inputStream = socket.getInputStream();
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }

        while (true) {
            try {
                int first = this.inputStream.read();
                if (first == -1) {
                    // 输入流已经被关闭,无需继续读取
                    throw new RuntimeException("disconnected.");
                }
                int second = this.inputStream.read();
                int msgLength = (first<<8) + second;
                byte[] readBuffer = new byte[msgLength];
                this.inputStream.read(readBuffer);

                System.out.println("message from [" + socket.getInetAddress() + "]: " + new String(readBuffer,"UTF-8"));
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e.getMessage());
            }
        }
    }
}

主线程,启动时由用户选择是作为server还是client:

public class ChatSocket {
    private String host;
    private int port;
    private Socket socket;
    private ServerSocket serverSocket;
    private OutputStream outputStream;

    // 以服务端形式启动,创建会话
    public void runAsServer(int port) throws IOException {
        this.serverSocket = new ServerSocket(port);
        System.out.println("[log] server started at port " + port);
        // 等待客户端的加入
        this.socket = serverSocket.accept();
        System.out.println("[log] successful connected with " + socket.getInetAddress());
        // 启动监听线程
        Thread listenThread = new Thread(new ListenThread(this.socket));
        listenThread.start();
        waitAndSend();
    }

    // 以客户端形式启动,加入会话
    public void runAsClient(String host, int port) throws IOException {
        this.socket = new Socket(host, port);
        System.out.println("[log] successful connected to server " + socket.getInetAddress());
        Thread listenThread = new Thread(new ListenThread(this.socket));
        listenThread.start();
        waitAndSend();
    }

    public void waitAndSend() throws IOException {
        this.outputStream = this.socket.getOutputStream();
        Scanner sc = new Scanner(System.in);
        while (sc.hasNextLine()) {
            this.sendMessage(sc.nextLine());
        }
    }

    public void sendMessage(String message) throws IOException {
        byte[] msgBytes = message.getBytes("UTF-8");
        int length = msgBytes.length;
        outputStream.write(length>>8);
        outputStream.write(length);
        outputStream.write(msgBytes);
    }

    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);
        ChatSocket chatSocket = new ChatSocket();
        System.out.println("select connect type: 1 for server and 2 for client");
        int type = Integer.parseInt(scanner.nextLine().toString());
        if (type == 1) {
            System.out.print("input server port: ");
            int port = scanner.nextInt();
            try {
                chatSocket.runAsServer(port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }else if (type == 2) {
            System.out.print("input server host: ");
            String host = scanner.nextLine();
            System.out.print("input server port: ");
            int port = scanner.nextInt();
            try {
                chatSocket.runAsClient(host, port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

3.2 使用线程池优化服务端并发能力

作为服务端,如果一次只跟一个客户端建立socket连接,未免显得太过浪费资源,因此我们完全可以让服务端和多个客户端建立多个socket。

如果要处理多个连接,就必须解决并发问题,否则可以编写循环轮流处理。我们可以使用多线程来处理并发,不过线程的创建和销毁都会消耗大量的资源和时间,所以最好一步到位,用一个线程池来实现。

下面给出一个示范性质的服务端代码:

public class SocketServer {
  public static void main(String args[]) throws Exception {
    // 监听指定的端口
    int port = 55533;
    ServerSocket server = new ServerSocket(port);
    // server将一直等待连接的到来
    System.out.println("server将一直等待连接的到来");

    //如果使用多线程,那就需要线程池,防止并发过高时创建过多线程耗尽资源
    ExecutorService threadPool = Executors.newFixedThreadPool(100);
    
    while (true) {
      Socket socket = server.accept();
      
      Runnable runnable=()->{
        try {
          // 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取
          InputStream inputStream = socket.getInputStream();
          byte[] bytes = new byte[1024];
          int len;
          StringBuilder sb = new StringBuilder();
          while ((len = inputStream.read(bytes)) != -1) {
            // 注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8
            sb.append(new String(bytes, 0, len, "UTF-8"));
          }
          System.out.println("get message from client: " + sb);
          inputStream.close();
          socket.close();
        } catch (Exception e) {
          e.printStackTrace();
        }
      };
      threadPool.submit(runnable);
    }

  }
}

4. 连接保活

我想你不难发现一个问题,那就是当socket连接成功建立后,如果中途发生异常导致其中一方断开连接,此时另一方是无法发现的,只有在再次尝试发送/接收消息才会因为抛出异常而退出。

简单的说,就是我们维持的socket连接,是一个长连接,但我们没有保证它的时效性,上一秒它可能还是可以用的,但是下一秒就不一定了。

4.1 使用心跳包

最常见的确保连接随时可用的方法是通过定时发送心跳包来检测连接的正常性。对于需要实现高实时性的服务,比如消息推送,这仍然是非常关键的。

大体的方案如下:

  • 双方约定好心跳包的格式,要能够区别于普通的消息。

  • 客户端每隔一定时间,就向服务端发送一个心跳包

  • 服务端每接收到心跳包时,将其抛弃

  • 如果客户端的某个心跳包发送失败,就可以判断连接已经断开

  • 如果对实时性要求很高,服务端也可以定时检查客户端发送心跳包的频率,如果超过一定时间没有发送可以认为连接已经断开

4.2 断开时重连

使用心跳包必然会增加带宽和性能的负担,对于普通的应用我们其实并没有必要使用这种方案,如果消息发送时抛出了连接异常,直接尝试重新连接就好了。

跟上面的方案对比,其实这个抛出异常的消息就充当了心跳包的角色。

总的来说,连接是否要保活,如何保活,需要根据具体的业务场景灵活地思考和定制。

위 내용은 Java 기반 소켓 프로그래밍을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제