首頁  >  文章  >  Java  >  Java訊息佇列-Spring整合ActiveMq的詳解

Java訊息佇列-Spring整合ActiveMq的詳解

黄舟
黄舟原創
2017-03-07 10:01:082042瀏覽

本篇文章主要介紹了詳解Java訊息佇列-Spring整合ActiveMq ,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟著小編過來看看吧

1、概述

先跟大家一起回顧Java 訊息服務,在我先前的部落格《Java訊息佇列-JMS概述》中,我為大家分析了:

1.訊息服務:一個中間件,用來解決兩個活多個程式之間的耦合,底層由Java 實作。

2.優勢:非同步、可靠

3.訊息模型:點對點,發布/訂閱

4.JMS中的對象

然後在另一篇部落格《Java訊息隊列-ActiveMq實戰》中,和大家一起從0到1的開啟了一個ActiveMq 的項目,在項目開發的過程中,我們對ActiveMq有了一定的了解:  

1.多種語言和協定編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協定: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2.完全支援JMS1.1和J2EE 1.4規範(持久化,XA訊息,交易)

#3.對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性

4.通過了常見J2EE伺服器(如Geronimo ,JBoss 4, GlassFish,WebLogic)的測試,其中透過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業伺服器上

#5.支援多種傳送協定:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

#6.支援透過JDBC和journal提供高速的訊息持久化

#7.從設計上保證了高效能的叢集,客戶端-伺服器,點對點

8.支援Ajax

9.支援與Axis的整合

10.可以很容易得調用內嵌JMS provider,進行測試

在接下來的這篇博客中,我會和大家一起來整合Spring 和ActiveMq,這篇博文,我們基於Spring+JMS+ActiveMQ+Tomcat,實作了Point-To-Point的非同步佇列訊息和PUB/SUB(發佈/訂閱)模型,簡單實例,不包含任何業務。

2、目錄結構

2.1 專案目錄

IDE選擇了IDEA(建議大家使用),為了避免下載jar 的各種麻煩,底層使用maven搭建了一個項目,整合了Spring 和ActiveMq

#   

 2.2 pom.xml

## 2.2 pom.xml

 2.2 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://www.php.cn/">
 <modelVersion>4.0.0</modelVersion>
 <groupId>Crawl-Page</groupId>
 <artifactId>Crawl-Page</artifactId>
 <packaging>war</packaging>
 <version>1.0-SNAPSHOT</version>
 <name>Crawl-Page Maven Webapp</name>
 <url>http://www.php.cn/;/url>
 <!-- 版本管理 -->
 <properties>
 <springframework>4.1.8.RELEASE</springframework>
 </properties>

 <dependencies>
 <dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.10</version>
  <scope>test</scope>
 </dependency>


 <!-- JSP相关 -->
 <dependency>
 <groupId>jstl</groupId>
 <artifactId>jstl</artifactId>
 <version>1.2</version>
 </dependency>
 <dependency>
 <groupId>javax.servlet</groupId>
 <artifactId>servlet-api</artifactId>
 <scope>provided</scope>
 <version>2.5</version>
 </dependency>


 <!-- spring -->
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-tx</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webmvc</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
  <version>${springframework}</version>
 </dependency>
 <!-- xbean 如<amq:connectionFactory /> -->
 <dependency>
  <groupId>org.apache.xbean</groupId>
  <artifactId>xbean-spring</artifactId>
  <version>3.16</version>
 </dependency>

 <!-- activemq -->
 <dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.7.0</version>
 </dependency>
 <dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-pool</artifactId>
  <version>5.12.1</version>
 </dependency>

 <!-- 自用jar包,可以忽略-->
 <dependency>
  <groupId>commons-httpclient</groupId>
  <artifactId>commons-httpclient</artifactId>
  <version>3.1</version>
 </dependency>
 </dependencies>

 <build>
 <finalName>Crawl-Page</finalName>
 <plugins>
  <plugin>
  <groupId>org.apache.tomcat.maven</groupId>
  <artifactId>tomcat7-maven-plugin</artifactId>
  <configuration>
   <port>8080</port>
   <path>/</path>
  </configuration>
  </plugin>
 </plugins>
 </build>

</project>

因為這裡pom.xml 檔案有點長,就不展開了。

我們可以看到其實依賴也就幾個,1、Spring 核心依賴2、ActiveMq core和pool(這裡如果同學們選擇導入jar,可以直接導入我們上一篇博客中說道的那個activemq -all 這個jar套件)3、java servlet 相關依賴

這裡面我們選擇的ActiveMq pool 的依賴版本會和之後的dtd 有關係,需要版本對應,所以同學等下配置activemq 文件的時候,需要注意dtd 版本選擇

2.3 web.xml

web.xml 也大同小異,指定Spring 配置文件,springMvc 命名,編碼格式

#

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
   http://www.php.cn/"
   version="3.0">

 <display-name>Archetype Created Web Application</display-name>

 <!-- 加载spring的配置文件,例如hibernate、jms等集成 -->
 <context-param>
 <param-name>contextConfigLocation</param-name>
 <param-value>
  classpath:applicationContext*.xml;
 </param-value>
 </context-param>

 <listener>
 <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
 </listener>

 <servlet>
 <servlet-name>springMVC</servlet-name>
 <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
 <init-param>
  <param-name>contextConfigLocation</param-name>
  <param-value>classpath:spring-mvc.xml</param-value>
 </init-param>
 <load-on-startup>1</load-on-startup>
 </servlet>
 <servlet-mapping>
 <servlet-name>springMVC</servlet-name>
 <url-pattern>/</url-pattern>
 </servlet-mapping>

 <!-- 处理编码格式 -->
 <filter>
 <filter-name>characterEncodingFilter</filter-name>
 <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
 <init-param>
  <param-name>encoding</param-name>
  <param-value>UTF-8</param-value>
 </init-param>
 <init-param>
  <param-name>forceEncoding</param-name>
  <param-value>true</param-value>
 </init-param>
 </filter>
 <filter-mapping>
 <filter-name>characterEncodingFilter</filter-name>
 <url-pattern>/*</url-pattern>
 </filter-mapping>

</web-app>

2.4 SpringMvc 和applicationContext.xml

這裡面的SpringMVC沒什麼特別,有需要的同學可以參考一下:


<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.php.cn/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:aop="http://www.springframework.org/schema/aop"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xmlns:tx="http://www.springframework.org/schema/tx"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/aop
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/">

 <!-- 启用MVC注解 -->
 <mvc:annotation-driven />
 <!-- 指定Sping组件扫描的基本包路径 -->
 <context:component-scan base-package="com.Jayce" >
  <!-- 这里只扫描Controller,不可重复加载Service -->
  <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
 </context:component-scan>

 <!-- JSP视图解析器-->
 <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
  <property name="prefix" value="/WEB-INF/views/" />
  <property name="suffix" value=".jsp" />
  <!-- 定义其解析视图的order顺序为1 -->
  <property name="order" value="1" />
 </bean>
</beans>

applicationContext.xml 主要使用來裝載Bean,我們專案中並沒有什麼特別的Java Bean,因此只用來指出套件掃描路徑:

##

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:jms="http://www.springframework.org/schema/jms"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xsi:schemaLocation="
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/">

 <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
 <!-- 配置扫描路径 -->
 <context:component-scan base-package="com.Jayce">
  <!-- 只扫描Service,也可以添加Repostory,但是要把Controller排除在外,Controller由spring-mvc.xml去加载 -->
  <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
 </context:component-scan>

</beans>

#  2.5 applicationContext-ActiveMQ.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:jms="http://www.springframework.org/schema/jms"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xsi:schemaLocation="
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/"
>

 <context:component-scan base-package="com.Jayce" />
 <mvc:annotation-driven />

 <amq:connectionFactory id="amqConnectionFactory"
       brokerURL="tcp://192.168.148.128:61616"
       userName="admin"
       password="admin" />

 <!-- 配置JMS连接工长 -->
 <bean id="connectionFactory"
   class="org.springframework.jms.connection.CachingConnectionFactory">
  <constructor-arg ref="amqConnectionFactory" />
  <property name="sessionCacheSize" value="100" />
 </bean>

 <!-- 定义消息队列(Queue) -->
 <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
  <!-- 设置消息队列的名字 -->
  <constructor-arg>
   <value>Jaycekon</value>
  </constructor-arg>
 </bean>

 <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="defaultDestination" ref="demoQueueDestination" />
  <property name="receiveTimeout" value="10000" />
  <!-- true是topic,false是queue,默认是false,此处显示写出false -->
  <property name="pubSubDomain" value="false" />
 </bean>


 <!-- 配置消息队列监听者(Queue) -->
 <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

 <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
 <bean id="queueListenerContainer"
   class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destination" ref="demoQueueDestination" />
  <property name="messageListener" ref="queueMessageListener" />
 </bean>

</beans>

##這裡跟大家講解這個設定文件,如果大家能夠從上述設定檔看懂,可以跳過。同學們也可以在ActiveMQ官網中的查看。

1、ActiveMq 中的DTD,我們在聲明相關配置之前,我們需要先導入ActiveMq 中的DTD,不然Spring 並不理解我們的標籤是什麼意思。 我們在pom.xml 檔案中有配置了activemq 的版本依賴我們這裡的版本,需要和依賴的版本一樣,不然是找不到相關的dtd

#2、amq: connectionFactory:很直白的一個配置項,用來配置我們連結工廠的位址和使用者名稱密碼,這裡要注意的是選擇tcp連線而不是http連線


3、jmsTemplate:比較重要的一個配置,這裡指定了連接工廠,預設訊息發送目的地,還有連接時間長,發布訊息的方式

# 3、專案結構

## 3.1 ProducerService


package com.Jayce.Service;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * Created by Administrator on 2017/1/5.
 */
@Service
public class ProducerService {

 @Resource(name="jmsTemplate")
 private JmsTemplate jmsTemplate;

 public void sendMessage(Destination destination,final String msg){
  System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+msg);
  jmsTemplate.send(destination, new MessageCreator() {
   public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(msg);
   }
  });
 }

 public void sendMessage(final String msg){
  String destination = jmsTemplate.getDefaultDestinationName();
  System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息---------------------->"+msg);
  jmsTemplate.send(new MessageCreator() {
   public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(msg);
   }
  });
 }
}

#將訊息生產者做成一個服務,當我們需要傳送訊息的時候,只需要呼叫ProducerService實例中的sendMessage 方法就可以向預設目的發送一個訊息。

這裡提供了兩個發送方式,一個是發送到預設的目的地,一個是根據目的地發送訊息。 ######3.2 ConsumerService############
package com.Jayce.Service;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
@Service
public class ConsumerService {
 @Resource(name="jmsTemplate")
 private JmsTemplate jmsTemplate;

 public TextMessage receive(Destination destination){
  TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
  try{
   System.out.println("从队列" + destination.toString() + "收到了消息:\t"
     + textMessage.getText());
  } catch (JMSException e) {
   e.printStackTrace();
  }
  return textMessage;
 }
}
######

因为我们项目中并没有什么业务,所以的话对消息的处理也就是打印输出。我们只需要调用jmsTemplate中的 receive 方法,就可以从里面获取到一条消息。

再和我们上一篇博客对比一下,上一篇博客中,我们接受到信息之后需要手动确认事务,这样ActiveMQ中才会确定这条消息已经被正确读取了。而整合了Spring之后,事务将由Spring 来管理。

3.3 MessageController

package com.Jayce.Controller;

import com.Jayce.Service.ConsumerService;
import com.Jayce.Service.ProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
@Controller
public class MessageController {
 private Logger logger = LoggerFactory.getLogger(MessageController.class);
 @Resource(name = "demoQueueDestination")
 private Destination destination;

 //队列消息生产者
 @Resource(name = "producerService")
 private ProducerService producer;

 //队列消息消费者
 @Resource(name = "consumerService")
 private ConsumerService consumer;

 @RequestMapping(value = "/SendMessage", method = RequestMethod.POST)
 @ResponseBody
 public void send(String msg) {
  logger.info(Thread.currentThread().getName()+"------------send to jms Start");
  producer.sendMessage(msg);
  logger.info(Thread.currentThread().getName()+"------------send to jms End");
 }

 @RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET)
 @ResponseBody
 public Object receive(){
  logger.info(Thread.currentThread().getName()+"------------receive from jms Start");
  TextMessage tm = consumer.receive(destination);
  logger.info(Thread.currentThread().getName()+"------------receive from jms End");
  return tm;
 }

}

控制层里面需要注入我们的生产者和消费者(实际开发中,生产者和消费者肯定不会在同一个项目中的,不然就消息服务这个东西就没有意义了)。

现在服务层和控制层都好了,接下来我们就进行一个简单的测试

 4、项目测试

4.1 启动ActiveMq

先确定你的ActiveMQ服务已经开启。

    

4.2 启动项目

项目使用了Tomcat 插件,避免了本地再下载Tomcat的麻烦,有需要的同学可以使用一下。

<plugins>
  <plugin>
  <groupId>org.apache.tomcat.maven</groupId>
  <artifactId>tomcat7-maven-plugin</artifactId>
  <configuration>
   <port>8080</port>
   <path>/</path>
  </configuration>
  </plugin>
</plugins>

4.3 发送消息

这里用了Chrome 的一个插件PostMan 有兴趣的同学可以了解一下,在Chrome 拓展程序中可以找到,避免了后端的同学去弄页面!

    

我们发送了一个post 请求之后,看一下服务器的效果:

 

我们可以看到,已经向队列发送了一条消息。我们看一下ActiveMq现在的状态:

 

我们可以看到,一条消息已经成功发送到了ActiveMq中。

4.4 接收消息

使用get请求访问服务器后台:

  

服务的输出:

 

ActiveMq服务器状态:

 

我们可以看到,消费者已经消费了一条信息,并且没有断开与ActiveMq之间的链接。

4.5 监听器

在实际项目中,我们很少会自己手动去获取消息,如果需要手动去获取消息,那就没有必要使用到ActiveMq了,可以用一个Redis 就足够了。

不能手动去获取消息,那么我们就可以选择使用一个监听器来监听是否有消息到达,这样子可以很快的完成对消息的处理。

4.5.1 applicationContext-ActiveMQ.xml 配置

在上面的配置文件中,我们已经默认的添加了这段监听器的配置文件,如果同学们不想使用这个监听器,可以直接注释掉。

  

 <!-- 配置消息队列监听者(Queue) -->
 <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

 <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
 <bean id="queueListenerContainer"
   class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destination" ref="demoQueueDestination" />
  <property name="messageListener" ref="queueMessageListener" />
 </bean>

4.5.2 MessageListener

我们需要创建一个类实现MessageListener 接口:

package com.Jayce.Filter;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
public class QueueMessageListener implements MessageListener {
 public void onMessage(Message message) {
  TextMessage tm = (TextMessage) message;
  try {
   System.out.println("QueueMessageListener监听到了文本消息:\t"
     + tm.getText());
   //do something ...
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
}

实现接口的onMessage 方法,我们将需要的业务操作在里面解决,这样子,就完成了我们生产者-中间件-消费者,这样一个解耦的操作了。 

4.5.3 测试

和上面一样,使用postMan 发送post请求,我们可以看到控制台里面,消息马上就能打印出来:

    

再看看ActiveMQ服务器的状态:

 

我们可以看到,使用监听器的效果,和手动接收消息的效果是一样的。

这样子一整个项目下来,我们已经成功的整合了Spring和ActiveMQ。

4.6 压力测试

这里其实也算不上什么压力测试,在配置pom.xml文件的时候,大家有看到一个 commons-httpclient 的依赖,接下来我们使用httpClient 不停的想服务器发送消息,看一下服务器解决消息的速度如何:

package com.Jaycekon.test;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2017/1/5.
 */
public class Client {

 @Test
 public void test() {
  HttpClient httpClient = new HttpClient();
  new Thread(new Sender(httpClient)).start();

 }

}

class Sender implements Runnable {
 public static AtomicInteger count = new AtomicInteger(0);
 HttpClient httpClient;

 public Sender(HttpClient client) {
  httpClient = client;
 }

 public void run() {
   try {
    System.out.println(Thread.currentThread().getName()+"---Send message-"+count.getAndIncrement());
    PostMethod post = new PostMethod("http://127.0.0.1:8080/SendMessage");
    post.addParameter("msg", "Hello world!");
    httpClient.executeMethod(post);
    System.out.println(Thread.currentThread().getName()+"---Send message Success-"+count.getAndIncrement());

   } catch (IOException e) {
    e.printStackTrace();
   }
  }
 }

这里面用了HttpClient 来向服务器发送Post 请求,然后计数输出,有兴趣的同学可以自己测试一下,可以多开几个线程,这里只开了一个线程。

5、项目源码:Crawl-Page_jb51.rar

 以上就是Java訊息佇列-Spring整合ActiveMq的詳解的內容,更多相關內容請關注PHP中文網(www.php.cn)!


#
陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn