Dans certains scénarios commerciaux, nous devons utiliser plusieurs threads pour exécuter des tâches de manière asynchrone afin d'accélérer l'exécution des tâches.
JDK5 ajoute une nouvelle interface Future, qui est utilisée pour décrire les résultats d'un calcul asynchrone.
Bien que Future et les méthodes d'utilisation associées offrent la possibilité d'exécuter des tâches de manière asynchrone, il est très gênant d'obtenir les résultats. Nous devons utiliser Future.get() pour bloquer le thread appelant, ou utiliser l'interrogation pour déterminer Future.isDone si la tâche est exécutée. est terminé, puis obtenez les résultats.
Ces deux méthodes de traitement ne sont pas très élégantes. Le code pertinent est le suivant :
@Test public void testFuture() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(5); Future<String> future = executorService.submit(() -> { Thread.sleep(2000); return "hello"; }); System.out.println(future.get()); System.out.println("end"); }
En même temps, Future ne peut pas résoudre le scénario où plusieurs tâches asynchrones doivent dépendre les unes des autres. Le thread doit attendre que la tâche du sous-thread soit terminée. Ensuite, pendant l'exécution, vous avez peut-être pensé à "CountDownLatch" à ce moment-là. Oui, cela peut être résolu.
Deux futurs sont définis ici. Le premier obtient des informations sur l'utilisateur via l'identifiant de l'utilisateur et le second obtient des informations sur le produit via l'identifiant du produit.
@Test public void testCountDownLatch() throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(5); CountDownLatch downLatch = new CountDownLatch(2); long startTime = System.currentTimeMillis(); Future<String> userFuture = executorService.submit(() -> { //模拟查询商品耗时500毫秒 Thread.sleep(500); downLatch.countDown(); return "用户A"; }); Future<String> goodsFuture = executorService.submit(() -> { //模拟查询商品耗时500毫秒 Thread.sleep(400); downLatch.countDown(); return "商品A"; }); downLatch.await(); //模拟主程序耗时时间 Thread.sleep(600); System.out.println("获取用户信息:" + userFuture.get()); System.out.println("获取商品信息:" + goodsFuture.get()); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); }
「Résultats d'exécution」
Obtention d'informations sur l'utilisateur : Utilisateur A
Obtention d'informations sur le produit : Produit A
Le temps total est de 1110 ms
On peut voir à partir des résultats d'exécution que les résultats ont été obtenus, et si nous n'utilisez pas d'opérations asynchrones, le temps d'exécution Il devrait être : 500+400+600 = 1500. Après avoir utilisé le fonctionnement asynchrone, seul 1110 est réellement utilisé.
Mais après Java8, je ne pense plus que ce soit une solution élégante. Découvrons ensuite l'utilisation de CompletableFuture.
@Test public void testCompletableInfo() throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); //调用用户服务获取用户基本信息 CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> //模拟查询商品耗时500毫秒 { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "用户A"; }); //调用商品服务获取商品基本信息 CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() -> //模拟查询商品耗时500毫秒 { try { Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } return "商品A"; }); System.out.println("获取用户信息:" + userFuture.get()); System.out.println("获取商品信息:" + goodsFuture.get()); //模拟主程序耗时时间 Thread.sleep(600); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); }
Résultats d'exécution
Obtenez des informations sur l'utilisateur : Utilisateur A
Obtenez des informations sur le produit : Produit A
La durée totale est de 1112 ms
Vous pouvez facilement implémenter la fonction CountDownLatch via CompletableFuture, vous pensez que c'est la fin, bien plus que cela, CompletableFuture est bien meilleur que cela.
Par exemple, cela peut être implémenté : après l'exécution de la tâche 1, la tâche 2 est exécutée, ou même le résultat de l'exécution de la tâche 1 peut être utilisé comme paramètre d'entrée de la tâche 2 et d'autres fonctions puissantes. Apprenons l'API de CompletableFuture. .
Il existe quatre méthodes statiques dans le code source de CompletableFuture pour effectuer des tâches asynchrones
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..} public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..} public static CompletableFuture<Void> runAsync(Runnable runnable){..} public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}
Généralement, nous utilisons les méthodes statiques ci-dessus pour créer CompletableFuture, ce qui est également expliqué. ici Leur différence :
"supplyAsync" exécute les tâches et prend en charge les valeurs de retour.
「runAsync」 exécute la tâche et n'a aucune valeur de retour.
3.1.1, "méthode supplyAsync"
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //自定义线程,根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
3.1.2, "méthode runAsync"
//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable) //自定义线程,根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
La classe CompltableFuture propose quatre façons d'obtenir des résultats
//方式一 public T get() //方式二 public T get(long timeout, TimeUnit unit) //方式三 public T getNow(T valueIfAbsent) //方式四 public T join()
Description :
「get() et get(long timeout, TimeUnit unit)」 => sont déjà fournis dans Future. Ce dernier fournit un traitement de délai d'attente. Si le résultat n'est pas obtenu dans le délai spécifié, une exception Timeout sera levée.
「getNow」 => Obtenez le résultat immédiatement sans blocage. Si le calcul du résultat est terminé, le résultat sera renvoyé ou une exception lors du processus de calcul sera renvoyée. Si le calcul n'est pas terminé, l'ensemble. La valeur valueIfAbsent sera renvoyée
"join" => Aucune exception ne sera levée dans la méthode
Exemple
: 示例
:
@Test public void testCompletableGet() throws InterruptedException, ExecutionException { CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "商品A"; }); // getNow方法测试 System.out.println(cp1.getNow("商品B")); //join方法测试 CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0)); System.out.println(cp2.join()); System.out.println("-----------------------------------------------------"); //get方法测试 CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0)); System.out.println(cp3.get()); }
「运行结果」:
第一个执行结果为 「商品B」,因为要先睡上1秒结果不能立即获取
join方法获取结果方法里不会抛异常,但是执行结果会抛异常,抛出的异常为CompletionException
get方法获取结果方法里将抛出异常,执行结果抛出的异常为ExecutionException
通俗点讲就是,「做完第一个任务后,再做第二个任务,第二个任务也没有返回值」。
示例
@Test public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> { try { //执行任务A Thread.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture<Void> cp2 = cp1.thenRun(() -> { try { //执行任务B Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } }); // get方法测试 System.out.println(cp2.get()); //模拟主程序耗时时间 Thread.sleep(600); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); } //运行结果 /** * null * 总共用时1610ms */
「thenRun 和thenRunAsync有什么区别呢?」
如果你执行第一个任务的时候,传入了一个自定义线程池:
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。
说明
: 后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。
第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。
示例
@Test public void testCompletableThenAccept() throws ExecutionException, InterruptedException { long startTime = System.currentTimeMillis(); CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }); CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> { System.out.println("上一个任务的返回结果为: " + a); }); cp2.get(); }"Exécuter le résultat": 🎜🎜🎜🎜 Le premier résultat d'exécution est 🎜"Produit B" 🎜, car vous devez d'abord dormir 1 seconde et le résultat ne peut pas être obtenu immédiatement🎜🎜🎜🎜La méthode de jointure ne lèvera pas d'exception, mais le résultat de l'exécution lèvera une exception. L'exception levée est CompletionException🎜🎜🎜🎜La méthode get obtiendra le résultat. Une exception sera levée et l'exception levée par le résultat de l'exécution est ExecutionException🎜🎜🎜🎜4. ://img.php.cn/upload/article/000/887/227/ 168255492938956.png" alt="Comment implémenter le multi-threading asynchrone Java8 CompletableFuture" />🎜🎜4.1, thenRun/thenRunAsync🎜🎜En termes simples , 🎜"Après avoir terminé la première tâche, effectuez la deuxième tâche, la seconde Il n'y a pas de valeur de retour pour chaque tâche"🎜. 🎜🎜🎜Exemple🎜🎜
@Test public void testCompletableThenApply() throws ExecutionException, InterruptedException { CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }).thenApply((a) -> { if (Objects.equals(a, "dev")) { return "dev"; } return "prod"; }); System.out.println("当前环境为:" + cp1.get()); //输出: 当前环境为:dev }🎜🎜"Quelle est la différence entre thenRun et thenRunAsync ?"🎜🎜🎜Si vous transmettez un pool de threads personnalisé lors de l'exécution de la première tâche : 🎜🎜🎜🎜Appelez la méthode thenRun pour exécuter la deuxième tâche Lorsqu'il y a deux tâches, la deuxième tâche et la première tâche partagent le même pool de threads. 🎜🎜🎜🎜Lorsque vous appelez thenRunAsync pour exécuter la deuxième tâche, la première tâche utilise le pool de threads que vous avez transmis et la deuxième tâche utilise le pool de threads ForkJoin. 🎜🎜🎜🎜
Explication
: La différence entre thenAccept et thenAcceptAsync, thenApply et thenApplyAsync introduite plus tard est également la suivante. 🎜🎜4.2, thenAccept/thenAcceptAsync🎜🎜Une fois l'exécution de la première tâche terminée, la deuxième tâche de méthode de rappel est exécutée 🎜Le résultat de l'exécution de la tâche sera transmis à la méthode de rappel en tant que paramètre d'entrée🎜, mais le rappel. La méthode n’est pas une valeur de retour. 🎜🎜Exemple
🎜@Test public void testCompletableThenAccept() throws ExecutionException, InterruptedException { long startTime = System.currentTimeMillis(); CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }); CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> { System.out.println("上一个任务的返回结果为: " + a); }); cp2.get(); }
表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
示例
@Test public void testCompletableThenApply() throws ExecutionException, InterruptedException { CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }).thenApply((a) -> { if (Objects.equals(a, "dev")) { return "dev"; } return "prod"; }); System.out.println("当前环境为:" + cp1.get()); //输出: 当前环境为:dev }
当CompletableFuture的任务不论是正常完成还是出现异常它都会调用「whenComplete」这回调函数。
「正常完成」:whenComplete返回结果和上级任务一致,异常为null;
「出现异常」:whenComplete返回结果为null,异常为上级任务的异常;
即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要你处理该异常。
下面来看看示例
@Test public void testCompletableWhenComplete() throws ExecutionException, InterruptedException { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("出错了"); } System.out.println("正常结束"); return 0.11; }).whenComplete((aDouble, throwable) -> { if (aDouble == null) { System.out.println("whenComplete aDouble is null"); } else { System.out.println("whenComplete aDouble is " + aDouble); } if (throwable == null) { System.out.println("whenComplete throwable is null"); } else { System.out.println("whenComplete throwable is " + throwable.getMessage()); } }); System.out.println("最终返回的结果 = " + future.get()); }
正常完成,没有异常时:
正常结束
whenComplete aDouble is 0.11
whenComplete throwable is null
最终返回的结果 = 0.11
出现异常时:get()会抛出异常
whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出错了
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
@Test public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("出错了"); } System.out.println("正常结束"); return 0.11; }).whenComplete((aDouble, throwable) -> { if (aDouble == null) { System.out.println("whenComplete aDouble is null"); } else { System.out.println("whenComplete aDouble is " + aDouble); } if (throwable == null) { System.out.println("whenComplete throwable is null"); } else { System.out.println("whenComplete throwable is " + throwable.getMessage()); } }).exceptionally((throwable) -> { System.out.println("exceptionally中异常:" + throwable.getMessage()); return 0.0; }); System.out.println("最终返回的结果 = " + future.get()); }
当出现异常时,exceptionally中会捕获该异常,给出默认返回值0.0。
whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了
exceptionally中异常:java.lang.RuntimeException: 出错了
最终返回的结果 = 0.0
thenCombine / thenAcceptBoth / runAfterBoth都表示:「当任务一和任务二都完成再执行任务三」。
区别在于:
「runAfterBoth」 不会把执行结果当做方法入参,且没有返回值
「thenAcceptBoth」: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
「thenCombine」:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
示例
@Test public void testCompletableThenCombine() throws ExecutionException, InterruptedException { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启异步任务1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("异步任务1结束"); return result; }, executorService); //开启异步任务2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("异步任务2结束"); return result; }, executorService); //任务组合 CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> { System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId()); System.out.println("任务1返回值:" + f1); System.out.println("任务2返回值:" + f2); return f1 + f2; }, executorService); Integer res = task3.get(); System.out.println("最终结果:" + res); }
「运行结果」
异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:18
异步任务2结束
执行任务3,当前线程是:19
任务1返回值:2
任务2返回值:2
最终结果:4
applyToEither / acceptEither / runAfterEither 都表示:「两个任务,只要有一个任务完成,就执行任务三」。
区别在于:
「runAfterEither」:不会把执行结果当做方法入参,且没有返回值
「acceptEither」: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
「applyToEither」:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
示例
@Test public void testCompletableEitherAsync() { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启异步任务1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("异步任务1结束"); return result; }, executorService); //开启异步任务2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 2; try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步任务2结束"); return result; }, executorService); //任务组合 task.acceptEitherAsync(task2, (res) -> { System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId()); System.out.println("上一个任务的结果为:" + res); }, executorService); }
运行结果
//通过结果可以看出,异步任务2都没有执行结束,任务3获取的也是1的执行结果
异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:18
执行任务3,当前线程是:19
上一个任务的结果为:2
注意
如果把上面的核心线程数改为1也就是
ExecutorService executorService = Executors.newFixedThreadPool(1);
运行结果就是下面的了,会发现根本没有执行任务3,显然是任务3直接被丢弃了。
异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:17
「allOf」:等待所有任务完成
「anyOf」:只要有一个任务完成
示例
allOf:等待所有任务完成
@Test public void testCompletableAallOf() throws ExecutionException, InterruptedException { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启异步任务1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("异步任务1结束"); return result; }, executorService); //开启异步任务2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 2; try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步任务2结束"); return result; }, executorService); //开启异步任务3 CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务3,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 3; try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步任务3结束"); return result; }, executorService); //任务组合 CompletableFuture<Void> allOf = CompletableFuture.allOf(task, task2, task3); //等待所有任务完成 allOf.get(); //获取任务的返回结果 System.out.println("task结果为:" + task.get()); System.out.println("task2结果为:" + task2.get()); System.out.println("task3结果为:" + task3.get()); }
anyOf: 只要有一个任务完成
@Test public void testCompletableAnyOf() throws ExecutionException, InterruptedException { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启异步任务1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { int result = 1 + 1; return result; }, executorService); //开启异步任务2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { int result = 1 + 2; return result; }, executorService); //开启异步任务3 CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> { int result = 1 + 3; return result; }, executorService); //任务组合 CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3); //只要有一个有任务完成 Object o = anyOf.get(); System.out.println("完成的任务的结果:" + o); }
CompletableFuture 使我们的异步编程更加便利的、代码更加优雅的同时,我们也要关注下它,使用的一些注意点。
@Test public void testWhenCompleteExceptionally() { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (1 == 1) { throw new RuntimeException("出错了"); } return 0.11; }); //如果不加 get()方法这一行,看不到异常信息 //future.get(); }
Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。
小伙伴们使用的时候,注意一下哈,考虑是否加try...catch...或者使用exceptionally方法。
CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。
//反例 CompletableFuture.get(); //正例 CompletableFuture.get(5, TimeUnit.SECONDS);
CompletableFuture代码中又使用了默认的「ForkJoin线程池」,处理的线程个数是电脑「CPU核数-1」。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。
CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(5, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。
但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!