Rumah >Java >javaTutorial >Cara menggunakan Masa Depan berbilang benang Java untuk mendapatkan tugas tak segerak

Cara menggunakan Masa Depan berbilang benang Java untuk mendapatkan tugas tak segerak

PHPz
PHPzke hadapan
2023-05-01 23:04:131438semak imbas

Keterbatasan Runnable

Seperti yang kami nyatakan dalam artikel sebelumnya, jika anda melaksanakan antara muka Runnable melalui pengekodan, anda akan mendapat "tugas" sempadan yang berjalan dalam benang tertentu (atau kumpulan benang).

Memerhati semula antara muka, tidak sukar untuk mendapati ia tidak mempunyai nilai pulangan kaedah:

public interface Runnable {
    void run();
}

Sebelum JDK1.5, jika anda ingin menggunakan hasil pelaksanaan tugas, anda perlu mengendalikan urutan dengan teliti untuk mengakses sumber bahagian kritikal. Menggunakan 回调 untuk penyahgandingan ialah pilihan yang sangat baik.

Demo kecil untuk berlatih - semak semula pengetahuan artikel sebelumnya

Perhatikan bahawa lambda digunakan untuk mengurangkan panjang, tetapi jdk1.5 tidak menyokong lambda sebelum jdk1.5

Asingkan tugas pengkomputeran ke dalam urutan lain laksanakan, dan kemudian kembali ke utas utama untuk menggunakan keputusan

Kami membuang tugasan yang memakan masa seperti pengiraan dan IO ke utas lain, membenarkan utas utama menumpukan padanya perniagaan sendiri diandaikan bahawa ia menerima input pengguna dan memproses maklum balas, tetapi Mari kita tinggalkan bahagian ini

Kita boleh mereka bentuk kod yang serupa dengan yang berikut:

Walaupun ia masih mempunyai banyak perkara yang tidak munasabah yang patut dioptimumkan, ia sudah memadai untuk demonstrasi

class Demo {
    static final Object queueLock = new Object();
    static List<Runnable> mainQueue = new ArrayList<>();
    static boolean running = true;
    static final Runnable FINISH = () -> running = false;
    public static void main(String[] args) {
        synchronized (queueLock) {
            mainQueue.add(Demo::onStart);
        }
        while (running) {
            Runnable runnable = null;
            synchronized (queueLock) {
                if (!mainQueue.isEmpty())
                    runnable = mainQueue.remove(0);
            }
            if (runnable != null) {
                runnable.run();
            }
            Thread.yield();
        }
    }
    public static void onStart() {
        //...
    }
    public static void finish() {
        synchronized (queueLock) {
            mainQueue.clear();
            mainQueue.add(FINISH);
        }
    }
}

Simulasikan urutan pengkomputeran dan panggilan balik tugas:

interface Callback {
    void onResultCalculated(int result);
}
class CalcThread extends Thread {
    private final Callback callback;
    private final int a;
    private final int b;
    public CalcThread(Callback callback, int a, int b) {
        this.callback = callback;
        this.a = a;
        this.b = b;
    }
    @Override
    public void run() {
        super.run();
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        final int result = a + b;
        System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
        synchronized (queueLock) {
            mainQueue.add(() -> callback.onResultCalculated(result));
        }
    }
}

Isikan perniagaan onStart:

class Demo {
    public static void onStart() {
        System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
        new CalcThread(result -> {
            System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis());
            finish();
        }, 200, 300).start();
    }
}

Semakan: Optimumkan untuk menggunakan Runnable

Seperti yang kami nyatakan di atas, jika perniagaan Jika anda hanya menumpukan pada pelaksanaan tugas dan tidak terlalu mengambil berat tentang benang itu sendiri, anda boleh menggunakan Runnable:

class Demo {
    static class CalcRunnable implements Runnable {
        private final Callback callback;
        private final int a;
        private final int b;
        public CalcRunnable(Callback callback, int a, int b) {
            this.callback = callback;
            this.a = a;
            this.b = b;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            final int result = a + b;
            System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
            synchronized (queueLock) {
                mainQueue.add(() -> callback.onResultCalculated(result));
            }
        }
    }
    public static void onStart() {
        System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
        new Thread(new CalcRunnable(result -> {
            System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis());
            finish();
        }, 200, 300)).start();
    }
}

Ia bukan sukar untuk dibayangkan: Kami sangat memerlukan

  • untuk membenarkan utas tertentu, Jenis utas tertentu boleh menerima tugas dengan mudah. ​​Melihat kembali kumpulan utas dalam siri artikel ini, kumpulan utas datang menjadi

  • Ia mempunyai mekanisme yang lebih ringan daripada Synchronize

  • Mempunyai struktur data yang lebih mudah

Pada ketika ini, kita dapat menyedari: Sebelum JDK1.5, disebabkan fungsi JDK yang tidak mencukupi, program Java menggunakan benangLebih Kasar.

Masa depan, yang dilahirkan untuk penggunaan tak segerak

, akhirnya mempunyai ciri baharu dalam JDK1.5: Future dan kumpulan benang yang disebut dalam artikel sebelumnya, dan hampir 20 tahun telah berlalu.

/**
 * 略
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this Future&#39;s {@code get} method
 */
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}

Walaupun ulasan API telah dialih keluar, anda masih boleh memahami maksud setiap API, jadi saya tidak akan menerangkan secara terperinci.

Jelas sekali, untuk meningkatkan nilai pulangan, tidak perlu menggantikan dengan antara muka Runnable yang begitu rumit. Selepas pemikiran ringkas, kita boleh meringkaskan nilai pulangan:

  • Kembalikan hasil perniagaan dalam Runnable, seperti pengiraan, sumber bacaan, dll.

  • Hanya kembalikan hasil selepas Runnable dilaksanakan

Dari perspektif lapisan perniagaan, hanya antara muka berikut diperlukan, yang menambah nilai pulangan dan menjadikannya lebih mesra pengguna . Pengecualian:

Nota pengarang: Ketepikan pelaksanaan asas dan lihat sahaja keperluan pengekodan bahagian perniagaan

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Jelas sekali, JDK perlu menyediakan keserasian ke belakang:

  • Runnable tidak boleh dibuang dan tidak boleh dibuang

  • Pengguna tidak boleh diminta untuk memfaktorkan semula kod sepenuhnya

Jadi penyesuai disediakan bersama-sama. Membenarkan pengguna melakukan pemfaktoran semula separa mudah untuk menggunakan ciri baharu

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

Dan Masa Depan adalah seperti yang dicadangkan oleh namanya. Ia mewakili hasil dan status dalam "masa hadapan" dan dicipta untuk mengendalikan pemprosesan tak segerak lebih selesa.

dan terbina dalam FutureTask akan dikembangkan dalam bab penjelasan terperinci FutureTask.

Rajah kelas

Berdasarkan JDK1.8, mari kita lihat struktur rajah kelas yang diselaraskan:

Cara menggunakan Masa Depan berbilang benang Java untuk mendapatkan tugas tak segerak

Penjelasan terperinci FutureTask

Pembina

public class FutureTask {
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
}

Kitaran hayat

public class FutureTask {
    //新建
    private static final int NEW = 0;
    //处理中
    private static final int COMPLETING = 1;
    //正常
    private static final int NORMAL = 2;
    //异常
    private static final int EXCEPTIONAL = 3;
    //已取消
    private static final int CANCELLED = 4;
    //中断中
    private static final int INTERRUPTING = 5;
    //已中断
    private static final int INTERRUPTED = 6;
}

Penukaran kitaran hayat yang mungkin adalah seperti berikut:

  • BARU ->MELENGKAPKAN ->

  • BARU -> LUAR BIASA

  • BARU -> BARU -> MENGGANGGU -> TERGANGGU

  • Penjelasan asal dalam JDK adalah seperti berikut:

Keadaan jalankan tugas ini, pada mulanya BARU jalankan peralihan keadaan kepada keadaan terminal hanya dalam kaedah yang ditetapkan, setException, dan batalkan Semasa selesai, keadaan mungkin mengambil nilai sementara iaitu COMPLETING (semasa hasil sedang ditetapkan) atau MENGGANGGU (hanya semasa mengganggu pelari untuk memenuhi pembatalan(. benar)). Peralihan daripada keadaan pertengahan ke akhir ini menggunakan penulisan tersusun/malas yang lebih murah kerana nilai adalah unik dan tidak boleh diubah suai lagi.

Kaedah teras

Bahagian ini bermula dengan tiga blok berikut untuk membaca kod sumber

Penghakiman status

  • Batal

  • Dapatkan keputusan

  • Pelaksanaan API penghakiman status adalah sangat mudah

    public class FutureTask {
        public boolean isCancelled() {
            return state >= CANCELLED;
        }
        public boolean isDone() {
            return state != NEW;
        }
    }
  • Batal:

Status semasa ialah

dan CAS berjaya mengubah suai keadaan, jika tidak, ia mengembalikan kegagalan pembatalan
  • NEWJika

    , ganggu urutan pelaksana dan CAS ubah suai keadaan kepada TERGANGGU
  • mayInterruptIfRunningPanggil selesai

  • untuk memadam dan memberitahu semua yang menunggu.
public class FutureTask {
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) {
            return false;
        }
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null; ) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (; ; ) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
    }
}

获取结果: 先判断状态,如果未进入到 COMPLETING(即为NEW状态),则阻塞等待状态改变,返回结果或抛出异常

public class FutureTask {
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V) x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable) x);
    }
}

如何使用

而使用则非常简单,也非常的朴素。

我们以文中的的例子进行改造:

  • 沿用原Runnable逻辑

  • 移除回调,增加 CalcResult

  • CalcResult 对象作为既定返回结果,Runnable中设置其属性

class Demo {
   static class CalcResult {
      public int result;
   }
   public static void onStart() {
      System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
      final CalcResult calcResult = new CalcResult();
      Future<CalcResult> resultFuture = Executors.newSingleThreadExecutor().submit(() -> {
         try {
            Thread.sleep(10);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         final int result = 200 + 300;
         System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
         calcResult.result = result;
      }, calcResult);
      System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis());
      if (resultFuture.isDone()) {
         try {
            final int ret = resultFuture.get().result;
            System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis());
         } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
         }
      }
      finish();
   }
}

如果直接使用新特性Callback,则如下:

直接返回结果,当然也可以直接返回Integer,不再包裹一层

class Demo {
   public static void onStart() {
      System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
      ExecutorService executor = Executors.newSingleThreadExecutor();
      Future<CalcResult> resultFuture = executor.submit(() -> {
         try {
            Thread.sleep(10);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         final int result = 200 + 300;
         System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
         final CalcResult calcResult = new CalcResult();
         calcResult.result = result;
         return calcResult;
      });
      System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis());
      if (resultFuture.isDone()) {
         try {
            final int ret = resultFuture.get().result;
            System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis());
         } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
         }
      }
      executor.shutdown();
      finish();
   }
}

相信读者诸君会有这样的疑惑:

为何使用Future比原先的回调看起来粗糙?

首先要明确一点:文中前段的回调Demo,虽然达成了既定目标,但效率并不高!!在当时计算很昂贵的背景下,并不会如此莽撞地使用!

而在JDK1.5开始,提供了大量内容支持多线程开发。考虑到篇幅,会在系列文章中逐步展开。

另外,FutureTask中的CAS与Happens-Before本篇中亦不做展开。

接下来,再做一些引申,简单看一看多线程业务模式。

引申,多线程业务模式

常用的多线程设计模式包括:

  • Future模式

  • Master-Worker模式

  • Guarded Suspension模式

  • 不变模式

  • 生产者-消费

Future模式

文中对于Future的使用方式遵循了Future模式。

业务方在使用时,已经明确了任务被分离到其他线程执行时有等待期,在此期间,可以干点别的事情,不必浪费系统资源。

Master-Worker模式

在程序系统中设计两类线程,并相互协作:

  • Master线程(单个)

  • Worker线程

Master线程负责接受任务、分配任务、接收(必要时进一步组合)结果并返回;

Worker线程负责处理子任务,当子任务处理完成后,向Master线程返回结果;

作者按:此时可再次回想一下文章开头的Demo

Guarded Suspension模式

  • 使用缓存队列,使得 服务线程/服务进程 在未就绪、忙碌时能够延迟处理请求。

  • 使用等待-通知机制,将消费 服务的返回结果 的方式规范化

不变模式

在并行开发过程中,为确保数据的一致性和正确性,有必要对对象进行同步,而同步操作会对程序系统的性能产生相当的损耗。

因此,使用状态不可改变的对象,依靠其不变性来确保 并行操作没有同步机制 的情况下,保持一致性和正确性。

  • 对象创建后,其内部状态和数据不再发生改变

  • 对象被共享、被多个线程访问

生产者-消费

设计两类线程:若干个生产者线程和若干个消费者线程。

生产者线程负责提交用户请求,消费者线程负责处理用户请求。生产者和消费者之间通过共享内存缓冲区进行通信。

内存缓冲区的意义:

  • 解决是数据在多线程间的共享问题

  • 缓解生产者和消费者之间的性能差

这几种模式从不同角度出发解决特定问题,但亦有一定的相似之处,不再展开。

Atas ialah kandungan terperinci Cara menggunakan Masa Depan berbilang benang Java untuk mendapatkan tugas tak segerak. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam