Soket, juga dikenali sebagai soket, ialah protokol, konvensyen atau spesifikasi untuk komunikasi rangkaian antara proses yang berbeza.
Untuk pengaturcaraan soket, ia lebih kerap seperti lapisan enkapsulasi atau abstraksi berdasarkan TCP/UDP dan protokol lain Ia adalah antara muka yang disediakan oleh sistem untuk pengaturcaraan berkaitan komunikasi.
Kami mengambil API asas yang disediakan oleh sistem pengendalian Linux sebagai contoh untuk memahami proses asas mewujudkan komunikasi soket:
Anda boleh melihat bahawa pada asasnya, soket ialah penyederhanaan dan pengabstrakan sambungan TCP (sudah tentu, ia juga mungkin sambungan lain seperti UDP) protokol di peringkat pengaturcaraan.
Pertama, kita mulakan dengan kod asas soket yang hanya menghantar dan menerima mesej sekali:
Pelayan:
package com.marklux.socket.base; import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; /** * The very basic socket server that only listen one single message. */ public class BaseSocketServer { private ServerSocket server; private Socket socket; private int port; private InputStream inputStream; private static final int MAX_BUFFER_SIZE = 1024; public int getPort() { return port; } public void setPort(int port) { this.port = port; } public BaseSocketServer(int port) { this.port = port; } public void runServerSingle() throws IOException { this.server = new ServerSocket(this.port); System.out.println("base socket server started."); // the code will block here till the request come. this.socket = server.accept(); this.inputStream = this.socket.getInputStream(); byte[] readBytes = new byte[MAX_BUFFER_SIZE]; int msgLen; StringBuilder stringBuilder = new StringBuilder(); while ((msgLen = inputStream.read(readBytes)) != -1) { stringBuilder.append(new String(readBytes,0,msgLen,"UTF-8")); } System.out.println("get message from client: " + stringBuilder); inputStream.close(); socket.close(); server.close(); } public static void main(String[] args) { BaseSocketServer bs = new BaseSocketServer(9799); try { bs.runServerSingle(); }catch (IOException e) { e.printStackTrace(); } } }
Pelanggan:
package com.marklux.socket.base; import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.Socket; /** * The very basic socket client that only send one single message. */ public class BaseSocketClient { private String serverHost; private int serverPort; private Socket socket; private OutputStream outputStream; public BaseSocketClient(String host, int port) { this.serverHost = host; this.serverPort = port; } public void connetServer() throws IOException { this.socket = new Socket(this.serverHost, this.serverPort); this.outputStream = socket.getOutputStream(); // why the output stream? } public void sendSingle(String message) throws IOException { try { this.outputStream.write(message.getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { System.out.println(e.getMessage()); } this.outputStream.close(); this.socket.close(); } public static void main(String[] args) { BaseSocketClient bc = new BaseSocketClient("127.0.0.1",9799); try { bc.connetServer(); bc.sendSingle("Hi from mark."); }catch (IOException e) { e.printStackTrace(); } } }
Jalankan pelayan dahulu, kemudian pelanggan, dan anda boleh melihat kesannya.
Perhatikan pelaksanaan operasi IO di sini Kami menggunakan tatasusunan bait bersaiz MAX_BUFFER_SIZE
sebagai penimbal, kemudian ambil bait daripada aliran input dan letakkannya dalam penimbal, dan kemudian. daripada Bytes dikeluarkan daripada penimbal dan dibina ke dalam rentetan, yang sangat berguna apabila fail aliran input adalah besar, sebenarnya, NIO yang akan dibincangkan kemudian juga dilaksanakan berdasarkan idea ini.
Contoh di atas hanya melaksanakan komunikasi sehala, yang jelas merupakan satu pembaziran saluran. Sambungan soket menyokong komunikasi dua hala dupleks penuh (lapisan bawah ialah tcp Dalam contoh berikut, selepas menerima mesej klien, pelayan akan mengembalikan resit kepada klien.
Dan kami menggunakan beberapa kaedah berpakej java.io untuk memudahkan keseluruhan proses komunikasi (kerana panjang mesej tidak besar, penimbal tidak lagi digunakan).
Pelayan:
public void runServer() throws IOException { this.serverSocket = new ServerSocket(port); this.socket = serverSocket.accept(); this.inputStream = socket.getInputStream(); String message = new String(inputStream.readAllBytes(), "UTF-8"); System.out.println("received message: " + message); this.socket.shutdownInput(); // 告诉客户端接收已经完毕,之后只能发送 // write the receipt. this.outputStream = this.socket.getOutputStream(); String receipt = "We received your message: " + message; outputStream.write(receipt.getBytes("UTF-8")); this.outputStream.close(); this.socket.close(); }
Pelanggan:
public void sendMessage(String message) throws IOException { this.socket = new Socket(host,port); this.outputStream = socket.getOutputStream(); this.outputStream.write(message.getBytes("UTF-8")); this.socket.shutdownOutput(); // 告诉服务器,所有的发送动作已经结束,之后只能接收 this.inputStream = socket.getInputStream(); String receipt = new String(inputStream.readAllBytes(), "UTF-8"); System.out.println("got receipt: " + receipt); this.inputStream.close(); this.socket.close(); }
Perhatikan bahawa di sini kami memanggil masing-masing selepas pelayan menerima mesej dan pelanggan menghantar mesej shutdownInput()
dan shutdownOutput()
ditambah dan bukannya menutup terus strim yang sepadan Ini kerana menutup mana-mana aliran akan secara langsung menyebabkan soket ditutup, menjadikannya mustahil untuk menghantar resit berikutnya.
Tetapi ambil perhatian bahawa selepas memanggil shutdownInput()
dan shutdownOutput()
, strim yang sepadan juga akan ditutup dan tidak boleh menghantar/menulis ke soket lagi.
Dalam dua contoh tadi, setiap kali strim dibuka, hanya satu tulisan/baca boleh dilakukan Selepas operasi pengambilan selesai, aliran yang sepadan ditutup dan tidak boleh ditulis/dibaca semula.
Dalam kes ini, jika dua mesej perlu dihantar, dua soket mesti diwujudkan, yang akan menggunakan sumber dan menyebabkan kesulitan. Sebenarnya, kita tidak perlu menutup aliran yang sepadan sama sekali, selagi kita menulis mesej dalam kelompok.
Tetapi dalam kes ini, kita mesti menghadapi masalah lain: Bagaimana untuk menilai penghujung penghantaran mesej?
Cara paling mudah ialah menggunakan beberapa simbol khas untuk menandakan selesainya penghantaran Pelayan boleh melengkapkan bacaan asalkan ia membaca simbol yang sepadan, dan kemudian melaksanakan operasi pemprosesan yang berkaitan.
Dalam contoh berikut, kami menggunakan aksara baris baharu n
untuk menandakan tamatnya penghantaran Setiap kali pelayan menerima mesej, ia mencetaknya sekali dan menggunakan Pengimbas untuk memudahkan operasi:
Pelayan:
public void runServer() throws IOException { this.server = new ServerSocket(this.port); System.out.println("base socket server started."); this.socket = server.accept(); // the code will block here till the request come. this.inputStream = this.socket.getInputStream(); Scanner sc = new Scanner(this.inputStream); while (sc.hasNextLine()) { System.out.println("get info from client: " + sc.nextLine()); } // 循环接收并输出消息内容 this.inputStream.close(); socket.close(); }
Pelanggan:
public void connetServer() throws IOException { this.socket = new Socket(this.serverHost, this.serverPort); this.outputStream = socket.getOutputStream(); } public void send(String message) throws IOException { String sendMsg = message + "\n"; // we mark \n as a end of line. try { this.outputStream.write(sendMsg.getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { System.out.println(e.getMessage()); } // this.outputStream.close(); // this.socket.shutdownOutput(); } public static void main(String[] args) { CycleSocketClient cc = new CycleSocketClient("127.0.0.1", 9799); try { cc.connetServer(); Scanner sc = new Scanner(System.in); while (sc.hasNext()) { String line = sc.nextLine(); cc.send(line); } }catch (IOException e) { e.printStackTrace(); } }
Kesan selepas dijalankan ialah setiap kali pelanggan memasuki baris teks dan menekan Enter, pelayan akan mencetak bacaan mesej yang sepadan rekod.
Kembali ke titik asal, sebab mengapa kita sukar untuk mengesan apabila mesej itu tamat adalah kerana kita tidak dapat menentukan panjang setiap mesej.
Kemudian anda sebenarnya boleh menghantar panjang mesej terlebih dahulu Apabila pelayan mengetahui panjang mesej, ia boleh melengkapkan penerimaan mesej.
Secara umum, menghantar mesej menjadi dua langkah
Tempoh menghantar mesej
Menghantar mesej
Masalah terakhir ialah jumlah bait yang dihantar dalam langkah "panjang mesej dihantar" mesti dibetulkan, jika tidak, kita masih akan menemui jalan buntu.
Secara umumnya, kita boleh menggunakan bilangan bait yang tetap untuk menyimpan panjang mesej Sebagai contoh, 2 bait pertama ialah panjang mesej yang boleh kita hantar juga telah ditetapkan, dengan mengambil 2 bait sebagai contoh, panjang maksimum mesej yang kami hantar tidak melebihi 2^16 bait, iaitu 64K.
Jika anda memahami pengekodan beberapa aksara, anda akan tahu bahawa kami sebenarnya boleh menggunakan ruang panjang berubah-ubah untuk menyimpan panjang mesej, contohnya:
Bait pertama bermula dengan 0: Iaitu 0XXXXXXX, yang bermaksud panjang ialah satu bait, maksimum ialah 128, yang bermaksud 128B
Bait pertama bait pertama ialah 110, kemudian bait berikut menunjukkan panjang: 110XXXXX 10XXXXXX, maksimum ialah 2048, yang bermaksud 2K
Bait pertama bagi bait pertama ialah 1110, kemudian dua bait berikut disertakan untuk menunjukkan panjang: 110XXXX 10XXXXXX 10XXXXXX, maksimum ialah 131072, yang bermaksud 128K
dan seterusnya
当然这样实现起来会麻烦一些,因此下面的例子里我们仍然使用固定的两个字节来记录消息的长度。
服务端:
public void runServer() throws IOException { this.serverSocket = new ServerSocket(this.port); this.socket = serverSocket.accept(); this.inputStream = socket.getInputStream(); byte[] bytes; while (true) { // 先读第一个字节 int first = inputStream.read(); if (first == -1) { // 如果是-1,说明输入流已经被关闭了,也就不需要继续监听了 this.socket.close(); break; } // 读取第二个字节 int second = inputStream.read(); int length = (first << 8) + second; // 用位运算将两个字节拼起来成为真正的长度 bytes = new byte[length]; // 构建指定长度的字节大小来储存消息即可 inputStream.read(bytes); System.out.println("receive message: " + new String(bytes,"UTF-8")); } }
客户端:
public void connetServer() throws IOException { this.socket = new Socket(host,port); this.outputStream = socket.getOutputStream(); } public void sendMessage(String message) throws IOException { // 首先要把message转换成bytes以便处理 byte[] bytes = message.getBytes("UTF-8"); // 接下来传输两个字节的长度,依然使用移位实现 int length = bytes.length; this.outputStream.write(length >> 8); // write默认一次只传输一个字节 this.outputStream.write(length); // 传输完长度后,再正式传送消息 this.outputStream.write(bytes); } public static void main(String[] args) { LengthSocketClient lc = new LengthSocketClient("127.0.0.1",9799); try { lc.connetServer(); Scanner sc = new Scanner(System.in); while (sc.hasNextLine()) { lc.sendMessage(sc.nextLine()); } } catch (IOException e) { e.printStackTrace(); } }
在考虑服务端处理多连接之前,我们先考虑使用多线程改造一下原有的一对一对话实例。
在原有的例子中,消息的接收方并不能主动地向对方发送消息,换句话说我们并没有实现真正的互相对话,这主要是因为消息的发送和接收这两个动作并不能同时进行,因此我们需要使用两个线程,其中一个用于监听键盘输入并将其写入socket,另一个则负责监听socket并将接受到的消息显示。
出于简单考虑,我们直接让主线程负责键盘监听和消息发送,同时另外开启一个线程用于拉取消息并显示。
消息拉取线程 ListenThread.java
public class ListenThread implements Runnable { private Socket socket; private InputStream inputStream; public ListenThread(Socket socket) { this.socket = socket; } @Override public void run() throws RuntimeException{ try { this.inputStream = socket.getInputStream(); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e.getMessage()); } while (true) { try { int first = this.inputStream.read(); if (first == -1) { // 输入流已经被关闭,无需继续读取 throw new RuntimeException("disconnected."); } int second = this.inputStream.read(); int msgLength = (first<<8) + second; byte[] readBuffer = new byte[msgLength]; this.inputStream.read(readBuffer); System.out.println("message from [" + socket.getInetAddress() + "]: " + new String(readBuffer,"UTF-8")); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e.getMessage()); } } } }
主线程,启动时由用户选择是作为server还是client:
public class ChatSocket { private String host; private int port; private Socket socket; private ServerSocket serverSocket; private OutputStream outputStream; // 以服务端形式启动,创建会话 public void runAsServer(int port) throws IOException { this.serverSocket = new ServerSocket(port); System.out.println("[log] server started at port " + port); // 等待客户端的加入 this.socket = serverSocket.accept(); System.out.println("[log] successful connected with " + socket.getInetAddress()); // 启动监听线程 Thread listenThread = new Thread(new ListenThread(this.socket)); listenThread.start(); waitAndSend(); } // 以客户端形式启动,加入会话 public void runAsClient(String host, int port) throws IOException { this.socket = new Socket(host, port); System.out.println("[log] successful connected to server " + socket.getInetAddress()); Thread listenThread = new Thread(new ListenThread(this.socket)); listenThread.start(); waitAndSend(); } public void waitAndSend() throws IOException { this.outputStream = this.socket.getOutputStream(); Scanner sc = new Scanner(System.in); while (sc.hasNextLine()) { this.sendMessage(sc.nextLine()); } } public void sendMessage(String message) throws IOException { byte[] msgBytes = message.getBytes("UTF-8"); int length = msgBytes.length; outputStream.write(length>>8); outputStream.write(length); outputStream.write(msgBytes); } public static void main(String[] args) { Scanner scanner = new Scanner(System.in); ChatSocket chatSocket = new ChatSocket(); System.out.println("select connect type: 1 for server and 2 for client"); int type = Integer.parseInt(scanner.nextLine().toString()); if (type == 1) { System.out.print("input server port: "); int port = scanner.nextInt(); try { chatSocket.runAsServer(port); } catch (IOException e) { e.printStackTrace(); } }else if (type == 2) { System.out.print("input server host: "); String host = scanner.nextLine(); System.out.print("input server port: "); int port = scanner.nextInt(); try { chatSocket.runAsClient(host, port); } catch (IOException e) { e.printStackTrace(); } } } }
作为服务端,如果一次只跟一个客户端建立socket连接,未免显得太过浪费资源,因此我们完全可以让服务端和多个客户端建立多个socket。
如果要处理多个连接,就必须解决并发问题,否则可以编写循环轮流处理。我们可以使用多线程来处理并发,不过线程的创建和销毁都会消耗大量的资源和时间,所以最好一步到位,用一个线程池来实现。
下面给出一个示范性质的服务端代码:
public class SocketServer { public static void main(String args[]) throws Exception { // 监听指定的端口 int port = 55533; ServerSocket server = new ServerSocket(port); // server将一直等待连接的到来 System.out.println("server将一直等待连接的到来"); //如果使用多线程,那就需要线程池,防止并发过高时创建过多线程耗尽资源 ExecutorService threadPool = Executors.newFixedThreadPool(100); while (true) { Socket socket = server.accept(); Runnable runnable=()->{ try { // 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取 InputStream inputStream = socket.getInputStream(); byte[] bytes = new byte[1024]; int len; StringBuilder sb = new StringBuilder(); while ((len = inputStream.read(bytes)) != -1) { // 注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8 sb.append(new String(bytes, 0, len, "UTF-8")); } System.out.println("get message from client: " + sb); inputStream.close(); socket.close(); } catch (Exception e) { e.printStackTrace(); } }; threadPool.submit(runnable); } } }
我想你不难发现一个问题,那就是当socket连接成功建立后,如果中途发生异常导致其中一方断开连接,此时另一方是无法发现的,只有在再次尝试发送/接收消息才会因为抛出异常而退出。
简单的说,就是我们维持的socket连接,是一个长连接,但我们没有保证它的时效性,上一秒它可能还是可以用的,但是下一秒就不一定了。
最常见的确保连接随时可用的方法是通过定时发送心跳包来检测连接的正常性。对于需要实现高实时性的服务,比如消息推送,这仍然是非常关键的。
大体的方案如下:
双方约定好心跳包的格式,要能够区别于普通的消息。
客户端每隔一定时间,就向服务端发送一个心跳包
服务端每接收到心跳包时,将其抛弃
如果客户端的某个心跳包发送失败,就可以判断连接已经断开
如果对实时性要求很高,服务端也可以定时检查客户端发送心跳包的频率,如果超过一定时间没有发送可以认为连接已经断开
使用心跳包必然会增加带宽和性能的负担,对于普通的应用我们其实并没有必要使用这种方案,如果消息发送时抛出了连接异常,直接尝试重新连接就好了。
跟上面的方案对比,其实这个抛出异常的消息就充当了心跳包的角色。
总的来说,连接是否要保活,如何保活,需要根据具体的业务场景灵活地思考和定制。
Atas ialah kandungan terperinci Kaedah untuk melaksanakan pengaturcaraan Soket berdasarkan Java. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!