Heim >Java >javaLernprogramm >Wie integriert SpringBoot RabbitMq?
Advanced Message Queuing Protocol (AMQP) ist ein plattformneutrales kabelgebundenes Protokoll für Nachrichten-Middleware. Das Spring AMQP-Projekt wendet zentrale Spring-Konzepte auf die Entwicklung AMQP-basierter Messaging-Lösungen an. Spring Boot bietet einige Annehmlichkeiten für die Arbeit mit AMQP über RabbitMQ, einschließlich des spring-boot-starter-amqp „Starter“.
Es ist sehr einfach, RabbitMQ mit Springboot zu integrieren. Wenn Sie es nur mit sehr wenig Konfiguration verwenden, bietet Springboot verschiedene Unterstützung für Nachrichten im Spring-Boot-Starter-Amqp-Projekt.
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
RabbitMQ ist ein leichter, zuverlässiger, skalierbarer und portabler Nachrichtenbroker, der auf dem AMQP-Protokoll basiert. Spring verwendet RabbitMQ für die Kommunikation über das AMQP-Protokoll.
Die RabbitMQ-Konfiguration wird durch externe Konfigurationseigenschaften spring.rabbitmq.* gesteuert. Sie können beispielsweise die folgenden application.properties im folgenden Abschnitt deklarieren:
spring.rabbitmq.host = localhost spring.rabbitmq.port = 5672 spring.rabbitmq.username = guest spring.rabbitmq.password
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 配置类 * * @author itguang * @create @Configuration public class RabbitConfig @Bean public Queue queue(){ return new Queue("hello"); } }
rabbitTemplate ist die von Springboot bereitgestellte Standardimplementierung.
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; /** * 消息发送者 * * @author itguang * @create @Component public class HelloSender @Autowired private AmqpTemplate amqpTemplate; public void send(){ String context = "hello----"+LocalDateTime.now(); System.out.println("send:"+context); //往名称为 hello 的queue中发送消息 this.amqpTemplate.convertAndSend("hello",context); } }
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息接受者 * * @author itguang * @create @Component @RabbitListener(queues = "hello") //监听 名称为 hello 的queue public class HelloReceiver //消息处理器 @RabbitHandler public void process(String message){ System.out.println("Receiver:"+message); } }
package com.example.rabbitmqdemo; import com.example.rabbitmqdemo.rabbitmq.HelloSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqdemoApplicationTests @Autowired HelloSender helloSender; @Test public void contextLoads() { helloSender.send(); } }
Sehen Sie sich die Ergebnisse der Konsolenausgabe an
send:hello----2018-04-21T11:29:47.739 Receiver:hello----2018-04-21T11:29:47.739
Der Empfänger hat zwei Empfänger, Receiver1 und Receiver2, und den Sender registriert Ende verbunden Parameteranzahl, das empfangende Ende druckt die empfangenen Parameter. Das Folgende ist der Testcode. Senden Sie hundert Nachrichten, um den Ausführungseffekt der beiden empfangenden Enden zu beobachten An Warteschlange senden Hallo2-Nachricht, akzeptiert einen Zählparameter
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 配置类 * * @author itguang * @create @Configuration public class RabbitConfig @Bean public Queue queue(){ return new Queue("hello"); } @Bean public Queue queue2(){ return new Queue("hello2"); } }
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; /** * 消息发送者 * * @author itguang * @create @Component public class HelloSender @Autowired private AmqpTemplate amqpTemplate; public void send(){ String context = "hello----"+LocalDateTime.now(); System.out.println("send:"+context); this.amqpTemplate.convertAndSend("hello",context); } //给hello2发送消息,并接受一个计数参数 public void send2(int i){ String context = i+""; System.out.println(context+"--send:"); this.amqpTemplate.convertAndSend("hello2",context); } }
@Component @RabbitListener(queues = "hello2") public class HelloReceiver1 @RabbitHandler public void process(String message){ System.out.println("Receiver1:"+message); } }
Test
@Component @RabbitListener(queues = "hello2") public class HelloReceiver2 @RabbitHandler public void process(String message){ System.out.println("Receiver2:"+message); } }
@Test public void manyReceiver(){ for (int i=0;i<100;i++){ helloSender.send2(i); } }
Wir Sie können zwei Absender wie folgt injizieren und in die Schleife einbinden:
0--send: 1--send: 2--send: 3--send: 4--send: ...(省略) 58--send: 59--send: 60--send: 61--send: 62--send: 63--send: Receiver2:1 Receiver1:0 64--send: 65--send: Receiver1:2 Receiver2:3 66--send: Receiver1:4 Receiver2:5 ...(省略)
Führen Sie den Unit-Test durch und sehen Sie sich die Konsolenausgabe an:@Test public void many2many(){ for (int i=0;i<100;i++){ helloSender.send2(i); helloSender2.send2(i); } }Schlussfolgerung: Genau wie bei einer Eins-zu-Viele-Methode empfängt der Empfänger weiterhin gleichmäßig die To-Nachricht
Zuerst erstellen wir ein Entitätsklassenobjekt Benutzer. Beachten Sie, dass es die serialisierbare Schnittstelle implementieren muss.
0--send: 0--send: 1--send: 1--send: 2--send: 2--send: 3--send: 3--send: ...(省略) 22--send: 22--send: 23--send: 23--send: 24--send: 24--send: Receiver2:0 25--send: 25--send: Receiver2:1 26--send: Receiver2:2 26--send: Receiver2:3 27--send: Receiver1:0 27--send: Receiver2:4 Receiver1:1 28--send: Receiver2:5 Receiver1:2 28--send: Receiver2:6 Receiver1:3 29--send: Receiver2:7 Receiver1:4 29--send: Receiver2:8 Receiver1:5 30--send: Receiver2:9 Receiver1:6 30--send: 31--send: 31--send: 32--send: 32--send:
Dann erstellen Sie eine Warteschlange in der Konfigurationsdatei mit dem Namen
Führen Sie den Komponententest aus und sehen Sie sich die Ergebnisse der Konsolenausgabe an:package com.example.rabbitmqdemo.pojo; import java.io.Serializable; /** * @author itguang * @create public class User implements Serializable private String username; private String password; public User(String username, String password) { this.username = username; this.password = password; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public String toString() { return "User{" + "username='" + username + '\'' + ", password='" + password + '\'' + '}'; } }Das Folgende sind die beiden Elemente des Benutzers Objekt Ein Sender ObjectSender und ein Empfänger ObjectReceiver:
@Bean public Queue queue3(){ return new Queue("object_queue"); }@Component public class ObjectSender @Autowired AmqpTemplate amqpTemplate; public void sendUser(User user){ System.out.println("Send object:"+user.toString()); this.amqpTemplate.convertAndSend("object_queue",user); } }
@Component @RabbitListener(queues = "object_queue") public class ObjectReceiver @RabbitHandler public void objectReceiver(User user){ System.out.println("Receiver object:"+user.toString()); } }
topic ist die flexibelste Methode in RabbitMQ und kann basierend auf frei an verschiedene Objekte gebunden werden Routing_Key-Warteschlangeobject_queue
Konfigurieren Sie hier zunächst zwei Warteschlangen zum Testen
Send object:User{username='李增光', password='666666'} Receiver object:User{username='李增光', password='666666'}
Nachrichtensender: Beide verwenden topicExchange und sind an unterschiedliche Routing_keys gebunden
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author itguang * @create @Configuration public class TopicRabbitConfig final static String message = "topic.message"; final static String messages = "topic.messages"; //创建两个 Queue @Bean public Queue queueMessage(){ return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages(){ return new Queue(TopicRabbitConfig.messages); } //配置 TopicExchange,指定名称为 topicExchange @Bean public TopicExchange exchange(){ return new TopicExchange("topicExchange"); } //给队列绑定 exchange 和 routing_key @Bean public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){ return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){ return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component public class TopicSender @Autowired AmqpTemplate amqpTemplate; public void send1(){ String context = "hi, i am message 1"; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange","topic.message",context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange", "topic.messages", context); } }rrree
Das Senden von send1 stimmt mit topic.# und topic.message überein. Beide Receiver können Nachrichten empfangen, während send2 nur topic.# mit allen Nachrichten übereinstimmen kann, die nur Receiver2 abhört.
Fanout ExchangeFanout ist der Broadcast-Modus oder das Abonnement Der uns bekannte Modus sendet eine Nachricht an den Fanout-Switch und alle an diesen Switch gebundenen Warteschlangen empfangen diese Nachricht.
Fanout-bezogene Konfiguration:package com.example.rabbitmqdemo.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component @RabbitListener(queues = "topic.message") public class TopicReceiver1 @RabbitHandler public void process(String message){ System.out.println("Receiver topic.message :"+ message); } }Nachrichtenabsender:
这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略
package com.example.rabbitmqdemo.fanout; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component public class FanoutSender @Autowired AmqpTemplate amqpTemplate; public void send(){ String context = "hi, fanout msg "; System.out.println("Sender : " + context); //这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略: amqpTemplate.convertAndSend("fanoutExchange","", context); } }
三个消息接受者:
@Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.A: "+message); } } @Component @RabbitListener(queues = "fanout.B") public class FanoutReceiverB @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.B: "+message); } } @Component @RabbitListener(queues = "fanout.C") public class FanoutReceiverC @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.C: "+message); } }
运行单元测试,查看结果:
Sender : hi, fanout msg Receiver form fanout.C: hi, fanout msg Receiver form fanout.A: hi, fanout msg Receiver form fanout.B: hi, fanout msg
结果说明,绑定到fanout交换机上面的队列都收到了消息.
Das obige ist der detaillierte Inhalt vonWie integriert SpringBoot RabbitMq?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!