Heim >Backend-Entwicklung >PHP-Tutorial >Die .net-Plattform Rabbitmq verwendet Kapselung

Die .net-Plattform Rabbitmq verwendet Kapselung

大家讲道理
大家讲道理Original
2017-03-07 14:40:341570Durchsuche

Vorwort

Jeder ist mit RabbitMq vertraut. In diesem Artikel wird hauptsächlich die Kapselung von RabbitMQ.Client nach dem Erlernen von RabbitMq beschrieben. Am Ende des Artikels werde ich die gekapselten Komponenten und die Demo vorstellen.

Betrieb von Rabbitmq

Wie aus der folgenden Abbildung ersichtlich ist, sendet der Herausgeber (Herausgeber) die Nachricht zunächst an die Börse (Exchange) und sendet sie dann von der Börse an die angegebene Warteschlange (Warteschlange). Es besteht eine Bindungsbeziehung zwischen der Börse und der Warteschlange wurde zuvor deklariert, und der endgültige Verbrauch Der Kunde (Kunde) abonniert über oder ruft aktiv bestimmte Warteschlangennachrichten für den Verbrauch ab.

Dann kann das gerade erwähnte Abonnement und der aktive Abruf als Push (passiv) und Pull (aktiv) verstanden werden.

Push: Solange eine Nachricht zur Warteschlange hinzugefügt wird, werden inaktive Verbraucher zum Konsumieren benachrichtigt. (Wenn ich nicht nach dir suche, warte ich einfach darauf, dass du nach mir suchst, Beobachtermodus)

Pull: Der Verbraucher wird nicht benachrichtigt, aber der Verbraucher ergreift die Initiative, um die Warteschlangennachrichten im Round-Robin-Verfahren oder in regelmäßigen Abständen abzurufen. (Ich gehe nur zu dir, wenn ich es brauche)

Lassen Sie mich ein Beispiel für ein Nutzungsszenario geben: Ein Bestellsystem und ein Versandsystem. Anweisungen zum Versand von Nachrichten werden vom Bestellsystem initiiert Abonnieren Sie die Warteschlange und verarbeiten Sie sie, solange Anweisungen vorliegen.

Allerdings stößt das Programm gelegentlich auf Ausnahmen, wie z. B. eine Netzwerk- oder DB-Zeitüberschreitung, und die Nachricht wird in die Fehlerwarteschlange geworfen. In diesem Fall ist ein Mechanismus zum erneuten Senden erforderlich. Aber ich möchte while(IsPostSuccess == True) nicht machen, denn solange eine Ausnahme auftritt, wird es innerhalb eines bestimmten Zeitraums Ausnahmen geben und ein solcher Wiederholungsversuch ist bedeutungslos.

Zu diesem Zeitpunkt besteht keine Notwendigkeit, die Nachricht rechtzeitig zu verarbeiten. Es gibt einen JOB, um die fehlgeschlagene Warteschlangennachricht regelmäßig oder alle paar Minuten (Anzahl der Fehler * Intervallminuten) abzurufen und erneut zu senden.

Paket veröffentlichen

Schritte: Link initialisieren – Austauscher deklarieren – Warteschlange deklarieren – Maschine und Warteschlangenbindung ändern – Nachricht veröffentlichen. Beachten Sie, dass ich das Modell in ConcurrentDictionary gespeichert habe, da Deklaration und Bindung sehr zeitaufwändig sind. Zweitens erfordert das Senden von Nachrichten an wiederholte Warteschlangen keine Neuinitialisierung.

 1         /// <summary> 2         /// 交换器声明 3         /// </summary> 4         /// <param name="iModel"></param> 5         /// <param name="exchange">交换器</param> 6         /// <param name="type">交换器类型: 7         /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全 8         /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的 9         /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog10         /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都11         /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout12         /// 交换机转发消息是最快的。13         /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多14         /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”15         /// 只会匹配到“audit.irs”。</param>16         /// <param name="durable">持久化</param>17         /// <param name="autoDelete">自动删除</param>18         /// <param name="arguments">参数</param>19         private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,20             bool durable = true,21             bool autoDelete = false, IDictionary<string, object> arguments = null)22         {23             exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();24             iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);25         }26 27         /// <summary>28         /// 队列声明29         /// </summary>30         /// <param name="channel"></param>31         /// <param name="queue">队列</param>32         /// <param name="durable">持久化</param>33         /// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,34         /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可35         /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连36         /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者37         /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>38         /// <param name="autoDelete">自动删除</param>39         /// <param name="arguments">参数</param>40         private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,41             bool autoDelete = false, IDictionary<string, object> arguments = null)42         {43             queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();44             channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);45         }46 47         /// <summary>48         /// 获取Model49         /// </summary>50         /// <param name="exchange">交换机名称</param>51         /// <param name="queue">队列名称</param>52         /// <param name="routingKey"></param>53         /// <param name="isProperties">是否持久化</param>54         /// <returns></returns>55         private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)56         {57             return ModelDic.GetOrAdd(queue, key =>58             {59                 var model = _conn.CreateModel();60                 ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);61                 QueueDeclare(model, queue, isProperties);62                 model.QueueBind(queue, exchange, routingKey);63                 ModelDic[queue] = model;64                 return model;65             });66         }67 68         /// <summary>69         /// 发布消息70         /// </summary>71         /// <param name="routingKey">路由键</param>72         /// <param name="body">队列信息</param>73         /// <param name="exchange">交换机名称</param>74         /// <param name="queue">队列名</param>75         /// <param name="isProperties">是否持久化</param>76         /// <returns></returns>77         public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)78         {79             var channel = GetModel(exchange, queue, routingKey, isProperties);80 81             try82             {83                 channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());84             }85             catch (Exception ex)86             {87                 throw ex.GetInnestException();88             }89         }

Code anzeigen

Das nächste Mal ist ein Screenshot der Veröffentlichungsgeschwindigkeit des nativen Tests:

4,2 W/S ist eine stabile Geschwindigkeit und die Deserialisierung (ToJson) wird etwas schneller sein.

Abonnementpaket

Beim Veröffentlichen werden der Austauscher und die Warteschlange deklariert und gebunden. Beim Abonnieren müssen Sie jedoch nur die Warteschlange deklarieren. Wie Sie dem folgenden Code entnehmen können, wird die Nachricht beim Abfangen einer Ausnahme an die benutzerdefinierte „Warteschlange für unzustellbare Nachrichten“ gesendet und regelmäßig von einem anderen JOB erneut gesendet. Daher ist die endgültige Antwort erfolgreich.

        /// <summary>
        /// 获取Model        /// </summary>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        private static IModel GetModel(string queue, bool isProperties = false)
        {            return ModelDic.GetOrAdd(queue, value =>
             {                 var model = _conn.CreateModel();
                 QueueDeclare(model, queue, isProperties);                 //每次消费的消息数
                 model.BasicQos(0, 1, false);

                 ModelDic[queue] = model;                 return model;
             });
        }    

        /// <summary>
        /// 接收消息        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <param name="handler">消费处理</param>
        /// <param name="isDeadLetter"></param>
        public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
        {            //队列声明
            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson<T>();                try
                {
                    handler(msg);
                }                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");                    if (!isDeadLetter)
                        PublishToDead<DeadLetterQueue>(queue, msgStr, ex);
                }                finally
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }

Code anzeigen

Das nächste Mal ist ein Screenshot der Veröffentlichungsgeschwindigkeit des nativen Tests:

Wenn es schnell ist, beträgt es 1,9 K/S, und wenn es langsam ist, beträgt es 1,7 K/S.

Paket ziehen

Gehen Sie direkt zum Code:

        /// <summary>
        /// 获取消息        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="handler">消费处理</param>
        private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
        {            var channel = GetModel(exchange, queue, routingKey);            var result = channel.BasicGet(queue, false);            if (result.IsNull())                return;            var msg = result.Body.DeserializeUtf8().FromJson<T>();            try
            {
                handler(msg);
            }            catch (Exception ex)
            {
                ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
            }            finally
            {
                channel.BasicAck(result.DeliveryTag, false);
            }
        }

Code anzeigen

Wenn es schnell ist, beträgt es 1,8 K/s, und wenn es stabil ist, sind es 1,5 K/s.

Rpc(远程调用)的封装

首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:

1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常

2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。

        /// <summary>
        /// RPC客户端        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="body"></param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
        {            var channel = GetModel(exchange, queue, routingKey, isProperties);            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue, true, consumer);            try
            {                var correlationId = Guid.NewGuid().ToString();                var basicProperties = channel.CreateBasicProperties();
                basicProperties.ReplyTo = queue;
                basicProperties.CorrelationId = correlationId;

                channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());                var sw = Stopwatch.StartNew();                while (true)
                {                    var ea = consumer.Queue.Dequeue();                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {                        return ea.Body.DeserializeUtf8();
                    }                    if (sw.ElapsedMilliseconds > 30000)                        throw new Exception("等待响应超时");
                }
            }            catch (Exception ex)
            {                throw ex.GetInnestException();
            }
        }    

        /// <summary>
        /// RPC服务端        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="isProperties"></param>
        /// <param name="handler"></param>
        /// <param name="isDeadLetter"></param>
        public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
        {            //队列声明
            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson<T>();                var props = ea.BasicProperties;                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;                try
                {
                    msg = handler(msg);
                }                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                }                finally
                {
                    channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }

View Code

   可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。

 结尾

本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 http://www.php.cn/ 。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。

如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn