Home  >  Article  >  Java  >  Using HornetQ for message processing in Java API development

Using HornetQ for message processing in Java API development

PHPz
PHPzOriginal
2023-06-17 23:27:091094browse

Using HornetQ for message processing in Java API development

With the rapid development of the Internet, a large number of information interactions have emerged, and message queues have become an important means to solve problems such as high concurrency, high availability, and asynchronous processing. HornetQ is a high-performance, high-availability open source messaging middleware based on the JMS protocol developed by JBoss. This article will introduce how to use HornetQ for message processing in Java API development.

1. Quick Start

  1. Download HornetQ

The official website of HornetQ (http://hornetq.apache.org/downloads.html) provides Download packages in multiple formats, choose HornetQ-2.4.0.Final-bin.tar.gz here.

  1. Installing HornetQ

After the download is complete, unzip HornetQ-2.4.0.Final-bin.tar.gz to a local folder.

  1. Start HornetQ

Enter the bin directory of HornetQ and execute the following command:

 ./run.sh

The following log appears The information indicates that the HornetQ service was successfully started:

 11:14:21,867 INFO [ServerImpl] Starting HornetQ Server
 11:14:21,986 INFO [JournalStorageManager] Using NIO Journal
 11:14:22,626 INFO [NettyAcceptor] Started Netty Acceptor version #{version}
 11:14:22,697 INFO [HornetQServerImpl] HornetQ Server version #{version} [${name}] started

  1. Deploy HornetQ console

Put HornetQ's hornetq-console.war into Tomcat's webapps directory, start Tomcat, and access the HornetQ console through http://localhost:8080/hornetq-console.

2. The use of HornetQ

  1. Publishing and receiving messages

HornetQ’s publish and subscribe model is based on Topic, and the publishing end publishes to a certain Topic message, and multiple receivers can subscribe to this Topic at the same time, and the receiver can receive messages published by multiple publishers.

(1) Message publishing end

First create a publishing end (Publisher) to send messages, the code is as follows:

public class Publisher {

    public static void main(String[] args) throws Exception {

        // 初始化连接工厂等配置信息
        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 发送消息
        JMSProducer producer = jmsContext.createProducer();
        Destination destination = HornetQJMSClient.createTopic("exampleTopic");
        producer.send(destination, "Hello, HornetQ!");

        // 关闭连接
        jmsContext.close();
    }
}

(2) Message receiving end

Create another receiver (Subscriber) to receive the message and print it out. The code is as follows:

public class Subscriber {

    public static void main(String[] args) throws Exception {

        // 初始化连接工厂等配置信息
        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 创建消费者
        Destination destination = HornetQJMSClient.createTopic("exampleTopic");
        JMSConsumer consumer = jmsContext.createConsumer(destination);

        // 接收消息并打印
        String message = null;
        do {
            message = consumer.receiveBody(String.class, 1000);
            System.out.println("Received message: " + message);
        } while (message != null);

        // 关闭连接
        jmsContext.close();
    }
}

After running the publisher and receiver, you can view the messages sent by the publisher on the HornetQ console , as shown in the figure below:

  1. Message persistence

HornetQ supports persistent storage of messages, which means that even if HornetQ is down, the message can be guaranteed to be persisted. lost.

(1) Sender

We need to set the persistence of the message to DeliveryMode.PERSISTENT, as follows:

public class Publisher {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 设定持久性
        JMSProducer producer = jmsContext.createProducer();
        destination = HornetQJMSClient.createTopic("exampleTopic");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // 发送消息
        producer.send(destination, "Hello, HornetQ!");

        jmsContext.close();
    }
}

(2) Receiver

HornetQ has persisted messages by default, so there is no need to perform specific configuration on the receiver side. Just continue to use the Subscriber class in the previous section.

  1. Cluster Mode

HornetQ features high availability and can be run in cluster mode to ensure message reliability and high concurrency. The following are the steps to implement HornetQ cluster mode:

(1) Copy the HornetQ directory and create a new folder

Copy the HornetQ directory and rename it to HornetQ2, and then create a new file named cluster folder, and copy all the data directory, log directory, tmp directory and other folders under the HornetQ2 directory to the cluster folder.

(2) Modify the configuration file

In the examples/configs/clustered configuration folder under the HornetQ directory, copy the hq-configuration.xml file and server0 and server1 folders to the HornetQ2 directory , and modify the hornetq-configuration.xml file in the server0 folder as follows:

  (a) Modify the node name to server0

  (b) Change the cluster-connections in Change server-username and server-password to "guest"

 (c) Modify the connector address to the local IP address, such as 192.168.1.1

 (d) Change use under jms-configuration -ha is set to true

As shown below:

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
    <cluster-password>guest</cluster-password>
    <paging-directory>${data.dir:../data}/paging</paging-directory>
    <bindings-directory>${data.dir:../data}/bindings</bindings-directory>
    <journal-directory>${data.dir:../data}/journal</journal-directory>
    <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
    <journal-type>NIO</journal-type>
    <journal-datasync>true</journal-datasync>
    <journal-min-files>2</journal-min-files>
    <journal-pool-files>10</journal-pool-files>
    <journal-file-size>10240</journal-file-size>
    <journal-buffer-timeout>28000</journal-buffer-timeout>
    <journal-max-io>1</journal-max-io>
    <disk-scan-period>5000</disk-scan-period>
    <max-disk-usage>90</max-disk-usage>
    <critical-analyzer>true</critical-analyzer>
    <critical-analyzer-timeout>120000</critical-analyzer-timeout>
    <critical-analyzer-check-period>60000</critical-analyzer-check-period>
    <critical-analyzer-policy>HALT</critical-analyzer-policy>
    <page-sync-timeout>1628000</page-sync-timeout>
    <global-max-size>100Mb</global-max-size>
    <connectors>
        <connector name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
            <param key="host" value="192.168.1.1"/>
            <param key="port" value="5445"/>
        </connector>
    </connectors>
    <acceptors>
        <acceptor name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
            <param key="host" value="192.168.1.1"/>
            <param key="port" value="5545"/>
        </acceptor>
    </acceptors>
    <cluster-connections>
        <cluster-connection name="my-cluster">
            <address>jms</address>
            <connector-ref>netty</connector-ref>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <forward-when-no-consumers>true</forward-when-no-consumers>
            <max-hops>1</max-hops>
            <discovery-group-ref discovery-group-name="my-discovery-group"/>
            <static-connectors>
                <connector-ref>netty</connector-ref>
            </static-connectors>
        </cluster-connection>
    </cluster-connections>
    <ha-policy>
        <replication>
            <slave>
                <allow-failback>true</allow-failback>
                <failback-delay>5000</failback-delay>
                <max-saved-replicated-journals-size>1000000</max-saved-replicated-journals-size>
                <restart-backup>true</restart-backup>
            </slave>
        </replication>
    </ha-policy>
</configuration>

Then modify the hornetq-configuration.xml file in the server1 folder in the same way, changing server0 to server1.

(3) Start HornetQ

Execute the run.sh command in the bin directory of HornetQ and HornetQ2 to start the HornetQ process. At this time, the two HornetQ nodes form a cluster, which can be used through HornetQ Check the console.

3. Summary

Through the introduction of this article, we have learned about the basic use of HornetQ and the configuration method of cluster mode. Using HornetQ can easily solve the problem of message interaction and improve the robustness and concurrency of the system. At the same time, HornetQ also supports multiple messaging modes, rich message persistence mechanisms, extension plug-ins and other features, which can be selected and configured according to actual needs.

The above is the detailed content of Using HornetQ for message processing in Java API development. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn