Heim >häufiges Problem >Mehrere Methoden der asynchronen Programmierung, wie viele kennen Sie?

Mehrere Methoden der asynchronen Programmierung, wie viele kennen Sie?

Java后端技术全栈
Java后端技术全栈nach vorne
2023-08-15 16:14:14895Durchsuche

Asynchrone Ausführung ist für Entwickler kein Unbekannter. Im tatsächlichen Entwicklungsprozess wird die asynchrone Ausführung häufig im Vergleich zur synchronen Ausführung verwendet.

Zum Beispiel: 「发送短信、邮件、异步更新等」, das sind typische Szenarien, die asynchron umgesetzt werden können. 8 Möglichkeiten zur Implementierung von asynchronem

6, Nachricht queue7, asynchrones Framework von Drittanbietern, wie ThreadUtil von Hutool8, Guava asynchron

Was ist asynchron?

Schauen wir uns zunächst ein häufiges Szenario an, in dem Benutzer Bestellungen aufgeben:

Geschäftsszenario

Was ist asynchron?

Wenn wir im Synchronisierungsvorgang „SMS senden“ ausführen, müssen wir warten, bis diese Methode vollständig ausgeführt ist, bevor wir den Vorgang „Geschenkpunkte“ ausführen können, wenn die Aktion „Geschenkpunkte“ lange dauert Die Ausführung einer Textnachricht dauert lange. Warten Sie, dies ist ein typisches Synchronisierungsszenario. Tatsächlich besteht keine Abhängigkeit zwischen dem Senden von Textnachrichten und dem Verschenken von Punkten. Durch Asynchronität können wir erkennen, dass die beiden Vorgänge des Verschenkens von Punkten und des Sendens von Textnachrichten gleichzeitig ausgeführt werden können, wie zum Beispiel:
Mehrere Methoden der asynchronen Programmierung, wie viele kennen Sie?
Asynchron

Das nennt man asynchron. Ist das nicht sehr einfach?

Asynchrone spezifische Implementierung 1. Thread asynchron

public class AsyncThread extends Thread {

    @Override
    public void run() {
        System.out.println("Current thread name:" + Thread.currentThread().getName() + " Send email success!");
    }

    public static void main(String[] args) {
        AsyncThread asyncThread = new AsyncThread();
        asyncThread.start();
    }
}

Wenn ein Thread-Thread jedes Mal erstellt wird, wird er natürlich häufig erstellt und zerstört, was Systemressourcen verschwendet Verwenden Sie einen Thread-Pool:

private ExecutorService executorService = Executors.newCachedThreadPool();

public void fun() {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            log.info("执行业务逻辑...");
        }
    });
}

Sie können Geschäftslogik in Runnable oder Callable kapseln und sie zur Ausführung an den Thread-Pool übergeben. 2. Zukunft asynchron Obwohl wir asynchrone Aufgaben aktiv zur Ausführung an Threads im Thread-Pool senden können, kann der Hauptthread nach Abschluss der asynchronen Aufgabenausführung nicht benachrichtigt werden, ob die Aufgabe abgeschlossen ist oder nicht. Er muss die Ergebnisse der Aufgabenausführung aktiv abrufen über die get-Methode.

Zukünftige Teile sind voneinander isoliert:

Manchmal möchten Sie nach der Ausführung einer asynchronen Aufgabe mit langer Laufzeit das von ihr zurückgegebene Ergebnis verwenden, um weitere Vorgänge auszuführen, und die Beziehung zwischen ihnen ist ebenfalls gegeben Die beiden Beziehungen erfordern, dass Programmentwickler sie manuell binden und zuweisen. Jeder Future ist voneinander isoliert, sodass CompletableFuture mehrere Futures in Reihe schalten kann, um einen Taskflow zu bilden .

Futrue verfügt nicht über einen guten Fehlerbehandlungsmechanismus:

Wenn während der Ausführung einer asynchronen Aufgabe eine Ausnahme auftritt, kann der Aufrufer diese bisher nicht passiv erkennen. Er muss die Ausnahme der get-Methode erfassen, um zu wissen, ob die Die asynchrone Aufgabe wird ausgeführt. Es ist ein Fehler aufgetreten und eine weitere Beurteilung und Verarbeitung ist erforderlich.

3. CompletableFuture implementiert asynchrone

@Slf4j
public class FutureManager {


    public String execute() throws Exception {


        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {


                System.out.println(" --- task start --- ");
                Thread.sleep(3000);
                System.out.println(" --- task finish ---");
                return "this is future execute final result!!!";
            }
        });


        //这里需要返回值时会阻塞主线程
        String result = future.get();
        log.info("Future get result: {}", result);
        return result;
    }


    @SneakyThrows
    public static void main(String[] args) {
        FutureManager manager = new FutureManager();
        manager.execute();
    }
}

Wir müssen in einigen Geschäftsszenarien ForkJoinPool nicht explizit verwenden . 4. @Async asynchron von Spring Dies ist der Modus zur Verwendung eines benutzerdefinierten Thread-Pools. Es wird nicht empfohlen, @Async direkt zu verwenden, um asynchron zu implementieren.

5. Das Spring ApplicationEvent-Ereignis ist asynchron

(1)定义事件

public class AsyncSendEmailEvent extends ApplicationEvent {
    /**
     * 邮箱
     **/
    private String email;
   /**
     * 主题
     **/
    private String subject;
    /**
     * 内容
     **/
    private String content;
  
    /**
     * 接收者
     **/
    private String targetUserId;

}

(2)定义事件处理器

@Slf4j
@Component
public class AsyncSendEmailEventHandler implements ApplicationListener<AsyncSendEmailEvent> {

    @Autowired
    private IMessageHandler mesageHandler;
    
    @Async("taskExecutor")
    @Override
    public void onApplicationEvent(AsyncSendEmailEvent event) {
        if (event == null) {
            return;
        }

        String email = event.getEmail();
        String subject = event.getSubject();
        String content = event.getContent();
        String targetUserId = event.getTargetUserId();
        mesageHandler.sendsendEmailSms(email, subject, content, targerUserId);
      }
}

另外,可能有些时候采用ApplicationEvent实现异步的使用,当程序出现异常错误的时候,需要考虑补偿机制,那么这时候可以结合Spring Retry重试来帮助我们避免这种异常造成数据不一致问题。

6、消息队列

(1)回调事件消息生产者

@Slf4j
@Component
public class CallbackProducer {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendCallbackMessage(CallbackDTO allbackDTO, final long delayTimes) {

        log.info("生产者发送消息,callbackDTO,{}", callbackDTO);

        amqpTemplate.convertAndSend(CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getExchange(), CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getRoutingKey(), JsonMapper.getInstance().toJson(genseeCallbackDTO), new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间
                message.getMessageProperties().setHeader("x-delay", delayTimes);
                message.getMessageProperties().setCorrelationId(callbackDTO.getSdkId());
                return message;
            }
        });
    }
}

(2)回调事件消息消费者

@Slf4j
@Component
@RabbitListener(queues = "message.callback", containerFactory = "rabbitListenerContainerFactory")
public class CallbackConsumer {

    @Autowired
    private IGlobalUserService globalUserService;

    @RabbitHandler
    public void handle(String json, Channel channel, @Headers Map<String, Object> map) throws Exception {

        if (map.get("error") != null) {
            //否认消息
            channel.basicNack((Long) map.get(AmqpHeaders.DELIVERY_TAG), false, true);
            return;
        }

        try {
        
            CallbackDTO callbackDTO = JsonMapper.getInstance().fromJson(json, CallbackDTO.class);
            //执行业务逻辑
            globalUserService.execute(callbackDTO);
            //消息消息成功手动确认,对应消息确认模式acknowledge-mode: manual
            channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);

        } catch (Exception e) {
            log.error("回调失败 -> {}", e);
        }
    }
}

7、ThreadUtil异步工具类

@Slf4j
public class ThreadUtils {

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            ThreadUtil.execAsync(() -> {
                ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
                int number = threadLocalRandom.nextInt(20) + 1;
                System.out.println(number);
            });
            log.info("当前第:" + i + "个线程");
        }

        log.info("task finish!");
    }
}

8、Guava异步

Guava的ListenableFuture顾名思义就是可以监听的Future,是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用「Guava ListenableFuture」可以帮我们检测Future是否完成了,不需要再通过get()方法苦苦等待异步的计算结果,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。

ListenableFuture是一个接口,它从jdk的Future接口继承,添加了void addListener(Runnable listener, Executor executor)方法。

我们看下如何使用ListenableFuture。首先需要定义ListenableFuture的实例:

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("callable execute...")
                TimeUnit.SECONDS.sleep(1);
                return 1;
            }
        });

首先,通过MoreExecutors类的静态方法listeningDecorator方法初始化一个ListeningExecutorService的方法,然后,使用此实例的submit方法即可初始化ListenableFuture对象。

ListenableFuture要做的工作,在Callable接口的实现类中定义,这里只是休眠了1秒钟然后返回一个数字1,有了ListenableFuture实例,可以执行此Future并执行Future完成之后的回调函数。

Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        //成功执行...
        System.out.println("Get listenable future&#39;s result with callback " + result);
    }

    @Override
    public void onFailure(Throwable t) {
        //异常情况处理...
        t.printStackTrace();
    }
});

那么,以上就是本期介绍的实现异步的8种方式了。

Das obige ist der detaillierte Inhalt vonMehrere Methoden der asynchronen Programmierung, wie viele kennen Sie?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:Java后端技术全栈. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen