ホームページ  >  記事  >  Java  >  Java でリモート接続して Rabbitmq を呼び出す方法を共有しますか?

Java でリモート接続して Rabbitmq を呼び出す方法を共有しますか?

黄舟
黄舟オリジナル
2018-05-24 11:51:253148ブラウズ

RabbitMQ リモート呼び出しテスト、外部マシン 192.168.174.132 で RabbitMQ を使用します。操作プロセスについては、ブログ投稿「RabbitMQ へのリモート アクセスの問題の解決」を参照してください。

SendTest:

package com.mq.rabbitmq.rabbitmqtest;
  
import java.util.Date;
  
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
  
public class ReceiveTest {
    private final static String QUEUE_NAME = "ftpAgent";
    private final static String userName = "admin";
    private final static String password = "admin";
    private final static String virtualHost = "/";
    private final static int portNumber = 5672;
    private final static String hostName = "master";
    private final static String host = "192.168.174.132";
  
    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException {
  
        ConnectionFactory factory = new ConnectionFactory();
//      factory.setHost("192.168.174.160");
        factory.setUsername(userName);
        factory.setPassword(password);
//      factory.setVirtualHost(virtualHost);
        factory.setHost(host);
        factory.setPort(portNumber);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
//      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
          
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);
  
        Date nowTime = new Date();
          
        while (true) {
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
          String message = new String(delivery.getBody());
          System.out.println("RecieveTime: " + nowTime);
          System.out.println(" [x] Received '" + message + "'");
        }
  
    }
}

IDEA を開き、Maven プロジェクトを作成します (Java は問題ありません)。

Java でリモート接続して Rabbitmq を呼び出す方法を共有しますか?

pom.xmlファイルは以下の通りです

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.zhenqi</groupId>
 <artifactId>rabbitmq-study</artifactId>
 <version>1.0-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>rabbitmq-study</name>
 <url>http://maven.apache.org</url>

 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>

 <dependencies>
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
  </dependency>

  <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  <dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>4.1.0</version>
   <exclusions>
    <exclusion>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-api</artifactId>
    </exclusion>
   </exclusions>
  </dependency>

  <dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.21</version>
  </dependency>

  <dependency>
   <groupId>commons-lang</groupId>
   <artifactId>commons-lang</artifactId>
   <version>2.6</version>
  </dependency>

 </dependencies>
</project>

rabbitmqにリモートアクセスするには、/etc/rabbitmq/rabbitmq.confを編集し、以下の内容を追加する必要があります。

[
  {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
 ]

管理者ロールの追加

rabbitmqctl set_user_tags openstack administrator

抽象キューの作成EndPoint.java

package com.zhenqi;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Created by wuming on 2017/7/16.
 */
public abstract class EndPoint {

  protected Channel channel;
  protected Connection connection;
  protected String endPointName;

  public EndPoint(String endpointName) throws Exception {

    this.endPointName = endpointName;

    //创建一个连接工厂 connection factory
    ConnectionFactory factory = new ConnectionFactory();

    //设置rabbitmq-server服务IP地址
    factory.setHost("192.168.146.128");
    factory.setUsername("openstack");
    factory.setPassword("rabbitmq");
    factory.setPort(5672);
    factory.setVirtualHost("/");

    //得到 连接
    connection = factory.newConnection();

    //创建 channel实例
    channel = connection.createChannel();

    channel.queueDeclare(endpointName, false, false, false, null);
  }

  /**
   * 关闭channel和connection。并非必须,因为隐含是自动调用的。
   * @throws IOException
   */
  public void close() throws Exception{
    this.channel.close();
    this.connection.close();
  }
}

プロデューサーProducer.java

プロデューサークラスのタスクは、キュー

package com.zhenqi;
import org.apache.commons.lang.SerializationUtils;
import java.io.Serializable;
/**
 * Created by wuming on 2017/7/16.
 */
public class Producer extends EndPoint {

  public Producer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void sendMessage(Serializable object) throws Exception {
    channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
  }
}

コンシューマーQueueConsumerにメッセージを書き込むことです。 java

コンシューマはスレッド モードで実行でき、さまざまなイベントに対してさまざまなコールバック関数を使用できます。その中で最も重要なのは、新しいメッセージの到着イベントを処理することです。

package com.zhenqi;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Created by wuming on 2017/7/16.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

  private Logger LOG=Logger.getLogger(QueueConsumer.class);

  public QueueConsumer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void handleConsumeOk(String s) {

  }

  public void handleCancelOk(String s) {

  }

  public void handleCancel(String s) throws IOException {

  }

  public void handleShutdownSignal(String s, ShutdownSignalException e) {

  }

  public void handleRecoverOk(String s) {
    LOG.info("Consumer "+s +" registered");
  }

  public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
    Map map = (HashMap) SerializationUtils.deserialize(bytes);
    LOG.info("Message Number "+ map.get("message number") + " received.");
  }

  public void run() {
    try{
      channel.basicConsume(endPointName, true,this);
    }catch(IOException e){
      e.printStackTrace();
    }
  }
}

テスト

コンシューマスレッドを実行し、大量のメッセージの生成を開始します。これらのメッセージはコンシューマによって取得されます

package com.zhenqi;

import java.util.HashMap;

/**
 * Created by wuming on 2017/7/16.
 */
public class TestRabbitmq {

  public static void main(String[] args){
    try{
      QueueConsumer consumer = new QueueConsumer("queue");
      Thread consumerThread = new Thread(consumer);
      consumerThread.start();

      Producer producer = new Producer("queue");

      for (int i = 0; i < 100000; i++){
        HashMap message = new HashMap();
        message.put("message number", i);
        producer.sendMessage(message);
        System.out.println("Message Number "+ i +" sent.");
      }
    }catch(Exception e){
      e.printStackTrace();
    }

  }
}

以上がJava でリモート接続して Rabbitmq を呼び出す方法を共有しますか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。