Maison  >  Article  >  Java  >  Une brève analyse de l'intégration Spring de JMS Active MQ

Une brève analyse de l'intégration Spring de JMS Active MQ

巴扎黑
巴扎黑original
2017-06-23 10:10:521440parcourir

1. Intégrez avec spring pour réaliser une réception synchrone des messages de ptp

pom.xml:

   <!--  --><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>4.3.7.RELEASE</version></dependency>
  <!--  --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.9.0</version></dependency>

spring-jms. xml :

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:jms="http://www.springframework.org/schema/jms"xmlns:amq="http://activemq.apache.org/schema/core"xsi:schemaLocation="http://activemq.apache.org/schema/core

http://www.springframework.org/schema/jms

http://www.springframework.org/schema/beans
"><!-- ActiveMQConnectionFactory就是JMS中负责创建到ActiveMQ连接的工厂类 --><bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" > <property name="brokerURL" value="tcp://192.168.0.224:61616"/>  </bean><!-- 创建连接池 --><bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">  <property name="connectionFactory" ref="connectionFactory"/>  <property name="maxConnections" value="10"/>  </bean>  <!-- Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory --><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">  <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>  </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  <property name="connectionFactory" ref="cachingConnectionFactory"/>  </bean> <!--这个是队列目的地,点对点的-->  <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">  <constructor-arg index="0" value="spring-queue"/> </bean>   </beans>

ConnectionFactory est utilisé pour générer des liens vers le serveur JMS Spring nous fournit plusieurs ConnectionFactory, dont SingleConnectionFactory et CachingConnectionFactory. SingleConnectionFactory renverra toujours le même lien pour les demandes d'établissement d'un lien de serveur JMS et ignorera l'appel de méthode close de la connexion. CachingConnectionFactory hérite de SingleConnectionFactory, il possède donc toutes les fonctions de SingleConnectionFactory et ajoute également une nouvelle fonction de mise en cache, qui peut mettre en cache Session, MessageProducer et MessageConsumer. Ici, nous utilisons CachingConnectionFactory comme exemple.

Producteur de message :

   = ClassPathXmlApplicationContext("spring-jms.xml"=(JmsTemplate) context.getBean("jmsTemplate"=(Destination) context.getBean("queueDestination" Message createMessage(Session session)  session.createTextMessage("Hello spring JMS"

Consommateur :

package com.jalja.org.jms.spring;import javax.jms.Destination;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import org.springframework.jms.core.JmsTemplate;public class SpringJmsReceive {public static void main(String[] args) {
    ApplicationContext context=new ClassPathXmlApplicationContext("spring-jms.xml");
    JmsTemplate jmsTemplate=(JmsTemplate) context.getBean("jmsTemplate");
    Destination queueDestination=(Destination) context.getBean("queueDestination");
    String msg=(String) jmsTemplate.receiveAndConvert(queueDestination);
    System.out.println(msg);
  }
}

2. Appel asynchrone PTP

Nous configurons directement l'écouteur pour recevoir des messages de manière asynchrone au printemps, ce qui équivaut à configurer le consommateur au printemps, et il n'est pas nécessaire de démarrer le consommateur lors de la réception de messages.

spring-jms.xml :

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:jms="http://www.springframework.org/schema/jms"xmlns:amq="http://activemq.apache.org/schema/core"xsi:schemaLocation="http://activemq.apache.org/schema/core

http://www.springframework.org/schema/jms

http://www.springframework.org/schema/beans
"><!-- ActiveMQConnectionFactory就是JMS中负责创建到ActiveMQ连接的工厂类 --><bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" > <property name="brokerURL" value="tcp://192.168.0.224:61616"/>  </bean><!-- 创建连接池 --><bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">  <property name="connectionFactory" ref="connectionFactory"/>  <property name="maxConnections" value="10"/>  </bean>  <!-- Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory --><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">  <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>  </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  <property name="connectionFactory" ref="cachingConnectionFactory"/>  </bean> <!--这个是队列目的地,点对点的-->  <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">  <constructor-arg index="0" value="spring-queue"/> </bean>   <!-- 消息监听器 -->  <bean id="myMessageListener" class="com.jalja.org.jms.spring.yb.MyMessageListener"/>  <!-- 消息监听容器 -->  <bean id="jmsContainer"  class="org.springframework.jms.listener.DefaultMessageListenerContainer">  <property name="connectionFactory" ref="cachingConnectionFactory" />  <property name="destination" ref="queueDestination" />  <property name="messageListener" ref="myMessageListener" />  </bean> </beans>

Une fois que le producteur a envoyé un message à la destination spécifiée, l'étape suivante consiste pour le consommateur à traiter le message à la destination spécifiée. Alors, comment le consommateur sait-il qu’un producteur a envoyé un message à la destination spécifiée ? Ceci est réalisé grâce au conteneur d'écoute des messages MessageListenerContainer que Spring encapsule pour nous. Il est responsable de la réception des informations et de la distribution des informations reçues au véritable MessageListener pour traitement. Chaque consommateur a besoin d'un MessageListenerContainer correspondant pour chaque destination. Pour le conteneur d'écoute de messages, en plus de savoir quelle destination écouter, il doit également savoir où écouter. C'est-à-dire qu'il doit également savoir quel serveur JMS écouter. Cela se fait en injectant un MessageConnectionFactory dans. lors de la configuration de MessageConnectionFactory. Ainsi, lorsque nous configurons un MessageListenerContainer, nous avons trois attributs qui doivent être spécifiés. L'un est le ConnectionFactory qui indique où écouter ; l'autre est la Destination qui indique ce qu'il faut écouter et l'autre est le MessageListener qui traite le message après avoir reçu le message ; message. Spring nous propose deux types de MessageListenerContainer, SimpleMessageListenerContainer et DefaultMessageListenerContainer.
  SimpleMessageListenerContainer : SimpleMessageListenerContainer créera une session et un consommateur au début, et utilisera la méthode JMS standard MessageConsumer.setMessageListener() pour enregistrer l'écouteur et laisser le fournisseur JMS appeler la fonction de rappel de l'écouteur. Il ne s'adapte pas dynamiquement aux besoins d'exécution et ne participe pas à la gestion des transactions externes. En termes de compatibilité, elle est très proche de la spécification JMS autonome, mais n'est généralement pas compatible avec les limitations JMS de Java EE.

 DefaultMessageListenerContainer : Dans la plupart des cas, nous utilisons toujours DefaultMessageListenerContainer. Par rapport à SimpleMessageListenerContainer, DefaultMessageListenerContainer s'adaptera dynamiquement aux besoins d'exécution et pourra participer à la gestion des transactions externes. Il s'agit d'un bon équilibre entre de faibles exigences envers les fournisseurs JMS, des fonctionnalités avancées telles que la participation aux transactions et une compatibilité avec les environnements Java EE.

Producteur de message :

    public static void main(String[] args) {
        ApplicationContext context=new ClassPathXmlApplicationContext("spring-jms.xml");
        JmsTemplate jmsTemplate=(JmsTemplate) context.getBean("jmsTemplate");
        Destination queueDestination=(Destination) context.getBean("queueDestination");
        System.out.println("异步调用执行开始");
        jmsTemplate.send(queueDestination, new MessageCreator(){
            @Overridepublic Message createMessage(Session session) throws JMSException {return session.createTextMessage("Hello spring JMS");
            }
        });
        System.out.println("异步调用执行结束");
    }

Écouteur de message : MyMessageListener

public class MyMessageListener implements MessageListener{
    @Overridepublic void onMessage(Message message) {
        TextMessage msg= (TextMessage) message;try {
            System.out.println("你好:"+msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Démarrer l'écoute du producteur de message Le résultat de l'exécution du processeur est :

异步调用执行开始
异步调用执行结束
你好:Hello spring JMS

3. Publiez et abonnez-vous pour recevoir de manière synchrone

spring-jms.xml:

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:jms="http://www.springframework.org/schema/jms"xmlns:amq="http://activemq.apache.org/schema/core"xsi:schemaLocation="http://activemq.apache.org/schema/core

http://www.springframework.org/schema/jms

http://www.springframework.org/schema/beans
"><!-- ActiveMQConnectionFactory就是JMS中负责创建到ActiveMQ连接的工厂类 --><bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" > <property name="brokerURL" value="tcp://192.168.0.224:61616"/>  </bean><!-- 创建连接池 --><bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">  <property name="connectionFactory" ref="connectionFactory"/>  <property name="maxConnections" value="10"/>  </bean>  <!-- Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory --><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">  <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>  </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  <property name="connectionFactory" ref="cachingConnectionFactory"/>  </bean> <!--这个是队列目的地,发布订阅-->  <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">  <constructor-arg index="0" value="spring-Topic"/> </bean>   </beans>

Producteur :

    public static void main(String[] args) {
        ApplicationContext context=new ClassPathXmlApplicationContext("spring-jms.xml");
        JmsTemplate jmsTemplate=(JmsTemplate) context.getBean("jmsTemplate");
        Destination topicDestination=(Destination) context.getBean("topicDestination");
        jmsTemplate.send(topicDestination, new MessageCreator(){
            @Overridepublic Message createMessage(Session session) throws JMSException {return session.createTextMessage("Hello spring JMS topicDestination");
            }
        });
    }

Consommateur :

public class SpringJmsSubscriber {public static void main(String[] args) {
        ApplicationContext context=new ClassPathXmlApplicationContext("spring-jms.xml");
        JmsTemplate jmsTemplate=(JmsTemplate) context.getBean("jmsTemplate");
        Destination topicDestination=(Destination) context.getBean("topicDestination");
        String msg=(String) jmsTemplate.receiveAndConvert(topicDestination);
        System.out.println(msg);
    }
}

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn