Home >Java >javaTutorial >How SpringBoot integrates RabbitMQ

How SpringBoot integrates RabbitMQ

王林
王林forward
2023-06-01 09:53:011379browse

1. Introduction to RabbitMQ

RabbitMQ is a type of message middleware that implements AMQP (Advanced Message Queuing Protocol). It originally originated from the financial system and is used to store and forward messages in distributed systems. It performs well in terms of usability, scalability, and high availability. RabbitMQ is mainly implemented to achieve two-way decoupling between systems. When the producer generates a large amount of data and the consumer cannot consume it quickly, an intermediate layer is needed. Save this data.

AMQP, Advanced Message Queuing Protocol, is an open standard for application layer protocols and is designed for message-oriented middleware. The main purpose of message middleware is to decouple components so that message senders and receivers do not interfere with each other and are independent of each other. Therefore, the sender does not need to know the existence of the user, and vice versa. The salient features of AMQP include message orientation, queuing, routing (including point-to-point and publish/subscribe), reliability and security.

RabbitMQ is an open source AMQP implementation. The server is written in Erlang language and supports a variety of clients, such as: Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, STOMP, etc. , supports AJAX. This technology demonstrates good ease of use, scalability, and high availability in storing and forwarding messages in distributed systems.

How SpringBoot integrates RabbitMQ

2. Related concepts

Usually when we talk about queue services, there are three concepts: message sender, queue, message receiver, RabbitMQ is here On top of the basic concept, an additional layer of abstraction is added, and an exchange is added between the message sender and the queue. In this way, the message sender and the queue are not directly connected, and instead the message sender delivers the message to the queue. Exchange, the exchanger sends the message to the queue according to the scheduling policy

  • The P on the left represents the producer, which is the program that sends messages to RabbitMQ.

  • The middle is RabbitMQ, which includes switches and queues.

  • The C on the right represents the consumer, which is the program that gets messages from RabbitMQ.

There are four important concepts, namely: virtual host, switch, queue, and binding.

  • Virtual host: A virtual host holds a set of switches, queues and bindings. Why do you need multiple virtual hosts? It's very simple. In RabbitMQ, users can only control permissions at the granularity of the virtual host. Therefore, if you need to prohibit group A from accessing group B's switches/queues/bindings, you must create a virtual host for A and B respectively. Every RabbitMQ server has a default virtual host "/".

  • Switch: Exchange is used to forward messages, but it does not store them. If there is no Queue bind to Exchange, it will directly discard the messages sent by the Producer. information.

There is a more important concept here: routing key. Based on the routing key, the switch forwards the message to the corresponding queue.

  • Binding: That is, the switch needs to be bound to the queue. As shown in the figure above, it is a many-to-many relationship.

It is very simple to integrate RabbitMQ with SpringBoot. If you just use it with very little configuration, springboot provides various support for messages in the spring-boot-starter-amqp project.

How SpringBoot integrates RabbitMQ

How SpringBoot integrates RabbitMQ

3. Simple use

1. Configure the pom package

The main thing is to add spring- boot-starter-amqp support

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Configuration file

Configure the installation address, port and account information of rabbitmq.

spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=192.168.0.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

3. Queue configuration

@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
return new Queue("hello");
}
}

4.Sender

rabbitTemplate是springboot 提供的默认实现
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}

5.Receiver

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}

6.Test

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {
@Autowired
private HelloSender helloSender;
@Test
public void hello() throws Exception {
helloSender.send();
}
}

Note:The queue names of the sender and the receiver must be consistent, otherwise they cannot receive

Many-to-many use

One sender, N receivers or What happens with N senders and N receivers?

One-to-many sending

A small modification was made to the above code. The receiving end registered two Receivers, Receiver1 and Receiver2, and the sending end added parameter counting. The receiving end prints the parameters received. The following is the test code, which sends a hundred messages to observe the execution effect of the two receiving ends.

@Test
public void oneToMany() throws Exception {
for (int i=0;i<100;i++){
neoSender.send(i);
}
}

The results are as follows:

Receiver 1: spirng boot neo queue ****** 11

Receiver 2: spirng boot neo queue ****** 12
Receiver 2: spirng boot neo queue ****** 14
Receiver 1: spirng boot neo queue ****** 13
Receiver 2: spirng boot neo queue ****** 15
Receiver 1: spirng boot neo queue ****** 16
Receiver 1: spirng boot neo queue ****** 18
Receiver 2: spirng boot neo queue ****** 17
Receiver 2: spirng boot neo queue ***** * 19
Receiver 1: spirng boot neo queue ****** 20

根据返回结果得到以下结论

一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中

多对多发送

复制了一份发送者,加入标记,在一百个循环中相互交替发送

@Test
public void manyToMany() throws Exception {
for (int i=0;i<100;i++){
neoSender.send(i);
neoSender2.send(i);
}
}

结果如下:

Receiver 1: spirng boot neo queue ****** 20
Receiver 2: spirng boot neo queue ****** 20
Receiver 1: spirng boot neo queue ****** 21
Receiver 2: spirng boot neo queue ****** 21
Receiver 1: spirng boot neo queue ****** 22
Receiver 2: spirng boot neo queue ****** 22
Receiver 1: spirng boot neo queue ****** 23
Receiver 2: spirng boot neo queue ****** 23
Receiver 1: spirng boot neo queue ****** 24
Receiver 2: spirng boot neo queue ****** 24
Receiver 1: spirng boot neo queue ****** 25
Receiver 2: spirng boot neo queue ****** 25

结论:和一对多一样,接收端仍然会均匀接收到消息.

四、高级使用

//对象的支持
//springboot以及完美的支持对象的发送和接收,不需要格外的配置。
//发送者
public void send(User user) {
System.out.println("Sender object: " + user.toString());
this.rabbitTemplate.convertAndSend("object", user);
}
...
//接受者
@RabbitHandler
public void process(User user) {
System.out.println("Receiver object : " + user);
}

结果如下:

Sender object: User{name='neo', pass='123456'}
Receiver object : User{name='neo', pass='123456'}

1.Topic Exchange

在RabbitMQ中,Topic是最灵活的一种方式,它允许根据routing_key随意绑定到不同的队列

首先对topic规则配置,这里使用两个队列来测试

How SpringBoot integrates RabbitMQ

@Configuration
public class TopicRabbitConfig {

final static String message = "topic.message";
final static String messages = "topic.messages";

@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}

@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}

@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}

@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}

@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}

使用queueMessages同时匹配两个队列,queueMessage只匹配"topic.message"队列

public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
}

public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
}

发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息

2.Fanout Exchange

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

How SpringBoot integrates RabbitMQ

Fanout 相关配置:

@Configuration
public class FanoutRabbitConfig {

@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}

@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}

@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}

这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:

public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}

结果如下:

Sender : hi, fanout msg
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg

结果说明,绑定到fanout交换机上面的队列都收到了消息.

The above is the detailed content of How SpringBoot integrates RabbitMQ. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete