Maison >développement back-end >Tutoriel Python >Tutoriel sur la façon d'utiliser la file d'attente de messages (RabbitMQ) en Python
RabbitMQ est un système de messagerie d'entreprise complet et réutilisable basé sur AMQP. Il fait suite à l’accord open source Mozilla Public License. L'article suivant vous présente principalement le didacticiel sur la façon d'utiliser Python pour faire fonctionner la file d'attente de messages RabbitMQ. Les amis dans le besoin peuvent s'y référer.
Avant-propos
RabbitMQ est un système de messagerie d'entreprise complet et réutilisable basé sur AMQP. Il fait suite à l’accord open source Mozilla Public License.
MQ signifie Message Queue. Message Queuing (MQ) est une méthode de communication d'application à application. Les applications communiquent en lisant et en écrivant des messages (données spécifiques à l'application) vers et depuis les files d'attente sans nécessiter une connexion dédiée pour les relier. La messagerie fait référence aux programmes communiquant entre eux en envoyant des données dans des messages, plutôt qu'en s'appelant directement, ce qui est généralement utilisé pour des techniques telles que les appels de procédure à distance. La file d'attente fait référence aux applications communiquant via des files d'attente. L'utilisation de files d'attente supprime l'exigence selon laquelle les applications de réception et d'envoi s'exécutent simultanément.
Scénarios d'application :
RabbitMQ est sans aucun doute l'une des files d'attente de messages les plus populaires à l'heure actuelle, et elle prend en charge divers environnements linguistiques. J'ai Il est nécessaire d'apprendre et de comprendre cet outil. Il existe environ trois scénarios d'utilisation pour les files d'attente de messages :
1. Intégration du système et conception de systèmes distribués. Différents sous-systèmes sont connectés via des messages, et cette solution s'est progressivement développée vers un style architectural, à savoir « l'architecture passant par des messages ».
2. Lorsque la méthode de traitement de synchronisation dans le système affecte sérieusement le débit, comme la journalisation. Si nous devons enregistrer tous les journaux de comportement des utilisateurs dans le système, l'enregistrement synchrone des journaux affectera inévitablement la vitesse de réponse du système. Lorsque nous envoyons des messages de journal à la file d'attente des messages, le sous-système de journalisation consommera les informations des journaux de manière asynchrone.
3. Haute disponibilité du système, comme les scénarios de vente flash e-commerce. Lorsque le serveur d'applications ou le serveur de base de données reçoit un grand nombre de requêtes à un moment donné, un temps d'arrêt du système se produit. Si la requête peut être transmise à la file d'attente des messages, puis que le serveur consomme ces messages, la requête deviendra plus fluide et la disponibilité du système sera améliorée.
1. Environnement d'installation
Tout d'abord, installez RabbitMQ sur Linux
# 环境为CentOS 7 yum install rabbitmq-server # 安装RabbitMQ systemctl start rabbitmq-server # 启动 systemctl enable rabbitmq-server # 开机自启 systemctl stop firewall-cmd # 临时关闭防火墙
Utilisez ensuite pip pour installer le package de développement Python3
pip3 install pika
Après avoir installé le logiciel, vous pouvez visiter http:// 115. xx.xx.xx:15672/ pour accéder à la page Web intégrée pour afficher et gérer RabbitMQ. Le mot de passe de l'utilisateur administrateur par défaut est invité
2. Ajoutez simplement un message à la file d'attente
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 19:25 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : shawnbluce@gmail.com # @purpose : RabbitMQ_Producer import pika # 创建连接对象 connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx')) # 创建频道对象 channel = connection.channel() # 指定一个队列,如果该队列不存在则创建 channel.queue_declare(queue='test_queue') # 提交消息 for i in range(10): channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i)) print("sent...") # 关闭连接 connection.close()
3. Recevez simplement les messages de la file d'attente
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 19:40 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : shawnbluce@gmail.com # @purpose : RabbitMQ_Consumer import pika credentials = pika.PlainCredentials('guest', 'guest') # 连接到RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials)) channel = connection.channel() # 指定一个队列,如果该队列不存在则创建 channel.queue_declare(queue='test_queue') # 定义一个回调函数 def callback(ch, method, properties, body): print(body.decode('utf-8')) # 告诉RabbitMQ使用callback来接收信息 channel.basic_consume(callback, queue='test_queue', no_ack=False) print('waiting...') # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。 channel.start_consuming()
4. . Dans le cas où le consommateur se déconnecte
Imaginez une situation comme celle-ci :
Le consommateur se retire de la file d'attente des messages que j'ai obtenue. n morceaux de données, et alors que j'étais sur le point de les traiter, la machine est tombée en panne. Que dois-je faire ? Il existe un ACK dans RabbieMQ qui peut être utilisé pour confirmer la fin du traitement du consommateur. C'est un peu similaire à l'ACK dans le réseau. Chaque fois que le consommateur obtient des données de la file d'attente, la file d'attente ne supprimera pas les données immédiatement, mais attendra l'ACK correspondant. Une fois que le consommateur a obtenu les données et terminé le traitement, il enverra un paquet ACK à la file d'attente pour informer RabbitMQ que le message a été traité et peut être supprimé. À ce moment, RabbitMQ supprimera les données de la file d'attente. Ainsi, dans ce cas, même si le consommateur se déconnecte, il n’y a pas de problème, les données existeront toujours dans la file d’attente, laissant le soin aux autres consommateurs de les traiter.
est implémenté en Python comme ceci :
Le consommateur a une telle ligne de code channel.basic_consume(callback, queue='test_queue', no_ack=False)
, où no_ack=False
signifie ne pas envoyer un paquet de confirmation. Le modifier en no_ack=True enverra un paquet de confirmation à RabbitMQ après chaque traitement pour confirmer que le message est traité.
5. Et si RabbitMQ tombe en panne
Bien qu'il y ait un paquet ACK, que se passe-t-il si RabbitMQ raccroche les données ? Il y aura toujours des pertes. Nous pouvons donc mettre en place un stockage de persistance des données pour RabbitMQ. RabbitMQ conservera les données sur le disque pour garantir que la file d'attente sera toujours là au prochain démarrage.
est implémenté en Python comme ceci :
我们声明一个队列是这样的channel.queue_declare(queue='test_queue')
,如果需要持久化一个队列可以这样声明channel.queue_declare(queue='test_queue', durable=True)
。不过这行直接放在代码中是不能执行的,因为以前已经有了一个名为test_queue的队列,RabbitMQ 不允许用不同的方式声明同一个队列,所以可以换一个队列名新建来指定数据持久化存储。不过如果只是这样声明的话,在 RabbitMQ 宕机重启后确实队列还在,不过队列里的数据就没有了。除非我们这样来声明队列channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2,))
。
六、最简单的发布订阅
最简单的发布订阅在 RabbitMQ 中称之为Fanout模式。也就是说订阅者订阅某个频道,然后发布者向这个频道中发布消息,所有订阅者就都能接收到这条消息。不过因为发布者需要使用订阅者创建的随机队列所以需要先启动订阅者才能启动发布者。
发布者代码:
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 20:21 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : shawnbluce@gmail.com # @purpose : RabbitMQ_Publisher import pika # 创建连接对象 connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx')) # 创建频道对象 channel = connection.channel() # 定义交换机,exchange表示交换机名称,type表示类型 channel.exchange_declare(exchange='my_fanout', type='fanout') message = 'Hello Python' # 将消息发送到交换机 channel.basic_publish(exchange='my_fanout', # 指定exchange routing_key='', # fanout下不需要配置,配置了也不会生效 body=message) connection.close()
订阅者代码:
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 20:20 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : shawnbluce@gmail.com # @purpose : RabbitMQ_Subscriber import pika credentials = pika.PlainCredentials('guest', 'guest') # 连接到RabbitMQ connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials)) channel = connection.channel() # 定义交换机,进行exchange声明,exchange表示交换机名称,type表示类型 channel.exchange_declare(exchange='my_fanout', type='fanout') # 随机创建队列 result = channel.queue_declare(exclusive=True) # exclusive=True表示建立临时队列,当consumer关闭后,该队列就会被删除 queue_name = result.method.queue # 将队列与exchange进行绑定 channel.queue_bind(exchange='my_fanout', queue=queue_name) # 定义回调方法 def callback(ch, method, properties, body): print(body.decode('utf-8')) # 从队列获取信息 channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
总结
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!