>  기사  >  백엔드 개발  >  Python의 Twisted 프레임워크를 사용하여 비차단 프로그램을 작성하기 위한 코드 예제

Python의 Twisted 프레임워크를 사용하여 비차단 프로그램을 작성하기 위한 코드 예제

高洛峰
高洛峰원래의
2017-02-03 16:33:301231검색

먼저 코드를 살펴보겠습니다:

# ~*~ Twisted - A Python tale ~*~
 
from time import sleep
 
# Hello, I'm a developer and I mainly setup Wordpress.
def install_wordpress(customer):
  # Our hosting company Threads Ltd. is bad. I start installation and...
  print "Start installation for", customer
  # ...then wait till the installation finishes successfully. It is
  # boring and I'm spending most of my time waiting while consuming
  # resources (memory and some CPU cycles). It's because the process
  # is *blocking*.
  sleep(3)
  print "All done for", customer
 
# I do this all day long for our customers
def developer_day(customers):
  for customer in customers:
    install_wordpress(customer)
 
developer_day(["Bill", "Elon", "Steve", "Mark"])

실행해 보면 다음과 같습니다.

   
$ ./deferreds.py 1
------ Running example 1 ------
Start installation for Bill
All done for Bill
Start installation
...
* Elapsed time: 12.03 seconds

실행되는 코드입니다. 순차적으로. 소비자 4명이면 1명이 설치하는데 3초가 걸리므로 4명이 설치하면 12초가 걸린다. 이 처리는 그다지 만족스럽지 않으므로 스레드를 사용한 두 번째 예를 살펴보십시오.

import threading
 
# The company grew. We now have many customers and I can't handle the
# workload. We are now 5 developers doing exactly the same thing.
def developers_day(customers):
  # But we now have to synchronize... a.k.a. bureaucracy
  lock = threading.Lock()
  #
  def dev_day(id):
    print "Goodmorning from developer", id
    # Yuck - I hate locks...
    lock.acquire()
    while customers:
      customer = customers.pop(0)
      lock.release()
      # My Python is less readable
      install_wordpress(customer)
      lock.acquire()
    lock.release()
    print "Bye from developer", id
  # We go to work in the morning
  devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)]
  [dev.start() for dev in devs]
  # We leave for the evening
  [dev.join() for dev in devs]
 
# We now get more done in the same time but our dev process got more
# complex. As we grew we spend more time managing queues than doing dev
# work. We even had occasional deadlocks when processes got extremely
# complex. The fact is that we are still mostly pressing buttons and
# waiting but now we also spend some time in meetings.
developers_day(["Customer %d" % i for i in xrange(15)])

실행:

 $ ./deferreds.py 2
------ Running example 2 ------
Goodmorning from developer 0Goodmorning from developer
1Start installation forGoodmorning from developer 2
Goodmorning from developer 3Customer 0
...
from developerCustomer 13 3Bye from developer 2
* Elapsed time: 9.02 seconds

이번에는 병렬로 실행됩니다. 코드 5개의 작업자 스레드를 사용합니다. 15명의 소비자가 각각 3초를 소비한다는 것은 총 45초를 의미하지만, 5개의 스레드를 사용하여 병렬로 실행하면 총 9초밖에 걸리지 않습니다. 이 코드는 다소 복잡하며, 코드의 상당 부분이 알고리즘이나 비즈니스 로직에 초점을 맞추기보다는 동시성을 관리하는 데 사용됩니다. 게다가 프로그램의 출력은 매우 혼합되어 읽기 어려워 보입니다. 간단한 멀티 스레드 코드도 잘 작성하기 어렵기 때문에 Twisted를 사용하여 전환합니다:

# For years we thought this was all there was... We kept hiring more
# developers, more managers and buying servers. We were trying harder
# optimising processes and fire-fighting while getting mediocre
# performance in return. Till luckily one day our hosting
# company decided to increase their fees and we decided to
# switch to Twisted Ltd.!
 
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import task
 
# Twisted has a slightly different approach
def schedule_install(customer):
  # They are calling us back when a Wordpress installation completes.
  # They connected the caller recognition system with our CRM and
  # we know exactly what a call is about and what has to be done next.
  #
  # We now design processes of what has to happen on certain events.
  def schedule_install_wordpress():
      def on_done():
        print "Callback: Finished installation for", customer
    print "Scheduling: Installation for", customer
    return task.deferLater(reactor, 3, on_done)
  #
  def all_done(_):
    print "All done for", customer
  #
  # For each customer, we schedule these processes on the CRM
  # and that
  # is all our chief-Twisted developer has to do
  d = schedule_install_wordpress()
  d.addCallback(all_done)
  #
  return d
 
# Yes, we don't need many developers anymore or any synchronization.
# ~~ Super-powered Twisted developer ~~
def twisted_developer_day(customers):
  print "Goodmorning from Twisted developer"
  #
  # Here's what has to be done today
  work = [schedule_install(customer) for customer in customers]
  # Turn off the lights when done
  join = defer.DeferredList(work)
  join.addCallback(lambda _: reactor.stop())
  #
  print "Bye from Twisted developer!"
# Even his day is particularly short!
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
 
# Reactor, our secretary uses the CRM and follows-up on events!
reactor.run()


실행 결과:

------ Running example 3 ------
Goodmorning from Twisted developer
Scheduling: Installation for Customer 0
....
Scheduling: Installation for Customer 14
Bye from Twisted developer!
Callback: Finished installation for Customer 0
All done for Customer 0
Callback: Finished installation for Customer 1
All done for Customer 1
...
All done for Customer 14
* Elapsed time: 3.18 seconds


이번에는 스레드를 사용하지 않고도 완벽하게 실행된 코드와 읽을 수 있는 출력을 얻을 수 있습니다. 15명의 Consumer를 병렬로 처리했는데, 이는 원래 45초가 걸렸던 실행 시간이 3초 안에 완료되었다는 의미입니다. 비결은 sleep()에 대한 모든 차단 호출을 Twisted의 동일한 task.deferLater() 및 콜백 함수로 대체한다는 것입니다. 이제 처리가 다른 곳에서 이루어지기 때문에 아무런 어려움 없이 15명의 소비자에게 동시에 서비스를 제공할 수 있습니다.
앞서 언급한 처리 작업은 다른 곳에서 발생합니다. 이제 설명하자면, 산술 연산은 여전히 ​​CPU에서 발생하지만 CPU 처리 속도는 이제 디스크 및 네트워크 작업에 비해 매우 빠릅니다. 따라서 CPU에 데이터를 공급하거나 CPU에서 메모리나 다른 CPU로 데이터를 보내는 데 대부분의 시간이 걸립니다. 이 영역에서 시간을 절약하기 위해 비차단 작업을 사용합니다. 예를 들어 task.deferLater()는 데이터가 전송될 때 활성화되는 콜백 함수를 사용합니다.
또 다른 매우 중요한 점은 Twisted 개발자의 Goodmorning과 Twisted 개발자의 안녕 메시지입니다! 이 두 가지 정보는 코드 실행이 시작될 때 이미 인쇄되었습니다. 코드가 이 시점까지 너무 일찍 실행된다면 애플리케이션은 실제로 언제 실행되기 시작합니까? 대답은 Twisted 애플리케이션(Scrapy 포함)의 경우 Reactor.run()에서 실행된다는 것입니다. 이 메소드를 호출하기 전에 애플리케이션에서 사용될 수 있는 모든 Deferred 체인이 준비되어 있어야 하며, 그러면 Reactor.run() 메소드가 콜백 함수를 모니터링하고 활성화합니다.
리액터의 주요 규칙 중 하나는 충분히 빠르고 차단되지 않는 한 모든 작업을 수행할 수 있다는 것입니다.
이제 코드에는 여러 스레드를 관리하는 데 사용되는 부분이 없지만 이러한 콜백 함수는 여전히 약간 지저분해 보입니다. 다음과 같이 수정할 수 있습니다:

# Twisted gave us utilities that make our code way more readable!
@defer.inlineCallbacks
def inline_install(customer):
  print "Scheduling: Installation for", customer
  yield task.deferLater(reactor, 3, lambda: None)
  print "Callback: Finished installation for", customer
  print "All done for", customer
 
def twisted_developer_day(customers):
  ... same as previously but using inline_install() instead of schedule_install()
 
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()


실행 결과는 이전 예제와 동일합니다. 이 코드는 이전 예제와 동일한 작업을 수행하지만 더 간결하고 명확해 보입니다. inlineCallbacks 생성기는 일부 Python 메커니즘을 사용하여 inline_install() 함수의 실행을 일시 중지하거나 재개할 수 있습니다. inline_install() 함수는 Deferred 객체가 되어 각 소비자에 대해 병렬로 실행됩니다. 산출이 발생할 때마다 산출의 지연된 객체가 완료될 때까지 현재 inline_install() 인스턴스에서 작업이 일시 중지된 다음 다시 시작됩니다.
이제 유일한 질문은 소비자가 15명이 아니라 10,000명이라면 어떻게 될까요? 이 코드는 10,000개의 동시 실행 시퀀스(예: HTTP 요청, 데이터베이스 쓰기 등)를 시작합니다. 이렇게 하면 아무런 문제가 없을 수도 있지만 다양한 실패로 이어질 수도 있습니다. Scrapy와 같이 대규모 동시 요청이 있는 애플리케이션에서는 동시성 수를 허용 가능한 수준으로 제한해야 하는 경우가 많습니다. 다음 예제에서는 task.Cooperator()를 사용하여 이러한 함수를 완료합니다. Scrapy는 또한 항목 파이프라인에서 동일한 메커니즘을 사용하여 동시성 수를 제한합니다(예: CONCURRENT_ITEMS 설정):

@defer.inlineCallbacks
def inline_install(customer):
  ... same as above
 
# The new "problem" is that we have to manage all this concurrency to
# avoid causing problems to others, but this is a nice problem to have.
def twisted_developer_day(customers):
  print "Goodmorning from Twisted developer"
  work = (inline_install(customer) for customer in customers)
  #
  # We use the Cooperator mechanism to make the secretary not
  # service more than 5 customers simultaneously.
  coop = task.Cooperator()
  join = defer.DeferredList([coop.coiterate(work) for i in xrange(5)])
  #
  join.addCallback(lambda _: reactor.stop())
  print "Bye from Twisted developer!"
 
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()
 
# We are now more lean than ever, our customers happy, our hosting
# bills ridiculously low and our performance stellar.
# ~*~ THE END ~*~


실행 결과:

$ ./deferreds.py 5
------ Running example 5 ------
Goodmorning from Twisted developer
Bye from Twisted developer!
Scheduling: Installation for Customer 0
...
Callback: Finished installation for Customer 4
All done for Customer 4
Scheduling: Installation for Customer 5
...
Callback: Finished installation for Customer 14
All done for Customer 14
* Elapsed time: 9.19 seconds


위 출력에서 ​​볼 수 있듯이 프로그램이 실행될 때 소비자 처리 슬롯이 5개 있는 것 같습니다. 슬롯이 해제되지 않으면 다음 소비자 요청 처리가 시작되지 않습니다. 이 예에서는 처리 시간이 모두 3초이므로 5개씩 일괄 처리되는 것처럼 보입니다. 최종 성능은 스레드를 사용한 것과 동일하지만 이번에는 스레드가 하나뿐이어서 코드가 더 간단하고 올바른 코드를 작성하기가 더 쉽습니다.

..
 
# 代码片段
 
  def dataReceived(self, data):
    now = int(time.time())
 
    for ftype, data in self.fpcodec.feed(data):
      if ftype == 'oob':
        self.msg('OOB:', repr(data))
      elif ftype == 0x81: # 对服务器请求的心跳应答(这个是解析 防疲劳驾驶仪,发给gps上位机的,然后上位机发给服务器的)
        self.msg('FP.PONG:', repr(data))
      else:
        self.msg('TODO:', (ftype, data))
      d = deferToThread(self.redis.zadd, "beier:fpstat:fps", now, self.devid)
      d.addCallback(self._doResult, extra)




여기의 전체 예는 다음과 같습니다. 참고하세요

# -*- coding: utf-8 -*-
 
from twisted.internet import defer, reactor
from twisted.internet.threads import deferToThread
 
import functools
import time
 
# 耗时操作 这是一个同步阻塞函数
def mySleep(timeout):
  time.sleep(timeout)
 
  # 返回值相当于加进了callback里
  return 3
 
def say(result):
  print "耗时操作结束了, 并把它返回的结果给我了", result
 
# 用functools.partial包装一下, 传递参数进去
cb = functools.partial(mySleep, 3)
d = deferToThread(cb)
d.addCallback(say)
 
print "你还没有结束我就执行了, 哈哈"
 
reactor.run()

更多使用Python的Twisted框架编写非阻塞程序的代码示例相关文章请关注PHP中文网!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.