ホームページ >Java >&#&チュートリアル >Java でマルチスレッドサーバーを作成する方法
典型的なシングルスレッド サーバーの例は次のとおりです:
while (true) { Socket socket = null; try { // 接收客户连接 socket = serverSocket.accept(); // 从socket中获得输入流与输出流,与客户通信 ... } catch(IOException e) { e.printStackTrace() } finally { try { if(socket != null) { // 断开连接 socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
サーバーはクライアント接続を受信し、クライアントと通信し、通信の完了後に切断し、次のクライアント接続を受信します。同時にクライアント接続であるため、これらのクライアントは複数のクライアント接続要求をキューに入れる必要があります。顧客を長時間待たせると、Web サイトの信頼性が失われ、トラフィックが減少します。
同時実行パフォーマンスは、通常、同時に複数のクライアントに応答するサーバーの能力を測定するために使用されます。優れた同時実行パフォーマンスを持つサーバーは、次の 2 つの条件を満たしている必要があります。複数の顧客接続を同時に受信して処理します
顧客ごとにすぐに応答が返されます
複数のスレッドを使用して複数のサービスに対応します顧客は同時にサービスを提供します。これは、サーバーの同時実行パフォーマンスを向上させる最も一般的な手段です。通常、次の 3 つの方法があります:
各顧客にワーカー スレッドを割り当てる
スレッド プールを作成すると、その作業スレッドが顧客にサービスを提供します
Java クラス ライブラリの既存のスレッド プールを使用すると、その作業スレッドが顧客にサービスを提供します
各顧客にスレッドを割り当てる
public class EchoServer { private int port = 8000; private ServerSocket serverSocket; public EchoServer() throws IOException { serverSocket = new ServerSocket(port); System.out.println("服务器启动"); } public void service() { while(true) { Socket socket = null; try { // 接教客户连接 socket = serverSocket.accept(); // 创建一个工作线程 Thread workThread = new Thread(new Handler(socket)); // 启动工作线程 workThread.start(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws TOException { new EchoServer().service(); } // 负责与单个客户的通信 class Handler implements Runnable { private Socket socket; pub1ic Handler(Socket socket) { this.socket = socket; } private PrintWriter getWriter(Socket socket) throws IOException {...} private BufferedReader getReader(Socket socket) throws IOException {...} public String echo(String msg) {...} public void run() { try { System.out.println("New connection accepted" + socket.getInetAddress() + ":" + socket.getPort()); BufferedReader br = getReader(socket); PrintWriter pw = getWriter(socket); String msg = null; // 接收和发送数据,直到通信结束 while ((msg = br.readLine()) != null) { System.out.println("from "+ socket.getInetAddress() + ":" + socket.getPort() + ">" + msg); pw.println(echo(msg)); if (msg.equals("bye")) break; } } catch (IOException e) { e.printStackTrace(); } finally { try { // 断开连接 if(socket != nulll) socket.close(); } catch (IOException e) { e,printStackTrace(); } } } } }
以前の実装には次の欠点があります:
サーバーワーカー スレッドの作成と破棄 オーバーヘッドが大きい サーバーが多くのクライアントと通信する必要があり、各クライアントとの通信時間が非常に短い場合、クライアント用に新しいスレッドを作成するサーバーのオーバーヘッドが、実際にクライアントと通信するオーバーヘッド。
スレッドの作成と破棄のオーバーヘッドに加えて、アクティブなスレッドはシステム リソースも消費します。各スレッドは一定量のメモリを占有します。多数のクライアントが同時にサーバーに接続する場合、多数のワーカー スレッドを作成する必要があります。ワーカー スレッドは大量のメモリを消費し、システム内のメモリ領域が不足する可能性があります。 system.
一部のワーカー スレッドはスレッド プール内に事前に作成されており、継続的にワーク キューからタスクを取得して、そのタスクを実行します。ワーカー スレッドはタスクの実行を終了すると、ワーク キュー内の次のタスクの実行を継続します。
時間を短縮します。スレッドの作成と破棄に必要な時間は、各ワーカー スレッドを常に再利用でき、複数のタスクを実行できます。
スレッド プール内のスレッドの数は、システムの状況に応じて簡単に調整できます。システムのリソースの過度の消費を防ぎ、システムのクラッシュを引き起こす可能性のある処理能力を備えています。
public class ThreadPool extends ThreadGroup { // 线程池是否关闭 private boolean isClosed = false; // 表示工作队列 private LinkedList<Runnable> workQueue; // 表示线程池ID private static int threadPoolID; // 表示工作线程ID // poolSize 指定线程池中的工作线程数目 public ThreadPool(int poolSize) { super("ThreadPool-"+ (threadPoolID++)); setDaemon(true); // 创建工作队列 workQueue = new LinkedList<Runnable>(); for (int i = 0; i < poolSize; i++) { // 创建并启动工作线程 new WorkThread().start(); } } /** * 向工作队列中加入一个新任务,由工作线程去执行任务 */ public synchronized void execute(Runnable tank) { // 线程池被关则抛出IllegalStateException异常 if(isClosed) { throw new IllegalStateException(); } if(task != null) { workQueue.add(task); // 唤醒正在getTask()方法中等待任务的工作线限 notify(); } } /** * 从工作队列中取出一个任务,工作线程会调用此方法 */ protected synchronized Runnable getTask() throws InterruptedException { while(workQueue,size() == 0) { if (isClosed) return null; wait(); // 如果工作队列中没有任务,就等待任务 } return workQueue.removeFirst(); } /** * 关闭线程池 */ public synchronized void close() { if(!isClosed) { isClosed = true; // 清空工作队列 workQueue.clear(); // 中断所有的工作线程,该方法继承自ThreadGroup类 interrupt(); } } /** * 等待工作线程把所有任务执行完 */ public void join() { synchronized (this) { isClosed = true; // 唤醒还在getTask()方法中等待任务的工作线程 notifyAll(); } Thread[] threads = new Thread[activeCount()]; // enumerate()方法继承自ThreadGroup类获得线程组中当前所有活着的工作线程 int count = enumerate(threads); // 等待所有工作线程运行结束 for(int i = 0; i < count; i++) { try { // 等待工作线程运行结束 threads[i].join(); } catch((InterruptedException ex) {} } } /** * 内部类:工作线程 */ private class WorkThread extends Thread { public WorkThread() { // 加入当前 ThreadPool 线程组 super(ThreadPool.this, "WorkThread-" + (threadID++)); } public void run() { // isInterrupted()方法承自Thread类,判断线程是否被中断 while (!isInterrupted()) { Runnable task = null; try { // 取出任务 task = getTask(); } catch(InterruptedException ex) {} // 如果 getTask() 返回 nu11 或者线程执行 getTask() 时被中断,则结束此线程 if(task != null) return; // 运行任务,异常在catch代码块中被捕获 try { task.run(); } catch(Throwable t) { t.printStackTrace(); } } } } }
スレッド プールを使用して実装されたサーバーは次のとおりです。
publlc class EchoServer { private int port = 8000; private ServerSocket serverSocket; private ThreadPool threadPool; // 线程港 private final int POOL_SIZE = 4; // 单个CPU时线程池中工作线程的数目 public EchoServer() throws IOException { serverSocket = new ServerSocket(port); // 创建线程池 // Runtime 的 availableProcessors() 方法返回当前系统的CPU的数目 // 系统的CPU越多,线程池中工作线程的数目也越多 threadPool= new ThreadPool( Runtime.getRuntime().availableProcessors() * POOL_SIZE); System.out.println("服务器启动"); } public void service() { while (true) { Socket socket = null; try { socket = serverSocket.accept(); // 把与客户通信的任务交给线程池 threadPool.execute(new Handler(socket)); } catch(IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws TOException { new EchoServer().service(); } // 负责与单个客户的通信,与上例类似 class Handler implements Runnable {...} }
public class Echoserver { private int port = 8000; private ServerSocket serverSocket; // 线程池 private ExecutorService executorService; // 单个CPU时线程池中工作线程的数目 private final int POOL_SIZE = 4; public EchoServer() throws IOException { serverSocket = new ServerSocket(port); // 创建线程池 // Runtime 的 availableProcessors() 方法返回当前系统的CPU的数目 // 系统的CPU越多,线程池中工作线程的数目也越多 executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() * POOL_SIZE); System.out.println("服务器启动"); } public void service() { while(true) { Socket socket = null; try { socket = serverSocket.accept(); executorService.execute(new Handler(socket)); } catch(IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws TOException { new EchoServer().service(); } // 负责与单个客户的通信,与上例类似 class Handler implements Runnable {...} }
スレッド プールの使用に関する注意事項スレッド プールを使用するとサーバーの同時実行パフォーマンスが大幅に向上しますが、その使用には一定のリスクがあり、次の問題が発生しやすくなります。
wait()
メソッドと#スレッド リーク
スレッド リークにつながるもう 1 つの状況は、ユーザーがデータを入力するのを待機しているなど、タスクの実行中にワーカー スレッドがブロックされているが、ユーザーがデータを入力していないため (おそらくユーザーが立ち去ったため)、これが発生することです。 work スレッドがブロックされ続けています。このようなワーカー スレッドは名前だけが存在し、実際にはタスクを実行しません。スレッド プール内のすべてのワーカー スレッドがそのようなブロック状態にある場合、スレッド プールは新しく追加されたタスクを処理できなくなります
要約すると、スレッド プールは次のようになります。さまざまなリスクを引き起こす可能性があります。それらをできるだけ回避するには、スレッド プールを使用するときに次の原則に従う必要があります。
タスク A が実行中にタスク B の実行結果を同期的に待つ必要がある場合の場合、タスク A はスレッド プールのワーク キューに追加するのに適していません。タスク A のように、他のタスクの実行結果を待つ必要があるタスクをワーク キューに追加すると、スレッド プールでデッドロックが発生する可能性があります。
タスクが実行されると、ブロックされる可能性があります。タスクがブロックされている場合は、ワーカー スレッドが永続的にブロックされてスレッド リークが発生するのを防ぐために、タイムアウトを設定する必要があります。頻繁にブロックするか、決してブロックしない操作を実行します。前者は CPU を断続的に使用しますが、後者は CPU 使用率が高くなります。タスクの特性に従ってタスクを分類し、さまざまなタイプのタスクを異なるスレッド プールのワーク キューに追加して、各スレッド プールをタスクの特性に応じて調整できるようにします。スレッド プールのサイズ。スレッド プールの最適なサイズは、主にシステム上で使用可能な CPU の数と、ワーク キュー内のタスクの特性によって決まります。 N 個の CPU を備えたシステム上にワーク キューが 1 つだけあり、それらがすべて計算タスクである場合、スレッド プールに N または N 1 個のワーカー スレッドがあると、通常、最大 CPU 使用率が得られます
Ifワーク キューには、IO 操作を実行して頻繁にブロックするタスクが含まれています。すべてのワーカー スレッドが常に動作しているわけではないため、スレッド プールのサイズが使用可能な CPU の数を超えます。典型的なタスクを選択し、このタスクの実行中に待機時間 (WT) と CPU が計算に費やした実際の時間 (ST) の比率 (WT/ST) を推定します。 N 個の CPU を備えたシステムの場合、CPU が完全に利用されるようにするには、約 N(1 WT/ST) のスレッドをセットアップする必要があります。
タスクの過負荷を避けるために、サーバーは、顧客ベースの同時接続を制限する必要があります。システムの容量に依存します。クライアントの同時接続数が制限を超えると、サーバーは接続要求を拒否し、クライアントにわかりやすいプロンプトを表示することができます。
以上がJava でマルチスレッドサーバーを作成する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。