>Java >java지도 시간 >SpringBoot가 RabbitMQ를 통합하는 방법

SpringBoot가 RabbitMQ를 통합하는 방법

王林
王林앞으로
2023-06-01 09:53:011379검색

1. RabbitMQ 소개

RabbitMQ는 AMQP(Advanced Message Queuing Protocol)를 구현한 일종의 메시지 미들웨어로 원래 금융 시스템에서 유래되었으며 분산 시스템에서 메시지를 저장하고 전달하는 데 사용됩니다. 사용성, 확장성, 고가용성 등의 측면에서 좋은 성능을 발휘합니다. RabbitMQ는 주로 시스템 간 양방향 분리를 달성하기 위해 구현됩니다. 생산자가 대량의 데이터를 생성하고 소비자가 이를 빠르게 소비할 수 없는 경우 중간 계층이 필요합니다. 이 데이터를 저장하세요.

AMQP(Advanced Message Queuing Protocol)는 애플리케이션 계층 프로토콜에 대한 개방형 표준이며 메시지 지향 미들웨어용으로 설계되었습니다. 메시지 미들웨어의 주요 목적은 메시지 발신자와 수신자가 서로 간섭하지 않고 서로 독립적이도록 구성 요소를 분리하는 것입니다. 따라서 송신자는 사용자의 존재를 알 필요가 없으며, 그 반대의 경우도 마찬가지이다. AMQP의 주요 기능에는 메시지 방향, 대기열, 라우팅(지점 간 및 게시/구독 포함), 안정성 및 보안이 포함됩니다.

RabbitMQ는 오픈 소스 AMQP 구현입니다. 서버는 Erlang 언어로 작성되었으며 Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, STOMP 등과 같은 다양한 클라이언트를 지원합니다. , AJAX를 지원합니다. 이 기술은 분산 시스템에서 메시지를 저장하고 전달할 때 사용 편의성, 확장성 및 고가용성을 입증합니다.

SpringBoot가 RabbitMQ를 통합하는 방법

2. 관련 개념

보통 큐 서비스에 관해 이야기할 때 RabbitMQ는 이 기본 개념 위에 추가 추상화 계층을 추가했습니다. 이런 방식으로 메시지 발신자와 큐 사이에 직접적인 연결이 존재하지 않으며, 대신 메시지 발신자는 메시지를 exchange로 보내고, exchange는 이에 따라 메시지를 큐로 보냅니다.

  • 왼쪽의 P는 RabbitMQ에 메시지를 보내는 프로그램인 생산자를 나타냅니다.

  • 중앙에는 스위치와 큐가 포함된 RabbitMQ가 있습니다. 오른쪽의

  • C는 Consumer를 나타내며 RabbitMQ로부터 메시지를 받는 프로그램입니다.

4가지 중요한 개념이 있습니다. 즉, 가상 호스트, 스위치, 대기열 및 바인딩입니다.

  • 가상 호스트: 가상 호스트는 일련의 스위치, 대기열 및 바인딩을 보유합니다. 여러 가상 호스트가 필요한 이유는 무엇입니까? RabbitMQ에서는 사용자가 가상 ​​호스트의 세분성에서만 권한을 제어할 수 있습니다. 따라서 그룹 A가 그룹 B의 스위치/대기열/바인딩에 액세스하는 것을 금지해야 하는 경우 A와 B에 대해 각각 가상 호스트를 생성해야 합니다. 모든 RabbitMQ 서버에는 기본 가상 호스트 "/"가 있습니다.

  • Switch: Exchange는 메시지를 전달하는 데 사용되지만 Exchange에 대한 대기열 바인드가 없으면 생산자가 보낸 메시지를 직접 삭제합니다.

여기에는 라우팅 키라는 더 중요한 개념이 있습니다. 스위치는 라우팅 키에 따라 메시지를 해당 큐로 전달합니다.

  • 바인딩: 즉, 스위치는 큐에 바인딩되어야 합니다. 위 그림에서 볼 수 있듯이 다대다 관계입니다.

RabitMQ를 SpringBoot와 통합하는 것은 매우 간단합니다. 아주 작은 구성만으로 사용한다면 springboot는 spring-boot-starter-amqp 프로젝트에서 메시지에 대한 다양한 지원을 제공합니다.

SpringBoot가 RabbitMQ를 통합하는 방법

SpringBoot가 RabbitMQ를 통합하는 방법

3. 간단한 사용

1. pom 패키지 구성

주로 spring-boot-starter-amqp 지원 추가

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2 구성 파일

설치 주소 및 포트 구성 Rabbitmq 및 계정 정보입니다.

spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=192.168.0.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

3. 대기열 구성

@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
return new Queue("hello");
}
}

4. Sender

rabbitTemplate是springboot 提供的默认实现
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}

6. Test

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}

참고:
발신자와 수신자의 대기열 이름이 일치해야 합니다. , 그렇지 않으면

수신할 수 없습니다. 다대다

하나의 송신자, N개의 수신자 또는 N개의 송신자와 N개의 수신자를 사용하면 어떻게 될까요?

일대다 전송

수신측에서는 Receiver1과 Receiver2 두 개를 등록하고, 수신측에서는 수신된 매개변수를 인쇄했습니다. 다음은 테스트 코드입니다. 100개의 메시지를 보내 두 수신 측의 실행 효과를 관찰합니다.

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {
@Autowired
private HelloSender helloSender;
@Test
public void hello() throws Exception {
helloSender.send();
}
}

결과는 다음과 같습니다.

Receiver 1: spirng boot neo queue ****** 11

수신기 2: spirng boot neo queue ** **** 12
수신기 2: spirng boot neo queue ****** 14

수신기 1: spirng boot neo queue ****** 13
수신기 2: spirng boot neo queue ****** 15
수신기 1: spirng boot neo queue ****** 16
수신기 1: spirng boot neo queue ****** 18
수신기 2: spirng boot neo queue ** **** 17
수신기 2: spirng boot neo queue ****** 19
수신기 1: spirng boot neo queue ****** 20

根据返回结果得到以下结论

一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中

多对多发送

复制了一份发送者,加入标记,在一百个循环中相互交替发送

@Test
public void manyToMany() throws Exception {
for (int i=0;i<100;i++){
neoSender.send(i);
neoSender2.send(i);
}
}

结果如下:

Receiver 1: spirng boot neo queue ****** 20
Receiver 2: spirng boot neo queue ****** 20
Receiver 1: spirng boot neo queue ****** 21
Receiver 2: spirng boot neo queue ****** 21
Receiver 1: spirng boot neo queue ****** 22
Receiver 2: spirng boot neo queue ****** 22
Receiver 1: spirng boot neo queue ****** 23
Receiver 2: spirng boot neo queue ****** 23
Receiver 1: spirng boot neo queue ****** 24
Receiver 2: spirng boot neo queue ****** 24
Receiver 1: spirng boot neo queue ****** 25
Receiver 2: spirng boot neo queue ****** 25

结论:和一对多一样,接收端仍然会均匀接收到消息.

四、高级使用

//对象的支持
//springboot以及完美的支持对象的发送和接收,不需要格外的配置。
//发送者
public void send(User user) {
System.out.println("Sender object: " + user.toString());
this.rabbitTemplate.convertAndSend("object", user);
}
...
//接受者
@RabbitHandler
public void process(User user) {
System.out.println("Receiver object : " + user);
}

结果如下:

Sender object: User{name='neo', pass='123456'}
Receiver object : User{name='neo', pass='123456'}

1.Topic Exchange

在RabbitMQ中,Topic是最灵活的一种方式,它允许根据routing_key随意绑定到不同的队列

首先对topic规则配置,这里使用两个队列来测试

SpringBoot가 RabbitMQ를 통합하는 방법

@Configuration
public class TopicRabbitConfig {

final static String message = "topic.message";
final static String messages = "topic.messages";

@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}

@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}

@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}

@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}

@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}

使用queueMessages同时匹配两个队列,queueMessage只匹配"topic.message"队列

public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
}

public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
}

发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息

2.Fanout Exchange

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

SpringBoot가 RabbitMQ를 통합하는 방법

Fanout 相关配置:

@Configuration
public class FanoutRabbitConfig {

@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}

@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}

@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}

这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:

public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}

结果如下:

Sender : hi, fanout msg
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg

结果说明,绑定到fanout交换机上面的队列都收到了消息.

위 내용은 SpringBoot가 RabbitMQ를 통합하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제