Home >Java >javaTutorial >Microservice message queue and asynchronous communication tool implemented in Java

Microservice message queue and asynchronous communication tool implemented in Java

王林
王林Original
2023-08-09 18:49:47806browse

Microservice message queue and asynchronous communication tool implemented in Java

Microservice message queue and asynchronous communication tool implemented in Java

Introduction:
In today’s Internet era, the popularity of microservice architecture has become an indisputable fact . In the microservice architecture, message queue and asynchronous communication are indispensable key components. This article will introduce how to use Java to implement microservice message queues and asynchronous communication tools, and provide corresponding code examples.

1. Microservice Message Queue
1.1 What is a message queue?
Message queue is an application decoupled communication method that achieves loose coupling between the sender and the receiver by sending messages to the queue. The sender only needs to send the message to the queue and does not care how the message is processed. The receiver can asynchronously remove the message from the queue for processing.

1.2 Introduction to RabbitMQ
RabbitMQ is an open source message queuing system that uses AMQP (Advanced Message Queuing Protocol) as the message transmission protocol. It has high reliability, scalability and flexible routing mechanism, making it very suitable for building message queues in microservice architecture.

1.3 Use of RabbitMQ
1.3.1 Add dependencies
First, add RabbitMQ dependencies in the project's pom.xml file:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
</dependency>

1.3.2 Create a message producer

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Producer {
    private final static String QUEUE_NAME = "my_queue";

    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            
            // 创建连接
            Connection connection = factory.newConnection();
            
            // 创建通道
            Channel channel = connection.createChannel();
            
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 发送消息
            String message = "Hello, RabbitMQ!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("Message sent: " + message);
            
            // 关闭连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1.3.3 Create a message consumer

import com.rabbitmq.client.*;

public class Consumer {
    private final static String QUEUE_NAME = "my_queue";

    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            
            // 创建连接
            Connection connection = factory.newConnection();
            
            // 创建通道
            Channel channel = connection.createChannel();
            
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 定义消息处理回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Message received: " + message);
            };
            
            // 监听队列
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
            
            // 阻塞线程,持续监听
            Thread.sleep(Long.MAX_VALUE);
            
            // 关闭连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1.4 Run the example
Run the Producer and Consumer classes respectively, and you will see that the messages sent by the Producer are received by the Consumer.

2. Asynchronous communication tools
2.1 Introduction to CompletableFuture
CompletableFuture is a tool class introduced in Java8 for processing asynchronous tasks. It can handle asynchronous calls more conveniently, avoid cumbersome callback processing, and greatly improve the efficiency of concurrent programming.

2.2 Use of CompletableFuture
2.2.1 Creating an asynchronous task
Use the static method supplyAsync of CompletableFuture to create an asynchronous task with a return value.

import java.util.concurrent.CompletableFuture;

public class AsyncExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 异步执行的任务
            return "Hello, CompletableFuture!";
        });
        
        // 当任务执行完毕后调用回调函数进行处理
        future.thenAccept(result -> {
            System.out.println("Result: " + result);
        });
        
        // 其他业务逻辑
        // ...
    }
}

2.2.2 Combining multiple asynchronous tasks
CompletableFuture also supports combining multiple asynchronous tasks based on dependencies.

import java.util.concurrent.CompletableFuture;

public class AsyncExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            // 异步执行的任务1
            return "Hello";
        });
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            // 异步执行的任务2
            return "CompletableFuture";
        });
        
        // 通过thenCompose将任务1和任务2串行化
        CompletableFuture<String> combinedFuture = future1.thenCompose(result1 -> {
            return future2.thenApply(result2 -> {
                return result1 + ", " + result2;
            });
        });
        
        // 当所有任务执行完毕后调用回调函数进行处理
        combinedFuture.thenAccept(result -> {
            System.out.println("Combined Result: " + result);
        });
        
        // 其他业务逻辑
        // ...
    }
}

Summary:
By using RabbitMQ as a microservice message queue, asynchronous communication under a microservice architecture can be achieved. At the same time, the CompletableFuture tool class introduced in Java 8 also provides powerful support for asynchronous programming. By properly applying message queues and asynchronous communication tools, we can build scalable and reliable microservice systems.

References:

  1. RabbitMQ official documentation: https://www.rabbitmq.com/documentation.html
  2. CompletableFuture official documentation: https://docs .oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

The above is the detailed content of Microservice message queue and asynchronous communication tool implemented in Java. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn