ホームページ >php教程 >php手册 >RabbitMQ を使用した NET 環境

RabbitMQ を使用した NET 環境

WBOY
WBOYオリジナル
2016-07-06 13:30:181172ブラウズ

1 環境セットアップ まず、RabbitMQ は Erlang で書かれており、Erlang ランタイム環境で実行する必要があるため、RabbitMQ Server をインストールする前に Erlang ランタイム環境をインストールする必要があります。Erlang から対応するプラットフォームのインストール ファイルをダウンロードできます。公式ウェブサイト。ランタイム環境がインストールされていない場合は、RabbitMQ サーバーをインストールするときに、最初にインストールするように求められます

1 環境設定

まず、RabbitMQ は Erlang で書かれており、Erlang ランタイム環境で実行する必要があるため、RabbitMQ サーバーをインストールする前に Erlang ランタイム環境をインストールする必要があります。Erlang 公式 Web サイトにアクセスして、対応するインストール ファイルをダウンロードできます。プラットホーム。ランタイム環境がインストールされていない場合、RabbitMQ Server をインストールするときに、最初に Erlang 環境をインストールするように求められます。 インストールが完了したら、Erlang のインストール パスがシステムの環境変数に登録されていることを確認してください。 Erlang をインストールすると、この環境は自動的にセットアップされます。そうでない場合は、以下のようにセットアップします。

次に、RabbitMQ 公式 Web サイトにアクセスして RabbitMQ Server サーバー プログラムをダウンロードし、ダウンロードする適切なプラットフォームのバージョンを選択します。インストールが完了したら、使用を開始できます。

これで、RabbitMQ サーバーを構成できるようになりました。

まず、RabbitMQサーバーのインストールディレクトリに移動します:

sbin の下には、RabbitMQ サーバーを制御するために使用される多くのバッチ ファイルがあります。

最も簡単な方法は、RabbitMQ を Windows サービスとしてバックグラウンドで実行することです。そのため、管理者権限で cmd を開き、sbin ディレクトリに切り替えて、次の 3 つのコマンドを実行する必要があります:

リーリー

RabbitMQサーバーが起動しました (起動に失敗する場合は、インストール後にサービスが起動しているかどうかを確認してください。起動していない場合は、インストールされているバージョンが原因である可能性があります)。

sbin ディレクトリの下にある Rabbitmqctl.bat スクリプトを使用して、cmd で直接 Rabbitmqctl ステータスを表示および制御できます。次の結果が表示されない場合は、C:Windows ディレクトリに移動し、.erlang.cookie ファイルをユーザー ディレクトリ C:Users{username} にコピーする必要があります。これは、Erlang との対話を可能にする Erlang Cookie ファイルです。

RabbitMQ サーバーにはユーザーの概念もあります。インストール後、rabbitmqctl list_users コマンドを使用して上記の現在のユーザーを確認します。

次のコマンドを使用してユーザーを追加し、権限を設定できます:

リーリー

上記のコマンドは、test という名前のユーザーを追加し、パスワード 123456 を設定します。次のコマンドは、ユーザー test に、すべてのメッセージ キューの構成、読み取りおよび書き込み権限を付与します。

次のコマンドを使用して、デフォルトのゲスト ユーザーを削除できます:

リーリー

パスワードを変更したい場合は、次のコマンドを使用できます:

リーリー

2 始めましょう

.NET で RabbitMQ を使用するには、RabbitMQ クライアント アセンブリをダウンロードする必要があります。ダウンロードして解凍すると、RabbitMQ クライアントである RabbitMQ.Client.dll を入手できます。

RabitMQ を使用する前に、次の基本概念を説明する必要があります:

RabbitMQ はメッセージブローカーです。メッセージ プロデューサー (PROducer) からメッセージを受信し、送信と受信の間に、設定されたルールに従ってメッセージをルーティング、キャッシュ、および永続化できます。

一般に、RabbitMQ およびメッセージを参照する場合、いくつかの固有名詞が使用されます。

    生産とは送信することを意味します。メッセージを送信するプログラムはプロデューサーです。通常、次のことを表すために「P」を使用します:

RabbitMQ を使用した NET 環境

    キューはメールボックスの名前です。メッセージはアプリケーションと RabbitMQ の間で転送され、キューにのみ保存できます。 キューの容量に制限はなく、必要な数のメッセージを保存できます。基本的には無限のバッファーです。複数のプロデューサーが同じキューにメッセージを送信でき、複数のコンシューマーが同じキューからデータを取得することもできます。キューは次のように描画できます (画像はキューの名前です):

RabbitMQ を使用した NET 環境

  • 消费(Consuming)和获取消息是一样的意思。一个消费者(RabbitMQ を使用した NET 環境)就是一个等待获取消息的程序。我们把它画作"C":

RabbitMQ を使用した NET 環境

通常,消息生产者,消息消费者和消息代理不在同一台机器上。

2.1 Hello World

为了展示RabbitMQ的基本使用,我们发送一个HelloWorld消息,然后接收并处理。

rabbitmq hello world

首先创建一个控制台程序,用来将消息发送到RabbitMQ的消息队列中,代码如下:


首先,需要创建一个ConnectionFactory,设置目标,由于是在本机,所以设置为localhost,如果RabbitMQ不在本机,只需要设置目标机器的ip地址或者机器名称即可,然后设置前面创建的用户名test和密码123456。

紧接着要创建一个Channel,如果要发送消息,需要创建一个队列,然后将消息发布到这个队列中。在创建队列的时候,只有RabbitMQ上该队列不存在,才会去创建。消息是以二进制数组的形式传输的,所以如果消息是实体对象的话,需要序列化和然后转化为二进制数组。

现在客户端发送代码已经写好了,运行之后,消息会发布到RabbitMQ的消息队列中,现在需要编写服务端的代码连接到RabbitMQ上去获取这些消息。

同样,创建一个名为Receive的服务端控制台应用程序,服务端代码如下:


和发送一样,首先需要定义连接,然后声明消息队列。要接收消息,需要定义一个Consume,然后从消息队列中不断DeRabbitMQ を使用した NET 環境消息,然后处理。

现在发送端和接收端的代码都写好了,运行发送端,发送消息:

 

现在,名为hello的消息队列中,发送了一条消息。这条消息存储到了RabbitMQ的服务器上了。使用rabbitmqctl 的list_RabbitMQ を使用した NET 環境s可以查看所有的消息队列,以及里面的消息个数,可以看到,目前Rabbitmq上只有一个消息队列,里面只有一条消息:

rabbitmqctl list_RabbitMQ を使用した NET 環境s
Listing RabbitMQ を使用した NET 環境s ...
hello   1

现在运行接收端程序:

可以看到,已经接受到了客户端发送的Hello World,现在再来看RabitMQ上的消息队列信息:

rabbitmqctl list_RabbitMQ を使用した NET 環境s
Listing RabbitMQ を使用した NET 環境s ...
hello   0

可以看到,hello这个队列中的消息队列个数为0,这表示,当接收端,接收到消息之后,RabbitMQ上就把这个消息删掉了。

2.2 工作队列

前面的例子展示了如何往一个指定的消息队列中发送和收取消息。现在我们创建一个工作队列(work RabbitMQ を使用した NET 環境)来将一些耗时的任务分发给多个工作者(workers):

rabbitmq-work RabbitMQ を使用した NET 環境

工作队列(work RabbitMQ を使用した NET 環境s, 又称任务队列Task Queues)的主要思想是为了避免立即执行并等待一些占用大量资源、时间的操作完成。而是把任务(Task)当作消息发送到队列中,稍后处理。一个运行在后台的工作者(worker)进程就会取出任务然后处理。当运行多个工作者(workers)时,任务会在它们之间共享。

这个在网络应用中非常有用,它可以在短暂的HTTP请求中处理一些复杂的任务。在一些实时性要求不太高的地方,我们可以处理完主要操作之后,以消息的方式来处理其他的不紧要的操作,比如写日志等等。

准备

在第一部分,发送了一个包含“Hello World!”的字符串消息。现在发送一些字符串,把这些字符串当作复杂的任务。这里使用time.sleep()函数来模拟耗时的任务。在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。比如"Hello..."就会耗时3秒钟。

对之前示例的send.cs做些简单的调整,以便可以发送随意的消息。这个程序会按照计划发送任务到我们的工作队列中。

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "test";
    factory.Password = "123456";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);
            string message = <strong>GetMessage(args);</strong>
          <strong>  var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2;</strong>

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "hello", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

加粗部分是经过修改过了的。

接着我们修改接收端,让他根据消息中的逗点的个数来Sleep对应的秒数:

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "test";
    factory.Password = "123456";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);

            var RabbitMQ を使用した NET 環境 = new QueueingBasicConsumer(channel);
            channel.BasicConsume("hello", true, RabbitMQ を使用した NET 環境);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)RabbitMQ を使用した NET 環境.Queue.DeRabbitMQ を使用した NET 環境();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

              <strong>  int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);</strong>
                        
                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");
            }
        }
    }
}

轮询分发

使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。

现在,我们先启动两个接收端,等待接受消息,然后启动一个发送端开始发送消息。

Send message RabbitMQ を使用した NET 環境 

在cmd条件下,发送了5条消息,每条消息后面的逗点表示该消息需要执行的时长,来模拟耗时的操作。

然后可以看到,两个接收端依次接收到了发出的消息:

receive message RabbitMQ を使用した NET 環境 

默认,RabbitMQ会将每个消息按照顺序依次分发给下一个消费者。所以每个消费者接收到的消息个数大致是平均的。 这种消息分发的方式称之为轮询(round-robin)。

2.3 消息响应

当处理一个比较耗时得任务的时候,也许想知道消费者(RabbitMQ を使用した NET 環境s)是否运行到一半就挂掉。在当前的代码中,当RabbitMQ将消息发送给消费者(RabbitMQ を使用した NET 環境s)之后,马上就会将该消息从队列中移除。此时,如果把处理这个消息的工作者(worker)停掉,正在处理的这条消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。

我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望该消息会重新发送给其他的工作者(worker)。

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)机制。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ才会释放并删除这条消息。

如果消费者(RabbitMQ を使用した NET 環境)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(RabbitMQ を使用した NET 環境)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。

消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

消息响应默认是开启的。在之前的例子中使用了no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。

channel.BasicConsume("hello", <strong>false</strong>, RabbitMQ を使用した NET 環境);

while (true)
{
    var ea = (BasicDeliverEventArgs)RabbitMQ を使用した NET 環境.Queue.DeRabbitMQ を使用した NET 環境();

    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine("Received {0}", message);
    Console.WriteLine("Done");

    <strong>channel.BasicAck(ea.DeliveryTag, false);</strong>
}

现在,可以保证,即使正在处理消息的工作者被停掉,这些消息也不会丢失,所有没有被应答的消息会被重新发送给其他工作者.

一个很常见的错误就是忘掉了BasicAck这个方法,这个错误很常见,但是后果很严重. 当客户端退出时,待处理的消息就会被重新分发,但是RabitMQ会消耗越来越多的内存,因为这些没有被应答的消息不能够被释放。调试这种case,可以使用rabbitmqct打印messages_unacknoledged字段。

rabbitmqctl list_RabbitMQ を使用した NET 環境s name messages_ready messages_unacknowledged
Listing RabbitMQ を使用した NET 環境s ...
hello    0       0
...done.

2.4 消息持久化

前面已经搞定了即使消费者down掉,任务也不会丢失,但是,如果RabbitMQ Server停掉了,那么这些消息还是会丢失。

当RabbitMQ Server 关闭或者崩溃,那么里面存储的队列和消息默认是不会保存下来的。如果要让RabbitMQ保存住消息,需要在两个地方同时设置:需要保证队列和消息都是持久化的。

首先,要保证RabbitMQ不会丢失队列,所以要做如下设置:

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);

虽然在语法上是正确的,但是在目前阶段是不正确的,因为我们之前已经定义了一个非持久化的hello队列。RabbitMQ不允许我们使用不同的参数重新定义一个已经存在的同名队列,如果这样做就会报错。现在,定义另外一个不同名称的队列:

bool durable = true;
channel.RabbitMQ を使用した NET 環境Declare("task_RabbitMQ を使用した NET 環境", durable, false, false, null);

RabbitMQ を使用した NET 環境Declare 这个改动需要在发送端和接收端同时设置。

现在保证了task_RabbitMQ を使用した NET 環境这个消息队列即使在RabbitMQ Server重启之后,队列也不会丢失。 然后需要保证消息也是持久化的, 这可以通过设置IBasicProperties.SetPersistent 为true来实现:

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

需要注意的是,将消息设置为持久化并不能完全保证消息不丢失。虽然他告诉RabbitMQ将消息保存到磁盘上,但是在RabbitMQ接收到消息和将其保存到磁盘上这之间仍然有一个小的时间窗口。 RabbitMQ 可能只是将消息保存到了缓存中,并没有将其写入到磁盘上。持久化是不能够一定保证的,但是对于一个简单任务队列来说已经足够。如果需要消息队列持久化的强保证,可以使用publisher confirms

2.5 公平分发

你可能会注意到,消息的分发可能并没有如我们想要的那样公平分配。比如,对于两个工作者。当奇数个消息的任务比较重,但是偶数个消息任务比较轻时,奇数个工作者始终处理忙碌状态,而偶数个工作者始终处理空闲状态。但是RabbitMQ并不知道这些,他仍然会平均依次的分发消息。

为了改变这一状态,我们可以使用basicQos方法,设置perfetchCount=1 。这样就告诉RabbitMQ 不要在同一时间给一个工作者发送多于1个的消息,或者换句话说。在一个工作者还在处理消息,并且没有响应消息之前,不要给他分发新的消息。相反,将这条新的消息发送给下一个不那么忙碌的工作者。

channel.BasicQos(0, 1, false); 

2.6 完整实例

现在将所有这些放在一起:

发送端代码如下:

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "test";
    factory.Password = "123456";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
                   
            bool durable = true;
            channel.QueueDeclare("task_RabbitMQ を使用した NET 環境", durable, false, false, null);
                    
            string message = GetMessage(args);
            var properties = channel.CreateBasicProperties();
            properties.SetPersistent(true);
                  

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "task_RabbitMQ を使用した NET 環境", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

接收端代码如下:

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "test";
    factory.Password = "123456";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            bool durable = true;
            channel.QueueDeclare("task_RabbitMQ を使用した NET 環境", durable, false, false, null);
            channel.BasicQos(0, 1, false);

            var RabbitMQ を使用した NET 環境 = new QueueingBasicConsumer(channel);
            channel.BasicConsume("task_RabbitMQ を使用した NET 環境", false, RabbitMQ を使用した NET 環境);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)RabbitMQ を使用した NET 環境.Queue.DeRabbitMQ を使用した NET 環境();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");

                channel.BasicAck(ea.DeliveryTag, false);
            }
        }
    }
}

三 管理界面

RabbitMQ还有一个管理界面,通过该界面可以查看RabbitMQ Server 当前的状态,该界面是以插件形式提供的,并且在安装RabbitMQ的时候已经自带了该插件。需要做的是在RabbitMQ控制台界面中启用该插件,命令如下:

rabbitmq-plugins enable rabbitmq_management

rabbitmq management

现在,在浏览器中输入 http://server-name:15672/ server-name换成机器地址或者域名,如果是本地的,直接用localhost在输入之后,弹出登录界面,使用我们之前创建的用户登录。

 .

在该界面上可以看到当前RabbitMQServer的所有状态。

四 总结

本文简单介绍了消息队列的相关概念,并介绍了RabbitMQ消息代理的基本原理以及在Windows 上如何安装RabbitMQ和在.NET中如何使用RabbitMQ。消息队列在构建分布式系统和提高系统的可扩展性和响应性方面有着很重要的作用,希望本文对您了解消息队列以及如何使用RabbitMQ有所帮助。

五 参考文献

  1. http://www.infoq.com/cn/articles/message-based-distributed-architecture
  2. http://www.rabbitmq.com/getstarted.html
  3. http://www.codethinked.com/using-rabbitmq-with-c-and-net
  4. http://www.infoq.com/cn/articles/AMQP-RabbitMQ
  5. http://www.infoq.com/cn/articles/ebay-scalability-best-practices

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。