Maison  >  Article  >  Java  >  Comment créer un serveur multithread en Java

Comment créer un serveur multithread en Java

PHPz
PHPzavant
2023-05-10 15:58:141340parcourir

Un exemple typique de serveur monothread est le suivant :

while (true) {
    Socket socket = null;
    try {
        // 接收客户连接
        socket = serverSocket.accept();
        // 从socket中获得输入流与输出流,与客户通信
        ...
    } catch(IOException e) {
        e.printStackTrace()
    } finally {
        try {
            if(socket != null) {
                // 断开连接
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Le serveur reçoit une connexion client, communique avec le client, se déconnecte une fois la communication terminée, puis reçoit la connexion client suivante s'il existe plusieurs connexions client les demandant. en même temps, les clients devront faire la queue. Si vous faites attendre les clients trop longtemps, votre site Web perdra en crédibilité, ce qui réduira le trafic.

Les performances de concurrence sont généralement utilisées pour mesurer la capacité d'un serveur à répondre à plusieurs clients en même temps. Un serveur avec de bonnes performances de concurrence doit remplir deux conditions :

  • Être capable de recevoir et de traiter plusieurs connexions clients en même temps.

  • Pour chaque client, une réponse sera donnée rapidement

Utiliser plusieurs threads pour fournir des services à plusieurs clients en même temps Il s'agit généralement de la méthode la plus courante pour améliorer les performances simultanées du serveur. de trois manières :

  • Pour chaque client, le client alloue un thread de travail

  • Créez un pool de threads et le thread de travail qu'il contient sert le client

  • Utilisez le pool de threads prêt à l'emploi dans la bibliothèque de classes Java, et son thread de travail sert le client

Attribuez un thread à chaque client

Le thread principal du serveur est responsable de la réception de la connexion du client. Chaque fois qu'une connexion client est reçue, un thread de travail est créé, qui est responsable. pour communiquer avec le client

public class EchoServer {
    private int port = 8000;
    private ServerSocket serverSocket;
    public EchoServer() throws IOException {
        serverSocket = new ServerSocket(port);
        System.out.println("服务器启动");
    }
    public void service() {
        while(true) {
            Socket socket = null;
            try {
                // 接教客户连接
                socket = serverSocket.accept();
                // 创建一个工作线程
                Thread workThread = new Thread(new Handler(socket));
                // 启动工作线程
                workThread.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String args[])throws TOException {
        new EchoServer().service();
    }
    // 负责与单个客户的通信   
    class Handler implements Runnable {
        private Socket socket;
        pub1ic Handler(Socket socket) {
            this.socket = socket;
        }
        private PrintWriter getWriter(Socket socket) throws IOException {...}
        private BufferedReader getReader(Socket socket) throws IOException {...}
        public String echo(String msg) {...}
        public void run() {
            try {
                System.out.println("New connection accepted" + socket.getInetAddress() + ":" + socket.getPort());
                BufferedReader br = getReader(socket);
                PrintWriter pw = getWriter(socket);
                String msg = null;
                // 接收和发送数据,直到通信结束
                while ((msg = br.readLine()) != null) {
                    System.out.println("from "+ socket.getInetAddress() + ":" + socket.getPort() + ">" + msg);
                    pw.println(echo(msg));
                    if (msg.equals("bye")) break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    // 断开连接
                    if(socket != nulll) socket.close();
                } catch (IOException e) {
                    e,printStackTrace();
                }
            }
        }
    }
}

Créer un pool de threads

Précédent Cette implémentation présente les inconvénients suivants :

  • Le serveur crée et détruit des threads de travail avec une surcharge élevée Si le serveur a besoin de communiquer avec de nombreux clients, et le serveur crée et détruit des threads de travail avec une surcharge élevée. le temps de communication avec chaque client est court, alors il est possible que le serveur soit un client. La surcharge liée à la création de nouveaux threads est supérieure à la surcharge liée à la communication réelle avec le client

  • En plus de la surcharge liée à la création et à la destruction de threads, les threads actifs consomment également des ressources système. Chaque thread occupera une certaine quantité de mémoire. Si un grand nombre de clients se connectent au serveur en même temps, un grand nombre de threads de travail doivent être créés. Ils consomment une grande quantité de mémoire et peuvent entraîner un espace mémoire insuffisant. Certains travaux sont pré-créés dans le pool de threads, qui récupèrent en permanence les tâches de la file d'attente de travail, puis les exécutent. Lorsque le thread de travail termine d'exécuter une tâche, il continuera à exécuter la tâche suivante dans la file d'attente de travail

  • Le pool de threads présente les avantages suivants :

Réduit le nombre de threads créés et détruits, et chaque thread de travail peut toujours être réutilisé, peut effectuer plusieurs tâches

  • Le nombre de threads dans le pool de threads peut être facilement ajusté en fonction de la capacité de charge du système pour éviter les pannes du système dues à une consommation excessive de ressources système

  • public class ThreadPool extends ThreadGroup {
        // 线程池是否关闭
        private boolean isClosed = false;
        // 表示工作队列
        private LinkedList<Runnable> workQueue;
        // 表示线程池ID
        private static int threadPoolID;
        // 表示工作线程ID
        // poolSize 指定线程池中的工作线程数目
        public ThreadPool(int poolSize) {
            super("ThreadPool-"+ (threadPoolID++));
            setDaemon(true);
            // 创建工作队列
            workQueue = new LinkedList<Runnable>();
            for (int i = 0; i < poolSize; i++) {
                // 创建并启动工作线程
                new WorkThread().start(); 
            }
        }
        /**
         * 向工作队列中加入一个新任务,由工作线程去执行任务
         */
        public synchronized void execute(Runnable tank) {
            // 线程池被关则抛出IllegalStateException异常
            if(isClosed) {
                throw new IllegalStateException();
            }
            if(task != null) {
                workQueue.add(task);
                // 唤醒正在getTask()方法中等待任务的工作线限
                notify();
            }
        }
        /**
         * 从工作队列中取出一个任务,工作线程会调用此方法
         */
        protected synchronized Runnable getTask() throws InterruptedException {
            while(workQueue,size() == 0) {
                if (isClosed) return null;
                wait(); // 如果工作队列中没有任务,就等待任务
            }
            return workQueue.removeFirst();
        }
        /**
         * 关闭线程池
         */
        public synchronized void close() {
            if(!isClosed) {
                isClosed = true;
                // 清空工作队列
                workQueue.clear();
                // 中断所有的工作线程,该方法继承自ThreadGroup类
                interrupt();
            }
        }
        /**
         * 等待工作线程把所有任务执行完
         */
        public void join() {
            synchronized (this) {
                isClosed = true;
                // 唤醒还在getTask()方法中等待任务的工作线程
                notifyAll();
            }
            Thread[] threads = new Thread[activeCount()];
            // enumerate()方法继承自ThreadGroup类获得线程组中当前所有活着的工作线程
            int count = enumerate(threads);
            // 等待所有工作线程运行结束
            for(int i = 0; i < count; i++) {
                try {
                    // 等待工作线程运行结束
                    threads[i].join();
                } catch((InterruptedException ex) {}
            }
        }
        /**
         * 内部类:工作线程
         */
        private class WorkThread extends Thread {
            public WorkThread() {
                // 加入当前 ThreadPool 线程组
                super(ThreadPool.this, "WorkThread-" + (threadID++));
            }
            public void run() {
                // isInterrupted()方法承自Thread类,判断线程是否被中断
                while (!isInterrupted()) {
                    Runnable task = null;
                    try {
                        // 取出任务
                        task = getTask();
                    } catch(InterruptedException ex) {}
                    // 如果 getTask() 返回 nu11 或者线程执行 getTask() 时被中断,则结束此线程
                    if(task != null) return;
                    // 运行任务,异常在catch代码块中被捕获
                    try {
                        task.run();
                    } catch(Throwable t) {
                        t.printStackTrace();
                    }
                }
            }
        }
    }

    Les serveurs implémentés à l'aide du thread pool sont les suivants :

    publlc class EchoServer {
        private int port = 8000;
        private ServerSocket serverSocket;
        private ThreadPool threadPool;	// 线程港
        private final int POOL_SIZE = 4;	// 单个CPU时线程池中工作线程的数目
        public EchoServer() throws IOException {
            serverSocket = new ServerSocket(port);
            // 创建线程池
            // Runtime 的 availableProcessors() 方法返回当前系统的CPU的数目
            // 系统的CPU越多,线程池中工作线程的数目也越多
            threadPool= new ThreadPool(
            	Runtime.getRuntime().availableProcessors() * POOL_SIZE);
            System.out.println("服务器启动");
        }
        public void service() {
            while (true) {
                Socket socket = null;
                try {
                    socket = serverSocket.accept();
                    // 把与客户通信的任务交给线程池
                    threadPool.execute(new Handler(socket));
                } catch(IOException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String args[])throws TOException {
            new EchoServer().service();
        }
        // 负责与单个客户的通信,与上例类似
        class Handler implements Runnable {...}
    }
  • Utilisez le pool de threads fourni par Java

Le package java.util.concurrent fournit une implémentation de pool de threads prête à l'emploi, qui est plus robuste et plus puissante. Pour plus d'informations sur le pool de threads, veuillez vous référer à cet article

public class Echoserver {
    private int port = 8000;
    private ServerSocket serverSocket;
    // 线程池
    private ExecutorService executorService;
    // 单个CPU时线程池中工作线程的数目
    private final int POOL_SIZE = 4;
    public EchoServer() throws IOException {
        serverSocket = new ServerSocket(port);
        // 创建线程池
        // Runtime 的 availableProcessors() 方法返回当前系统的CPU的数目
        // 系统的CPU越多,线程池中工作线程的数目也越多
        executorService = Executors.newFixedThreadPool(
        	Runtime.getRuntime().availableProcessors() * POOL_SIZE);
        System.out.println("服务器启动");
    }
    public void service() {
        while(true) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                executorService.execute(new Handler(socket));
            } catch(IOException e) {
                e.printStackTrace();
            }
        }
    }
     public static void main(String args[])throws TOException {
        new EchoServer().service();
    }
    // 负责与单个客户的通信,与上例类似
    class Handler implements Runnable {...}
}

Remarques sur l'utilisation du pool de threads

Bien que le pool de threads puisse considérablement améliorer les performances de concurrence du serveur, son utilisation comporte certains risques, qui peuvent facilement provoquer les problèmes suivants : java.util.concurrent 包提供了现成的线程池的实现,更加健壮,功能也更强大,更多关于线程池的介绍可以这篇文章

rrreee

使用线程池的注意事项

虽然线程池能大大提高服务器的并发性能,但使用它也存在一定风险,容易引发下面的问题:

  • 死锁

任何多线程应用程序都有死锁风险。造成死锁的最简单的情形是:线程 A 持有对象 X 的锁,并且在等待对象 Y 的锁,而线程 B 持有对象 Y 的锁,并且在等待对象 X 的锁,线程 A 与线程 B 都不释放自己持有的锁,并且等待对方的锁,这就导致两个线程永远等待下去,死锁就这样产生了

任何多线程程序都有死锁的风险,但线程池还会导致另外一种死锁:假定线程池中的所有工作线程都在执行各自任务时被阻塞,它们都在等待某个任务 A 的执行结果。而任务 A 依然在工作队列中,由于没有空闲线程,使得任务 A 一直不能被执行。这使得线程池中的所有工作线程都永远阻塞下去,死锁就这样产生了

  • 系统资源不足

如果线程池中的线程数目非常多,这些线程就会消耗包括内存和其他系统资源在内的大量资源,从而严重影响系统性能

  • 并发错误

线程池的工作队列依靠 wait() 和 notify()

  • deadlock

Toute application multithread Tous les programmes risquent de se retrouver dans une impasse. La situation la plus simple qui provoque un blocage est la suivante : le thread A détient le verrou de l'objet X et attend le verrou de l'objet Y, tandis que le thread B détient le verrou de l'objet Y et attend le verrou de l'objet X. Thread A et thread B Ni l'un ni l'autre ne libère le verrou qu'il détient et attend le verrou de l'autre partie, ce qui fait attendre indéfiniment les deux threads et un blocage se produit. Tout programme multithread présente un risque de blocage, mais le pool de threads provoquera également un autre type de blocage. blocage : supposons que tous les threads de travail du pool de threads sont bloqués lors de l'exécution de leurs tâches respectives et qu'ils attendent tous le résultat de l'exécution d'une certaine tâche A. La tâche A est toujours dans la file d'attente de travail. Puisqu'il n'y a pas de thread inactif, la tâche A ne peut pas être exécutée. Cela entraîne le blocage permanent de tous les threads de travail du pool de threads et un blocage se produit. Si le nombre de threads dans le pool de threads est très important, ces threads consommeront de la mémoire et d'autres ressources. y compris les ressources système, affectent sérieusement les performances du système

🎜🎜Erreurs de concurrence🎜🎜🎜🎜La file d'attente de travail du pool de threads s'appuie sur les méthodes wait() et notify() pour permettre aux threads de travail d'obtenir des tâches en temps opportun, mais les deux méthodes sont difficiles à utiliser. Si elles ne sont pas codées correctement, les notifications peuvent être perdues, ce qui fait que le thread de travail reste inactif, ignorant les tâches qui doivent être traitées dans la file d'attente de travail🎜🎜🎜🎜Fuite de thread🎜🎜🎜🎜Pour un pool de threads avec un nombre fixe de travailleurs threads, si le thread de travail Si une RuntimeException ou une erreur est levée lors de l'exécution d'une tâche et que ces exceptions ou erreurs ne sont pas interceptées, le thread de travail se terminera anormalement, entraînant la perte permanente d'un thread de travail par le pool de threads. Si tous les threads de travail se terminent anormalement, le pool de threads devient vide et il n'y a aucun thread de travail disponible pour gérer les tâches🎜

Une autre situation qui entraîne une fuite de thread est que le thread de travail est bloqué lors de l'exécution d'une tâche, par exemple en attendant que l'utilisateur saisisse des données, mais parce que l'utilisateur n'a pas saisi de données (peut-être parce qu'il s'est éloigné), le thread de travail a été bloqué. Un tel thread de travail n’existe que de nom et n’effectue en réalité aucune tâche. Si tous les threads de travail du pool de threads sont dans un tel état bloqué, alors le pool de threads ne sera pas en mesure de traiter les tâches nouvellement ajoutées

  • Surcharge de tâches

Lorsqu'il y a un grand nombre de tâches en attente d'être exécutées dans la file d'attente de travail, ces tâches peuvent consommer trop de ressources système et provoquer un manque de ressources système

En résumé, le pool de threads peut entraîner divers risques. Afin de les éviter autant que possible, vous devez suivre les. principes suivants lors de l'utilisation du pool de threads :

Si la tâche A Pendant le processus d'exécution, il est nécessaire d'attendre le résultat de l'exécution de la tâche B de manière synchrone, donc la tâche A ne convient pas pour être ajoutée à la file d'attente de travail du pool de threads. Si vous ajoutez une tâche comme la tâche A qui doit attendre les résultats d'exécution d'autres tâches dans la file d'attente de travail, cela peut provoquer un blocage dans le pool de threads

Si l'exécution d'une certaine tâche peut être bloquée et qu'elle est bloquée pendant une longue période, alors un délai d'attente doit être défini pour empêcher le thread de travail de se bloquer définitivement et de provoquer une fuite de thread

Comprenez les caractéristiques de la tâche et analysez si la tâche effectue des opérations d'E/S qui sont souvent bloquées ou effectue des opérations de calcul qui ne le sont jamais bloqué. Le premier utilise le processeur par intermittence, tandis que le second a une utilisation plus élevée du processeur. Classez les tâches en fonction des caractéristiques des tâches, puis ajoutez différents types de tâches aux files d'attente de travail des différents pools de threads. De cette façon, chaque pool de threads peut être ajusté séparément en fonction des caractéristiques des tâches

Ajustez la taille. du pool de threads et la taille maximale du pool de threads. La taille optimale dépend principalement du nombre de processeurs disponibles sur le système et des caractéristiques des tâches dans la file d'attente de travail. S'il n'y a qu'une seule file d'attente de travail sur un système avec N processeurs et que tous sont des tâches de calcul, alors lorsque le pool de threads a N ou N+1 threads de travail, l'utilisation maximale du processeur sera généralement obtenue

Si le travail Si le Si la file d'attente contient des tâches qui effectuent des opérations d'E/S et bloquent fréquemment, la taille du pool de threads doit dépasser le nombre de processeurs disponibles, car tous les threads de travail ne fonctionnent pas tout le temps. Sélectionnez une tâche type, puis estimez le rapport entre le temps d'attente (WT) et le temps réel occupé par le CPU pour les calculs (ST) lors de l'exécution de cette tâche : WT/ST. Pour un système avec N processeurs, environ N(1+WT/ST) threads doivent être configurés pour garantir que le processeur est pleinement utilisé

Pour éviter une surcharge de tâches, le serveur doit limiter le nombre de connexions simultanées des clients en fonction de la capacité du système. Lorsque le nombre de connexions client simultanées dépasse la limite, le serveur peut rejeter la demande de connexion et envoyer une invite conviviale au client.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer