>  기사  >  백엔드 개발  >  C#에서 ActiveMQ를 적용하는 방법에 대한 자세한 설명

C#에서 ActiveMQ를 적용하는 방법에 대한 자세한 설명

黄舟
黄舟원래의
2017-09-21 11:32:252916검색

ActiveMQ는 말할 필요도 없이 좋은 것입니다. ActiveMQ는 Java, C, C++, C#, Ruby, Perl, Python, PHP 등과 같은 다중 언어 지원을 제공합니다. Windows에서 GUI를 개발하다 보니 C++과 C#에 더 관심이 가는데요. C#의 ActiveMQ는 매우 간단합니다. Apache는 .Net 개발을 지원하는 NMS(.Net Messaging Service)를 제공하므로 다음 단계만 수행하면 간단하게 구현할 수 있습니다. C++의 적용은 상대적으로 번거로운데, 나중에 이를 소개하는 글을 쓰겠습니다.

1. ActiveMQ 공식 웹사이트에 가서 최신 버전의 ActiveMQ를 다운로드하세요. 이전에 5.3.1을 다운로드했는데 이제 5.3.2가 나왔습니다.

2. 최신 버전의 Apache.NMS를 다운로드하려면 ActiveMQ 공식 웹사이트로 이동하세요. 두 개의 bin 패키지(Apache.NMS 및 Apache.NMS.ActiveMQ)를 다운로드해야 합니다. src 패키지도 다운로드하세요. NMS.ActiveMQ 1.2.0 버전을 다운로드하면 Apache.NMS.ActiveMQ.dll이 실제 사용시 버그가 있다는 점, 즉 ActiveMQ 애플리케이션을 중지할 때 WaitOne 함수 예외가 발생한다는 점을 알려드리고자 합니다. .src 패키지의 소스 코드를 확인하여 Apache.NMS.ActiveMQ-1.2.0-srcsrcmaincsharpTransportInactivityMonitor.cs의 다음 코드로 인해 발생하는지 확인합니다. 소스 코드를 수정하고 다시 컴파일하면 됩니다. 최신 버전인 1.3.0에서는 이 버그가 수정된 것을 확인했으니, 그냥 최신 버전을 다운로드 받으세요.

private void StopMonitorThreads()   
        {   
            lock(monitor)   
            {   
                if(monitorStarted.CompareAndSet(true, false))   
                {   
                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);   
                    // Attempt to wait for the Timers to shutdown, but don't wait   
                    // forever, if they don't shutdown after two seconds, just quit.   
                    this.readCheckTimer.Dispose(shutdownEvent);   
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));   
                    this.writeCheckTimer.Dispose(shutdownEvent);   
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));   
                                                    //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)   
                    this.asyncTasks.Shutdown();   
                    this.asyncTasks = null;   
                    this.asyncWriteTask = null;   
                    this.asyncErrorTask = null;   
                }   
            }   
        }  
     private void StopMonitorThreads() 
        { 
            lock(monitor) 
            { 
                if(monitorStarted.CompareAndSet(true, false)) 
                { 
                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);

                    // Attempt to wait for the Timers to shutdown, but don't wait 
                    // forever, if they don't shutdown after two seconds, just quit. 
                    this.readCheckTimer.Dispose(shutdownEvent); 
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); 
                    this.writeCheckTimer.Dispose(shutdownEvent); 
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); 
                                                    //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext) 
                    this.asyncTasks.Shutdown(); 
                    this.asyncTasks = null; 
                    this.asyncWriteTask = null; 
                    this.asyncErrorTask = null; 
                } 
            } 
        }

3. ActiveMQ 압축을 푼 후 bin 폴더를 찾습니다: ...apache-activemq-5.3.1bin. activemq.bat 배치 파일을 실행하여 ActiveMQ 서버를 시작합니다. 파일에서 수정되어 구성되었습니다.

4. ActiveMQ의 간단한 애플리케이션을 구현하는 C# 프로그램을 작성합니다. WinForm 또는 콘솔 프로그램 중 하나인 새 C# 프로젝트(제품 프로젝트 및 소비자 프로젝트)를 생성합니다. 콘솔 프로젝트는 Apache.NMS.dll 및 Apache.NMS.ActiveMQ.dll에 대한 참조를 추가한 다음 작성할 수 있습니다. 구현 코드는 다음과 같습니다. 간단한 생산자 및 소비자 구현 코드는 다음과 같습니다.

producer:

using System;   
using System.Collections.Generic;   
using System.Text;   
using Apache.NMS;   
using Apache.NMS.ActiveMQ;   
using System.IO;   
using System.Xml.Serialization;   
using System.Runtime.Serialization.Formatters.Binary;   
namespace Publish   
{   
    class Program   
    {   
        static void Main(string[] args)   
        {   
            try  
            {   
                //Create the Connection Factory   
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");   
                using (IConnection connection = factory.CreateConnection())   
                {   
                    //Create the Session   
                    using (ISession session = connection.CreateSession())   
                    {   
                        //Create the Producer for the topic/queue   
                        IMessageProducer prod = session.CreateProducer(   
                            new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));   
                        //Send Messages   
                        int i = 0;   
                        while (!Console.KeyAvailable)   
                        {   
                            ITextMessage msg = prod.CreateTextMessage();   
                            msg.Text = i.ToString();   
                            Console.WriteLine("Sending: " + i.ToString());   
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);   
                            System.Threading.Thread.Sleep(5000);   
                            i++;   
                        }   
                    }   
                }   
                Console.ReadLine();   
           }   
            catch (System.Exception e)   
            {   
                Console.WriteLine("{0}",e.Message);   
                Console.ReadLine();   
            }   
        }   
    }   
}

consumer:

using System;   
using System.Collections.Generic;   
using System.Text;   
using Apache.NMS;   
using Apache.NMS.ActiveMQ;   
using System.IO;   
using System.Xml.Serialization;   
using System.Runtime.Serialization.Formatters.Binary;   
namespace Subscribe   
{   
    class Program   
    {   
        static void Main(string[] args)   
        {   
            try  
            {   
                //Create the Connection factory   
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");   
                //Create the connection   
                using (IConnection connection = factory.CreateConnection())   
                {   
                    connection.ClientId = "testing listener";   
                    connection.Start();   
                    //Create the Session   
                    using (ISession session = connection.CreateSession())   
                    {   
                        //Create the Consumer   
                        IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);   
                        consumer.Listener += new MessageListener(consumer_Listener);   
                        Console.ReadLine();   
                    }   
                    connection.Stop();   
                    connection.Close();   
                }   
            }   
            catch (System.Exception e)   
            {   
                Console.WriteLine(e.Message);   
            }   
        }   
        static void consumer_Listener(IMessage message)   
        {   
            try  
            {   
                ITextMessage msg = (ITextMessage)message;   
                Console.WriteLine("Receive: " + msg.Text);   
           }   
            catch (System.Exception e)   
            {   
                Console.WriteLine(e.Message);   
            }   
        }   
    }   
}

프로그램에서 구현하는 기능: 생산자는 테스트라는 주제를 만들고 해당 주제에 메시지를 보냅니다. 소비자는 5초마다 테스트 주제를 구독하므로 생산자가 테스트 주제에 대한 메시지를 ActiveMQ 서버에 보내는 한 서버는 테스트 주제를 구독하는 소비자에게 메시지를 보냅니다.

producer.exe와 Consumer.exe를 컴파일 및 생성하고 두 exe를 실행하여 메시지 송수신을 확인하세요.

이 예제는 토픽(Topic)입니다. ActiveMQ는 또 다른 방법인 큐, 즉 P2P도 지원합니다. 차이점은 Topic이 브로드캐스트된다는 것입니다. 즉, 여러 소비자가 Topic을 구독하는 경우 메시지가 서버에 도달하는 한 서버는 모든 소비자에게 메시지를 보내는 반면 Queue는 지점 간입니다. 즉, 하나의 메시지는 소비자에게만 전송될 수 있습니다. 여러 소비자가 대기열을 구독하는 경우 특별한 상황이 없는 한 메시지는 다른 소비자에게 하나씩 전송됩니다. 예:

msg1-->소비자 A

msg2-- >소비자 B

msg3-->소비자 C

msg4-->소비자 A

msg5-->소비자 B

msg6-->소비자 C

특별한 경우를 참조합니다. 대상: ActiveMQ는 필터링 메커니즘을 지원합니다. 즉, 생산자는 소비자 측의 선택기에 해당하는 메시지 속성(속성)을 설정할 수 있습니다. 이는 소비자가 설정한 선택기가 메시지 속성과 일치하는 경우에만 가능합니다. 소비자에게 전송됩니다. 주제와 대기열 모두 선택기를 지원합니다.

속성과 선택기를 어떻게 설정하나요? 다음 코드를 참조하세요.

Producer:

public void SetProperties()
{
ITextMessage msg = prod.CreateTextMessage();   
                            msg.Text = i.ToString();   
                            msg.Properties.SetString("myFilter", "test1");   
                            Console.WriteLine("Sending: " + i.ToString());   
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);  
ITextMessage msg = prod.CreateTextMessage(); 
                            msg.Text = i.ToString(); 
                            msg.Properties.SetString("myFilter", "test1"); 
                            Console.WriteLine("Sending: " + i.ToString()); 
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

}

consumer:

public void SetSelector()
{
//生成consumer时通过参数设置Selector   
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");  
//生成consumer时通过参数设置Selector 
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");
}

위 내용은 C#에서 ActiveMQ를 적용하는 방법에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.