>  기사  >  Java  >  SpringBoot는 RabbitMq를 어떻게 통합하나요?

SpringBoot는 RabbitMq를 어떻게 통합하나요?

WBOY
WBOY앞으로
2023-05-25 10:00:38771검색

SpringBoot는 실제로 RabbitMq를 통합합니다

spring-boot-starter-amqp

AMQP(Advanced Message Queuing Protocol)는 메시지 미들웨어를 위한 플랫폼 중립적 유선 프로토콜입니다. Spring AMQP 프로젝트는 AMQP 기반 메시징 솔루션 개발에 핵심 Spring 개념을 적용합니다. Spring Boot는 spring-boot-starter-amqp "Starter"를 포함하여 RabbitMQ를 통해 AMQP 작업에 대한 몇 가지 편의를 제공합니다.

RabbitMQ를 springboot와 통합하는 것은 매우 간단합니다. 아주 약간의 구성만으로 사용한다면 springboot는 spring-boot-starter-amqp 프로젝트에서 메시지에 대한 다양한 지원을 제공합니다.

종속성 추가

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

RabbitMQ는 AMQP 프로토콜을 기반으로 하는 가볍고 안정적이며 확장 가능하고 휴대 가능한 메시지 브로커입니다. Spring은 AMQP 프로토콜을 통해 통신하기 위해 RabbitMQ를 사용합니다.

속성 구성

RabbitMQ 구성은 외부 구성 속성 spring.rabbitmq.*에 의해 제어됩니다. 예를 들어 다음 섹션에서 다음 application.properties를 선언할 수 있습니다.

spring.rabbitmq.host = localhost
 spring.rabbitmq.port = 5672
 spring.rabbitmq.username = guest
 spring.rabbitmq.password

빠르게 시작하기

1. 대기열 구성

package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 *
 * @author itguang
 * @create
@Configuration
public class RabbitConfig

    @Bean
    public Queue queue(){
        return new Queue("hello");
    }
}

2 Sender

rabbitTemplate은 springboot에서 제공하는 기본 구현입니다.

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息发送者
 *
 * @author itguang
 * @create

@Component
public class HelloSender


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        //往名称为 hello 的queue中发送消息
        this.amqpTemplate.convertAndSend("hello",context);
    }

}
테스트

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息接受者
 *
 * @author itguang
 * @create

@Component
@RabbitListener(queues = "hello") //监听 名称为 hello 的queue
public class HelloReceiver

    //消息处理器
    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver:"+message);

    }


}

콘솔 출력 결과 보기

package com.example.rabbitmqdemo;

import com.example.rabbitmqdemo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests

    @Autowired
    HelloSender helloSender;

    @Test
    public void contextLoads() {
        helloSender.send();

    }
}

일대다 전송: 하나의 발신자 및 여러 수신자

수신측에서는 Receiver1과 Receiver2 두 개를 등록했으며, 위 코드를 약간 수정했습니다. 송신측 조인 매개변수 수, 수신측은 수신된 매개변수를 인쇄합니다. 다음은 테스트 코드입니다. 두 수신측의 실행 효과를 관찰하기 위해 100개의 메시지를 보냅니다

hello2라는 대기열을 추가
  • send:hello----2018-04-21T11:29:47.739
    Receiver:hello----2018-04-21T11:29:47.739
큐 hello2 메시지로 보내기, 카운트 매개변수 허용
  • package com.example.rabbitmqdemo.config;
    
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMQ 配置类
     *
     * @author itguang
     * @create
    @Configuration
    public class RabbitConfig
    
    
    
        @Bean
        public Queue queue(){
            return new Queue("hello");
        }
    
        @Bean
        public Queue queue2(){
            return new Queue("hello2");
        }
    
    
    
    }
두 개의 hello2 수신자
  • package com.example.rabbitmqdemo.rabbitmq;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.LocalTime;
    import java.util.Date;
    
    /**
     * 消息发送者
     *
     * @author itguang
     * @create
    
    @Component
    public class HelloSender
    
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
    
        public void send(){
            String context = "hello----"+LocalDateTime.now();
            System.out.println("send:"+context);
            this.amqpTemplate.convertAndSend("hello",context);
        }
    
        //给hello2发送消息,并接受一个计数参数
        public void send2(int i){
            String context = i+"";
            System.out.println(context+"--send:");
            this.amqpTemplate.convertAndSend("hello2",context);
        }
    }
    @Component
    @RabbitListener(queues = "hello2")
    public class HelloReceiver1
    
    
        @RabbitHandler
        public void process(String message){
    
            System.out.println("Receiver1:"+message);
        }
    
    
    }
    Test
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver2

    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver2:"+message);
    }
}
콘솔 출력 보기:

@Test
    public void manyReceiver(){
        for (int i=0;i<100;i++){
            helloSender.send2(i);
        }

    }

볼 수 있는 내용: 메시지가 63으로 전송되면, 수신자가 메시지를 받았습니다.

결론:


한 명의 발신자, N명의 수신자, 테스트 후 메시지는 N개의 수신자에게 균등하게 전송됩니다.

다대다: 여러 발신자가 여러 수신자에게

다음과 같이 두 개의 발신자를 주입하고 이를 루프에 넣을 수 있습니다.

0--send:
1--send:
2--send:
3--send:
4--send:

...(省略)

58--send:
59--send:
60--send:
61--send:
62--send:
63--send:
Receiver2:1
Receiver1:0
64--send:
65--send:
Receiver1:2
Receiver2:3
66--send:
Receiver1:4
Receiver2:5
...(省略)

단위 테스트를 실행하고 콘솔 출력을 확인합니다.

@Test
    public void many2many(){
      for (int i=0;i<100;i++){
          helloSender.send2(i);
          helloSender2.send2(i);

      }
    }

결론: 일대다와 마찬가지로 수신자는 여전히 To 메시지를 균등하게 수신합니다.

객체 보내기

먼저 엔터티 클래스 객체 User를 생성합니다. 이 객체는 직렬화 가능 인터페이스를 구현해야 합니다.

0--send:
0--send:
1--send:
1--send:
2--send:
2--send:
3--send:
3--send:

...(省略)

22--send:
22--send:
23--send:
23--send:
24--send:
24--send:
Receiver2:0
25--send:
25--send:
Receiver2:1
26--send:
Receiver2:2
26--send:
Receiver2:3
27--send:
Receiver1:0
27--send:
Receiver2:4
Receiver1:1
28--send:
Receiver2:5
Receiver1:2
28--send:
Receiver2:6
Receiver1:3
29--send:
Receiver2:7
Receiver1:4
29--send:
Receiver2:8
Receiver1:5
30--send:
Receiver2:9
Receiver1:6
30--send:
31--send:
31--send:
32--send:
32--send:

그런 다음 구성 파일에

package com.example.rabbitmqdemo.pojo;

import java.io.Serializable;

/**
 * @author itguang
 * @create
public class User implements Serializable


    private String username;
    private String password;

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}

라는 대기열을 생성합니다. 다음은 사용자 객체 송신자 ObjectSender 및 수신자 ObjectReceiver: object_queue

@Bean
    public Queue queue3(){
        return new Queue("object_queue");
    }
@Component
public class ObjectSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendUser(User user){

        System.out.println("Send object:"+user.toString());
        this.amqpTemplate.convertAndSend("object_queue",user);

    }
}
단위 테스트를 실행하고 콘솔 출력 결과 보기:

@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver

    @RabbitHandler
    public void objectReceiver(User user){

        System.out.println("Receiver object:"+user.toString());

    }
}
Topic Exchange

topic은 RabbitMQ에서 가장 유연한 방법이며 다양한 객체 기반으로 자유롭게 바인딩될 수 있습니다. Routing_key Queue

먼저 주제 규칙을 구성하세요. 여기에서는

Send object:User{username='李增光', password='666666'}
Receiver object:User{username='李增光', password='666666'}
메시지 발신자를 테스트하는 데 사용됩니다. 둘 다 topicExchange를 사용하고 서로 다른 Routing_key

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author itguang
 * @create
@Configuration
public class TopicRabbitConfig


    final static String message = "topic.message";
    final static String messages = "topic.messages";


    //创建两个 Queue
    @Bean
    public Queue queueMessage(){
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages(){
        return new Queue(TopicRabbitConfig.messages);
    }

    //配置 TopicExchange,指定名称为 topicExchange
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange("topicExchange");
    }

    //给队列绑定 exchange 和 routing_key

    @Bean
    public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }


}
두 개의 메시지 수신자가 각각 서로 다른 대기열을 지정합니다.

보내는 send1은 topic.# 및 topic.message와 일치합니다. 두 수신자 모두 메시지를 받을 수 있으며, send2만 보내면 topic.#은 Receiver2만 듣는 모든 메시지와 일치할 수 있습니다

Fanout Exchange

Fanout은 브로드캐스트 모드이거나 우리에게 익숙한 구독 모드는 Fanout 스위치에 메시지를 보내고 이 스위치에 바인딩된 모든 대기열은 이 메시지를 받습니다.

팬아웃 관련 구성:

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class TopicSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send1(){
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange","topic.message",context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }
}
메시지 보낸 사람:

这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略

package com.example.rabbitmqdemo.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class FanoutSender


    @Autowired
    AmqpTemplate amqpTemplate;


    public void send(){

        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        //这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
        amqpTemplate.convertAndSend("fanoutExchange","", context);

    }

}

三个消息接受者:

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.A: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.B: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.C: "+message);

    }

}

运行单元测试,查看结果:

Sender : hi, fanout msg 

Receiver form fanout.C: hi, fanout msg 
Receiver form fanout.A: hi, fanout msg 
Receiver form fanout.B: hi, fanout msg

结果说明,绑定到fanout交换机上面的队列都收到了消息.

위 내용은 SpringBoot는 RabbitMq를 어떻게 통합하나요?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제