Example introduction: Java implements coroutine pool based on quasar

WBOY
2022-06-27 13:51:09

This article covers Java, specifically how to implement a coroutine pool on top of Quasar. A thread can host multiple coroutines, and a process can also have multiple coroutines of its own; threads and processes are synchronous mechanisms, while coroutines are asynchronous. Let's take a look; I hope it helps.

Business scenario: Golang and Swoole both embrace coroutines, and for the same workload the number of concurrent coroutines can be several times the number of threads. While looking into Java recently, I learned that Java itself has no coroutines, but an expert implemented them independently, and that library is the protagonist of this article: Quasar (fibers)! I haven't seen anyone show off a handwritten coroutine pool, though. (Use fibers directly without a pool? Then you haven't been roughed up by the real world yet~)

A thread can host multiple coroutines, and a process can also have multiple coroutines of its own.

Threads and processes are synchronous mechanisms, while coroutines are asynchronous.

A coroutine retains the state of its last call. Each time it is re-entered, it resumes from the state in which the previous call left off.

Threads are preemptive, while coroutines are non-preemptive: a coroutine has to give up control voluntarily before another coroutine can run. As a result, on a given thread only one coroutine actually runs at any moment, which is equivalent to single-threaded execution.

Coroutines do not replace threads; they are an abstraction on top of threads. Threads are slices of CPU resources, while coroutines are organized flows of code. Coroutines need threads to host and run them, so threads are the resource coroutines run on. A coroutine does not use a thread directly, however. It uses an executor (Interceptor), and the executor can be associated with any thread or thread pool: the current thread, the UI thread, or a newly created thread.

In short: threads are the resource coroutines run on, and coroutines use that resource indirectly through the Interceptor.
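To make the points above concrete, here is a minimal sketch in plain Java, without Quasar. It is only an illustration of the idea, not how Quasar works internally: the `Counter` class and the `runAll` scheduler are hypothetical names invented for this example. Each task keeps its own state between calls, gives up control voluntarily by returning from `step()`, and all tasks share one thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class CooperativeDemo {

    /** A hand-rolled "coroutine": every call to step() resumes where the last one stopped. */
    static final class Counter {
        private final String name;
        private final int limit;
        private int next = 0; // state retained between calls

        Counter(String name, int limit) {
            this.name = name;
            this.limit = limit;
        }

        /** Runs one slice of work, then yields by returning. Returns false once finished. */
        boolean step(List<String> log) {
            if (next >= limit) {
                return false;
            }
            log.add(name + ":" + next);
            next++;
            return next < limit;
        }
    }

    /** Round-robin scheduler on a single thread: only one task runs at any moment. */
    static List<String> runAll(Counter... tasks) {
        List<String> log = new ArrayList<>();
        Deque<Counter> ready = new ArrayDeque<>(Arrays.asList(tasks));
        while (!ready.isEmpty()) {
            Counter current = ready.poll();
            if (current.step(log)) {
                ready.add(current); // not finished: back of the queue
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // The two counters interleave even though only one thread is involved.
        System.out.println(runAll(new Counter("a", 2), new Counter("b", 3)));
    }
}
```

Nothing here is preempted: if one `step()` never returned, no other task would ever run, which is exactly the trade-off of non-preemptive scheduling.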

Without further ado, let’s go straight to the code:

Add the Maven dependency:

        <dependency>
            <groupId>co.paralleluniverse</groupId>
            <artifactId>quasar-core</artifactId>
            <version>0.7.9</version>
            <classifier>jdk8</classifier>
        </dependency>

WorkTools utility class:

package com.example.ai;

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableRunnable;

import java.util.concurrent.ArrayBlockingQueue;


public class WorkTools {
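    // Default number of worker fibers in the pool: 5
    private static final int WORK_NUM = 5;
    // Default capacity of the task queue: 100
    private static final int TASK_COUNT = 100;

    // Array of worker fibers
    private final Fiber<?>[] workThreads;
    // Queue of tasks waiting to be run
    private final ArrayBlockingQueue<SuspendableRunnable> taskQueue;

    // Number of worker fibers the caller asked for when constructing the pool
    private final int workerNum;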


    // Default constructor: creates a pool with the default number of worker fibers
    public WorkTools() {
        this(WORK_NUM, TASK_COUNT);
    }

    // Creates the pool; workerNum is the number of worker fibers, taskCount the queue capacity
    public WorkTools(int workerNum, int taskCount) {
        if (workerNum <= 0) {
            workerNum = WORK_NUM;
        }
        if (taskCount <= 0) {
            taskCount = TASK_COUNT;
        }
        this.workerNum = workerNum;
        taskQueue = new ArrayBlockingQueue<>(taskCount);
        workThreads = new Fiber<?>[workerNum];
        for (int i = 0; i < workerNum; i++) {
            // Create one worker fiber
            workThreads[i] = new Fiber<>(new SuspendableRunnable() {
                @Override
                public void run() throws SuspendExecution, InterruptedException {
                    while (true) {
                        // Take the next task, waiting while the queue is empty.
                        // ArrayBlockingQueue.take() blocks the thread carrying the fiber,
                        // not just the fiber. An InterruptedException ends this worker.
                        SuspendableRunnable runnable = taskQueue.take();
                        // Run the task that was taken.
                        runnable.run();
                    }
                }
            });

            workThreads[i].start();  // start the worker fiber
        }
    }
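    // Submit a task: it is only added to the task queue; when it actually runs is decided by the pool
    public void execute(SuspendableRunnable task) {
        try {
            taskQueue.put(task);   // put: blocking insert, waits while the queue is full
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can see that the wait was interrupted
            Thread.currentThread().interrupt();
            System.out.println("Interrupted while waiting to enqueue the task");
        }
    }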
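    // Destroy the pool. Note: as written, this does NOT wait for running or queued tasks to finish
    // and does not stop the worker fibers, which stay blocked in take(). It only drops the
    // references to the workers and discards any tasks still in the queue.
    public void destory() {
        System.out.println("ready close thread...");
        for (int i = 0; i < workerNum; i++) {

            workThreads[i] = null; // help GC
        }
        taskQueue.clear();  // discard the waiting tasks
    }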
    // Override toString to report pool info: number of worker fibers and number of waiting tasks
    @Override
    public String toString() {
        return "WorkThread number:" + workerNum + " ==divider== wait task number:" + taskQueue.size();
    }
}
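The `destory()` method above clears the queue and drops references, but nothing tells the workers to stop. As a hedged sketch of one common alternative, here is a thread-based analogue using a "poison pill" sentinel. It uses plain `Thread`s rather than Quasar fibers so that it runs without instrumentation; `DrainingPool`, `POISON`, and `shutdownAndWait` are names invented for this example and are not part of the original class.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class DrainingPool {
    // Sentinel ("poison pill"): a worker that dequeues it leaves its loop.
    private static final Runnable POISON = () -> { };

    private final BlockingQueue<Runnable> queue;
    private final Thread[] workers;

    public DrainingPool(int workerNum, int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
        workers = new Thread[workerNum];
        for (int i = 0; i < workerNum; i++) {
            workers[i] = new Thread(this::workLoop, "worker-" + i);
            workers[i].start();
        }
    }

    private void workLoop() {
        try {
            while (true) {
                Runnable task = queue.take(); // waits while the queue is empty
                if (task == POISON) {
                    return; // shutdown requested
                }
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // A failing task must not kill the worker.
                    System.err.println("task failed: " + e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // treat interruption as a stop request
        }
    }

    public void execute(Runnable task) {
        try {
            queue.put(task); // waits while the queue is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while enqueuing", e);
        }
    }

    /** Queues one sentinel per worker behind all pending tasks, then waits for the workers to exit. */
    public void shutdownAndWait() {
        try {
            for (int i = 0; i < workers.length; i++) {
                queue.put(POISON);
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during shutdown", e);
        }
    }

    public static void main(String[] args) {
        AtomicInteger done = new AtomicInteger();
        DrainingPool pool = new DrainingPool(5, 50);
        for (int i = 0; i < 50; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdownAndWait();
        System.out.println("completed tasks: " + done.get());
    }
}
```

Because the queue is FIFO, every sentinel sits behind the tasks submitted before shutdown, so those tasks run before any worker exits. Whether the same pattern carries over cleanly to fibers depends on how the queue blocks, so treat this as a starting point only.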

Test code:

package com.example.ai;

import co.paralleluniverse.strands.SuspendableRunnable;
import lombok.SneakyThrows;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.concurrent.CountDownLatch;

@SpringBootApplication
public class AiApplication {

    @SneakyThrows
    public static void main(String[] args) {
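        // Latch so the main thread waits until every fiber task has finished
        CountDownLatch cdl = new CountDownLatch(50);
        // Start 5 worker fibers with a queue capacity of 50 tasks.
        WorkTools myThreadPool = new WorkTools(5, 50);
        for (int i = 0; i < 50; i++) {
            int finalI = i;
            myThreadPool.execute(new SuspendableRunnable() {
                @Override
                public void run() {
                    System.out.println(finalI);
                    try {
                        // Simulate 1 second of work.
                        // Thread.sleep blocks the thread carrying the fiber.
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted while sleeping");
                    } finally {
                        // Always count down, otherwise cdl.await() could wait forever
                        cdl.countDown();
                    }
                }
            });

        }
        // Block until all 50 tasks have counted down
        cdl.await();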
    }

}
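The test relies on a `CountDownLatch` to keep the main thread alive until all tasks are done. The sketch below isolates that pattern with a standard `ExecutorService` instead of the fiber pool, so it runs on a plain JDK; `LatchDemo` and `runTasks` are hypothetical names for this example. It counts down in a `finally` block and waits with a timeout, so a failed or interrupted task cannot leave the caller waiting forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchDemo {

    /** Runs `tasks` short jobs on `workers` threads and returns how many completed normally. */
    static int runTasks(int workers, int tasks) {
        CountDownLatch latch = new CountDownLatch(tasks);
        AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(10); // simulated work
                        completed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        latch.countDown(); // counted even if the task was interrupted
                    }
                });
            }
            // Bounded wait: give up instead of hanging if something goes wrong.
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runTasks(5, 50));
    }
}
```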

The code is commented, so it should be self-explanatory. I modeled the implementation on the way a thread pool is written.

One problem I am still trying to solve: while a coroutine is blocked, the Fiber class prints a blocking warning, which is confusing and annoying to look at. I have no fix for it yet. If any experts have ideas, please share them in the comments below. Thank you very much~
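One possible lead, offered with some caution: the warning most likely comes from Quasar's runaway-fiber detection, which reports fibers that block the thread they run on. Both `Thread.sleep` and `ArrayBlockingQueue.take()` block the carrier thread, so they would trigger it. The real fix is to use fiber-aware calls such as `Fiber.sleep` and Quasar's channels, which needs working instrumentation. If you only want to silence the message, Quasar's documentation describes a system property for that. The sketch below sets it from code. The property name is quoted from memory of that documentation, so verify it against your Quasar version; `QuietFibers` is a hypothetical helper class.

```java
public class QuietFibers {
    // Assumption: property name as described in the Quasar documentation; verify before relying on it.
    static final String DETECT_RUNAWAY = "co.paralleluniverse.fibers.detectRunawayFibers";

    /** Call this first thing in main(), before any Quasar class has been loaded. */
    static void disableRunawayDetection() {
        System.setProperty(DETECT_RUNAWAY, "false");
    }

    public static void main(String[] args) {
        disableRunawayDetection();
        System.out.println(DETECT_RUNAWAY + "=" + System.getProperty(DETECT_RUNAWAY));
    }
}
```

The same setting can be passed on the command line as a `-D` JVM option. Either way, this only hides the symptom: the worker threads are still being blocked.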

Statement:
This article is reproduced from csdn.net. If there is any infringement, please contact admin@php.cn to have it removed.