
Code examples for writing non-blocking programs using Python’s Twisted framework

高洛峰
2017-02-03 16:33:30

Let’s look at a piece of code first:

# ~*~ Twisted - A Python tale ~*~
 
from time import sleep
 
# Hello, I'm a developer and I mainly set up WordPress.
def install_wordpress(customer):
  # Our hosting company Threads Ltd. is bad. I start installation and...
  print("Start installation for", customer)
  # ...then wait till the installation finishes successfully. It is
  # boring and I'm spending most of my time waiting while consuming
  # resources (memory and some CPU cycles). It's because the process
  # is *blocking*.
  sleep(3)
  print("All done for", customer)
 
# I do this all day long for our customers
def developer_day(customers):
  for customer in customers:
    install_wordpress(customer)
 
developer_day(["Bill", "Elon", "Steve", "Mark"])

Running it produces the following output:

$ ./deferreds.py 1
------ Running example 1 ------
Start installation for Bill
All done for Bill
Start installation
...
* Elapsed time: 12.03 seconds

This code runs sequentially. Installing for one customer takes 3 seconds, so four customers take 12 seconds. That doesn't scale well, so let's look at a second example that uses threads:
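
The "* Elapsed time" lines in these outputs come from a harness around deferreds.py (which takes the example number as an argument) that the article does not show. As a sketch under that assumption, a hypothetical timed() helper prints a similar line; applied to a scaled-down version of example 1 (a hypothetical install() that sleeps 0.1 s instead of 3 s), it makes the 4 x 3 s = 12 s arithmetic easy to check:

```python
import time

def timed(fn, *args):
    """Run fn(*args) and print the wall-clock time (hypothetical harness helper)."""
    start = time.monotonic()
    fn(*args)
    elapsed = time.monotonic() - start
    print("* Elapsed time: %.2f seconds" % elapsed)
    return elapsed

def install(customer):
    # Hypothetical, scaled-down stand-in for install_wordpress(): 0.1 s instead of 3 s
    time.sleep(0.1)

def developer_day(customers):
    for customer in customers:
        install(customer)  # each call blocks until it finishes

elapsed = timed(developer_day, ["Bill", "Elon", "Steve", "Mark"])
# 4 customers x 0.1 s each, so elapsed is roughly 0.4 s
```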

import threading
 
# The company grew. We now have many customers and I can't handle the
# workload. We are now 5 developers doing exactly the same thing.
# (install_wordpress() is the function from the first example.)
def developers_day(customers):
  # But we now have to synchronize... a.k.a. bureaucracy
  lock = threading.Lock()
  #
  def dev_day(id):
    print("Goodmorning from developer", id)
    # Yuck - I hate locks...
    lock.acquire()
    while customers:
      customer = customers.pop(0)
      lock.release()
      # My Python is less readable
      install_wordpress(customer)
      lock.acquire()
    lock.release()
    print("Bye from developer", id)
  # We go to work in the morning
  devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)]
  for dev in devs:
    dev.start()
  # We leave for the evening
  for dev in devs:
    dev.join()
 
# We now get more done in the same time but our dev process got more
# complex. As we grew we spend more time managing queues than doing dev
# work. We even had occasional deadlocks when processes got extremely
# complex. The fact is that we are still mostly pressing buttons and
# waiting but now we also spend some time in meetings.
developers_day(["Customer %d" % i for i in range(15)])

Run it:

 $ ./deferreds.py 2
------ Running example 2 ------
Goodmorning from developer 0Goodmorning from developer
1Start installation forGoodmorning from developer 2
Goodmorning from developer 3Customer 0
...
from developerCustomer 13 3Bye from developer 2
* Elapsed time: 9.02 seconds

This time the code runs in parallel on 5 worker threads. 15 customers at 3 seconds each would take 45 seconds sequentially, but with 5 threads working in parallel the total is only 9 seconds. The code is more complex, though: a large part of it manages concurrency instead of focusing on the algorithm or business logic. The output is also interleaved and hard to read. Even simple multi-threaded code is hard to get right, so let's switch to Twisted:
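
For comparison, the standard library's concurrent.futures.ThreadPoolExecutor can replace the hand-written lock and shared list. This is a swapped-in technique, not the article's code, and install() is a hypothetical, scaled-down stand-in for install_wordpress() that sleeps 0.1 s:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def install(customer):
    # Hypothetical, scaled-down stand-in for install_wordpress(): 0.1 s instead of 3 s
    time.sleep(0.1)
    return customer

def developers_day(customers, workers=5):
    # The pool owns the work queue and the locking; each worker thread
    # picks up the next customer as soon as it is free.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(install, customers))  # results keep input order

start = time.monotonic()
done = developers_day(["Customer %d" % i for i in range(15)])
elapsed = time.monotonic() - start
# 15 jobs / 5 workers = 3 rounds of 0.1 s, so elapsed is roughly 0.3 s
```

The pool still uses threads, so the trade-off described above remains; it only moves the synchronization out of sight.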

# For years we thought this was all there was... We kept hiring more
# developers, more managers and buying servers. We were trying harder
# optimising processes and fire-fighting while getting mediocre
# performance in return. Till luckily one day our hosting
# company decided to increase their fees and we decided to
# switch to Twisted Ltd.!
 
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import task
 
# Twisted has a slightly different approach
def schedule_install(customer):
  # They are calling us back when a Wordpress installation completes.
  # They connected the caller recognition system with our CRM and
  # we know exactly what a call is about and what has to be done next.
  #
  # We now design processes of what has to happen on certain events.
  def schedule_install_wordpress():
    def on_done():
      print("Callback: Finished installation for", customer)
    print("Scheduling: Installation for", customer)
    # deferLater() returns a Deferred that fires after 3 seconds
    # without blocking the reactor in the meantime
    return task.deferLater(reactor, 3, on_done)
  #
  def all_done(_):
    print("All done for", customer)
  #
  # For each customer, we schedule these processes on the CRM and that
  # is all our chief-Twisted developer has to do
  d = schedule_install_wordpress()
  d.addCallback(all_done)
  #
  return d
 
# Yes, we don't need many developers anymore or any synchronization.
# ~~ Super-powered Twisted developer ~~
def twisted_developer_day(customers):
  print("Goodmorning from Twisted developer")
  #
  # Here's what has to be done today
  work = [schedule_install(customer) for customer in customers]
  # Turn off the lights when done
  join = defer.DeferredList(work)
  join.addCallback(lambda _: reactor.stop())
  #
  print("Bye from Twisted developer!")
# Even his day is particularly short!
twisted_developer_day(["Customer %d" % i for i in range(15)])
 
# Reactor, our secretary uses the CRM and follows-up on events!
reactor.run()


## Running results:

------ Running example 3 ------
Goodmorning from Twisted developer
Scheduling: Installation for Customer 0
....
Scheduling: Installation for Customer 14
Bye from Twisted developer!
Callback: Finished installation for Customer 0
All done for Customer 0
Callback: Finished installation for Customer 1
All done for Customer 1
...
All done for Customer 14
* Elapsed time: 3.18 seconds


This time the code runs correctly and the output is readable, without using any threads. We processed 15 customers concurrently, so work that would take 45 seconds sequentially finished in about 3 seconds. The trick is that we replaced every blocking call to sleep() with its Twisted counterpart, task.deferLater(), plus callback functions. Since the processing now happens elsewhere, we can serve 15 customers at the same time without any difficulty.
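
To see how a single thread can wait on 15 installations at once, here is a toy event loop built on a heap of timers. It is a sketch, not Twisted's implementation (the real reactor also watches sockets, and its timer API is reactor.callLater()); ToyReactor and call_later are hypothetical names, and the delay is scaled down to 0.1 s:

```python
import heapq
import itertools
import time

class ToyReactor:
    """Toy single-threaded event loop with timers (not Twisted's implementation)."""

    def __init__(self):
        self._timers = []               # heap of (due_time, seq, callback)
        self._seq = itertools.count()   # tie-breaker so callbacks are never compared

    def call_later(self, delay, callback):
        heapq.heappush(self._timers, (time.monotonic() + delay, next(self._seq), callback))

    def run(self):
        # The only waiting happens here, and only until the next timer is due.
        while self._timers:
            due, _, callback = heapq.heappop(self._timers)
            pause = due - time.monotonic()
            if pause > 0:
                time.sleep(pause)
            callback()

reactor = ToyReactor()
finished = []

def schedule_install(customer):
    print("Scheduling: Installation for", customer)
    # Instead of sleeping, register what should happen 0.1 s from now.
    reactor.call_later(0.1, lambda: finished.append(customer))

start = time.monotonic()
for i in range(15):
    schedule_install("Customer %d" % i)
reactor.run()
elapsed = time.monotonic() - start
# All 15 timers overlap, so elapsed is roughly 0.1 s rather than 1.5 s
```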

What does it mean that the processing happens somewhere else? Computation still happens in the CPU, but the CPU is now very fast compared to disk and network operations, so most of the time goes into moving data to and from the CPU (to memory, disk, or other machines). Non-blocking operations let us avoid sitting idle during that time. task.deferLater(), for example, schedules a callback to run after a given delay; Twisted's networking APIs work the same way, calling your code back when data has arrived or has been sent.
Another important detail is where "Goodmorning from Twisted developer" and "Bye from Twisted developer!" appear in the output: both are printed before any installation work has happened. If the code reaches the end of twisted_developer_day() this early, when does the application actually run? The answer is that a Twisted application (including Scrapy) runs inside reactor.run(). Before that call, the application sets up its Deferred chains; reactor.run() then watches for events and fires the corresponding callbacks.
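
A toy model of why the two messages come first: a Deferred-like object only stores callbacks until something fires it, and in Twisted that firing happens inside reactor.run(). ToyDeferred, add_callback and the pending list are hypothetical stand-ins (Twisted's Deferred has addCallback() and callback(), plus error handling that this sketch omits); the final loop plays the reactor's role:

```python
class ToyDeferred:
    """Toy version of the Deferred idea: a result plus a chain of callbacks."""

    _UNSET = object()

    def __init__(self):
        self._callbacks = []
        self._result = self._UNSET

    def add_callback(self, fn):
        self._callbacks.append(fn)
        if self._result is not self._UNSET:
            self._run_callbacks()  # already fired: run the new callback right away
        return self

    def callback(self, result):
        self._result = result
        self._run_callbacks()

    def _run_callbacks(self):
        # Each callback receives the previous callback's return value.
        while self._callbacks:
            self._result = self._callbacks.pop(0)(self._result)

log = []
pending = []  # stands in for the events the reactor is watching

def schedule_install(customer):
    d = ToyDeferred()
    d.add_callback(lambda _: log.append("All done for " + customer))
    pending.append(d)
    return d

log.append("Goodmorning")
for name in ["Customer 0", "Customer 1"]:
    schedule_install(name)
log.append("Bye")

# Only now does the "reactor" fire the events the chains were waiting for.
for d in pending:
    d.callback(None)
```
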
Note that the main rule of the reactor is that you can do anything, as long as it is fast and non-blocking.
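
A minimal illustration of why this rule matters, using a hypothetical run_callbacks_in_order() loop that, like the reactor, runs one callback at a time on a single thread: one blocking callback delays everything queued behind it.

```python
import time

def run_callbacks_in_order(callbacks):
    """Toy single-threaded loop: run callbacks one after another and record
    how long each one waited before it got to run (hypothetical helper)."""
    start = time.monotonic()
    waits = []
    for cb in callbacks:
        waits.append(time.monotonic() - start)
        cb()
    return waits

def fast():
    pass

def blocking():
    time.sleep(0.2)  # a blocking call stalls the whole loop

waits = run_callbacks_in_order([fast, blocking, fast, fast])
# The two fast callbacks queued after blocking() each waited about 0.2 s
```
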
The code no longer contains anything that manages threads, but the callback functions still look a bit messy. They can be rewritten like this:

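from twisted.internet import defer, reactor, task
 
# Twisted gave us utilities that make our code way more readable!
@defer.inlineCallbacks
def inline_install(customer):
  print("Scheduling: Installation for", customer)
  # Execution pauses here until the Deferred fires 3 seconds later
  yield task.deferLater(reactor, 3, lambda: None)
  print("Callback: Finished installation for", customer)
  print("All done for", customer)
 
def twisted_developer_day(customers):
  # Same as previously but using inline_install() instead of schedule_install()
  print("Goodmorning from Twisted developer")
  work = [inline_install(customer) for customer in customers]
  join = defer.DeferredList(work)
  join.addCallback(lambda _: reactor.stop())
  print("Bye from Twisted developer!")
 
twisted_developer_day(["Customer %d" % i for i in range(15)])
reactor.run()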


The output is the same as in the previous example, and so is the behavior, but the code is more concise and clear. The inlineCallbacks decorator uses Python generators to pause and resume execution of inline_install(). Calling inline_install() now returns a Deferred, and one instance runs concurrently for each customer. At each yield, the current inline_install() instance is suspended until the yielded Deferred fires, and then it resumes.
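
Here is a rough sketch of the generator trick behind inlineCallbacks, assuming a stripped-down ToyDeferred class. toy_inline_callbacks and fake_defer_later are hypothetical names; the real decorator also routes errors into the generator, which this sketch omits:

```python
import functools

class ToyDeferred:
    """Minimal stand-in for a Deferred: holds callbacks until a result arrives."""
    def __init__(self):
        self.callbacks = []
        self.fired = False
        self.result = None

    def add_callback(self, fn):
        if self.fired:
            fn(self.result)
        else:
            self.callbacks.append(fn)

    def callback(self, result):
        self.fired, self.result = True, result
        for fn in self.callbacks:
            fn(result)

def toy_inline_callbacks(gen_func):
    """Drive a generator that yields ToyDeferreds, resuming it with each result."""
    @functools.wraps(gen_func)
    def wrapper(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        done = ToyDeferred()

        def step(value):
            try:
                waiting_on = gen.send(value)  # run until the next yield
            except StopIteration as stop:
                done.callback(stop.value)     # generator returned: fire the outer Deferred
                return
            waiting_on.add_callback(step)     # resume when the yielded Deferred fires

        step(None)
        return done
    return wrapper

pending = {}
log = []

def fake_defer_later(customer):
    # Hypothetical stand-in for task.deferLater(): fired manually below
    d = ToyDeferred()
    pending[customer] = d
    return d

@toy_inline_callbacks
def inline_install(customer):
    log.append("Scheduling " + customer)
    yield fake_defer_later(customer)
    log.append("Finished " + customer)
    return customer

results = [inline_install(c) for c in ["A", "B"]]
# Both generators are now paused at their yield; fire B first, then A.
pending["B"].callback(None)
pending["A"].callback(None)
```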

The remaining question is: what if we have 10,000 customers instead of 15? This code would start 10,000 simultaneous operations (such as HTTP requests or database writes). That may work, but it may also cause all kinds of failures. In applications with huge numbers of concurrent requests, such as Scrapy, we often need to limit concurrency to an acceptable level. The following example does this with task.Cooperator(). Scrapy uses the same mechanism in its Item Pipeline to limit concurrency (i.e., the CONCURRENT_ITEMS setting):

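from twisted.internet import defer, reactor, task
 
@defer.inlineCallbacks
def inline_install(customer):
  # Same as above
  print("Scheduling: Installation for", customer)
  yield task.deferLater(reactor, 3, lambda: None)
  print("Callback: Finished installation for", customer)
  print("All done for", customer)
 
# The new "problem" is that we have to manage all this concurrency to
# avoid causing problems to others, but this is a nice problem to have.
def twisted_developer_day(customers):
  print("Goodmorning from Twisted developer")
  # One generator shared by all five cooperative tasks below
  work = (inline_install(customer) for customer in customers)
  #
  # We use the Cooperator mechanism to make the secretary not
  # service more than 5 customers simultaneously. Each coiterate()
  # task pulls the next Deferred from `work` and pauses until it
  # fires, so at most 5 installations are in flight at once.
  coop = task.Cooperator()
  join = defer.DeferredList([coop.coiterate(work) for i in range(5)])
  #
  join.addCallback(lambda _: reactor.stop())
  print("Bye from Twisted developer!")
 
twisted_developer_day(["Customer %d" % i for i in range(15)])
reactor.run()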
 
# We are now more lean than ever, our customers happy, our hosting
# bills ridiculously low and our performance stellar.
# ~*~ THE END ~*~


Running results:

$ ./deferreds.py 5
------ Running example 5 ------
Goodmorning from Twisted developer
Bye from Twisted developer!
Scheduling: Installation for Customer 0
...
Callback: Finished installation for Customer 4
All done for Customer 4
Scheduling: Installation for Customer 5
...
Callback: Finished installation for Customer 14
All done for Customer 14
* Elapsed time: 9.19 seconds


As the output shows, the program behaves as if it had 5 slots for processing customers: the next customer is not started until a slot is freed. Because every installation takes exactly 3 seconds here, it looks as if customers are processed in batches of 5. The overall performance matches the threaded version, but this time there is only one thread, and the code is simpler and easier to get right.
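
The slot behavior can be checked with a small discrete-time simulation, assuming every installation takes exactly 3 time units and a free slot immediately takes the next customer from the shared work iterator. run_slots is a hypothetical helper, not part of Twisted:

```python
def run_slots(jobs, slots, duration):
    """Simulate `slots` workers pulling from one shared iterator of jobs.

    Each job takes `duration` time units. Returns (start, finish, job) tuples.
    """
    work = iter(jobs)          # the single shared iterator, like `work` above
    free_at = [0] * slots      # time at which each slot becomes free
    timeline = []
    for job in work:
        slot = free_at.index(min(free_at))  # the earliest free slot takes the next job
        start = free_at[slot]
        free_at[slot] = start + duration
        timeline.append((start, start + duration, job))
    return timeline

timeline = run_slots(["Customer %d" % i for i in range(15)], slots=5, duration=3)
total = max(finish for _, finish, _ in timeline)
# Three "batches" of 5 customers, 3 time units each
```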

PS: using deferToThread to make synchronous functions non-blocking

In Twisted, asynchronous functions return a defer.Deferred (from twisted.internet import defer) instead of a result.

Note: deferToThread is implemented with threads, so excessive use is not recommended.

***Turning a synchronous function into an asynchronous one (returning a Deferred)***
Twisted's deferToThread (from twisted.internet.threads import deferToThread) also returns a Deferred, but the function itself runs in a thread from the reactor's thread pool; its result is then passed to the Deferred's callbacks back in the reactor thread. It is mainly used for blocking operations such as database access or file reads.
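
A toy model of what deferToThread does, using only the standard library: the blocking function runs in a worker thread, and its result is handed back to the thread playing the reactor's role before the callback runs. defer_to_thread and the results queue are hypothetical; Twisted's real implementation uses the reactor's thread pool and delivers the result to the reactor thread for you (roughly via reactor.callFromThread()):

```python
import queue
import threading
import time

reactor_thread = threading.current_thread()  # the thread playing the reactor's role
results = queue.Queue()                       # results travel back to it through here

def defer_to_thread(fn, callback, *args):
    """Toy model: run fn in a worker thread, run callback on the reactor thread."""
    def worker():
        value = fn(*args)               # the blocking call happens here
        results.put((callback, value))  # hand the result back instead of calling back here
    threading.Thread(target=worker, daemon=True).start()

off_reactor = []
seen = []

def my_sleep(timeout):
    off_reactor.append(threading.current_thread() is not reactor_thread)
    time.sleep(timeout)                 # synchronous, blocking work
    return 3

def say(result):
    seen.append((result, threading.current_thread() is reactor_thread))

defer_to_thread(my_sleep, say, 0.1)
seen.append("main thread keeps running")

# A one-iteration stand-in for the reactor loop: run the callback when the result arrives.
callback, value = results.get(timeout=5)
callback(value)
```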

 
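# Code snippet (excerpt of a method from a Twisted protocol class; it assumes
# `import time` and `from twisted.internet.threads import deferToThread`
# at module level)
 
  def dataReceived(self, data):
    now = int(time.time())
 
    for ftype, data in self.fpcodec.feed(data):
      if ftype == 'oob':
        self.msg('OOB:', repr(data))
      elif ftype == 0x81:
        # Heartbeat reply to a server request (this parses data from the
        # anti-fatigue driving monitor, which is sent to the GPS host
        # computer and then forwarded by it to the server)
        self.msg('FP.PONG:', repr(data))
      else:
        self.msg('TODO:', (ftype, data))
      # Run the blocking Redis call in a worker thread; the callback
      # fires back in the reactor thread.
      # (`extra` is not defined in this excerpt.)
      d = deferToThread(self.redis.zadd, "beier:fpstat:fps", now, self.devid)
      d.addCallback(self._doResult, extra)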


## Complete example for reference

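# -*- coding: utf-8 -*-
 
from twisted.internet import defer, reactor
from twisted.internet.threads import deferToThread
 
import functools
import time
 
# A time-consuming operation: this is a synchronous, blocking function
def mySleep(timeout):
  time.sleep(timeout)
 
  # The return value is passed on to the callbacks
  return 3
 
def say(result):
  print("The time-consuming operation has finished and handed me its result:", result)
 
# Wrap it with functools.partial to pass the argument in
# (deferToThread(mySleep, 3) would work as well)
cb = functools.partial(mySleep, 3)
d = deferToThread(cb)
d.addCallback(say)
# Stop the reactor when done; otherwise reactor.run() never returns
d.addCallback(lambda _: reactor.stop())
 
print("You haven't finished yet, but I already ran, haha")
 
reactor.run()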

