Python calls an external subprocess to implement asynchronous standard input and output through pipes

大家讲道理
2016-11-07 17:06:38

We often run into this situation: a complex module is implemented in C++ or another lower-level language, and a web-based demo is needed to query it. Python is powerful and concise, which makes it well suited to building demos; the Flask framework and the Jinja2 template engine make web development convenient, and Python interoperates easily with code written in other languages. We therefore chose Python for the demo.

Assume the module we need to call (which provides the underlying service) reads requests from standard input in a loop and, after processing each one, writes the result to standard output. This pattern is very common on Linux and builds on its powerful redirection capabilities. Unfortunately, the underlying module has an expensive initialization phase, so we cannot spawn a new child process for every query. The solution is to spawn the child process only once and then exchange data with it through pipes for each request.
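
To make the assumed protocol concrete, here is a minimal sketch of the kind of service described above: one request per input line, one result per output line. The `serve` function and its upper-casing handler are hypothetical stand-ins, not the real module. The explicit flush matters because standard output is usually block-buffered when it is connected to a pipe.

```python
import io


def serve(inp, out, handle=str.upper):
    """Read requests line by line and write one result line per request."""
    # An expensive one-time initialization would happen here (assumed).
    for line in inp:
        out.write(handle(line.rstrip('\n')) + '\n')
        out.flush()  # push each answer out immediately


# In-memory streams stand in for stdin and stdout in this illustration.
requests = io.StringIO('alisa\nidb\n')
responses = io.StringIO()
serve(requests, responses)
```

A real service would call something like `serve(sys.stdin, sys.stdout)` after its initialization.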

Python's subprocess module makes it easy to spawn child processes, much like the Linux system calls fork and exec. A Popen object starts an external program without blocking the parent, so we use Popen to meet our needs. To write data to the child's standard input, pass stdin=subprocess.PIPE when creating the Popen object; likewise, to read data from the child's standard output, pass stdout=subprocess.PIPE. Let's look at a simple example first:

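from subprocess import Popen, PIPE

x = 1  # example value; any integer works
p = Popen('less', stdin=PIPE, stdout=PIPE)
# communicate() sends the input, closes stdin and waits for the child to exit
stdoutdata, stderrdata = p.communicate(('Line number %d.\n' % x).encode('utf-8'))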

The communicate function returns a tuple (stdoutdata, stderrdata) containing the child's standard output and standard error (stderrdata is None unless stderr=PIPE was also passed). However, communicate blocks the parent until the child exits and closes the pipes, so it can be used only once per Popen object. Handling several requests this way would mean creating a new Popen object, and therefore reinitializing the child process, for every request, which does not meet our needs.
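
The one-shot nature of communicate can be seen in a short sketch, written for Python 3. The child here is a hypothetical stand-in (a Python one-liner that copies standard input to standard output), not the real service.

```python
import sys
from subprocess import Popen, PIPE

# Stand-in child (an assumption for illustration): copies stdin to stdout.
ECHO = [sys.executable, '-c', 'import sys; sys.stdout.write(sys.stdin.read())']

p = Popen(ECHO, stdin=PIPE, stdout=PIPE, stderr=PIPE)
stdoutdata, stderrdata = p.communicate(b'first request\n')

child_exited = p.returncode is not None  # communicate() waited for the child
stdin_closed = p.stdin.closed            # and closed the pipe to its stdin

# The same Popen object cannot take a second request.
try:
    p.communicate(b'second request\n')
    second_call_failed = False
except ValueError:
    second_call_failed = True
```

After the first call the child has exited, so serving another request would require a fresh Popen and a fresh initialization.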

Therefore, the only option is to write to and read from the stdin and stdout objects of the Popen object directly. Unfortunately, these pipes are in blocking mode by default: a plain read() on stdout does not return until the child closes its output, which normally happens only when it terminates. Used this way, both subprocess and os.popen* effectively allow a single round of input and output, with the output readable only once the process has ended.

After some research, I found that the child's standard output can be switched to non-blocking mode with the fcntl function of the fcntl module, which achieves our goal. This problem had bothered me for a long time and is now finally solved. The code is as follows:
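
Before the full class, the effect of the non-blocking flag can be sketched in isolation. In this illustration an ordinary pipe from os.pipe() stands in for the child's standard output, and `try_read` is a hypothetical helper name. The sketch assumes Python 3 on a POSIX system.

```python
import fcntl
import os

# An ordinary pipe stands in for the child's stdout (illustration only).
read_fd, write_fd = os.pipe()

# Add O_NONBLOCK to the flags already set on the read end.
flags = fcntl.fcntl(read_fd, fcntl.F_GETFL)
fcntl.fcntl(read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def try_read(fd, size=4096):
    """Return the bytes available right now, or None if there are none yet."""
    try:
        return os.read(fd, size)
    except BlockingIOError:
        return None


empty = try_read(read_fd)        # nothing written yet: returns at once
os.write(write_fd, b'result\n')
data = try_read(read_fd)         # the bytes that were just written

os.close(write_fd)
at_eof = try_read(read_fd)       # writer closed: b'' signals end of file
os.close(read_fd)
```

The three cases (no data yet, data, end of file) are the ones a polling reader has to tell apart.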

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# author: weisu.yxd@taobao.com
from subprocess import Popen, PIPE
import fcntl
import os
import time


class Server(object):
    def __init__(self, args, server_env=None):
        # env=None lets the child inherit the parent's environment
        self.process = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=server_env)
        # Switch both output pipes to non-blocking mode so that a read
        # returns at once instead of waiting for the child to exit
        for pipe in (self.process.stdout, self.process.stderr):
            flags = fcntl.fcntl(pipe, fcntl.F_GETFL)
            fcntl.fcntl(pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def send(self, data, tail='\n'):
        # The pipe carries bytes, so encode the request before writing it
        self.process.stdin.write((data + tail).encode('utf-8'))
        self.process.stdin.flush()

    def recv(self, t=.1, e=1, tr=5, stderr=0):
        # Wait t seconds, then poll for up to another t seconds in about tr steps.
        # With e set, raise if the child has closed the pipe.
        time.sleep(t)
        if tr < 1:
            tr = 1
        x = time.time() + t
        pr = self.process.stderr if stderr else self.process.stdout
        while True:
            try:
                r = os.read(pr.fileno(), 65536)
            except BlockingIOError:
                r = None  # no data available yet
            if r:
                return r.decode('utf-8').rstrip()
            if r == b'':  # end of file: the child closed the pipe
                if e:
                    raise Exception('child process closed the pipe')
                break
            if time.time() >= x:
                break
            time.sleep(max((x - time.time()) / tr, 0))
        return ''


if __name__ == "__main__":
    ServerArgs = ['/home/weisu.yxd/QP/trunk/bin/normalizer', '/home/weisu.yxd/QP/trunk/conf/stopfile.txt']
    server = Server(ServerArgs)
    test_data = '在云端', '云梯', '摩萨德', 'Alisa', 'iDB', '阿里大数据'
    for x in test_data:
        server.send(x)
        print(x, server.recv())
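
The normalizer binary used above is not generally available, so the following sketch shows the same spawn-once pattern against a hypothetical stand-in child: a Python one-liner that upper-cases each line and flushes after every answer. It assumes Python 3 on a POSIX system. The `query` helper is an illustrative variation, not the author's method: it waits for a complete line instead of returning the first chunk that arrives.

```python
import fcntl
import os
import sys
import time
from subprocess import Popen, PIPE

# Hypothetical stand-in for the real service: upper-cases each input line
# and flushes after every answer so the parent sees it immediately.
CHILD = [sys.executable, '-c',
         'import sys\n'
         'for line in sys.stdin:\n'
         '    sys.stdout.write(line.upper())\n'
         '    sys.stdout.flush()\n']

proc = Popen(CHILD, stdin=PIPE, stdout=PIPE)
flags = fcntl.fcntl(proc.stdout, fcntl.F_GETFL)
fcntl.fcntl(proc.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def query(text, timeout=5.0):
    """Send one line to the child and poll until a full line comes back."""
    proc.stdin.write((text + '\n').encode('utf-8'))
    proc.stdin.flush()
    deadline = time.time() + timeout
    buf = b''
    while time.time() < deadline:
        try:
            chunk = os.read(proc.stdout.fileno(), 4096)
        except BlockingIOError:
            time.sleep(0.01)  # no data yet, poll again shortly
            continue
        if chunk == b'':
            raise RuntimeError('child closed its stdout')
        buf += chunk
        if buf.endswith(b'\n'):
            return buf.decode('utf-8').rstrip()
    raise RuntimeError('no answer within %.1f seconds' % timeout)


answers = [query(word) for word in ('alisa', 'idb')]
still_running = proc.poll() is None  # one child served both requests

proc.stdin.close()  # end of input ends the child's loop
proc.wait()
proc.stdout.close()
```

Both requests are answered by the same child process, so any startup cost is paid only once.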

In addition, when calling some external programs, you may need to specify the corresponding environment variables as follows:

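  import os
  import server  # the code above, assumed to be saved as server.py

  my_env = os.environ.copy()  # copy, so the parent's own environment stays unchanged
  my_env["LD_LIBRARY_PATH"] = "/path/to/lib"
  srv = server.Server(cmd, my_env)  # cmd: argument list of the external program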
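
To check that the variable really reaches the child, a small sketch can spawn a stand-in child that prints what it received. The child command is a hypothetical illustration. The new directory is prepended to any existing value, which is an assumption made here so that libraries the interpreter already depends on stay reachable.

```python
import os
import sys
from subprocess import Popen, PIPE

parent_before = os.environ.get("LD_LIBRARY_PATH")

# Copy the parent's environment so the change affects only the child.
child_env = os.environ.copy()
old_value = child_env.get("LD_LIBRARY_PATH")
child_env["LD_LIBRARY_PATH"] = "/path/to/lib" + (os.pathsep + old_value if old_value else "")

# Stand-in child (illustration only): prints the variable it received.
cmd = [sys.executable, '-c', 'import os; print(os.environ["LD_LIBRARY_PATH"])']
p = Popen(cmd, stdout=PIPE, env=child_env)
seen_by_child = p.communicate()[0].decode().strip()

parent_after = os.environ.get("LD_LIBRARY_PATH")
```

Because the environment was copied, the parent's own LD_LIBRARY_PATH is the same before and after.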
