ホームページ >Java >&#&チュートリアル >Java マルチスレッド Future を使用して非同期タスクを取得する方法
前の記事で述べたように、Runnable インターフェイスをコーディングによって実装すると、指定したスレッド (またはスレッド プール) で実行される境界「タスク」が取得されます。
インターフェースを再度観察すると、メソッドの戻り値がないことを見つけるのは難しくありません。
public interface Runnable { void run(); }
JDK1.5 より前では、タスクを実行するには、重要なセクションのリソースにアクセスするためにスレッドを慎重に操作する必要があります。 Callback
を使用して分離することは非常に良い選択です。
ラムダは長さを減らすために使用されますが、ラムダは jdk1.5 より前ではサポートされていないことに注意してください
計算タスクを他のスレッドに分離して実行し、メイン スレッドに戻って結果を消費します
計算や IO などの時間のかかるタスクを他のスレッドに投げて、メイン スレッドがそのタスクに集中できるようにします。ユーザー入力を受け入れてフィードバックを処理することを想定していますが、この部分は省略しましょう。
次のようなコードを設計できます。
ただし、まだ理不尽な点が多くありますが、最適化、デモンストレーションには十分です
class Demo { static final Object queueLock = new Object(); static List<Runnable> mainQueue = new ArrayList<>(); static boolean running = true; static final Runnable FINISH = () -> running = false; public static void main(String[] args) { synchronized (queueLock) { mainQueue.add(Demo::onStart); } while (running) { Runnable runnable = null; synchronized (queueLock) { if (!mainQueue.isEmpty()) runnable = mainQueue.remove(0); } if (runnable != null) { runnable.run(); } Thread.yield(); } } public static void onStart() { //... } public static void finish() { synchronized (queueLock) { mainQueue.clear(); mainQueue.add(FINISH); } } }
コンピューティング スレッドとタスク コールバックをシミュレートします:
interface Callback { void onResultCalculated(int result); } class CalcThread extends Thread { private final Callback callback; private final int a; private final int b; public CalcThread(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { super.run(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } }
onStart ビジネスを入力します:
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new CalcThread(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300).start(); } }
上で述べたように、ビジネスがタスクの実行だけに集中し、スレッド自体にはあまり関心がない場合は、Runnable を使用できます。
class Demo { static class CalcRunnable implements Runnable { private final Callback callback; private final int a; private final int b; public CalcRunnable(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new Thread(new CalcRunnable(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300)).start(); } }
これは難しくありません。想像してみてください: 特定のスレッド、特定の種類のスレッドがタスクを簡単に受信できるようにするには、
が非常に必要です。このシリーズの記事でスレッド プールを振り返ると、スレッド プールが誕生しました。
Synchronize よりも軽量なメカニズムを備えています
より便利なデータ構造を備えています
この時点で、次のことがわかります。JDK1.5 より前では、JDK の機能が不十分だったため、Java プログラムはスレッドrougher を使用していました。
ついに JDK1.5 に新機能が追加されました: Future
と前の記事で言及したスレッド プール、時が経つのは早いもので、ほぼ 20 年が経過しました。 。
/** * 略 * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */ public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
API コメントは削除されていますが、詳しく説明しなくても各 API の意味は理解できます。
明らかに、戻り値を増やすために、Runnable をそのような複雑な インターフェイスに置き換える必要はありません。簡単に考えた後、戻り値を次のように要約できます。
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }明らかに、JDK は下位互換性を提供する必要があります:
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }そしてFutureはその名の通り、「未来」における結果や状態を表し、それを処理するために生まれました非同期でより便利に。 には
FutureTask が組み込まれています。これについては、FutureTask の詳細な説明の章で展開します。
##FutureTaskの詳細な説明
public class FutureTask { public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } }
public class FutureTask { //新建 private static final int NEW = 0; //处理中 private static final int COMPLETING = 1; //正常 private static final int NORMAL = 2; //异常 private static final int EXCEPTIONAL = 3; //已取消 private static final int CANCELLED = 4; //中断中 private static final int INTERRUPTING = 5; //已中断 private static final int INTERRUPTED = 6; }
このタスクの実行状態、最初は NEW。 set、setException、および cancel メソッドでのみ終了状態に遷移します。完了中、状態は COMPLETING (結果が設定されている間) または INTERRUPTING (cancel(true) を満たすためにランナーを中断している間のみ) の一時的な値をとることがあります。 ). これらの中間状態から最終状態への遷移では、値が一意であり、それ以上変更できないため、より安価な順序付け/遅延書き込みが使用されます。
このセクションは次から始まります。ソースコードを読むための 3 つのブロック ステータス判定コア メソッド
public class FutureTask { public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; } }キャンセル: 現在のステータスは
#If
##finishCompletion を呼び出します
public class FutureTask { public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) { return false; } try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null; ) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (; ; ) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } }
获取结果: 先判断状态,如果未进入到 COMPLETING
(即为NEW状态),则阻塞等待状态改变,返回结果或抛出异常
public class FutureTask { public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V) x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable) x); } }
而使用则非常简单,也非常的朴素。
我们以文中的的例子进行改造:
沿用原Runnable逻辑
移除回调,增加 CalcResult
将 CalcResult
对象作为既定返回结果,Runnable中设置其属性
class Demo { static class CalcResult { public int result; } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); Future<CalcResult> resultFuture = Executors.newSingleThreadExecutor().submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); calcResult.result = result; }, calcResult); System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } finish(); } }
如果直接使用新特性Callback,则如下:
直接返回结果,当然也可以直接返回Integer,不再包裹一层
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); ExecutorService executor = Executors.newSingleThreadExecutor(); Future<CalcResult> resultFuture = executor.submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); calcResult.result = result; return calcResult; }); System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } executor.shutdown(); finish(); } }
相信读者诸君会有这样的疑惑:
为何使用Future比原先的回调看起来粗糙?
首先要明确一点:文中前段的回调Demo,虽然达成了既定目标,但效率并不高!!在当时计算很昂贵的背景下,并不会如此莽撞地使用!
而在JDK1.5开始,提供了大量内容支持多线程开发。考虑到篇幅,会在系列文章中逐步展开。
另外,FutureTask中的CAS与Happens-Before本篇中亦不做展开。
接下来,再做一些引申,简单看一看多线程业务模式。
常用的多线程设计模式包括:
Future模式
Master-Worker模式
Guarded Suspension模式
不变模式
生产者-消费
文中对于Future的使用方式遵循了Future模式。
业务方在使用时,已经明确了任务被分离到其他线程执行时有等待期,在此期间,可以干点别的事情,不必浪费系统资源。
在程序系统中设计两类线程,并相互协作:
Master线程(单个)
Worker线程
Master线程负责接受任务、分配任务、接收(必要时进一步组合)结果并返回;
Worker线程负责处理子任务,当子任务处理完成后,向Master线程返回结果;
作者按:此时可再次回想一下文章开头的Demo
使用缓存队列,使得 服务线程/服务进程 在未就绪、忙碌时能够延迟处理请求。
使用等待-通知机制,将消费 服务的返回结果
的方式规范化
在并行开发过程中,为确保数据的一致性和正确性,有必要对对象进行同步,而同步操作会对程序系统的性能产生相当的损耗。
因此,使用状态不可改变的对象,依靠其不变性来确保 并行操作 在 没有同步机制 的情况下,保持一致性和正确性。
对象创建后,其内部状态和数据不再发生改变
对象被共享、被多个线程访问
设计两类线程:若干个生产者线程和若干个消费者线程。
生产者线程负责提交用户请求,消费者线程负责处理用户请求。生产者和消费者之间通过共享内存缓冲区进行通信。
内存缓冲区的意义:
解决是数据在多线程间的共享问题
缓解生产者和消费者之间的性能差
这几种模式从不同角度出发解决特定问题,但亦有一定的相似之处,不再展开。
以上がJava マルチスレッド Future を使用して非同期タスクを取得する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。