Detailed explanation of the application of ActiveMQ in C#

黄舟
2017-09-21 11:32:25

ActiveMQ needs little introduction. It provides clients for many languages, including Java, C, C++, C#, Ruby, Perl, Python, and PHP. Because I develop GUI applications on Windows, I care most about C++ and C#. Using ActiveMQ from C# is simple: Apache provides NMS (.NET Message Service) to support .NET development, and a basic implementation takes only the following steps. Using it from C++ is more involved, and I will cover that in a later article.

1. Download the latest version of ActiveMQ from the official ActiveMQ website. I originally downloaded 5.3.1; 5.3.2 is now available.

2. Download the latest version of Apache.NMS from the official ActiveMQ website. You need two bin packages: Apache.NMS and Apache.NMS.ActiveMQ. If you are interested in the source code, you can also download the src packages. A word of warning: in version 1.2.0 of NMS.ActiveMQ, Apache.NMS.ActiveMQ.dll has a bug that shows up in real use: an exception from the WaitOne function is thrown when the ActiveMQ application is stopped. Checking the source in the src package shows that it is caused by the following code in Apache.NMS.ActiveMQ-1.2.0-src\src\main\csharp\Transport\InactivityMonitor.cs. You can modify the source and recompile, but I checked that the latest version, 1.3.0, has fixed this bug, so it is simpler to download the latest version.

private void StopMonitorThreads()
{
    lock(monitor)
    {
        if(monitorStarted.CompareAndSet(true, false))
        {
            AutoResetEvent shutdownEvent = new AutoResetEvent(false);
            // Attempt to wait for the Timers to shutdown, but don't wait
            // forever, if they don't shutdown after two seconds, just quit.
            this.readCheckTimer.Dispose(shutdownEvent);
            shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
            this.writeCheckTimer.Dispose(shutdownEvent);
            shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
            // Definition of WaitOne: public virtual bool WaitOne(TimeSpan timeout, bool exitContext)
            this.asyncTasks.Shutdown();
            this.asyncTasks = null;
            this.asyncWriteTask = null;
            this.asyncErrorTask = null;
        }
    }
}
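
The article does not show the modification itself. The comment in the listing points at the two-argument overload WaitOne(TimeSpan, bool), so one plausible fix (an assumption, not the confirmed upstream patch) is to call that overload, because the one-argument WaitOne(TimeSpan) overload is not present on older .NET Framework 2.0 runtimes without service packs. The following is a minimal self-contained sketch using plain System.Threading timers; the names TimerShutdown and StopTimers are hypothetical and are not part of NMS.

```csharp
using System;
using System.Threading;

static class TimerShutdown
{
    // Hypothetical helper: disposes each timer and waits up to two seconds
    // for it to finish. Returns how many timers signalled in time.
    public static int StopTimers(params Timer[] timers)
    {
        int signalled = 0;
        AutoResetEvent shutdownEvent = new AutoResetEvent(false);
        foreach (Timer timer in timers)
        {
            // Dispose(WaitHandle) signals the handle once the timer has been disposed.
            timer.Dispose(shutdownEvent);
            // Two-argument overload, with exitContext set to false.
            if (shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000), false))
            {
                signalled++;
            }
        }
        return signalled;
    }

    static void Main()
    {
        // Stand-ins for readCheckTimer and writeCheckTimer from the listing above.
        Timer readCheck = new Timer(_ => { }, null, 100, 100);
        Timer writeCheck = new Timer(_ => { }, null, 100, 100);
        Console.WriteLine("Timers stopped in time: " + StopTimers(readCheck, writeCheck));
    }
}
```

In InactivityMonitor.cs the equivalent change would be limited to the two WaitOne calls; upgrading to 1.3.0 avoids the need to patch anything.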

3. Run ActiveMQ. In the bin folder of the extracted ActiveMQ directory (...\apache-activemq-5.3.1\bin), run the activemq.bat batch file to start the ActiveMQ server. The default port is 61616, which can be changed in the configuration file.

4. Write a C# program that implements a simple ActiveMQ application. Create two new C# projects (a Producer project and a Consumer project), either WinForms or console; console projects are used here. Add references to Apache.NMS.dll and Apache.NMS.ActiveMQ.dll, and then write the implementation. Simple Producer and Consumer implementations follow:

producer:

using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Publish   
{   
    class Program   
    {   
        static void Main(string[] args)   
        {   
            try  
            {   
                //Create the Connection Factory   
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");   
                using (IConnection connection = factory.CreateConnection())   
                {   
                    //Create the Session   
                    using (ISession session = connection.CreateSession())   
                    {   
                        //Create the Producer for the topic/queue   
                        IMessageProducer prod = session.CreateProducer(   
                            new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));   
                        //Send Messages   
                        int i = 0;   
                        while (!Console.KeyAvailable)   
                        {   
                            ITextMessage msg = prod.CreateTextMessage();   
                            msg.Text = i.ToString();   
                            Console.WriteLine("Sending: " + i.ToString());   
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);   
                            System.Threading.Thread.Sleep(5000);   
                            i++;   
                        }   
                    }   
                }   
                Console.ReadLine();   
            }
            catch (System.Exception e)   
            {   
                Console.WriteLine("{0}",e.Message);   
                Console.ReadLine();   
            }   
        }   
    }   
}

consumer:

using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Subscribe   
{   
    class Program   
    {   
        static void Main(string[] args)   
        {   
            try  
            {   
                //Create the Connection factory   
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");   
                //Create the connection   
                using (IConnection connection = factory.CreateConnection())   
                {   
                    connection.ClientId = "testing listener";   
                    connection.Start();   
                    //Create the Session   
                    using (ISession session = connection.CreateSession())   
                    {   
                        //Create the Consumer   
                        IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);   
                        consumer.Listener += new MessageListener(consumer_Listener);   
                        Console.ReadLine();   
                    }   
                    connection.Stop();   
                    connection.Close();   
                }   
            }   
            catch (System.Exception e)   
            {   
                Console.WriteLine(e.Message);   
            }   
        }   
        static void consumer_Listener(IMessage message)   
        {   
            try  
            {   
                ITextMessage msg = (ITextMessage)message;   
                Console.WriteLine("Receive: " + msg.Text);   
            }
            catch (System.Exception e)   
            {   
                Console.WriteLine(e.Message);   
            }   
        }   
    }   
}

What the program does: the producer sends a message to a topic named testing every 5 seconds. The consumer subscribes to the testing topic, so whenever the producer sends a message on that topic to the ActiveMQ server, the server forwards it to every consumer subscribed to the topic.

Build producer.exe and consumer.exe and run both; you will see messages being sent and received.

This example uses a Topic. ActiveMQ also supports another model: the Queue, that is, P2P (point-to-point). What is the difference between the two? A Topic is a broadcast: if multiple consumers subscribe to a Topic, then whenever a message reaches the server, the server sends it to all of them. A Queue is point-to-point: each message is sent to only one consumer. If multiple consumers listen on a Queue, then barring special circumstances the messages are handed out to the consumers in turn, for example:

msg1-->consumer A

msg2-->consumer B

msg3-->consumer C

msg4-->consumer A

msg5-->consumer B

msg6-->consumer C
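
The two delivery models can be illustrated with a toy simulation in plain C#. This is only a sketch of the semantics described above, not of how the broker is implemented: real dispatch also depends on factors such as prefetch settings and consumer speed. The names DispatchDemo, Broadcast, and RoundRobin are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;

static class DispatchDemo
{
    // Topic semantics: every subscriber receives every message.
    public static List<string> Broadcast(IList<string> messages, IList<string> consumers)
    {
        List<string> deliveries = new List<string>();
        foreach (string msg in messages)
        {
            foreach (string consumer in consumers)
            {
                deliveries.Add(msg + "-->" + consumer);
            }
        }
        return deliveries;
    }

    // Queue semantics (simplified): each message goes to exactly one
    // consumer, and consumers are taken in turn.
    public static List<string> RoundRobin(IList<string> messages, IList<string> consumers)
    {
        List<string> deliveries = new List<string>();
        for (int i = 0; i < messages.Count; i++)
        {
            deliveries.Add(messages[i] + "-->" + consumers[i % consumers.Count]);
        }
        return deliveries;
    }

    static void Main()
    {
        string[] messages = { "msg1", "msg2", "msg3", "msg4", "msg5", "msg6" };
        string[] consumers = { "consumer A", "consumer B", "consumer C" };

        Console.WriteLine("Queue deliveries:");
        foreach (string line in RoundRobin(messages, consumers))
        {
            Console.WriteLine(line);
        }
        Console.WriteLine("Topic delivery count: " + Broadcast(messages, consumers).Count);
    }
}
```

With six messages and three consumers, the Queue model produces six deliveries in the order listed above, while the Topic model produces eighteen, one per message per subscriber.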

The special case is this: ActiveMQ supports a filtering mechanism. The producer can set properties (Properties) on a message, and these are matched against a Selector on the consumer side. A message is delivered to a consumer only if the consumer's Selector matches the message's Properties. Both Topic and Queue support Selectors.

How to set Properties and Selector? Please look at the following code:

producer:

public void SetProperties(IMessageProducer prod, int i)
{
    // Attach a "myFilter" property that a consumer-side Selector can match against
    ITextMessage msg = prod.CreateTextMessage();
    msg.Text = i.ToString();
    msg.Properties.SetString("myFilter", "test1");
    Console.WriteLine("Sending: " + i.ToString());
    prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
}

consumer:

public IMessageConsumer SetSelector(ISession session)
{
    // Set the Selector through a parameter when creating the consumer
    return session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");
}
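
Putting the two fragments together, a minimal end-to-end sketch might look like the following. It assumes a broker listening on tcp://localhost:61616 and project references to Apache.NMS.dll and Apache.NMS.ActiveMQ.dll, as in step 4. SelectorDemo and BuildSelector are hypothetical names introduced here; BuildSelector only assembles the selector string, doubling any single quotes in the value as the SQL-92-style selector syntax requires.

```csharp
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;

static class SelectorDemo
{
    // Hypothetical helper: builds an equality selector such as myFilter = 'test1'.
    public static string BuildSelector(string property, string value)
    {
        return property + " = '" + value.Replace("'", "''") + "'";
    }

    static void Main()
    {
        IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
        using (IConnection connection = factory.CreateConnection())
        using (ISession session = connection.CreateSession())
        {
            connection.Start(); // a connection must be started before it delivers messages

            IDestination queue = session.GetQueue("testing");
            string selector = BuildSelector("myFilter", "test1");
            using (IMessageConsumer consumer = session.CreateConsumer(queue, selector))
            using (IMessageProducer producer = session.CreateProducer(queue))
            {
                // Send one message that matches the selector and one that does not.
                foreach (string filter in new[] { "test1", "test2" })
                {
                    ITextMessage msg = session.CreateTextMessage("payload for " + filter);
                    msg.Properties.SetString("myFilter", filter);
                    producer.Send(msg);
                }

                // Receive returns null when the timeout elapses with no matching message.
                IMessage received;
                while ((received = consumer.Receive(TimeSpan.FromSeconds(2))) != null)
                {
                    Console.WriteLine("Receive: " + ((ITextMessage)received).Text);
                }
            }
        }
    }
}
```

Because this is a Queue, the message tagged test2 is not discarded: it stays on the queue until a consumer whose Selector matches it (or a consumer with no Selector) picks it up.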
