ホームページ >Java >&#&チュートリアル >SpringBoot が RabbitMQ を統合する方法
RabbitMQ は、AMQP (Advanced Message Queuing Protocol) を実装したメッセージ ミドルウェアの一種で、元々は金融システムに由来し、分散システムでメッセージを保存および転送するために使用されます。使いやすさ、拡張性、高可用性の点で優れたパフォーマンスを発揮します。 RabbitMQ は主に、システム間の双方向のデカップリングを実現するために実装されています。プロデューサーが大量のデータを生成し、コンシューマーがそれをすぐに消費できない場合、中間層が必要になります。このデータを保存します。
AMQP (Advanced Message Queuing Protocol) は、アプリケーション層プロトコルのオープン標準であり、メッセージ指向ミドルウェア向けに設計されています。メッセージ ミドルウェアの主な目的は、メッセージの送信者と受信者が互いに干渉せず、互いに独立できるようにコンポーネントを分離することです。したがって、送信者はユーザーの存在を知る必要はなく、またその逆も同様です。 AMQP の顕著な機能には、メッセージ指向、キューイング、ルーティング (ポイントツーポイントおよびパブリッシュ/サブスクライブを含む)、信頼性、セキュリティが含まれます。
RabbitMQ はオープン ソースの AMQP 実装です。サーバーは Erlang 言語で書かれており、Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、などのさまざまなクライアントをサポートしています。 STOMP などは AJAX をサポートしています。このテクノロジーは、分散システムでのメッセージの保存と転送において、優れた使いやすさ、拡張性、高可用性を実証します。
通常、キュー サービスについて話すときは、メッセージ送信者、キュー、メッセージ受信者という 3 つの概念があります。RabbitMQ はここにあります。基本概念の最上位に抽象化層が追加され、メッセージ送信者とキューの間に交換が追加されます。この方法では、メッセージ送信者とキューは直接接続されず、代わりにメッセージ送信者がメッセージを配信します。 Exchange では、エクスチェンジャーは、スケジューリング ポリシーに従ってメッセージをキューに送信します。
左側の P は、RabbitMQ にメッセージを送信するプログラムであるプロデューサーを表します。
真ん中は RabbitMQ で、スイッチとキューが含まれています。
右側の C はコンシューマ、つまり RabbitMQ からメッセージを取得するプログラムを表します。
4 つの重要な概念があります。つまり、仮想ホスト、スイッチ、キュー、バインディングです。
仮想ホスト: 仮想ホストは、一連のスイッチ、キュー、バインディングを保持します。なぜ複数の仮想ホストが必要なのでしょうか?それは非常に簡単で、RabbitMQ では、ユーザーは仮想ホストの粒度でのみアクセス許可を制御できます。したがって、グループ A がグループ B のスイッチ/キュー/バインディングにアクセスすることを禁止する必要がある場合は、A と B にそれぞれ仮想ホストを作成する必要があります。すべての RabbitMQ サーバーにはデフォルトの仮想ホスト「/」があります。
Switch: Exchange はメッセージの転送に使用されますが、メッセージは保存されません。Exchange にバインドされたキューがない場合、送信されたメッセージは直接破棄されます。プロデューサーによる情報。
バインディング: つまり、スイッチをキューにバインドする必要があり、上の図に示すように、多対多の関係になります。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.設定ファイル
rabbitmq のインストール アドレス、ポート、アカウント情報を設定します。
spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=192.168.0.86 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=1234563. キュー構成
@Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("hello"); } }
rabbitTemplate是springboot 提供的默认实现 public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context); } }
@Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } }
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitMqHelloTest { @Autowired private HelloSender helloSender; @Test public void hello() throws Exception { helloSender.send(); } }
注:送信側と受信側のキュー名は一貫している必要があります。そうでない場合は受信できません。
多対多使用
1 つの送信者、N 人の受信者、または N 人の送信者と N 人の受信者では何が起こりますか?1 対多の送信
上記のコードに小さな変更が加えられ、受信側で 2 つの Receiver Receiver1 と Receiver2 が登録され、送信側で Receiver1 と Receiver2 が追加されました。パラメータのカウント。受信側は受信したパラメータを出力します。次のテスト コードは、2 つの受信側の実行効果を観察するために 100 個のメッセージを送信します。@Test public void oneToMany() throws Exception { for (int i=0;i<100;i++){ neoSender.send(i); } }
結果は次のとおりです。
レシーバー 1: ブート neo キューを起動 ****** 11レシーバー 2: ブート neo キューを起動 ****** 12
レシーバー 2: 起動boot neo キュー ****** 14
レシーバー 1: 起動 neo キュー ****** 13
レシーバー 2: 起動 neo キュー ****** 15
レシーバー 1: spirng boot neo キュー ****** 16
レシーバー 1: spirng boot neo キュー ****** 18
Receiver 2: spirng boot neo キュー ****** 17
Receiver 2 : spirng boot neo キュー ***** * 19
受信機 1: spirng boot neo キュー ****** 20
根据返回结果得到以下结论
一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中
多对多发送
复制了一份发送者,加入标记,在一百个循环中相互交替发送
@Test public void manyToMany() throws Exception { for (int i=0;i<100;i++){ neoSender.send(i); neoSender2.send(i); } }
结果如下:
Receiver 1: spirng boot neo queue ****** 20
Receiver 2: spirng boot neo queue ****** 20
Receiver 1: spirng boot neo queue ****** 21
Receiver 2: spirng boot neo queue ****** 21
Receiver 1: spirng boot neo queue ****** 22
Receiver 2: spirng boot neo queue ****** 22
Receiver 1: spirng boot neo queue ****** 23
Receiver 2: spirng boot neo queue ****** 23
Receiver 1: spirng boot neo queue ****** 24
Receiver 2: spirng boot neo queue ****** 24
Receiver 1: spirng boot neo queue ****** 25
Receiver 2: spirng boot neo queue ****** 25
结论:和一对多一样,接收端仍然会均匀接收到消息.
//对象的支持 //springboot以及完美的支持对象的发送和接收,不需要格外的配置。 //发送者 public void send(User user) { System.out.println("Sender object: " + user.toString()); this.rabbitTemplate.convertAndSend("object", user); } ... //接受者 @RabbitHandler public void process(User user) { System.out.println("Receiver object : " + user); }
结果如下:
Sender object: User{name='neo', pass='123456'}
Receiver object : User{name='neo', pass='123456'}
在RabbitMQ中,Topic是最灵活的一种方式,它允许根据routing_key随意绑定到不同的队列
首先对topic规则配置,这里使用两个队列来测试
@Configuration public class TopicRabbitConfig { final static String message = "topic.message"; final static String messages = "topic.messages"; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
使用queueMessages同时匹配两个队列,queueMessage只匹配"topic.message"队列
public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("exchange", "topic.message", context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context); }
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
Fanout 相关配置:
@Configuration public class FanoutRabbitConfig { @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
public void send() { String context = "hi, fanout msg "; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("fanoutExchange","", context); }
结果如下:
Sender : hi, fanout msg
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg
结果说明,绑定到fanout交换机上面的队列都收到了消息.
以上がSpringBoot が RabbitMQ を統合する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。