Maison  >  Article  >  développement back-end  >  Analyser l'implémentation par Python de la file d'attente de messages MQ et les avantages de la file d'attente de messages

Analyser l'implémentation par Python de la file d'attente de messages MQ et les avantages de la file d'attente de messages

巴扎黑
巴扎黑original
2017-08-16 13:37:046116parcourir

Le rôle de préservation de la file d'attente de messages (MQ, Message Queue) dans la transmission des données de message offre une garantie de communication des données et une commodité dans le traitement en temps réel. Nous examinerons ici l'implémentation des threads de la file d'attente de messages MQ en Python et le. file d'attente des messages Analyse des avantages

La "file d'attente des messages" est un conteneur qui enregistre les messages lors de leur transmission. Le gestionnaire de files d'attente de messages agit comme intermédiaire lors du relais des messages de sa source vers sa destination. L'objectif principal d'une file d'attente est d'assurer le routage et de garantir la livraison des messages ; si le destinataire n'est pas disponible lorsque le message est envoyé, Message Queue conserve le message jusqu'à ce qu'il puisse être livré avec succès. Je pense que la file d'attente de messages est un composant crucial pour toute architecture ou application. Voici dix raisons :

Exemple de file d'attente de messages Python :

1.threading+Implémentation de la file d'attente de threads

#!/usr/bin/env python
 
import Queue
import threading
import time
 
queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
 """没打印一个数字等待1秒,并发打印10个数字需要多少秒?"""
 def __init__(self, queue):
  threading.Thread.__init__(self)
  self.queue = queue
 
 def run(self):
  whileTrue:
   #消费者端,从队列中获取num
   num = self.queue.get()
   print "i'm num %s"%(num)
   time.sleep(1)
   #在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号
   self.queue.task_done()
 
start = time.time()
def main():
 #产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发
 for i in range(10):
  t = ThreadNum(queue)
  t.setDaemon(True)
  t.start()
  
 #往队列中填错数据 
 for num in range(10):
   queue.put(num)
 #wait on the queue until everything has been processed
 queue.join()
 
main()
print "Elapsed Time: %s" % (time.time() - start)

Résultats d'exécution :

i'm num 0
i'm num 1
i'm num 2
i'm num 3
i'm num 4
i'm num 5
i'm num 6
i'm num 7
i'm num 8
i'm num 9
Elapsed Time: 1.01399993896

Interprétation :

Les étapes de travail spécifiques sont décrites comme suit :

1, créez une Queue.Queue () et remplissez avec des données.

2, transmettez l'instance de données remplie à la classe de thread, qui est créée en héritant de threading.Thread.

3. Générez un pool de threads démon.

4. Prenez un élément de la file d'attente à chaque fois et utilisez la méthode data and run dans ce fil pour effectuer le travail correspondant.

5. Après avoir terminé ce travail, utilisez la fonction queue.task_done() pour envoyer un signal à la file d'attente indiquant que la tâche est terminée.

6. Effectuer une opération de jointure sur la file d'attente signifie en fait attendre que la file d'attente soit vide avant de quitter le programme principal.

Une chose à noter lors de l'utilisation de ce mode : en définissant le thread démon sur true, le programme se fermera automatiquement après son exécution. L'avantage est que vous pouvez effectuer une opération de jointure sur la file d'attente ou attendre que la file d'attente soit vide avant de sortir.

2. Files d'attente multiples

Les soi-disant files d'attente multiples, la sortie d'une file d'attente peut être utilisée comme entrée d'une autre file d'attente

#!/usr/bin/env python
import Queue
import threading
import time
 
queue = Queue.Queue()
out_queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
  def __init__(self, queue, out_queue):
    threading.Thread.__init__(self)
    self.queue = queue
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #从队列中取消息
      num = self.queue.get()
      bkeep = num
      
      #将bkeep放入队列中
      self.out_queue.put(bkeep)
 
      #signals to queue job is done
      self.queue.task_done()
 
class PrintLove(threading.Thread):
  def __init__(self, out_queue):
    threading.Thread.__init__(self)
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #从队列中获取消息并赋值给bkeep
      bkeep = self.out_queue.get()  
      keke = "I love " + str(bkeep)
      print keke,
      print self.getName()
      time.sleep(1)
 
      #signals to queue job is done
      self.out_queue.task_done()
 
start = time.time()
def main():
  #populate queue with data
  for num in range(10):
    queue.put(num)
    
  #spawn a pool of threads, and pass them queue instance
  for i in range(5):
    t = ThreadNum(queue, out_queue)
    t.setDaemon(True)
    t.start()
 
 
  for i in range(5):
    pl = PrintLove(out_queue)
    pl.setDaemon(True)
    pl.start()
 
  #wait on the queue until everything has been processed
  queue.join()
  out_queue.join()
 
main()
print "Elapsed Time: %s" % (time.time() - start)

Résultats d'exécution :

I love 0 Thread-6
I love 1 Thread-7
I love 2 Thread-8
I love 3 Thread-9
I love 4 Thread-10
I love 5 Thread-7
I love 6 Thread-6
I love 7 Thread-9
I love 8 Thread-8
I love 9 Thread-10
Elapsed Time: 2.00300002098

Interprétation :

Workflow de la classe ThreadNum

Définir la file d'attente--->Hériter le threading---->Initialiser file d'attente-- -->Définir la fonction d'exécution--->obtenir les données dans la file d'attente---->Traiter les données---->mettre les données dans une autre file d'attente-->Envoyer un signal à la file d'attente pour indiquer à la file d'attente que le traitement est terminé

Flux de travail de la fonction principale :

---> >---> La boucle for est déterminée comme étant démarrée Nombre de threads----> Instancier la classe ThreadNum----> Démarrez le thread et configurez la garde

--- > La boucle for détermine le nombre de threads démarrés----> Instancier la classe PrintLove ---> Démarrez le fil et définissez-le comme garde

---> file d'attente à traiter, puis exécutez la jointure. Autrement dit, quittez le programme principal.

Après avoir compris la mise en œuvre générale de MQ, résumons les avantages des files d'attente de messages :

1 Découplage

Au début du projet, prédisez les risques futurs du projet. est extrêmement difficile de trouver ce dont on a besoin. La file d'attente de messages insère une couche d'interface implicite basée sur les données au milieu du processus de traitement, et les processus de traitement des deux côtés doivent implémenter cette interface. Cela vous permet d’étendre ou de modifier les processus des deux côtés indépendamment, à condition qu’ils respectent les mêmes contraintes d’interface.

2. Redondance

Parfois, le processus échoue lors du traitement des données. Si les données ne sont pas conservées, elles sont perdues à jamais. Message Queuing évite le risque de perte de données en conservant les données jusqu'à ce qu'elles aient été complètement traitées. Dans le paradigme « insérer-obtenir-supprimer » utilisé par de nombreuses files d'attente de messages, avant de supprimer un message de la file d'attente, votre processus de traitement doit indiquer clairement que le message a été traité pour garantir la sécurité de vos données. fini de l'utiliser.

3. Évolutivité

Étant donné que la file d'attente des messages découple votre traitement, il est facile d'augmenter la fréquence de mise en file d'attente et de traitement des messages ; Il n'est pas nécessaire de modifier le code ou d'ajuster les paramètres. L'expansion est aussi simple que d'appuyer sur le bouton d'alimentation.

4. Flexibilité et capacité de traitement maximale

Lorsque votre application est sur la page d'accueil de Hacker News, vous constaterez que le trafic a atteint un niveau inhabituel. Votre application doit toujours continuer à fonctionner lorsque le nombre de visites augmente considérablement, mais de tels pics de trafic sont rares ; ce serait un énorme gaspillage d'investir des ressources en veille sur la base de la capacité à gérer de tels pics de visites. L'utilisation de files d'attente de messages permet aux composants critiques de résister à une pression d'accès accrue sans s'effondrer complètement en raison de requêtes surchargées. Consultez notre article de blog sur les capacités de traitement de pointe pour plus d'informations à ce sujet.

5. Récupérabilité

Lorsque certains composants du système tombent en panne, cela n'affectera pas l'ensemble du système. La file d'attente de messages réduit le couplage entre les processus, de sorte que même si un processus traitant des messages raccroche, les messages ajoutés à la file d'attente peuvent toujours être traités après la récupération du système. La possibilité d'autoriser la nouvelle tentative ou le report des requêtes fait souvent la différence entre un utilisateur légèrement gêné et un utilisateur frustré.

6. Garantie de livraison

Le mécanisme de redondance fourni par la file d'attente des messages garantit que le message peut être réellement traité, tant qu'un processus lit la file d'attente. Sur cette base, IronMQ offre une garantie « livraison unique ». Quel que soit le nombre de processus recevant des données de la file d'attente, chaque message ne peut être traité qu'une seule fois. Ceci est possible car recevoir un message « s'abonne » simplement au message, le supprimant temporairement de la file d'attente. À moins que le client n'indique explicitement qu'il a terminé le traitement du message, le message sera remis dans la file d'attente et pourra être traité à nouveau après une période de temps configurable.

7. Garanties de commande

Dans de nombreux cas, l'ordre dans lequel les données sont traitées est important. La file d'attente des messages est intrinsèquement triée et peut garantir que les données seront traitées dans un ordre spécifique. IronMO garantit que les messages sont traités dans l'ordre FIFO (premier entré, premier sorti), de sorte que la position des messages dans la file d'attente correspond à la position à partir de laquelle ils ont été récupérés.

8. Buffering

Dans tout système important, il y aura des éléments qui nécessiteront des temps de traitement différents. Par exemple, charger une image prend moins de temps que appliquer un filtre. Les files d'attente de messages utilisent une couche tampon pour aider les tâches à s'exécuter le plus efficacement possible : les écritures dans la file d'attente sont traitées aussi rapidement que possible, sans être contraintes par un traitement préparatoire à la lecture dans la file d'attente. Cette mise en mémoire tampon permet de contrôler et d'optimiser la vitesse à laquelle les données circulent dans le système.

9. Comprendre le flux de données

Dans un système distribué, il est très difficile d'avoir une idée globale de la durée des opérations utilisateur et de leur raison. Les séries de messages peuvent aider à identifier les processus ou les domaines sous-performants en fonction de la fréquence de traitement des messages, là où le flux de données n'est pas suffisamment optimisé.

10. Communication asynchrone

Souvent, vous ne voulez pas ou n'avez pas besoin de traiter les messages immédiatement. Les files d'attente de messages fournissent un mécanisme de traitement asynchrone qui vous permet de placer un message dans la file d'attente mais de ne pas le traiter immédiatement. Vous pouvez mettre autant de messages que vous le souhaitez dans la file d'attente et les traiter quand vous en avez envie.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn