Heim > Artikel > Backend-Entwicklung > Warteschlange und Multiprozess in Python
Ich bin kürzlich auf ein Projekt gestoßen, das die Ausführung von Aufgaben in mehreren virtuellen Maschinen erforderte. Ich habe auf den Code früherer Projekte anderer Leute verwiesen und Multiprozess verwendet, um es zu verarbeiten, also habe ich online nach Multiprozessen in Python gesucht
1. Lassen Sie uns zuerst über Queue (Warteschlangenobjekt) sprechen
Queue ist die Standardbibliothek in Python, die direkt importiert und zitiert werden kann Ich habe gehört, dass die berühmten Wörter „Zuerst essen, zuerst ziehen“ und „Zuerst essen, zuerst erbrechen“ tatsächlich die hier erwähnte Warteschlange sind. Beim Erstellen der Warteschlange können Sie deren Kapazität definieren. Essen Sie nicht zu viel. Wenn Sie zu viel essen , wird beim Erstellen kein Fehler gemeldet oder weniger als 1 geschrieben. Die Zahl bedeutet unendlich
Import Queue
q = Queue.Queue(10)
in die Warteschlange stellen Value(put)
q.put('yang')
q.put(4)
q.put(['yan','xing'])
Den Wert in der Warteschlange abrufen ()
Die Standardwarteschlange lautet „Wer zuerst reinkommt, mahlt zuerst“
>>> q.get()
'yang'
>>> ; q.get()
4
>>> q .get()
['yan', 'xing']
Wenn eine Warteschlange leer ist , wenn Sie get zum Abrufen verwenden, wird es blockiert, daher wird es im Allgemeinen beim Abrufen der Warteschlange
get_nowait()-Methode verwendet. Diese Methode löst eine leere Ausnahme aus, wenn ein Wert aus einer leeren Warteschlange abgerufen wird
Die gebräuchlichere Methode besteht also darin, zunächst festzustellen, ob eine Warteschlange leer ist. Wenn nicht, nehmen Sie den Wert, wenn sie leer ist.
Häufig verwendete Methode in Warteschlangen
Queue.qsize() gibt die Größe der Warteschlange zurück
Queue.empty() Wenn die Warteschlange leer ist, wird True zurückgegeben, andernfalls False
Queue.full() Wenn die Warteschlange voll ist, wird True zurückgegeben , andernfalls False
Queue.get([block[, timeout]]) Holen Sie sich die Warteschlange, Timeout-Wartezeit
Queue.get_nowait () Entspricht Queue.get(False)
Nicht blockierende Queue.put (item) In die Warteschlange schreiben, Timeout-Wartezeit
Queue.put_nowait(item) Äquivalent zu Queue.put(item, False)
2. Verwendung des Unterprozesskonzepts in der Mehrfachverarbeitung
aus Multiprocessing-Importprozess
Sie können einen Unterprozess durch Prozess erstellen
p = Prozess(target =fun,args=(args))
Dann verwenden Sie p.start(), um den untergeordneten Prozess zu starten
Dann verwenden Sie die p.join()-Methode, um die Ausführung des untergeordneten Prozesses zu beenden, bevor der übergeordnete Prozess ausgeführt wird
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print 'Run child process %s (%s)...' % (name, os.getpid()) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Process(target=run_proc, args=('test',)) print 'Process will start.' p.start() p.join() print 'Process end.'
3. Verwendung von Pool
in der Mehrfachverarbeitung Wenn Sie mehrere untergeordnete Prozesse benötigen, können Sie darüber nachdenken Verwenden eines Prozesspools (Pool) zur Verwaltung
aus dem Multiprocessing-Importpool
from multiprocessing import Pool import os, time def long_time_task(name): print 'Run task %s (%s)...' % (name, os.getpid()) start = time.time() time.sleep(3) end = time.time() print 'Task %s runs %0.2f seconds.' % (name, (end - start)) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Pool() for i in range(5): p.apply_async(long_time_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.'
Die Methode des Pools Das Erstellen von Unterprozessen unterscheidet sich vom Prozess. Es wird durch
p.apply_async(func,args=(args)) implementiert Wenn mein Computer beispielsweise über 4 CPUs verfügt, können die Unterprozesse task0, task1, task2 und task3 gleichzeitig gestartet werden, und task4 wird gestartet, nachdem der vorherige Prozess beendet wurde
Das Ergebnis nach dem Ausführen des obigen Programms wird tatsächlich separat gemäß 1, 2 und 3 im obigen Bild ausgeführt. Drucken Sie zuerst 1, dann 2 nach 3 Sekunden und dann 3 nach 3 Sekunden
Code p.close() dient dazu, den Prozesspool zu schließen und ihm keine Prozesse mehr hinzuzufügen. Der Aufruf der Methode „join()“ für das Poolobjekt wartet darauf, dass alle untergeordneten Prozesse ausgeführt werden. ) muss vor dem Aufruf von join() aufgerufen werden. Nach dem Aufruf von close() können Sie keine weiteren Prozesse hinzufügen.
Sie können auch die Anzahl der Prozesse für einen Instanzpool zu diesem Zeitpunkt definieren
Wenn im obigen Code p=Pool(5) ist, können alle untergeordneten Prozesse gleichzeitig verarbeitet werden
3. Kommunikation zwischen mehreren Unterprozessen
Die Kommunikation zwischen mehreren Unterprozessen erfordert beispielsweise die Verwendung der im ersten Schritt erwähnten Warteschlange Die folgende Anforderung lautet: Ein Unterprozess schreibt Daten in die Warteschlange und ein anderer Prozess entnimmt Daten aus der Warteschlange:
#coding:gbk from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): while True: if not q.empty(): value = q.get(True) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': # 父进程创建Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入: pw.start() # 等待pw结束: pw.join() # 启动子进程pr,读取: pr.start() pr.join() # pr进程里是死循环,无法等待其结束,只能强行终止: print print '所有数据都写入并且读完'
4. Mehrere interessante Fragen zum obigen Code
if __name__=='__main__': # 父进程创建Queue,并传给各个子进程: q = Queue() p = Pool() pw = p.apply_async(write,args=(q,)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有数据都写入并且读完'
Wenn die Hauptfunktion wie im obigen Beispiel geschrieben ist, Was ich ursprünglich wollte, ist, dass ich eine Warteschlange bekomme, sie als Parameter an jeden untergeordneten Prozess im Prozesspool übergebe, aber
RuntimeError: Queue-Objekte sollten nur durch Vererbung zwischen Prozessen geteilt werden
Fehler, überprüft, die allgemeine Idee ist, dass das Warteschlangenobjekt nicht zwischen dem übergeordneten Prozess und dem untergeordneten Prozess kommunizieren kann. Wenn Sie die Warteschlange im Prozesspool verwenden möchten, müssen Sie die Manager-Klasse von Multiprozess verwendenif __name__=='__main__': manager = multiprocessing.Manager() # 父进程创建Queue,并传给各个子进程: q = manager.Queue() p = Pool() pw = p.apply_async(write,args=(q,)) time.sleep(0.5) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有数据都写入并且读完'
Auf diese Weise kann dieses Warteschlangenobjekt zwischen dem übergeordneten Prozess und dem untergeordneten Prozess kommunizieren. Wenn Sie keinen Pool verwenden, benötigen Sie keinen Manager . Sie können die Manager-Klasse in Zukunft im Multiprozess erweitern
关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁
#coding:gbk from multiprocessing import Process,Queue,Pool import multiprocessing import os, time, random # 写数据进程执行的代码: def write(q,lock): lock.acquire() #加上锁 for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) lock.release() #释放锁 # 读数据进程执行的代码: def read(q): while True: if not q.empty(): value = q.get(False) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': manager = multiprocessing.Manager() # 父进程创建Queue,并传给各个子进程: q = manager.Queue() lock = manager.Lock() #初始化一把锁 p = Pool() pw = p.apply_async(write,args=(q,lock)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有数据都写入并且读完'
更多Warteschlange und Multiprozess in Python相关文章请关注PHP中文网!