ホームページ >バックエンド開発 >PHPチュートリアル >.netプラットフォームrabbitmqはカプセル化を使用します

.netプラットフォームrabbitmqはカプセル化を使用します

大家讲道理
大家讲道理オリジナル
2017-03-07 14:40:341534ブラウズ

序文

RabbitMq については誰もがよく知っています。この記事では主に RabbitMQ を学習した後の RabbitMQ.Client のカプセル化について説明します。記事の最後では、カプセル化されたコンポーネントとデモを紹介します。

Rabbitmqの動作

以下の図からわかるように、パブリッシャー (Publisher) はまずメッセージをエクスチェンジ (Exchange) に送信し、次にエクスチェンジから指定されたキュー (Queue) にメッセージを送信します。エクスチェンジとキューの間にはバインド関係があります。事前に宣言され、最終的に消費されます。顧客 (顧客) は、メッセージをサブスクライブするか、アクティブにフェッチすることによって、指定されたキューからメッセージを消費します。

したがって、今述べたサブスクリプションとアクティブな取得は、プッシュ (受動的) とプル (アクティブ) として理解できます。

プッシュでは、メッセージがキューに追加されている限り、アイドル状態のコンシューマーにそれを消費するように通知されます。 (私が探さなければ、あなたが私を探すのをただ待ちます、オブザーバーモード) プルの場合、コンシューマには通知されませんが、コンシューマはラウンドロビンまたは定期的にキュー メッセージをフェッチする主導権を握ります。 (必要なときだけあなたのところに行きます)

使用シナリオの例を示します。注文システムと出荷システムの 2 つのシステムがあるとします。商品をタイムリーに出荷するには、出荷メッセージの指示が注文システムから開始される必要があります。キューをサブスクライブし、指示が​​ある限り処理します。

ただし、プログラムではネットワークや DB タイムアウトなどの例外が発生する場合があり、この場合はメッセージが失敗キューにスローされます。この場合、再送信メカニズムが必要になります。ただし、 while(IsPostSuccess == True) はやりたくありません。例外が発生する限り、一定時間以内に例外が発生し、そのような再試行は無意味だからです。

現時点では、時間内にメッセージを処理する必要はありません。失敗したキュー メッセージを定期的に、または数分ごと (失敗数 * 間隔分) に取得して再送信する JOB があります。

パッケージの公開

手順: リンクの初期化 -> エクスチェンジャの宣言 -> キューの宣言 -> マシンとキュー バインドの変更 -> メッセージのパブリッシュ。宣言とバインドには非常に時間がかかるため、モデルを ConcurrentDictionary に保存したことに注意してください。次に、繰り返しキューへのメッセージの送信には再初期化が必要ありません。

リーリー

コードを表示

次回は、ネイティブ テストの公開速度のスクリーンショットです。

4.2W/Sは安定した速度で、デシリアライズ(ToJson)も若干速くなります。

サブスクリプションパッケージ

パブリッシュ時にはエクスチェンジャーとキューが宣言されてバインドされますが、サブスクライブ時にはキューを宣言するだけで済みます。以下のコードからわかるように、例外がキャッチされると、メッセージはカスタムの「デッド レター キュー」に送信され、別の JOB によって定期的に再送信されます。そのため、最終的な応答は成功します。

リーリー

コードを表示

次回は、ネイティブ テストの公開速度のスクリーンショットです。

速い場合は1.9K/S、遅い場合は1.7K/Sです。

プルパッケージ

コードに直接移動します。

リーリー

コードを表示

速いときは1.8K/s、安定しているときは1.5K/sです。

Rpc(远程调用)的封装

首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:

1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常

2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。

        /// <summary>
        /// RPC客户端        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="body"></param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
        {            var channel = GetModel(exchange, queue, routingKey, isProperties);            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue, true, consumer);            try
            {                var correlationId = Guid.NewGuid().ToString();                var basicProperties = channel.CreateBasicProperties();
                basicProperties.ReplyTo = queue;
                basicProperties.CorrelationId = correlationId;

                channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());                var sw = Stopwatch.StartNew();                while (true)
                {                    var ea = consumer.Queue.Dequeue();                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {                        return ea.Body.DeserializeUtf8();
                    }                    if (sw.ElapsedMilliseconds > 30000)                        throw new Exception("等待响应超时");
                }
            }            catch (Exception ex)
            {                throw ex.GetInnestException();
            }
        }    

        /// <summary>
        /// RPC服务端        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="isProperties"></param>
        /// <param name="handler"></param>
        /// <param name="isDeadLetter"></param>
        public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
        {            //队列声明
            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson<T>();                var props = ea.BasicProperties;                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;                try
                {
                    msg = handler(msg);
                }                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                }                finally
                {
                    channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }

View Code

   可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。

 结尾

本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 http://www.php.cn/ 。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。

如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。