Heim > Artikel > Backend-Entwicklung > Ausführliche Erläuterung der Anwendung von ActiveMQ in C#
ActiveMQ ist natürlich eine gute Sache. ActiveMQ bietet Unterstützung für mehrere Sprachen, wie Java, C, C++, C#, Ruby, Perl, Python, PHP usw. Da ich GUI unter Windows entwickle, mache ich mir mehr Sorgen um C++ und C#. Apache bietet NMS (.Net Messaging Service) zur Unterstützung der .Net-Entwicklung. Die Anwendung von C++ ist relativ mühsam, und ich werde später einen Artikel schreiben, um sie vorzustellen.
1. Gehen Sie zur offiziellen ActiveMQ-Website, um die neueste Version von ActiveMQ herunterzuladen, die ich zuvor heruntergeladen habe. 5.3.2 ist jetzt verfügbar.
2. Gehen Sie zur offiziellen Website von ActiveMQ, um die neueste Version von Apache.NMS herunterzuladen. Sie müssen zwei Bin-Pakete herunterladen: Apache.NMS und Apache.NMS.ActiveMQ , können Sie auch das src-Paket herunterladen. Ich möchte Sie hier daran erinnern, dass Apache.NMS.ActiveMQ.dll beim Herunterladen der Version 1.2.0 von NMS.ActiveMQ einen Fehler bei der tatsächlichen Verwendung aufweist, d. h. beim Stoppen der ActiveMQ-Anwendung wird eine WaitOne-Funktionsausnahme ausgelöst . Überprüfen Sie den Quellcode im src-Paket und stellen Sie fest, dass dies durch den folgenden Code in Apache.NMS.ActiveMQ-1.2.0-srcsrcmaincsharpTransportInactivityMonitor.cs verursacht wird. Ändern Sie einfach den Quellcode und kompilieren Sie ihn neu. Ich habe überprüft, ob die neueste Version 1.3.0 diesen Fehler behoben hat. Laden Sie also einfach die neueste Version herunter.
private void StopMonitorThreads() { lock(monitor) { if(monitorStarted.CompareAndSet(true, false)) { AutoResetEvent shutdownEvent = new AutoResetEvent(false); // Attempt to wait for the Timers to shutdown, but don't wait // forever, if they don't shutdown after two seconds, just quit. this.readCheckTimer.Dispose(shutdownEvent); shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); this.writeCheckTimer.Dispose(shutdownEvent); shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext) this.asyncTasks.Shutdown(); this.asyncTasks = null; this.asyncWriteTask = null; this.asyncErrorTask = null; } } } private void StopMonitorThreads() { lock(monitor) { if(monitorStarted.CompareAndSet(true, false)) { AutoResetEvent shutdownEvent = new AutoResetEvent(false); // Attempt to wait for the Timers to shutdown, but don't wait // forever, if they don't shutdown after two seconds, just quit. this.readCheckTimer.Dispose(shutdownEvent); shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); this.writeCheckTimer.Dispose(shutdownEvent); shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext) this.asyncTasks.Shutdown(); this.asyncTasks = null; this.asyncWriteTask = null; this.asyncErrorTask = null; } } }
3. Führen Sie ActiveMQ aus und suchen Sie nach dem Dekomprimieren von ActiveMQ den Ordner „bin“: ...apache-activemq-5.3.1bin. Führen Sie die Batchdatei „activemq.bat“ aus. Der Standardport ist 61616, dies kann in der Konfigurationsdatei geändert werden.
4. Schreiben Sie ein C#-Programm, um eine einfache Anwendung von ActiveMQ zu implementieren. Erstellen Sie ein neues C#-Projekt (ein Produktprojekt und ein Consumer-Projekt), entweder ein WinForm- oder ein Konsolenprogramm. Fügen Sie hier Verweise auf Apache.NMS.dll und Apache.NMS.ActiveMQ.dll hinzu, und dann können Sie schreiben die Implementierung. Hier ist der Code für die einfache Implementierung von Producer und Consumer:
Produzent:
using System; using System.Collections.Generic; using System.Text; using Apache.NMS; using Apache.NMS.ActiveMQ; using System.IO; using System.Xml.Serialization; using System.Runtime.Serialization.Formatters.Binary; namespace Publish { class Program { static void Main(string[] args) { try { //Create the Connection Factory IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); using (IConnection connection = factory.CreateConnection()) { //Create the Session using (ISession session = connection.CreateSession()) { //Create the Producer for the topic/queue IMessageProducer prod = session.CreateProducer( new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing")); //Send Messages int i = 0; while (!Console.KeyAvailable) { ITextMessage msg = prod.CreateTextMessage(); msg.Text = i.ToString(); Console.WriteLine("Sending: " + i.ToString()); prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); System.Threading.Thread.Sleep(5000); i++; } } } Console.ReadLine(); } catch (System.Exception e) { Console.WriteLine("{0}",e.Message); Console.ReadLine(); } } } }
Funktion implementiert von Das Programm: Producer erstellt einen Namen für das Testthema und sendet alle 5 Sekunden Nachrichten an dieses Thema. Daher abonniert der Consumer das Testthema, solange der Produzent eine Nachricht des Testthemas an den ActiveMQ-Server sendet Der Server sendet die Nachricht an den Verbraucher, der das Testthema abonniert hat.
using System; using System.Collections.Generic; using System.Text; using Apache.NMS; using Apache.NMS.ActiveMQ; using System.IO; using System.Xml.Serialization; using System.Runtime.Serialization.Formatters.Binary; namespace Subscribe { class Program { static void Main(string[] args) { try { //Create the Connection factory IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); //Create the connection using (IConnection connection = factory.CreateConnection()) { connection.ClientId = "testing listener"; connection.Start(); //Create the Session using (ISession session = connection.CreateSession()) { //Create the Consumer IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false); consumer.Listener += new MessageListener(consumer_Listener); Console.ReadLine(); } connection.Stop(); connection.Close(); } } catch (System.Exception e) { Console.WriteLine(e.Message); } } static void consumer_Listener(IMessage message) { try { ITextMessage msg = (ITextMessage)message; Console.WriteLine("Receive: " + msg.Text); } catch (System.Exception e) { Console.WriteLine(e.Message); } } } }
Kompilieren und generieren Sie Producer.exe und Consumer.exe und führen Sie die beiden Exes aus, um das Senden und Empfangen von Nachrichten zu sehen.
Verbraucher:
public void SetProperties() { ITextMessage msg = prod.CreateTextMessage(); msg.Text = i.ToString(); msg.Properties.SetString("myFilter", "test1"); Console.WriteLine("Sending: " + i.ToString()); prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); ITextMessage msg = prod.CreateTextMessage(); msg.Text = i.ToString(); msg.Properties.SetString("myFilter", "test1"); Console.WriteLine("Sending: " + i.ToString()); prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); }
Das obige ist der detaillierte Inhalt vonAusführliche Erläuterung der Anwendung von ActiveMQ in C#. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!