首頁  >  文章  >  Java  >  實例介紹Java基於quasar實作協程池

實例介紹Java基於quasar實作協程池

WBOY
WBOY轉載
2022-06-27 13:51:091632瀏覽

本篇文章為大家帶來了關於java的相關知識,其中主要整理了基於quasar實作協程池的相關問題,一個執行緒可以多個協程,一個行程也可以單獨擁有多個協程,執行緒進程都是同步機制,而協程則是異步,下面一起來看一下,希望對大家有幫助。

實例介紹Java基於quasar實作協程池

推薦學習:《java影片教學

業務場景:golang與swoole都擁抱了協程,在同任務並發數量下,協程可比執行緒多數倍。所以最近在查詢java時了解java本身是沒有協程的,但是某頭牛自行實現了協程,也就是本文的主角quasar(纖程)!不過沒看到誰公開一下手寫協程池的騷操作(誰會直接new它用?那是沒挨過社會的毒打呀~)

一個線程可以多個協程,一個進程也可以單獨擁有多個協程。

執行緒進程都是同步機制,而協程則是異步。

協程能保留上一次呼叫時的狀態,每次過程重入時,就等於進入上一次呼叫的狀態。

線程是搶佔式,而協程是非搶佔式的,所以需要用戶自己釋放使用權來切換到其他協程,因此同一時間其實只有一個協程擁有運行權,相當於單線程的能力。

協程並不是取代執行緒, 而且抽象於執行緒之上, 執行緒是被分割的CPU資源, 協程是組織好的程式碼流程, 協程需要執行緒來承載執行, 執行緒是協程的資源, 但協程不會直接使用執行緒, 協程直接利用的是執行器(Interceptor), 執行器可以關聯任意執行緒或執行緒池, 可以使目前執行緒, UI執行緒, 或新建新程.。

執行緒是協程的資源。協程透過Interceptor來間接使用執行緒這個資源。

廢話不多說,直接上程式碼:

導入套件:

        <dependency>
            <groupId>co.paralleluniverse</groupId>
            <artifactId>quasar-core</artifactId>
            <version>0.7.9</version>
            <classifier>jdk8</classifier>
        </dependency>
WorkTools工具类:
package com.example.ai;

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableRunnable;

import java.util.concurrent.ArrayBlockingQueue;


public class WorkTools {
    //协程池中默认协程的个数为5
    private static int WORK_NUM = 5;
    //队列默认任务为100
    private static int TASK_COUNT = 100;

    //工做协程数组
    private Fiber[] workThreads;
    //等待队列
    private final ArrayBlockingQueue<SuspendableRunnable> taskQueue;

    //用户在构造这个协程池时,但愿启动的协程数
    private final int workerNum;


    //构造方法:建立具备默认协程个数的协程池
    public WorkTools() {
        this(WORK_NUM,TASK_COUNT);
    }

    //建立协程池,workNum为协程池中工做协程的个数
    public WorkTools(int workerNum, int taskCount) {
        if (workerNum <= 0) {
            workerNum = WORK_NUM;
        }
        if (taskCount <= 0) {
            taskCount = TASK_COUNT;
        }
        this.workerNum = workerNum;
        taskQueue = new ArrayBlockingQueue(taskCount);
        workThreads = new Fiber[workerNum];
        for (int i = 0; i < workerNum; i++) {
            int finalI = i;
            workThreads[i] = new Fiber<>(new SuspendableRunnable() {
                @Override
                public void run() throws SuspendExecution, InterruptedException {
                    SuspendableRunnable runnable = null;
                    while (true){
                        try{
                            //取任务,没有则阻塞。
                            runnable = taskQueue.take();
                        }catch (Exception e){
                            System.out.println(e.getMessage());
                        }
                        //存在任务则运行。
                        if(runnable != null){
                            runnable.run();
                        }

                        runnable = null;
                    }
                }
            });  //new一个工做协程

            workThreads[i].start();  //启动工做协程

        }

        Runtime.getRuntime().availableProcessors();
    }
    //执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定
    public void execute(SuspendableRunnable task) {
        try {
            taskQueue.put(task);   //put:阻塞接口的插入
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println("阻塞");
        }
    }
    //销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁
    public void destory() {
        //工做协程中止工做,且置为null
        System.out.println("ready close thread...");
        for (int i = 0; i < workerNum; i++) {

            workThreads[i] = null; //help gc
        }
        taskQueue.clear();  //清空等待队列
    }
    //覆盖toString方法,返回协程信息:工做协程个数和已完成任务个数
    @Override
    public String toString() {
        return "WorkThread number:" + workerNum + " ==分割线== wait task number:" + taskQueue.size();
    }
}

測試程式碼:

package com.example.ai;

import co.paralleluniverse.strands.SuspendableRunnable;
import lombok.SneakyThrows;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.concurrent.CountDownLatch;



@SpringBootApplication


public class AiApplication {

    @SneakyThrows
    public static void main(String[] args) {
        //等待协程任务完毕后再结束主线程
        CountDownLatch cdl = new CountDownLatch(50);
        //开启5个协程,50个任务列队。
        WorkTools myThreadPool = new WorkTools(5, 50);
        for (int i = 0; i< 50; i++){
            int finalI = i;
            myThreadPool.execute(new SuspendableRunnable() {
                @Override
                public void run() {
                    System.out.println(finalI);
                    try {
                        //延迟1秒
                        Thread.sleep(1000);
                        cdl.countDown();
                    } catch (InterruptedException e) {
                        System.out.println("阻塞中");
                    }
                }
            });

        }
        //阻塞
        cdl.await();
    }

}

 特定程式碼都有註解了,自行了解。我也是以線程池寫法實作。

目前為解決問題:在協程阻塞過程中Fiber類會報阻塞警告,滿臉懵逼啊,看著很討厭。暫時沒有辦法處理,看各位大神誰有招下方評論提供給下思路。萬分感謝~

推薦學習:《java影片教學

以上是實例介紹Java基於quasar實作協程池的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:csdn.net。如有侵權,請聯絡admin@php.cn刪除