首頁  >  文章  >  後端開發  >  python呼叫外部子程序,透過管道實現非同步標準輸入和輸出的

python呼叫外部子程序,透過管道實現非同步標準輸入和輸出的

高洛峰
高洛峰原創
2016-10-18 09:43:561732瀏覽

我們通常會遇到這樣的需求:透過C++或其他較底層的語言實作了一個複雜的功能模組,需要建構一個基於Web的Demo,方法查詢資料。由於Python語言的強大和簡潔,其用來搭建Demo非常合適,Flask框架和jinja2模組功能為python提供了方便的web開發能力。同時,python能夠很方便的同其他語言的程式碼互動。因此我們選擇python作為開發Demo的工具。假設我們需要呼叫的模組(提供底層服務)透過標準輸入循環讀入數據,處理完畢後把結果寫出到標出輸出,這樣的場景在Linux環境下很常見,依賴Linux強大的重定向能力。然而,非常不幸的是,底層模組有一個很重的初始化過程,因此我們不能夠每次查詢請求都去重新產生呼叫底層模組的子程序。解決方案就是只產生一次子進程,然後對每個請求透過管道(pipe)來和子進程互動。

Python的subprocess模組可以很容易地產生子進程,類似Linux系統呼叫fork和exec。 subprocess模組的Popen物件可能以非阻塞的方式呼叫外部可執行程序,因此我們使用Poen物件來實現需求。如果我們想要把資料寫入子進程的標準輸入stdin,那麼在創建Popen物件的時候就需要指定參數stdin為subprocess.PIPE;同樣,如果我們需要從子進程的標準輸出中讀取數據,那麼在建立Popen物件的時候就需要指定參數stdout為subprocess.PIPE。先看一個簡單的例子:

from subprocess import Popen, PIPE
p = Popen('less', stdin=PIPE, stdout=PIPE)
p.communicate('Line number %d.\n' % x)

   

communicate函數傳回一個二元組(stdoutdata, stderrdata),包含了子程序的標準輸出和標出錯誤的輸出資料。然而,由於Popen物件的communicate函數會阻塞父進程,同時也會關閉管道,因此每個Popen物件只能呼叫一次communicate函數,如果有多個要求必須重新產生Popen物件(重新初始化子程序),則不能滿足我們的需求。

因此,我們只有往Popen物件的stdin和stdout物件裡寫入和讀取資料才能實現我們的需求。然而,不幸的是subprocess模組預設情況下只運行在子程序結束的時候讀取一次標準輸出。 Both subprocess and os.popen* only allow input and output one time, and the output to be read only when the process terminates. 

進過一番研究之後我發現透過fcntl模組的fcntl函數可以把子進程的標準輸出改為非阻塞的方式,從而達到我們的目的。這樣困擾我許久的問題終於完美解決了。代碼如下:

#!/usr/bin/python                                                                                                                                                      
# -*- coding: utf-8 -*-
# author: weisu.yxd@taobao.com
from subprocess import Popen, PIPE
import fcntl, os
import time
class Server(object):
  def __init__(self, args, server_env = None):
    if server_env:
      self.process = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=server_env)
    else:
      self.process = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
    fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
  def send(self, data, tail = '\n'):
    self.process.stdin.write(data + tail)
    self.process.stdin.flush()
  def recv(self, t=.1, e=1, tr=5, stderr=0):
    time.sleep(t)
    if tr < 1:
        tr = 1 
    x = time.time()+t
    r = &#39;&#39;
    pr = self.process.stdout
    if stderr:
      pr = self.process.stdout
    while time.time() < x or r:
        r = pr.read()
        if r is None:
            if e:
                raise Exception(message)
            else:
                break
        elif r:
            return r.rstrip()
        else:
            time.sleep(max((x-time.time())/tr, 0))
    return r.rstrip()
if __name__ == "__main__":
  ServerArgs = [&#39;/home/weisu.yxd/QP/trunk/bin/normalizer&#39;, &#39;/home/weisu.yxd/QP/trunk/conf/stopfile.txt&#39;]
  server = Server(ServerArgs)
  test_data = &#39;在云端&#39;, &#39;云梯&#39;, &#39;摩萨德&#39;, &#39;Alisa&#39;, &#39;iDB&#39;, &#39;阿里大数据&#39;
  for x in test_data:
    server.send(x)
    print x, server.recv()

 另外,調用一些外部程序時,可能需要指定相應的環境變量,方式如下:

  my_env = os.environ
  my_env["LD_LIBRARY_PATH"] = "/path/to/lib"
  server = server.Server(cmd, my_env)


   


陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn