Heim >Java >javaLernprogramm >Beispieleinführung: Java implementiert einen Coroutine-Pool basierend auf Quasar

Beispieleinführung: Java implementiert einen Coroutine-Pool basierend auf Quasar

WBOY
WBOYnach vorne
2022-06-27 13:51:091652Durchsuche

Dieser Artikel vermittelt Ihnen relevantes Wissen über Java, das hauptsächlich die damit verbundenen Probleme der Implementierung eines Coroutine-Pools auf Basis von Quasar klärt. Ein Thread kann mehrere Coroutinen haben, und ein Prozess kann auch mehrere Coroutinen unabhängig voneinander haben , während Coroutinen asynchron sind. Schauen wir sie uns gemeinsam an. Ich hoffe, dass es für alle hilfreich ist.

Beispieleinführung: Java implementiert einen Coroutine-Pool basierend auf Quasar

Empfohlenes Lernen: „Java-Video-Tutorial

Geschäftsszenario: Sowohl Golang als auch Swoole umfassen Coroutinen, bei gleicher Anzahl gleichzeitiger Aufgaben können Coroutinen ein Vielfaches an Threads umfassen. Als ich kürzlich Java abfragte, erfuhr ich, dass Java selbst keine Coroutinen hat, aber eine bestimmte Kuh hat Coroutinen selbst implementiert, was der Protagonist dieses Artikels ist, Quasar (Faserprozess)! Aber ich habe noch niemanden gesehen, der die coole Funktionsweise des handgeschriebenen Coroutine-Pools enthüllt hat (Wer würde ihn direkt verwenden? Das bedeutet, dass er von der Gesellschaft nicht ernsthaft geschlagen wurde ~)

Ein Thread kann mehrere Coroutinen haben, und ein Prozess kann auch mehrere Coroutinen haben Besitze mehrere Coroutinen.

Thread-Prozesse sind alle synchrone Mechanismen, während Coroutinen asynchron sind.

Die Coroutine kann den Status des letzten Aufrufs beibehalten. Jedes Mal, wenn der Prozess erneut eintritt, entspricht dies dem Eintritt in den Status des letzten Aufrufs.

Threads sind präventiv, während Coroutinen nicht präemptiv sind. Daher müssen Benutzer ihre Nutzungsrechte aufgeben, um zu anderen Coroutinen zu wechseln. Daher hat tatsächlich nur eine Coroutine das Recht, gleichzeitig ausgeführt zu werden, was der Fähigkeit von entspricht ein einzelner Thread.

Coroutinen ersetzen keine Threads, sondern sind von Threads getrennte CPU-Ressourcen. Coroutinen benötigen Threads zum Hosten und Ausführen, aber Coroutinen verwenden Threads nicht direkt. Die Coroutine verwendet direkt den Executor (Interceptor). Der Executor kann jedem Thread oder Thread-Pool zugeordnet werden und der aktuelle Thread oder UI-Thread sein oder einen neuen Prozess erstellen.

Threads sind Ressourcen von Coroutinen. Coroutinen nutzen die Thread-Ressource indirekt über Interceptor.

Ohne weitere Umschweife gehen wir direkt zum Code:

Importpaket:

        <dependency>
            <groupId>co.paralleluniverse</groupId>
            <artifactId>quasar-core</artifactId>
            <version>0.7.9</version>
            <classifier>jdk8</classifier>
        </dependency>
WorkTools工具类:
package com.example.ai;

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableRunnable;

import java.util.concurrent.ArrayBlockingQueue;


public class WorkTools {
    //协程池中默认协程的个数为5
    private static int WORK_NUM = 5;
    //队列默认任务为100
    private static int TASK_COUNT = 100;

    //工做协程数组
    private Fiber[] workThreads;
    //等待队列
    private final ArrayBlockingQueue<SuspendableRunnable> taskQueue;

    //用户在构造这个协程池时,但愿启动的协程数
    private final int workerNum;


    //构造方法:建立具备默认协程个数的协程池
    public WorkTools() {
        this(WORK_NUM,TASK_COUNT);
    }

    //建立协程池,workNum为协程池中工做协程的个数
    public WorkTools(int workerNum, int taskCount) {
        if (workerNum <= 0) {
            workerNum = WORK_NUM;
        }
        if (taskCount <= 0) {
            taskCount = TASK_COUNT;
        }
        this.workerNum = workerNum;
        taskQueue = new ArrayBlockingQueue(taskCount);
        workThreads = new Fiber[workerNum];
        for (int i = 0; i < workerNum; i++) {
            int finalI = i;
            workThreads[i] = new Fiber<>(new SuspendableRunnable() {
                @Override
                public void run() throws SuspendExecution, InterruptedException {
                    SuspendableRunnable runnable = null;
                    while (true){
                        try{
                            //取任务,没有则阻塞。
                            runnable = taskQueue.take();
                        }catch (Exception e){
                            System.out.println(e.getMessage());
                        }
                        //存在任务则运行。
                        if(runnable != null){
                            runnable.run();
                        }

                        runnable = null;
                    }
                }
            });  //new一个工做协程

            workThreads[i].start();  //启动工做协程

        }

        Runtime.getRuntime().availableProcessors();
    }
    //执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定
    public void execute(SuspendableRunnable task) {
        try {
            taskQueue.put(task);   //put:阻塞接口的插入
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println("阻塞");
        }
    }
    //销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁
    public void destory() {
        //工做协程中止工做,且置为null
        System.out.println("ready close thread...");
        for (int i = 0; i < workerNum; i++) {

            workThreads[i] = null; //help gc
        }
        taskQueue.clear();  //清空等待队列
    }
    //覆盖toString方法,返回协程信息:工做协程个数和已完成任务个数
    @Override
    public String toString() {
        return "WorkThread number:" + workerNum + " ==分割线== wait task number:" + taskQueue.size();
    }
}

Testcode:

package com.example.ai;

import co.paralleluniverse.strands.SuspendableRunnable;
import lombok.SneakyThrows;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.concurrent.CountDownLatch;



@SpringBootApplication


public class AiApplication {

    @SneakyThrows
    public static void main(String[] args) {
        //等待协程任务完毕后再结束主线程
        CountDownLatch cdl = new CountDownLatch(50);
        //开启5个协程,50个任务列队。
        WorkTools myThreadPool = new WorkTools(5, 50);
        for (int i = 0; i< 50; i++){
            int finalI = i;
            myThreadPool.execute(new SuspendableRunnable() {
                @Override
                public void run() {
                    System.out.println(finalI);
                    try {
                        //延迟1秒
                        Thread.sleep(1000);
                        cdl.countDown();
                    } catch (InterruptedException e) {
                        System.out.println("阻塞中");
                    }
                }
            });

        }
        //阻塞
        cdl.await();
    }

}

Der spezifische Code ist kommentiert, damit Sie ihn selbst verstehen können. Ich habe es auch mithilfe des Thread-Pool-Schreibens implementiert.

Derzeit versuchen wir, das Problem zu lösen: Während des Coroutine-Blockierungsprozesses meldet die Fiber-Klasse eine Blockierungswarnung, deren Beobachtung verwirrend und nervig ist. Es gibt vorerst keine Möglichkeit, damit umzugehen. Mal sehen, ob einer von euch Experten in den Kommentaren unten irgendwelche Ideen hat. Vielen Dank~

Empfohlenes Lernen: „Java-Video-Tutorial

Das obige ist der detaillierte Inhalt vonBeispieleinführung: Java implementiert einen Coroutine-Pool basierend auf Quasar. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:csdn.net. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen