ホームページ >バックエンド開発 >Python チュートリアル >Python の Twisted フレームワークを使用してノンブロッキング プログラムを作成するコード例

Python の Twisted フレームワークを使用してノンブロッキング プログラムを作成するコード例

高洛峰
高洛峰オリジナル
2017-02-03 16:33:301310ブラウズ

まずコードを見てみましょう:

# ~*~ Twisted - A Python tale ~*~
 
from time import sleep
 
# Hello, I'm a developer and I mainly setup Wordpress.
def install_wordpress(customer):
  # Our hosting company Threads Ltd. is bad. I start installation and...
  print "Start installation for", customer
  # ...then wait till the installation finishes successfully. It is
  # boring and I'm spending most of my time waiting while consuming
  # resources (memory and some CPU cycles). It's because the process
  # is *blocking*.
  sleep(3)
  print "All done for", customer
 
# I do this all day long for our customers
def developer_day(customers):
  for customer in customers:
    install_wordpress(customer)
 
developer_day(["Bill", "Elon", "Steve", "Mark"])

実行すると、結果は次のようになります:

   
$ ./deferreds.py 1
------ Running example 1 ------
Start installation for Bill
All done for Bill
Start installation
...
* Elapsed time: 12.03 seconds

これは、順次実行されるコードです。消費者が 4 人の場合、インストールには 1 人で 3 秒かかるため、4 人の場合は 12 秒かかります。これはあまり満足のいくものではないので、スレッドを使用した 2 番目の例を見てください:

import threading
 
# The company grew. We now have many customers and I can't handle the
# workload. We are now 5 developers doing exactly the same thing.
def developers_day(customers):
  # But we now have to synchronize... a.k.a. bureaucracy
  lock = threading.Lock()
  #
  def dev_day(id):
    print "Goodmorning from developer", id
    # Yuck - I hate locks...
    lock.acquire()
    while customers:
      customer = customers.pop(0)
      lock.release()
      # My Python is less readable
      install_wordpress(customer)
      lock.acquire()
    lock.release()
    print "Bye from developer", id
  # We go to work in the morning
  devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)]
  [dev.start() for dev in devs]
  # We leave for the evening
  [dev.join() for dev in devs]
 
# We now get more done in the same time but our dev process got more
# complex. As we grew we spend more time managing queues than doing dev
# work. We even had occasional deadlocks when processes got extremely
# complex. The fact is that we are still mostly pressing buttons and
# waiting but now we also spend some time in meetings.
developers_day(["Customer %d" % i for i in xrange(15)])

実行してください:

 $ ./deferreds.py 2
------ Running example 2 ------
Goodmorning from developer 0Goodmorning from developer
1Start installation forGoodmorning from developer 2
Goodmorning from developer 3Customer 0
...
from developerCustomer 13 3Bye from developer 2
* Elapsed time: 9.02 seconds

今回は、5 つのワーカー スレッドを使用して並列実行されるコードです。 15 個のコンシューマーがそれぞれ 3 秒かかるとすると、合計 45 秒かかりますが、5 つのスレッドを使用して並列実行すると、合計で 9 秒しかかかりません。このコードは少し複雑で、コードの大部分は、アルゴリズムやビジネス ロジックに焦点を当てるのではなく、同時実行性の管理に使用されます。さらに、プログラムの出力は非常に複雑で読みにくいように見えます。単純なマルチスレッド コードでもうまく書くのは難しいため、Twisted を使用することにしました:

# For years we thought this was all there was... We kept hiring more
# developers, more managers and buying servers. We were trying harder
# optimising processes and fire-fighting while getting mediocre
# performance in return. Till luckily one day our hosting
# company decided to increase their fees and we decided to
# switch to Twisted Ltd.!
 
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import task
 
# Twisted has a slightly different approach
def schedule_install(customer):
  # They are calling us back when a Wordpress installation completes.
  # They connected the caller recognition system with our CRM and
  # we know exactly what a call is about and what has to be done next.
  #
  # We now design processes of what has to happen on certain events.
  def schedule_install_wordpress():
      def on_done():
        print "Callback: Finished installation for", customer
    print "Scheduling: Installation for", customer
    return task.deferLater(reactor, 3, on_done)
  #
  def all_done(_):
    print "All done for", customer
  #
  # For each customer, we schedule these processes on the CRM
  # and that
  # is all our chief-Twisted developer has to do
  d = schedule_install_wordpress()
  d.addCallback(all_done)
  #
  return d
 
# Yes, we don't need many developers anymore or any synchronization.
# ~~ Super-powered Twisted developer ~~
def twisted_developer_day(customers):
  print "Goodmorning from Twisted developer"
  #
  # Here's what has to be done today
  work = [schedule_install(customer) for customer in customers]
  # Turn off the lights when done
  join = defer.DeferredList(work)
  join.addCallback(lambda _: reactor.stop())
  #
  print "Bye from Twisted developer!"
# Even his day is particularly short!
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
 
# Reactor, our secretary uses the CRM and follows-up on events!
reactor.run()


実行結果:

------ Running example 3 ------
Goodmorning from Twisted developer
Scheduling: Installation for Customer 0
....
Scheduling: Installation for Customer 14
Bye from Twisted developer!
Callback: Finished installation for Customer 0
All done for Customer 0
Callback: Finished installation for Customer 1
All done for Customer 1
...
All done for Customer 14
* Elapsed time: 3.18 seconds


今回は、スレッドを使用せずに完全に実行されたコードと読み取り可能な出力が得られました。 15 個のコンシューマーを並行して処理しました。つまり、当初 45 秒かかった実行時間は 3 秒以内に完了しました。秘訣は、sleep() へのブロック呼び出しをすべて、Twisted の同等の task.deferLater() およびコールバック関数に置き換えることです。処理は別の場所で行われるため、15 人の消費者に同時に問題なくサービスを提供できます。
前述の処理操作は別の場所で行われます。ここで説明すると、算術演算は依然として CPU 内で発生しますが、CPU の処理速度はディスクやネットワークの演算に比べて非常に高速になっています。そのため、CPU へのデータの供給、または CPU からメモリまたは別の CPU へのデータの送信にほとんどの時間がかかります。この領域での時間を節約するために、非ブロッキング操作を使用します。たとえば、task.deferLater() は、データが転送されたときにアクティブ化されるコールバック関数を使用します。
もう 1 つの非常に重要な点は、出力内の Twisted 開発者からのおはようと、Twisted 開発者からのバイバイ情報です。これら 2 つの情報は、コードの実行開始時にすでに出力されています。コードがこの時点まで非常に早く実行される場合、アプリケーションはいつ実際に実行を開始するのでしょうか?答えは、Twisted アプリケーション (Scrapy を含む) の場合は、reactor.run() で実行されるということです。このメソッドを呼び出す前に、アプリケーションで使用できるすべての Deferred チェーンの準備ができている必要があります。その後、reactor.run() メソッドがコールバック関数を監視してアクティブ化します。
actor の主なルールの 1 つは、十分に高速でブロックされない限り、どのような操作も実行できることに注意してください。
さて、コードにはマルチスレッドの管理に使用される部分はありませんが、これらのコールバック関数はまだ少し乱雑に見えます。次のように変更できます:

# Twisted gave us utilities that make our code way more readable!
@defer.inlineCallbacks
def inline_install(customer):
  print "Scheduling: Installation for", customer
  yield task.deferLater(reactor, 3, lambda: None)
  print "Callback: Finished installation for", customer
  print "All done for", customer
 
def twisted_developer_day(customers):
  ... same as previously but using inline_install() instead of schedule_install()
 
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()


実行結果は前の例と同じです。このコードの機能は前の例と同じですが、より簡潔で明確になっています。 inlineCallbacks ジェネレーターは、いくつかの Python メカニズムを使用して、inline_install() 関数の実行を一時停止または再開できます。 inline_install() 関数は Deferred オブジェクトになり、各コンシューマーに対して並列実行されます。イールドが発生するたびに、オペレーションは、イールドの Deferred オブジェクトが完了するまで現在の inline_install() インスタンスで一時停止され、その後再開されます。
今の唯一の疑問は、消費者が 15 人ではなく、たとえば 10,000 人だったらどうなるかということです。このコードは、10,000 の同時実行シーケンス (HTTP リクエスト、データベース書き込みなど) を開始します。これを行うことに問題はないかもしれませんが、さまざまな失敗を引き起こす可能性もあります。 Scrapy など、大量の同時リクエストを伴うアプリケーションでは、多くの場合、同時実行数を許容レベルに制限する必要があります。次の例では、task.Cooperator() を使用してこのような関数を完了します。 Scrapy は、同じメカニズムを使用して、アイテム パイプラインの同時実行数 (つまり、CONCURRENT_ITEMS 設定) を制限します。プログラムの実行中にコンシューマーを処理するためのスロットが 5 つあるようです。スロットが解放されない限り、次のコンシューマ要求の処理は開始されません。この例では、処理時間はすべて 3 秒なので、5 回に分けて処理されているように見えます。最終的なパフォーマンスはスレッドを使用した場合と同じですが、今回はスレッドが 1 つだけなので、コードがよりシンプルになり、正しいコードを書きやすくなります。

追記: deferToThread は同期関数をノンブロッキングにします

wisted defer.Deferred (twisted.internet import defer から) は遅延オブジェクトを返すことができます


注: deferToThread はスレッドを使用して実装されており、過度の使用は推奨されません

***同期関数を非同期関数に変える (Deferred を返す)***

twisted の deferToThread (twisted.internet.threads import deferToThread から) も遅延オブジェクトを返しますが、コールバック関数は別のスレッドで処理され、主にデータベース/ファイルの読み取りに使用されます。フェッチ操作

@defer.inlineCallbacks
def inline_install(customer):
  ... same as above
 
# The new "problem" is that we have to manage all this concurrency to
# avoid causing problems to others, but this is a nice problem to have.
def twisted_developer_day(customers):
  print "Goodmorning from Twisted developer"
  work = (inline_install(customer) for customer in customers)
  #
  # We use the Cooperator mechanism to make the secretary not
  # service more than 5 customers simultaneously.
  coop = task.Cooperator()
  join = defer.DeferredList([coop.coiterate(work) for i in xrange(5)])
  #
  join.addCallback(lambda _: reactor.stop())
  print "Bye from Twisted developer!"
 
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()
 
# We are now more lean than ever, our customers happy, our hosting
# bills ridiculously low and our performance stellar.
# ~*~ THE END ~*~


ここにある完全な例は参考用です

# -*- coding: utf-8 -*-
 
from twisted.internet import defer, reactor
from twisted.internet.threads import deferToThread
 
import functools
import time
 
# 耗时操作 这是一个同步阻塞函数
def mySleep(timeout):
  time.sleep(timeout)
 
  # 返回值相当于加进了callback里
  return 3
 
def say(result):
  print "耗时操作结束了, 并把它返回的结果给我了", result
 
# 用functools.partial包装一下, 传递参数进去
cb = functools.partial(mySleep, 3)
d = deferToThread(cb)
d.addCallback(say)
 
print "你还没有结束我就执行了, 哈哈"
 
reactor.run()

更多使用Python的Twisted框架编写非阻塞程序的代码示例相关文章请关注PHP中文网!

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。