Heim  >  Artikel  >  Backend-Entwicklung  >  Ausführliche Erläuterung der Anwendung von ActiveMQ in C#

Ausführliche Erläuterung der Anwendung von ActiveMQ in C#

黄舟
黄舟Original
2017-09-21 11:32:252844Durchsuche

ActiveMQ ist natürlich eine gute Sache. ActiveMQ bietet Unterstützung für mehrere Sprachen, wie Java, C, C++, C#, Ruby, Perl, Python, PHP usw. Da ich GUI unter Windows entwickle, mache ich mir mehr Sorgen um C++ und C#. Apache bietet NMS (.Net Messaging Service) zur Unterstützung der .Net-Entwicklung. Die Anwendung von C++ ist relativ mühsam, und ich werde später einen Artikel schreiben, um sie vorzustellen.

1. Gehen Sie zur offiziellen ActiveMQ-Website, um die neueste Version von ActiveMQ herunterzuladen, die ich zuvor heruntergeladen habe. 5.3.2 ist jetzt verfügbar.

2. Gehen Sie zur offiziellen Website von ActiveMQ, um die neueste Version von Apache.NMS herunterzuladen. Sie müssen zwei Bin-Pakete herunterladen: Apache.NMS und Apache.NMS.ActiveMQ , können Sie auch das src-Paket herunterladen. Ich möchte Sie hier daran erinnern, dass Apache.NMS.ActiveMQ.dll beim Herunterladen der Version 1.2.0 von NMS.ActiveMQ einen Fehler bei der tatsächlichen Verwendung aufweist, d. h. beim Stoppen der ActiveMQ-Anwendung wird eine WaitOne-Funktionsausnahme ausgelöst . Überprüfen Sie den Quellcode im src-Paket und stellen Sie fest, dass dies durch den folgenden Code in Apache.NMS.ActiveMQ-1.2.0-srcsrcmaincsharpTransportInactivityMonitor.cs verursacht wird. Ändern Sie einfach den Quellcode und kompilieren Sie ihn neu. Ich habe überprüft, ob die neueste Version 1.3.0 diesen Fehler behoben hat. Laden Sie also einfach die neueste Version herunter.

private void StopMonitorThreads()   
        {   
            lock(monitor)   
            {   
                if(monitorStarted.CompareAndSet(true, false))   
                {   
                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);   
                    // Attempt to wait for the Timers to shutdown, but don't wait   
                    // forever, if they don't shutdown after two seconds, just quit.   
                    this.readCheckTimer.Dispose(shutdownEvent);   
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));   
                    this.writeCheckTimer.Dispose(shutdownEvent);   
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));   
                                                    //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)   
                    this.asyncTasks.Shutdown();   
                    this.asyncTasks = null;   
                    this.asyncWriteTask = null;   
                    this.asyncErrorTask = null;   
                }   
            }   
        }  
     private void StopMonitorThreads() 
        { 
            lock(monitor) 
            { 
                if(monitorStarted.CompareAndSet(true, false)) 
                { 
                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);

                    // Attempt to wait for the Timers to shutdown, but don't wait 
                    // forever, if they don't shutdown after two seconds, just quit. 
                    this.readCheckTimer.Dispose(shutdownEvent); 
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); 
                    this.writeCheckTimer.Dispose(shutdownEvent); 
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); 
                                                    //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext) 
                    this.asyncTasks.Shutdown(); 
                    this.asyncTasks = null; 
                    this.asyncWriteTask = null; 
                    this.asyncErrorTask = null; 
                } 
            } 
        }

3. Führen Sie ActiveMQ aus und suchen Sie nach dem Dekomprimieren von ActiveMQ den Ordner „bin“: ...apache-activemq-5.3.1bin. Führen Sie die Batchdatei „activemq.bat“ aus. Der Standardport ist 61616, dies kann in der Konfigurationsdatei geändert werden.

4. Schreiben Sie ein C#-Programm, um eine einfache Anwendung von ActiveMQ zu implementieren. Erstellen Sie ein neues C#-Projekt (ein Produktprojekt und ein Consumer-Projekt), entweder ein WinForm- oder ein Konsolenprogramm. Fügen Sie hier Verweise auf Apache.NMS.dll und Apache.NMS.ActiveMQ.dll hinzu, und dann können Sie schreiben die Implementierung. Hier ist der Code für die einfache Implementierung von Producer und Consumer:

Produzent:

using System;   
using System.Collections.Generic;   
using System.Text;   
using Apache.NMS;   
using Apache.NMS.ActiveMQ;   
using System.IO;   
using System.Xml.Serialization;   
using System.Runtime.Serialization.Formatters.Binary;   
namespace Publish   
{   
    class Program   
    {   
        static void Main(string[] args)   
        {   
            try  
            {   
                //Create the Connection Factory   
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");   
                using (IConnection connection = factory.CreateConnection())   
                {   
                    //Create the Session   
                    using (ISession session = connection.CreateSession())   
                    {   
                        //Create the Producer for the topic/queue   
                        IMessageProducer prod = session.CreateProducer(   
                            new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));   
                        //Send Messages   
                        int i = 0;   
                        while (!Console.KeyAvailable)   
                        {   
                            ITextMessage msg = prod.CreateTextMessage();   
                            msg.Text = i.ToString();   
                            Console.WriteLine("Sending: " + i.ToString());   
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);   
                            System.Threading.Thread.Sleep(5000);   
                            i++;   
                        }   
                    }   
                }   
                Console.ReadLine();   
           }   
            catch (System.Exception e)   
            {   
                Console.WriteLine("{0}",e.Message);   
                Console.ReadLine();   
            }   
        }   
    }   
}

Funktion implementiert von Das Programm: Producer erstellt einen Namen für das Testthema und sendet alle 5 Sekunden Nachrichten an dieses Thema. Daher abonniert der Consumer das Testthema, solange der Produzent eine Nachricht des Testthemas an den ActiveMQ-Server sendet Der Server sendet die Nachricht an den Verbraucher, der das Testthema abonniert hat.

using System;   
using System.Collections.Generic;   
using System.Text;   
using Apache.NMS;   
using Apache.NMS.ActiveMQ;   
using System.IO;   
using System.Xml.Serialization;   
using System.Runtime.Serialization.Formatters.Binary;   
namespace Subscribe   
{   
    class Program   
    {   
        static void Main(string[] args)   
        {   
            try  
            {   
                //Create the Connection factory   
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");   
                //Create the connection   
                using (IConnection connection = factory.CreateConnection())   
                {   
                    connection.ClientId = "testing listener";   
                    connection.Start();   
                    //Create the Session   
                    using (ISession session = connection.CreateSession())   
                    {   
                        //Create the Consumer   
                        IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);   
                        consumer.Listener += new MessageListener(consumer_Listener);   
                        Console.ReadLine();   
                    }   
                    connection.Stop();   
                    connection.Close();   
                }   
            }   
            catch (System.Exception e)   
            {   
                Console.WriteLine(e.Message);   
            }   
        }   
        static void consumer_Listener(IMessage message)   
        {   
            try  
            {   
                ITextMessage msg = (ITextMessage)message;   
                Console.WriteLine("Receive: " + msg.Text);   
           }   
            catch (System.Exception e)   
            {   
                Console.WriteLine(e.Message);   
            }   
        }   
    }   
}

Kompilieren und generieren Sie Producer.exe und Consumer.exe und führen Sie die beiden Exes aus, um das Senden und Empfangen von Nachrichten zu sehen.

Dieses Beispiel unterstützt auch eine andere Methode: Queue, also P2P. Was ist der Unterschied zwischen den beiden? Der Unterschied besteht darin, dass das Thema gesendet wird. Wenn also ein Thema von mehreren Verbrauchern abonniert wird, sendet der Server die Nachricht an alle Verbraucher, solange die Nachricht Punkt-zu-Punkt ist Das heißt, eine Nachricht kann nur an einen Verbraucher gesendet werden. Wenn eine Warteschlange von mehreren Verbrauchern abonniert wird, werden die Nachrichten, sofern keine besonderen Umstände vorliegen, einzeln an verschiedene Verbraucher gesendet, zum Beispiel:

msg1-- >Verbraucher A

msg2-->Verbraucher B

msg3-->Verbraucher C

msg4-->Verbraucher A

msg5 -->Verbraucher B

msg6-->Verbraucher C

Sonderfall bedeutet: ActiveMQ unterstützt den Filtermechanismus, das heißt, der Produzent kann die Eigenschaften der Nachricht festlegen (Eigenschaften), Dies entspricht dem Selektor auf der Verbraucherseite. Die Nachricht wird nur dann an den Verbraucher gesendet, wenn der vom Verbraucher festgelegte Selektor mit den Eigenschaften der Nachricht übereinstimmt. Sowohl Topic als auch Queue unterstützen Selector.

Wie stelle ich Eigenschaften und Selektor ein? Bitte schauen Sie sich den folgenden Code an:

Produzent:

Verbraucher:

public void SetProperties()
{
ITextMessage msg = prod.CreateTextMessage();   
                            msg.Text = i.ToString();   
                            msg.Properties.SetString("myFilter", "test1");   
                            Console.WriteLine("Sending: " + i.ToString());   
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);  
ITextMessage msg = prod.CreateTextMessage(); 
                            msg.Text = i.ToString(); 
                            msg.Properties.SetString("myFilter", "test1"); 
                            Console.WriteLine("Sending: " + i.ToString()); 
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

}

Das obige ist der detaillierte Inhalt vonAusführliche Erläuterung der Anwendung von ActiveMQ in C#. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn