>백엔드 개발 >PHP 튜토리얼 >.net 플랫폼 Rabbitmq는 캡슐화를 사용합니다.

.net 플랫폼 Rabbitmq는 캡슐화를 사용합니다.

大家讲道理
大家讲道理원래의
2017-03-07 14:40:341541검색

머리말

누구나 RabbitMq에 익숙합니다. 이 기사에서는 주로 RabbitMQ를 학습한 후 RabbitMQ.Client의 캡슐화를 공유합니다. 기사 마지막 부분에서는 캡슐화된 구성 요소와 데모를 소개하겠습니다.

RabbitMQ의 운영

아래 그림에서 볼 수 있듯이 게시자(Publisher)는 먼저 메시지를 Exchange(Exchange)로 보낸 다음 Exchange에서 지정된 큐(Queue)로 메시지를 보냅니다. 고객(Customer)은 을 통해 을 구독하거나 가 소비를 위해 지정된 큐 메시지 를 적극적으로 검색합니다.

그렇다면 방금 언급한 구독과 능동 검색은 푸시(수동)와 풀(능동)로 이해될 수 있습니다.

푸시, 메시지가 대기열에 추가되는 한 유휴 소비자는 소비하라는 알림을 받습니다. (내가 너를 찾지 않으면 네가 나를 찾을 때까지 기다릴게 관찰자 모드)

Pull을 수행하면 소비자에게 알림이 전달되지 않지만 소비자는 라운드 로빈 방식이나 정기적인 간격으로 대기열 메시지를 가져오는 작업을 주도하게 됩니다. (필요할 때만 찾아가요)

사용 시나리오의 예를 들어보겠습니다. 주문 시스템과 배송 시스템이라는 두 가지 시스템이 있다고 가정합니다. 배송 메시지 지침은 적시에 상품을 배송하기 위해 필요합니다. 대기열을 구독하고 지침이 있는 한 처리합니다.

그러나 프로그램에서 때때로 네트워크 또는 DB 시간 초과와 같은 예외가 발생하고 메시지가 실패 대기열에 던져집니다. 이 경우 재전송 메커니즘이 필요합니다. 하지만 while(IsPostSuccess == True) 은 하고 싶지 않습니다. 왜냐하면 예외가 발생하는 한 일정 시간 내에 예외가 발생하게 되고 이러한 재시도는 의미가 없기 때문입니다.

이때, 실패한 큐 메시지를 정기적으로 또는 몇 분(실패 횟수 * 간격 분)마다 가져와서 다시 보내는 JOB이 있습니다.

패키지 게시

단계: 링크 초기화 -> 교환기 선언 -> 대기열 선언 -> 시스템 및 대기열 바인딩 변경 -> 메시지 게시. 선언과 바인딩에는 시간이 많이 걸리기 때문에 모델을 ConcurrentDictionary에 저장했습니다. 둘째, 반복되는 대기열에 메시지를 보내는 데는 다시 초기화가 필요하지 않습니다.

으아아아

코드 보기

다음은 기본 테스트의 게시 속도에 대한 스크린샷입니다.

4.2W/S는 안정적인 속도이며 역직렬화(ToJson)가 약간 더 빠릅니다.

구독 패키지

퍼블리시할 때에는 Exchanger와 큐를 선언하고 바인드하지만, 구독할 때에는 큐만 선언하면 됩니다. 아래 코드에서 볼 수 있듯이 예외가 발생하면 메시지가 사용자 정의 "배달 못한 편지 대기열"로 전송되고 다른 JOB에 의해 정기적으로 다시 전송되므로 최종 응답이 성공합니다.

으아아아

코드 보기

다음은 기본 테스트의 게시 속도에 대한 스크린샷입니다.

빠르면 1.9K/S, 느리면 1.7K/S입니다.

패키지 가져오기

코드로 직접 이동하십시오.

으아아아

코드 보기

빠르면 1.8K/s, 안정적일 때는 1.5K/S입니다.

Rpc(远程调用)的封装

首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:

1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常

2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。

        /// <summary>
        /// RPC客户端        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="body"></param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
        {            var channel = GetModel(exchange, queue, routingKey, isProperties);            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue, true, consumer);            try
            {                var correlationId = Guid.NewGuid().ToString();                var basicProperties = channel.CreateBasicProperties();
                basicProperties.ReplyTo = queue;
                basicProperties.CorrelationId = correlationId;

                channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());                var sw = Stopwatch.StartNew();                while (true)
                {                    var ea = consumer.Queue.Dequeue();                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {                        return ea.Body.DeserializeUtf8();
                    }                    if (sw.ElapsedMilliseconds > 30000)                        throw new Exception("等待响应超时");
                }
            }            catch (Exception ex)
            {                throw ex.GetInnestException();
            }
        }    

        /// <summary>
        /// RPC服务端        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="isProperties"></param>
        /// <param name="handler"></param>
        /// <param name="isDeadLetter"></param>
        public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
        {            //队列声明
            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson<T>();                var props = ea.BasicProperties;                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;                try
                {
                    msg = handler(msg);
                }                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                }                finally
                {
                    channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }

View Code

   可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。

 结尾

本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 http://www.php.cn/ 。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。

如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.