ホームページ  >  記事  >  バックエンド開発  >  Pythonでのマルチプロセスとプロセスプール(処理ライブラリ)のサンプルコード

Pythonでのマルチプロセスとプロセスプール(処理ライブラリ)のサンプルコード

零下一度
零下一度オリジナル
2017-06-16 11:13:102086ブラウズ

この記事では主にPythonのマルチプロセスとプロセスプール(Processing library)の詳しい解説を紹介していますので、必要な方は参考にしてください

環境:win7+python2.7

私は以前からマルチプロセスやマルチスレッドを学びたいと思っていましたが、基礎知識と簡単な概要を見ただけでは、それをどのように応用するのか理解できませんでした。少し前に、github でクローラプロジェクトを目にしました。マルチプロセスとマルチスレッド関連のコンテンツを調べました。次に、関連するナレッジ ポイントといくつかのアプリケーションを記録として書き留めます

まず、プロセスとは何かについて説明します。コンピュータ上のプログラムのアクティビティ。プログラムが実行されると、プロセスがシステム プロセスとユーザー プロセスに分けられます。実行状態のオペレーティング システム自体と、ユーザーが開始したすべてのプロセスはユーザー プロセスです。プロセスは、オペレーティング システムがリソースを割り当てる単位です。

直感的に言えば、タスク マネージャーで system とマークされているユーザー名がシステム プロセスであり、管理者とマークされているものがユーザー プロセスです。さらに、net は netro、lcacal サービスはローカル サービスです。百科事典にありますので、こちらを参照してください。 いくつかの労力を節約する必要があります。そうしないと、元に戻すことができません

1. マルチプロセッシングの簡単な使い方

図に示すように、マルチプロセッシングには複数の機能があります。ここでは、私が現時点で知っていることだけを話します。

プロセスの作成

:Process(target=メインの実行関数、name=カスタムプロセス名はオプション、args= (パラメータ))メソッド:

    is_alive(): プロセスが生きているかどうかを判断します
  1. join([timeout]): 次のステップを実行する前に子プロセスが終了する場合があります。プロセスがブロックされている場合、プログラムが実行を継続できるようにタイムアウトが設定されます。
  2. run(): プロセスを作成している場合 オブジェクトの使用時にターゲットが指定されていない場合、プロセスの run メソッドは次のようになります。デフォルトで実行されます
  3. start(): プロセスを開始します、run()を区別します
  4. terminate(): プロセスを終了します、プロセスの終了はそれほど単純ではないようです psutilを使用する方が良いでしょう。パッケージについてはまた機会があれば書きます。

  5. このうち、Processはstart()でプロセスを開始します。

属性:


    authkey: ドキュメント内の authkey() 関数で次の文を見つけました: これまでのところ、関連するアプリケーションの例は見つかりません。このキーはどのように使用されますか?
  1. デーモンについては言及されていません: 親プロセスが終了すると自動的に終了し、新しいプロセスを生成することはできません。 start() の前に設定する必要があります。
  2. exitcode: プロセスの実行中に、 -N の場合は None で、シグナル N によって終了されたことを示します
  3. name: カスタマイズされたプロセスの名前
  4. pid: 各プロセスは一意の PID 番号を持ちます。
1.Process(),start(),join()

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if name == 'main':
 a=time.time()
 p1=Process(target=fun1,args=(4,))
 p2 = Process(target=fun2, args=(6,))
 p1.start()
 p2.start()
 p1.join()
 p2.join()
 b=time.time()
 print 'finish',b-a
ここではp1とp2の合計2つのプロセスが開かれており、arg=(4,)の4はfun1関数のパラメータです, ここで tulpe 型を使用するには、パラメータが 2 つ以上ある場合、arg=(パラメータ 1, パラメータ 2...) として、start() を使用して p1 を待つように設定します。次のステップを実行する前に終了する p2 プロセス。以下を参照してください。操作の結果、fun2 と fun1 は基本的に同時に実行を開始します (fun1 は 4 秒間スリープし、fun2 は 6 秒間スリープします)。 print 'finish' ステートメントと b-a ステートメントが実行されます

this is fun2 Mon Jun 05 13:48:04 2017
this is fun1 Mon Jun 05 13:48:04 2017
fun1 finish Mon Jun 05 13:48:08 2017
fun2 finish Mon Jun 05 13:48:10 2017
finish 6.20300006866

Process finished with exit code 0

start() と join() が異なる位置にある場合に何が起こるかを見てみましょう

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if name == 'main':
 a=time.time()
 p1=Process(target=fun1,args=(4,))
 p2 = Process(target=fun2, args=(6,))
 p1.start()
 p1.join()
 p2.start()
 p2.join()
 b=time.time()
 print 'finish',b-a

結果:

this is fun1 Mon Jun 05 14:19:28 2017
fun1 finish Mon Jun 05 14:19:32 2017
this is fun2 Mon Jun 05 14:19:32 2017
fun2 finish Mon Jun 05 14:19:38 2017
finish 10.1229999065

Process finished with exit code 0

見て、fun1 関数が最初に実行されます。 、それが完了してから fun2 が実行され、次に print 'finish' が実行されます。つまり、最初にプロセス p1 が実行され、次にプロセス p2 が実行されます。 これで、join() の魅力に達したと感じます。 join() をコメントアウトして、再度何が起こるかを確認します

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if name == 'main':
 a=time.time()
 p1=Process(target=fun1,args=(4,))
 p2 = Process(target=fun2, args=(6,))
 p1.start()
 p2.start()
 p1.join()
 #p2.join()
 b=time.time()
 print 'finish',b-a

結果:

this is fun1 Mon Jun 05 14:23:57 2017
this is fun2 Mon Jun 05 14:23:58 2017
fun1 finish Mon Jun 05 14:24:01 2017
finish 4.05900001526
fun2 finish Mon Jun 05 14:24:04 2017

Process finished with exit code 0

今回は fun1 の実行が終了しています (p1 プロセスが join() を使用しているため、メイン プログラムは p1 の実行が終了するのを待ってから、次のプログラムを実行します)ステップ)、メインプロセスの print 'finish' を実行し続け、最後に fun2 の実行が完了した後に終了します


2.name, daemon, is_alive():

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if name == 'main':
 a=time.time()
 p1=Process(name='fun1进程',target=fun1,args=(4,))
 p2 = Process(name='fun2进程',target=fun2, args=(6,))
 p1.daemon=True
 p2.daemon = True
 p1.start()
 p2.start()
 p1.join()
 print p1,p2
 print '进程1:',p1.is_alive(),'进程2:',p2.is_alive()
 #p2.join()
 b=time.time()
 print 'finish',b-a
Result:

this is fun2 Mon Jun 05 14:43:49 2017
this is fun1 Mon Jun 05 14:43:49 2017
fun1 finish Mon Jun 05 14:43:53 2017
<Process(fun1进程, stopped daemon)> <Process(fun2进程, started daemon)>
进程1: False 进程2: True
finish 4.06500005722

Process finished with exit code 0

name がプロセスに名前を付けることであることがわかります。 print 'process 1:', p1.is_alive(), 'process 2:', p2.is_alive() という文に到達すると、p1 プロセスは終了します (戻り値)。 False) の場合、p2 プロセスは引き続き実行されます (True を返します)。ただし、p2 は join() を使用しないため、daemon=Ture が使用されるため、親プロセスは終了後に自動的に終了します。 p2 プロセスが終了する前にプログラム全体が強制終了されます。 .

3.run()

run() Process がターゲット関数を指定しない場合、デフォルトで run() 関数がプログラムの実行に使用されます。

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a = time.time()
 p=Process()
 p.start()
 p.join()
 b = time.time()
 print &#39;finish&#39;, b - a

結果:

finish 0.0840001106262

結果から、プロセス p には何もありません プロセスを正常に実行させるために、Jiangzi は次のように書きました:

目的関数にはパラメーターがありません:

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1():
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(2)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a = time.time()
 p=Process()
 p.run=fun1
 p.start()
 p.join()
 b = time.time()
 print &#39;finish&#39;, b - a

結果:

this is fun1 Mon Jun 05 16:34:41 2017
fun1 finish Mon Jun 05 16:34:43 2017
finish 2.11500000954

Process finished with exit code 0

目的関数にはパラメーターがあります:

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a = time.time()
 p=Process()
 p.run=fun1(2)
 p.start()
 p.join()
 b = time.time()
 print &#39;finish&#39;, b - a

結果:

this is fun1 Mon Jun 05 16:36:27 2017
fun1 finish Mon Jun 05 16:36:29 2017
Process Process-1:
Traceback (most recent call last):
 File "E:\Anaconda2\lib\multiprocessing\process.py", line 258, in _bootstrap
 self.run()
TypeError: &#39;NoneType&#39; object is not callable
finish 2.0529999733

Process finished with exit code 0

目的関数にパラメータがあり、例外が発生しました。理由はまだわかりませんが、実際には最後のパラメータがプロセスに割り当てられており、他のパラメータが存在しないことがわかりました。 、この例外が発生します 知っている人は教えてください

2. プロセスプール

对于需要使用几个甚至十几个进程时,我们使用Process还是比较方便的,但是如果要成百上千个进程,用Process显然太笨了,multiprocessing提供了Pool类,即现在要讲的进程池,能够将众多进程放在一起,设置一个运行进程上限,每次只运行设置的进程数,等有进程结束,再添加新的进程

Pool(processes =num):设置运行进程数,当一个进程运行完,会添加新的进程进去

apply_async(函数,(参数)):非阻塞,其中参数是tulpe类型,

apply(函数,(参数)):阻塞

close():关闭pool,不能再添加新的任务

terminate():结束运行的进程,不再处理未完成的任务

join():和Process介绍的作用一样, 但要在close或terminate之后使用。

1.单个进程池

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a=time.time()
 pool = Pool(processes =3) # 可以同时跑3个进程
 for i in range(3,8):
  pool.apply_async(fun1,(i,))
 pool.close()
 pool.join()
 b=time.time()
 print &#39;finish&#39;,b-a

结果:

this is fun1 Mon Jun 05 15:15:38 2017
this is fun1 Mon Jun 05 15:15:38 2017
this is fun1 Mon Jun 05 15:15:38 2017
fun1 finish Mon Jun 05 15:15:41 2017
this is fun1 Mon Jun 05 15:15:41 2017
fun1 finish Mon Jun 05 15:15:42 2017
this is fun1 Mon Jun 05 15:15:42 2017
fun1 finish Mon Jun 05 15:15:43 2017
fun1 finish Mon Jun 05 15:15:47 2017
fun1 finish Mon Jun 05 15:15:49 2017
finish 11.1370000839

Process finished with exit code 0

从上面的结果可以看到,设置了3个运行进程上限,15:15:38这个时间同时开始三个进程,当第一个进程结束时(参数为3秒那个进程),会添加新的进程,如此循环,直至进程池运行完再执行主进程语句b=time.time() print 'finish',b-a .这里用到非阻塞apply_async(),再来对比下阻塞apply()

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a=time.time()
 pool = Pool(processes =3) # 可以同时跑3个进程
 for i in range(3,8):
  pool.apply(fun1,(i,))
 pool.close()
 pool.join()
 b=time.time()
 print &#39;finish&#39;,b-a

结果:

this is fun1 Mon Jun 05 15:59:26 2017
fun1 finish Mon Jun 05 15:59:29 2017
this is fun1 Mon Jun 05 15:59:29 2017
fun1 finish Mon Jun 05 15:59:33 2017
this is fun1 Mon Jun 05 15:59:33 2017
fun1 finish Mon Jun 05 15:59:38 2017
this is fun1 Mon Jun 05 15:59:38 2017
fun1 finish Mon Jun 05 15:59:44 2017
this is fun1 Mon Jun 05 15:59:44 2017
fun1 finish Mon Jun 05 15:59:51 2017
finish 25.1610000134

Process finished with exit code 0

可以看到,阻塞是当一个进程结束后,再进行下一个进程,一般我们都用非阻塞apply_async()

2.多个进程池

上面是使用单个进程池的,对于多个进程池,我们可以用for循环,直接看代码

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
 print &#39;this is fun1&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun1 finish&#39;,time.ctime()

def fun2(t):
 print &#39;this is fun2&#39;,time.ctime()
 time.sleep(t)
 print &#39;fun2 finish&#39;,time.ctime()

if name == &#39;main&#39;:
 a=time.time()
 pool = Pool(processes =3) # 可以同时跑3个进程
 for fun in [fun1,fun2]:
  for i in range(3,8):
   pool.apply_async(fun,(i,))
 pool.close()
 pool.join()
 b=time.time()
 print &#39;finish&#39;,b-a

结果:

this is fun1 Mon Jun 05 16:04:38 2017
this is fun1 Mon Jun 05 16:04:38 2017
this is fun1 Mon Jun 05 16:04:38 2017
fun1 finish Mon Jun 05 16:04:41 2017
this is fun1 Mon Jun 05 16:04:41 2017
fun1 finish Mon Jun 05 16:04:42 2017
this is fun1 Mon Jun 05 16:04:42 2017
fun1 finish Mon Jun 05 16:04:43 2017
this is fun2 Mon Jun 05 16:04:43 2017
fun2 finish Mon Jun 05 16:04:46 2017
this is fun2 Mon Jun 05 16:04:46 2017
fun1 finish Mon Jun 05 16:04:47 2017
this is fun2 Mon Jun 05 16:04:47 2017
fun1 finish Mon Jun 05 16:04:49 2017
this is fun2 Mon Jun 05 16:04:49 2017
fun2 finish Mon Jun 05 16:04:50 2017
this is fun2 Mon Jun 05 16:04:50 2017
fun2 finish Mon Jun 05 16:04:52 2017
fun2 finish Mon Jun 05 16:04:55 2017
fun2 finish Mon Jun 05 16:04:57 2017
finish 19.1670000553

Process finished with exit code 0

看到了,在fun1运行完接着运行fun2.

另外对于没有参数的情况,就直接 pool.apply_async(funtion),无需写上参数.

在学习编写程序过程,曾遇到不用if _name_ == '_main_':而直接运行程序,这样结果会出错,经查询,在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的if _name_ == ‘_main_' :语句的下面,才能正常使用Windows下的进程模块。Unix/Linux下则不需要。原因有人这么说:在执行的時候,由于你写的 py 会被当成module 读进执行。所以,一定要判断自身是否为 _main_。也就是要:

if name == ‘main&#39; :
# do something.

这里我自己还搞不清楚,期待以后能够理解

以上がPythonでのマルチプロセスとプロセスプール(処理ライブラリ)のサンプルコードの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。