Home >Java >javaTutorial >How does SpringBoot integrate RabbitMq?
Advanced Message Queuing Protocol (AMQP) is a platform-neutral wired protocol for message middleware. The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. Spring Boot provides some conveniences for working with AMQP via RabbitMQ, including the spring-boot-starter-amqp "Starter".
It is very simple to integrate RabbitMQ with springboot. If you simply use it with very little configuration, springboot provides various support for messages in the spring-boot-starter-amqp project.
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
RabbitMQ is a lightweight, reliable, scalable, and portable message broker based on the AMQP protocol. Spring uses RabbitMQ to communicate via AMQP protocol.
RabbitMQ configuration is controlled by external configuration properties spring.rabbitmq.*. For example, you could declare the following partial application.properties in:
spring.rabbitmq.host = localhost spring.rabbitmq.port = 5672 spring.rabbitmq.username = guest spring.rabbitmq.password
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 配置类 * * @author itguang * @create @Configuration public class RabbitConfig @Bean public Queue queue(){ return new Queue("hello"); } }
rabbitTemplate It is the default implementation provided by springboot.
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; /** * 消息发送者 * * @author itguang * @create @Component public class HelloSender @Autowired private AmqpTemplate amqpTemplate; public void send(){ String context = "hello----"+LocalDateTime.now(); System.out.println("send:"+context); //往名称为 hello 的queue中发送消息 this.amqpTemplate.convertAndSend("hello",context); } }
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息接受者 * * @author itguang * @create @Component @RabbitListener(queues = "hello") //监听 名称为 hello 的queue public class HelloReceiver //消息处理器 @RabbitHandler public void process(String message){ System.out.println("Receiver:"+message); } }
package com.example.rabbitmqdemo; import com.example.rabbitmqdemo.rabbitmq.HelloSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqdemoApplicationTests @Autowired HelloSender helloSender; @Test public void contextLoads() { helloSender.send(); } }
View console output
send:hello----2018-04-21T11:29:47.739 Receiver:hello----2018-04-21T11:29:47.739
made a small modification to the above code. The receiving end registered two Receivers, Receiver1 and Receiver2. The sending end added the parameter count, and the receiving end printed the received parameters. The following is the test Code, send a hundred messages to observe the execution effect of the two receiving ends
Add a queue called hello2
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 配置类 * * @author itguang * @create @Configuration public class RabbitConfig @Bean public Queue queue(){ return new Queue("hello"); } @Bean public Queue queue2(){ return new Queue("hello2"); } }
Send a message to queue hello2 and accept a count parameter
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; /** * 消息发送者 * * @author itguang * @create @Component public class HelloSender @Autowired private AmqpTemplate amqpTemplate; public void send(){ String context = "hello----"+LocalDateTime.now(); System.out.println("send:"+context); this.amqpTemplate.convertAndSend("hello",context); } //给hello2发送消息,并接受一个计数参数 public void send2(int i){ String context = i+""; System.out.println(context+"--send:"); this.amqpTemplate.convertAndSend("hello2",context); } }
Two hello2 receivers
@Component @RabbitListener(queues = "hello2") public class HelloReceiver1 @RabbitHandler public void process(String message){ System.out.println("Receiver1:"+message); } }
@Component @RabbitListener(queues = "hello2") public class HelloReceiver2 @RabbitHandler public void process(String message){ System.out.println("Receiver2:"+message); } }
@Test public void manyReceiver(){ for (int i=0;i<100;i++){ helloSender.send2(i); } }
Check the console output:
0--send: 1--send: 2--send: 3--send: 4--send: ...(省略) 58--send: 59--send: 60--send: 61--send: 62--send: 63--send: Receiver2:1 Receiver1:0 64--send: 65--send: Receiver1:2 Receiver2:3 66--send: Receiver1:4 Receiver2:5 ...(省略)
You can see: when the message is sent to 63, the receiver Receiver has received the message,
Conclusion:
One sender, N receivers, after testing, the message will be evenly sent to N receivers
we can inject two senders and put them in the loop, as follows:
@Test public void many2many(){ for (int i=0;i<100;i++){ helloSender.send2(i); helloSender2.send2(i); } }
Run the unit test and view the console output:
0--send: 0--send: 1--send: 1--send: 2--send: 2--send: 3--send: 3--send: ...(省略) 22--send: 22--send: 23--send: 23--send: 24--send: 24--send: Receiver2:0 25--send: 25--send: Receiver2:1 26--send: Receiver2:2 26--send: Receiver2:3 27--send: Receiver1:0 27--send: Receiver2:4 Receiver1:1 28--send: Receiver2:5 Receiver1:2 28--send: Receiver2:6 Receiver1:3 29--send: Receiver2:7 Receiver1:4 29--send: Receiver2:8 Receiver1:5 30--send: Receiver2:9 Receiver1:6 30--send: 31--send: 31--send: 32--send: 32--send:
Conclusion: Just like one-to-many, the receiving end will still receive messages evenly
First we create an entity class object User, and note that the Serializable interface must be implemented.
package com.example.rabbitmqdemo.pojo; import java.io.Serializable; /** * @author itguang * @create public class User implements Serializable private String username; private String password; public User(String username, String password) { this.username = username; this.password = password; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public String toString() { return "User{" + "username='" + username + '\'' + ", password='" + password + '\'' + '}'; } }
Then create another queue in the configuration file, called object_queue
@Bean public Queue queue3(){ return new Queue("object_queue"); }
The following are the two senders of the User object, ObjectSender and receiver ObjectReceiver:
@Component public class ObjectSender @Autowired AmqpTemplate amqpTemplate; public void sendUser(User user){ System.out.println("Send object:"+user.toString()); this.amqpTemplate.convertAndSend("object_queue",user); } }
@Component @RabbitListener(queues = "object_queue") public class ObjectReceiver @RabbitHandler public void objectReceiver(User user){ System.out.println("Receiver object:"+user.toString()); } }
Run the unit test and view the console output:
Send object:User{username='李增光', password='666666'} Receiver object:User{username='李增光', password='666666'}
topic is the most flexible method in RabbitMQ and can be freely changed according to routing_key Bind different queues
First configure the topic rules, here use two queues to test
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author itguang * @create @Configuration public class TopicRabbitConfig final static String message = "topic.message"; final static String messages = "topic.messages"; //创建两个 Queue @Bean public Queue queueMessage(){ return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages(){ return new Queue(TopicRabbitConfig.messages); } //配置 TopicExchange,指定名称为 topicExchange @Bean public TopicExchange exchange(){ return new TopicExchange("topicExchange"); } //给队列绑定 exchange 和 routing_key @Bean public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){ return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){ return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
Message senders: both use topicExchange and bind to different routing_key
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component public class TopicSender @Autowired AmqpTemplate amqpTemplate; public void send1(){ String context = "hi, i am message 1"; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange","topic.message",context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange", "topic.messages", context); } }
Two message receivers, specify different queues respectively
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component @RabbitListener(queues = "topic.message") public class TopicReceiver1 @RabbitHandler public void process(String message){ System.out.println("Receiver topic.message :"+ message); } }
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component @RabbitListener(queues = "topic.messages") public class TopicReceiver2 @RabbitHandler public void process(String message){ System.out.println("Receiver topic.messages: "+ message); } }
Test:
Sending send1 will match both topic.# and topic.message Receiver can receive messages, send send2 only topic.# can match all messages that only Receiver2 listens to
Fanout is the broadcast mode or subscription we are familiar with mode, send a message to the Fanout switch, and all queues bound to this switch receive this message.
Fanout related configuration:
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.security.PublicKey; /** * @author itguang * @create @Configuration public class FanOutRabbitMq //创建三个队列 @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } //创建exchange,指定交换策略 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } //分别给三个队列指定exchange,这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略: @Bean public Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange){ return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean public Binding bindingExchangeB(Queue BMessage,FanoutExchange fanoutExchange){ return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return
Message sender:
这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略
package com.example.rabbitmqdemo.fanout; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component public class FanoutSender @Autowired AmqpTemplate amqpTemplate; public void send(){ String context = "hi, fanout msg "; System.out.println("Sender : " + context); //这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略: amqpTemplate.convertAndSend("fanoutExchange","", context); } }
三个消息接受者:
@Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.A: "+message); } } @Component @RabbitListener(queues = "fanout.B") public class FanoutReceiverB @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.B: "+message); } } @Component @RabbitListener(queues = "fanout.C") public class FanoutReceiverC @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.C: "+message); } }
运行单元测试,查看结果:
Sender : hi, fanout msg Receiver form fanout.C: hi, fanout msg Receiver form fanout.A: hi, fanout msg Receiver form fanout.B: hi, fanout msg
结果说明,绑定到fanout交换机上面的队列都收到了消息.
The above is the detailed content of How does SpringBoot integrate RabbitMq?. For more information, please follow other related articles on the PHP Chinese website!