>일반적인 문제 >비동기 프로그래밍의 여러 가지 방법, 얼마나 많이 알고 있나요?

비동기 프로그래밍의 여러 가지 방법, 얼마나 많이 알고 있나요?

Java后端技术全栈
Java后端技术全栈앞으로
2023-08-15 16:14:14834검색

비동기 실행은 개발자에게 낯설지 않습니다. 실제 개발 프로세스에서 비동기 실행은 동기 실행에 비해 요청 링크 시간을 크게 단축할 수 있습니다.

예: 「发送短信、邮件、异步更新等」 다음은 비동기식으로 구현할 수 있는 일반적인 시나리오입니다.

비동기를 구현하는 8가지 방법

1, Thread

2, Future

3, 비동기 프레임워크 CompletableFuture

4, Spring 주석 @Async

5, Spring ApplicationEvent 이벤트

6 , 메시지 queue

7, Hutool의 ThreadUtil

8과 같은 타사 비동기 프레임워크, Guava asynchronous

비동기란 무엇입니까?

먼저 사용자가 주문하는 일반적인 시나리오를 살펴보겠습니다.

비동기 프로그래밍의 여러 가지 방법, 얼마나 많이 알고 있나요?
비즈니스 시나리오

비동기란 무엇입니까?

동기화 작업에서 문자 메시지 보내기를 실행할 때 선물 포인트 작업을 수행하려면 이 메서드가 완전히 실행될 때까지 기다려야 합니다. 선물 포인트 작업에 시간이 오래 걸립니다. 실행하려면 문자 메시지를 보내는 데 시간이 오래 걸립니다. 이는 일반적인 동기화 시나리오입니다.

실제로 문자 메시지 전송과 포인트 제공 사이에는 의존성이 없습니다. 비동기식을 통해 포인트 제공과 문자 메시지 전송의 두 가지 작업을 동시에 수행할 수 있습니다.

비동기 프로그래밍의 여러 가지 방법, 얼마나 많이 알고 있나요?
Asynchronous

이것을 비동기식이라고 합니다. 아주 간단하지 않나요? 비동기식을 구현하는 몇 가지 방법에 대해 이야기해 보겠습니다.

Asynchronous 특정 구현

1. Thread asynchronous

public class AsyncThread extends Thread {

    @Override
    public void run() {
        System.out.println("Current thread name:" + Thread.currentThread().getName() + " Send email success!");
    }

    public static void main(String[] args) {
        AsyncThread asyncThread = new AsyncThread();
        asyncThread.start();
    }
}

물론 Thread 스레드가 자주 생성되고 파괴되어 시스템 리소스를 낭비한다면 스레드 풀:

private ExecutorService executorService = Executors.newCachedThreadPool();

public void fun() {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            log.info("执行业务逻辑...");
        }
    });
}

비즈니스 로직을 Runnable 또는 Callable로 캡슐화하고 실행을 위해 스레드 풀에 전달할 수 있습니다.

2. Future asynchronous

@Slf4j
public class FutureManager {


    public String execute() throws Exception {


        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {


                System.out.println(" --- task start --- ");
                Thread.sleep(3000);
                System.out.println(" --- task finish ---");
                return "this is future execute final result!!!";
            }
        });


        //这里需要返回值时会阻塞主线程
        String result = future.get();
        log.info("Future get result: {}", result);
        return result;
    }


    @SneakyThrows
    public static void main(String[] args) {
        FutureManager manager = new FutureManager();
        manager.execute();
    }
}

출력 결과:

--- task start --- 
 --- task finish ---
 Future get result: this is future execute final result!!!

(1) Future

Future의 단점은 다음과 같습니다.

비동기 작업 계산을 수동적으로 받을 수 없습니다. 결과: 실행을 위해 스레드 풀의 스레드에 비동기 작업을 적극적으로 제출할 수 있지만, 비동기 작업 실행이 완료된 후에는 작업 완료 여부를 메인 스레드에 알릴 수 없으므로 작업 실행 결과를 적극적으로 얻어야 ​​합니다. get 메소드를 통해.

향후 조각은 서로 격리됩니다. 장기 실행 비동기 작업이 실행된 후 해당 작업에서 반환된 결과를 사용하여 추가 작업을 수행하려는 경우도 있습니다. 두 관계를 사용하려면 프로그램 개발자가 수동으로 바인딩하고 할당해야 합니다. Future는 작업 흐름(파이프라인)을 형성할 수 없습니다. 각 Future는 서로 격리되어 있으므로 CompletableFuture는 여러 Future를 직렬로 결합하여 작업 흐름을 형성할 수 있습니다. .

Futrue에는 좋은 오류 처리 메커니즘이 없습니다. 지금까지 비동기 작업 실행 중에 예외가 발생하면 호출자는 이를 수동적으로 감지할 수 없습니다. 비동기 작업이 실행되었습니다. 오류가 발생했으며 추가 판단 및 처리가 필요합니다.

3. CompletableFuture는 비동기식을 구현합니다.

public class CompletableFutureCompose {
    /**
     * thenAccept子任务和父任务公用同一个线程
     */
    @SneakyThrows
    public static void thenRunAsync() {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something...");
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
    }

    public static void main(String[] args) {
        thenRunAsync();
    }
}

ExecutorService를 명시적으로 사용할 필요는 없습니다. CompletableFuture는 내부적으로 ForkJoinPool을 사용하여 비동기식 작업을 처리하려는 경우에도 가능합니다. .

4. Spring의 @Async 비동기

(1) 사용자 정의 비동기 스레드 풀

/**
 * 线程池参数配置,多个线程池实现线程池隔离,@Async注解,默认使用系统自定义线程池,可在项目中设置多个线程池,在异步调用的时候,指明需要调用的线程池名称,比如:@Async("taskName")
@EnableAsync
@Configuration
public class TaskPoolConfig {
    /**
     * 自定义线程池
     *
     **/
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        //返回可用处理器的Java虚拟机的数量 12
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println("系统最大线程数  :" + i);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(16);
        //最大线程数
        executor.setMaxPoolSize(20);
        //配置队列容量,默认值为Integer.MAX_VALUE
        executor.setQueueCapacity(99999);
        //活跃时间
        executor.setKeepAliveSeconds(60);
        //线程名字前缀
        executor.setThreadNamePrefix("asyncServiceExecutor -");
        //设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
        executor.setAwaitTerminationSeconds(60);
        //等待所有的任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

(2) AsyncService

public interface AsyncService {

    MessageResult sendSms(String callPrefix, String mobile, String actionType, String content);

    MessageResult sendEmail(String email, String subject, String content);
}

@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService {
    @Autowired
    private IMessageHandler mesageHandler;

    @Override
    @Async("taskExecutor")
    public MessageResult sendSms(String callPrefix, String mobile, String actionType, String content) {
        try {


            Thread.sleep(1000);
            mesageHandler.sendSms(callPrefix, mobile, actionType, content);


        } catch (Exception e) {
            log.error("发送短信异常 -> ", e)
        }
    }
    
    @Override
    @Async("taskExecutor")
    public sendEmail(String email, String subject, String content) {
        try {

            Thread.sleep(1000);
            mesageHandler.sendsendEmail(email, subject, content);

        } catch (Exception e) {
            log.error("发送email异常 -> ", e)
        }
    }
}

실제 프로젝트에서는 @Async를 사용하여 스레드 풀을 호출하고 권장 및 기타 방법을 사용합니다. 사용자 정의 스레드 풀을 사용하는 모드입니다. @Async를 직접 사용하여 비동기식을 직접 구현하는 것은 권장되지 않습니다.

5. Spring ApplicationEvent 이벤트는 비동기입니다

(1)定义事件

public class AsyncSendEmailEvent extends ApplicationEvent {
    /**
     * 邮箱
     **/
    private String email;
   /**
     * 主题
     **/
    private String subject;
    /**
     * 内容
     **/
    private String content;
  
    /**
     * 接收者
     **/
    private String targetUserId;

}

(2)定义事件处理器

@Slf4j
@Component
public class AsyncSendEmailEventHandler implements ApplicationListener<AsyncSendEmailEvent> {

    @Autowired
    private IMessageHandler mesageHandler;
    
    @Async("taskExecutor")
    @Override
    public void onApplicationEvent(AsyncSendEmailEvent event) {
        if (event == null) {
            return;
        }

        String email = event.getEmail();
        String subject = event.getSubject();
        String content = event.getContent();
        String targetUserId = event.getTargetUserId();
        mesageHandler.sendsendEmailSms(email, subject, content, targerUserId);
      }
}

另外,可能有些时候采用ApplicationEvent实现异步的使用,当程序出现异常错误的时候,需要考虑补偿机制,那么这时候可以结合Spring Retry重试来帮助我们避免这种异常造成数据不一致问题。

6、消息队列

(1)回调事件消息生产者

@Slf4j
@Component
public class CallbackProducer {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendCallbackMessage(CallbackDTO allbackDTO, final long delayTimes) {

        log.info("生产者发送消息,callbackDTO,{}", callbackDTO);

        amqpTemplate.convertAndSend(CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getExchange(), CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getRoutingKey(), JsonMapper.getInstance().toJson(genseeCallbackDTO), new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间
                message.getMessageProperties().setHeader("x-delay", delayTimes);
                message.getMessageProperties().setCorrelationId(callbackDTO.getSdkId());
                return message;
            }
        });
    }
}

(2)回调事件消息消费者

@Slf4j
@Component
@RabbitListener(queues = "message.callback", containerFactory = "rabbitListenerContainerFactory")
public class CallbackConsumer {

    @Autowired
    private IGlobalUserService globalUserService;

    @RabbitHandler
    public void handle(String json, Channel channel, @Headers Map<String, Object> map) throws Exception {

        if (map.get("error") != null) {
            //否认消息
            channel.basicNack((Long) map.get(AmqpHeaders.DELIVERY_TAG), false, true);
            return;
        }

        try {
        
            CallbackDTO callbackDTO = JsonMapper.getInstance().fromJson(json, CallbackDTO.class);
            //执行业务逻辑
            globalUserService.execute(callbackDTO);
            //消息消息成功手动确认,对应消息确认模式acknowledge-mode: manual
            channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);

        } catch (Exception e) {
            log.error("回调失败 -> {}", e);
        }
    }
}

7、ThreadUtil异步工具类

@Slf4j
public class ThreadUtils {

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            ThreadUtil.execAsync(() -> {
                ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
                int number = threadLocalRandom.nextInt(20) + 1;
                System.out.println(number);
            });
            log.info("当前第:" + i + "个线程");
        }

        log.info("task finish!");
    }
}

8、Guava异步

Guava的ListenableFuture顾名思义就是可以监听的Future,是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用「Guava ListenableFuture」可以帮我们检测Future是否完成了,不需要再通过get()方法苦苦等待异步的计算结果,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。

ListenableFuture是一个接口,它从jdk的Future接口继承,添加了void addListener(Runnable listener, Executor executor)方法。

我们看下如何使用ListenableFuture。首先需要定义ListenableFuture的实例:

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("callable execute...")
                TimeUnit.SECONDS.sleep(1);
                return 1;
            }
        });

首先,通过MoreExecutors类的静态方法listeningDecorator方法初始化一个ListeningExecutorService的方法,然后,使用此实例的submit方法即可初始化ListenableFuture对象。

ListenableFuture要做的工作,在Callable接口的实现类中定义,这里只是休眠了1秒钟然后返回一个数字1,有了ListenableFuture实例,可以执行此Future并执行Future完成之后的回调函数。

Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        //成功执行...
        System.out.println("Get listenable future&#39;s result with callback " + result);
    }

    @Override
    public void onFailure(Throwable t) {
        //异常情况处理...
        t.printStackTrace();
    }
});

那么,以上就是本期介绍的实现异步的8种方式了。

위 내용은 비동기 프로그래밍의 여러 가지 방법, 얼마나 많이 알고 있나요?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 Java后端技术全栈에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제