Heim >Java >javaLernprogramm >Detaillierte Erläuterung der Java-Nachrichtenwarteschlange zur Spring-Integration mit ActiveMq

Detaillierte Erläuterung der Java-Nachrichtenwarteschlange zur Spring-Integration mit ActiveMq

黄舟
黄舟Original
2017-03-07 10:01:082045Durchsuche

In diesem Artikel wird hauptsächlich die detaillierte Erklärung der Java-Nachrichtenwarteschlangen-Spring-Integration mit ActiveMq vorgestellt. Der Herausgeber findet ihn recht gut, daher werde ich ihn jetzt mit Ihnen teilen und als Referenz verwenden. Folgen wir dem Herausgeber, um einen Blick darauf zu werfen

1. Übersicht

Lassen Sie uns zunächst mit Ihnen einen Blick auf den Java-Messaging-Dienst werfen . Im vorherigen Blog „Java Message Queue-JMS Overview“ habe ich für alle analysiert:

1. Nachrichtendienst: Eine Middleware, die zur Lösung der Kopplung zwischen zwei oder mehreren Programmen verwendet wird implementiert durch Java.

2. Vorteile: asynchron, zuverlässig

3. Nachrichtenmodell: Punkt-zu-Punkt, veröffentlichen/abonnieren

4. Objekte in JMS

Dann haben wir in einem anderen Blog „Java Message Queue-ActiveMq in der Praxis“ ein ActiveMq-Projekt von 0 auf 1 gestartet. Während des Projektentwicklungsprozesses haben wir ein gewisses Verständnis von ActiveMq verstanden:

1. Schreiben Sie Clients in mehreren Sprachen und Protokollen. Sprachen: Java, C, C++, C#, Ruby, Perl, Python, PHP. Anwendungsprotokolle: OpenWire, Stomp REST, WS Notification, 🎜>3. ActiveMQ kann problemlos in Systeme mit Spring eingebettet werden und unterstützt auch die Funktionen von Spring 2.0

4 Tests von J2EE-Servern (wie Geronimo, JBoss 4, GlassFish, WebLogic), bei denen ActiveMQ durch die Konfiguration von JCA 1.5-Ressourcenadaptern automatisch auf jedem kommerziellen Server bereitgestellt werden kann, der mit J2EE 1.4

5 kompatibel ist . Unterstützt mehrere Übertragungsprotokolle: In-VM, TCP, SSL, NIO, UDP, JGroups, JXTA

6. Unterstützt Hochgeschwindigkeits-Nachrichtenpersistenz durch JDBC und Journal

7. Das Design gewährleistet leistungsstarkes Clustering, Client-Server, Punkt-zu-Punkt

Unterstützt die Integration mit Axis

10. Sie können den eingebetteten JMS-Anbieter ganz einfach zum Testen anrufen

Im nächsten Blog werde ich mit Ihnen an der Integration von Spring und ActiveMq arbeiten Spring+JMS+ActiveMQ+Tomcat implementiert asynchrone Punkt-zu-Punkt-Warteschlangennachrichten und das PUB/SUB-Modell (Publish/Subscribe). Es handelt sich um ein einfaches Beispiel und beinhaltet kein Geschäft.


2. Verzeichnisstruktur

2.1 Projektverzeichnis

IDE ausgewählte IDEA (für jeden empfohlen), in der Reihenfolge Um das Herunterladen von JAR zu vermeiden, erstellt die unterste Ebene ein Projekt, das Spring und ActiveMq integriert

  2.2 pom

Weil Die pom.xml-Datei hier ist etwas lang, ich werde sie nicht erweitern.

Wir können sehen, dass es nur wenige Abhängigkeiten gibt: 1. Spring-Core-Abhängigkeit 2. ActiveMq-Kern und -Pool (wenn sich die Schüler hier für den Import von JAR entscheiden, können sie das in unserem vorherigen Blog erwähnte ActiveMq direkt importieren -alle (dieses JAR-Paket) 3. Java-Servlet-bezogene Abhängigkeiten

Die Abhängigkeitsversion des ActiveMq-Pools, die wir hier auswählen, bezieht sich auf das nachfolgende DTD, und die Version muss übereinstimmen, sodass die Schüler warten müssen, bis sie ActiveMq konfigurieren Datei. Achten Sie auf die Auswahl der dtd-Version


2.3 web.xml

web.xml ist ebenfalls ähnlich. Geben Sie die Spring-Konfigurationsdatei, den SpringMvc-Namen und das Codierungsformat an

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://www.php.cn/">
 <modelVersion>4.0.0</modelVersion>
 <groupId>Crawl-Page</groupId>
 <artifactId>Crawl-Page</artifactId>
 <packaging>war</packaging>
 <version>1.0-SNAPSHOT</version>
 <name>Crawl-Page Maven Webapp</name>
 <url>http://www.php.cn/;/url>
 <!-- 版本管理 -->
 <properties>
 <springframework>4.1.8.RELEASE</springframework>
 </properties>

 <dependencies>
 <dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.10</version>
  <scope>test</scope>
 </dependency>


 <!-- JSP相关 -->
 <dependency>
 <groupId>jstl</groupId>
 <artifactId>jstl</artifactId>
 <version>1.2</version>
 </dependency>
 <dependency>
 <groupId>javax.servlet</groupId>
 <artifactId>servlet-api</artifactId>
 <scope>provided</scope>
 <version>2.5</version>
 </dependency>


 <!-- spring -->
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-tx</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-webmvc</artifactId>
  <version>${springframework}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
  <version>${springframework}</version>
 </dependency>
 <!-- xbean 如<amq:connectionFactory /> -->
 <dependency>
  <groupId>org.apache.xbean</groupId>
  <artifactId>xbean-spring</artifactId>
  <version>3.16</version>
 </dependency>

 <!-- activemq -->
 <dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.7.0</version>
 </dependency>
 <dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-pool</artifactId>
  <version>5.12.1</version>
 </dependency>

 <!-- 自用jar包,可以忽略-->
 <dependency>
  <groupId>commons-httpclient</groupId>
  <artifactId>commons-httpclient</artifactId>
  <version>3.1</version>
 </dependency>
 </dependencies>

 <build>
 <finalName>Crawl-Page</finalName>
 <plugins>
  <plugin>
  <groupId>org.apache.tomcat.maven</groupId>
  <artifactId>tomcat7-maven-plugin</artifactId>
  <configuration>
   <port>8080</port>
   <path>/</path>
  </configuration>
  </plugin>
 </plugins>
 </build>

</project>

2.4 SpringMvc und applicationContext.xml

Der SpringMVC hier ist nichts Besonderes. Studierende in Not können sich darauf beziehen:

applicationContext.xml wird hauptsächlich zum Laden von Beans verwendet. In unserem Projekt gibt es keine speziellen Java Beans, daher wird es nur zur Angabe des Paketscanpfads verwendet:
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
   http://www.php.cn/"
   version="3.0">

 <display-name>Archetype Created Web Application</display-name>

 <!-- 加载spring的配置文件,例如hibernate、jms等集成 -->
 <context-param>
 <param-name>contextConfigLocation</param-name>
 <param-value>
  classpath:applicationContext*.xml;
 </param-value>
 </context-param>

 <listener>
 <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
 </listener>

 <servlet>
 <servlet-name>springMVC</servlet-name>
 <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
 <init-param>
  <param-name>contextConfigLocation</param-name>
  <param-value>classpath:spring-mvc.xml</param-value>
 </init-param>
 <load-on-startup>1</load-on-startup>
 </servlet>
 <servlet-mapping>
 <servlet-name>springMVC</servlet-name>
 <url-pattern>/</url-pattern>
 </servlet-mapping>

 <!-- 处理编码格式 -->
 <filter>
 <filter-name>characterEncodingFilter</filter-name>
 <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
 <init-param>
  <param-name>encoding</param-name>
  <param-value>UTF-8</param-value>
 </init-param>
 <init-param>
  <param-name>forceEncoding</param-name>
  <param-value>true</param-value>
 </init-param>
 </filter>
 <filter-mapping>
 <filter-name>characterEncodingFilter</filter-name>
 <url-pattern>/*</url-pattern>
 </filter-mapping>

</web-app>

2.5 applicationContext-ActiveMQ.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.php.cn/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:aop="http://www.springframework.org/schema/aop"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xmlns:tx="http://www.springframework.org/schema/tx"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/aop
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/">

 <!-- 启用MVC注解 -->
 <mvc:annotation-driven />
 <!-- 指定Sping组件扫描的基本包路径 -->
 <context:component-scan base-package="com.Jayce" >
  <!-- 这里只扫描Controller,不可重复加载Service -->
  <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
 </context:component-scan>

 <!-- JSP视图解析器-->
 <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
  <property name="prefix" value="/WEB-INF/views/" />
  <property name="suffix" value=".jsp" />
  <!-- 定义其解析视图的order顺序为1 -->
  <property name="order" value="1" />
 </bean>
</beans>


Hier erklären wir diese Konfigurationsdatei, wenn Sie sie anhand der obigen Konfigurationsdatei verstehen und überspringen können. Studierende können es auch auf der offiziellen Website von ActiveMQ ansehen.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:jms="http://www.springframework.org/schema/jms"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xsi:schemaLocation="
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/">

 <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
 <!-- 配置扫描路径 -->
 <context:component-scan base-package="com.Jayce">
  <!-- 只扫描Service,也可以添加Repostory,但是要把Controller排除在外,Controller由spring-mvc.xml去加载 -->
  <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
 </context:component-scan>

</beans>
1. DTD in ActiveMq, bevor wir die relevante Konfiguration deklarieren, müssen wir die DTD in ActiveMq importieren, sonst versteht Spring nicht, was unsere Tags bedeuten.

Wir haben die Version von activemq in der pom-Verbindungsfabrik konfiguriert: Ein sehr einfaches Konfigurationselement, das zum Konfigurieren der Adresse, des Benutzernamens und des Passworts unserer Link-Factory verwendet wird. Hier ist die Auswahl des TCP zu beachten Verbindung anstelle der http-Verbindung


3. jmsTemplate: eine wichtigere Konfiguration, hier werden die Verbindungsfabrik, das Standardziel zum Senden von Nachrichten, die Verbindungsdauer und die Nachrichtenveröffentlichungsmethode angegeben

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:jms="http://www.springframework.org/schema/jms"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xsi:schemaLocation="
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/
  http://www.php.cn/"
>

 <context:component-scan base-package="com.Jayce" />
 <mvc:annotation-driven />

 <amq:connectionFactory id="amqConnectionFactory"
       brokerURL="tcp://192.168.148.128:61616"
       userName="admin"
       password="admin" />

 <!-- 配置JMS连接工长 -->
 <bean id="connectionFactory"
   class="org.springframework.jms.connection.CachingConnectionFactory">
  <constructor-arg ref="amqConnectionFactory" />
  <property name="sessionCacheSize" value="100" />
 </bean>

 <!-- 定义消息队列(Queue) -->
 <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
  <!-- 设置消息队列的名字 -->
  <constructor-arg>
   <value>Jaycekon</value>
  </constructor-arg>
 </bean>

 <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="defaultDestination" ref="demoQueueDestination" />
  <property name="receiveTimeout" value="10000" />
  <!-- true是topic,false是queue,默认是false,此处显示写出false -->
  <property name="pubSubDomain" value="false" />
 </bean>


 <!-- 配置消息队列监听者(Queue) -->
 <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

 <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
 <bean id="queueListenerContainer"
   class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destination" ref="demoQueueDestination" />
  <property name="messageListener" ref="queueMessageListener" />
 </bean>

</beans>
3. Projektstruktur

3.1 ProducerService

Machen Sie den Nachrichtenproduzenten zu einem Wenn wir eine Nachricht senden müssen, müssen wir nur die ProducerService-Instanz aufrufen. Die sendMessage-Methode kann eine Nachricht an das Standardziel senden.

Hier stehen zwei Sendemethoden zur Verfügung: Eine besteht darin, an das Standardziel zu senden, und die andere darin, die Nachricht basierend auf dem Ziel zu senden. 3.2 ConsumerService


因为我们项目中并没有什么业务,所以的话对消息的处理也就是打印输出。我们只需要调用jmsTemplate中的 receive 方法,就可以从里面获取到一条消息。

再和我们上一篇博客对比一下,上一篇博客中,我们接受到信息之后需要手动确认事务,这样ActiveMQ中才会确定这条消息已经被正确读取了。而整合了Spring之后,事务将由Spring 来管理。

3.3 MessageController

package com.Jayce.Controller;

import com.Jayce.Service.ConsumerService;
import com.Jayce.Service.ProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
@Controller
public class MessageController {
 private Logger logger = LoggerFactory.getLogger(MessageController.class);
 @Resource(name = "demoQueueDestination")
 private Destination destination;

 //队列消息生产者
 @Resource(name = "producerService")
 private ProducerService producer;

 //队列消息消费者
 @Resource(name = "consumerService")
 private ConsumerService consumer;

 @RequestMapping(value = "/SendMessage", method = RequestMethod.POST)
 @ResponseBody
 public void send(String msg) {
  logger.info(Thread.currentThread().getName()+"------------send to jms Start");
  producer.sendMessage(msg);
  logger.info(Thread.currentThread().getName()+"------------send to jms End");
 }

 @RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET)
 @ResponseBody
 public Object receive(){
  logger.info(Thread.currentThread().getName()+"------------receive from jms Start");
  TextMessage tm = consumer.receive(destination);
  logger.info(Thread.currentThread().getName()+"------------receive from jms End");
  return tm;
 }

}

控制层里面需要注入我们的生产者和消费者(实际开发中,生产者和消费者肯定不会在同一个项目中的,不然就消息服务这个东西就没有意义了)。

现在服务层和控制层都好了,接下来我们就进行一个简单的测试

 4、项目测试

4.1 启动ActiveMq

先确定你的ActiveMQ服务已经开启。

    

4.2 启动项目

项目使用了Tomcat 插件,避免了本地再下载Tomcat的麻烦,有需要的同学可以使用一下。

<plugins>
  <plugin>
  <groupId>org.apache.tomcat.maven</groupId>
  <artifactId>tomcat7-maven-plugin</artifactId>
  <configuration>
   <port>8080</port>
   <path>/</path>
  </configuration>
  </plugin>
</plugins>

4.3 发送消息

这里用了Chrome 的一个插件PostMan 有兴趣的同学可以了解一下,在Chrome 拓展程序中可以找到,避免了后端的同学去弄页面!

    

我们发送了一个post 请求之后,看一下服务器的效果:

 

我们可以看到,已经向队列发送了一条消息。我们看一下ActiveMq现在的状态:

 

我们可以看到,一条消息已经成功发送到了ActiveMq中。

4.4 接收消息

使用get请求访问服务器后台:

  

服务的输出:

 

ActiveMq服务器状态:

 

我们可以看到,消费者已经消费了一条信息,并且没有断开与ActiveMq之间的链接。

4.5 监听器

在实际项目中,我们很少会自己手动去获取消息,如果需要手动去获取消息,那就没有必要使用到ActiveMq了,可以用一个Redis 就足够了。

不能手动去获取消息,那么我们就可以选择使用一个监听器来监听是否有消息到达,这样子可以很快的完成对消息的处理。

4.5.1 applicationContext-ActiveMQ.xml 配置

在上面的配置文件中,我们已经默认的添加了这段监听器的配置文件,如果同学们不想使用这个监听器,可以直接注释掉。

  

 <!-- 配置消息队列监听者(Queue) -->
 <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

 <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
 <bean id="queueListenerContainer"
   class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destination" ref="demoQueueDestination" />
  <property name="messageListener" ref="queueMessageListener" />
 </bean>

4.5.2 MessageListener

我们需要创建一个类实现MessageListener 接口:

package com.Jayce.Filter;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
public class QueueMessageListener implements MessageListener {
 public void onMessage(Message message) {
  TextMessage tm = (TextMessage) message;
  try {
   System.out.println("QueueMessageListener监听到了文本消息:\t"
     + tm.getText());
   //do something ...
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
}

实现接口的onMessage 方法,我们将需要的业务操作在里面解决,这样子,就完成了我们生产者-中间件-消费者,这样一个解耦的操作了。 

4.5.3 测试

和上面一样,使用postMan 发送post请求,我们可以看到控制台里面,消息马上就能打印出来:

    

再看看ActiveMQ服务器的状态:

 

我们可以看到,使用监听器的效果,和手动接收消息的效果是一样的。

这样子一整个项目下来,我们已经成功的整合了Spring和ActiveMQ。

4.6 压力测试

这里其实也算不上什么压力测试,在配置pom.xml文件的时候,大家有看到一个 commons-httpclient 的依赖,接下来我们使用httpClient 不停的想服务器发送消息,看一下服务器解决消息的速度如何:

package com.Jaycekon.test;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2017/1/5.
 */
public class Client {

 @Test
 public void test() {
  HttpClient httpClient = new HttpClient();
  new Thread(new Sender(httpClient)).start();

 }

}

class Sender implements Runnable {
 public static AtomicInteger count = new AtomicInteger(0);
 HttpClient httpClient;

 public Sender(HttpClient client) {
  httpClient = client;
 }

 public void run() {
   try {
    System.out.println(Thread.currentThread().getName()+"---Send message-"+count.getAndIncrement());
    PostMethod post = new PostMethod("http://127.0.0.1:8080/SendMessage");
    post.addParameter("msg", "Hello world!");
    httpClient.executeMethod(post);
    System.out.println(Thread.currentThread().getName()+"---Send message Success-"+count.getAndIncrement());

   } catch (IOException e) {
    e.printStackTrace();
   }
  }
 }

这里面用了HttpClient 来向服务器发送Post 请求,然后计数输出,有兴趣的同学可以自己测试一下,可以多开几个线程,这里只开了一个线程。

5、项目源码:Crawl-Page_jb51.rar

Das Obige ist die detaillierte Erklärung der Integration von ActiveMq in Java Message Queue-Spring. Weitere verwandte Inhalte finden Sie auf der chinesischen PHP-Website (www.php.cn).


Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn