<span style="color: #0000ff;">import</span><span style="color: #000000;"> socket </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> select </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> sys </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> os </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> errno </span><span style="color: #0000ff;">try</span><span style="color: #000000;">: </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> threading </span><span style="color: #0000ff;">except</span><span style="color: #000000;"> ImportError: </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> dummy_threading as threading </span><span style="color: #800080;">__all__</span> = [<span style="color: #800000;">"</span><span style="color: #800000;">TCPServer</span><span style="color: #800000;">"</span>,<span style="color: #800000;">"</span><span style="color: #800000;">UDPServer</span><span style="color: #800000;">"</span>,<span style="color: #800000;">"</span><span style="color: #800000;">ForkingUDPServer</span><span style="color: #800000;">"</span>,<span style="color: #800000;">"</span><span style="color: #800000;">ForkingTCPServer</span><span style="color: #800000;">"</span><span style="color: #000000;">, </span><span style="color: #800000;">"</span><span style="color: #800000;">ThreadingUDPServer</span><span style="color: #800000;">"</span>,<span style="color: #800000;">"</span><span style="color: #800000;">ThreadingTCPServer</span><span style="color: #800000;">"</span>,<span style="color: #800000;">"</span><span style="color: #800000;">BaseRequestHandler</span><span style="color: #800000;">"</span><span style="color: #000000;">, </span><span style="color: #800000;">"</span><span style="color: #800000;">StreamRequestHandler</span><span style="color: #800000;">"</span>,<span style="color: #800000;">"</span><span style="color: #800000;">DatagramRequestHandler</span><span style="color: #800000;">"</span><span style="color: #000000;">, </span><span style="color: #800000;">"</span><span style="color: #800000;">ThreadingMixIn</span><span style="color: #800000;">"</span>, <span style="color: #800000;">"</span><span style="color: #800000;">ForkingMixIn</span><span style="color: #800000;">"</span><span style="color: #000000;">] </span><span style="color: #0000ff;">if</span> hasattr(socket, <span style="color: #800000;">"</span><span style="color: #800000;">AF_UNIX</span><span style="color: #800000;">"</span><span style="color: #000000;">): </span><span style="color: #800080;">__all__</span>.extend([<span style="color: #800000;">"</span><span style="color: #800000;">UnixStreamServer</span><span style="color: #800000;">"</span>,<span style="color: #800000;">"</span><span style="color: #800000;">UnixDatagramServer</span><span style="color: #800000;">"</span><span style="color: #000000;">, </span><span style="color: #800000;">"</span><span style="color: #800000;">ThreadingUnixStreamServer</span><span style="color: #800000;">"</span><span style="color: #000000;">, </span><span style="color: #800000;">"</span><span style="color: #800000;">ThreadingUnixDatagramServer</span><span style="color: #800000;">"</span>])
timeout =<span style="color: #000000;"> None </span><span style="color: #0000ff;">def</span> <span style="color: #800080;">__init__</span><span style="color: #000000;">(self, server_address, RequestHandlerClass): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Constructor. May be extended, do not override.</span><span style="color: #800000;">"""</span><span style="color: #000000;"> self.server_address </span>=<span style="color: #000000;"> server_address self.RequestHandlerClass </span>=<span style="color: #000000;"> RequestHandlerClass self.</span><span style="color: #800080;">__is_shut_down</span> =<span style="color: #000000;"> threading.Event() self.</span><span style="color: #800080;">__shutdown_request</span> = False
<span style="color: #0000ff;">def</span> serve_forever(self, poll_interval=0.5<span style="color: #000000;">): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Handle one request at a time until shutdown. Polls for shutdown every poll_interval seconds. Ignores self.timeout. If you need to do periodic tasks, do them in another thread. </span><span style="color: #800000;">"""</span><span style="color: #000000;"> self.</span><span style="color: #800080;">__is_shut_down</span><span style="color: #000000;">.clear() </span><span style="color: #0000ff;">try</span><span style="color: #000000;">: </span><span style="color: #0000ff;">while</span> <span style="color: #0000ff;">not</span> self.<span style="color: #800080;">__shutdown_request</span><span style="color: #000000;">: r, w, e </span>=<span style="color: #000000;"> _eintr_retry(select.select, [self], [], [], poll_interval) </span><span style="color: #0000ff;">if</span> self <span style="color: #0000ff;">in</span><span style="color: #000000;"> r: self._handle_request_noblock() </span><span style="color: #0000ff;">finally</span><span style="color: #000000;">: self.</span><span style="color: #800080;">__shutdown_request</span> =<span style="color: #000000;"> False self.</span><span style="color: #800080;">__is_shut_down</span>.set()
The select() function is used here, that is, server_forever accepts a poll_interval=0.5 parameter, which represents the time used for select polling, and then enters an infinite loop. In this loop, select every poll_interval seconds Poll once (block here) to monitor network IO. Once a new network connection request arrives, the _handle_request_noblock() method will be called to handle the new connection.
<span style="color: #0000ff;">def</span><span style="color: #000000;"> _handle_request_noblock(self): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Handle one request, without blocking. I assume that select.select has returned that the socket is readable before this function was called, so there should be no risk of blocking in get_request(). </span><span style="color: #800000;">"""</span> <span style="color: #0000ff;">try</span><span style="color: #000000;">: request, client_address </span>=<span style="color: #000000;"> self.get_request() </span><span style="color: #0000ff;">except</span><span style="color: #000000;"> socket.error: </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">if</span><span style="color: #000000;"> self.verify_request(request, client_address): </span><span style="color: #0000ff;">try</span><span style="color: #000000;">: self.process_request(request, client_address) </span><span style="color: #0000ff;">except</span><span style="color: #000000;">: self.handle_error(request, client_address) self.shutdown_request(request)</span>
英文说明已经说的很明确,该方法处理的是一个非阻塞请求,首先通过get_request()方法获取连接,具体实现在其子类,一旦获取了连接,立即调用verify_request认证连接信息,通过认证,则调用process_request()方法处理请求,如果中途出现错误,则调用handle_error处理错误,同时,调用shutdown_request()方法结束这个连接。
<span style="color: #0000ff;">def</span><span style="color: #000000;"> verify_request(self, request, client_address): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Verify the request. May be overridden. Return True if we should proceed with this request. </span><span style="color: #800000;">"""</span> <span style="color: #0000ff;">return</span><span style="color: #000000;"> True </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> process_request(self, request, client_address): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Call finish_request. Overridden by ForkingMixIn and ThreadingMixIn. </span><span style="color: #800000;">"""</span><span style="color: #000000;"> self.finish_request(request, client_address) self.shutdown_request(request) </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> handle_error(self, request, client_address): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Handle an error gracefully. May be overridden. The default is to print a traceback and continue. </span><span style="color: #800000;">"""</span> <span style="color: #0000ff;">print</span> <span style="color: #800000;">'</span><span style="color: #800000;">-</span><span style="color: #800000;">'</span>*40 <span style="color: #0000ff;">print</span> <span style="color: #800000;">'</span><span style="color: #800000;">Exception happened during processing of request from</span><span style="color: #800000;">'</span><span style="color: #000000;">, </span><span style="color: #0000ff;">print</span><span style="color: #000000;"> client_address </span><span style="color: #0000ff;">import</span><span style="color: #000000;"> traceback traceback.print_exc() </span><span style="color: #008000;">#</span><span style="color: #008000;"> XXX But this goes to stderr!</span> <span style="color: #0000ff;">print</span> <span style="color: #800000;">'</span><span style="color: #800000;">-</span><span style="color: #800000;">'</span>*40 <span style="color: #0000ff;">def</span><span style="color: #000000;"> shutdown_request(self, request): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Called to shutdown and close an individual request.</span><span style="color: #800000;">"""</span><span style="color: #000000;"> self.close_request(request)</span><pre name=<span style="color: #800000;">"</span><span style="color: #800000;">code</span><span style="color: #800000;">"</span> <span style="color: #0000ff;">class</span>=<span style="color: #800000;">"</span><span style="color: #800000;">python</span><span style="color: #800000;">"</span>> <span style="color: #0000ff;">def</span><span style="color: #000000;"> finish_request(self, request, client_address): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Finish one request by instantiating RequestHandlerClass.</span><span style="color: #800000;">"""</span><span style="color: #000000;"> self.RequestHandlerClass(request, client_address, self)</span>
<span style="color: #0000ff;">def</span><span style="color: #000000;"> handle_request(self): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Handle one request, possibly blocking. Respects self.timeout. </span><span style="color: #800000;">"""</span> <span style="color: #008000;">#</span><span style="color: #008000;"> Support people who used socket.settimeout() to escape</span> <span style="color: #008000;">#</span><span style="color: #008000;"> handle_request before self.timeout was available.</span> timeout =<span style="color: #000000;"> self.socket.gettimeout() </span><span style="color: #0000ff;">if</span> timeout <span style="color: #0000ff;">is</span><span style="color: #000000;"> None: timeout </span>=<span style="color: #000000;"> self.timeout </span><span style="color: #0000ff;">elif</span> self.timeout <span style="color: #0000ff;">is</span> <span style="color: #0000ff;">not</span><span style="color: #000000;"> None: timeout </span>=<span style="color: #000000;"> min(timeout, self.timeout) fd_sets </span>=<span style="color: #000000;"> _eintr_retry(select.select, [self], [], [], timeout) </span><span style="color: #0000ff;">if</span> <span style="color: #0000ff;">not</span><span style="color: #000000;"> fd_sets[0]: self.handle_timeout() </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> self._handle_request_noblock()</span>
上面已经提到,如果你没有用到server_forever()方法,说明你希望使用的是阻塞请求来处理连接,如英文描述所说,该方法只是处理一个阻塞的请求,仍然使用select()方法轮询来监听网络连接,但是需要考虑时间超时影响,一旦超时,调用handle_timeout()方法处理超时,一般在子类重写该方法;如果在超时之前监听到了网络的连接请求,则同server_forever一样,调用_handle_request_noblock()方法,完成对新的连接的请求处理。
<span style="color: #0000ff;">class</span><span style="color: #000000;"> BaseRequestHandler: </span><span style="color: #800000;">"""</span><span style="color: #800000;">Base class for request handler classes. This class is instantiated for each request to be handled. The constructor sets the instance variables request, client_address and server, and then calls the handle() method. To implement a specific service, all you need to do is to derive a class which defines a handle() method. The handle() method can find the request as self.request, the client address as self.client_address, and the server (in case it needs access to per-server information) as self.server. Since a separate instance is created for each request, the handle() method can define arbitrary other instance variariables. </span><span style="color: #800000;">"""</span> <span style="color: #0000ff;">def</span> <span style="color: #800080;">__init__</span><span style="color: #000000;">(self, request, client_address, server): self.request </span>=<span style="color: #000000;"> request self.client_address </span>=<span style="color: #000000;"> client_address self.server </span>=<span style="color: #000000;"> server self.setup() </span><span style="color: #0000ff;">try</span><span style="color: #000000;">: self.handle() </span><span style="color: #0000ff;">finally</span><span style="color: #000000;">: self.finish() </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> setup(self): </span><span style="color: #0000ff;">pass</span> <span style="color: #0000ff;">def</span><span style="color: #000000;"> handle(self): </span><span style="color: #0000ff;">pass</span> <span style="color: #0000ff;">def</span><span style="color: #000000;"> finish(self): </span><span style="color: #0000ff;">pass</span>
以上描述说明,所有requestHandler都继承BaseRequestHandler基类,该类会处理每一个请求。在__init__中初始化实例变量request、client_address、server,然后调用handle()方法完成请求处理。那么,我们唯一需要做的就是重写好Handle()方法,处理所有的请求。
<span style="color: #0000ff;">class</span><span style="color: #000000;"> TCPServer(BaseServer): address_family </span>=<span style="color: #000000;"> socket.AF_INET socket_type </span>=<span style="color: #000000;"> socket.SOCK_STREAM request_queue_size </span>= 5<span style="color: #000000;"> allow_reuse_address </span>=<span style="color: #000000;"> False </span><span style="color: #0000ff;">def</span> <span style="color: #800080;">__init__</span>(self, server_address, RequestHandlerClass, bind_and_activate=<span style="color: #000000;">True): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Constructor. May be extended, do not override.</span><span style="color: #800000;">"""</span><span style="color: #000000;"> BaseServer.</span><span style="color: #800080;">__init__</span><span style="color: #000000;">(self, server_address, RequestHandlerClass) self.socket </span>=<span style="color: #000000;"> socket.socket(self.address_family, self.socket_type) </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> bind_and_activate: </span><span style="color: #0000ff;">try</span><span style="color: #000000;">: self.server_bind() self.server_activate() </span><span style="color: #0000ff;">except</span><span style="color: #000000;">: self.server_close() </span><span style="color: #0000ff;">raise</span> <span style="color: #0000ff;">def</span><span style="color: #000000;"> server_bind(self): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Called by constructor to bind the socket. May be overridden. </span><span style="color: #800000;">"""</span> <span style="color: #0000ff;">if</span><span style="color: #000000;"> self.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, </span>1<span style="color: #000000;">) self.socket.bind(self.server_address) self.server_address </span>=<span style="color: #000000;"> self.socket.getsockname() </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> server_activate(self): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Called by constructor to activate the server. May be overridden. </span><span style="color: #800000;">"""</span><span style="color: #000000;"> self.socket.listen(self.request_queue_size) </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> server_close(self): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Called to clean-up the server. May be overridden. </span><span style="color: #800000;">"""</span><span style="color: #000000;"> self.socket.close() </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> fileno(self): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Return socket file number. Interface required by select(). </span><span style="color: #800000;">"""</span> <span style="color: #0000ff;">return</span><span style="color: #000000;"> self.socket.fileno() </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> get_request(self): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Get the request and client address from the socket. May be overridden. </span><span style="color: #800000;">"""</span> <span style="color: #0000ff;">return</span><span style="color: #000000;"> self.socket.accept() </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> shutdown_request(self, request): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Called to shutdown and close an individual request.</span><span style="color: #800000;">"""</span> <span style="color: #0000ff;">try</span><span style="color: #000000;">: </span><span style="color: #008000;">#</span><span style="color: #008000;">explicitly shutdown. socket.close() merely releases</span> <span style="color: #008000;">#</span><span style="color: #008000;">the socket and waits for GC to perform the actual close.</span> <span style="color: #000000;"> request.shutdown(socket.SHUT_WR) </span><span style="color: #0000ff;">except</span><span style="color: #000000;"> socket.error: </span><span style="color: #0000ff;">pass</span> <span style="color: #008000;">#</span><span style="color: #008000;">some platforms may raise ENOTCONN here</span> <span style="color: #000000;"> self.close_request(request) </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> close_request(self, request): </span><span style="color: #800000;">"""</span><span style="color: #800000;">Called to clean up an individual request.</span><span style="color: #800000;">"""</span><span style="color: #000000;"> request.close()</span>
<span style="color: #0000ff;">class</span><span style="color: #000000;"> UDPServer(TCPServer): </span><span style="color: #800000;">"""</span><span style="color: #800000;">UDP server class.</span><span style="color: #800000;">"""</span><span style="color: #000000;"> allow_reuse_address </span>=<span style="color: #000000;"> False socket_type </span>=<span style="color: #000000;"> socket.SOCK_DGRAM max_packet_size </span>= 8192 <span style="color: #0000ff;">def</span><span style="color: #000000;"> get_request(self): data, client_addr </span>=<span style="color: #000000;"> self.socket.recvfrom(self.max_packet_size) </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> (data, self.socket), client_addr </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> server_activate(self): </span><span style="color: #008000;">#</span><span style="color: #008000;"> No need to call listen() for UDP.</span> <span style="color: #0000ff;">pass</span> <span style="color: #0000ff;">def</span><span style="color: #000000;"> shutdown_request(self, request): </span><span style="color: #008000;">#</span><span style="color: #008000;"> No need to shutdown anything.</span> <span style="color: #000000;"> self.close_request(request) </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> close_request(self, request): </span><span style="color: #008000;">#</span><span style="color: #008000;"> No need to close anything.</span> <span style="color: #0000ff;">pass</span>
继承自TCPServer,将socket改为了SOCK_DGRAM型,并修改了get_request,用于从SOCK_DGRAM中获取request。同时server_activate、shutdown_request、close_request都改成了空(UDP不需要),比TCP简单一些。
<span style="color: #0000ff;">class</span><span style="color: #000000;"> StreamRequestHandler(BaseRequestHandler): rbufsize </span>= -1<span style="color: #000000;"> wbufsize </span>=<span style="color: #000000;"> 0 timeout </span>=<span style="color: #000000;"> None disable_nagle_algorithm </span>=<span style="color: #000000;"> False </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> setup(self): self.connection </span>=<span style="color: #000000;"> self.request </span><span style="color: #0000ff;">if</span> self.timeout <span style="color: #0000ff;">is</span> <span style="color: #0000ff;">not</span><span style="color: #000000;"> None: self.connection.settimeout(self.timeout) </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> self.disable_nagle_algorithm: self.connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) self.rfile </span>= self.connection.makefile(<span style="color: #800000;">'</span><span style="color: #800000;">rb</span><span style="color: #800000;">'</span><span style="color: #000000;">, self.rbufsize) self.wfile </span>= self.connection.makefile(<span style="color: #800000;">'</span><span style="color: #800000;">wb</span><span style="color: #800000;">'</span><span style="color: #000000;">, self.wbufsize) </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> finish(self): </span><span style="color: #0000ff;">if</span> <span style="color: #0000ff;">not</span><span style="color: #000000;"> self.wfile.closed: </span><span style="color: #0000ff;">try</span><span style="color: #000000;">: self.wfile.flush() </span><span style="color: #0000ff;">except</span><span style="color: #000000;"> socket.error: </span><span style="color: #008000;">#</span><span style="color: #008000;"> An final socket error may have occurred here, such as</span> <span style="color: #008000;">#</span><span style="color: #008000;"> the local error ECONNABORTED.</span> <span style="color: #0000ff;">pass</span><span style="color: #000000;"> self.wfile.close() self.rfile.close()</span>
最主要的功能是根据socket生成了读写socket用的两个文件对象(可以理解为句柄)rfile和wfile
<span style="color: #0000ff;">class</span><span style="color: #000000;"> DatagramRequestHandler(BaseRequestHandler): </span><span style="color: #008000;">#</span><span style="color: #008000;"> XXX Regrettably, I cannot get this working on Linux;</span> <span style="color: #008000;">#</span><span style="color: #008000;"> s.recvfrom() doesn't return a meaningful client address.</span> <span style="color: #800000;">"""</span><span style="color: #800000;">Define self.rfile and self.wfile for datagram sockets.</span><span style="color: #800000;">"""</span> <span style="color: #0000ff;">def</span><span style="color: #000000;"> setup(self): </span><span style="color: #0000ff;">try</span><span style="color: #000000;">: </span><span style="color: #0000ff;">from</span> cStringIO <span style="color: #0000ff;">import</span><span style="color: #000000;"> StringIO </span><span style="color: #0000ff;">except</span><span style="color: #000000;"> ImportError: </span><span style="color: #0000ff;">from</span> StringIO <span style="color: #0000ff;">import</span><span style="color: #000000;"> StringIO self.packet, self.socket </span>=<span style="color: #000000;"> self.request self.rfile </span>=<span style="color: #000000;"> StringIO(self.packet) self.wfile </span>=<span style="color: #000000;"> StringIO() </span><span style="color: #0000ff;">def</span><span style="color: #000000;"> finish(self): self.socket.sendto(self.wfile.getvalue(), self.client_address)</span>
同样是生成rfile和wfile,但UDP不直接关联socket。这里的rfile是直接由从UDP中读取的数据生成的,wfile则是新建了一个StringIO,用于写数据。