Home >Java >javaTutorial >How SpringBoot integrates RabbitMQ
RabbitMQ is a type of message middleware that implements AMQP (Advanced Message Queuing Protocol). It originally originated from the financial system and is used to store and forward messages in distributed systems. It performs well in terms of usability, scalability, and high availability. RabbitMQ is mainly implemented to achieve two-way decoupling between systems. When the producer generates a large amount of data and the consumer cannot consume it quickly, an intermediate layer is needed. Save this data.
AMQP, Advanced Message Queuing Protocol, is an open standard for application layer protocols and is designed for message-oriented middleware. The main purpose of message middleware is to decouple components so that message senders and receivers do not interfere with each other and are independent of each other. Therefore, the sender does not need to know the existence of the user, and vice versa. The salient features of AMQP include message orientation, queuing, routing (including point-to-point and publish/subscribe), reliability and security.
RabbitMQ is an open source AMQP implementation. The server is written in Erlang language and supports a variety of clients, such as: Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, STOMP, etc. , supports AJAX. This technology demonstrates good ease of use, scalability, and high availability in storing and forwarding messages in distributed systems.
Usually when we talk about queue services, there are three concepts: message sender, queue, message receiver, RabbitMQ is here On top of the basic concept, an additional layer of abstraction is added, and an exchange is added between the message sender and the queue. In this way, the message sender and the queue are not directly connected, and instead the message sender delivers the message to the queue. Exchange, the exchanger sends the message to the queue according to the scheduling policy
The P on the left represents the producer, which is the program that sends messages to RabbitMQ.
The middle is RabbitMQ, which includes switches and queues.
The C on the right represents the consumer, which is the program that gets messages from RabbitMQ.
There are four important concepts, namely: virtual host, switch, queue, and binding.
Virtual host: A virtual host holds a set of switches, queues and bindings. Why do you need multiple virtual hosts? It's very simple. In RabbitMQ, users can only control permissions at the granularity of the virtual host. Therefore, if you need to prohibit group A from accessing group B's switches/queues/bindings, you must create a virtual host for A and B respectively. Every RabbitMQ server has a default virtual host "/".
Switch: Exchange is used to forward messages, but it does not store them. If there is no Queue bind to Exchange, it will directly discard the messages sent by the Producer. information.
Binding: That is, the switch needs to be bound to the queue. As shown in the figure above, it is a many-to-many relationship.
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2. Configuration file
Configure the installation address, port and account information of rabbitmq.
spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=192.168.0.86 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=1234563. Queue configuration
@Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("hello"); } }
rabbitTemplate是springboot 提供的默认实现 public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context); } }
@Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } }
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitMqHelloTest { @Autowired private HelloSender helloSender; @Test public void hello() throws Exception { helloSender.send(); } }
Note:The queue names of the sender and the receiver must be consistent, otherwise they cannot receive
Many-to-many use
One sender, N receivers or What happens with N senders and N receivers?One-to-many sending
A small modification was made to the above code. The receiving end registered two Receivers, Receiver1 and Receiver2, and the sending end added parameter counting. The receiving end prints the parameters received. The following is the test code, which sends a hundred messages to observe the execution effect of the two receiving ends.@Test public void oneToMany() throws Exception { for (int i=0;i<100;i++){ neoSender.send(i); } }
The results are as follows:
Receiver 1: spirng boot neo queue ****** 11Receiver 2: spirng boot neo queue ****** 12
Receiver 2: spirng boot neo queue ****** 14
Receiver 1: spirng boot neo queue ****** 13
Receiver 2: spirng boot neo queue ****** 15
Receiver 1: spirng boot neo queue ****** 16
Receiver 1: spirng boot neo queue ****** 18
Receiver 2: spirng boot neo queue ****** 17
Receiver 2: spirng boot neo queue ***** * 19
Receiver 1: spirng boot neo queue ****** 20
根据返回结果得到以下结论
一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中
多对多发送
复制了一份发送者,加入标记,在一百个循环中相互交替发送
@Test public void manyToMany() throws Exception { for (int i=0;i<100;i++){ neoSender.send(i); neoSender2.send(i); } }
结果如下:
Receiver 1: spirng boot neo queue ****** 20
Receiver 2: spirng boot neo queue ****** 20
Receiver 1: spirng boot neo queue ****** 21
Receiver 2: spirng boot neo queue ****** 21
Receiver 1: spirng boot neo queue ****** 22
Receiver 2: spirng boot neo queue ****** 22
Receiver 1: spirng boot neo queue ****** 23
Receiver 2: spirng boot neo queue ****** 23
Receiver 1: spirng boot neo queue ****** 24
Receiver 2: spirng boot neo queue ****** 24
Receiver 1: spirng boot neo queue ****** 25
Receiver 2: spirng boot neo queue ****** 25
结论:和一对多一样,接收端仍然会均匀接收到消息.
//对象的支持 //springboot以及完美的支持对象的发送和接收,不需要格外的配置。 //发送者 public void send(User user) { System.out.println("Sender object: " + user.toString()); this.rabbitTemplate.convertAndSend("object", user); } ... //接受者 @RabbitHandler public void process(User user) { System.out.println("Receiver object : " + user); }
结果如下:
Sender object: User{name='neo', pass='123456'}
Receiver object : User{name='neo', pass='123456'}
在RabbitMQ中,Topic是最灵活的一种方式,它允许根据routing_key随意绑定到不同的队列
首先对topic规则配置,这里使用两个队列来测试
@Configuration public class TopicRabbitConfig { final static String message = "topic.message"; final static String messages = "topic.messages"; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
使用queueMessages同时匹配两个队列,queueMessage只匹配"topic.message"队列
public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("exchange", "topic.message", context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context); }
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
Fanout 相关配置:
@Configuration public class FanoutRabbitConfig { @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
public void send() { String context = "hi, fanout msg "; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("fanoutExchange","", context); }
结果如下:
Sender : hi, fanout msg
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg
结果说明,绑定到fanout交换机上面的队列都收到了消息.
The above is the detailed content of How SpringBoot integrates RabbitMQ. For more information, please follow other related articles on the PHP Chinese website!