>  기사  >  Java  >  Java API 개발에서 메시지 처리를 위해 HornetQ 사용

Java API 개발에서 메시지 처리를 위해 HornetQ 사용

PHPz
PHPz원래의
2023-06-17 23:27:091094검색

Java API 개발에서 메시지 처리에 HornetQ 사용

인터넷의 급속한 발전과 함께 수많은 정보 상호 작용이 등장했으며, 메시지 큐는 높은 동시성, 고가용성, 비동기성과 같은 문제를 해결하는 중요한 수단이 되었습니다. 처리. HornetQ는 JBoss가 개발한 JMS 프로토콜을 기반으로 하는 고성능, 고가용성 오픈 소스 메시징 미들웨어입니다. 이 기사에서는 Java API 개발에서 메시지 처리를 위해 HornetQ를 사용하는 방법을 소개합니다.

1. Quick Start

  1. HornetQ 다운로드

HornetQ 공식 홈페이지(http://hornetq.apache.org/downloads.html)에서는 다양한 형식의 다운로드 패키지를 제공합니다. 최종 -bin.tar.gz.

  1. HornetQ 설치

다운로드가 완료된 후 HornetQ-2.4.0.Final-bin.tar.gz의 압축을 로컬 폴더에 풀어주세요.

  1. Start HornetQ

HornetQ의 bin 디렉터리에 들어가서 다음 명령을 실행합니다.

  ./run.sh

다음 로그 정보는 HornetQ 서비스가 성공적으로 시작되었음을 나타냅니다.

  11:14:21,867 INFO [ServerImpl] HornetQ 서버 시작
 11:14:21,986 INFO [JournalStorageManager] NIO Journal 사용
 11:14:22,626 INFO [NettyAcceptor] Netty Acceptor 버전 #{version}을 시작했습니다
 11:14:22,697 INFO [HornetQServerImpl] HornetQ 서버 버전 #{version} [${name}] 시작됨

  1. HornetQ 콘솔 배포

HornetQ의 Hornetq-console.war을 Tomcat의 webapps 디렉토리에 넣고 Tomcat을 시작한 다음 http://localhost:8080/hornetq-를 통해 HornetQ에 액세스합니다. 콘솔 콘솔.

2. HornetQ 사용

  1. 메시지 게시 및 수신

HornetQ의 게시 및 구독 모델은 특정 주제에 메시지를 게시하고 여러 수신자가 동시에 이 주제를 구독할 수 있습니다. . 여러 게시자가 게시한 메시지를 수신할 수 있습니다.

(1) 메시지 게시자

먼저 메시지를 보낼 게시자(Publisher)를 생성하고 코드는 다음과 같습니다.

public class Publisher {

    public static void main(String[] args) throws Exception {

        // 初始化连接工厂等配置信息
        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 发送消息
        JMSProducer producer = jmsContext.createProducer();
        Destination destination = HornetQJMSClient.createTopic("exampleTopic");
        producer.send(destination, "Hello, HornetQ!");

        // 关闭连接
        jmsContext.close();
    }
}

(2) 메시지 수신자

그런 다음 메시지를 수신하고 보낼 수신자(구독자)를 생성합니다. 메시지를 인쇄하면 코드는 다음과 같습니다.

public class Subscriber {

    public static void main(String[] args) throws Exception {

        // 初始化连接工厂等配置信息
        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 创建消费者
        Destination destination = HornetQJMSClient.createTopic("exampleTopic");
        JMSConsumer consumer = jmsContext.createConsumer(destination);

        // 接收消息并打印
        String message = null;
        do {
            message = consumer.receiveBody(String.class, 1000);
            System.out.println("Received message: " + message);
        } while (message != null);

        // 关闭连接
        jmsContext.close();
    }
}

Publisher와 Receiver를 실행한 후 아래 그림과 같이 HornetQ 콘솔에서 게시자가 보낸 메시지를 볼 수 있습니다.

  1. Message persistence

HornetQ는 메시지 처리 영구 저장을 지원합니다. 즉, HornetQ가 다운되더라도 메시지가 손실되지 않도록 보장할 수 있습니다.

(1) Sender

아래와 같이 메시지 지속성을 DeliveryMode.PERSISTENT로 설정해야 합니다.

public class Publisher {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(NettyConnectorFactory.class.getName()));
        JMSContext jmsContext = connectionFactory.createContext();

        // 设定持久性
        JMSProducer producer = jmsContext.createProducer();
        destination = HornetQJMSClient.createTopic("exampleTopic");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // 发送消息
        producer.send(destination, "Hello, HornetQ!");

        jmsContext.close();
    }
}

(2) Receiver

HornetQ는 기본적으로 메시지를 지속하므로 다음 작업을 수행할 필요가 없습니다. 클라이언트 측에서 특정 구성을 수행하려면 이전 섹션의 Subscriber 클래스를 계속 사용하세요.

  1. 클러스터 모드

HornetQ는 고가용성을 제공하며 클러스터 모드에서 실행되어 메시지 안정성과 높은 동시성을 보장할 수 있습니다. HornetQ 클러스터 모드를 구현하는 단계는 다음과 같습니다.

(1) HornetQ 디렉터리를 복사하고 새 폴더를 만듭니다.

HornetQ 디렉터리를 복사하고 이름을 HornetQ2로 변경한 다음, 클러스터라는 새 폴더를 만들고 아래의 폴더를 변경합니다. HornetQ2 디렉터리 모든 데이터 디렉터리, 로그 디렉터리, tmp 디렉터리 및 기타 폴더를 클러스터 폴더에 복사합니다.

(2) 구성 파일 수정

HornetQ 디렉터리 하위의 example/configs/clustered 구성 폴더에서 hq-configuration.xml 파일과 server0, server1 폴더를 HornetQ2 디렉터리에 복사하고 다음과 같이 수정합니다. server0 폴더의 Hornetq-configuration.xml 파일:

   (a) 노드 이름을 server0

  로 변경합니다. (b) 클러스터 연결의 서버 사용자 이름과 서버 비밀번호를 "guest"

  로 변경합니다. (c) 수정 커넥터 주소를 로컬 IP 주소(예: 192.168.1.1

 )로 설정합니다. (d) 아래와 같이 jms-configuration에서 use-ha를 true

 로 설정합니다.

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
    <cluster-password>guest</cluster-password>
    <paging-directory>${data.dir:../data}/paging</paging-directory>
    <bindings-directory>${data.dir:../data}/bindings</bindings-directory>
    <journal-directory>${data.dir:../data}/journal</journal-directory>
    <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
    <journal-type>NIO</journal-type>
    <journal-datasync>true</journal-datasync>
    <journal-min-files>2</journal-min-files>
    <journal-pool-files>10</journal-pool-files>
    <journal-file-size>10240</journal-file-size>
    <journal-buffer-timeout>28000</journal-buffer-timeout>
    <journal-max-io>1</journal-max-io>
    <disk-scan-period>5000</disk-scan-period>
    <max-disk-usage>90</max-disk-usage>
    <critical-analyzer>true</critical-analyzer>
    <critical-analyzer-timeout>120000</critical-analyzer-timeout>
    <critical-analyzer-check-period>60000</critical-analyzer-check-period>
    <critical-analyzer-policy>HALT</critical-analyzer-policy>
    <page-sync-timeout>1628000</page-sync-timeout>
    <global-max-size>100Mb</global-max-size>
    <connectors>
        <connector name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
            <param key="host" value="192.168.1.1"/>
            <param key="port" value="5445"/>
        </connector>
    </connectors>
    <acceptors>
        <acceptor name="netty">
            <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
            <param key="host" value="192.168.1.1"/>
            <param key="port" value="5545"/>
        </acceptor>
    </acceptors>
    <cluster-connections>
        <cluster-connection name="my-cluster">
            <address>jms</address>
            <connector-ref>netty</connector-ref>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <forward-when-no-consumers>true</forward-when-no-consumers>
            <max-hops>1</max-hops>
            <discovery-group-ref discovery-group-name="my-discovery-group"/>
            <static-connectors>
                <connector-ref>netty</connector-ref>
            </static-connectors>
        </cluster-connection>
    </cluster-connections>
    <ha-policy>
        <replication>
            <slave>
                <allow-failback>true</allow-failback>
                <failback-delay>5000</failback-delay>
                <max-saved-replicated-journals-size>1000000</max-saved-replicated-journals-size>
                <restart-backup>true</restart-backup>
            </slave>
        </replication>
    </ha-policy>
</configuration>

그런 다음 동일한 방식으로 server1 폴더를 수정합니다. server0이 server1로 변경된 .xml 파일입니다.

(3) HornetQ 시작

HornetQ 및 HornetQ2의 bin 디렉터리에서 run.sh 명령을 실행하여 HornetQ 프로세스를 시작합니다. 이때 두 개의 HornetQ 노드가 클러스터를 형성하며 이는 HornetQ 콘솔을 통해 볼 수 있습니다.

3. 요약

이번 글의 소개를 통해 HornetQ의 기본적인 사용법과 클러스터 모드의 구성 방법에 대해 알아보았습니다. HornetQ를 사용하면 메시지 상호 작용 문제를 쉽게 해결하고 시스템의 견고성과 동시성을 향상시킬 수 있습니다. 동시에 HornetQ는 실제 필요에 따라 선택하고 구성할 수 있는 다양한 메시징 모드, 풍부한 메시지 지속성 메커니즘, 확장 플러그인 및 기타 기능도 지원합니다.

위 내용은 Java API 개발에서 메시지 처리를 위해 HornetQ 사용의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.