How to create a multi-threaded server in Java

PHPz
2023-05-10 15:58:14

A typical single-threaded server example is as follows:

while (true) {
    Socket socket = null;
    try {
        // Accept a client connection
        socket = serverSocket.accept();
        // Obtain the input and output streams from the socket and communicate with the client
        ...
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            if (socket != null) {
                // Close the connection
                socket.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

The server accepts a client connection, communicates with the client, closes the connection when the exchange is complete, and only then accepts the next connection. If several clients try to connect at the same time, their requests must wait in a queue. If clients are kept waiting too long, the site loses credibility and, with it, traffic.

Concurrency performance measures a server's ability to respond to multiple clients at the same time. A server with good concurrency performance must meet two conditions:

  • It can accept and process multiple client connections at the same time

  • It responds to each client quickly

Serving multiple clients simultaneously with multiple threads is the most common way to improve a server's concurrency performance. There are generally three approaches:

  • Assign a worker thread to each client

  • Create a thread pool whose worker threads serve the clients

  • Use the thread pool already provided by the Java class library, whose worker threads serve the clients

Assign a thread to each client

The server's main thread is responsible for accepting client connections. Each time a connection is accepted, the main thread creates a worker thread that handles the communication with that client.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class EchoServer {
    private int port = 8000;
    private ServerSocket serverSocket;
    public EchoServer() throws IOException {
        serverSocket = new ServerSocket(port);
        System.out.println("Server started");
    }
    public void service() {
        while (true) {
            Socket socket = null;
            try {
                // Accept a client connection
                socket = serverSocket.accept();
                // Create a worker thread
                Thread workThread = new Thread(new Handler(socket));
                // Start the worker thread
                workThread.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String args[]) throws IOException {
        new EchoServer().service();
    }
    // Handles the communication with a single client
    class Handler implements Runnable {
        private Socket socket;
        public Handler(Socket socket) {
            this.socket = socket;
        }
        private PrintWriter getWriter(Socket socket) throws IOException {...}
        private BufferedReader getReader(Socket socket) throws IOException {...}
        public String echo(String msg) {...}
        public void run() {
            try {
                System.out.println("New connection accepted " + socket.getInetAddress() + ":" + socket.getPort());
                BufferedReader br = getReader(socket);
                PrintWriter pw = getWriter(socket);
                String msg = null;
                // Receive and send data until the conversation ends
                while ((msg = br.readLine()) != null) {
                    System.out.println("from " + socket.getInetAddress() + ":" + socket.getPort() + ">" + msg);
                    pw.println(echo(msg));
                    if (msg.equals("bye")) break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    // Close the connection
                    if (socket != null) socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Create a thread pool

The previous implementation has the following shortcomings:

  • Creating and destroying worker threads is expensive. If the server communicates with many clients and each exchange is short, the cost of creating a new thread for each client may exceed the cost of the communication itself.

  • Beyond the cost of creation and destruction, live threads also consume system resources. Each thread occupies a certain amount of memory. If many clients connect at once, the server must create many worker threads, which together consume a large amount of memory and may exhaust it.

A thread pool pre-creates a number of worker threads, which repeatedly take tasks from a work queue and execute them. When a worker thread finishes one task, it goes on to the next task in the work queue.

The thread pool has the following advantages:

  • It reduces the number of times threads are created and destroyed; each worker thread is reused to run many tasks

  • The number of threads in the pool can easily be tuned to the system's capacity, which prevents the system from crashing because it consumes too many resources

import java.util.LinkedList;

public class ThreadPool extends ThreadGroup {
    // Whether the thread pool has been closed
    private boolean isClosed = false;
    // The work queue
    private LinkedList<Runnable> workQueue;
    // Thread pool ID
    private static int threadPoolID;
    // Worker thread ID
    private int threadID;
    // poolSize specifies the number of worker threads in the pool
    public ThreadPool(int poolSize) {
        super("ThreadPool-" + (threadPoolID++));
        setDaemon(true);
        // Create the work queue
        workQueue = new LinkedList<Runnable>();
        for (int i = 0; i < poolSize; i++) {
            // Create and start a worker thread
            new WorkThread().start();
        }
    }
    /**
     * Adds a new task to the work queue; a worker thread will execute it
     */
    public synchronized void execute(Runnable task) {
        // Throw IllegalStateException if the pool has been closed
        if (isClosed) {
            throw new IllegalStateException();
        }
        if (task != null) {
            workQueue.add(task);
            // Wake up a worker thread waiting for a task in getTask()
            notify();
        }
    }
    /**
     * Takes a task from the work queue; called by the worker threads
     */
    protected synchronized Runnable getTask() throws InterruptedException {
        while (workQueue.size() == 0) {
            if (isClosed) return null;
            wait(); // No task in the work queue, so wait for one
        }
        return workQueue.removeFirst();
    }
    /**
     * Closes the thread pool
     */
    public synchronized void close() {
        if (!isClosed) {
            isClosed = true;
            // Clear the work queue
            workQueue.clear();
            // Interrupt all worker threads; this method is inherited from ThreadGroup
            interrupt();
        }
    }
    /**
     * Waits for the worker threads to finish all tasks
     */
    public void join() {
        synchronized (this) {
            isClosed = true;
            // Wake up the worker threads still waiting for a task in getTask()
            notifyAll();
        }
        Thread[] threads = new Thread[activeCount()];
        // enumerate() is inherited from ThreadGroup; it collects the live threads in this group
        int count = enumerate(threads);
        // Wait for all worker threads to finish
        for (int i = 0; i < count; i++) {
            try {
                // Wait for this worker thread to finish
                threads[i].join();
            } catch (InterruptedException ex) {}
        }
    }
    /**
     * Inner class: a worker thread
     */
    private class WorkThread extends Thread {
        public WorkThread() {
            // Join the enclosing ThreadPool thread group
            super(ThreadPool.this, "WorkThread-" + (threadID++));
        }
        public void run() {
            // isInterrupted() is inherited from Thread; it reports whether the thread was interrupted
            while (!isInterrupted()) {
                Runnable task = null;
                try {
                    // Take a task
                    task = getTask();
                } catch (InterruptedException ex) {}
                // If getTask() returned null or was interrupted, end this thread
                if (task == null) return;
                // Run the task; any exception is caught in the catch block
                try {
                    task.run();
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        }
    }
}

The server implemented using the thread pool is as follows:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class EchoServer {
    private int port = 8000;
    private ServerSocket serverSocket;
    private ThreadPool threadPool;      // The thread pool
    private final int POOL_SIZE = 4;    // Number of worker threads in the pool per CPU
    public EchoServer() throws IOException {
        serverSocket = new ServerSocket(port);
        // Create the thread pool
        // Runtime.availableProcessors() returns the number of CPUs in the system
        // The more CPUs the system has, the more worker threads the pool gets
        threadPool = new ThreadPool(
            Runtime.getRuntime().availableProcessors() * POOL_SIZE);
        System.out.println("Server started");
    }
    public void service() {
        while (true) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                // Hand the task of communicating with the client to the thread pool
                threadPool.execute(new Handler(socket));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String args[]) throws IOException {
        new EchoServer().service();
    }
    // Handles the communication with a single client, as in the previous example
    class Handler implements Runnable {...}
}

Using the thread pool provided by Java

The java.util.concurrent package provides a ready-made thread pool implementation that is more robust and more capable than the hand-written pool above.

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EchoServer {
    private int port = 8000;
    private ServerSocket serverSocket;
    // The thread pool
    private ExecutorService executorService;
    // Number of worker threads in the pool per CPU
    private final int POOL_SIZE = 4;
    public EchoServer() throws IOException {
        serverSocket = new ServerSocket(port);
        // Create the thread pool
        // Runtime.availableProcessors() returns the number of CPUs in the system
        // The more CPUs the system has, the more worker threads the pool gets
        executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * POOL_SIZE);
        System.out.println("Server started");
    }
    public void service() {
        while (true) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                executorService.execute(new Handler(socket));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String args[]) throws IOException {
        new EchoServer().service();
    }
    // Handles the communication with a single client, as in the previous example
    class Handler implements Runnable {...}
}
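
The server above never shuts its pool down, and the threads created by Executors.newFixedThreadPool are not daemon threads, so they would keep the JVM alive even if the accept loop ended. The following is a minimal sketch of an orderly shutdown using the standard ExecutorService methods shutdown(), awaitTermination() and shutdownNow(). The class name PoolShutdownSketch, the helper method names and the five-second timeout are illustrative assumptions, not part of the original article.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class; the names here are assumptions made for illustration.
final class PoolShutdownSketch {

    // Stops accepting new tasks, waits up to timeoutSeconds for queued and running
    // tasks to finish, then interrupts whatever is still running.
    // Returns true if the pool terminated.
    static boolean shutdownGracefully(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown(); // already-submitted tasks still run
        try {
            if (pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
            pool.shutdownNow(); // interrupt tasks that did not finish in time
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Runs the given number of trivial tasks on a fixed pool, shuts the pool down,
    // and returns how many tasks completed.
    static int runAndShutdown(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors() * 4);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        shutdownGracefully(pool, 5);
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runAndShutdown(100));
    }
}
```

In a real server this helper would typically be called from a shutdown hook or an administrative command, after the ServerSocket has been closed so that no new connections arrive.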

Notes on using the thread pool

Although a thread pool can greatly improve a server's concurrency performance, using one carries certain risks and can easily lead to the following problems:

  • Deadlock

Any multi-threaded application runs the risk of deadlock. The simplest case is this: thread A holds the lock on object X and is waiting for the lock on object Y, while thread B holds the lock on object Y and is waiting for the lock on object X. Neither thread releases the lock it holds and each waits for the other's, so both wait forever and a deadlock occurs.

A thread pool introduces a further kind of deadlock. Suppose every worker thread in the pool is blocked in its current task, waiting for the result of some task A, while task A is still in the work queue. Because no worker thread is idle, task A can never run, so all worker threads in the pool stay blocked forever and a deadlock occurs.
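
The pool-specific deadlock described above can be reproduced in a few lines. The sketch below is an illustration only: the class name PoolStarvationSketch and the 500 ms timeout are assumptions, and the timeout exists purely so that the demonstration terminates instead of hanging. An outer task submits an inner task to the same pool and waits for its result; with a single worker thread the inner task can never start.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demonstration class; not part of the original article.
final class PoolStarvationSketch {

    // Returns true if the outer task timed out waiting for the inner task,
    // which means the pool would have deadlocked without the timeout.
    static boolean innerTaskStarves(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Future<Boolean> outer = pool.submit(() -> {
                // The inner task is placed in the same pool's work queue.
                Future<String> inner = pool.submit(() -> "result");
                try {
                    inner.get(500, TimeUnit.MILLISECONDS);
                    return false; // an idle worker ran the inner task
                } catch (TimeoutException e) {
                    return true;  // no idle worker was available to run it
                }
            });
            return outer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool of 1 starves: " + innerTaskStarves(1));
        System.out.println("pool of 2 starves: " + innerTaskStarves(2));
    }
}
```

With one worker the outer task occupies the only thread, so the inner task stays queued; with two workers the second thread picks it up immediately.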

  • Insufficient system resources

If the thread pool contains a very large number of threads, they consume large amounts of memory and other system resources, which seriously degrades system performance.

  • Concurrency errors

The thread pool's work queue relies on the wait() and notify() methods so that worker threads obtain tasks promptly, but both methods are hard to use correctly. If the code is wrong, notifications can be lost, leaving worker threads idle while tasks in the work queue go unprocessed.
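
One way to sidestep hand-written wait() and notify() calls is to let a java.util.concurrent.BlockingQueue do the waiting. The sketch below is not the article's ThreadPool; it is a simplified single-worker illustration, and the class name BlockingQueueSketch and the "poison" sentinel task are assumptions introduced here. The take() call blocks until a task is available, so no notification can be lost.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demonstration class; not part of the original article.
final class BlockingQueueSketch {

    // Pushes the given number of counting tasks through a work queue served
    // by one worker thread and returns how many tasks were executed.
    static int runThroughQueue(int tasks) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        AtomicInteger executed = new AtomicInteger();
        Runnable poison = () -> { };  // sentinel that tells the worker to stop

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = workQueue.take(); // blocks until a task arrives
                    if (task == poison) {
                        return;
                    }
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        for (int i = 0; i < tasks; i++) {
            workQueue.add(executed::incrementAndGet);
        }
        workQueue.add(poison); // queued last, so every earlier task runs first

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("executed: " + runThroughQueue(1000));
    }
}
```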

  • Thread leak

In a thread pool with a fixed number of worker threads, if a worker thread throws a RuntimeException or Error while executing a task and nothing catches it, the worker thread terminates abnormally and the pool permanently loses a worker thread. If all worker threads terminate this way, the pool becomes empty and no worker thread is left to handle tasks.

Another cause of thread leaks is a worker thread that blocks while executing a task, for example while waiting for the user to enter data. If the user never enters it (perhaps because the user has walked away), the worker thread stays blocked. Such a worker thread exists in name only and does no actual work. If every worker thread in the pool is blocked in this way, the pool cannot process newly added tasks.

  • Task overload

When a large number of tasks are waiting in the work queue, the tasks themselves may consume too many system resources and cause a resource shortage.

In summary, a thread pool can introduce various risks. To avoid them as far as possible, follow these principles when using one:

If task A must wait synchronously for the result of task B while it executes, task A is not suitable for the thread pool's work queue. Adding tasks that wait for the results of other tasks to the work queue may deadlock the pool.

If a task may block for a long time while executing, set a timeout so that the worker thread cannot block forever and leak.
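
For the echo server, the blocking call is the read from the client socket. A minimal sketch of the timeout principle uses Socket.setSoTimeout, which makes a blocked read throw SocketTimeoutException once the given number of milliseconds has passed. The class name ReadTimeoutSketch, the loopback test setup and the timeout value are assumptions chosen for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demonstration class; not part of the original article.
final class ReadTimeoutSketch {

    // Connects a client that never sends anything to a loopback server socket
    // and reports whether the server-side read gave up after timeoutMillis.
    static boolean silentClientTimesOut(int timeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            // Without this call, readLine() below would block indefinitely.
            accepted.setSoTimeout(timeoutMillis);
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(accepted.getInputStream()));
            br.readLine();
            return false; // data or end of stream arrived before the timeout
        } catch (SocketTimeoutException e) {
            return true;  // the worker thread is released instead of leaking
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read timed out: " + silentClientTimesOut(200));
    }
}
```

In a Handler like the one shown earlier, the same call would go right after the socket is received, and the SocketTimeoutException branch would close the connection.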

Understand the characteristics of each task: analyze whether it performs IO operations that block frequently or performs operations that never block. The former uses the CPU intermittently, while the latter has higher CPU utilization. Classify tasks by these characteristics and add the different types to the work queues of different thread pools, so that each pool can be tuned to the tasks it runs.

Adjust the size of the thread pool. The optimal size depends primarily on the number of available CPUs in the system and on the characteristics of the tasks in the work queue. If a system with N CPUs has only one work queue and all its tasks are computational, a pool of N or N+1 worker threads generally gives the highest CPU utilization.

If the work queue contains tasks that perform IO operations and block frequently, make the pool larger than the number of available CPUs, because not all worker threads are working all the time. Pick a typical task and estimate the ratio of the time it spends waiting (WT) to the time it actually spends computing on the CPU (ST): WT/ST. On a system with N CPUs, roughly N(1 + WT/ST) threads are needed to keep the CPUs fully utilized.
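
The sizing rule N(1 + WT/ST) is easy to turn into a helper. The sketch below is a direct transcription of that formula; the class name PoolSizing, the rounding up to a whole thread, and the sample timings (90 ms waiting, 10 ms computing) are assumptions made for the example, not measurements.

```java
// Hypothetical helper class; not part of the original article.
final class PoolSizing {

    // Applies N * (1 + WT / ST), rounding up to a whole number of threads.
    // cpus: number of available CPUs (N)
    // waitTime: time a typical task spends waiting (WT)
    // serviceTime: time a typical task spends computing (ST), same unit as waitTime
    static int recommendedThreads(int cpus, double waitTime, double serviceTime) {
        if (cpus < 1 || waitTime < 0 || serviceTime <= 0) {
            throw new IllegalArgumentException("cpus >= 1, waitTime >= 0, serviceTime > 0 required");
        }
        return (int) Math.ceil(cpus * (1 + waitTime / serviceTime));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Assumed sample task: 90 ms blocked on IO for every 10 ms of computation.
        System.out.println("IO-heavy pool size: " + recommendedThreads(cpus, 90, 10));
        // A purely computational task never waits, so the result is N.
        System.out.println("CPU-bound pool size: " + recommendedThreads(cpus, 0, 10));
    }
}
```

For example, with 4 CPUs and a task that waits 90 ms for every 10 ms of computation, the formula gives 4 × (1 + 9) = 40 threads.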

To avoid task overload, the server should limit the number of concurrent client connections according to the system's capacity. When the number of concurrent connections exceeds the limit, the server can reject the connection request and give the client a friendly message.
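
One possible way to enforce such a limit, sketched here under assumptions, is to build the pool with a bounded work queue so that excess tasks are rejected instead of piling up. The sketch uses the standard ThreadPoolExecutor constructor with an ArrayBlockingQueue and the AbortPolicy handler, which throws RejectedExecutionException when both the workers and the queue are full. The class name BoundedPoolSketch and the sizes used are illustrative only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demonstration class; not part of the original article.
final class BoundedPoolSketch {

    // Submits tasks that stay busy until released and returns how many were rejected
    // by a pool with the given number of workers and queue capacity.
    static int countRejected(int workers, int queueCapacity, int submitted) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // simulate a long-running client session
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // A real server would send a short "server busy" message
                    // and close the socket here.
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected(2, 3, 10));
    }
}
```

With 2 workers and a queue of 3, the pool holds at most 5 tasks at once, so 5 of 10 simultaneous submissions are rejected.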

This article is reproduced from yisu.com.