Heim  >  Artikel  >  Java  >  Methoden zur Implementierung der Socket-Programmierung auf Basis von Java

Methoden zur Implementierung der Socket-Programmierung auf Basis von Java

王林
王林nach vorne
2023-05-17 08:37:292203Durchsuche

Lernen Sie Socket kennen

Socket, auch Socket genannt, ist ein Protokoll, eine Konvention oder eine Spezifikation für die Netzwerkkommunikation zwischen verschiedenen Prozessen.

Bei der Socket-Programmierung handelt es sich häufiger um eine Kapselungs- oder Abstraktionsschicht, die auf Protokollen wie TCP/UDP basiert. Es handelt sich um eine Schnittstelle, die von einem System für die Programmierung im Zusammenhang mit der Netzwerkkommunikation bereitgestellt wird.

Der grundlegende Prozess zum Einrichten eines Sockets

Wir nehmen die vom Linux-Betriebssystem bereitgestellte Basis-API als Beispiel, um den grundlegenden Prozess zum Einrichten einer Socket-Kommunikation zu verstehen:

Methoden zur Implementierung der Socket-Programmierung auf Basis von Java

Sie können sehen, dass es sich im Wesentlichen um einen Socket handelt Eine TCP-Verbindung (natürlich ist es auch möglich, UDP und andere Verbindungen) Protokolle, Vereinfachung und Abstraktion auf Programmierebene. 1. Die einfachste Socket-Demonstration Zuerst den Client ausführen und dann den Client ausführen. Sie können den Effekt sehen.

Achten Sie hier auf die Implementierung der E/A-Operation. Wir verwenden ein Byte-Array der Größe MAX_BUFFER_SIZE als Puffer, nehmen dann die Bytes aus dem Eingabestream, platzieren sie im Puffer und entfernen sie dann Sie aus dem Puffer herauszunehmen und sie in Strings zu konstruieren, ist sehr nützlich, wenn die Eingabe-Stream-Datei groß ist. Tatsächlich wird auch NIO, das später besprochen wird, auf der Grundlage dieser Idee implementiert.

1.2 Zwei-Wege-Kommunikation

Das obige Beispiel implementiert nur eine einseitige Kommunikation, was offensichtlich eine Verschwendung von Kanälen darstellt. Die Socket-Verbindung unterstützt die bidirektionale Vollduplex-Kommunikation (die unterste Ebene ist TCP). Nach dem Empfang der Client-Nachricht sendet der Server eine Empfangsbestätigung an den Client zurück.

Und wir verwenden einige in Java.io verpackte Methoden, um den gesamten Kommunikationsprozess zu vereinfachen (da die Nachrichtenlänge nicht groß ist, wird der Puffer nicht mehr verwendet).

    Server:
  • package com.marklux.socket.base;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * The very basic socket server that only listen one single message.
     */
    
    public class BaseSocketServer {
    
        private ServerSocket server;
        private Socket socket;
        private int port;
        private InputStream inputStream;
        private static final int MAX_BUFFER_SIZE = 1024;
    
        public int getPort() {
            return port;
        }
    
        public void setPort(int port) {
            this.port = port;
        }
    
        public BaseSocketServer(int port) {
            this.port = port;
        }
    
        public void runServerSingle() throws IOException {
            this.server = new ServerSocket(this.port);
    
            System.out.println("base socket server started.");
            // the code will block here till the request come.
            this.socket = server.accept();
    
            this.inputStream = this.socket.getInputStream();
    
            byte[] readBytes = new byte[MAX_BUFFER_SIZE];
    
            int msgLen;
            StringBuilder stringBuilder = new StringBuilder();
    
            while ((msgLen = inputStream.read(readBytes)) != -1) {
                stringBuilder.append(new String(readBytes,0,msgLen,"UTF-8"));
            }
    
            System.out.println("get message from client: " + stringBuilder);
    
            inputStream.close();
            socket.close();
            server.close();
        }
    
        public static void main(String[] args) {
            BaseSocketServer bs = new BaseSocketServer(9799);
            try {
                bs.runServerSingle();
            }catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    }

    Client:MAX_BUFFER_SIZE的byte数组作为缓冲区,然后从输入流中取出字节放置到缓冲区,再从缓冲区中取出字节构建到字符串中去,这在输入流文件很大时非常有用,事实上,后面要讲到的NIO也是基于这种思路实现的。

1.2 双向通信

上面的例子只实现了一次单向的通信,这显然有点浪费通道。socket连接支持全双工的双向通信(底层是tcp),下面的例子中,服务端在收到客户端的消息后,将返回给客户端一个回执。

并且我们使用了一些java.io包装好的方法,来简化整个通信的流程(因为消息长度不大,不再使用缓冲区)。

服务端:

package com.marklux.socket.base;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.Socket;

/**
 * The very basic socket client that only send one single message.
 */

public class BaseSocketClient {
    private String serverHost;
    private int serverPort;
    private Socket socket;
    private OutputStream outputStream;

    public BaseSocketClient(String host, int port) {
        this.serverHost = host;
        this.serverPort = port;
    }

    public void connetServer() throws IOException {
        this.socket = new Socket(this.serverHost, this.serverPort);
        this.outputStream = socket.getOutputStream();
        // why the output stream?
    }

    public void sendSingle(String message) throws IOException {
        try {
            this.outputStream.write(message.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
        }
        this.outputStream.close();
        this.socket.close();
    }

    public static void main(String[] args) {
        BaseSocketClient bc = new BaseSocketClient("127.0.0.1",9799);
        try {
            bc.connetServer();
            bc.sendSingle("Hi from mark.");
        }catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端:

public void runServer() throws IOException {
        this.serverSocket = new ServerSocket(port);
        this.socket = serverSocket.accept();
        this.inputStream = socket.getInputStream();

        String message = new String(inputStream.readAllBytes(), "UTF-8");

        System.out.println("received message: " + message);

        this.socket.shutdownInput(); // 告诉客户端接收已经完毕,之后只能发送

        // write the receipt.

        this.outputStream = this.socket.getOutputStream();
        String receipt = "We received your message: " + message;
        outputStream.write(receipt.getBytes("UTF-8"));

        this.outputStream.close();
        this.socket.close();
    }
  • 注意这里我们在服务端接受到消息以及客户端发送消息后,分别调用了shutdownInput()shutdownOutput()而不是直接close对应的stream,这是因为在关闭任何一个stream,都会直接导致socket的关闭,也就无法进行后面回执的发送了。

  • 但是注意,调用shutdownInput()shutdownOutput()之后,对应的流也会被关闭,不能再次向socket发送/写入了。

2. 发送更多的消息:结束的界定

刚才的两个例子中,每次打开流,都只能进行一次写入/读取操作,结束后对应流被关闭,就无法再次写入/读取了。

在这种情况下,若需发送两条消息,则必须建立两个socket,这既会耗费资源,也会带来不便。其实我们完全可以不关闭对应的流,只要分次写入消息就可以了。

但是这样的话,我们就必须面对另一个问题:如何判断一次消息发送的结束呢?

2.1 使用特殊符号

最简单的办法是使用一些特殊的符号来标记一次发送完成,服务端只要读到对应的符号就可以完成一次读取,然后进行相关的处理操作。

下面的例子中我们使用换行符n

public void sendMessage(String message) throws IOException {
        this.socket = new Socket(host,port);
        this.outputStream = socket.getOutputStream();
        this.outputStream.write(message.getBytes("UTF-8"));
        this.socket.shutdownOutput(); // 告诉服务器,所有的发送动作已经结束,之后只能接收
        this.inputStream = socket.getInputStream();
        String receipt = new String(inputStream.readAllBytes(), "UTF-8");
        System.out.println("got receipt: " + receipt);
        this.inputStream.close();
        this.socket.close();
    }

Beachten Sie hier, dass wir shutdownInput() bzw. shutdownOutput aufrufen, nachdem der Server die Nachricht empfangen und der Client die Nachricht gesendet hat. ), anstatt den entsprechenden Stream direkt zu schließen. Dies liegt daran, dass das Schließen eines Streams direkt dazu führt, dass der Socket geschlossen wird, sodass keine weiteren Empfangsbestätigungen gesendet werden können.

Beachten Sie jedoch, dass nach dem Aufruf von shutdownInput() und shutdownOutput() auch der entsprechende Stream geschlossen wird und nicht erneut an den Socket senden/schreiben kann.

2. Weitere Nachrichten senden: Definition des Endes

In den beiden Beispielen kann jedes Mal, wenn der Stream geöffnet wird, nur ein Schreib-/Lesevorgang ausgeführt werden. Nachdem der entsprechende Stream geschlossen wurde, kann er nicht geschrieben/gelesen werden. nochmal lesen.

Wenn Sie in diesem Fall zwei Nachrichten senden müssen, müssen Sie zwei Sockets einrichten, was Ressourcen verbraucht und Unannehmlichkeiten verursacht. Tatsächlich müssen wir den entsprechenden Stream überhaupt nicht schließen, solange wir die Nachricht stapelweise schreiben.
  • Aber in diesem Fall müssen wir uns einem anderen Problem stellen: Wie kann man das Ende einer gesendeten Nachricht beurteilen?

    2.1 Verwenden Sie spezielle Symbole
  • Der einfachste Weg besteht darin, den Abschluss einer Übertragung mit einigen speziellen Symbolen zu markieren. Der Server kann einen Lesevorgang abschließen, solange er die entsprechenden Symbole liest, und dann entsprechende Verarbeitungsvorgänge ausführen.

    Im folgenden Beispiel verwenden wir das Zeilenumbruchzeichen n, um das Ende einer Übertragung zu markieren. Jedes Mal, wenn der Server eine Nachricht empfängt, druckt er sie einmal aus und verwendet den Scanner, um den Vorgang zu vereinfachen:
Server:

public void runServer() throws IOException {
        this.server = new ServerSocket(this.port);

        System.out.println("base socket server started.");

        this.socket = server.accept();
        // the code will block here till the request come.

        this.inputStream = this.socket.getInputStream();
        Scanner sc = new Scanner(this.inputStream);
        while (sc.hasNextLine()) {
            System.out.println("get info from client: " + sc.nextLine());
        } // 循环接收并输出消息内容
        this.inputStream.close();
        socket.close();
    }

Client:

public void connetServer() throws IOException {
        this.socket = new Socket(this.serverHost, this.serverPort);
        this.outputStream = socket.getOutputStream();
    }

public void send(String message) throws IOException {
        String sendMsg = message + "\n"; // we mark \n as a end of line.
        try {
            this.outputStream.write(sendMsg.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
        }
//        this.outputStream.close();
//        this.socket.shutdownOutput();
    }

 public static void main(String[] args) {
        CycleSocketClient cc = new CycleSocketClient("127.0.0.1", 9799);
        try {
            cc.connetServer();
            Scanner sc = new Scanner(System.in);
            while (sc.hasNext()) {
                String line = sc.nextLine();
                cc.send(line);
            }
        }catch (IOException e) {
            e.printStackTrace();
        }
    }

Der Effekt nach dem Ausführen besteht darin, dass der Server jedes Mal, wenn der Client eine Textzeile eingibt und die Eingabetaste drückt, den entsprechenden Nachrichtenlesedatensatz ausdruckt.

2.2 Basierend auf der Länge definieren

Zurück zum ursprünglichen Punkt: Der Grund, warum es für uns schwierig ist, das Ende der Nachricht zu finden, liegt darin, dass wir die Länge jeder Nachricht nicht bestimmen können.

Dann können Sie tatsächlich zuerst die Länge der Nachricht senden. Wenn der Server die Länge der Nachricht kennt, kann er den Empfang der Nachricht abschließen.

Im Allgemeinen besteht das Senden einer Nachricht aus zwei Schritten.🎜🎜🎜🎜Die Länge der gesendeten Nachricht In diesem Schritt muss die Anzahl der Bytes festgelegt werden, sonst geraten wir immer noch in einen Deadlock. 🎜🎜Im Allgemeinen können wir eine feste Anzahl von Bytes verwenden, um die Länge der Nachricht zu speichern. Beispielsweise ist festgelegt, dass die ersten 2 Bytes die Länge der Nachricht sind, die wir übertragen können ist ebenfalls festgelegt. Bei 2 Bytes als Beispiel überschreitet die maximale Länge der von uns gesendeten Nachricht 2^16 Bytes nicht, was 64 KB entspricht. 🎜🎜Wenn Sie die Kodierung einiger Zeichen verstehen, wissen Sie, dass wir tatsächlich Speicherplatz mit variabler Länge verwenden können, um die Länge der Nachricht zu speichern, zum Beispiel: 🎜🎜🎜Das erste Byte des ersten Bytes ist 0: das heißt 0XXXXXXX Dies bedeutet, dass die Länge ein Zeichen beträgt. Abschnitt, maximal 128, bedeutet 128B🎜Das erste Byte des ersten Bytes ist 110, dann wird das nächste Byte angehängt, um die Länge anzugeben: 110XXXXX 10XXXXXX, das Maximum ist 2048, was 2K bedeutet🎜Die Das erste Byte des ersten Bytes ist 1110, dann werden die nächsten beiden angehängt. Bytes stellen die Länge dar: 110XXXXX 10XXXXXX 10XXXXXX, das Maximum ist 131072, was 128 KB bedeutet🎜usw.🎜

当然这样实现起来会麻烦一些,因此下面的例子里我们仍然使用固定的两个字节来记录消息的长度。

服务端:

public void runServer() throws IOException {
        this.serverSocket = new ServerSocket(this.port);
        this.socket = serverSocket.accept();
        this.inputStream = socket.getInputStream();
        byte[] bytes;
        while (true) {
            // 先读第一个字节
            int first = inputStream.read();
            if (first == -1) {
                // 如果是-1,说明输入流已经被关闭了,也就不需要继续监听了
                this.socket.close();
                break;
            }
            // 读取第二个字节
            int second = inputStream.read();

            int length = (first << 8) + second; // 用位运算将两个字节拼起来成为真正的长度

            bytes = new byte[length]; // 构建指定长度的字节大小来储存消息即可

            inputStream.read(bytes);

            System.out.println("receive message: " + new String(bytes,"UTF-8"));
        }
    }

客户端:

public void connetServer() throws IOException {
        this.socket = new Socket(host,port);
        this.outputStream = socket.getOutputStream();
    }

public void sendMessage(String message) throws IOException {
        // 首先要把message转换成bytes以便处理
        byte[] bytes = message.getBytes("UTF-8");
        // 接下来传输两个字节的长度,依然使用移位实现
        int length = bytes.length;
        this.outputStream.write(length >> 8); // write默认一次只传输一个字节
        this.outputStream.write(length);
        // 传输完长度后,再正式传送消息
        this.outputStream.write(bytes);
    }

public static void main(String[] args) {
        LengthSocketClient lc = new LengthSocketClient("127.0.0.1",9799);
        try {
            lc.connetServer();
            Scanner sc = new Scanner(System.in);
            while (sc.hasNextLine()) {
                lc.sendMessage(sc.nextLine());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

3. 处理更多的连接:多线程

3.1 同时实现消息的发送与接收

在考虑服务端处理多连接之前,我们先考虑使用多线程改造一下原有的一对一对话实例。

在原有的例子中,消息的接收方并不能主动地向对方发送消息,换句话说我们并没有实现真正的互相对话,这主要是因为消息的发送和接收这两个动作并不能同时进行,因此我们需要使用两个线程,其中一个用于监听键盘输入并将其写入socket,另一个则负责监听socket并将接受到的消息显示。

出于简单考虑,我们直接让主线程负责键盘监听和消息发送,同时另外开启一个线程用于拉取消息并显示。

消息拉取线程 ListenThread.java

public class ListenThread implements Runnable {
    private Socket socket;
    private InputStream inputStream;

    public ListenThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() throws RuntimeException{
        try {
            this.inputStream = socket.getInputStream();
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }

        while (true) {
            try {
                int first = this.inputStream.read();
                if (first == -1) {
                    // 输入流已经被关闭,无需继续读取
                    throw new RuntimeException("disconnected.");
                }
                int second = this.inputStream.read();
                int msgLength = (first<<8) + second;
                byte[] readBuffer = new byte[msgLength];
                this.inputStream.read(readBuffer);

                System.out.println("message from [" + socket.getInetAddress() + "]: " + new String(readBuffer,"UTF-8"));
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e.getMessage());
            }
        }
    }
}

主线程,启动时由用户选择是作为server还是client:

public class ChatSocket {
    private String host;
    private int port;
    private Socket socket;
    private ServerSocket serverSocket;
    private OutputStream outputStream;

    // 以服务端形式启动,创建会话
    public void runAsServer(int port) throws IOException {
        this.serverSocket = new ServerSocket(port);
        System.out.println("[log] server started at port " + port);
        // 等待客户端的加入
        this.socket = serverSocket.accept();
        System.out.println("[log] successful connected with " + socket.getInetAddress());
        // 启动监听线程
        Thread listenThread = new Thread(new ListenThread(this.socket));
        listenThread.start();
        waitAndSend();
    }

    // 以客户端形式启动,加入会话
    public void runAsClient(String host, int port) throws IOException {
        this.socket = new Socket(host, port);
        System.out.println("[log] successful connected to server " + socket.getInetAddress());
        Thread listenThread = new Thread(new ListenThread(this.socket));
        listenThread.start();
        waitAndSend();
    }

    public void waitAndSend() throws IOException {
        this.outputStream = this.socket.getOutputStream();
        Scanner sc = new Scanner(System.in);
        while (sc.hasNextLine()) {
            this.sendMessage(sc.nextLine());
        }
    }

    public void sendMessage(String message) throws IOException {
        byte[] msgBytes = message.getBytes("UTF-8");
        int length = msgBytes.length;
        outputStream.write(length>>8);
        outputStream.write(length);
        outputStream.write(msgBytes);
    }

    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);
        ChatSocket chatSocket = new ChatSocket();
        System.out.println("select connect type: 1 for server and 2 for client");
        int type = Integer.parseInt(scanner.nextLine().toString());
        if (type == 1) {
            System.out.print("input server port: ");
            int port = scanner.nextInt();
            try {
                chatSocket.runAsServer(port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }else if (type == 2) {
            System.out.print("input server host: ");
            String host = scanner.nextLine();
            System.out.print("input server port: ");
            int port = scanner.nextInt();
            try {
                chatSocket.runAsClient(host, port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

3.2 使用线程池优化服务端并发能力

作为服务端,如果一次只跟一个客户端建立socket连接,未免显得太过浪费资源,因此我们完全可以让服务端和多个客户端建立多个socket。

如果要处理多个连接,就必须解决并发问题,否则可以编写循环轮流处理。我们可以使用多线程来处理并发,不过线程的创建和销毁都会消耗大量的资源和时间,所以最好一步到位,用一个线程池来实现。

下面给出一个示范性质的服务端代码:

public class SocketServer {
  public static void main(String args[]) throws Exception {
    // 监听指定的端口
    int port = 55533;
    ServerSocket server = new ServerSocket(port);
    // server将一直等待连接的到来
    System.out.println("server将一直等待连接的到来");

    //如果使用多线程,那就需要线程池,防止并发过高时创建过多线程耗尽资源
    ExecutorService threadPool = Executors.newFixedThreadPool(100);
    
    while (true) {
      Socket socket = server.accept();
      
      Runnable runnable=()->{
        try {
          // 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取
          InputStream inputStream = socket.getInputStream();
          byte[] bytes = new byte[1024];
          int len;
          StringBuilder sb = new StringBuilder();
          while ((len = inputStream.read(bytes)) != -1) {
            // 注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8
            sb.append(new String(bytes, 0, len, "UTF-8"));
          }
          System.out.println("get message from client: " + sb);
          inputStream.close();
          socket.close();
        } catch (Exception e) {
          e.printStackTrace();
        }
      };
      threadPool.submit(runnable);
    }

  }
}

4. 连接保活

我想你不难发现一个问题,那就是当socket连接成功建立后,如果中途发生异常导致其中一方断开连接,此时另一方是无法发现的,只有在再次尝试发送/接收消息才会因为抛出异常而退出。

简单的说,就是我们维持的socket连接,是一个长连接,但我们没有保证它的时效性,上一秒它可能还是可以用的,但是下一秒就不一定了。

4.1 使用心跳包

最常见的确保连接随时可用的方法是通过定时发送心跳包来检测连接的正常性。对于需要实现高实时性的服务,比如消息推送,这仍然是非常关键的。

大体的方案如下:

  • 双方约定好心跳包的格式,要能够区别于普通的消息。

  • 客户端每隔一定时间,就向服务端发送一个心跳包

  • 服务端每接收到心跳包时,将其抛弃

  • 如果客户端的某个心跳包发送失败,就可以判断连接已经断开

  • 如果对实时性要求很高,服务端也可以定时检查客户端发送心跳包的频率,如果超过一定时间没有发送可以认为连接已经断开

4.2 断开时重连

使用心跳包必然会增加带宽和性能的负担,对于普通的应用我们其实并没有必要使用这种方案,如果消息发送时抛出了连接异常,直接尝试重新连接就好了。

跟上面的方案对比,其实这个抛出异常的消息就充当了心跳包的角色。

总的来说,连接是否要保活,如何保活,需要根据具体的业务场景灵活地思考和定制。

Das obige ist der detaillierte Inhalt vonMethoden zur Implementierung der Socket-Programmierung auf Basis von Java. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen