ホームページ  >  記事  >  非同期プログラミングにはいくつかの方法がありますが、いくつ知っていますか?

非同期プログラミングにはいくつかの方法がありますが、いくつ知っていますか?

Java后端技术全栈
Java后端技术全栈転載
2023-08-15 16:14:14771ブラウズ

非同期実行は開発者にとって馴染みのあるものではありません。実際の開発プロセスでは、多くのシナリオで非同期実行がよく使用されます。同期実行と比較して、非同期実行はリクエスト リンクを大幅に短縮できます。時間消費する。

例: 「テキスト メッセージ、電子メール、非同期更新などの送信」、これらは非同期で実装できる一般的なシナリオです。

#非同期実装を実現する 8 つの方法

1、スレッド

2、将来

3、非同期フレームワーク CompletableFuture

4、Spring アノテーション @Async

5、Spring ApplicationEvent イベント

6、メッセージ キュー

7 、 Hutool の ThreadUtil

8、Guava などのサードパーティの非同期フレームワーク非同期

非同期とは何ですか?

まず、ユーザーが注文する一般的なシナリオを見てみましょう:

非同期プログラミングにはいくつかの方法がありますが、いくつ知っていますか?ビジネス シナリオ

非同期とは何ですか?

同期操作で

Send SMS を実行するときは、Giving Points 操作を実行する前に、このメソッドが完全に実行されるまで待つ必要があります。 ポイントを付与する場合 このアクションの実行には長い時間がかかり、テキスト メッセージの送信には待機が必要です。これは一般的な同期シナリオです。

実際には、テキスト メッセージの送信とポイントのギフトの間に依存関係はありません。非同期により、次のようなポイントのギフトとテキスト メッセージの送信の 2 つの操作を同時に実行できることがわかります。 ##
非同期プログラミングにはいくつかの方法がありますが、いくつ知っていますか?
非同期

これはいわゆる非同期です。非常に簡単ではありませんか?非同期を実装するいくつかの方法について説明しましょう。

非同期固有の実装

1. スレッド非同期

public class AsyncThread extends Thread {

    @Override
    public void run() {
        System.out.println("Current thread name:" + Thread.currentThread().getName() + " Send email success!");
    }

    public static void main(String[] args) {
        AsyncThread asyncThread = new AsyncThread();
        asyncThread.start();
    }
}

もちろん、Thread スレッドが毎回作成され、頻繁に作成および破棄され、システム リソースが無駄になる場合は、スレッド プールを使用できます。

private ExecutorService executorService = Executors.newCachedThreadPool();

public void fun() {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            log.info("执行业务逻辑...");
        }
    });
}

は、ビジネス ロジックを Runnable または呼び出し可能。実行はスレッド プールに任せます。

2. 今後の非同期

@Slf4j
public class FutureManager {


    public String execute() throws Exception {


        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {


                System.out.println(" --- task start --- ");
                Thread.sleep(3000);
                System.out.println(" --- task finish ---");
                return "this is future execute final result!!!";
            }
        });


        //这里需要返回值时会阻塞主线程
        String result = future.get();
        log.info("Future get result: {}", result);
        return result;
    }


    @SneakyThrows
    public static void main(String[] args) {
        FutureManager manager = new FutureManager();
        manager.execute();
    }
}

出力結果:

--- task start --- 
 --- task finish ---
 Future get result: this is future execute final result!!!

(1) 今後の欠点

Future の欠点には次の点が含まれます:

非同期タスクの計算結果を受動的に受け取ることはできません: 非同期タスクをスレッド プール内のスレッドにアクティブに送信できますが、実行後は非同期タスクの実行が完了すると、メインスレッドはタスクが完了したかどうかを通知できないため、get メソッドを通じてタスクの実行結果を能動的に取得する必要があります。

将来のファイルは相互に分離されます: 長時間実行される非同期タスクの実行後、タスクから返された結果を使用してさらなる操作を実行したい場合があります。非同期タスクである場合、2 つの関係にはプログラム開発者による手動のバインディングと割り当てが必要です。Future はタスク フロー (パイプライン) を形成できません。各 Future は互いに分離されているため、CompletableFuture があり、CompletableFuture は複数の Future を接続して形成できますタスクフロー。

Futrue には優れたエラー処理メカニズムがありません: これまで、非同期タスクの実行中に例外が発生した場合、呼び出し元はそれを受動的に認識できず、get メソッドをキャプチャする必要がありました。例外が発生した場合にのみ、非同期タスクの実行にエラーがあるかどうかを知ることができ、さらなる判断と処理を行うことができます。

3. CompletableFuture は非同期を実装します

public class CompletableFutureCompose {
    /**
     * thenAccept子任务和父任务公用同一个线程
     */
    @SneakyThrows
    public static void thenRunAsync() {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something...");
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
    }

    public static void main(String[] args) {
        thenRunAsync();
    }
}

ExecutorService を明示的に使用する必要はありません。CompletableFuture は内部的に ForkJoinPool を使用して非同期タスクを処理します。シナリオ 独自の非同期スレッド プールをカスタマイズすることもできます。

4. Spring の @Async 非同期

(1) カスタム非同期スレッド プール

/**
 * 线程池参数配置,多个线程池实现线程池隔离,@Async注解,默认使用系统自定义线程池,可在项目中设置多个线程池,在异步调用的时候,指明需要调用的线程池名称,比如:@Async("taskName")
@EnableAsync
@Configuration
public class TaskPoolConfig {
    /**
     * 自定义线程池
     *
     **/
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        //返回可用处理器的Java虚拟机的数量 12
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println("系统最大线程数  :" + i);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(16);
        //最大线程数
        executor.setMaxPoolSize(20);
        //配置队列容量,默认值为Integer.MAX_VALUE
        executor.setQueueCapacity(99999);
        //活跃时间
        executor.setKeepAliveSeconds(60);
        //线程名字前缀
        executor.setThreadNamePrefix("asyncServiceExecutor -");
        //设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
        executor.setAwaitTerminationSeconds(60);
        //等待所有的任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

( 2) AsyncService

public interface AsyncService {

    MessageResult sendSms(String callPrefix, String mobile, String actionType, String content);

    MessageResult sendEmail(String email, String subject, String content);
}

@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService {
    @Autowired
    private IMessageHandler mesageHandler;

    @Override
    @Async("taskExecutor")
    public MessageResult sendSms(String callPrefix, String mobile, String actionType, String content) {
        try {


            Thread.sleep(1000);
            mesageHandler.sendSms(callPrefix, mobile, actionType, content);


        } catch (Exception e) {
            log.error("发送短信异常 -> ", e)
        }
    }
    
    @Override
    @Async("taskExecutor")
    public sendEmail(String email, String subject, String content) {
        try {

            Thread.sleep(1000);
            mesageHandler.sendsendEmail(email, subject, content);

        } catch (Exception e) {
            log.error("发送email异常 -> ", e)
        }
    }
}

実際のプロジェクトでは、@Async を使用してスレッド プールを呼び出します。推奨される方法はカスタム スレッド プール モードを使用することです。@Async を直接使用して非同期を直接実装することはお勧めできません。

5. Spring ApplicationEvent イベントは非同期で実装されます

(1)定义事件

public class AsyncSendEmailEvent extends ApplicationEvent {
    /**
     * 邮箱
     **/
    private String email;
   /**
     * 主题
     **/
    private String subject;
    /**
     * 内容
     **/
    private String content;
  
    /**
     * 接收者
     **/
    private String targetUserId;

}

(2)定义事件处理器

@Slf4j
@Component
public class AsyncSendEmailEventHandler implements ApplicationListener<AsyncSendEmailEvent> {

    @Autowired
    private IMessageHandler mesageHandler;
    
    @Async("taskExecutor")
    @Override
    public void onApplicationEvent(AsyncSendEmailEvent event) {
        if (event == null) {
            return;
        }

        String email = event.getEmail();
        String subject = event.getSubject();
        String content = event.getContent();
        String targetUserId = event.getTargetUserId();
        mesageHandler.sendsendEmailSms(email, subject, content, targerUserId);
      }
}

另外,可能有些时候采用ApplicationEvent实现异步的使用,当程序出现异常错误的时候,需要考虑补偿机制,那么这时候可以结合Spring Retry重试来帮助我们避免这种异常造成数据不一致问题。

6、消息队列

(1)回调事件消息生产者

@Slf4j
@Component
public class CallbackProducer {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendCallbackMessage(CallbackDTO allbackDTO, final long delayTimes) {

        log.info("生产者发送消息,callbackDTO,{}", callbackDTO);

        amqpTemplate.convertAndSend(CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getExchange(), CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getRoutingKey(), JsonMapper.getInstance().toJson(genseeCallbackDTO), new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间
                message.getMessageProperties().setHeader("x-delay", delayTimes);
                message.getMessageProperties().setCorrelationId(callbackDTO.getSdkId());
                return message;
            }
        });
    }
}

(2)回调事件消息消费者

@Slf4j
@Component
@RabbitListener(queues = "message.callback", containerFactory = "rabbitListenerContainerFactory")
public class CallbackConsumer {

    @Autowired
    private IGlobalUserService globalUserService;

    @RabbitHandler
    public void handle(String json, Channel channel, @Headers Map<String, Object> map) throws Exception {

        if (map.get("error") != null) {
            //否认消息
            channel.basicNack((Long) map.get(AmqpHeaders.DELIVERY_TAG), false, true);
            return;
        }

        try {
        
            CallbackDTO callbackDTO = JsonMapper.getInstance().fromJson(json, CallbackDTO.class);
            //执行业务逻辑
            globalUserService.execute(callbackDTO);
            //消息消息成功手动确认,对应消息确认模式acknowledge-mode: manual
            channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);

        } catch (Exception e) {
            log.error("回调失败 -> {}", e);
        }
    }
}

7、ThreadUtil异步工具类

@Slf4j
public class ThreadUtils {

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            ThreadUtil.execAsync(() -> {
                ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
                int number = threadLocalRandom.nextInt(20) + 1;
                System.out.println(number);
            });
            log.info("当前第:" + i + "个线程");
        }

        log.info("task finish!");
    }
}

8、Guava异步

Guava的ListenableFuture顾名思义就是可以监听的Future,是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用「Guava ListenableFuture」可以帮我们检测Future是否完成了,不需要再通过get()方法苦苦等待异步的计算结果,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。

ListenableFuture是一个接口,它从jdk的Future接口继承,添加了void addListener(Runnable listener, Executor executor)方法。

我们看下如何使用ListenableFuture。首先需要定义ListenableFuture的实例:

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("callable execute...")
                TimeUnit.SECONDS.sleep(1);
                return 1;
            }
        });

首先,通过MoreExecutors类的静态方法listeningDecorator方法初始化一个ListeningExecutorService的方法,然后,使用此实例的submit方法即可初始化ListenableFuture对象。

ListenableFuture要做的工作,在Callable接口的实现类中定义,这里只是休眠了1秒钟然后返回一个数字1,有了ListenableFuture实例,可以执行此Future并执行Future完成之后的回调函数。

Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        //成功执行...
        System.out.println("Get listenable future&#39;s result with callback " + result);
    }

    @Override
    public void onFailure(Throwable t) {
        //异常情况处理...
        t.printStackTrace();
    }
});

那么,以上就是本期介绍的实现异步的8种方式了。

以上が非同期プログラミングにはいくつかの方法がありますが、いくつ知っていますか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はJava后端技术全栈で複製されています。侵害がある場合は、admin@php.cn までご連絡ください。