首頁  >  文章  >  Java  >  Java API 開發中使用 HornetQ 進行訊息處理

Java API 開發中使用 HornetQ 進行訊息處理

PHPz
PHPz原創
2023-06-17 23:27:091094瀏覽

Java API 開發中使用 HornetQ 進行訊息處理

隨著互聯網的快速發展,大量的資訊互動湧現出來,訊息佇列成為解決高並發、高可用、非同步處理等問題的重要手段。 HornetQ 是由 JBoss 開發的基於 JMS 協定的高效能、高可用的開源訊息中間件。本文將介紹如何在 Java API 開發中使用 HornetQ 進行訊息處理。

一、快速入門

  1. 下載HornetQ

HornetQ的官網(http://hornetq.apache.org/downloads.html)提供了多種格式的下載包,這裡選擇HornetQ-2.4.0.Final-bin.tar.gz。

  1. 安裝 HornetQ

下載完成後,將 HornetQ-2.4.0.Final-bin.tar.gz 解壓縮到本機資料夾。

  1. 啟動HornetQ

進入HornetQ 的bin 目錄,執行下列指令:

  ./run.sh

#出現下列日誌訊息則表示成功啟動HornetQ 服務:

  11:14:21,867 INFO [ServerImpl] Starting HornetQ Server
  11:14:21,986 INFO [JournalStorageManagerJournal] U 11:14:21,986 INFO [JournalStorageManagerJournal] U 11:14:21,986 INFO [JournalStorageManagerJournal] UFIO 1212129:149:1499:142129:142129:2121299:2121299:21299:2129:14999:2129:142129:14] [NettyAcceptor] Started Netty Acceptor version #{version}
  11:14:22,697 INFO [HornetQServerImpl] HornetQ Server version #{version} [${name}] started

    部署HornetQ 控制台
將HornetQ 的hornetq-console.war 放入Tomcat 的webapps 目錄下,啟動Tomcat,透過http://localhost:8080/hornetq-console 存取HornetQ 控制台。

二、HornetQ 的使用

    發布和接收訊息
HornetQ 的發布訂閱模式是基於Topic 的,發布端向某個Topic 發布訊息,而多個接收端可以同時訂閱這個Topic,接收端就可以接收到多個發佈端發佈的訊息。

(1)訊息發布端

先建立一個發布端(Publisher)傳送訊息,程式碼如下:

public class Publisher {

    public static void main(String[] args) throws Exception {

        // 初始化连接工厂等配置信息
        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 发送消息
        JMSProducer producer = jmsContext.createProducer();
        Destination destination = HornetQJMSClient.createTopic("exampleTopic");
        producer.send(destination, "Hello, HornetQ!");

        // 关闭连接
        jmsContext.close();
    }
}

(2)訊息接收端

再建立一個接收端(Subscriber)去接收訊息,並將訊息列印出來,程式碼如下:

public class Subscriber {

    public static void main(String[] args) throws Exception {

        // 初始化连接工厂等配置信息
        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 创建消费者
        Destination destination = HornetQJMSClient.createTopic("exampleTopic");
        JMSConsumer consumer = jmsContext.createConsumer(destination);

        // 接收消息并打印
        String message = null;
        do {
            message = consumer.receiveBody(String.class, 1000);
            System.out.println("Received message: " + message);
        } while (message != null);

        // 关闭连接
        jmsContext.close();
    }
}

在運行發布端和接收端之後,可以在HornetQ 控制台上查看發佈端發送的訊息,如下圖所示:

    訊息持久化
HornetQ 支援將訊息進行持久化存儲,這意味著即使HornetQ 宕機了,也能保證訊息不丟失。

(1)發送者

我們需要將訊息的持久性設定為DeliveryMode.PERSISTENT,如下所示:

public class Publisher {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 设定持久性
        JMSProducer producer = jmsContext.createProducer();
        destination = HornetQJMSClient.createTopic("exampleTopic");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // 发送消息
        producer.send(destination, "Hello, HornetQ!");

        jmsContext.close();
    }
}

(2)接收者

HornetQ 預設已將訊息持久化存儲,因此不需要在接收者端進行特定的配置,繼續使用上一節中的Subscriber 類別即可。

    叢集模式
HornetQ 具有高可用性的特點,可以以叢集模式運行,以確保訊息的可靠性和高並發性。以下是實現HornetQ 叢集模式的步驟:

(1)拷貝HornetQ 目錄並新建資料夾

將HornetQ 目錄拷貝一份並重新命名為HornetQ2,然後新建一個名為cluster 的文件夾,並將HornetQ2 目錄下的data 目錄、log 目錄、tmp 目錄等資料夾全部複製到cluster 資料夾下。

(2)修改設定檔

在HornetQ 目錄下的examples/configs/clustered 設定檔

在HornetQ 目錄下的examples/configs/clustered 設定檔夾中,將hq-configuration.xml 檔案及server0 和server1 資料夾複製到HornetQ2 目錄中,並按照下面的方式修改server0 資料夾中的hornetq-configuration.xml 檔案:

#  (a)將節點名稱修改為server0

  (b)將cluster-connections 中的server-username 和server-password 修改為"guest"

  (c)修改connector 位址為本機IP 位址,如192.168.1.1

  (d)將jms-configuration 下的use -ha 設為true

  如下所示:

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
    <cluster-password>guest</cluster-password>
    <paging-directory>${data.dir:../data}/paging</paging-directory>
    <bindings-directory>${data.dir:../data}/bindings</bindings-directory>
    <journal-directory>${data.dir:../data}/journal</journal-directory>
    <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
    <journal-type>NIO</journal-type>
    <journal-datasync>true</journal-datasync>
    <journal-min-files>2</journal-min-files>
    <journal-pool-files>10</journal-pool-files>
    <journal-file-size>10240</journal-file-size>
    <journal-buffer-timeout>28000</journal-buffer-timeout>
    <journal-max-io>1</journal-max-io>
    <disk-scan-period>5000</disk-scan-period>
    <max-disk-usage>90</max-disk-usage>
    <critical-analyzer>true</critical-analyzer>
    <critical-analyzer-timeout>120000</critical-analyzer-timeout>
    <critical-analyzer-check-period>60000</critical-analyzer-check-period>
    <critical-analyzer-policy>HALT</critical-analyzer-policy>
    <page-sync-timeout>1628000</page-sync-timeout>
    <global-max-size>100Mb</global-max-size>
    <connectors>
        <connector name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
            <param key="host" value="192.168.1.1"/>
            <param key="port" value="5445"/>
        </connector>
    </connectors>
    <acceptors>
        <acceptor name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
            <param key="host" value="192.168.1.1"/>
            <param key="port" value="5545"/>
        </acceptor>
    </acceptors>
    <cluster-connections>
        <cluster-connection name="my-cluster">
            <address>jms</address>
            <connector-ref>netty</connector-ref>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <forward-when-no-consumers>true</forward-when-no-consumers>
            <max-hops>1</max-hops>
            <discovery-group-ref discovery-group-name="my-discovery-group"/>
            <static-connectors>
                <connector-ref>netty</connector-ref>
            </static-connectors>
        </cluster-connection>
    </cluster-connections>
    <ha-policy>
        <replication>
            <slave>
                <allow-failback>true</allow-failback>
                <failback-delay>5000</failback-delay>
                <max-saved-replicated-journals-size>1000000</max-saved-replicated-journals-size>
                <restart-backup>true</restart-backup>
            </slave>
        </replication>
    </ha-policy>
</configuration>

然後再按照同樣的方式修改server1 資料夾中的hornetq-configuration.xml 文件,其中將server0 改為server1。

(3)啟動HornetQ

依序在HornetQ 和HornetQ2 的bin 目錄下執行run.sh 指令啟動HornetQ 進程,此時兩個HornetQ 節點即形成了一個集群,可以透過HornetQ控制台查看。

三、總結

透過本文的介紹,我們了解了 HornetQ 的基本使用和群集模式的配置方法。使用 HornetQ 可以輕鬆解決訊息互動的問題,提升系統的健全性和並發能力。同時,HornetQ 也支援多種訊息傳遞模式、豐富的訊息持久化機制和擴充插件等特性,可根據實際需求進行選擇和配置。 ###

以上是Java API 開發中使用 HornetQ 進行訊息處理的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn