Maison >Java >javaDidacticiel >Comment utiliser l'outil multithread Java CompletableFuture
Lors de l'écriture d'un programme multi-thread, vous pouvez utiliser Future pour obtenir les résultats d'un thread asynchrone, mais vous rencontrerez quelques problèmes lors de l'utilisation :
Si vous le souhaitez Pour effectuer d'autres opérations sur les résultats de Future, vous devez bloquer le thread actuel
Plusieurs Futures ne peuvent pas être exécutés dans une chaîne. Les résultats de chaque Future sont indépendants. Il est prévu de faire une autre chose asynchrone sur le résultat. d'un futur ;
Il n'y a pas de stratégie de gestion des exceptions. Si l'exécution du futur échoue, elle doit être capturée manuellement
Afin de résoudre le problème du futur, JDK nous a fourni un classe d'outils utile CompletableFuture dans 1.8 ;
Il implémente les interfaces Future et CompletionStage et fournit les méthodes de traitement correspondantes pour les lacunes de Future.
Notre nouvelle logique de traitement peut être automatiquement rappelée une fois l'exécution du thread asynchrone terminée, sans blocage
Plusieurs tâches asynchrones peuvent être organisées, combinées ou triées
Gestion des exceptions
Le cœur de CompletableFuture L'idée est que chaque tâche asynchrone peut être considérée comme une étape (CompletionStage), puis d'autres tâches asynchrones peuvent faire ce qu'elles veulent en fonction de cette étape.
CompletionStage définit de nombreuses méthodes de traitement par étapes, qui sont très puissantes. Voici seulement quelques méthodes couramment utilisées dans la vie quotidienne pour votre référence.
Utilisation simple
Exécution asynchrone, aucun résultat requis :
// 可以执行Executors异步执行,如果不指定,默认使用ForkJoinPool CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));
Exécution asynchrone, tout en renvoyant les résultats :
// 同样可以指定线程池 CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!"); System.out.println(stringCompletableFuture.get());
thenRun : Non besoin des résultats de l'étape précédente, diriger de nouvelles opérations
puisAccepter : obtenir le contenu du traitement asynchrone précédent et effectuer de nouvelles opérations
puisAppliquer : obtenir le contenu de l'étape précédente, puis générer un nouveau contenu
Tout ce qui porte le suffixe Async signifie que la nouvelle opération de traitement est toujours asynchrone. Les opérations asynchrones peuvent spécifier des exécuteurs pour le traitement
// Demo CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") // 针对上一步的结果做处理,产生新的结果 .thenApplyAsync(s -> s.toUpperCase()) // 针对上一步的结果做处理,不返回结果 .thenAcceptAsync(s -> System.out.println(s)) // 不需要上一步返回的结果,直接进行操作 .thenRunAsync(() -> System.out.println("end")); ;
Lorsque nous avons deux traitements de rappel, n'importe quelle complétion peut être utilisée, les deux résultats n'ont aucune relation, puis utilisez acceptEither.
Celui qui termine en premier l'exécution des deux threads asynchrones utilisera le résultat Il en va de même pour les autres types de méthodes.
// 返回abc CompletableFuture .supplyAsync(() -> { SleepUtils.sleep(100); return "Hello CompletableFuture!"; }) .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } }); // 返回Hello CompletableFuture! CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") .acceptEither(CompletableFuture.supplyAsync(() -> { SleepUtils.sleep(100); return "abc"; }), new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } });
thenCombine
Lorsque nous avons deux CompletionStage, nous devons intégrer les deux résultats, puis calculer un nouveau résultat.
thenCompose traite le résultat du CompletionStage précédent et renvoie le résultat, et le type de retour doit être CompletionStage.
thenCombine obtient le résultat du premier CompletionStage, puis obtient le CompletionStage actuel et traite les résultats des deux.
CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172); CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65) .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() { @Override public Double apply(Integer wight, Integer height) { return wight * 10000.0 / (height * height); } }) ;
thenAcceptBoth
nécessite les résultats de deux CompleteableFutures asynchrones. Lorsque les deux sont terminés, le rappel thenAcceptBoth est entré.
// thenAcceptBoth案例: CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() { // 参数一为我们刚开始运行时的CompletableStage,新传入的作为第二个参数 @Override public void accept(String s, String s2) { System.out.println("param1=" + s + ", param2=" + s2); } }); // 结果:param1=Hello CompletableFuture!, param2=abc
Lorsque nous utilisons CompleteFuture pour effectuer des appels en chaîne, si l'un des multiples rappels asynchrones a un problème d'exécution, alors les rappels suivants s'arrêteront, une exception est donc nécessaire pour les stratégies de traitement.
exceptionnellement
exceptionnellement, c'est lorsqu'une erreur survient, cela nous donne la possibilité de récupérer et de personnaliser le contenu renvoyé.
CompletableFuture.supplyAsync(() -> { throw new RuntimeException("发生错误"); }).exceptionally(throwable -> { log.error("调用错误 {}", throwable.getMessage(), throwable); return "异常处理内容"; });
handle
exceptionnellement ne sera exécuté que lorsqu'une exception se produit, tandis que handle sera exécuté indépendamment du fait qu'une erreur se produise ou non.
CompletableFuture.supplyAsync(() -> { return "abc"; }) .handle((r,err) -> { log.error("调用错误 {}", err.getMessage(), err); // 对结果做额外的处理 return r; }) ;
L'exigence est d'informer les utilisateurs avec des conditions spécifiques dans un tableau via des messages texte. Cependant, il existe des millions d'utilisateurs de messages texte si la lecture en un seul thread est possible. utilisé, l’efficacité de lecture sera très lente. À ce stade, vous pouvez envisager d'utiliser le multithread pour lire :
1. Divisez la tâche de lecture en plusieurs sous-tâches différentes et spécifiez le décalage et le nombre de lectures
// 假设有500万条记录 long recordCount = 500 * 10000; int subTaskRecordCount = 10000; // 对记录进行分片 List<Map> subTaskList = new LinkedList<>(); for (int i = 0; i < recordCount / 500; i++) { // 如果子任务结构复杂,建议使用对象 HashMap<String, Integer> subTask = new HashMap<>(); subTask.put("index", i); subTask.put("offset", i * subTaskRecordCount); subTask.put("count", subTaskRecordCount); subTaskList.add(subTask); }
2. Utilisez la lecture par lots multi-thread
// 进行subTask批量处理,拆分为不同的任务 subTaskList.stream() .map(subTask -> CompletableFuture.runAsync(()->{ // 读取数据,然后处理 // dataTunel.read(subTask); },excuturs)) // 使用应用的通用任务线程池 .map(c -> ((CompletableFuture<?>) c).join());
3. Effectuer un traitement de logique métier ou effectuer directement un traitement de logique métier après la lecture ;
在系统拆分比较细的时候,价格,优惠券,库存,商品详情等信息分散在不同的系统中,有时候需要同时获取商品的所有信息, 有时候可能只需要获取商品的部分信息。
当然问题点在于要调用多个不同的系统,需要将RT降低下来,那么需要进行并发调用;
List<Task> taskList = new ArrayList<>(); List<Object> result = taskList.stream() .map(task -> CompletableFuture.supplyAsync(()->{ // handlerMap.get(task).query(); return ""; }, executorService)) .map(c -> c.join()) .collect(Collectors.toList());
如果不使用传入的线程池,大家用默认的线程池ForkJoinPool
thenRun用的默认和上一个任务使用相同的线程池
thenRunAsync在执行新的任务的时候可以接受传入一个新的线程池,使用新的线程池执行任务;
exceptionally是只有发生异常时才会执行,而handle则是不管是否发生错误都会执行。
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!