前の記事で述べたように、Runnable インターフェイスをコーディングによって実装すると、指定したスレッド (またはスレッド プール) で実行される境界「タスク」が取得されます。
public interface Runnable { void run(); }
JDK1.5 より前では、タスクを実行するには、重要なセクションのリソースにアクセスするためにスレッドを慎重に操作する必要があります。 Callback
ラムダは長さを減らすために使用されますが、ラムダは jdk1.5 より前ではサポートされていないことに注意してください
計算タスクを他のスレッドに分離して実行し、メイン スレッドに戻って結果を消費します
計算や IO などの時間のかかるタスクを他のスレッドに投げて、メイン スレッドがそのタスクに集中できるようにします。ユーザー入力を受け入れてフィードバックを処理することを想定していますが、この部分は省略しましょう。
class Demo { static final Object queueLock = new Object(); static List<Runnable> mainQueue = new ArrayList<>(); static boolean running = true; static final Runnable FINISH = () -> running = false; public static void main(String[] args) { synchronized (queueLock) { mainQueue.add(Demo::onStart); } while (running) { Runnable runnable = null; synchronized (queueLock) { if (!mainQueue.isEmpty()) runnable = mainQueue.remove(0); } if (runnable != null) { runnable.run(); } Thread.yield(); } } public static void onStart() { //... } public static void finish() { synchronized (queueLock) { mainQueue.clear(); mainQueue.add(FINISH); } } }
コンピューティング スレッドとタスク コールバックをシミュレートします:
interface Callback { void onResultCalculated(int result); } class CalcThread extends Thread { private final Callback callback; private final int a; private final int b; public CalcThread(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { super.run(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } }
onStart ビジネスを入力します:
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new CalcThread(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300).start(); } }
上で述べたように、ビジネスがタスクの実行だけに集中し、スレッド自体にはあまり関心がない場合は、Runnable を使用できます。
class Demo { static class CalcRunnable implements Runnable { private final Callback callback; private final int a; private final int b; public CalcRunnable(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new Thread(new CalcRunnable(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300)).start(); } }
これは難しくありません。想像してみてください: 特定のスレッド、特定の種類のスレッドがタスクを簡単に受信できるようにするには、
が非常に必要です。このシリーズの記事でスレッド プールを振り返ると、スレッド プールが誕生しました。
Synchronize よりも軽量なメカニズムを備えています
この時点で、次のことがわかります。JDK1.5 より前では、JDK の機能が不十分だったため、Java プログラムはスレッドrougher を使用していました。
ついに JDK1.5 に新機能が追加されました: Future
と前の記事で言及したスレッド プール、時が経つのは早いもので、ほぼ 20 年が経過しました。 。
/** * 略 * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */ public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
API コメントは削除されていますが、詳しく説明しなくても各 API の意味は理解できます。
明らかに、戻り値を増やすために、Runnable をそのような複雑な インターフェイスに置き換える必要はありません。簡単に考えた後、戻り値を次のように要約できます。
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }明らかに、JDK は下位互換性を提供する必要があります:
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }そしてFutureはその名の通り、「未来」における結果や状態を表し、それを処理するために生まれました非同期でより便利に。 には
FutureTask が組み込まれています。これについては、FutureTask の詳細な説明の章で展開します。
public class FutureTask { public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } }
public class FutureTask { //新建 private static final int NEW = 0; //处理中 private static final int COMPLETING = 1; //正常 private static final int NORMAL = 2; //异常 private static final int EXCEPTIONAL = 3; //已取消 private static final int CANCELLED = 4; //中断中 private static final int INTERRUPTING = 5; //已中断 private static final int INTERRUPTED = 6; }
このタスクの実行状態、最初は NEW。 set、setException、および cancel メソッドでのみ終了状態に遷移します。完了中、状態は COMPLETING (結果が設定されている間) または INTERRUPTING (cancel(true) を満たすためにランナーを中断している間のみ) の一時的な値をとることがあります。 ). これらの中間状態から最終状態への遷移では、値が一意であり、それ以上変更できないため、より安価な順序付け/遅延書き込みが使用されます。
このセクションは次から始まります。ソースコードを読むための 3 つのブロック ステータス判定コア メソッド
public class FutureTask { public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; } }キャンセル: 現在のステータスは
##finishCompletion を呼び出します
public class FutureTask { public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) { return false; } try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null; ) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (; ; ) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } }
获取结果: 先判断状态,如果未进入到 COMPLETING
public class FutureTask { public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V) x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable) x); } }
移除回调,增加 CalcResult
将 CalcResult
class Demo { static class CalcResult { public int result; } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); Future<CalcResult> resultFuture = Executors.newSingleThreadExecutor().submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); calcResult.result = result; }, calcResult); System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } finish(); } }
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); ExecutorService executor = Executors.newSingleThreadExecutor(); Future<CalcResult> resultFuture = executor.submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); calcResult.result = result; return calcResult; }); System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } executor.shutdown(); finish(); } }
Guarded Suspension模式
使用缓存队列,使得 服务线程/服务进程 在未就绪、忙碌时能够延迟处理请求。
使用等待-通知机制,将消费 服务的返回结果
因此,使用状态不可改变的对象,依靠其不变性来确保 并行操作 在 没有同步机制 的情况下,保持一致性和正确性。
