Heim  >  Artikel  >  Java  >  Wie integriert SpringBoot RabbitMq?

Wie integriert SpringBoot RabbitMq?

WBOY
WBOYnach vorne
2023-05-25 10:00:38770Durchsuche

SpringBoot integriert RabbitMq in der Praxis

spring-boot-starter-amqp

Advanced Message Queuing Protocol (AMQP) ist ein plattformneutrales kabelgebundenes Protokoll für Nachrichten-Middleware. Das Spring AMQP-Projekt wendet zentrale Spring-Konzepte auf die Entwicklung AMQP-basierter Messaging-Lösungen an. Spring Boot bietet einige Annehmlichkeiten für die Arbeit mit AMQP über RabbitMQ, einschließlich des spring-boot-starter-amqp „Starter“.

Es ist sehr einfach, RabbitMQ mit Springboot zu integrieren. Wenn Sie es nur mit sehr wenig Konfiguration verwenden, bietet Springboot verschiedene Unterstützung für Nachrichten im Spring-Boot-Starter-Amqp-Projekt.

Abhängigkeiten hinzufügen

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

RabbitMQ ist ein leichter, zuverlässiger, skalierbarer und portabler Nachrichtenbroker, der auf dem AMQP-Protokoll basiert. Spring verwendet RabbitMQ für die Kommunikation über das AMQP-Protokoll.

Eigenschaftskonfiguration

Die RabbitMQ-Konfiguration wird durch externe Konfigurationseigenschaften spring.rabbitmq.* gesteuert. Sie können beispielsweise die folgenden application.properties im folgenden Abschnitt deklarieren:

spring.rabbitmq.host = localhost
 spring.rabbitmq.port = 5672
 spring.rabbitmq.username = guest
 spring.rabbitmq.password

Schnellstart

1. Warteschlangenkonfiguration

package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 *
 * @author itguang
 * @create
@Configuration
public class RabbitConfig

    @Bean
    public Queue queue(){
        return new Queue("hello");
    }
}

2 Sender

rabbitTemplate ist die von Springboot bereitgestellte Standardimplementierung.

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
 * 消息发送者
 *
 * @author itguang
 * @create

@Component
public class HelloSender


    @Autowired
    private AmqpTemplate amqpTemplate;


    public void send(){
        String context = "hello----"+LocalDateTime.now();
        System.out.println("send:"+context);
        //往名称为 hello 的queue中发送消息
        this.amqpTemplate.convertAndSend("hello",context);
    }

}

3 Empfänger

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息接受者
 *
 * @author itguang
 * @create

@Component
@RabbitListener(queues = "hello") //监听 名称为 hello 的queue
public class HelloReceiver

    //消息处理器
    @RabbitHandler
    public void process(String message){
        System.out.println("Receiver:"+message);

    }


}

Test

package com.example.rabbitmqdemo;

import com.example.rabbitmqdemo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests

    @Autowired
    HelloSender helloSender;

    @Test
    public void contextLoads() {
        helloSender.send();

    }
}

Sehen Sie sich die Ergebnisse der Konsolenausgabe an

send:hello----2018-04-21T11:29:47.739
Receiver:hello----2018-04-21T11:29:47.739

Eins-zu-viele-Senden: ein Absender und mehrere Empfänger

Der Empfänger hat zwei Empfänger, Receiver1 und Receiver2, und den Sender registriert Ende verbunden Parameteranzahl, das empfangende Ende druckt die empfangenen Parameter. Das Folgende ist der Testcode. Senden Sie hundert Nachrichten, um den Ausführungseffekt der beiden empfangenden Enden zu beobachten An Warteschlange senden Hallo2-Nachricht, akzeptiert einen Zählparameter

  • package com.example.rabbitmqdemo.config;
    
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMQ 配置类
     *
     * @author itguang
     * @create
    @Configuration
    public class RabbitConfig
    
    
    
        @Bean
        public Queue queue(){
            return new Queue("hello");
        }
    
        @Bean
        public Queue queue2(){
            return new Queue("hello2");
        }
    
    
    
    }

  • Zwei hello2-Empfänger
  • package com.example.rabbitmqdemo.rabbitmq;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.LocalTime;
    import java.util.Date;
    
    /**
     * 消息发送者
     *
     * @author itguang
     * @create
    
    @Component
    public class HelloSender
    
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
    
        public void send(){
            String context = "hello----"+LocalDateTime.now();
            System.out.println("send:"+context);
            this.amqpTemplate.convertAndSend("hello",context);
        }
    
        //给hello2发送消息,并接受一个计数参数
        public void send2(int i){
            String context = i+"";
            System.out.println(context+"--send:");
            this.amqpTemplate.convertAndSend("hello2",context);
        }
    }
    @Component
    @RabbitListener(queues = "hello2")
    public class HelloReceiver1
    
    
        @RabbitHandler
        public void process(String message){
    
            System.out.println("Receiver1:"+message);
        }
    
    
    }

    Test

    @Component
    @RabbitListener(queues = "hello2")
    public class HelloReceiver2
    
        @RabbitHandler
        public void process(String message){
            System.out.println("Receiver2:"+message);
        }
    }
  • Konsolenausgabe anzeigen:
@Test
    public void manyReceiver(){
        for (int i=0;i<100;i++){
            helloSender.send2(i);
        }

    }
    Sie können Folgendes sehen: Wenn die Nachricht an 63 gesendet wird, wird die Empfänger Empfänger hat die Nachricht erhalten,
  • Fazit:

Ein Absender, N Empfänger, nach dem Testen wird die Nachricht gleichmäßig an N Empfänger gesendet

Many-to-many: Mehrere Absender an viele Empfänger


Wir Sie können zwei Absender wie folgt injizieren und in die Schleife einbinden:

0--send:
1--send:
2--send:
3--send:
4--send:

...(省略)

58--send:
59--send:
60--send:
61--send:
62--send:
63--send:
Receiver2:1
Receiver1:0
64--send:
65--send:
Receiver1:2
Receiver2:3
66--send:
Receiver1:4
Receiver2:5
...(省略)
Führen Sie den Unit-Test durch und sehen Sie sich die Konsolenausgabe an:

@Test
    public void many2many(){
      for (int i=0;i<100;i++){
          helloSender.send2(i);
          helloSender2.send2(i);

      }
    }
Schlussfolgerung: Genau wie bei einer Eins-zu-Viele-Methode empfängt der Empfänger weiterhin gleichmäßig die To-Nachricht

Objekt senden

Zuerst erstellen wir ein Entitätsklassenobjekt Benutzer. Beachten Sie, dass es die serialisierbare Schnittstelle implementieren muss.

0--send:
0--send:
1--send:
1--send:
2--send:
2--send:
3--send:
3--send:

...(省略)

22--send:
22--send:
23--send:
23--send:
24--send:
24--send:
Receiver2:0
25--send:
25--send:
Receiver2:1
26--send:
Receiver2:2
26--send:
Receiver2:3
27--send:
Receiver1:0
27--send:
Receiver2:4
Receiver1:1
28--send:
Receiver2:5
Receiver1:2
28--send:
Receiver2:6
Receiver1:3
29--send:
Receiver2:7
Receiver1:4
29--send:
Receiver2:8
Receiver1:5
30--send:
Receiver2:9
Receiver1:6
30--send:
31--send:
31--send:
32--send:
32--send:

Dann erstellen Sie eine Warteschlange in der Konfigurationsdatei mit dem Namen

package com.example.rabbitmqdemo.pojo;

import java.io.Serializable;

/**
 * @author itguang
 * @create
public class User implements Serializable


    private String username;
    private String password;

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}

Das Folgende sind die beiden Elemente des Benutzers Objekt Ein Sender ObjectSender und ein Empfänger ObjectReceiver:

@Bean
    public Queue queue3(){
        return new Queue("object_queue");
    }
@Component
public class ObjectSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendUser(User user){

        System.out.println("Send object:"+user.toString());
        this.amqpTemplate.convertAndSend("object_queue",user);

    }
}
Führen Sie den Komponententest aus und sehen Sie sich die Ergebnisse der Konsolenausgabe an:

@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver

    @RabbitHandler
    public void objectReceiver(User user){

        System.out.println("Receiver object:"+user.toString());

    }
}

Topic Exchange

topic ist die flexibelste Methode in RabbitMQ und kann basierend auf frei an verschiedene Objekte gebunden werden Routing_Key-Warteschlangeobject_queue

Konfigurieren Sie hier zunächst zwei Warteschlangen zum Testen

Send object:User{username='李增光', password='666666'}
Receiver object:User{username='李增光', password='666666'}

Nachrichtensender: Beide verwenden topicExchange und sind an unterschiedliche Routing_keys gebunden

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author itguang
 * @create
@Configuration
public class TopicRabbitConfig


    final static String message = "topic.message";
    final static String messages = "topic.messages";


    //创建两个 Queue
    @Bean
    public Queue queueMessage(){
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages(){
        return new Queue(TopicRabbitConfig.messages);
    }

    //配置 TopicExchange,指定名称为 topicExchange
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange("topicExchange");
    }

    //给队列绑定 exchange 和 routing_key

    @Bean
    public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }


}

Zwei Nachrichtenempfänger geben jeweils unterschiedliche Warteschlangen an.

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class TopicSender

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send1(){
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange","topic.message",context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }
}
rrree

Das Senden von send1 stimmt mit topic.# und topic.message überein. Beide Receiver können Nachrichten empfangen, während send2 nur topic.# mit allen Nachrichten übereinstimmen kann, die nur Receiver2 abhört.

Fanout Exchange

Fanout ist der Broadcast-Modus oder das Abonnement Der uns bekannte Modus sendet eine Nachricht an den Fanout-Switch und alle an diesen Switch gebundenen Warteschlangen empfangen diese Nachricht.

Fanout-bezogene Konfiguration:

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1

    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver topic.message :"+ message);

    }

}
Nachrichtenabsender:

这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略

package com.example.rabbitmqdemo.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author itguang
 * @create
@Component
public class FanoutSender


    @Autowired
    AmqpTemplate amqpTemplate;


    public void send(){

        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        //这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
        amqpTemplate.convertAndSend("fanoutExchange","", context);

    }

}

三个消息接受者:

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.A: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.B: "+message);

    }

}

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC


    @RabbitHandler
    public void process(String message){

        System.out.println("Receiver form fanout.C: "+message);

    }

}

运行单元测试,查看结果:

Sender : hi, fanout msg 

Receiver form fanout.C: hi, fanout msg 
Receiver form fanout.A: hi, fanout msg 
Receiver form fanout.B: hi, fanout msg

结果说明,绑定到fanout交换机上面的队列都收到了消息.

Das obige ist der detaillierte Inhalt vonWie integriert SpringBoot RabbitMq?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen