SpringBoot
Integration RabbitMQ
はメッセージの送信を実現します。
1. maven
依存関係を追加します
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. application.yml 設定ファイルを追加します
spring: rabbitmq: host: 192.168.3.19 port: 5672 username: admin password: xxxx
3. スイッチ、キュー、バインディングを設定します
@Bean public DirectExchange myExchange() { DirectExchange directExchange = new DirectExchange("myExchange"); return directExchange; } @Bean public Queue myQueue() { Queue queue = new Queue("myQueue"); return queue; } @Bean public Binding binding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey"); }
4. プロダクションはメッセージを送信します
@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String send(String message) { rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message); System.out.println("【发送消息】" + message) return "【send message】" + message; }
5. コンシューマはメッセージを受信します
@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 当前时间" + time);
6. プロダクション側を呼び出してメッセージを送信します hello
、コンソール出力:
#[メッセージ送信] helloメッセージが正常に受信されたことを示します。 メッセージ損失分析 メッセージの生成から消費まで、次の段階でメッセージ損失が発生する可能性があります:[メッセージ受信] hello 現在時刻 2022-05-12 10:21:14
ストレージ自体はdown
実稼働フェーズ
Configuration application.yml
spring: rabbitmq: # 消息确认机制 生产者 -> 交换机 publisher-confirms: true # 消息返回机制 交换机 -> 队列 publisher-returns: trueConfiguration
@Configuration @Slf4j public class RabbitConfig { @Autowired private ConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("【correlationData】:" + correlationData); log.info("【ack】" + ack); log.info("【cause】" + cause); if (ack) { log.info("【发送成功】"); } else { log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause); } } }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.warn("【消息发送失败】"); log.info("【message】" + message); log.info("【replyCode】" + replyCode); } }); return rabbitTemplate; } }
Producer
からswitch へのメッセージ、confirmCallback# があります## モードを確認します。メッセージが正常に送信された後、メッセージはメソッド confirm(CorrelationData correlationData, boolean ack, String Cause) を呼び出し、ack
に基づいてメッセージが正常に送信されたかどうかを判断します。 switch
から queue
へのメッセージには
リターン モードがあります。 メッセージの送信 製品メッセージ コンソール出力は次のとおりです:
[メッセージの受信]製品メッセージ 現在時刻 2022 -05 -12 11:27:56[correlationData]:null
[ack]true
[送信成功]
生産終了シミュレーション メッセージが失われます
ここには 2 つの解決策があります:
// myExchange 修改成 myExchangexxxxx rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
[送信に失敗しました]
[correlationData]:CorrelationData [id=7d468b47-b422-4523-b2a2] -06b14aef073c]送信が失敗した場合は、メッセージを再試行できます
スイッチはメッセージを受信し、成功通知を返します。コンソール出力:
スイッチは正しく、キューは送信に失敗しました。存在しないというメッセージが送信されます:
[ack ]true
[cause]null[送信成功]
[返信コード]312スイッチはキューを見つけられず、障害情報を返しました:
[メッセージ]製品メッセージ
[メッセージの送信に失敗しました]
RabbitMQメッセージがキュー。まず、キューとスイッチを正しく設定し、
キューの永続性を有効にし、キューとスイッチを作成します
デフォルト設定は永続性です
キューの永続性を非永続性に変更します: @Bean
public Queue myQueue() {
Queue queue = new Queue("myQueue",false);
return queue;
}
メッセージの送信後、メッセージはキューに保存され、メッセージの RabbitMQ が再起動されます。もはや存在しない。
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",true); return queue; }再起動後も、キュー内のメッセージはまだ存在します。
コンシューマ エンド
コンシューマ エンドはデフォルトで開始されます
自動確認モードキュー メッセージがコンシューマによって受信されると、メッセージは関係なくキューから自動的に削除されます消費者側からのメッセージがあるかどうかのニュース。したがって、コンシューマーがメッセージを正常に消費できることを確認するには、自動モードを手動確認モードに変更します。
spring: rabbitmq: # 手动消息确认 listener: simple: acknowledge-mode: manual
メッセージを消費して受信した後、手動による確認が必要です: <pre class="brush:java;">channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);</pre><pre class="brush:java;"> @RabbitListener(queuesToDeclare = @Queue("myQueue"))
public void process(String msg, Channel channel, Message message) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date();
String time = sdf.format(date);
System.out.println("【接收信息】" + msg + " 当前时间" + time);
System.out.println(message.getMessageProperties().getDeliveryTag());
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
}</pre>
追加しない場合:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
2 つのメッセージを送信します
メッセージを受信した後、確認は行われず、キューに戻されます:
プロジェクトを再起動します。その後、キュー内のメッセージはコンシューマに送信されますが、ACK 確認がなければ、キューに戻され続けます。 。channel.basicAck を追加した後、プロジェクトを再起動します
basicAck メソッド
multiple の最後のパラメータは、前のキューを削除することを意味します。
multiple は
true に設定され、後続のキューはすべてクリアされます
以上がSpringBoot+RabbitMQ を使用して信頼性の高いメッセージ送信を実現する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。