Java API 開發中使用 HornetQ 進行訊息處理
隨著互聯網的快速發展,大量的資訊互動湧現出來,訊息佇列成為解決高並發、高可用、非同步處理等問題的重要手段。 HornetQ 是由 JBoss 開發的基於 JMS 協定的高效能、高可用的開源訊息中間件。本文將介紹如何在 Java API 開發中使用 HornetQ 進行訊息處理。
一、快速入門
HornetQ的官網(http://hornetq.apache.org/downloads.html)提供了多種格式的下載包,這裡選擇HornetQ-2.4.0.Final-bin.tar.gz。
下載完成後,將 HornetQ-2.4.0.Final-bin.tar.gz 解壓縮到本機資料夾。
進入HornetQ 的bin 目錄,執行下列指令:
./run.sh
#出現下列日誌訊息則表示成功啟動HornetQ 服務:
11:14:21,867 INFO [ServerImpl] Starting HornetQ Server
11:14:21,986 INFO [JournalStorageManagerJournal] U 11:14:21,986 INFO [JournalStorageManagerJournal] U 11:14:21,986 INFO [JournalStorageManagerJournal] UFIO 1212129:149:1499:142129:142129:2121299:2121299:21299:2129:14999:2129:142129:14] [NettyAcceptor] Started Netty Acceptor version #{version}
11:14:22,697 INFO [HornetQServerImpl] HornetQ Server version #{version} [${name}] started
public class Publisher { public static void main(String[] args) throws Exception { // 初始化连接工厂等配置信息 ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName())); JMSContext jmsContext = connectionFactory.createContext(); // 发送消息 JMSProducer producer = jmsContext.createProducer(); Destination destination = HornetQJMSClient.createTopic("exampleTopic"); producer.send(destination, "Hello, HornetQ!"); // 关闭连接 jmsContext.close(); } }(2)訊息接收端再建立一個接收端(Subscriber)去接收訊息,並將訊息列印出來,程式碼如下:
public class Subscriber { public static void main(String[] args) throws Exception { // 初始化连接工厂等配置信息 ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName())); JMSContext jmsContext = connectionFactory.createContext(); // 创建消费者 Destination destination = HornetQJMSClient.createTopic("exampleTopic"); JMSConsumer consumer = jmsContext.createConsumer(destination); // 接收消息并打印 String message = null; do { message = consumer.receiveBody(String.class, 1000); System.out.println("Received message: " + message); } while (message != null); // 关闭连接 jmsContext.close(); } }在運行發布端和接收端之後,可以在HornetQ 控制台上查看發佈端發送的訊息,如下圖所示:
public class Publisher { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName())); JMSContext jmsContext = connectionFactory.createContext(); // 设定持久性 JMSProducer producer = jmsContext.createProducer(); destination = HornetQJMSClient.createTopic("exampleTopic"); producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 发送消息 producer.send(destination, "Hello, HornetQ!"); jmsContext.close(); } }(2)接收者HornetQ 預設已將訊息持久化存儲,因此不需要在接收者端進行特定的配置,繼續使用上一節中的Subscriber 類別即可。
<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd"> <cluster-password>guest</cluster-password> <paging-directory>${data.dir:../data}/paging</paging-directory> <bindings-directory>${data.dir:../data}/bindings</bindings-directory> <journal-directory>${data.dir:../data}/journal</journal-directory> <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory> <journal-type>NIO</journal-type> <journal-datasync>true</journal-datasync> <journal-min-files>2</journal-min-files> <journal-pool-files>10</journal-pool-files> <journal-file-size>10240</journal-file-size> <journal-buffer-timeout>28000</journal-buffer-timeout> <journal-max-io>1</journal-max-io> <disk-scan-period>5000</disk-scan-period> <max-disk-usage>90</max-disk-usage> <critical-analyzer>true</critical-analyzer> <critical-analyzer-timeout>120000</critical-analyzer-timeout> <critical-analyzer-check-period>60000</critical-analyzer-check-period> <critical-analyzer-policy>HALT</critical-analyzer-policy> <page-sync-timeout>1628000</page-sync-timeout> <global-max-size>100Mb</global-max-size> <connectors> <connector name="netty"> <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> <param key="host" value="192.168.1.1"/> <param key="port" value="5445"/> </connector> </connectors> <acceptors> <acceptor name="netty"> <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> <param key="host" value="192.168.1.1"/> <param key="port" value="5545"/> </acceptor> </acceptors> <cluster-connections> <cluster-connection name="my-cluster"> <address>jms</address> <connector-ref>netty</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> <forward-when-no-consumers>true</forward-when-no-consumers> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> <static-connectors> <connector-ref>netty</connector-ref> </static-connectors> </cluster-connection> </cluster-connections> <ha-policy> <replication> <slave> <allow-failback>true</allow-failback> <failback-delay>5000</failback-delay> <max-saved-replicated-journals-size>1000000</max-saved-replicated-journals-size> <restart-backup>true</restart-backup> </slave> </replication> </ha-policy> </configuration>然後再按照同樣的方式修改server1 資料夾中的hornetq-configuration.xml 文件,其中將server0 改為server1。 (3)啟動HornetQ依序在HornetQ 和HornetQ2 的bin 目錄下執行run.sh 指令啟動HornetQ 進程,此時兩個HornetQ 節點即形成了一個集群,可以透過HornetQ控制台查看。 三、總結透過本文的介紹,我們了解了 HornetQ 的基本使用和群集模式的配置方法。使用 HornetQ 可以輕鬆解決訊息互動的問題,提升系統的健全性和並發能力。同時,HornetQ 也支援多種訊息傳遞模式、豐富的訊息持久化機制和擴充插件等特性,可根據實際需求進行選擇和配置。 ###
以上是Java API 開發中使用 HornetQ 進行訊息處理的詳細內容。更多資訊請關注PHP中文網其他相關文章!