Home  >  Article  >  Java  >  Java Websocket development practice: how to handle large-scale concurrent connections

Java Websocket development practice: how to handle large-scale concurrent connections

WBOY
WBOYOriginal
2023-12-02 09:07:211306browse

Java Websocket开发实践:如何处理大规模并发连接

Java Websocket is a protocol used to establish real-time two-way communication between a web browser and a web server. In today's Internet applications, real-time is becoming more and more important, and one of the scenarios that requires real-time communication is social chat. In chat scenarios, large-scale concurrent connections need to be handled. And Java Websocket is an excellent choice.

In this article, we will introduce how to use Java Websocket to handle large-scale concurrent connections through code examples.

Let’s take a look at the common ideas first. In Java Websocket, Java EE's Servlet and WebSocketEndpoint are often used. In some simple examples, we will use these classes, but when the number of connections increases, using these classes directly can easily cause performance bottlenecks, and we need to use some more efficient tools to handle connections.

Here, we will use the netty-socketio library in JavaTreasureChest to handle Java Websocket connections. Netty is a high-performance network programming framework, and SocketIO is a protocol for implementing real-time applications.

Code Example

First, we need to add dependencies on the netty-socketio library. In the Maven project, we can add the following dependencies in the pom.xml file:

<dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
    <version>1.7.17</version>
</dependency>

Next, we need to implement a Java class as a WebSocket server and listen for connection requests. The sample code is as follows:

import com.corundumstudio.socketio.*;
import com.corundumstudio.socketio.listener.*;

public class WebSocketServer {
    public static void main(String[] args) {
        // 创建配置对象
        Configuration config = new Configuration();
        config.setHostname("localhost");
        config.setPort(9092);

        // 创建SocketIO服务器
        SocketIOServer server = new SocketIOServer(config);

        // 添加连接事件监听器
        server.addConnectListener(new ConnectListener() {
            @Override
            public void onConnect(SocketIOClient client) {
                System.out.println("连接成功:" + client.getSessionId().toString());
            }
        });

        // 启动服务器
        server.start();

        // 等待连接关闭
        System.in.read();
        server.stop();
    }
}

In this code, we use the SocketIOServer class in the SocketIO library to create a WebSocket server. When the connection is successful, the connection success message will be printed.

Next, we need to register the listener with the server so that it can be processed when the client connects. The code is as follows:

// 添加事件监听器
server.addEventListener("client_msg", String.class, new DataListener<String>() {
    @Override
    public void onData(SocketIOClient client, String data, AckRequest ackRequest) {
        System.out.println("收到消息:" + data + ",sessionId=" + client.getSessionId());
    }
});

In this code snippet, we register an event named "client_msg" and add a DataListener to handle the received message.

Sometimes, we may also need to authenticate the connection. The SocketIO library provides an AuthorizationListener interface, which we can implement to handle authentication. The sample code is as follows:

// 添加身份验证监听器
server.addAuthorizationListener(new AuthorizationListener() {
    @Override
    public boolean isAuthorized(HandshakeData handshakeData) {
        // 验证用户是否具有连接权限
        return true;
    }
});

In this code snippet, we add an AuthorizationListener to handle authentication requests. The logic here is to authenticate all connections.

Finally, we need to start the WebSocket server and wait for the connection to close. The code is as follows:

// 启动服务器
server.start();

// 等待连接关闭
System.in.read();
server.stop();

This is a simple Java Websocket server implementation, but it cannot handle large-scale concurrent connections. In the next section, we will cover how to use the netty-socketio library to handle large-scale concurrent connections.

Use namespace and room to handle concurrent connections

In order to handle a large number of concurrent connections, we need to group the connections. In the netty-socketio library, we can use namespace and room for grouping. A namespace is a logical channel that contains a group of rooms. A room is a room that contains a group of users.

The specific usage is as follows:

// 创建SocketIO服务器
SocketIOServer server = new SocketIOServer(config);

// 创建namespace
SocketIONamespace chatNamespace = server.addNamespace("/chat");

// 设置连接事件监听器
chatNamespace.addConnectListener(new ConnectListener() {
    @Override
    public void onConnect(SocketIOClient client) {
        // 加入默认房间
        client.joinRoom("default");
    }
});

// 设置事件监听器
chatNamespace.addEventListener("client_msg", String.class, new DataListener<String>() {
    @Override
    public void onData(SocketIOClient client, String data, AckRequest ackRequest) {
        String sessionId = client.getSessionId().toString();
        System.out.println("收到消息:" + data + ",sessionId=" + sessionId);
        
        // 广播消息到房间的所有用户
        chatNamespace.getRoomOperations("default").sendEvent("server_msg", sessionId + ":" + data);
    }
});

// 启动服务器
server.start();

In this code segment, we use namespace and room to handle the connection. First, we created a logical channel called "chat" and added a default room. Next, when handling client connections, we add the connection to the default room.

When receiving a message from the client, we broadcast the message to all users in the default room. The getRoomOperations method is used here to obtain the operation objects in the room.

In this way, we can handle large-scale concurrent connections by using namespace and room.

Performance Optimization

In order to ensure performance under large-scale concurrent connections, we need to perform performance optimization. Here we list several common optimization methods.

  1. Use thread pool

When the number of concurrent connections increases, we can use the thread pool to improve performance. In netty-socketio, we can create a thread pool in the following way:

// 创建配置对象
Configuration config = new Configuration();
...
// 创建线程池
config.setWorkerThreads(100);
  1. Cache database connection

In database operations, we can cache connections to avoid frequent creation connect. In netty-socketio, we can cache the database connection in ConnectListener and use it in DataListener. The sample code is as follows:

chatNamespace.addConnectListener(new ConnectListener() {
    @Override
    public void onConnect(SocketIOClient client) {
        // 加入默认房间
        client.joinRoom("default");
        // 缓存数据库连接
        client.set("conn", getDBConnection());
    }
});

chatNamespace.addEventListener("client_msg", String.class, new DataListener<String>() {
    @Override
    public void onData(SocketIOClient client, String data, AckRequest ackRequest) {
        String sessionId = client.getSessionId().toString();
        System.out.println("收到消息:" + data + ",sessionId=" + sessionId);

        // 使用缓存的数据库连接
        Connection conn = (Connection)client.get("conn");
        ...
    }
});

Here we use the set method of SocketIOClient to cache the database connection and use it in DataListener.

  1. Use the cached message queue

When the amount of concurrent messages is large, we can store the messages in the cached message queue and wait for subsequent processing. This can alleviate instantaneous concurrency pressure. The sample code is as follows:

private Queue<String> messageQueue = new ConcurrentLinkedDeque<>();

chatNamespace.addEventListener("client_msg", String.class, new DataListener<String>() {
    @Override
    public void onData(SocketIOClient client, String data, AckRequest ackRequest) {
        String sessionId = client.getSessionId().toString();
        System.out.println("收到消息:" + data + ",sessionId=" + sessionId);

        // 将消息放入缓存队列
        messageQueue.offer(sessionId + ":" + data);
    }
});

// 消息处理线程
new Thread(new Runnable() {
    @Override
    public void run() {
        while (true) {
            try {
                // 从队列取出消息并处理
                String message = messageQueue.poll();
                processMessage(message);
            
                // 睡眠1秒
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}).start();

Here we define a ConcurrentLinkedDeque queue to store messages. In the DataListener, put the message into the queue. In the processing thread, the message is taken from the queue and processed. Note that the thread sleep time needs to be set here to avoid excessive CPU usage.

Summary

In this article, we introduced how to use netty-socketio to handle large-scale concurrent connections. Using namespace and room to group connections and optimize performance can help us handle a large number of connections in synchronous communication scenarios.

In addition, it should be noted that the WebSocket protocol is usually used to implement long connections in real-time communication scenarios, but it may also have security risks. Therefore, in practical applications, we need to use it with caution and consider safety.

The above is the detailed content of Java Websocket development practice: how to handle large-scale concurrent connections. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn