アズカバンの最近のテスト作業では、安定性テストのためにテスト環境でオンライン スケジュール シナリオをシミュレートする必要があります。そこで、私は以前の業務である Python に戻り、Python でスクリプトを作成して、同様のオンライン スケジューリング シナリオを構築しました。スクリプト作成プロセス中に、テスト環境で 10,000 のジョブ ストリームを作成するという要件に遭遇しました。
当初のアイデアは、azkaban プロジェクトの下でループ内でジョブ作成インターフェイスを 10,000 回呼び出すことでした (各フローにはジョブが 1 つだけ含まれます)。 azkaban 自体にはジョブ ストリームを追加/削除するためのインターフェイスがないため、すべてのジョブ ストリームの変更、追加、および削除は、対応して、Mammoth フロントエンドのジョブ作成インターフェイスのたびにプロジェクト zip パッケージを再アップロードすることによって実際に実装されます。調整されていますが、実際には、Mammoth は zip パッケージの内容を再統合し、その zip パッケージを azkaban に再アップロードします。プロセス全体は次のプロセスに分類できます。zip パッケージを解凍して zip の内容を取得します。パッケージを作成し、zip パッケージ内のファイルの内容を変更して、azkaban にアップロードされた zip パッケージを再パッケージ化します。したがって、サイクル数が遅くなるにつれて、zip パッケージに含まれるコンテンツが増え、インターフェイスの 1 回の実行にかかる時間が長くなります。実際にインターフェイスを初めて呼び出すのにかかる時間は 1 秒未満であり、サイクルが 1,000 回になると、インターフェイスを 1 回呼び出すのにかかる時間は 3 秒近くに達することがわかっています。したがって、このシーンを構築するために 10,000 回のループを想定すると、明らかに膨大な時間がかかります。
この文脈では、この問題に対処するためにマルチプロセス/マルチスレッドのアプローチを使用することを考えるのは自然です。
ご存知のとおり、オペレーティング システムは複数のタスクを同時に実行できます。たとえば、音楽を聴いたり、IM でチャットしたり、ブログを書いたりします。現在の CPU のほとんどはマルチコアですが、以前はシングルコア CPU であっても複数のタスクの並列実行をサポートしていました。
マルチタスクを実行するシングルコアCPUの原理:オペレーティングシステムは各タスクを交互に実行します。最初にタスク 1 を 0.01 秒間実行し、次にタスク 2 に切り替えて 0.01 秒間実行し、次にタスク 3 に切り替えて 0.01 秒間実行します...というように続きます。 CPU の実行速度は非常に速いため、ユーザーの主観的にはこれらのタスクが並行して実行されているように感じられます。
マルチタスクを実行するマルチコア CPU の原理: 実際のアプリケーションでは、タスクの数が CPU コアの数をはるかに超えることがよくあるため、オペレーティング システムは実際にこれらのマルチタスクを順番に実行するようにスケジュールします。それぞれのコア。
オペレーティングシステムにとって、アプリケーションはプロセスです。たとえば、ブラウザを開いた場合はプロセスであり、メモ帳を開いた場合はプロセスです。各プロセスには独自の固有のプロセス番号があります。これらはシステムのメモリ リソースを共有します。 プロセスとは、オペレーティングシステムがリソースを割り当てるための最小単位です。
ビデオとオーディオを同時に再生する必要があるビデオプレーヤーなどの各プロセスでは、少なくとも 2 つの「サブタスク」を同時に実行する必要があります。プロセス内のこれらのサブタスクはスレッドを通じて完了します。 スレッドは最小の実行単位です。プロセスには複数のスレッドを含めることができ、これらのスレッドは互いに独立しており、プロセスが所有するリソースを共有します。
マルチプロセッシングは、Python によって提供されるクロスプラットフォームのマルチプロセス モジュールであり、これを使用すると、さまざまなプラットフォーム (Unix/Linux、 Windows)を実行することができます。
以下は、マルチプロセスを使用してマルチプロセスプログラムを書くためのコードです:
#!/usr/bin/python# -*- coding: utf-8 -*author = 'zni.feng'import sys reload (sys) sys.setdefaultencoding('utf-8')from multiprocessing import Processimport osimport time#子进程fundef child_projcess_fun(name): print 'Child process %s with processId %s starts.' % (name, os.getpid()) time.sleep(3) print 'Child process %s with processId %s ends.' % (name, os.getpid())if name == "main": print 'Parent processId is: %s.' % os.getpid() p = Process(target = child_projcess_fun, args=('zni',)) print 'Process starts' p.start() #开始进程 p.join() #等待子进程结束后再继续往下执行 print 'Process ends.'
プログラムの出力:
Parent processId is: 11076. Process starts Child process zni with processId 11077 starts. Child process zni with processId 11077 ends. Process ends. [Finished in 3.1s]
場合によっては、複数のサブプロセスをバッチで作成したい場合がありますが、または、システム リソースが無限に消費されるのを避けるために、サブプロセスの数に上限を設けます。この作業はPool(プロセスプール)を通じて行うことができます。 Poolを使用するコードは以下の通りです:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 8 from multiprocessing import Pool 9 import os, time10 11 def child_process_test(name, sleep_time):12 print 'Child process %s with processId %s starts.' % (name, os.getpid())13 time.sleep(sleep_time)14 print 'Child process %s with processId %s ends.' % (name, os.getpid())15 16 if name == "main":17 print 'Parent processId is: %s.' % os.getpid()18 p = Pool() #进程池默认大小是cpu的核数19 #p = Pool(10) #生成一个容量为10的进程池,即最大同时执行10个子进程20 for i in range(5):21 p.apply_async(child_process_test, args=('zni_'+str(i), i+1,)) #p.apply_async向进程池提交目标请求22 23 print 'Child processes are running.'24 p.close()25 p.join() #用来等待进程池中的所有子进程结束再向下执行代码,必须在p.close()或者p.terminate()之后执行26 print 'All Processes end.'
プログラムの出力:
Parent processId is: 5050. Child processes are running. Child process zni_0 with processId 5052 starts. Child process zni_1 with processId 5053 starts. Child process zni_2 with processId 5054 starts. Child process zni_3 with processId 5055 starts. Child process zni_0 with processId 5052 ends. Child process zni_4 with processId 5052 starts. Child process zni_1 with processId 5053 ends. Child process zni_2 with processId 5054 ends. Child process zni_3 with processId 5055 ends. Child process zni_4 with processId 5052 ends. All Processes end. [Finished in 6.2s]
close()メソッドとterminate()メソッドの違い:
close:新しいプロセスを追加できないようにプロセス プールを閉じます。すでに実行されているプロセスは、終了するまで実行を続行するのを待機します。
terminate: スレッドプールを強制終了し、実行中のプロセスも強制終了します。
Pythonのマルチプロセッシングモジュールは、QueueやPipeなどの様々なプロセス間通信手段を提供します。
3.1 キュー、ロック
キューはマルチプロセッシングによって提供されるモジュールで、そのデータ構造は「FIFO-first in first out」キューです。一般的に使用されるメソッドは次のとおりです。 ;empty() はキューが空かどうかを判断します。
ロック: 複数の子プロセスが同じキューで書き込み操作を実行する場合、同時操作の競合を避けるために、子プロセスがキューへの書き込み権限のみを持ち、他の子プロセスは待機する必要があるようにロックを使用できます。ロックが解除されると、書き込み操作を再度開始できます。
以下は、プロセス間通信に Queue を使用するためのコードです: 親プロセス内に 2 つの子プロセスを作成し、それぞれキューに対する読み取り操作と書き込み操作を実装します
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Queue, Lock 8 import os, time, random 9 #写数据进程10 def write(q, lock, name):11 print 'Child Process %s starts' % name12 #获得锁13 lock.acquire()14 for value in ['A' , 'B', 'C']:15 print 'Put %s to queue...' % value16 q.put(value)17 time.sleep(random.random())18 #释放锁19 lock.release()20 print 'Child Process %s ends' % name21 22 #读数据进程23 def read(q, lock, name):24 print 'Child Process %s starts' % name25 while True: #持续地读取q中的数据26 value =q.get()27 print 'Get %s from queue.' % value28 print 'Child Process %s ends' % name29 30 if name == "main":31 #父进程创建queue,并共享给各个子进程32 q= Queue()33 #创建锁34 lock = Lock()35 #创建第一个“写”子进程36 pw = Process(target = write , args=(q, lock, 'WRITE', ))37 #创建“读”进程38 pr = Process(target = read, args=(q,lock, 'READ',))39 #启动子进程pw,写入:40 pw.start()41 #启动子进程pr,读取:42 pr.start()43 #等待pw结束:44 pw.join()45 #pr是个死循环,通过terminate杀死:46 pr.terminate()47 print 'Test finish.'
プログラムの出力結果は次のとおりです:
Child Process WRITE starts Put A to queue... Child Process READ starts Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue. Child Process WRITE ends Test finish. [Finished in 2.0s]
3.2 Pipe
Pipe是另一种进程间通信的方式,俗称“管道”。它由两端组成,一端往管道里写入数据,另一端从管道里读取数据。
下面就是使用Pipe通信的代码:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Pipe 8 import os, time, random 9 10 #发送数据进程11 def send(child_pipe, name):12 print 'Child Process %s starts' % name13 child_pipe.send('This is Mr.Ni')14 child_pipe.close()15 time.sleep(random.random())16 print 'Child Process %s ends' % name17 18 #接收数据进程19 def recv(parent_pipe, name):20 print 'Child Process %s starts' % name21 print parent_pipe.recv()22 time.sleep(random.random())23 print 'Child Process %s ends' % name24 25 if name == "main":26 #创建管道27 parent,child = Pipe()28 #创建send进程29 ps = Process(target=send, args=(child, 'SEND'))30 #创建recv进程31 pr = Process(target=recv, args=(parent, 'RECEIVE'))32 #启动send进程33 ps.start()34 #等待send进程结束35 ps.join()36 #启动recv进程37 pr.start()38 #等待recv进程结束39 pr.join()40 print 'Test finish.'
程序的输出结果如下:
Child Process SEND starts Child Process SEND ends Child Process RECEIVE starts This is Mr.Ni Child Process RECEIVE ends Test finish. [Finished in 1.8s]
【相关推荐】
2. Python中推荐使用多进程而不是多线程?分享推荐使用多进程的原因
以上がPythonにおけるマルチプロセスとマルチスレッドの例(1)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。