Home >Backend Development >Python Tutorial >Python coroutines and IO multiplexing methods

Python coroutines and IO multiplexing methods

高洛峰
高洛峰Original
2017-03-23 15:22:582636browse

协程

是一种用户态的请链接线程微线程

好处

无需线程上下文切换的开销

无需原子操作锁定及同步的开销

方便切换控制简化编程模型

高并发+高扩展性+低成本

缺点

无法利用多核的资源协程本质上就是一个单线程无需使用多核要结合多进程才能利用多核

进行阻塞操作会阻塞掉整个程序

用yield实现一个协程操作

def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)
 
 
def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
 
 
if __name__ == &#39;__main__&#39;:
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

对协程进行一个定义

1.必须在只有一个单线程里实现并发

2.修改共享数据不需加锁

3.用户程序里自己保存多个控制流的上下文栈

4.一个协程遇到IO操作自动切换到其它协程

一个协程的小例子手动切换IO操作

greenlet是一个用C实现的协程模块相比与python自带的yield它可以使你在任意函数之间随意切换而不需把这个函数先声明为generator

import greenlet
 
def test1():
    print(12)
    g2.switch()
    print(34)
    g2.switch()
 
def test2():
    print(56)
    g1.switch()
    print(78)
 
g1 = greenlet.greenlet(test1)
g2 = greenlet.greenlet(test2)
g1.switch()

Gevent 是一个第三方库可以轻松通过gevent实现并发同步或异步编程在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部但它们被协作式地调度

协程遇到IO操作自动切换

import gevent
 
 
def foo():
    print("Running in foo")
    gevent.sleep(2)
    print("Explicit context switch to foo again")
 
 
def bar():
    print("Explicit context to bar")
    gevent.sleep(1)
    print("Implicit context switch back to bar")
 
 
def func3():
    print("running func3")
    gevent.sleep(0)
    print("running func3 again")
 
gevent.joinall([
    gevent.spawn(foo),  # 生成
    gevent.spawn(bar),
    gevent.spawn(func3),
])

同步与异步的性能区别

import gevent
 
 
def task(pid):
    gevent.sleep(0.5)
    print("Task %s done" % pid)
 
 
def synchronous():
    for i in range(1,10):
        task(i)
 
 
def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)
 
print("Synchronous:")
synchronous()
 
print("Asynchronous:")
asynchronous()

gevent并发爬网页

from urllib import request
import gevent
from gevent import monkey
 
 
def f(url):
    print("GET: %s" % url)
    resp = request.urlopen(url)
    data = resp.read()
    # file = open("url.html", "wb")
    # file.write(data)
    # file.close()
    print("%d bytes received from %s." % (len(data), url))
 
 
monkey.patch_all()  # 把当前程序的所有IO操作给我单独的做上标记
gevent.joinall([
    gevent.spawn(f, "http://www.yitongjia.com"),
    gevent.spawn(f, "http://www.jinyuncai.cn"),
    gevent.spawn(f, "https://www.guoshipm.com"),
])

通过gevent实现单线程下多socket并发

服务端

import sys
import socket
import time
import gevent
  
from gevent import socket,monkey
monkey.patch_all()
  
  
def server(port):
    s = socket.socket()
    s.bind((&#39;0.0.0.0&#39;, port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)
  
  
  
def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)
  
    except Exception as  ex:
        print(ex)
    finally:
        conn.close()
if __name__ == &#39;__main__&#39;:
    server(8001)

客户端

import socket
import threading
 
def sock_conn():
 
    client = socket.socket()
 
    client.connect(("localhost",8001))
    count = 0
    while True:
        #msg = input(">>:").strip()
        #if len(msg) == 0:continue
        client.send( ("hello %s" %count).encode("utf-8"))
 
        data = client.recv(1024)
 
        print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
        count +=1
    client.close()
 
 
for i in range(100):
    t = threading.Thread(target=sock_conn)
    t.start()
 
并发100个sock连接

事件驱动

通常我们在写服务器处理模型程序的时候有几种模型

每收到一个请求生成一个进程处理这个请求

每收到一个请求生成一个线程处理这个请求

每收到一个请求放入一个事件列表让主进程通过非阻塞IO方式来处理请求

第1中方法由于创建新的进程的开销比较大所以会导致服务器性能比较差,但实现比较简单。

第2种方式由于要涉及到线程的同步有可能会面临死锁等问题。

第3种方式在写应用程序代码时逻辑比前面两种都复杂。

综合考虑各方面因素一般普遍认为第3种方式是大多数网络服务器采用的方式

IO多路复用

一.介绍

内核空间和用户空间

现在操作系统都是采用虚拟存储器那么对32位操作系统而言它的寻址空间虚拟存储空间为4G2的32次方。操作系统的核心是内核独立于普通的应用程序可以访问受保护的内存空间也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核kernel保证内核的安全操心系统将虚拟空间划分为两部分一部分为内核空间一部分为用户空间。针对linux操作系统而言将最高的1G字节从虚拟地址0xC0000000到0xFFFFFFFF供内核使用称为内核空间而将较低的3G字节从虚拟地址0x00000000到0xBFFFFFFF供各个进程使用称为用户空间。

进程切换

为了控制进程的执行内核必须有能力挂起正在CPU上运行的进程并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说任何进程都是在操作系统内核的支持下运行的是与内核紧密相关的。

进程阻塞

正在执行的进程由于期待的某些事件未发生如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等则由系统自动执行阻塞原语(Block)使自己由运行状态变为阻塞状态。可见进程的阻塞是进程自身的一种主动行为也因此只有处于运行态的进程获得CPU才可能将其转为阻塞状态。当进程进入阻塞状态是不占用CPU资源的。

文件描述符fd

文件描述符File descriptor是计算机科学中的一个术语是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上它是一个索引值指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时内核向进程返回一个文件描述符。在程序设计中一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存IO

缓存 I/O 又被称作标准 I/O大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中操作系统会将 I/O 的数据缓存在文件系统的页缓存 page cache 中也就是说数据会先被拷贝到操作系统内核的缓冲区中然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的缺点

数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

二.IO模式

刚才说了对于一次IO访问以read举例数据会先被拷贝到操作系统内核的缓冲区中然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说当一个read操作发生时它会经历两个阶段

1. 等待数据准备 (Waiting for the data to be ready)

2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式因为这两个阶段linux系统产生了下面五种网络模式的方案。

- 阻塞 I/Oblocking IO

- 非阻塞 I/Ononblocking IO

- I/O 多路复用 IO multiplexing

- 信号驱动 I/O signal driven IO

- 异步 I/Oasynchronous IO

阻塞IO

在linux中默认情况下所有的socket都是blocking一个典型的读操作流程大概是这样

当用户进程调用了recvfrom这个系统调用kernel就开始了IO的第一个阶段准备数据对于网络IO来说很多时候数据在一开始还没有到达。比如还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来。这个过程需要等待也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边整个进程会被阻塞当然是进程自己选择的阻塞。当kernel一直等到数据准备好了它就会将数据从kernel中拷贝到用户内存然后kernel返回结果用户进程才解除block的状态重新运行起来。

所以blocking IO的特点就是在IO执行的两个阶段都被block了。

非阻塞IO

linux下可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时流程是这个样子

当用户进程发出read操作时如果kernel中的数据还没有准备好那么它并不会block用户进程而是立刻返回一个error。从用户进程角度讲 它发起一个read操作后并不需要等待而是马上就得到了一个结果。用户进程判断结果是一个error时它就知道数据还没有准备好于是它可以再次发送read操作。一旦kernel中的数据准备好了并且又再次收到了用户进程的system call那么它马上就将数据拷贝到了用户内存然后返回。

所以nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

IO多路复用

IO multiplexing就是我们说的selectpollepoll有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是selectpollepoll这个function会不断的轮询所负责的所有socket当某个socket有数据到达了就通知用户进程。

当用户进程调用了select那么整个进程会被block而同时kernel会“监视”所有select负责的socket当任何一个socket中的数据准备好了select就会返回。这个时候用户进程再调用read操作将数据从kernel拷贝到用户进程。

所以I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符而这些文件描述符套接字描述符其中的任意一个进入读就绪状态select()函数就可以返回。

这个图和blocking IO的图其实并没有太大的不同事实上还更差一些。因为这里需要使用两个system call (select 和 recvfrom)而blocking IO只调用了一个system call (recvfrom)。但是用select的优势在于它可以同时处理多个connection。

所以如果处理的连接数不是很高的话使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快而是在于能处理更多的连接。

在IO multiplexing Model中实际中对于每一个socket一般都设置成为non-blocking但是如上图所示整个用户的process其实是一直被block的。只不过process是被select这个函数block而不是被socket IO给block

异步IO

用户进程发起read操作之后立刻就可以开始去做其它的事。而另一方面从kernel的角度当它受到一个asynchronous read之后首先它会立刻返回所以不会对用户进程产生任何block。然后kernel会等待数据准备完成然后将数据拷贝到用户内存当这一切都完成之后kernel会给用户进程发送一个signal告诉它read操作完成了。

IO多路复用select、poll、epoll

select例子

server端

import select
import socket
import sys
import queue
 
server = socket.socket()
server.bind(("0.0.0.0", 6666))
server.listen(1000)
 
server.setblocking(False)   # 不阻塞
 
msg_dic = {}
 
inputs = [server, ]
outputs = []
 
while True:
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    print(readable, writeable, exceptional)
    for r in readable:
        if r is server:     # 代表来了一个新连接
            conn, addr = server.accept()
            print("来了个新连接", addr)
            inputs.append(conn)     # 因为这个新建立的连接还没发数据过来,现在就接收的话程序就报错了
            # 所以要想实现这个客户端发数据来时server端能知道,就需要让select再监测这个conn
            msg_dic[conn] = queue.Queue()   # 初始化一个队列,后面存要返回客户端的数据
        else:
            data = r.recv(1024)
            print("收到数据", data)
            msg_dic[r].put(data)
            outputs.append(r)   # 放入返回的连接队列里
            # r.send(data)
            # print("send end...")
    for w in writeable:     # 要返回给客户端的连接列表
        data_to_client = msg_dic[w].get()
        w.send(data_to_client)      # 返回给客户端的元数据
        outputs.remove(w)       # 确保下次循环的时候,writeable不返回已经处理完的连接
 
    for e in exceptional:
        if e in outputs:
            outputs.remove(e)
        inputs.remove(e)
        del msg_dic[e]

client端

import socket
 
HOST = "127.0.0.1"  # The remote host
PORT = 6666  # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)
    print(&#39;Received&#39;, repr(data))
s.close()

selectors例子

selectors封装了底层的select或epoll,它可以根据不同的操作系统去判断使用select或是epoll

selectors服务端

import socket
import selectors
 
 
sel = selectors.DefaultSelector()
 
 
def accept(sock, mask):
    conn, addr = sock.accept()
    print(&#39;accepted&#39;, conn, &#39;from&#39;, addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)
 
 
def read(conn, mask):
    data = conn.recv(1024)
    if data:
        print(&#39;echoing&#39;, repr(data), &#39;to&#39;, conn)
        conn.send(data)
    else:
        print(&#39;closing&#39;, conn)
        sel.unregister(conn)
        conn.close()
 
sock = socket.socket()
sock.bind(("0.0.0.0", 6666))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
 
while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

selectors客户端

import socket
 
HOST = "127.0.0.1"  # The remote host
PORT = 6666  # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)
    print(&#39;Received&#39;, repr(data))
s.close()

The above is the detailed content of Python coroutines and IO multiplexing methods. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn