Maison  >  Article  >  Java  >  Java Message Queue-Explication détaillée de l'intégration d'ActiveMq par Spring

Java Message Queue-Explication détaillée de l'intégration d'ActiveMq par Spring

黄舟
黄舟original
2017-03-07 10:01:082023parcourir

Cet article présente principalement l'explication détaillée de l'intégration de la file d'attente de messages Java-Spring avec ActiveMq. L'éditeur pense que c'est assez bon, je vais donc le partager avec vous maintenant et le donner comme référence. Suivons l'éditeur pour y jeter un œil

1 Présentation

Tout d'abord, passons en revue le service de messagerie Java avec vous. . Dans le blog précédent "Java Message Queue-JMS Overview", je l'ai analysé pour tout le monde :

1. Service de messagerie : un middleware utilisé pour résoudre le couplage entre deux ou plusieurs programmes, la couche inférieure. est implémenté par Java.

2. Avantages : asynchrone, fiable

3. Modèle de message : point à point, publication/abonnement

4. Objets dans JMS

Puis dans un autre blog "Java Message Queue-ActiveMq in Practice", nous avons démarré un projet ActiveMq de 0 à 1. Au cours du processus de développement du projet, nous avons une certaine compréhension d'ActiveMq Understanding :

1. Écrivez des clients dans plusieurs langues et protocoles. Langages : Java, C, C, C#, Ruby, Perl, Python, PHP. Protocoles d'application : OpenWire, Stomp REST, WS Notification, 🎜>3. Prise en charge de Spring, ActiveMQ peut être facilement intégré aux systèmes utilisant Spring et prend également en charge les fonctionnalités de Spring 2.0

4. Tests de serveurs J2EE (tels que Geronimo, JBoss 4, GlassFish, WebLogic), dans lesquels, grâce à la configuration des adaptateurs de ressources JCA 1.5, ActiveMQ peut être automatiquement déployé sur n'importe quel serveur commercial compatible avec J2EE 1.4

5 . Prend en charge plusieurs protocoles de transmission : in-VM, TCP, SSL, NIO, UDP, JGroups, JXTA

6. Prise en charge de la persistance des messages à grande vitesse via JDBC et le journal

7. La conception garantit un clustering haute performance, client-serveur, point à point


8 Prend en charge Ajax


Prend en charge l'intégration avec Axis


10. Vous pouvez facilement appeler le fournisseur JMS intégré pour des tests

Dans le prochain blog, je travaillerai avec vous pour intégrer Spring et ActiveMq. Spring JMS ActiveMQ Tomcat implémente des messages de file d'attente asynchrones point à point et un modèle PUB/SUB (publication/abonnement). Il s'agit d'un exemple simple et n'inclut aucune activité.


2. Structure du répertoire

2.1 Répertoire du projet

IDE sélectionné IDEA (recommandé à tous), dans l'ordre pour éviter de télécharger jar, la couche inférieure utilise maven pour créer un projet qui intègre Spring et ActiveMq 

2.2 pom. le fichier pom.xml ici est un peu long, je ne le développerai pas.

Nous pouvons voir qu'il n'y a que quelques dépendances, 1. Dépendance du noyau Spring 2. Noyau et pool ActiveMq (si les étudiants choisissent d'importer jar ici, ils peuvent directement importer l'activemq mentionné dans notre blog précédent - tout ce package jar) 3. Dépendances liées au servlet Java

La version de dépendance du pool ActiveMq que nous choisissons ici sera liée au dtd suivant, et la version doit correspondre, les étudiants attendront donc d'avoir configuré l'activemq fichier. , vous devez faire attention à la sélection de la version dtd

2.3 web.xml

web.xml est également similaire, spécifiez le fichier de configuration Spring, le nom springMvc, le format d'encodage

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://www.php.cn/">
 <modelVersion>4.0.0</modelVersion>
 <groupId>Crawl-Page</groupId>
 <artifactId>Crawl-Page</artifactId>
 <packaging>war</packaging>
 <version>1.0-SNAPSHOT</version>
 <name>Crawl-Page Maven Webapp</name>
 <url>http://www.php.cn/;/url>
 <!-- 版本管理 -->
 <properties>
 <springframework>4.1.8.RELEASE</springframework>
 </properties>

 <dependencies>
 <dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.10</version>
  <scope>test</scope>
 </dependency>


 <!-- JSP相关 -->
 <dependency>
 <groupId>jstl</groupId>
 <artifactId>jstl</artifactId>
 <version>1.2</version>
 </dependency>
 <dependency>
 <groupId>javax.servlet</groupId>
 <artifactId>servlet-api</artifactId>
 <scope>provided</scope>
 <version>2.5</version>
 </dependency>


 <!-- spring -->
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-tx</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webmvc</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
  <version>${springframework}</version>
 </dependency>
 <!-- xbean 如<amq:connectionFactory /> -->
 <dependency>
  <groupId>org.apache.xbean</groupId>
  <artifactId>xbean-spring</artifactId>
  <version>3.16</version>
 </dependency>

 <!-- activemq -->
 <dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.7.0</version>
 </dependency>
 <dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-pool</artifactId>
  <version>5.12.1</version>
 </dependency>

 <!-- 自用jar包,可以忽略-->
 <dependency>
  <groupId>commons-httpclient</groupId>
  <artifactId>commons-httpclient</artifactId>
  <version>3.1</version>
 </dependency>
 </dependencies>

 <build>
 <finalName>Crawl-Page</finalName>
 <plugins>
  <plugin>
  <groupId>org.apache.tomcat.maven</groupId>
  <artifactId>tomcat7-maven-plugin</artifactId>
  <configuration>
   <port>8080</port>
   <path>/</path>
  </configuration>
  </plugin>
 </plugins>
 </build>

</project>

2.4 SpringMvc et applicationContext.xml

Le SpringMVC ici n'a rien de spécial. Les étudiants dans le besoin peuvent s'y référer :

applicationContext.xml est principalement utilisé pour charger des beans. Il n'y a pas de Java Beans spéciaux dans notre projet, il est donc uniquement utilisé pour indiquer le chemin d'analyse du package :

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
   http://www.php.cn/"
   version="3.0">

 <display-name>Archetype Created Web Application</display-name>

 <!-- 加载spring的配置文件,例如hibernate、jms等集成 -->
 <context-param>
 <param-name>contextConfigLocation</param-name>
 <param-value>
  classpath:applicationContext*.xml;
 </param-value>
 </context-param>

 <listener>
 <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
 </listener>

 <servlet>
 <servlet-name>springMVC</servlet-name>
 <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
 <init-param>
  <param-name>contextConfigLocation</param-name>
  <param-value>classpath:spring-mvc.xml</param-value>
 </init-param>
 <load-on-startup>1</load-on-startup>
 </servlet>
 <servlet-mapping>
 <servlet-name>springMVC</servlet-name>
 <url-pattern>/</url-pattern>
 </servlet-mapping>

 <!-- 处理编码格式 -->
 <filter>
 <filter-name>characterEncodingFilter</filter-name>
 <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
 <init-param>
  <param-name>encoding</param-name>
  <param-value>UTF-8</param-value>
 </init-param>
 <init-param>
  <param-name>forceEncoding</param-name>
  <param-value>true</param-value>
 </init-param>
 </filter>
 <filter-mapping>
 <filter-name>characterEncodingFilter</filter-name>
 <url-pattern>/*</url-pattern>
 </filter-mapping>

</web-app>

2.5 applicationContext-ActiveMQ.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.php.cn/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:aop="http://www.springframework.org/schema/aop"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xmlns:tx="http://www.springframework.org/schema/tx"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/aop
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/">

 <!-- 启用MVC注解 -->
 <mvc:annotation-driven />
 <!-- 指定Sping组件扫描的基本包路径 -->
 <context:component-scan base-package="com.Jayce" >
  <!-- 这里只扫描Controller,不可重复加载Service -->
  <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
 </context:component-scan>

 <!-- JSP视图解析器-->
 <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
  <property name="prefix" value="/WEB-INF/views/" />
  <property name="suffix" value=".jsp" />
  <!-- 定义其解析视图的order顺序为1 -->
  <property name="order" value="1" />
 </bean>
</beans>

Ici, nous expliquerons ce fichier de configuration. Si vous pouvez le comprendre à partir du fichier de configuration ci-dessus et l'ignorer. Les étudiants peuvent également le consulter sur le site officiel d'ActiveMQ.

1. DTD dans ActiveMq, avant de déclarer la configuration appropriée, nous devons importer la DTD dans ActiveMq, sinon Spring ne comprendra pas ce que signifient nos balises.

Nous avons configuré la version d'activemq dans pom. connectionFactory : un élément de configuration très simple, utilisé pour configurer l'adresse, le nom d'utilisateur et le mot de passe de notre fabrique de liens. Ce qu'il faut noter ici, c'est sélectionner le TCP. connexion au lieu de la connexion http
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:jms="http://www.springframework.org/schema/jms"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xsi:schemaLocation="
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/">

 <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
 <!-- 配置扫描路径 -->
 <context:component-scan base-package="com.Jayce">
  <!-- 只扫描Service,也可以添加Repostory,但是要把Controller排除在外,Controller由spring-mvc.xml去加载 -->
  <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
 </context:component-scan>

</beans>

3. jmsTemplate : une configuration plus importante, ici la fabrique de connexions, la destination d'envoi du message par défaut, la durée de la connexion et la méthode de publication du message sont spécifiées


3. Structure du projet

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:jms="http://www.springframework.org/schema/jms"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xsi:schemaLocation="
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/"
>

 <context:component-scan base-package="com.Jayce" />
 <mvc:annotation-driven />

 <amq:connectionFactory id="amqConnectionFactory"
       brokerURL="tcp://192.168.148.128:61616"
       userName="admin"
       password="admin" />

 <!-- 配置JMS连接工长 -->
 <bean id="connectionFactory"
   class="org.springframework.jms.connection.CachingConnectionFactory">
  <constructor-arg ref="amqConnectionFactory" />
  <property name="sessionCacheSize" value="100" />
 </bean>

 <!-- 定义消息队列(Queue) -->
 <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
  <!-- 设置消息队列的名字 -->
  <constructor-arg>
   <value>Jaycekon</value>
  </constructor-arg>
 </bean>

 <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="defaultDestination" ref="demoQueueDestination" />
  <property name="receiveTimeout" value="10000" />
  <!-- true是topic,false是queue,默认是false,此处显示写出false -->
  <property name="pubSubDomain" value="false" />
 </bean>


 <!-- 配置消息队列监听者(Queue) -->
 <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

 <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
 <bean id="queueListenerContainer"
   class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destination" ref="demoQueueDestination" />
  <property name="messageListener" ref="queueMessageListener" />
 </bean>

</beans>

3.1 ProducerService

Faire du producteur de message un service Lorsque nous devons envoyer un message, il suffit d'appeler l'instance ProducerService. La méthode sendMessage peut envoyer un message à la destination par défaut.

Deux méthodes d'envoi sont proposées ici, l'une consiste à envoyer à la destination par défaut et l'autre consiste à envoyer le message en fonction de la destination.

3.2 Service Consommateur


因为我们项目中并没有什么业务,所以的话对消息的处理也就是打印输出。我们只需要调用jmsTemplate中的 receive 方法,就可以从里面获取到一条消息。

再和我们上一篇博客对比一下,上一篇博客中,我们接受到信息之后需要手动确认事务,这样ActiveMQ中才会确定这条消息已经被正确读取了。而整合了Spring之后,事务将由Spring 来管理。

3.3 MessageController

package com.Jayce.Controller;

import com.Jayce.Service.ConsumerService;
import com.Jayce.Service.ProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
@Controller
public class MessageController {
 private Logger logger = LoggerFactory.getLogger(MessageController.class);
 @Resource(name = "demoQueueDestination")
 private Destination destination;

 //队列消息生产者
 @Resource(name = "producerService")
 private ProducerService producer;

 //队列消息消费者
 @Resource(name = "consumerService")
 private ConsumerService consumer;

 @RequestMapping(value = "/SendMessage", method = RequestMethod.POST)
 @ResponseBody
 public void send(String msg) {
  logger.info(Thread.currentThread().getName()+"------------send to jms Start");
  producer.sendMessage(msg);
  logger.info(Thread.currentThread().getName()+"------------send to jms End");
 }

 @RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET)
 @ResponseBody
 public Object receive(){
  logger.info(Thread.currentThread().getName()+"------------receive from jms Start");
  TextMessage tm = consumer.receive(destination);
  logger.info(Thread.currentThread().getName()+"------------receive from jms End");
  return tm;
 }

}

控制层里面需要注入我们的生产者和消费者(实际开发中,生产者和消费者肯定不会在同一个项目中的,不然就消息服务这个东西就没有意义了)。

现在服务层和控制层都好了,接下来我们就进行一个简单的测试

 4、项目测试

4.1 启动ActiveMq

先确定你的ActiveMQ服务已经开启。

    

4.2 启动项目

项目使用了Tomcat 插件,避免了本地再下载Tomcat的麻烦,有需要的同学可以使用一下。

<plugins>
  <plugin>
  <groupId>org.apache.tomcat.maven</groupId>
  <artifactId>tomcat7-maven-plugin</artifactId>
  <configuration>
   <port>8080</port>
   <path>/</path>
  </configuration>
  </plugin>
</plugins>

4.3 发送消息

这里用了Chrome 的一个插件PostMan 有兴趣的同学可以了解一下,在Chrome 拓展程序中可以找到,避免了后端的同学去弄页面!

    

我们发送了一个post 请求之后,看一下服务器的效果:

 

我们可以看到,已经向队列发送了一条消息。我们看一下ActiveMq现在的状态:

 

我们可以看到,一条消息已经成功发送到了ActiveMq中。

4.4 接收消息

使用get请求访问服务器后台:

  

服务的输出:

 

ActiveMq服务器状态:

 

我们可以看到,消费者已经消费了一条信息,并且没有断开与ActiveMq之间的链接。

4.5 监听器

在实际项目中,我们很少会自己手动去获取消息,如果需要手动去获取消息,那就没有必要使用到ActiveMq了,可以用一个Redis 就足够了。

不能手动去获取消息,那么我们就可以选择使用一个监听器来监听是否有消息到达,这样子可以很快的完成对消息的处理。

4.5.1 applicationContext-ActiveMQ.xml 配置

在上面的配置文件中,我们已经默认的添加了这段监听器的配置文件,如果同学们不想使用这个监听器,可以直接注释掉。

  

 <!-- 配置消息队列监听者(Queue) -->
 <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

 <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
 <bean id="queueListenerContainer"
   class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destination" ref="demoQueueDestination" />
  <property name="messageListener" ref="queueMessageListener" />
 </bean>

4.5.2 MessageListener

我们需要创建一个类实现MessageListener 接口:

package com.Jayce.Filter;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
public class QueueMessageListener implements MessageListener {
 public void onMessage(Message message) {
  TextMessage tm = (TextMessage) message;
  try {
   System.out.println("QueueMessageListener监听到了文本消息:\t"
     + tm.getText());
   //do something ...
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
}

实现接口的onMessage 方法,我们将需要的业务操作在里面解决,这样子,就完成了我们生产者-中间件-消费者,这样一个解耦的操作了。 

4.5.3 测试

和上面一样,使用postMan 发送post请求,我们可以看到控制台里面,消息马上就能打印出来:

    

再看看ActiveMQ服务器的状态:

 

我们可以看到,使用监听器的效果,和手动接收消息的效果是一样的。

这样子一整个项目下来,我们已经成功的整合了Spring和ActiveMQ。

4.6 压力测试

这里其实也算不上什么压力测试,在配置pom.xml文件的时候,大家有看到一个 commons-httpclient 的依赖,接下来我们使用httpClient 不停的想服务器发送消息,看一下服务器解决消息的速度如何:

package com.Jaycekon.test;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2017/1/5.
 */
public class Client {

 @Test
 public void test() {
  HttpClient httpClient = new HttpClient();
  new Thread(new Sender(httpClient)).start();

 }

}

class Sender implements Runnable {
 public static AtomicInteger count = new AtomicInteger(0);
 HttpClient httpClient;

 public Sender(HttpClient client) {
  httpClient = client;
 }

 public void run() {
   try {
    System.out.println(Thread.currentThread().getName()+"---Send message-"+count.getAndIncrement());
    PostMethod post = new PostMethod("http://127.0.0.1:8080/SendMessage");
    post.addParameter("msg", "Hello world!");
    httpClient.executeMethod(post);
    System.out.println(Thread.currentThread().getName()+"---Send message Success-"+count.getAndIncrement());

   } catch (IOException e) {
    e.printStackTrace();
   }
  }
 }

这里面用了HttpClient 来向服务器发送Post 请求,然后计数输出,有兴趣的同学可以自己测试一下,可以多开几个线程,这里只开了一个线程。

5、项目源码:Crawl-Page_jb51.rar

Ce qui précède est l'explication détaillée de Java Message Queue-Spring intégrant ActiveMq. Pour plus de contenu connexe, veuillez faire attention au site Web PHP chinois (www.php.cn) !


Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn