ホームページ >Java >&#&チュートリアル >SpringBoot は RabbitMq をどのように統合しますか?
Advanced Message Queuing Protocol (AMQP) は、メッセージ ミドルウェア用のプラットフォーム中立の有線プロトコルです。 Spring AMQP プロジェクトは、Spring のコア概念を AMQP ベースのメッセージング ソリューションの開発に適用します。 Spring Boot は、spring-boot-starter-amqp "Starter" など、RabbitMQ 経由で AMQP を操作するための便利な機能をいくつか提供します。
RabbitMQ と springboot を統合するのは非常に簡単です。ごくわずかな設定で使用するだけであれば、springboot は spring-boot-starter-amqp プロジェクト内のメッセージに対するさまざまなサポートを提供します。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
RabbitMQ は、AMQP プロトコルに基づいた、軽量で信頼性が高く、スケーラブルでポータブルなメッセージ ブローカーです。 Spring は RabbitMQ を使用して AMQP プロトコル経由で通信します。
RabbitMQ 構成は、外部構成プロパティ spring.rabbitmq.* によって制御されます。たとえば、次の部分的な application.properties を次のように宣言できます:
spring.rabbitmq.host = localhost spring.rabbitmq.port = 5672 spring.rabbitmq.username = guest spring.rabbitmq.password
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 配置类 * * @author itguang * @create @Configuration public class RabbitConfig @Bean public Queue queue(){ return new Queue("hello"); } }
rabbitTemplate Itこれは、springboot によって提供されるデフォルトの実装です。
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; /** * 消息发送者 * * @author itguang * @create @Component public class HelloSender @Autowired private AmqpTemplate amqpTemplate; public void send(){ String context = "hello----"+LocalDateTime.now(); System.out.println("send:"+context); //往名称为 hello 的queue中发送消息 this.amqpTemplate.convertAndSend("hello",context); } }
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息接受者 * * @author itguang * @create @Component @RabbitListener(queues = "hello") //监听 名称为 hello 的queue public class HelloReceiver //消息处理器 @RabbitHandler public void process(String message){ System.out.println("Receiver:"+message); } }
package com.example.rabbitmqdemo; import com.example.rabbitmqdemo.rabbitmq.HelloSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqdemoApplicationTests @Autowired HelloSender helloSender; @Test public void contextLoads() { helloSender.send(); } }
コンソール出力の表示
send:hello----2018-04-21T11:29:47.739 Receiver:hello----2018-04-21T11:29:47.739
# は、上記のコードに少し変更を加えました。受信側は 2 つの受信機、Receiver1 と Receiver2 を登録しました。送信側はパラメータ数を追加し、受信側は受信したパラメータを出力しました。以下はテスト コードでは、100 個のメッセージを送信して、2 つの受信側の実行効果を観察します。
hello2
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ 配置类 * * @author itguang * @create @Configuration public class RabbitConfig @Bean public Queue queue(){ return new Queue("hello"); } @Bean public Queue queue2(){ return new Queue("hello2"); } }
package com.example.rabbitmqdemo.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; /** * 消息发送者 * * @author itguang * @create @Component public class HelloSender @Autowired private AmqpTemplate amqpTemplate; public void send(){ String context = "hello----"+LocalDateTime.now(); System.out.println("send:"+context); this.amqpTemplate.convertAndSend("hello",context); } //给hello2发送消息,并接受一个计数参数 public void send2(int i){ String context = i+""; System.out.println(context+"--send:"); this.amqpTemplate.convertAndSend("hello2",context); } }
@Component @RabbitListener(queues = "hello2") public class HelloReceiver1 @RabbitHandler public void process(String message){ System.out.println("Receiver1:"+message); } }
@Component @RabbitListener(queues = "hello2") public class HelloReceiver2 @RabbitHandler public void process(String message){ System.out.println("Receiver2:"+message); } }テスト
@Test public void manyReceiver(){ for (int i=0;i<100;i++){ helloSender.send2(i); } }
0--send: 1--send: 2--send: 3--send: 4--send: ...(省略) 58--send: 59--send: 60--send: 61--send: 62--send: 63--send: Receiver2:1 Receiver1:0 64--send: 65--send: Receiver1:2 Receiver2:3 66--send: Receiver1:4 Receiver2:5 ...(省略)メッセージが 63 に送信されると、受信側 Receiver がメッセージを受信したことがわかります。
結論:
多対多: 複数の送信者複数の受信者の場合 次のように 2 つのセンダーを挿入してループに入れることができます:1 人の送信者、N 人の受信者、テスト後、メッセージは N 人の受信者に均等に送信されます
@Test public void many2many(){ for (int i=0;i<100;i++){ helloSender.send2(i); helloSender2.send2(i); } }単体テストを実行してコンソール出力を表示します:
0--send: 0--send: 1--send: 1--send: 2--send: 2--send: 3--send: 3--send: ...(省略) 22--send: 22--send: 23--send: 23--send: 24--send: 24--send: Receiver2:0 25--send: 25--send: Receiver2:1 26--send: Receiver2:2 26--send: Receiver2:3 27--send: Receiver1:0 27--send: Receiver2:4 Receiver1:1 28--send: Receiver2:5 Receiver1:2 28--send: Receiver2:6 Receiver1:3 29--send: Receiver2:7 Receiver1:4 29--send: Receiver2:8 Receiver1:5 30--send: Receiver2:9 Receiver1:6 30--send: 31--send: 31--send: 32--send: 32--send:
結論: 次のようになります。 1 対多の場合でも、受信側はメッセージを均等に受信します。 #送信オブジェクト
package com.example.rabbitmqdemo.pojo; import java.io.Serializable; /** * @author itguang * @create public class User implements Serializable private String username; private String password; public User(String username, String password) { this.username = username; this.password = password; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public String toString() { return "User{" + "username='" + username + '\'' + ", password='" + password + '\'' + '}'; } }
次に、構成ファイル内に
object_queue<pre class="brush:php;toolbar:false">@Bean
public Queue queue3(){
return new Queue("object_queue");
}</pre>
という名前の別のキューを作成します。次に、User オブジェクトの 2 つの送信側 ObjectSender と受信側 ObjectReceiver を示します。
@Component public class ObjectSender @Autowired AmqpTemplate amqpTemplate; public void sendUser(User user){ System.out.println("Send object:"+user.toString()); this.amqpTemplate.convertAndSend("object_queue",user); } }
@Component @RabbitListener(queues = "object_queue") public class ObjectReceiver @RabbitHandler public void objectReceiver(User user){ System.out.println("Receiver object:"+user.toString()); } }
単体テストを実行し、コンソール出力を表示します。
Send object:User{username='李增光', password='666666'} Receiver object:User{username='李增光', password='666666'}
Topic Exchange
topic は RabbitMQ で最も柔軟な方法であり、routing_key のバインドに応じて自由に変更できます。キュー最初にトピック ルールを設定します。ここでは 2 つのキューを使用してテストします。
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author itguang * @create @Configuration public class TopicRabbitConfig final static String message = "topic.message"; final static String messages = "topic.messages"; //创建两个 Queue @Bean public Queue queueMessage(){ return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages(){ return new Queue(TopicRabbitConfig.messages); } //配置 TopicExchange,指定名称为 topicExchange @Bean public TopicExchange exchange(){ return new TopicExchange("topicExchange"); } //给队列绑定 exchange 和 routing_key @Bean public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){ return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){ return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
メッセージ送信者: どちらも topicExchange を使用し、異なる routing_key にバインドします。
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component public class TopicSender @Autowired AmqpTemplate amqpTemplate; public void send1(){ String context = "hi, i am message 1"; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange","topic.message",context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange", "topic.messages", context); } }
2 つのメッセージ受信者、それぞれ異なるキューを指定します
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component @RabbitListener(queues = "topic.message") public class TopicReceiver1 @RabbitHandler public void process(String message){ System.out.println("Receiver topic.message :"+ message); } }
package com.example.rabbitmqdemo.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component @RabbitListener(queues = "topic.messages") public class TopicReceiver2 @RabbitHandler public void process(String message){ System.out.println("Receiver topic.messages: "+ message); } }
Test:
send1 の送信は topic.# と topic.message の両方に一致します 受信者はメッセージを受信でき、送信 send2 のみ topic.# はすべてのメッセージに一致しますReceiver2 のみがリッスンするFanout Exchange
Fanout は、よく知られたブロードキャスト モードまたはサブスクリプション モードであり、メッセージを Fanout スイッチに送信し、すべてのキューがバインドされます。このスイッチにこのメッセージを受信します。ファンアウト関連の設定:
package com.example.rabbitmqdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.security.PublicKey; /** * @author itguang * @create @Configuration public class FanOutRabbitMq //创建三个队列 @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } //创建exchange,指定交换策略 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } //分别给三个队列指定exchange,这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略: @Bean public Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange){ return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean public Binding bindingExchangeB(Queue BMessage,FanoutExchange fanoutExchange){ return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return
メッセージ送信者:
这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略
package com.example.rabbitmqdemo.fanout; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author itguang * @create @Component public class FanoutSender @Autowired AmqpTemplate amqpTemplate; public void send(){ String context = "hi, fanout msg "; System.out.println("Sender : " + context); //这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略: amqpTemplate.convertAndSend("fanoutExchange","", context); } }
三个消息接受者:
@Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.A: "+message); } } @Component @RabbitListener(queues = "fanout.B") public class FanoutReceiverB @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.B: "+message); } } @Component @RabbitListener(queues = "fanout.C") public class FanoutReceiverC @RabbitHandler public void process(String message){ System.out.println("Receiver form fanout.C: "+message); } }
运行单元测试,查看结果:
Sender : hi, fanout msg Receiver form fanout.C: hi, fanout msg Receiver form fanout.A: hi, fanout msg Receiver form fanout.B: hi, fanout msg
结果说明,绑定到fanout交换机上面的队列都收到了消息.
以上がSpringBoot は RabbitMq をどのように統合しますか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。