Maison >Java >javaDidacticiel >Comment SpringBoot intègre RabbitMQ
RabbitMQ est un type d'intergiciel de messagerie qui implémente l'AMQP (Advanced Message Queuing Protocol). Il provient à l'origine du système financier et est utilisé pour stocker et transférer des messages. systèmes distribués, qui fonctionnent bien en termes de facilité d'utilisation, d'évolutivité et de haute disponibilité. RabbitMQ est principalement implémenté pour réaliser un découplage bidirectionnel entre les systèmes. Lorsque le producteur génère une grande quantité de données et que le consommateur ne peut pas les consommer rapidement, une couche intermédiaire est nécessaire. Enregistrez ces données.
AMQP, Advanced Message Queuing Protocol, est un standard ouvert pour les protocoles de couche application et est conçu pour les middlewares orientés messages. L'objectif principal du middleware de messages est de découpler les composants afin que les expéditeurs et les destinataires des messages n'interfèrent pas les uns avec les autres et soient indépendants les uns des autres. L’expéditeur n’a donc pas besoin de connaître l’existence de l’utilisateur, et vice versa. Les principales fonctionnalités d'AMQP incluent l'orientation des messages, la mise en file d'attente, le routage (y compris point à point et publication/abonnement), la fiabilité et la sécurité.
RabbitMQ est une implémentation open source d'AMQP. Le côté serveur est écrit en langage Erlang et prend en charge une variété de clients, tels que : Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript. , XMPP, STOMP, etc., prennent en charge AJAX. Cette technologie démontre une bonne facilité d'utilisation, une évolutivité et une haute disponibilité dans le stockage et le transfert de messages dans des systèmes distribués.
Habituellement, lorsque nous parlons de services de file d'attente, il existe trois concepts : l'expéditeur du message, la file d'attente, Pour le destinataire du message, RabbitMQ a ajouté une couche d'abstraction supplémentaire en plus de ce concept de base, en ajoutant un échange (Exchange) entre l'expéditeur du message et la file d'attente. De cette manière, l'expéditeur du message et la file d'attente ne sont pas directement connectés, et. à la place, devenir L'expéditeur envoie le message à l'échange, et l'échange envoie le message à la file d'attente selon la politique de planification
Le P à gauche représente le producteur, ce qui signifie envoyer des messages au programme RabbitMQ.
Le milieu est RabbitMQ, qui comprend des commutateurs et des files d'attente.
C à droite représente le consommateur, qui est le programme qui reçoit les messages de RabbitMQ.
Les concepts les plus importants sont 4 , à savoir : hôte virtuel, commutateur, file d'attente et liaison.
Hôte virtuel : Un hôte virtuel contient un ensemble de commutateurs, de files d'attente et de liaisons. Pourquoi avez-vous besoin de plusieurs hôtes virtuels ? C'est très simple. Dans RabbitMQ, les utilisateurs ne peuvent contrôler les autorisations qu'avec la granularité de l'hôte virtuel. Par conséquent, si vous devez interdire au groupe A d'accéder aux commutateurs/files d'attente/liaisons du groupe B, vous devez créer un hôte virtuel pour A et B respectivement. Chaque serveur RabbitMQ possède un hôte virtuel par défaut "/".
Switch : Exchange est utilisé pour transférer les messages, mais il ne les stockera pas s'il n'y a pas de liaison de file d'attente à Exchange, il le fera. supprimera directement le message envoyé par le producteur.
Il y a ici un concept plus important : la clé de routage. Selon la clé de routage, le commutateur transmet le message à la file d'attente correspondante.
Liaison : C'est-à-dire que le commutateur doit être lié à la file d'attente. Comme le montre la figure ci-dessus, il s'agit d'un plusieurs. -à-plusieurs relations.
SpringBoot intégrer RabbitMQ est très simple Si vous l'utilisez simplement et configurez très peu, springboot fournit divers supports pour les messages dans le projet spring-boot-starter-amqp. .
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2 Fichier de configuration
Configurez l'adresse d'installation de RabbitMQ, Informations sur le port et le compte.
spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=192.168.0.86 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=1234563. Configuration de la file d'attente
@Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("hello"); } }
rabbitTemplate是springboot 提供的默认实现 public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context); } }
@Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } }
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitMqHelloTest { @Autowired private HelloSender helloSender; @Test public void hello() throws Exception { helloSender.send(); } }
Remarque :Le nom de la file d'attente de l'expéditeur et du destinataire doit être cohérent, sinon il ne peut pas être reçu#🎜 🎜 #
Utilisation plusieurs à plusieurs
Que se passera-t-il s'il y a un expéditeur, N récepteurs ou N expéditeurs et N récepteurs la situation ?
Envoi un à plusieurs
L'extrémité de réception a enregistré deux récepteurs, Receiver1 et Receiver2. , et envoyé La fin ajoute le nombre de paramètres et l'extrémité réceptrice imprime les paramètres reçus. Voici le code de test, qui envoie une centaine de messages pour observer l'effet d'exécution des deux extrémités réceptrices. Les résultats sont les suivants : # 🎜🎜#
Récepteur 1 : spirng boot neo queue ****** 11Récepteur 2 : spirng boot neo file d'attente ****** 12#🎜🎜 #Récepteur 2 : file d'attente néo de démarrage spirng ****** 14Récepteur 1 : file d'attente néo de démarrage spirng ****** 13Récepteur 2 : spirng boot neo queue ****** 15
Récepteur 1 : spirng boot neo queue ****** 16Récepteur 1 : spirng boot neo queue ****** 18#🎜 🎜#Récepteur 2 : file d'attente néo de démarrage spirng ** **** 17Récepteur 2 : file d'attente néo de démarrage spirng ****** 19
Récepteur 1 : file d'attente néo de démarrage spirng ***** *20
根据返回结果得到以下结论
一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中
多对多发送
复制了一份发送者,加入标记,在一百个循环中相互交替发送
@Test public void manyToMany() throws Exception { for (int i=0;i<100;i++){ neoSender.send(i); neoSender2.send(i); } }
结果如下:
Receiver 1: spirng boot neo queue ****** 20
Receiver 2: spirng boot neo queue ****** 20
Receiver 1: spirng boot neo queue ****** 21
Receiver 2: spirng boot neo queue ****** 21
Receiver 1: spirng boot neo queue ****** 22
Receiver 2: spirng boot neo queue ****** 22
Receiver 1: spirng boot neo queue ****** 23
Receiver 2: spirng boot neo queue ****** 23
Receiver 1: spirng boot neo queue ****** 24
Receiver 2: spirng boot neo queue ****** 24
Receiver 1: spirng boot neo queue ****** 25
Receiver 2: spirng boot neo queue ****** 25
结论:和一对多一样,接收端仍然会均匀接收到消息.
//对象的支持 //springboot以及完美的支持对象的发送和接收,不需要格外的配置。 //发送者 public void send(User user) { System.out.println("Sender object: " + user.toString()); this.rabbitTemplate.convertAndSend("object", user); } ... //接受者 @RabbitHandler public void process(User user) { System.out.println("Receiver object : " + user); }
结果如下:
Sender object: User{name='neo', pass='123456'}
Receiver object : User{name='neo', pass='123456'}
在RabbitMQ中,Topic是最灵活的一种方式,它允许根据routing_key随意绑定到不同的队列
首先对topic规则配置,这里使用两个队列来测试
@Configuration public class TopicRabbitConfig { final static String message = "topic.message"; final static String messages = "topic.messages"; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
使用queueMessages同时匹配两个队列,queueMessage只匹配"topic.message"队列
public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("exchange", "topic.message", context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context); }
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
Fanout 相关配置:
@Configuration public class FanoutRabbitConfig { @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
public void send() { String context = "hi, fanout msg "; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("fanoutExchange","", context); }
结果如下:
Sender : hi, fanout msg
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg
结果说明,绑定到fanout交换机上面的队列都收到了消息.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!