ホームページ >バックエンド開発 >Python チュートリアル >Python の MQ メッセージ キューの実装とメッセージ キューの利点を分析する

Python の MQ メッセージ キューの実装とメッセージ キューの利点を分析する

巴扎黑
巴扎黑オリジナル
2017-08-16 13:37:046201ブラウズ

メッセージデータ送信におけるメッセージキュー(MQ、Message Queue)の保持の役割は、データ通信の保証とリアルタイム処理の利便性を提供します。ここでは、Python でのスレッドの MQ メッセージキューの実装とその分析について見ていきます。メッセージキューの利点

「メッセージキュー」は、送信中にメッセージを保存するコンテナです。メッセージ キュー マネージャーは、メッセージを送信元から宛先に中継するときに仲介者として機能します。キューの主な目的は、ルーティングを提供し、メッセージの配信を保証することです。メッセージの送信時に受信者が不在の場合、メッセージ キューはメッセージが正常に配信されるまでメッセージを保持します。メッセージ キューは、あらゆるアーキテクチャやアプリケーションにとって重要なコンポーネントであると私は考えています。以下に 10 の理由があります:

Python メッセージ キューの例:

1.threading+Queue はスレッド キューを実装します

#!/usr/bin/env python
 
import Queue
import threading
import time
 
queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
 """没打印一个数字等待1秒,并发打印10个数字需要多少秒?"""
 def __init__(self, queue):
  threading.Thread.__init__(self)
  self.queue = queue
 
 def run(self):
  whileTrue:
   #消费者端,从队列中获取num
   num = self.queue.get()
   print "i'm num %s"%(num)
   time.sleep(1)
   #在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号
   self.queue.task_done()
 
start = time.time()
def main():
 #产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发
 for i in range(10):
  t = ThreadNum(queue)
  t.setDaemon(True)
  t.start()
  
 #往队列中填错数据 
 for num in range(10):
   queue.put(num)
 #wait on the queue until everything has been processed
 queue.join()
 
main()
print "Elapsed Time: %s" % (time.time() - start)

実行結果:

i'm num 0
i'm num 1
i'm num 2
i'm num 3
i'm num 4
i'm num 5
i'm num 6
i'm num 7
i'm num 8
i'm num 9
Elapsed Time: 1.01399993896

解釈:

具体的な作業手順は次のとおりです。

1. Queue.Queue() のインスタンスを作成し、データを入力します。

2. threading.Threadを継承して作成されたスレッドクラスに、入力されたデータインスタンスを渡します。

3. デーモンスレッドプールを生成します。

4. 毎回キューから 1 つのアイテムを取り出し、このスレッドのデータと実行メソッドを使用して、対応する作業を実行します。

5. この作業が完了したら、queue.task_done() 関数を使用して、タスクが完了したことを示すシグナルをキューに送信します。

6. キューで結合操作を実行するということは、実際には、キューが空になるまでメイン プログラムを終了するまで待機することを意味します。

このモードを使用するときに注意すべき点が 1 つあります。デーモン スレッドを true に設定すると、プログラムは実行後に自動的に終了します。利点は、キューに対して結合操作を実行したり、キューが空になるまで待ってから終了したりできることです。

2. 複数のキュー

いわゆる複数のキュー。1 つのキューの出力を別のキューの入力として使用できます

#!/usr/bin/env python
import Queue
import threading
import time
 
queue = Queue.Queue()
out_queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
  def __init__(self, queue, out_queue):
    threading.Thread.__init__(self)
    self.queue = queue
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #从队列中取消息
      num = self.queue.get()
      bkeep = num
      
      #将bkeep放入队列中
      self.out_queue.put(bkeep)
 
      #signals to queue job is done
      self.queue.task_done()
 
class PrintLove(threading.Thread):
  def __init__(self, out_queue):
    threading.Thread.__init__(self)
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #从队列中获取消息并赋值给bkeep
      bkeep = self.out_queue.get()  
      keke = "I love " + str(bkeep)
      print keke,
      print self.getName()
      time.sleep(1)
 
      #signals to queue job is done
      self.out_queue.task_done()
 
start = time.time()
def main():
  #populate queue with data
  for num in range(10):
    queue.put(num)
    
  #spawn a pool of threads, and pass them queue instance
  for i in range(5):
    t = ThreadNum(queue, out_queue)
    t.setDaemon(True)
    t.start()
 
 
  for i in range(5):
    pl = PrintLove(out_queue)
    pl.setDaemon(True)
    pl.start()
 
  #wait on the queue until everything has been processed
  queue.join()
  out_queue.join()
 
main()
print "Elapsed Time: %s" % (time.time() - start)

実行結果:

I love 0 Thread-6
I love 1 Thread-7
I love 2 Thread-8
I love 3 Thread-9
I love 4 Thread-10
I love 5 Thread-7
I love 6 Thread-6
I love 7 Thread-9
I love 8 Thread-8
I love 9 Thread-10
Elapsed Time: 2.00300002098

解釈:

ThreadNum クラスのワークフロー

定義キュー - -->スレッドの継承--->キューの初期化--->実行関数の定義--->キュー内のデータの取得--->データの処理--->データの挿入別のキューに送信します

メイン関数のワークフロー:

---> カスタム キューにデータをスローします

---> for ループが開始されると決定される スレッドの数----> ThreadNum クラスをインスタンス化する----> スレッドを開始し、デーモンをセットアップする

---> ---->PrintLove クラスをインスタンス化します--- >スレッドを開始してガードとして設定します

--->キュー内のメッセージが処理されるのを待ってから結合を実行します。つまり、メインプログラムを終了します。

MQ の一般的な実装を理解した後、メッセージ キューの利点をまとめてみましょう:

1. デカップリング

プロジェクトの開始時に、プロジェクトが将来どのようなニーズに遭遇するかを予測することは非常に困難です。メッセージ キューは、処理プロセスの途中に暗黙的なデータベースのインターフェイス層を挿入し、両側の処理プロセスはこのインターフェイスを実装する必要があります。これにより、同じインターフェイスの制約に従っている限り、両側のプロセスを独立して拡張または変更できます。

2. 冗長性

データの処理中にプロセスが失敗することがあります。データは永続化しない限り、永久に失われます。メッセージ キューは、データが完全に処理されるまで保持することで、データ損失のリスクを回避します。多くのメッセージ キューで使用される「挿入-取得-削除」パラダイムでは、キューからメッセージを削除する前に、処理プロセスでメッセージが処理されたことを明確に示し、データが安全であることを確認する必要があります。それを使用して完了しました。

3. スケーラビリティ

メッセージ キューは処理を分離するため、追加の処理を追加するだけでメッセージのキューイングと処理の頻度を簡単に増やすことができます。コードを変更したりパラメータを調整したりする必要はありません。拡張は電源ボタンを押すだけで簡単です。

4. 柔軟性とピーク処理能力

あなたのアプリケーションが Hacker News のホームページに掲載されていると、トラフィックが異常なレベルに達していることがわかります。訪問数が大幅に増加した場合でも、アプリケーションは機能し続ける必要がありますが、このようなトラフィックの急増は珍しいことであり、そのようなピークの訪問を処理できるという基準に基づいてスタンバイにリソースを投資するのは非常に無駄です。メッセージ キューを使用すると、要求の過負荷によって完全に機能不全に陥ることなく、重要なコンポーネントが増大するアクセス圧力に耐えることができます。 詳細については、ピーク処理能力に関するブログ投稿をご覧ください。

5. 回復可能性

システムの一部のコンポーネントに障害が発生しても、システム全体には影響しません。メッセージ キューによりプロセス間の結合が軽減されるため、メッセージを処理するプロセスがハングアップした場合でも、キューに追加されたメッセージはシステムの回復後に引き続き処理できます。多くの場合、リクエストの再試行または延期を許可できるかどうかで、多少不便を感じるユーザーとイライラするユーザーの違いが決まります。

6. 配信保証

メッセージ キューによって提供される冗長メカニズムにより、1 つのプロセスがキューを読み取る限り、メッセージが実際に処理されることが保証されます。これに基づいて、IronMQ は「1 回のみの配信」保証を提供します。キューからデータを受信して​​いるプロセスの数に関係なく、各メッセージは 1 回しか処理できません。これが可能になるのは、メッセージの取得が単にメッセージを「サブスクライブ」し、メッセージをキューから一時的に削除するだけであるためです。クライアントがメッセージの処理を終了したことを明示的に示さない限り、メッセージはキューに戻され、構成可能な時間が経過した後に再度処理できるようになります。

7. 順序の保証

多くの場合、データが処理される順序が重要です。メッセージ キューは本質的にソートされており、データが特定の順序で処理されることを保証できます。 IronMO は、メッセージが FIFO (先入れ先出し) 順序で処理されることを保証するため、キュー内のメッセージの位置はメッセージが取得された位置になります。

8. バッファリング

重要なシステムには、さまざまな量の処理時間を必要とする要素が存在します。たとえば、画像の読み込みは、フィルターを適用するよりも時間がかかりません。メッセージ キューはバッファリング層を使用して、タスクを最も効率的に実行できるようにします。キューへの書き込みは、キューからの読み取りの準備処理に制約されることなく、できるだけ早く処理されます。このバッファリングは、システムを流れるデータの速度を制御し、最適化するのに役立ちます。

9. データ フローを理解する

分散システムでは、ユーザーの操作にどのくらいの時間がかかるのか、またその理由を全体的に把握するのは大きな課題です。メッセージ シリーズは、データ フローが十分に最適化されていない、メッセージの処理頻度によってパフォーマンスが低下しているプロセスや領域を特定するのに役立ちます。

10. 非同期通信

多くの場合、メッセージをすぐに処理したくない、またはその必要はありません。メッセージ キューは、メッセージをキューに入れてもすぐには処理できない非同期処理メカニズムを提供します。必要な数のメッセージをキューに入れて、必要なときに処理できます。

以上がPython の MQ メッセージ キューの実装とメッセージ キューの利点を分析するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。