ホームページ >バックエンド開発 >Python チュートリアル >Python を使用した同時プログラミング
コンピューター プログラムの同時実行はよく議論されるトピックですが、今日は Python でのさまざまな同時実行方法について説明したいと思います。
同時実行方式
スレッド
マルチスレッドは、あらゆる言語を使用するときにほぼすべてのプログラマーが最初に考えるツールです (JS プログラマーは避けてください)、マルチスレッドの使用は効果的です CPU リソースを活用します (Python を除く) 。ただし、マルチスレッドによってもたらされるプログラムの複雑さ、特に競合するリソースの同期の問題は避けられません。
ただし、Python では Global Interpretation Lock (GIL) が使用されているため、コードを複数のコアで同時に実行することはできません。つまり、Python のマルチスレッドを同時に実行できないことに気づくでしょう。 - スレッド化によって、Python コードを追加すると、プログラムの実行効率が大幅に低下します。さらに詳しく知りたい場合は、この記事を読むことをお勧めします。実際、マルチスレッド プログラミング モデルを使用するのは非常に難しく、プログラマーは間違いを犯しやすいです。これはプログラマーのせいではありません。なぜなら、並列思考は非人間的であり、私たちのほとんどは逐次的に思考するからです (統合失調症については議論されていません)。 、フォン・ノイマンによって設計されたコンピュータ アーキテクチャも逐次実行に基づいています。マルチスレッド プログラムを常に処理できない場合は、おめでとうございます。あなたは正常な思考のプログラマーです:)
Python には 2 つのスレッド インターフェイス セットがあり、1 つはスレッド モジュールであり、基本的な低レベル (低レベル) を提供します。 ) Level) インターフェイス。スレッドの実行本体として Function を使用します。また、(Java に似た) 使いやすいオブジェクトベースのインターフェイスを提供するスレッド モジュールのグループもあり、Thread オブジェクトを継承してスレッドを実装したり、Timer などの他のスレッド関連オブジェクトも提供したりできます。 Lock
スレッドモジュールの使用例
1
2
3
4
5
import thread
def worker():
"""スレッドワーカー関数"""
PRint 'Worker'
thread.start_new_thread(worker)
スレッドモジュールの使用例
1
2
3
4
5
6
インポートスレッド
def worker():
"""スレッドワーカー関数"""
print 'Worker'
t = threading.Thread(target=worker)
t.start()
or Java スタイル
1
2
3
4
5
6
7
8
9
10
インポートスレッドing
クラスワーカー(スレッドing.Thread) :
def __init__(self) :
PASSf DEF RUN ():
"" Thread Worker Function "" "
Print 'worker'
t = worker ()
t.start ()
Process (プロセス)
前述のグローバル解釈ロックの問題のため、Python でのより良い並列方法は、CPU リソースを使用できる複数のプロセスを使用することです。非常に効果的に、真の並行性を実現します。もちろん、プロセスのオーバーヘッドはスレッドのオーバーヘッドよりも大きいため、驚くべき数の同時プロセスを作成したい場合は、マシンが強力な心臓を備えているかどうかを考慮する必要があります。
Python の multiprocess モジュールには、スレッド化と同様のインターフェイスがあります。
1
2
3
4
5
6
7
8
from multiprocessing import Process
def ():
"""スレッドワーカーfunction"""
print 'Worker'
p = Process(target=worker)
p.start()
p.join()
🎜 スレッドは同じアドレス空間とメモリを共有しているため、したがって、スレッド間の通信は非常に簡単ですが、プロセス間の通信はより複雑です。一般的なプロセス間通信には、パイプ、メッセージ キュー、ソケット インターフェイス (TCP/IP) などが含まれます。 🎜🎜Python の multiprocess モジュールは、プロセス間でメッセージを簡単に転送できるカプセル化されたパイプとキューを提供します。 🎜🎜Pythonプロセス間の同期には、スレッドと同じロックが使用されます。 🎜🎜さらに、Python はスレッドを簡単に管理および制御できるプロセス プール Pool オブジェクトも提供します。 🎜🎜リモート分散ホスト (分散ノード)🎜
ビッグデータ時代の到来により、ムーアの定理は単一のマシンでは効果を失ったようです。データの計算と処理には、複数のホスト ノードで並行して実行されるプログラムが必要です。 . ソフトウェア アーキテクチャで考慮する必要がある問題。
リモート ホスト間のプロセス間通信にはいくつかの一般的な方法があります
TCP/IP
TCP/IP はすべてのリモート通信の基礎ですが、API は比較的低レベルで使用が難しいため、一般的には考慮されていません
リモート関数呼び出し
RPC は、リモート プロセス間通信の初期の手段です。 Python にはオープンソース実装 RPyC があります
リモート オブジェクト
リモート オブジェクトは、より高レベルのカプセル化であり、プログラムはローカル オブジェクトと同じ方法でリモート オブジェクトのローカル プロキシを操作できます。 CORBA は、リモート オブジェクトで最も広く使用されている仕様です。CORBA の最大の利点は、さまざまな言語とプラットフォームで通信できることです。さまざまな言語やプラットフォームにも、Java の RMI、MS の DCOM
Python のオープンソース実装など、独自のリモート オブジェクト実装があり、リモート オブジェクトのサポートが多数あります
Dopy
Fnorb (CORBA)
ICE
omniORB (CORBA)
Pyro
YAMI
Message Queue
RPC やリモート オブジェクトと比較して、メッセージは、Python インターフェイスをサポートする一般的なメッセージ メカニズムです。
RabbitMQ
ZeroMQ
Kafka
AWS SQS + BOTO
リモートホストでの同時実行とローカルマルチプロセスの実行には大きな違いはなく、どちらもプロセス間通信の問題を解決する必要があります。もちろん、リモート プロセスの管理と調整はローカル プロセスよりも複雑です。
Python には、分散同時実行をサポートし、効果的な管理方法を提供するオープンソース フレームワークが数多くあります。以下のものが挙げられます。
Celery
Celery は、分散システム タスクで非同期に実行でき、効果的な管理とスケジューリングを提供する、非常に成熟した Python 分散フレームワークです。機能。こちらを参照してください
SCOOP
SCOOP (Scalable COncurrent Operations in Python) は、同時実行用の Future インターフェイスを使用した、シンプルで使いやすい分散呼び出しインターフェイスを提供します。
Dispy
Celery や SCOOP と比較して、Dispy はより軽量な分散並列サービスを提供します
PP
PP (Parallel Python) は別の軽量 Python 並列サービスです。こちらを参照してください
Asyncoro
Asyncoro は別の Python フレームワークです。分散同時実行性を実現するためにジェネレーターを使用します
もちろん、他にも多くのシステムがありますが、1 つずつリストしていません
さらに、多くの分散システムは、Spark
疑似スレッドをサポートしています。
一般的ではない別の同時実行メソッドがあります。これは、スレッドのように見え、スレッド インターフェイスに似たインターフェイスを使用しますが、実際には非スレッド メソッドを使用します。対応するスレッド オーバーヘッドは次のとおりです。保存されていません。 greenletgreenlet は、プロセス内同時実行をサポートする軽量のコルーチンを提供します。 greenlet は Stackless の副産物であり、mirco-thread と呼ばれるテクノロジーをサポートするためにタスクレットを使用します。 以下に、greenlet を使用した疑似スレッドの例を示します 123 45。 6789101112131415from greenlet import greenlet def test1 (): print 12 gr2.switch() print 34 def test2(): print 56 gr1.switch()
Print 78 gr1 = greenlet(test1)gr2 = greenさせて( test2)gr1.switch() 上記のプログラムを実行すると、次の結果が得られます: 123
12
56
34
疑似スレッド gr1 スイッチは 12 を出力し、次に gr2 スイッチを呼び出して 56 を取得し、その後 gr1 に戻り、34 を出力します。その後、疑似スレッド gr1 が終了してプログラムが終了するため、78 は出力されません。この例から、擬似スレッドを使用するとプログラムの実行フローを効果的に制御できることがわかりますが、擬似スレッドには実際の同時実行性がありません。
Eventlet、gevent、concurence はすべて、greenlet に基づいた同時実行性を提供します。
eventlet http://eventlet.net/
eventlet は、ネットワーク呼び出しの同時実行を提供する Python ライブラリであり、ユーザーは非ブロッキングな方法でブロッキング IO 操作を呼び出すことができます。
1
2
3
4
5
6
7
8
9
10
11
12
eventlet.green からイベントレット
をインポートimport urllib2
urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']
def fetch(url):
return urllib2.urlopen(url).read()
EPool = Eventlet.greenPool () FOR BODY in Pool.imap (fetch, urls): Print ("Got Body", Len (Body) 実行結果は以下の通りです
123 ('got body', 17629)('got body', 1270)('got body', 46949)
イベントレット注文urllib2ジェネレータ操作をサポートするように変更されており、インターフェイスは urllib2 と一貫しています。ここでの GreenPool は、Python の Pool インターフェイスと一致しています。 geventgevent は、eventlet と似ています。その違いについては、
import gevent
from gevent importソケット
urls = ['www.google.com', 'www.example.com' を参照してください。 , 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for urls]
gevent.joinall(jobs, timeout=2)
print [job.value for job in jobs]
実行結果は次のとおりです:
1
[ '206.169.145.226', '93.184.216.34', 3分]
concurrence https://github.com/concurrence/concurrence
concurence は別のグリーンレットです ネットワーク同時実行性を提供します 私はオープンソース ライブラリを使用したことがないので、自分で試すことができます。
実用的なアプリケーション
同時実行性が必要となる状況は通常 2 つあります。1 つは計算集約型であり、プログラムが大量の CPU リソースを必要とすることを意味し、もう 1 つは IO 集約型であり、プログラムで大量の読み取りが行われる可能性があります。書き込み操作には、ファイルの読み取りと書き込み、ネットワーク要求の送受信などが含まれます。
計算負荷の高いアプリケーション
計算負荷の高いアプリケーションに対応して、有名なモンテカルロ アルゴリズムを選択して PI 値を計算します。基本原理は次のとおりです
モンテカルロアルゴリズムは、統計原理を使用して円周率をシミュレートし、計算します。正方形において、1/4の円の領域に入るランダムな点(赤い点)の確率は、その円周率に比例します。エリア。つまり、確率 p = Pi * R * R / 4: R * R です。ここで、R は正方形の辺の長さと円の半径です。つまり、確率は円周率の 1/4 です。この結論を使用すると、点が 4 分の 1 円に当たる確率をシミュレーションする限り、この確率を求めることができます。多くの実験を行って、大量の点を生成し、その点がどの領域にあるかを確認し、結果を計算します。 基本的なアルゴリズムは次のとおりです: 12345 from math import hypertfrom random import randa
def test (試行):
return sum(hypot(random(), random())
ここで、テストメソッドは n (試行) 回のテストを実行し、4 分の 1 の円を返します。ポイント。判定方法は、点から円の中心までの距離がR未満であれば円上にあります。
大量の同時実行により、複数のテストを迅速に実行でき、実行するテストの数が増えるほど、結果は真の pi に近づきます。
ここでは、さまざまな同時実行方式のプログラム コードを示します
非同時実行
最初は単一スレッドで実行しますが、パフォーマンスがどのようになるかを確認するためにプロセスで実行します
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
数学インポート Hypot より
ランダムからインポートランダム
インポートイベントレット
インポート時間
def test(tries):
return sum(hypot(random(), random())
def calcPi(nbFutures, Trys):
ts = time.time()
result = map(test, [tries] * nbFutures)
ret = 4. * sum(result) / float (nbFu tures * Trys) スパン
マルチスレッド スレッドプールを使用するために、マルチスレッドをカプセル化したマルチプロセッシングのダミーパッケージを使用します。ここのコードではスレッドについてまったく言及していませんが、間違いなくマルチスレッドであることに注意してください。テストの結果、スレッド プール数を 1 に設定した場合の実行結果は、スレッド プール数を 5 に設定した場合とほぼ 2 倍高速であることがわかりました。同時実行を行わない場合よりも、テスト データの所要時間は 5 秒から 9 秒になりました。したがって、計算負荷の高いタスクの場合は、マルチスレッドをあきらめたほうがよいでしょう。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing.dummy import Pool
from mathインポートhypot
fromランダムインポートランダム
インポート時間
デフォルトテスト( Trys):
return sum(hypot(random(), random())
def calcPi(nbFutures, Trys):
ts = time.time()
p = Pool(1)
result = p.map(test, [tries] * nbFutures)
ret = 4. * sum(result) / float(nbFutures * Trys)
span = time.time( ) - ts
print "消費時間"、span
return ret
if __name__ == '__main__':
p = Pool()
print("pi = {}".format(calcPi( 3000, 4000)))
multiprocess multiprocess
理論的には、計算集約型のタスクでは、マルチプロセスの同時実行を使用する方が適切です。次の例では、プロセス プールのサイズは次のように設定されています。 5. 変更します。プロセス プールのサイズが結果に与える影響を確認できます。プロセス プールを 1 に設定すると、この時点では同時実行性がないため、マルチスレッドの結果に必要な時間は同様になります。 2 に設定すると、応答時間は同時実行なしの場合の半分になりますが、プロセス プールを拡張し続けるとパフォーマンスにほとんど影響がなく、おそらく Apple Air の CPU のみが低下する可能性があります。コアが2つありますか?
非常に大規模なプロセス プールを設定すると、システムはあまりにも多くのプロセスの作成をサポートできないため、注意してください。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
マルチプロセッシングインポートプールから
数学からインポートhypot
ランダムからインポートランダム
インポート時間
デフォルトテスト(試行):
return sum(hypot(random(), random())
def calcPi(nbFutures, Trys):
ts = time.time()
p = プール(5)
result = p.map(test, [tries] * nbFutures) ret = 4. * sum(result) / float(nbFutures * Trys) span = time.time() - ts
print "消費時間"、span
return ret
if __name__ == '__main__':
print("pi = {}".format(calcPi(3000, 4000)))
gevent ( pseudo-thread)
geventでもeventletでも、実際には同時実行がないため、応答時間は同時実行なしの場合とあまり変わりません。これはテスト結果と一致しています。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import gevent
数学から import hybrid
import time
def test(tries):
return sum(hypot(random(),ランダム())
def calcPi(nbFutures, Trys):
ts = time.time()
jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]
gevent.joinall(jobs, timeout=2)
ret = 4. * sum([job.value for jobs in jobs]) / float(nbFutures * Trys)
span = time.time() - ts
print 「所要時間」、span
return ret print calcPi(3000,4000) イベントレット(伪線程)12345678910111213
14151617 1819 from math import hypotfrom random import randomimport eventletimport time def test(trys): return sum(hypot(random(), random ()) def calcPi(nbFutures, Trys): ts = time.time() pool = eventlet.GreenPool() result = プール。 IMAP(test、[tries]*nbfutures) ret = 4.*sum(result)/float(nbfutures*trie) return ret print calcPi(3000,4000) SCOOPSCOOP内の将来のインターフェース標識PEP-3148の定義、ヤ就Python3 で提供される将来のインターフェイスです。省スペースな SCOOP 構成環境 (単体マシン、ワーカー 4 台) でのパフォーマンスは向上していますが、2 つのプロセス池構成とは異なります。 67891011121314151617
1819 from math import hypot from random import randomfrom scoop import futures import time def test(tries): return sum(hypot(random(), random()) span = time.time() - ts print 「消費時間」、span
return ret
if __name__ == 「__main__」 :
print("pi = {}".format(calcPi(3000, 4000)))
セロリ
任务代码
1
2
3
4
5
6
7
8
9
10
11
from celery import Celery
from math import hypot
ランダムインポートからランダム
アプリ = セロリ(' task'、backend='amqp'、broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
@app. task
def test(tries):
return sum(hypot(random(), random())
客户端代
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
セロリ 輸入品group
タスクからインポートテスト
インポート時間
def calcPi(nbFutures, Trys):
ts = time.time()
result = group(test.s(tries) for i in xrange (nbFutures))().get()
ret = 4. * sum(result) / float(nbFutures * tries)
span = time.time() - ts
print 「消費時間」、スパン
return ret
print calcPi(3000, 4000)
Celery を使用した同時実行テストの結果は予想外でした (環境は単一マシン、4frefork 同時実行、メッセージ ブローカーは RabbitMQ です)。応答時間は全テスト ケースの中で最悪で、同時実行なしの場合の 5 ~ 6 倍でした。 。これは、制御調整のオーバーヘッドが大きすぎることが原因である可能性があります。このようなコンピューティング タスクには、Celery は適切な選択ではない可能性があります。 asyncoro Asyncoro のテスト結果は、非同時実行性と一致しています。 123456789101112 13141516 171819 asyncoroをインポート 数学からインポートhypotランダムからインポートランダムインポート時間 defテスト(試行): 利回り合計(hypot(ランダム()、ランダム()) def calcPi(nbFutures, Trys): ts = time.time() coros = [ asyncoro.Coro (test,t) for t in [tries] * nbFutures] ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * Trys) scan = time.time( ) - ts print "time Spend "、span return ret print calcPi(3000,4000) IO 集中型 IO 集中型のタスクも一般的な使用例です。たとえば、次のようになります。ネットワーク WEB サーバーはその一例であり、1 秒あたりに処理できるリクエストの数は WEB サーバーの重要な指標です。 最も単純な例として Web ページの読み取りを考えてみましょう 12345678910
11 12
1314151617 数学インポートhypotからインポート時間import urllib2
urls = ['http://www.google. com' , ' http://www.example.com', 'http://www.python.org'] def test(url): return urllib2.urlopen(url).read() def testIO(nbFutures): ts = time.time() map(test, urls * nbFutures) span = time.time() - ts 印刷時間 "、span
testIO(10) さまざまな同時実行ライブラリのコードは比較的似ているため、1 つずつリストしません。計算負荷の高いコードを参考として参照してください。 テストを通じて、IO 集中型のタスクでは、マルチスレッドまたはマルチプロセスを使用すると、プログラムの効率が効果的に向上することがわかりました。同時実行なしでは時間が 9 秒から 0.03 秒に増加しました。同時に、eventlet/gevent は、非常に便利なノンブロッキングの非同期呼び出しモードを提供します。応答時間が同等であれば、スレッドと擬似スレッドの方が消費するリソースが少ないため、ここではスレッドまたは擬似スレッドを使用することをお勧めします。 概要Python では、さまざまなシナリオに応じてさまざまな同時実行メソッドを選択する必要があります。適切な方法を選択するには、その方法の原理を理解するだけでなく、いくつかのテストと実験を行う必要があります。データは、選択を行うための最良の参考になります。 上記は Python を使用した同時プログラミングの内容です。その他の関連記事については、PHP 中国語 Web サイト (www.php.cn) に注目してください。