Using HornetQ for message processing in Java API development
With the rapid growth of the Internet, systems exchange ever more information, and message queues have become an important way to handle high concurrency, high availability, and asynchronous processing. HornetQ is a high-performance, highly available open source messaging middleware developed by JBoss that implements the JMS API. This article introduces how to use HornetQ for message processing in Java API development.
1. Quick Start
The official HornetQ website (http://hornetq.apache.org/downloads.html) provides download packages in multiple formats; this article uses HornetQ-2.4.0.Final-bin.tar.gz.
After the download is complete, extract HornetQ-2.4.0.Final-bin.tar.gz to a local folder.
Enter the bin directory of HornetQ and execute the following command:
./run.sh
If the following log output appears, the HornetQ service has started successfully:
11:14:21,867 INFO [ServerImpl] Starting HornetQ Server
11:14:21,986 INFO [JournalStorageManager] Using NIO Journal
11:14:22,626 INFO [NettyAcceptor] Started Netty Acceptor version #{version}
11:14:22,697 INFO [HornetQServerImpl] HornetQ Server version #{version} [${name}] started
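Besides reading the log, you can confirm from Java that the server accepts TCP connections. The following is a minimal sketch using only the standard library; the class name PortCheck is hypothetical, and the host localhost and port 5445 are assumptions (5445 matches the connector port used in the cluster configuration later in this article), so adjust them to your own setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of HornetQ: checks whether something
// is accepting TCP connections on the given host and port.
class PortCheck {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws IOException if the port is closed or the timeout expires
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the Netty acceptor listens on localhost:5445
        boolean up = isListening("localhost", 5445, 1000);
        System.out.println("HornetQ acceptor reachable: " + up);
    }
}
```

A successful connection only shows that the port is open; it does not prove that the process behind it is HornetQ.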
Put HornetQ's hornetq-console.war into Tomcat's webapps directory, start Tomcat, and access the HornetQ console through http://localhost:8080/hornetq-console.
2. Using HornetQ
HornetQ's publish/subscribe model is based on topics: a publisher sends messages to a topic, multiple receivers can subscribe to that topic at the same time, and each receiver can receive messages published by multiple publishers.
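To make the topic semantics concrete, here is a small in-memory sketch in plain Java. It is not HornetQ code and does not use the JMS API; the class TopicSketch and its methods are hypothetical names chosen for illustration. It only shows the fan-out idea: every subscriber registered at publish time gets its own copy of each message, and a message published before anyone subscribes is not delivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory model of a topic (illustration only, not HornetQ)
class TopicSketch {

    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers the message to every subscriber registered right now
    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        List<String> inboxA = new ArrayList<>();
        List<String> inboxB = new ArrayList<>();

        topic.publish("sent before anyone subscribed"); // nobody receives this
        topic.subscribe(inboxA::add);
        topic.subscribe(inboxB::add);
        topic.publish("from publisher 1");
        topic.publish("from publisher 2");

        System.out.println("A received: " + inboxA);
        System.out.println("B received: " + inboxB);
    }
}
```

Both inboxes end up with the same two messages, which is the behavior the Publisher and Subscriber classes below rely on.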
(1) Message publishing end
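First, create a publisher (Publisher) to send messages. The code is as follows:
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSContext;
import javax.jms.JMSProducer;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

public class Publisher {
    public static void main(String[] args) throws Exception {
        // Initialize the connection factory and related configuration
        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(
                JMSFactoryType.CF,
                new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // Send the message
        JMSProducer producer = jmsContext.createProducer();
        Destination destination = HornetQJMSClient.createTopic("exampleTopic");
        producer.send(destination, "Hello, HornetQ!");

        // Close the connection
        jmsContext.close();
    }
}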
(2) Message receiving end
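Next, create a receiver (Subscriber) that receives messages and prints them. The code is as follows:
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

public class Subscriber {
    public static void main(String[] args) throws Exception {
        // Initialize the connection factory and related configuration
        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(
                JMSFactoryType.CF,
                new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // Create the consumer
        Destination destination = HornetQJMSClient.createTopic("exampleTopic");
        JMSConsumer consumer = jmsContext.createConsumer(destination);

        // Receive and print messages; receiveBody returns null when the
        // 1000 ms timeout expires, which ends the loop
        String message;
        while ((message = consumer.receiveBody(String.class, 1000)) != null) {
            System.out.println("Received message: " + message);
        }

        // Close the connection
        jmsContext.close();
    }
}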
After running the publisher and receiver, you can view the messages sent by the publisher on the HornetQ console. Start the receiver before the publisher: a non-durable topic subscriber only receives messages published while it is connected.
HornetQ supports persistent storage of messages, which means that messages are not lost even if the HornetQ server goes down.
(1) Sender
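Set the delivery mode of the producer to DeliveryMode.PERSISTENT, as follows:
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSContext;
import javax.jms.JMSProducer;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

public class Publisher {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(
                JMSFactoryType.CF,
                new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // Set the delivery mode to persistent
        JMSProducer producer = jmsContext.createProducer();
        Destination destination = HornetQJMSClient.createTopic("exampleTopic");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // Send the message
        producer.send(destination, "Hello, HornetQ!");

        jmsContext.close();
    }
}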
(2) Receiver
Because HornetQ persists the messages on the server side, no specific configuration is required on the receiver side. Just continue to use the Subscriber class from the previous section.
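The idea behind persistent delivery can be illustrated with a toy append-only journal in plain Java. This is not how HornetQ's journal is implemented; it is a simplified sketch with hypothetical names (JournalSketch, append, recover) that assumes one message per line and no newlines inside a message. It shows only the principle: each message is written to disk before it is considered accepted, so a new process can recover it after a restart.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;

// Hypothetical toy journal (illustration only, not HornetQ's journal format)
class JournalSketch {

    private final Path file;

    JournalSketch(Path file) {
        this.file = file;
    }

    // Appends one message per line; SYNC asks the OS to flush to disk on write
    void append(String message) throws IOException {
        Files.write(file,
                (message + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.SYNC);
    }

    // Reads back every message written so far, as a restarted server would
    List<String> recover() throws IOException {
        if (!Files.exists(file)) {
            return Collections.emptyList();
        }
        return Files.readAllLines(file, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("journal-sketch", ".log");
        new JournalSketch(file).append("Hello, HornetQ!");

        // Simulate a restart: a fresh object sees only what is on disk
        System.out.println("Recovered: " + new JournalSketch(file).recover());
        Files.delete(file);
    }
}
```

A message sent with a non-persistent delivery mode would correspond to skipping append entirely, which is why it would not survive the simulated restart.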
HornetQ features high availability and can be run in cluster mode to ensure message reliability and high concurrency. The following are the steps to implement HornetQ cluster mode:
(1) Copy the HornetQ directory and create a new folder
Copy the HornetQ directory and rename the copy to HornetQ2. Then create a new folder named cluster, and copy the data directory, log directory, tmp directory, and other folders under the HornetQ2 directory into the cluster folder.
(2) Modify the configuration file
From the examples/configs/clustered configuration folder under the HornetQ directory, copy the hq-configuration.xml file and the server0 and server1 folders to the HornetQ2 directory, and modify the hornetq-configuration.xml file in the server0 folder as follows:
(a) Modify the node name to server0
(b) In cluster-connections, change server-username and server-password to "guest"
(c) Modify the connector address to the local IP address, such as 192.168.1.1
(d) Set use-ha under jms-configuration to true
As shown below:
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
    <cluster-password>guest</cluster-password>
    <paging-directory>${data.dir:../data}/paging</paging-directory>
    <bindings-directory>${data.dir:../data}/bindings</bindings-directory>
    <journal-directory>${data.dir:../data}/journal</journal-directory>
    <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
    <journal-type>NIO</journal-type>
    <journal-datasync>true</journal-datasync>
    <journal-min-files>2</journal-min-files>
    <journal-pool-files>10</journal-pool-files>
    <journal-file-size>10240</journal-file-size>
    <journal-buffer-timeout>28000</journal-buffer-timeout>
    <journal-max-io>1</journal-max-io>
    <disk-scan-period>5000</disk-scan-period>
    <max-disk-usage>90</max-disk-usage>
    <critical-analyzer>true</critical-analyzer>
    <critical-analyzer-timeout>120000</critical-analyzer-timeout>
    <critical-analyzer-check-period>60000</critical-analyzer-check-period>
    <critical-analyzer-policy>HALT</critical-analyzer-policy>
    <page-sync-timeout>1628000</page-sync-timeout>
    <global-max-size>100Mb</global-max-size>
    <connectors>
        <connector name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
            <param key="host" value="192.168.1.1"/>
            <param key="port" value="5445"/>
        </connector>
    </connectors>
    <acceptors>
        <acceptor name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
            <param key="host" value="192.168.1.1"/>
            <param key="port" value="5545"/>
        </acceptor>
    </acceptors>
    <cluster-connections>
        <cluster-connection name="my-cluster">
            <address>jms</address>
            <connector-ref>netty</connector-ref>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <forward-when-no-consumers>true</forward-when-no-consumers>
            <max-hops>1</max-hops>
            <discovery-group-ref discovery-group-name="my-discovery-group"/>
            <static-connectors>
                <connector-ref>netty</connector-ref>
            </static-connectors>
        </cluster-connection>
    </cluster-connections>
    <ha-policy>
        <replication>
            <slave>
                <allow-failback>true</allow-failback>
                <failback-delay>5000</failback-delay>
                <max-saved-replicated-journals-size>1000000</max-saved-replicated-journals-size>
                <restart-backup>true</restart-backup>
            </slave>
        </replication>
    </ha-policy>
</configuration>
Then modify the hornetq-configuration.xml file in the server1 folder in the same way, changing server0 to server1.
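Since both servers are edited by hand, it can help to double-check the connector settings before starting them. The sketch below uses only the JDK's built-in XML parser to list each connector's parameters from a configuration document; the class ConnectorCheck and the method connectorParams are hypothetical names, and the embedded XML is a shortened stand-in for a real hornetq-configuration.xml.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper (not part of HornetQ): lists connector parameters
class ConnectorCheck {

    // Returns entries such as "netty.host" -> "192.168.1.1"
    static Map<String, String> connectorParams(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        Map<String, String> result = new LinkedHashMap<>();
        // Matches <connector> elements only, not <connector-ref>
        NodeList connectors = doc.getElementsByTagName("connector");
        for (int i = 0; i < connectors.getLength(); i++) {
            Element connector = (Element) connectors.item(i);
            NodeList params = connector.getElementsByTagName("param");
            for (int j = 0; j < params.getLength(); j++) {
                Element param = (Element) params.item(j);
                result.put(connector.getAttribute("name") + "." + param.getAttribute("key"),
                        param.getAttribute("value"));
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Shortened sample; in practice, read the real file's contents instead
        String xml = "<configuration xmlns=\"urn:hornetq\">"
                + "<connectors><connector name=\"netty\">"
                + "<param key=\"host\" value=\"192.168.1.1\"/>"
                + "<param key=\"port\" value=\"5445\"/>"
                + "</connector></connectors></configuration>";
        System.out.println(connectorParams(xml));
    }
}
```

Running the same check against the server0 and server1 files makes it easy to spot a host or port that was left unchanged.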
(3) Start HornetQ
Execute the run.sh command in the bin directory of both HornetQ and HornetQ2 to start the two HornetQ processes. The two HornetQ nodes now form a cluster, which you can verify through the HornetQ console.
3. Summary
This article covered the basic use of HornetQ and how to configure cluster mode. HornetQ makes it easier to handle message exchange and to improve the robustness and concurrency of a system. It also supports multiple messaging models, message persistence mechanisms, extension plug-ins, and other features, which can be selected and configured according to actual needs.