搜索
首页后端开发php教程.net平台的rabbitmq使用封装

前言

RabbitMq大家再熟悉不过,这篇文章主要整对rabbitmq学习后封装RabbitMQ.Client的一个分享。文章最后,我会把封装组件和demo奉上。

Rabbitmq的运作

从下图可以看出,发布者(Publisher)是把消息先发送到交换器(Exchange),再从交换器发送到指定队列(Queue),而先前已经声明交换器与队列绑定关系,最后消费者(Customer)通过订阅或者主动取指定队列消息进行消费。

那么刚刚提到的订阅和主动取可以理解成,推(被动),拉(主动)。

推,只要队列增加一条消息,就会通知空闲的消费者进行消费。(我不找你,就等你找我,观察者模式)

拉,不会通知消费者,而是由消费者主动轮循或者定时去取队列消息。(我需要才去找你)

使用场景我举个例子,假如有两套系统 订单系统和发货系统,从订单系统发起发货消息指令,为了及时发货,发货系统需要订阅队列,只要有指令就处理。

可是程序偶尔会出异常,例如网络或者DB超时了,把消息丢到失败队列,这个时候需要重发机制。但是我又不想while(IsPostSuccess == True),因为只要出异常了,会在某个时间段内都会有异常,这样的重试是没意义的。

这个时候不需要及时的去处理消息,有个JOB定时或者每隔几分钟(失败次数*间隔分钟)去取失败队列消息,进行重发。

Publish(发布)的封装

步骤:初始化链接->声明交换器->声明队列->换机器与队列绑定->发布消息。注意的是,我将Model存到了ConcurrentDictionary里面,因为声明与绑定是非常耗时的,其次,往重复的队列发送消息是不需要重新初始化的。

 1         /// <summary> 2         /// 交换器声明 3         /// </summary> 4         /// <param name="iModel"></param> 5         /// <param name="exchange">交换器</param> 6         /// <param name="type">交换器类型: 7         /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全 8         /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的 9         /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog10         /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都11         /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout12         /// 交换机转发消息是最快的。13         /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多14         /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”15         /// 只会匹配到“audit.irs”。</param>16         /// <param name="durable">持久化</param>17         /// <param name="autoDelete">自动删除</param>18         /// <param name="arguments">参数</param>19         private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,20             bool durable = true,21             bool autoDelete = false, IDictionary<string, object> arguments = null)22         {23             exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();24             iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);25         }26 27         /// <summary>28         /// 队列声明29         /// </summary>30         /// <param name="channel"></param>31         /// <param name="queue">队列</param>32         /// <param name="durable">持久化</param>33         /// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,34         /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可35         /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连36         /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者37         /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>38         /// <param name="autoDelete">自动删除</param>39         /// <param name="arguments">参数</param>40         private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,41             bool autoDelete = false, IDictionary<string, object> arguments = null)42         {43             queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();44             channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);45         }46 47         /// <summary>48         /// 获取Model49         /// </summary>50         /// <param name="exchange">交换机名称</param>51         /// <param name="queue">队列名称</param>52         /// <param name="routingKey"></param>53         /// <param name="isProperties">是否持久化</param>54         /// <returns></returns>55         private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)56         {57             return ModelDic.GetOrAdd(queue, key =>58             {59                 var model = _conn.CreateModel();60                 ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);61                 QueueDeclare(model, queue, isProperties);62                 model.QueueBind(queue, exchange, routingKey);63                 ModelDic[queue] = model;64                 return model;65             });66         }67 68         /// <summary>69         /// 发布消息70         /// </summary>71         /// <param name="routingKey">路由键</param>72         /// <param name="body">队列信息</param>73         /// <param name="exchange">交换机名称</param>74         /// <param name="queue">队列名</param>75         /// <param name="isProperties">是否持久化</param>76         /// <returns></returns>77         public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)78         {79             var channel = GetModel(exchange, queue, routingKey, isProperties);80 81             try82             {83                 channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());84             }85             catch (Exception ex)86             {87                 throw ex.GetInnestException();88             }89         }

View Code

下次是本机测试的发布速度截图:

4.2W/S属于稳定速度,把反序列化(ToJson)会稍微快一些。

 

Subscribe(订阅)的封装

发布的时候是申明了交换器和队列并绑定,然而订阅的时候只需要声明队列就可。从下面代码能看到,捕获到异常的时候,会把消息送到自定义的“死信队列”里,由另外的JOB进行定时重发,因此,finally是应答成功的。

        /// <summary>
        /// 获取Model        /// </summary>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        private static IModel GetModel(string queue, bool isProperties = false)
        {            return ModelDic.GetOrAdd(queue, value =>
             {                 var model = _conn.CreateModel();
                 QueueDeclare(model, queue, isProperties);                 //每次消费的消息数
                 model.BasicQos(0, 1, false);

                 ModelDic[queue] = model;                 return model;
             });
        }    

        /// <summary>
        /// 接收消息        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <param name="handler">消费处理</param>
        /// <param name="isDeadLetter"></param>
        public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
        {            //队列声明
            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson<T>();                try
                {
                    handler(msg);
                }                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");                    if (!isDeadLetter)
                        PublishToDead<DeadLetterQueue>(queue, msgStr, ex);
                }                finally
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }

View Code

下次是本机测试的发布速度截图:

快的时候有1.9K/S,慢的时候也有1.7K/S

 

Pull(拉)的封装

直接上代码:

        /// <summary>
        /// 获取消息        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="handler">消费处理</param>
        private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
        {            var channel = GetModel(exchange, queue, routingKey);            var result = channel.BasicGet(queue, false);            if (result.IsNull())                return;            var msg = result.Body.DeserializeUtf8().FromJson<T>();            try
            {
                handler(msg);
            }            catch (Exception ex)
            {
                ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
            }            finally
            {
                channel.BasicAck(result.DeliveryTag, false);
            }
        }

View Code

快的时候有1.8K/s,稳定是1.5K/S

 

Rpc(远程调用)的封装

首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:

1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常

2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。

        /// <summary>
        /// RPC客户端        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="body"></param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
        {            var channel = GetModel(exchange, queue, routingKey, isProperties);            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue, true, consumer);            try
            {                var correlationId = Guid.NewGuid().ToString();                var basicProperties = channel.CreateBasicProperties();
                basicProperties.ReplyTo = queue;
                basicProperties.CorrelationId = correlationId;

                channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());                var sw = Stopwatch.StartNew();                while (true)
                {                    var ea = consumer.Queue.Dequeue();                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {                        return ea.Body.DeserializeUtf8();
                    }                    if (sw.ElapsedMilliseconds > 30000)                        throw new Exception("等待响应超时");
                }
            }            catch (Exception ex)
            {                throw ex.GetInnestException();
            }
        }    

        /// <summary>
        /// RPC服务端        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="isProperties"></param>
        /// <param name="handler"></param>
        /// <param name="isDeadLetter"></param>
        public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
        {            //队列声明
            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson<T>();                var props = ea.BasicProperties;                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;                try
                {
                    msg = handler(msg);
                }                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                }                finally
                {
                    channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }

View Code

   可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。

 结尾

本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 http://www.php.cn/ 。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。

如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
如何在PHP中使用RabbitMQ实现分布式消息处理如何在PHP中使用RabbitMQ实现分布式消息处理Jul 18, 2023 am 11:00 AM

如何在PHP中使用RabbitMQ实现分布式消息处理引言:在大规模应用程序开发中,分布式系统已成为一个常见的需求。分布式消息处理是这样的一种模式,通过将任务分发到多个处理节点,可以提高系统的效率和可靠性。RabbitMQ是一个开源的,可靠的消息队列系统,它采用AMQP协议来实现消息的传递和处理。在本文中,我们将介绍如何在PHP中使用RabbitMQ来实现分布

在Go语言中使用RabbitMQ:完整指南在Go语言中使用RabbitMQ:完整指南Jun 19, 2023 am 08:10 AM

随着现代应用程序的复杂性增加,消息传递已成为一种强大的工具。在这个领域,RabbitMQ已成为一个非常受欢迎的消息代理,可以用于在不同的应用程序之间传递消息。在这篇文章中,我们将探讨如何在Go语言中使用RabbitMQ。本指南将涵盖以下内容:RabbitMQ简介RabbitMQ安装RabbitMQ基础概念Go语言中的RabbitMQ入门RabbitMQ和Go

SpringBoot怎么整合RabbitMQ实现延迟队列SpringBoot怎么整合RabbitMQ实现延迟队列May 16, 2023 pm 08:31 PM

如何保证消息不丢失rabbitmq消息投递路径生产者->交换机->队列->消费者总的来说分为三个阶段。1.生产者保证消息投递可靠性。2.mq内部消息不丢失。3.消费者消费成功。什么是消息投递可靠性简单点说就是消息百分百发送到消息队列中。我们可以开启confirmCallback生产者投递消息后,mq会给生产者一个ack.根据ack,生产者就可以确认这条消息是否发送到mq.开启confirmCallback修改配置文件#NONE:禁用发布确认模式,是默认值,CORRELATED:

分享几个.NET开源的AI和LLM相关项目框架分享几个.NET开源的AI和LLM相关项目框架May 06, 2024 pm 04:43 PM

当今人工智能(AI)技术的发展如火如荼,它们在各个领域都展现出了巨大的潜力和影响力。今天大姚给大家分享4个.NET开源的AI模型LLM相关的项目框架,希望能为大家提供一些参考。https://github.com/YSGStudyHards/DotNetGuide/blob/main/docs/DotNet/DotNetProjectPicks.mdSemanticKernelSemanticKernel是一种开源的软件开发工具包(SDK),旨在将大型语言模型(LLM)如OpenAI、Azure

C#的就业前景如何C#的就业前景如何Oct 19, 2023 am 11:02 AM

无论您是初学者还是有经验的专业人士,掌握C#将为您的职业发展铺平道路。

go-zero与RabbitMQ的应用实践go-zero与RabbitMQ的应用实践Jun 23, 2023 pm 12:54 PM

现在越来越多的企业开始采用微服务架构模式,而在这个架构中,消息队列成为一种重要的通信方式,其中RabbitMQ被广泛应用。而在go语言中,go-zero是近年来崛起的一种框架,它提供了很多实用的工具和方法,让开发者更加轻松地使用消息队列,下面我们将结合实际应用,来介绍go-zero和RabbitMQ的使用方法和应用实践。1.RabbitMQ概述Rabbit

Swoole与RabbitMQ集成实践:打造高可用性消息队列系统Swoole与RabbitMQ集成实践:打造高可用性消息队列系统Jun 14, 2023 pm 12:56 PM

随着互联网时代的到来,消息队列系统变得越来越重要。它可以使不同的应用之间实现异步操作、降低耦合度、提高可扩展性,进而提升整个系统的性能和用户体验。在消息队列系统中,RabbitMQ是一个强大的开源消息队列软件,它支持多种消息协议、被广泛应用于金融交易、电子商务、在线游戏等领域。在实际应用中,往往需要将RabbitMQ和其他系统进行集成。本文将介绍如何使用sw

Java框架和.NET框架的性能差异Java框架和.NET框架的性能差异Jun 03, 2024 am 09:19 AM

在高并发请求处理方面,.NETASP.NETCoreWebAPI性能优于JavaSpringMVC,原因包括:AOT提前编译,减少启动时间;更精细的内存管理,由开发人员负责分配和释放对象内存。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
2 周前By尊渡假赌尊渡假赌尊渡假赌
仓库:如何复兴队友
1 个月前By尊渡假赌尊渡假赌尊渡假赌
Hello Kitty Island冒险:如何获得巨型种子
4 周前By尊渡假赌尊渡假赌尊渡假赌

热工具

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Atom编辑器mac版下载

Atom编辑器mac版下载

最流行的的开源编辑器

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

功能强大的PHP集成开发环境

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

Dreamweaver Mac版

Dreamweaver Mac版

视觉化网页开发工具