멀티 스레드 프로그램을 작성할 때 Future를 사용하여 비동기 스레드에서 결과를 얻을 수 있지만 사용 중에 몇 가지 문제를 발견하게 됩니다.
원하는 경우 Future의 결과에 대해 추가 작업을 수행하려면 현재 스레드를 차단해야 합니다
여러 Future는 체인에서 실행될 수 없습니다. 각 Future의 결과는 독립적입니다.
Future 실행이 실패하면 수동으로 캡처해야 합니다.
Future 문제를 해결하기 위해 JDK는 우리에게 유용한 기능을 제공했습니다. 1.8의 도구 클래스 CompletableFuture
Future 및 CompletionStage 인터페이스를 구현하고 Future의 단점에 대한 해당 처리 방법을 제공합니다.
우리의 새로운 처리 로직은 비동기 스레드 실행이 끝난 후 차단 없이 자동으로 호출될 수 있습니다.
여러 비동기 작업을 정렬, 결합 또는 정렬할 수 있습니다
예외 처리
CompletableFuture의 핵심 각 비동기 작업을 하나의 단계(CompletionStage)로 간주하면 다른 비동기 작업이 이 단계를 기반으로 원하는 작업을 수행할 수 있다는 아이디어입니다.
CompletionStage는 매우 강력한 여러 단계 처리 방법을 정의합니다. 다음은 참고용으로 일상 생활에서 일반적으로 사용되는 몇 가지 방법입니다.
간단한 사용법
비동기 실행, 결과 필요 없음:
// 可以执行Executors异步执行,如果不指定,默认使用ForkJoinPool CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));
비동기 실행, 결과 반환 중:
// 同样可以指定线程池 CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!"); System.out.println(stringCompletableFuture.get());
thenRun : 아니요 이전 단계의 결과가 필요하면 새로운 작업을 지시
thenAccept: 이전 비동기 처리의 콘텐츠를 가져와 새 작업을 수행
thenApply: 이전 단계의 콘텐츠를 가져온 다음 새 콘텐츠를 생성
Async 접미사가 있으면 새 처리 작업이 여전히 비동기적이라는 의미입니다. 비동기 작업은 처리를 위해 실행자를 지정할 수 있습니다
// Demo CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") // 针对上一步的结果做处理,产生新的结果 .thenApplyAsync(s -> s.toUpperCase()) // 针对上一步的结果做处理,不返回结果 .thenAcceptAsync(s -> System.out.println(s)) // 不需要上一步返回的结果,直接进行操作 .thenRunAsync(() -> System.out.println("end")); ;
두 개의 콜백 처리가 있는 경우 모든 완료를 사용할 수 있으며 두 결과는 관계가 없으며 acceptEither를 사용하세요.
두 개의 비동기 스레드 실행을 먼저 완료한 사람이 그 결과를 사용하게 됩니다. 다른 유형의 메서드에도 마찬가지입니다.
// 返回abc CompletableFuture .supplyAsync(() -> { SleepUtils.sleep(100); return "Hello CompletableFuture!"; }) .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } }); // 返回Hello CompletableFuture! CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") .acceptEither(CompletableFuture.supplyAsync(() -> { SleepUtils.sleep(100); return "abc"; }), new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } });
thenCombine
두 개의 CompletionStage가 있는 경우 두 결과를 통합한 다음 새로운 결과를 계산해야 합니다.
thenCompose는 이전 CompletionStage의 결과를 처리하여 결과를 반환하며, 반환 유형은 CompletionStage여야 합니다.
thenCombine은 첫 번째 CompletionStage의 결과를 가져온 다음 현재 CompletionStage를 가져와 두 결과를 처리합니다.
CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172); CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65) .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() { @Override public Double apply(Integer wight, Integer height) { return wight * 10000.0 / (height * height); } }) ;
thenAcceptBoth
에는 두 개의 비동기 CompletableFuture 결과가 필요합니다. 둘 다 완료되면 thenAcceptBoth 콜백이 입력됩니다.
// thenAcceptBoth案例: CompletableFuture .supplyAsync(() -> "Hello CompletableFuture!") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() { // 参数一为我们刚开始运行时的CompletableStage,新传入的作为第二个参数 @Override public void accept(String s, String s2) { System.out.println("param1=" + s + ", param2=" + s2); } }); // 结果:param1=Hello CompletableFuture!, param2=abc
CompleteFuture를 사용하여 체인 호출을 할 때 여러 비동기 콜백 중 하나에 실행 문제가 있으면 후속 콜백이 중지되므로 예외 처리 전략이 필요합니다.
예외적으로
오류가 발생한 경우에는 반환 콘텐츠를 복구하고 맞춤 설정할 수 있는 기회가 제공됩니다.
CompletableFuture.supplyAsync(() -> { throw new RuntimeException("发生错误"); }).exceptionally(throwable -> { log.error("调用错误 {}", throwable.getMessage(), throwable); return "异常处理内容"; });
handle
예외적으로는 예외가 발생할 때만 실행되고, 핸들은 오류 발생 여부와 관계없이 실행됩니다.
CompletableFuture.supplyAsync(() -> { return "abc"; }) .handle((r,err) -> { log.error("调用错误 {}", err.getMessage(), err); // 对结果做额外的处理 return r; }) ;
요건은 테이블의 특정 조건을 문자 메시지를 통해 사용자에게 알리는 것입니다. 그러나 단일 스레드 읽기인 경우 수백만 명의 문자 메시지 사용자가 있습니다. 사용하면 읽기 효율성이 매우 느려집니다. 이때 멀티스레딩을 사용하여 읽기를 고려할 수 있습니다.
1. 읽기 작업을 여러 개의 하위 작업으로 나누고 읽기 횟수와 오프셋을 지정하세요.
// 假设有500万条记录 long recordCount = 500 * 10000; int subTaskRecordCount = 10000; // 对记录进行分片 List<Map> subTaskList = new LinkedList<>(); for (int i = 0; i < recordCount / 500; i++) { // 如果子任务结构复杂,建议使用对象 HashMap<String, Integer> subTask = new HashMap<>(); subTask.put("index", i); subTask.put("offset", i * subTaskRecordCount); subTask.put("count", subTaskRecordCount); subTaskList.add(subTask); }
2. 멀티스레딩 일괄 읽기
// 进行subTask批量处理,拆分为不同的任务 subTaskList.stream() .map(subTask -> CompletableFuture.runAsync(()->{ // 读取数据,然后处理 // dataTunel.read(subTask); },excuturs)) // 使用应用的通用任务线程池 .map(c -> ((CompletableFuture<?>) c).join());
3를 사용하세요. 비즈니스 로직 처리를 수행하거나 읽은 후 직접 비즈니스 로직 처리를 수행합니다.
在系统拆分比较细的时候,价格,优惠券,库存,商品详情等信息分散在不同的系统中,有时候需要同时获取商品的所有信息, 有时候可能只需要获取商品的部分信息。
当然问题点在于要调用多个不同的系统,需要将RT降低下来,那么需要进行并发调用;
List<Task> taskList = new ArrayList<>(); List<Object> result = taskList.stream() .map(task -> CompletableFuture.supplyAsync(()->{ // handlerMap.get(task).query(); return ""; }, executorService)) .map(c -> c.join()) .collect(Collectors.toList());
如果不使用传入的线程池,大家用默认的线程池ForkJoinPool
thenRun用的默认和上一个任务使用相同的线程池
thenRunAsync在执行新的任务的时候可以接受传入一个新的线程池,使用新的线程池执行任务;
exceptionally是只有发生异常时才会执行,而handle则是不管是否发生错误都会执行。
위 내용은 Java 멀티스레딩 도구 CompletableFuture를 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!