Maison >développement back-end >Tutoriel C#.Net >Explication détaillée de l'application d'ActiveMQ en C#

Explication détaillée de l'application d'ActiveMQ en C#

黄舟
黄舟original
2017-09-21 11:32:252990parcourir

ActiveMQ est une bonne chose, il va sans dire. ActiveMQ fournit une prise en charge de plusieurs langages, tels que Java, C, C++, C#, Ruby, Perl, Python, PHP, etc. Depuis que je développe une interface graphique sous Windows, je suis plus préoccupé par le C++ et C# de C# est très simple. Apache fournit un NMS (.Net Messaging Service) pour prendre en charge le développement .Net. L'application du C++ est relativement fastidieuse, et j'écrirai un article pour la présenter plus tard.

1. Accédez au site officiel d'ActiveMQ pour télécharger la dernière version d'ActiveMQ que j'ai téléchargée 5.3.1 auparavant, et la 5.3.2 est maintenant disponible.

2. Accédez au site officiel d'ActiveMQ pour télécharger la dernière version d'Apache.NMS. Vous devez télécharger deux packages bin : Apache.NMS et Apache.NMS.ActiveMQ. , vous pouvez également télécharger le package src. Je voudrais vous rappeler ici que si vous téléchargez la version 1.2.0 de NMS.ActiveMQ, Apache.NMS.ActiveMQ.dll a un bug en utilisation réelle, c'est-à-dire qu'une exception de fonction WaitOne sera levée lors de l'arrêt de l'application ActiveMQ. . Vérifiez le code source dans le package src et constatez qu'il est dû au code suivant dans Apache.NMS.ActiveMQ-1.2.0-srcsrcmaincsharpTransportInactivityMonitor.cs. Modifiez simplement le code source et recompilez. J'ai vérifié que la dernière version 1.3.0 a corrigé ce bug, il suffit donc de télécharger la dernière version.

private void StopMonitorThreads()   
        {   
            lock(monitor)   
            {   
                if(monitorStarted.CompareAndSet(true, false))   
                {   
                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);   
                    // Attempt to wait for the Timers to shutdown, but don't wait   
                    // forever, if they don't shutdown after two seconds, just quit.   
                    this.readCheckTimer.Dispose(shutdownEvent);   
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));   
                    this.writeCheckTimer.Dispose(shutdownEvent);   
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));   
                                                    //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)   
                    this.asyncTasks.Shutdown();   
                    this.asyncTasks = null;   
                    this.asyncWriteTask = null;   
                    this.asyncErrorTask = null;   
                }   
            }   
        }  
     private void StopMonitorThreads() 
        { 
            lock(monitor) 
            { 
                if(monitorStarted.CompareAndSet(true, false)) 
                { 
                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);

                    // Attempt to wait for the Timers to shutdown, but don't wait 
                    // forever, if they don't shutdown after two seconds, just quit. 
                    this.readCheckTimer.Dispose(shutdownEvent); 
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); 
                    this.writeCheckTimer.Dispose(shutdownEvent); 
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000)); 
                                                    //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext) 
                    this.asyncTasks.Shutdown(); 
                    this.asyncTasks = null; 
                    this.asyncWriteTask = null; 
                    this.asyncErrorTask = null; 
                } 
            } 
        }

3. Exécutez ActiveMQ et recherchez le dossier bin après avoir décompressé ActiveMQ : ...apache-activemq-5.3.1bin Exécutez le fichier batch activemq.bat pour démarrer le serveur ActiveMQ. 61616, cela peut être modifié dans le fichier de configuration.

4. Écrivez un programme C# pour implémenter une application simple d'ActiveMQ. Créez un nouveau projet C# (un projet Produit et un projet Consommateur), soit WinForm, soit un programme Console. Le projet Console est construit ici. Ajoutez des références à Apache.NMS.dll et Apache.NMS.ActiveMQ.dll, puis vous pouvez écrire. l'implémentation. Voici le code. Les codes simples d'implémentation Producteur et Consommateur sont les suivants :

producteur :

using System;   
using System.Collections.Generic;   
using System.Text;   
using Apache.NMS;   
using Apache.NMS.ActiveMQ;   
using System.IO;   
using System.Xml.Serialization;   
using System.Runtime.Serialization.Formatters.Binary;   
namespace Publish   
{   
    class Program   
    {   
        static void Main(string[] args)   
        {   
            try  
            {   
                //Create the Connection Factory   
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");   
                using (IConnection connection = factory.CreateConnection())   
                {   
                    //Create the Session   
                    using (ISession session = connection.CreateSession())   
                    {   
                        //Create the Producer for the topic/queue   
                        IMessageProducer prod = session.CreateProducer(   
                            new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));   
                        //Send Messages   
                        int i = 0;   
                        while (!Console.KeyAvailable)   
                        {   
                            ITextMessage msg = prod.CreateTextMessage();   
                            msg.Text = i.ToString();   
                            Console.WriteLine("Sending: " + i.ToString());   
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);   
                            System.Threading.Thread.Sleep(5000);   
                            i++;   
                        }   
                    }   
                }   
                Console.ReadLine();   
           }   
            catch (System.Exception e)   
            {   
                Console.WriteLine("{0}",e.Message);   
                Console.ReadLine();   
            }   
        }   
    }   
}

consommateur :

using System;   
using System.Collections.Generic;   
using System.Text;   
using Apache.NMS;   
using Apache.NMS.ActiveMQ;   
using System.IO;   
using System.Xml.Serialization;   
using System.Runtime.Serialization.Formatters.Binary;   
namespace Subscribe   
{   
    class Program   
    {   
        static void Main(string[] args)   
        {   
            try  
            {   
                //Create the Connection factory   
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");   
                //Create the connection   
                using (IConnection connection = factory.CreateConnection())   
                {   
                    connection.ClientId = "testing listener";   
                    connection.Start();   
                    //Create the Session   
                    using (ISession session = connection.CreateSession())   
                    {   
                        //Create the Consumer   
                        IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);   
                        consumer.Listener += new MessageListener(consumer_Listener);   
                        Console.ReadLine();   
                    }   
                    connection.Stop();   
                    connection.Close();   
                }   
            }   
            catch (System.Exception e)   
            {   
                Console.WriteLine(e.Message);   
            }   
        }   
        static void consumer_Listener(IMessage message)   
        {   
            try  
            {   
                ITextMessage msg = (ITextMessage)message;   
                Console.WriteLine("Receive: " + msg.Text);   
           }   
            catch (System.Exception e)   
            {   
                Console.WriteLine(e.Message);   
            }   
        }   
    }   
}

Fonction implémentée par le programme : le producteur crée un nom pour le sujet de test et envoie des messages à ce sujet toutes les 5 secondes. Le consommateur s'abonne au sujet de test. Par conséquent, tant que le producteur envoie un message du sujet de test au serveur ActiveMQ. Le serveur enverra le message au consommateur qui s'est abonné au sujet de test qui.

Compilez et générez producteur.exe et consommateur.exe, et exécutez les deux ex pour voir l'envoi et la réception des messages.

Cet exemple est un sujet. ActiveMQ prend également en charge une autre méthode : Queue, c'est-à-dire P2P. Quelle est la différence entre les deux ? La différence est que le sujet est diffusé, c'est-à-dire que si un sujet est abonné par plusieurs consommateurs, tant qu'un message atteint le serveur, le serveur enverra le message à tous les consommateurs tandis que la file d'attente est point à point ; Autrement dit, un message ne peut être envoyé qu'à un consommateur. Si une file d'attente est abonnée par plusieurs consommateurs, sauf circonstances particulières, les messages seront envoyés à différents consommateurs un par un, par exemple :

msg1-- >consommateur A

msg2-->consommateur B

msg3-->consommateur C

msg4-->consommateur A

msg5 -->consumer B

msg6-->consumer C

Cas particulier signifie : ActiveMQ prend en charge le mécanisme de filtrage, c'est-à-dire que le producteur peut définir les propriétés du message (Propriétés), qui correspond au Sélecteur côté consommateur, le message sera envoyé au consommateur uniquement si le sélecteur défini par le consommateur correspond aux Propriétés du message. Le sélecteur de support de sujet et de file d'attente.

Comment définir les propriétés et le sélecteur ? Veuillez regarder le code suivant :

producteur :

public void SetProperties()
{
ITextMessage msg = prod.CreateTextMessage();   
                            msg.Text = i.ToString();   
                            msg.Properties.SetString("myFilter", "test1");   
                            Console.WriteLine("Sending: " + i.ToString());   
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);  
ITextMessage msg = prod.CreateTextMessage(); 
                            msg.Text = i.ToString(); 
                            msg.Properties.SetString("myFilter", "test1"); 
                            Console.WriteLine("Sending: " + i.ToString()); 
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

}

consommateur :

public void SetSelector()
{
//生成consumer时通过参数设置Selector   
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");  
//生成consumer时通过参数设置Selector 
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");
}

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn