ホームページ >Java >&#&チュートリアル >Java に基づいてソケット プログラミングを実装する方法
ソケットは、ソケットとも呼ばれ、異なるプロセス間のネットワーク通信のためのプロトコル、規約、または仕様です。
ソケット プログラミングの場合、TCP/UDP などのプロトコルに基づくカプセル化または抽象化の層に似ており、ネットワーク通信に関連するプログラミングのためにシステムによって提供されるインターフェイスです。
ソケット通信を確立する基本プロセスを理解するために、Linux オペレーティング システムによって提供される基本 API を例に挙げます。
本質的に、ソケットは tcp 接続 (もちろん、udp などの他の接続の場合もあります) プロトコルをプログラミング レベルで単純化および抽象化したものであることがわかります。
1. 最も基本的なソケットのデモ
package com.marklux.socket.base; import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; /** * The very basic socket server that only listen one single message. */ public class BaseSocketServer { private ServerSocket server; private Socket socket; private int port; private InputStream inputStream; private static final int MAX_BUFFER_SIZE = 1024; public int getPort() { return port; } public void setPort(int port) { this.port = port; } public BaseSocketServer(int port) { this.port = port; } public void runServerSingle() throws IOException { this.server = new ServerSocket(this.port); System.out.println("base socket server started."); // the code will block here till the request come. this.socket = server.accept(); this.inputStream = this.socket.getInputStream(); byte[] readBytes = new byte[MAX_BUFFER_SIZE]; int msgLen; StringBuilder stringBuilder = new StringBuilder(); while ((msgLen = inputStream.read(readBytes)) != -1) { stringBuilder.append(new String(readBytes,0,msgLen,"UTF-8")); } System.out.println("get message from client: " + stringBuilder); inputStream.close(); socket.close(); server.close(); } public static void main(String[] args) { BaseSocketServer bs = new BaseSocketServer(9799); try { bs.runServerSingle(); }catch (IOException e) { e.printStackTrace(); } } }クライアント:
package com.marklux.socket.base; import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.Socket; /** * The very basic socket client that only send one single message. */ public class BaseSocketClient { private String serverHost; private int serverPort; private Socket socket; private OutputStream outputStream; public BaseSocketClient(String host, int port) { this.serverHost = host; this.serverPort = port; } public void connetServer() throws IOException { this.socket = new Socket(this.serverHost, this.serverPort); this.outputStream = socket.getOutputStream(); // why the output stream? } public void sendSingle(String message) throws IOException { try { this.outputStream.write(message.getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { System.out.println(e.getMessage()); } this.outputStream.close(); this.socket.close(); } public static void main(String[] args) { BaseSocketClient bc = new BaseSocketClient("127.0.0.1",9799); try { bc.connetServer(); bc.sendSingle("Hi from mark."); }catch (IOException e) { e.printStackTrace(); } } }最初にサーバーを実行し、次にクライアントを実行すると、効果がわかります。 ここでの IO 操作の実装に注意してください。バッファとしてサイズ
public void runServer() throws IOException { this.serverSocket = new ServerSocket(port); this.socket = serverSocket.accept(); this.inputStream = socket.getInputStream(); String message = new String(inputStream.readAllBytes(), "UTF-8"); System.out.println("received message: " + message); this.socket.shutdownInput(); // 告诉客户端接收已经完毕,之后只能发送 // write the receipt. this.outputStream = this.socket.getOutputStream(); String receipt = "We received your message: " + message; outputStream.write(receipt.getBytes("UTF-8")); this.outputStream.close(); this.socket.close(); }クライアント:
public void sendMessage(String message) throws IOException { this.socket = new Socket(host,port); this.outputStream = socket.getOutputStream(); this.outputStream.write(message.getBytes("UTF-8")); this.socket.shutdownOutput(); // 告诉服务器,所有的发送动作已经结束,之后只能接收 this.inputStream = socket.getInputStream(); String receipt = new String(inputStream.readAllBytes(), "UTF-8"); System.out.println("got receipt: " + receipt); this.inputStream.close(); this.socket.close(); }ここでは、サーバーがメッセージを受信した後、クライアントがメッセージを送信した後にそれぞれ呼び出していることに注意してください。 ##shutdownInput()
は、対応するストリームを直接閉じる代わりに置き換えられます。これは、ストリームを閉じると直接ソケットが閉じられ、その後の受信が処理できなくなるためです。の送信されました。 ただし、
を呼び出した後は、対応するストリームも閉じられ、ソケットをもう一度/書き込みました。 2. さらにメッセージを送信: 終わりの定義
しかし、この場合、別の問題に直面しなければなりません。それは、メッセージ送信の終了をどのように判断するかということです。
2.1 特殊記号を使用する
最も簡単な方法は、いくつかの特殊記号を使用して送信の完了をマークすることです。サーバーは、対応する記号を読み取る限り読み取りを完了できます。関連する処理操作を実行します。
を使用して送信の終了をマークします。サーバーはメッセージを受信するたびに、メッセージを 1 回出力し、スキャナーを使用して簡略化します。操作:
サーバー:<pre class="brush:java;">public void runServer() throws IOException {
this.server = new ServerSocket(this.port);
System.out.println("base socket server started.");
this.socket = server.accept();
// the code will block here till the request come.
this.inputStream = this.socket.getInputStream();
Scanner sc = new Scanner(this.inputStream);
while (sc.hasNextLine()) {
System.out.println("get info from client: " + sc.nextLine());
} // 循环接收并输出消息内容
this.inputStream.close();
socket.close();
}</pre>
クライアント:
public void connetServer() throws IOException { this.socket = new Socket(this.serverHost, this.serverPort); this.outputStream = socket.getOutputStream(); } public void send(String message) throws IOException { String sendMsg = message + "\n"; // we mark \n as a end of line. try { this.outputStream.write(sendMsg.getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { System.out.println(e.getMessage()); } // this.outputStream.close(); // this.socket.shutdownOutput(); } public static void main(String[] args) { CycleSocketClient cc = new CycleSocketClient("127.0.0.1", 9799); try { cc.connetServer(); Scanner sc = new Scanner(System.in); while (sc.hasNext()) { String line = sc.nextLine(); cc.send(line); } }catch (IOException e) { e.printStackTrace(); } }
実行後の効果は、クライアントがテキスト行を入力して Enter キーを押すたびに、サーバーは印刷を実行します。対応するメッセージ読み取りレコードを出力します。
2.2 長さに基づいて定義する
元の点に戻りますが、メッセージがいつ終了するかを特定することが難しい理由は、各メッセージの長さを決定できないためです。
一般に、メッセージの送信は 2 つのステップになります
送信されるメッセージの長さ
メッセージの送信
最後の問題は、「送信メッセージの長さ」ステップで送信されるバイト量を修正する必要があるということです。修正しないと、依然としてデッドロックが発生します。
一部の文字のエンコーディングを理解していれば、実際に可変長スペースを使用してメッセージの長さを格納できることがわかります。たとえば、次のようになります。
最初のバイトが始まります。 0 の場合: つまり、0XXXXXXX、長さが 1 バイトであることを意味し、最大値は 128、つまり 128B最初のバイトの最初のバイトは 110 で、その後、長さを示すために次のバイトが付加されます: 110XXXXX 10XXXXXX、最大値は 2048 で、これは 2K
を意味します。最初のバイトの最初のバイトは 1110 で、次の 2 バイトが長さを示すために含まれます: 110XXXXX 10XXXXXX 10XXXXXX、最大値は 131072、つまり 128Kなどの上###
当然这样实现起来会麻烦一些,因此下面的例子里我们仍然使用固定的两个字节来记录消息的长度。
服务端:
public void runServer() throws IOException { this.serverSocket = new ServerSocket(this.port); this.socket = serverSocket.accept(); this.inputStream = socket.getInputStream(); byte[] bytes; while (true) { // 先读第一个字节 int first = inputStream.read(); if (first == -1) { // 如果是-1,说明输入流已经被关闭了,也就不需要继续监听了 this.socket.close(); break; } // 读取第二个字节 int second = inputStream.read(); int length = (first << 8) + second; // 用位运算将两个字节拼起来成为真正的长度 bytes = new byte[length]; // 构建指定长度的字节大小来储存消息即可 inputStream.read(bytes); System.out.println("receive message: " + new String(bytes,"UTF-8")); } }
客户端:
public void connetServer() throws IOException { this.socket = new Socket(host,port); this.outputStream = socket.getOutputStream(); } public void sendMessage(String message) throws IOException { // 首先要把message转换成bytes以便处理 byte[] bytes = message.getBytes("UTF-8"); // 接下来传输两个字节的长度,依然使用移位实现 int length = bytes.length; this.outputStream.write(length >> 8); // write默认一次只传输一个字节 this.outputStream.write(length); // 传输完长度后,再正式传送消息 this.outputStream.write(bytes); } public static void main(String[] args) { LengthSocketClient lc = new LengthSocketClient("127.0.0.1",9799); try { lc.connetServer(); Scanner sc = new Scanner(System.in); while (sc.hasNextLine()) { lc.sendMessage(sc.nextLine()); } } catch (IOException e) { e.printStackTrace(); } }
在考虑服务端处理多连接之前,我们先考虑使用多线程改造一下原有的一对一对话实例。
在原有的例子中,消息的接收方并不能主动地向对方发送消息,换句话说我们并没有实现真正的互相对话,这主要是因为消息的发送和接收这两个动作并不能同时进行,因此我们需要使用两个线程,其中一个用于监听键盘输入并将其写入socket,另一个则负责监听socket并将接受到的消息显示。
出于简单考虑,我们直接让主线程负责键盘监听和消息发送,同时另外开启一个线程用于拉取消息并显示。
消息拉取线程 ListenThread.java
public class ListenThread implements Runnable { private Socket socket; private InputStream inputStream; public ListenThread(Socket socket) { this.socket = socket; } @Override public void run() throws RuntimeException{ try { this.inputStream = socket.getInputStream(); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e.getMessage()); } while (true) { try { int first = this.inputStream.read(); if (first == -1) { // 输入流已经被关闭,无需继续读取 throw new RuntimeException("disconnected."); } int second = this.inputStream.read(); int msgLength = (first<<8) + second; byte[] readBuffer = new byte[msgLength]; this.inputStream.read(readBuffer); System.out.println("message from [" + socket.getInetAddress() + "]: " + new String(readBuffer,"UTF-8")); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e.getMessage()); } } } }
主线程,启动时由用户选择是作为server还是client:
public class ChatSocket { private String host; private int port; private Socket socket; private ServerSocket serverSocket; private OutputStream outputStream; // 以服务端形式启动,创建会话 public void runAsServer(int port) throws IOException { this.serverSocket = new ServerSocket(port); System.out.println("[log] server started at port " + port); // 等待客户端的加入 this.socket = serverSocket.accept(); System.out.println("[log] successful connected with " + socket.getInetAddress()); // 启动监听线程 Thread listenThread = new Thread(new ListenThread(this.socket)); listenThread.start(); waitAndSend(); } // 以客户端形式启动,加入会话 public void runAsClient(String host, int port) throws IOException { this.socket = new Socket(host, port); System.out.println("[log] successful connected to server " + socket.getInetAddress()); Thread listenThread = new Thread(new ListenThread(this.socket)); listenThread.start(); waitAndSend(); } public void waitAndSend() throws IOException { this.outputStream = this.socket.getOutputStream(); Scanner sc = new Scanner(System.in); while (sc.hasNextLine()) { this.sendMessage(sc.nextLine()); } } public void sendMessage(String message) throws IOException { byte[] msgBytes = message.getBytes("UTF-8"); int length = msgBytes.length; outputStream.write(length>>8); outputStream.write(length); outputStream.write(msgBytes); } public static void main(String[] args) { Scanner scanner = new Scanner(System.in); ChatSocket chatSocket = new ChatSocket(); System.out.println("select connect type: 1 for server and 2 for client"); int type = Integer.parseInt(scanner.nextLine().toString()); if (type == 1) { System.out.print("input server port: "); int port = scanner.nextInt(); try { chatSocket.runAsServer(port); } catch (IOException e) { e.printStackTrace(); } }else if (type == 2) { System.out.print("input server host: "); String host = scanner.nextLine(); System.out.print("input server port: "); int port = scanner.nextInt(); try { chatSocket.runAsClient(host, port); } catch (IOException e) { e.printStackTrace(); } } } }
作为服务端,如果一次只跟一个客户端建立socket连接,未免显得太过浪费资源,因此我们完全可以让服务端和多个客户端建立多个socket。
如果要处理多个连接,就必须解决并发问题,否则可以编写循环轮流处理。我们可以使用多线程来处理并发,不过线程的创建和销毁都会消耗大量的资源和时间,所以最好一步到位,用一个线程池来实现。
下面给出一个示范性质的服务端代码:
public class SocketServer { public static void main(String args[]) throws Exception { // 监听指定的端口 int port = 55533; ServerSocket server = new ServerSocket(port); // server将一直等待连接的到来 System.out.println("server将一直等待连接的到来"); //如果使用多线程,那就需要线程池,防止并发过高时创建过多线程耗尽资源 ExecutorService threadPool = Executors.newFixedThreadPool(100); while (true) { Socket socket = server.accept(); Runnable runnable=()->{ try { // 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取 InputStream inputStream = socket.getInputStream(); byte[] bytes = new byte[1024]; int len; StringBuilder sb = new StringBuilder(); while ((len = inputStream.read(bytes)) != -1) { // 注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8 sb.append(new String(bytes, 0, len, "UTF-8")); } System.out.println("get message from client: " + sb); inputStream.close(); socket.close(); } catch (Exception e) { e.printStackTrace(); } }; threadPool.submit(runnable); } } }
我想你不难发现一个问题,那就是当socket连接成功建立后,如果中途发生异常导致其中一方断开连接,此时另一方是无法发现的,只有在再次尝试发送/接收消息才会因为抛出异常而退出。
简单的说,就是我们维持的socket连接,是一个长连接,但我们没有保证它的时效性,上一秒它可能还是可以用的,但是下一秒就不一定了。
最常见的确保连接随时可用的方法是通过定时发送心跳包来检测连接的正常性。对于需要实现高实时性的服务,比如消息推送,这仍然是非常关键的。
大体的方案如下:
双方约定好心跳包的格式,要能够区别于普通的消息。
客户端每隔一定时间,就向服务端发送一个心跳包
服务端每接收到心跳包时,将其抛弃
如果客户端的某个心跳包发送失败,就可以判断连接已经断开
如果对实时性要求很高,服务端也可以定时检查客户端发送心跳包的频率,如果超过一定时间没有发送可以认为连接已经断开
使用心跳包必然会增加带宽和性能的负担,对于普通的应用我们其实并没有必要使用这种方案,如果消息发送时抛出了连接异常,直接尝试重新连接就好了。
跟上面的方案对比,其实这个抛出异常的消息就充当了心跳包的角色。
总的来说,连接是否要保活,如何保活,需要根据具体的业务场景灵活地思考和定制。
以上がJava に基づいてソケット プログラミングを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。