Heim  >  Artikel  >  Java  >  So erstellen Sie einen Multithread-Server in Java

So erstellen Sie einen Multithread-Server in Java

PHPz
PHPznach vorne
2023-05-10 15:58:141342Durchsuche

Ein typisches Beispiel für einen Single-Threaded-Server lautet wie folgt:

while (true) {
    Socket socket = null;
    try {
        // 接收客户连接
        socket = serverSocket.accept();
        // 从socket中获得输入流与输出流,与客户通信
        ...
    } catch(IOException e) {
        e.printStackTrace()
    } finally {
        try {
            if(socket != null) {
                // 断开连接
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Der Server empfängt eine Client-Verbindung, kommuniziert mit dem Client, trennt die Verbindung, nachdem die Kommunikation abgeschlossen ist, und empfängt dann die nächste Client-Verbindung, die diese anfordert Gleichzeitig müssen Kunden in der Schlange stehen. Wenn Sie Kunden zu lange warten lassen, verliert Ihre Website an Glaubwürdigkeit, was zu weniger Traffic führt.

Parallelitätsleistung wird im Allgemeinen verwendet, um die Fähigkeit eines Servers zu messen, auf mehrere Clients gleichzeitig zu reagieren. Ein Server mit guter Parallelitätsleistung muss zwei Bedingungen erfüllen:

  • In der Lage sein, mehrere Clientverbindungen gleichzeitig zu empfangen und zu verarbeiten

  • Für jeden Kunden wird schnell eine Antwort gegeben.

Verwenden Sie mehrere Threads, um Dienste für mehrere Kunden gleichzeitig bereitzustellen. Dies ist im Allgemeinen die gebräuchlichste Methode, um die gleichzeitige Leistung des Servers zu verbessern Drei Möglichkeiten:

  • Für jeden Der Kunde weist einen Arbeitsthread zu

  • Erstellen Sie einen Thread-Pool, und der darin enthaltene Arbeitsthread dient dem Kunden

  • Verwenden Sie den vorgefertigten Thread-Pool in der Java-Klassenbibliothek. und sein Worker-Thread dient dem Kunden

Weisen Sie jedem Kunden einen Thread zu

Der Haupt-Thread des Servers ist für den Empfang der Kundenverbindung verantwortlich. Jedes Mal, wenn eine Kundenverbindung empfangen wird, wird ein Worker-Thread erstellt, der verantwortlich ist für die Kommunikation mit dem Kunden

public class EchoServer {
    private int port = 8000;
    private ServerSocket serverSocket;
    public EchoServer() throws IOException {
        serverSocket = new ServerSocket(port);
        System.out.println("服务器启动");
    }
    public void service() {
        while(true) {
            Socket socket = null;
            try {
                // 接教客户连接
                socket = serverSocket.accept();
                // 创建一个工作线程
                Thread workThread = new Thread(new Handler(socket));
                // 启动工作线程
                workThread.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String args[])throws TOException {
        new EchoServer().service();
    }
    // 负责与单个客户的通信   
    class Handler implements Runnable {
        private Socket socket;
        pub1ic Handler(Socket socket) {
            this.socket = socket;
        }
        private PrintWriter getWriter(Socket socket) throws IOException {...}
        private BufferedReader getReader(Socket socket) throws IOException {...}
        public String echo(String msg) {...}
        public void run() {
            try {
                System.out.println("New connection accepted" + socket.getInetAddress() + ":" + socket.getPort());
                BufferedReader br = getReader(socket);
                PrintWriter pw = getWriter(socket);
                String msg = null;
                // 接收和发送数据,直到通信结束
                while ((msg = br.readLine()) != null) {
                    System.out.println("from "+ socket.getInetAddress() + ":" + socket.getPort() + ">" + msg);
                    pw.println(echo(msg));
                    if (msg.equals("bye")) break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    // 断开连接
                    if(socket != nulll) socket.close();
                } catch (IOException e) {
                    e,printStackTrace();
                }
            }
        }
    }
}

Erstellen Sie einen Thread-Pool

Vorherige Diese Implementierung weist die folgenden Mängel auf:

  • Der Server erstellt und zerstört Arbeitsthreads, was sehr teuer ist, wenn der Server mit vielen Clients kommunizieren muss Wenn die Kommunikationszeit mit jedem Client kurz ist, ist es möglich, dass der Server ein Client ist. Der Overhead für die Erstellung neuer Threads ist größer als der Overhead für die tatsächliche Kommunikation mit dem Client

  • Zusätzlich zum Overhead für das Erstellen und Zerstören von Threads , aktive Threads verbrauchen auch Systemressourcen. Jeder Thread belegt eine bestimmte Menge an Speicher. Wenn eine große Anzahl von Clients gleichzeitig eine Verbindung zum Server herstellt, müssen viele Arbeitsthreads erstellt werden. Diese verbrauchen viel Speicher und können zu unzureichendem Speicherplatz führen Im Thread-Pool werden einige Jobs vorab erstellt, die kontinuierlich Aufgaben aus der Arbeitswarteschlange übernehmen und diese dann ausführen. Wenn der Arbeitsthread mit der Ausführung einer Aufgabe fertig ist, führt er weiterhin die nächste Aufgabe in der Arbeitswarteschlange aus

  • Der Thread-Pool bietet die folgenden Vorteile:

Reduziert die Anzahl der erstellten und zerstörten Threads, und jeder Arbeitsthread kann jederzeit wiederverwendet werden, Kann mehrere Aufgaben ausführen

  • Die Anzahl der Threads im Thread-Pool kann einfach an die Tragfähigkeit des Systems angepasst werden, um Systemabstürze aufgrund übermäßigen Systemressourcenverbrauchs zu verhindern

  • public class ThreadPool extends ThreadGroup {
        // 线程池是否关闭
        private boolean isClosed = false;
        // 表示工作队列
        private LinkedList<Runnable> workQueue;
        // 表示线程池ID
        private static int threadPoolID;
        // 表示工作线程ID
        // poolSize 指定线程池中的工作线程数目
        public ThreadPool(int poolSize) {
            super("ThreadPool-"+ (threadPoolID++));
            setDaemon(true);
            // 创建工作队列
            workQueue = new LinkedList<Runnable>();
            for (int i = 0; i < poolSize; i++) {
                // 创建并启动工作线程
                new WorkThread().start(); 
            }
        }
        /**
         * 向工作队列中加入一个新任务,由工作线程去执行任务
         */
        public synchronized void execute(Runnable tank) {
            // 线程池被关则抛出IllegalStateException异常
            if(isClosed) {
                throw new IllegalStateException();
            }
            if(task != null) {
                workQueue.add(task);
                // 唤醒正在getTask()方法中等待任务的工作线限
                notify();
            }
        }
        /**
         * 从工作队列中取出一个任务,工作线程会调用此方法
         */
        protected synchronized Runnable getTask() throws InterruptedException {
            while(workQueue,size() == 0) {
                if (isClosed) return null;
                wait(); // 如果工作队列中没有任务,就等待任务
            }
            return workQueue.removeFirst();
        }
        /**
         * 关闭线程池
         */
        public synchronized void close() {
            if(!isClosed) {
                isClosed = true;
                // 清空工作队列
                workQueue.clear();
                // 中断所有的工作线程,该方法继承自ThreadGroup类
                interrupt();
            }
        }
        /**
         * 等待工作线程把所有任务执行完
         */
        public void join() {
            synchronized (this) {
                isClosed = true;
                // 唤醒还在getTask()方法中等待任务的工作线程
                notifyAll();
            }
            Thread[] threads = new Thread[activeCount()];
            // enumerate()方法继承自ThreadGroup类获得线程组中当前所有活着的工作线程
            int count = enumerate(threads);
            // 等待所有工作线程运行结束
            for(int i = 0; i < count; i++) {
                try {
                    // 等待工作线程运行结束
                    threads[i].join();
                } catch((InterruptedException ex) {}
            }
        }
        /**
         * 内部类:工作线程
         */
        private class WorkThread extends Thread {
            public WorkThread() {
                // 加入当前 ThreadPool 线程组
                super(ThreadPool.this, "WorkThread-" + (threadID++));
            }
            public void run() {
                // isInterrupted()方法承自Thread类,判断线程是否被中断
                while (!isInterrupted()) {
                    Runnable task = null;
                    try {
                        // 取出任务
                        task = getTask();
                    } catch(InterruptedException ex) {}
                    // 如果 getTask() 返回 nu11 或者线程执行 getTask() 时被中断,则结束此线程
                    if(task != null) return;
                    // 运行任务,异常在catch代码块中被捕获
                    try {
                        task.run();
                    } catch(Throwable t) {
                        t.printStackTrace();
                    }
                }
            }
        }
    }

    Die Server werden mithilfe des Threads implementiert Der Pool lautet wie folgt:

    publlc class EchoServer {
        private int port = 8000;
        private ServerSocket serverSocket;
        private ThreadPool threadPool;	// 线程港
        private final int POOL_SIZE = 4;	// 单个CPU时线程池中工作线程的数目
        public EchoServer() throws IOException {
            serverSocket = new ServerSocket(port);
            // 创建线程池
            // Runtime 的 availableProcessors() 方法返回当前系统的CPU的数目
            // 系统的CPU越多,线程池中工作线程的数目也越多
            threadPool= new ThreadPool(
            	Runtime.getRuntime().availableProcessors() * POOL_SIZE);
            System.out.println("服务器启动");
        }
        public void service() {
            while (true) {
                Socket socket = null;
                try {
                    socket = serverSocket.accept();
                    // 把与客户通信的任务交给线程池
                    threadPool.execute(new Handler(socket));
                } catch(IOException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String args[])throws TOException {
            new EchoServer().service();
        }
        // 负责与单个客户的通信,与上例类似
        class Handler implements Runnable {...}
    }
  • Verwenden Sie den von Java bereitgestellten Thread-Pool. Das java.util.concurrent-Paket bietet eine vorgefertigte Thread-Pool-Implementierung, die robuster und leistungsfähiger ist Informationen zum Thread-Pool finden Sie in diesem Artikel
public class Echoserver {
    private int port = 8000;
    private ServerSocket serverSocket;
    // 线程池
    private ExecutorService executorService;
    // 单个CPU时线程池中工作线程的数目
    private final int POOL_SIZE = 4;
    public EchoServer() throws IOException {
        serverSocket = new ServerSocket(port);
        // 创建线程池
        // Runtime 的 availableProcessors() 方法返回当前系统的CPU的数目
        // 系统的CPU越多,线程池中工作线程的数目也越多
        executorService = Executors.newFixedThreadPool(
        	Runtime.getRuntime().availableProcessors() * POOL_SIZE);
        System.out.println("服务器启动");
    }
    public void service() {
        while(true) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                executorService.execute(new Handler(socket));
            } catch(IOException e) {
                e.printStackTrace();
            }
        }
    }
     public static void main(String args[])throws TOException {
        new EchoServer().service();
    }
    // 负责与单个客户的通信,与上例类似
    class Handler implements Runnable {...}
}

Hinweise zur Verwendung des Thread-Pools

Obwohl der Thread-Pool die Parallelitätsleistung des Servers erheblich verbessern kann, birgt seine Verwendung gewisse Risiken, die leicht zu den folgenden Problemen führen können:

java.util.concurrent 包提供了现成的线程池的实现,更加健壮,功能也更强大,更多关于线程池的介绍可以这篇文章

rrreee

使用线程池的注意事项

虽然线程池能大大提高服务器的并发性能,但使用它也存在一定风险,容易引发下面的问题:

  • 死锁

任何多线程应用程序都有死锁风险。造成死锁的最简单的情形是:线程 A 持有对象 X 的锁,并且在等待对象 Y 的锁,而线程 B 持有对象 Y 的锁,并且在等待对象 X 的锁,线程 A 与线程 B 都不释放自己持有的锁,并且等待对方的锁,这就导致两个线程永远等待下去,死锁就这样产生了

任何多线程程序都有死锁的风险,但线程池还会导致另外一种死锁:假定线程池中的所有工作线程都在执行各自任务时被阻塞,它们都在等待某个任务 A 的执行结果。而任务 A 依然在工作队列中,由于没有空闲线程,使得任务 A 一直不能被执行。这使得线程池中的所有工作线程都永远阻塞下去,死锁就这样产生了

  • 系统资源不足

如果线程池中的线程数目非常多,这些线程就会消耗包括内存和其他系统资源在内的大量资源,从而严重影响系统性能

  • 并发错误

线程池的工作队列依靠 wait() 和 notify()

Deadlock
  • Jede Multithread-Anwendung Bei allen Programmen besteht die Gefahr eines Deadlocks. Die einfachste Situation, die einen Deadlock verursacht, ist: Thread A hält die Sperre von Objekt X und wartet auf die Sperre von Objekt Y, während Thread B die Sperre von Objekt Y hält und auf die Sperre von Objekt X wartet. Thread A und Thread B Keiner von beiden gibt die Sperre frei, die er hält, und wartet auf die Sperre der anderen Partei, was dazu führt, dass die beiden Threads ewig warten und ein Deadlock auftritt. Bei jedem Multithread-Programm besteht das Risiko eines Deadlocks, aber der Thread-Pool verursacht auch eine andere Art von Deadlock: Angenommen, alle Arbeitsthreads im Thread-Pool sind bei der Ausführung ihrer jeweiligen Aufgaben blockiert und warten alle auf das Ausführungsergebnis einer bestimmten Aufgabe A. Aufgabe A befindet sich noch in der Arbeitswarteschlange, da kein Leerlaufthread vorhanden ist, kann Aufgabe A nicht ausgeführt werden. Dies führt dazu, dass alle Arbeitsthreads im Thread-Pool dauerhaft blockiert werden und es zu einem Deadlock kommt. Unzureichende Systemressourcen

Wenn die Anzahl der Threads im Thread-Pool sehr groß ist, verbrauchen diese Threads Speicher und andere Ressourcen, einschließlich Systemressourcen, beeinträchtigen die Systemleistung erheblich. 🎜🎜🎜🎜Parallelitätsfehler🎜🎜🎜🎜Die Arbeitswarteschlange des Thread-Pools basiert auf wait() und notify() Methode, um es Arbeitsthreads zu ermöglichen, Aufgaben zeitnah abzurufen, aber beide Methoden sind schwierig zu verwenden. Bei nicht korrekter Codierung können Benachrichtigungen verloren gehen, was dazu führt, dass der Worker-Thread im Leerlauf bleibt und die Aufgaben ignoriert, die in der Arbeitswarteschlange verarbeitet werden müssen Threads, wenn der Worker-Thread beim Ausführen einer Aufgabe eine RuntimeException oder einen Fehler auslöst und diese Ausnahmen oder Fehler nicht abgefangen werden, wird der Worker-Thread abnormal beendet, was dazu führt, dass der Thread-Pool dauerhaft einen Worker-Thread verliert. Wenn alle Worker-Threads abnormal beendet werden, wird der Thread-Pool leer und es sind keine Worker-Threads zur Bearbeitung von Aufgaben verfügbar🎜

Eine weitere Situation, die zu Thread-Lecks führt, besteht darin, dass der Arbeitsthread blockiert ist, während er eine Aufgabe ausführt, z. B. beim Warten auf die Eingabe von Daten durch den Benutzer, aber weil der Benutzer keine Daten eingegeben hat (vielleicht weil der Benutzer weggegangen ist), wird der Arbeitsthread blockiert wurde blockiert. Ein solcher Arbeitsthread existiert nur dem Namen nach und führt tatsächlich keine Aufgaben aus. Wenn sich alle Arbeitsthreads im Thread-Pool in einem solchen blockierten Zustand befinden, kann der Thread-Pool neu hinzugefügte Aufgaben nicht verarbeiten.

  • Aufgabenüberlastung

Wenn eine große Anzahl von Aufgaben darauf wartet Diese Aufgaben werden in der Arbeitswarteschlange ausgeführt Es kann zu viele Systemressourcen verbrauchen und zu einem Mangel an Systemressourcen führen

Zusammenfassend kann der Thread-Pool verschiedene Risiken mit sich bringen. Um sie so weit wie möglich zu vermeiden, müssen Sie Folgendes befolgen Bei der Verwendung des Thread-Pools gelten folgende Grundsätze:

Wenn Aufgabe A Während des Ausführungsprozesses muss synchron auf das Ausführungsergebnis von Aufgabe B gewartet werden, sodass Aufgabe A nicht zum Hinzufügen zur Arbeitswarteschlange des Thread-Pools geeignet ist. Wenn Sie eine Aufgabe wie Aufgabe A hinzufügen, die auf die Ausführungsergebnisse anderer Aufgaben in der Arbeitswarteschlange warten muss, kann dies zu einem Deadlock im Thread-Pool führen

Wenn die Ausführung einer bestimmten Aufgabe möglicherweise blockiert ist, wird sie blockiert Für eine lange Zeit sollte dann ein Timeout eingestellt werden, um zu verhindern, dass der Arbeitsthread dauerhaft blockiert und Thread-Lecks verursacht.

Verstehen Sie die Eigenschaften der Aufgabe und analysieren Sie, ob die Aufgabe häufig blockierte E/A-Vorgänge oder nie blockierte Berechnungsvorgänge ausführt blockiert. Ersteres nutzt die CPU zeitweise, während letzteres eine höhere CPU-Auslastung aufweist. Klassifizieren Sie die Aufgaben entsprechend den Eigenschaften der Aufgaben und fügen Sie dann verschiedene Arten von Aufgaben zu den Arbeitswarteschlangen verschiedener Thread-Pools hinzu. Auf diese Weise kann jeder Thread-Pool separat entsprechend den Eigenschaften der Aufgaben angepasst werden

Passen Sie die Größe an des Thread-Pools und die maximale Größe des Thread-Pools. Die optimale Größe hängt in erster Linie von der Anzahl der verfügbaren CPUs im System und den Eigenschaften der Aufgaben in der Arbeitswarteschlange ab. Wenn es auf einem System mit N CPUs nur eine Arbeitswarteschlange gibt und es sich bei allen um Rechenaufgaben handelt, wird im Allgemeinen die maximale CPU-Auslastung erzielt, wenn der Thread-Pool über N oder N+1 Arbeitsthreads verfügt Da die Warteschlange Aufgaben enthält, die E/A-Vorgänge ausführen und häufig blockieren, sollte die Größe des Thread-Pools die Anzahl der verfügbaren CPUs überschreiten, da nicht alle Arbeitsthreads ständig arbeiten. Wählen Sie eine typische Aufgabe aus und schätzen Sie dann das Verhältnis zwischen der Wartezeit (WT) und der tatsächlichen Zeit, die die CPU für Berechnungen (ST) während der Ausführung dieser Aufgabe beansprucht: WT/ST. Für ein System mit N CPUs müssen ungefähr N(1+WT/ST) Threads eingerichtet werden, um sicherzustellen, dass die CPU vollständig ausgelastet ist

Um eine Aufgabenüberlastung zu vermeiden, sollte der Server die Anzahl gleichzeitiger Verbindungen von Kunden basierend auf begrenzen die Kapazität des Systems. Wenn die Anzahl gleichzeitiger Verbindungen eines Clients das Limit überschreitet, kann der Server die Verbindungsanfrage ablehnen und dem Client eine freundliche Aufforderung geben.

Das obige ist der detaillierte Inhalt vonSo erstellen Sie einen Multithread-Server in Java. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen