ホームページ >Java >&#&チュートリアル >導入例: Java は quasar に基づいたコルーチン プールを実装します

導入例: Java は quasar に基づいたコルーチン プールを実装します

WBOY
WBOY転載
2022-06-27 13:51:091706ブラウズ

この記事では、java に関する関連知識を提供します。主に、Quasar に基づいたコルーチン プールの実装に関連する問題を整理します。1 つのスレッドに複数のコルーチンを持たせることができ、1 つのプロセスを複数のコルーチンで独立させることもできます。 , スレッドのプロセスはすべて同期機構ですが、コルーチンは非同期機構です。一緒に見ていきましょう。皆さんの参考になれば幸いです。

導入例: Java は quasar に基づいたコルーチン プールを実装します

推奨学習: 「java ビデオ チュートリアル

ビジネス シナリオ: Golang と swoole は両方ともコルーチンを採用し、同じタスクに取り組みます同時実行数では、コルーチンはスレッドの数倍になる可能性があります。そこで最近、Java にクエリを行ったときに、Java 自体にはコルーチンがないことを知りました。しかし、ある牛が自分でコルーチンを実装したのです。これがこの記事の主人公、クエーサー (ファイバー プロセス) です。しかし、手書きのコルーチン プールの優れた操作を公開している人を見たことがありません (誰がそれを直接使用するでしょうか? つまり、彼らは社会からひどく叩かれていないということです~)

スレッドには複数のコルーチンを含めることができます。プロセスには複数のコルーチンを個別に含めることもできます。

スレッド プロセスはすべて同期メカニズムですが、コルーチンは非同期です。

コルーチンは最後の呼び出しの状態を保持できるため、プロセスが再開始されるたびに、最後の呼び出しの状態に入るのと同じになります。

スレッドはプリエンプティブですが、コルーチンは非プリエンプティブであるため、ユーザーは他のコルーチンに切り替えるために使用権を解放する必要があります。したがって、実際に同時に実行する権利を持つのは 1 つのコルーチンだけであり、これは同等です単一スレッドの能力。

コルーチンはスレッドを置き換えるものではありませんが、スレッドから抽象化されています。スレッドは分割された CPU リソースです。コルーチンは組織化されたコード プロセスです。コルーチンはホストして実行するためにスレッドを必要とします。スレッドはコルーチンです。リソースですが、コルーチンは直接的なものではありません。スレッドを使用します。コルーチンは実行プログラム (インターセプター) を直接使用します。実行プログラムは、任意のスレッドまたはスレッド プールに関連付けることができ、現在のスレッド、UI スレッド、または新しいプロセスを作成することができます。

スレッドはコルーチンのリソースです。コルーチンは、インターセプターを介して間接的にスレッド リソースを使用します。

早速、コードに進みましょう:

パッケージのインポート:

        <dependency>
            <groupId>co.paralleluniverse</groupId>
            <artifactId>quasar-core</artifactId>
            <version>0.7.9</version>
            <classifier>jdk8</classifier>
        </dependency>
WorkTools工具类:
package com.example.ai;

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableRunnable;

import java.util.concurrent.ArrayBlockingQueue;


public class WorkTools {
    //协程池中默认协程的个数为5
    private static int WORK_NUM = 5;
    //队列默认任务为100
    private static int TASK_COUNT = 100;

    //工做协程数组
    private Fiber[] workThreads;
    //等待队列
    private final ArrayBlockingQueue<SuspendableRunnable> taskQueue;

    //用户在构造这个协程池时,但愿启动的协程数
    private final int workerNum;


    //构造方法:建立具备默认协程个数的协程池
    public WorkTools() {
        this(WORK_NUM,TASK_COUNT);
    }

    //建立协程池,workNum为协程池中工做协程的个数
    public WorkTools(int workerNum, int taskCount) {
        if (workerNum <= 0) {
            workerNum = WORK_NUM;
        }
        if (taskCount <= 0) {
            taskCount = TASK_COUNT;
        }
        this.workerNum = workerNum;
        taskQueue = new ArrayBlockingQueue(taskCount);
        workThreads = new Fiber[workerNum];
        for (int i = 0; i < workerNum; i++) {
            int finalI = i;
            workThreads[i] = new Fiber<>(new SuspendableRunnable() {
                @Override
                public void run() throws SuspendExecution, InterruptedException {
                    SuspendableRunnable runnable = null;
                    while (true){
                        try{
                            //取任务,没有则阻塞。
                            runnable = taskQueue.take();
                        }catch (Exception e){
                            System.out.println(e.getMessage());
                        }
                        //存在任务则运行。
                        if(runnable != null){
                            runnable.run();
                        }

                        runnable = null;
                    }
                }
            });  //new一个工做协程

            workThreads[i].start();  //启动工做协程

        }

        Runtime.getRuntime().availableProcessors();
    }
    //执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定
    public void execute(SuspendableRunnable task) {
        try {
            taskQueue.put(task);   //put:阻塞接口的插入
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println("阻塞");
        }
    }
    //销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁
    public void destory() {
        //工做协程中止工做,且置为null
        System.out.println("ready close thread...");
        for (int i = 0; i < workerNum; i++) {

            workThreads[i] = null; //help gc
        }
        taskQueue.clear();  //清空等待队列
    }
    //覆盖toString方法,返回协程信息:工做协程个数和已完成任务个数
    @Override
    public String toString() {
        return "WorkThread number:" + workerNum + " ==分割线== wait task number:" + taskQueue.size();
    }
}

テスト コード:

package com.example.ai;

import co.paralleluniverse.strands.SuspendableRunnable;
import lombok.SneakyThrows;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.concurrent.CountDownLatch;



@SpringBootApplication


public class AiApplication {

    @SneakyThrows
    public static void main(String[] args) {
        //等待协程任务完毕后再结束主线程
        CountDownLatch cdl = new CountDownLatch(50);
        //开启5个协程,50个任务列队。
        WorkTools myThreadPool = new WorkTools(5, 50);
        for (int i = 0; i< 50; i++){
            int finalI = i;
            myThreadPool.execute(new SuspendableRunnable() {
                @Override
                public void run() {
                    System.out.println(finalI);
                    try {
                        //延迟1秒
                        Thread.sleep(1000);
                        cdl.countDown();
                    } catch (InterruptedException e) {
                        System.out.println("阻塞中");
                    }
                }
            });

        }
        //阻塞
        cdl.await();
    }

}

特定のコードはコメント化されているため、自分で理解できます。こちらもスレッドプール書き込みを利用して実装しました。

現在、私たちは問題の解決に取り組んでいます。コルーチンのブロック処理中に、Fiber クラスがブロック警告を報告しますが、これは混乱を招き、見るのが煩わしいものです。現時点ではこれに対処する方法はありません。専門家の誰かが何かアイデアを持っているかどうか、以下のコメント欄で確認してみましょう。ありがとうございます~

推奨学習: 「Java ビデオ チュートリアル

以上が導入例: Java は quasar に基づいたコルーチン プールを実装しますの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はcsdn.netで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。