Heim > Artikel > Backend-Entwicklung > Gleichzeitige Programmierung mit Python
Die gleichzeitige Ausführung von Computerprogrammen ist ein oft diskutiertes Thema. Heute möchte ich verschiedene Parallelitätsmethoden unter Python diskutieren.
Parallelitätsmethoden
Thread
Multi-Threading ist das Tool (JS), an das fast jeder Programmierer zuerst denkt, wenn er jede Sprache verwendet. Programmierer vermeiden bitte die Verwendung von Multi -Threading kann CPU-Ressourcen effektiv nutzen (Python-Ausnahme). Die durch Multithreading verursachte Komplexität von Programmen ist jedoch unvermeidlich, insbesondere das Synchronisationsproblem konkurrierender Ressourcen.
Aufgrund der Verwendung von Global Interpretation Lock (GIL) in Python kann der Code jedoch nicht gleichzeitig auf mehreren Kernen ausgeführt werden. Mit anderen Worten, Pythons Multithreading kann nicht gleichzeitig ausgeführt werden Stellen Sie fest, dass die Ausführungseffizienz des Programms nach der Verwendung mehrerer Threads zur Verbesserung Ihres Python-Codes gesunken ist. Wenn Sie mehr Details erfahren möchten, empfehle ich Ihnen, diesen Artikel zu lesen. Tatsächlich ist es sehr schwierig, ein Multithread-Programmiermodell zu verwenden, und Programmierer können leicht Fehler machen. Dies ist nicht die Schuld des Programmierers, da paralleles Denken unmenschlich ist und die meisten von uns seriell denken (Schizophrenie wird nicht diskutiert). , und die von Neumann entworfene Computerarchitektur basiert ebenfalls auf sequentieller Ausführung. Wenn Sie also Ihr Multithread-Programm nicht immer fertigstellen können, herzlichen Glückwunsch, Sie sind ein normal denkender Programmierer:)
Python bietet zwei Sätze von Thread-Schnittstellen, einer ist das Thread-Modul, das grundlegende Low bereitstellt Die Level-Schnittstelle verwendet Function als laufenden Hauptteil des Threads. Eine weitere Gruppe ist das Threading-Modul, das eine benutzerfreundlichere objektbasierte Schnittstelle bereitstellt (ähnlich wie Java). Es kann Thread-Objekte erben, um Threads zu implementieren, und stellt auch andere Thread-bezogene Objekte wie Timer und Lock bereit 🎜>
Beispiel für die Verwendung des Thread-Moduls 1234 5 Thread importierendef worker(): """thread worker function""" PRint 'Worker 'thread.start_new_thread(worker) Beispiel für die Verwendung des Threading-Moduls 123456 Gewinde importierendef worker(): """thread worker function""" print 'Worker't = threading.Thread(target=worker) t.start() oder Java Style 12345678 910 Import Threadingclass worker(threading.Thread): def __init__(self): pass def run(): """thread worker function"" print 'Worker' t = worker()t.start() Prozess (Prozess) Aufgrund der globalen oben erwähnte Interpretationssperre In Bezug auf das Problem besteht eine bessere parallele Methode in Python darin, mehrere Prozesse zu verwenden, die CPU-Ressourcen sehr effektiv nutzen und echte Parallelität erreichen können. Natürlich ist der Overhead von Prozessen größer als der von Threads. Wenn Sie also eine alarmierende Anzahl gleichzeitiger Prozesse erstellen möchten, müssen Sie überlegen, ob Ihre Maschine über ein starkes Herz verfügt. Das Multiprocess-Modul von Python verfügt über eine ähnliche Schnittstelle wie Threading. 12345678 vom Multiprocessing-Importprozess def worker(): ""Thread-Worker-Funktion"" print 'Worker'p = Process(target=worker)p.start()p.join() Da Threads denselben Adressraum und Speicher teilen, ist die Kommunikation zwischen Threads sehr einfach . Die Kommunikation ist etwas komplizierter. Zu den üblichen prozessübergreifenden Kommunikationen gehören Pipes, Nachrichtenwarteschlangen, Socket-Schnittstellen (TCP/IP) usw. Das Multiprozessmodul von Python stellt gekapselte Pipes und Warteschlangen bereit, mit denen Nachrichten problemlos zwischen Prozessen übertragen werden können. Die Synchronisierung zwischen Python-Prozessen verwendet Sperren, die mit Threads identisch sind. Darüber hinaus stellt Python auch ein Prozesspool-Pool-Objekt bereit, mit dem Threads einfach verwaltet und gesteuert werden können. Remote verteilter Host (verteilter Knoten)
Mit dem Aufkommen des Big Data-Zeitalters scheint Moores Theorem seine Wirkung auf eine einzelne Maschine verloren zu haben. Die Datenberechnung und -verarbeitung erfordert ein verteiltes Computernetzwerk, und Programme laufen parallel auf mehreren Hostknoten bereits Probleme, die in der aktuellen Softwarearchitektur berücksichtigt werden müssen.
Es gibt mehrere gängige Methoden der prozessübergreifenden Kommunikation zwischen Remote-Hosts
TCP/IP
TCP/IP ist die Grundlage aller Remote-Kommunikation, die API ist jedoch relativ Die Verwendung ist relativ umständlich und wird daher im Allgemeinen nicht in Betracht gezogen
Remote Function Call
RPC ist ein frühes Mittel zur Remote-Interprozesskommunikation. Unter Python gibt es eine Open-Source-Implementierung namens RPyC
Remote Object
Remote Object ist eine Kapselung auf höherer Ebene. Das Programm kann den lokalen Proxy eines Remote-Objekts auf die gleiche Weise bedienen wie ein lokales Objekt. CORBA ist die am weitesten verbreitete Spezifikation für Remote-Objekte. Der größte Vorteil von CORBA besteht darin, dass es in verschiedenen Sprachen und Plattformen kommunizieren kann. Verschiedene Sprachen und Plattformen haben auch ihre eigenen Remote-Objekt-Implementierungen, wie etwa RMI von Java, DCOM von MS
Pythons Open-Source-Implementierung, es gibt viele Unterstützungen für Remote-Objekte
Dopy
Fnorb (CORBA)
ICE
omniORB (CORBA)
Pyro
YAMI
Nachrichtenwarteschlange
Im Vergleich zu RPC oder Remote-Objekten sind Nachrichten eine flexiblere Kommunikationsmethode, die Python-Schnittstellen unterstützt, darunter
RabbitMQ
ZeroMQ
Kafka
AWS SQS + BOTO
Es gibt keinen großen Unterschied zwischen der Ausführung von Parallelität auf dem Remote-Host und der Ausführung lokaler Multiprozesse, und beide müssen das Problem der Kommunikation zwischen Prozessen lösen. Natürlich ist die Verwaltung und Koordination von Remote-Prozessen komplizierter als bei lokalen.
Es gibt viele Open-Source-Frameworks unter Python, die verteilte Parallelität unterstützen und effektive Verwaltungsmethoden bereitstellen, darunter:
Celery
Celery ist ein sehr ausgereiftes verteiltes Python-Framework, das ausgeführt werden kann Aufgaben asynchron in einem verteilten System ausführen und effektive Verwaltungs- und Planungsfunktionen bereitstellen. Referenz hier
SCOOP
SCOOP (Scalable COncurrent Operations in Python) bietet eine einfache und benutzerfreundliche verteilte Aufrufschnittstelle, die die Future-Schnittstelle für Parallelität verwendet.
Dispy
Im Vergleich zu Celery und SCOOP bietet Dispy einen leichteren verteilten parallelen Dienst
PP
PP (Parallel Python) Es ist ein weiteres leichtes Python Paralleler Dienst, siehe hier
Asyncoro
Asyncoro ist ein weiteres Python-Framework, das Generator verwendet, um verteilte Parallelität zu erreichen.
Natürlich gibt es noch viele weitere Systeme. Andere Systeme habe ich nicht aufgelistet eins nach dem anderen
Darüber hinaus bieten viele verteilte Systeme Unterstützung für Python-Schnittstellen, wie z. B. Spark
Pseudo-Thread
Es gibt eine andere Parallelitätsmethode, die nicht üblich ist kann es als Pseudo-Threading bezeichnet werden, das wie ein Thread aussieht und eine Schnittstelle verwendet, die einer Thread-Schnittstelle ähnelt. Bei Verwendung einer Methode ohne Thread entsteht jedoch kein entsprechender Thread-Overhead.
Greenlet
Greenlet bietet leichtgewichtige Coroutinen zur Unterstützung der prozessinternen Parallelität.
Greenlet ist ein Nebenprodukt von Stackless. Es verwendet Tasklet, um eine Technologie namens Mirco-Thread zu unterstützen. Hier ist ein Beispiel für einen Pseudo-Thread, der Greenlet verwendet 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from greenlet import greenlet
def test1():
print 12
gr2.switch( )
print 34
def test2():
print 56
gr1.switch()
print 78
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
Führen Sie das obige Programm aus, um die folgenden Ergebnisse zu erhalten:
1
2
3
12
56
34
Pseudo-Thread gr1-Schalter Wird gedruckt 12, rufen Sie dann den Schalter gr2 auf, um 56 zu erhalten, wechseln Sie dann zurück zu gr1, drucken Sie 34, dann endet der Pseudo-Thread gr1, das Programm wird beendet, sodass 78 niemals gedruckt wird. Anhand dieses Beispiels können wir erkennen, dass wir mit Pseudo-Threads den Ausführungsfluss des Programms effektiv steuern können, Pseudo-Threads jedoch keine echte Parallelität aufweisen.
Eventlet, Gevent und Concurence basieren alle auf Greenlet, um Parallelität bereitzustellen.
eventlet http://eventlet.net/
eventlet ist eine Python-Bibliothek, die Netzwerkaufruf-Parallelität bietet. Benutzer können blockierende E/A-Vorgänge auf nicht blockierende Weise aufrufen.
1
2
3
4
5
6
7
8
9
10
11
12
Eventlet importieren
von eventlet.green import urllib2
urls = ['http://www.google.com', 'http://www. example.com', 'http://www.python.org']
def fetch(url):
return urllib2.urlopen(url).read ()
pool = eventlet.GreenPool()
for body in pool.imap(fetch, urls):
print("got body", len(body ))
Die Ausführungsergebnisse sind wie folgt
1
2
3
('got body', 17629)
('got body', 1270)
( 'got body ', 46949)
eventlet hat urllib2 geändert, um Generatoroperationen zu unterstützen, und die Schnittstelle stimmt mit urllib2 überein. Der GreenPool stimmt hier mit der Pool-Schnittstelle von Python überein.
gevent
gevent ähnelt eventlet. Informationen zu den Unterschieden finden Sie in diesem Artikel
1
2
3
4
5
6
7
gevent importieren
from gevent import socket
urls = ['www.google.com', 'www.example.com', 'www.python.org']
jobs = [gevent .spawn(socket.gethostbyname, url) für URL in URLs]
gevent.joinall(jobs, timeout=2)
print [job.value for job in Jobs]
Die Ausführungsergebnisse lauten wie folgt:
1
[ '206.169.145.226', '93.184.216.34', '23.235.39.223']
Zustimmung https:// github.com /concurrence/concurrence
Concurence ist eine weitere Open-Source-Bibliothek, die Greenlets verwendet, um Netzwerk-Parallelität bereitzustellen. Sie können es selbst ausprobieren.
Praktische Anwendung
Es gibt normalerweise zwei Situationen, in denen Parallelität erforderlich ist: Eine ist rechenintensiv, was bedeutet, dass Ihr Programm viele CPU-Ressourcen benötigt. Die andere ist möglicherweise ein IO-intensiver Typ Es gibt eine große Anzahl von Lese- und Schreibvorgängen, einschließlich Lesen und Schreiben von Dateien, Senden und Empfangen von Netzwerkanforderungen usw.
Rechenintensiv
Entsprechend rechenintensiven Anwendungen wählen wir den berühmten Monte-Carlo-Algorithmus zur Berechnung des PI-Werts. Das Grundprinzip ist wie folgt:
Der Monte-Carlo-Algorithmus verwendet statistische Prinzipien, um Pi zu simulieren und zu berechnen. In einem Quadrat fällt ein zufälliger Punkt in die 1/4-Kreisfläche (. rot Die Wahrscheinlichkeit eines Punktes ist proportional zu seiner Fläche. Das heißt, die Wahrscheinlichkeit p = Pi * R * R / 4: R * R, wobei R die Seitenlänge des Quadrats und der Radius des Kreises ist. Das heißt, die Wahrscheinlichkeit beträgt 1/4 von Pi, solange wir die Wahrscheinlichkeit eines Punktes, der auf einen Viertelkreis fällt, simulieren können Es gibt viele Experimente, und es geht darum, eine große Anzahl von Punkten zu generieren, zu sehen, in welchem Bereich sich die Punkte befinden, und dann die Ergebnisse zu berechnen.
Der Grundalgorithmus ist wie folgt:
1
2
3
4
5
aus mathematischem Import-Hypot
aus zufälligem Import-Zufallstyp
def-Test(versuche):
return sum(hypot(random(), random())
Die Testmethode hier führt n(Versuche) Mal durch und gibt die Anzahl der Punkte zurück, die in einen Viertelkreis fallen. Die Beurteilungsmethode besteht darin, den Abstand vom Punkt zum Mittelpunkt des Kreises zu überprüfen. Wenn er kleiner als R ist, liegt er auf dem Kreis.
Mit viel Parallelität können wir schnell mehrere Experimente durchführen. Je mehr Experimente, desto näher liegen die Ergebnisse am wahren Pi.
Hier sind die Programmcodes für verschiedene Parallelitätsmethoden
Nicht-Parallelität
Wir führen zunächst einen einzelnen Thread aus, führen dann aber einen Prozess durch, um zu sehen, wie die Leistung ist
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
von Math Import Hypot
von zufälliger Import zufällig
Eventlet importieren
Zeitpunkt importieren
def test(tries):
return sum(hypot(random( ), random())
def calcPi(nbFutures, trys):
ts = time.time()
Ergebnis = Map(Test, [Versuche] * nbFutures)
ret = 4. * Summe(Ergebnis) / Float(nbFutures * Versuche)
span = time.time() - ts
print „time spend“, span
return ret
print calcPi(3000,4000 )
Multi-Thread-Thread
Um den Thread-Pool zu nutzen, verwenden wir das Dummy-Paket von Multiprocessing, das eine Kapselung von ist Multithreading. Beachten Sie, dass der Code hier zwar überhaupt keine Threads erwähnt, es sich aber definitiv um Multithreading handelt.
Durch Tests haben wir erwartungsgemäß festgestellt, dass die Ausführungsergebnisse bei einem Thread-Pool von 1 die gleichen sind wie bei fehlender Parallelität. Wenn wir die Thread-Pool-Nummer auf 5 setzen, dauert es fast doppelt so lange wie ohne Parallelität, und meine Testdaten reichen von 5 Sekunden bis 9 Sekunden. Bei rechenintensiven Aufgaben ist es daher besser, auf Multithreading zu verzichten.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
aus multiprocessing.dummy import Pool
aus math import hypot
aus random import random
Importzeit
def test(tries):
return sum(hypot(random(), random())
def calcPi(nbFutures, Versuche):
ts = time.time()
p = Pool(1)
result = p.map(test, [tries] * nbFutures)
ret = 4. * sum(result) / float(nbFutures * trys)
span = time. time() - ts
print „time spend“, span
return ret
if __name__ == '__main__':
p = Pool()
print("pi = {}".format(calcPi(3000, 4000)))
Multiprozess-Multiprozess
Theoretisch ist es für rechenintensive Aufgaben sinnvoller, Multiprozess-Parallelität zu verwenden. Im folgenden Beispiel ist die Größe des Prozesspools auf 5 festgelegt. und die Größe des Prozesspools wird geändert. Wenn der Prozesspool auf 1 gesetzt ist, ist die Zeit, die für die Ergebnisse des Multithreadings erforderlich ist, ähnlich, da zu diesem Zeitpunkt keine Parallelität besteht. Bei der Einstellung 2 wurde die Reaktionszeit erheblich verbessert, d.
Seien Sie vorsichtig, wenn Sie einen sehr großen Prozesspool einrichten, treten vorübergehend nicht verfügbare Fehler auf. Das System kann die Erstellung zu vieler Prozesse nicht unterstützen .
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
aus Multiprocessing-Importpool
aus Mathe-Import-Hypot
aus zufälligem Import
Importzeitpunkt
def test(tries):
return sum(hypot(random(), random())
def calcPi(nbFutures, trys):
ts = time.time()
p = Pool(5)
result = p.map (test, [tries] * nbFutures)
ret = 4. * sum(result) / float(nbFutures * trys)
span = time.time() - ts
print „time spend“, span
return ret
if __name__ == '__main__':
print("pi = {} ".format(calcPi(3000, 4000)))
gevent (pseudo-thread)
Ob es ist gevent Es handelt sich immer noch um ein Eventlet, da es keine tatsächliche Parallelität gibt und sich die Antwortzeit nicht wesentlich von der ohne Parallelität unterscheidet. Dies steht im Einklang mit den Testergebnissen.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
gevent importieren
aus mathematischem Import-Hypot
aus zufälligem Import zufällig
Importzeit
def test(tries):
Rückgabesumme (hypot(random(), random())
def calcPi(nbFutures, trys):
ts = time.time()
jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]
gevent.joinall(jobs, timeout=2)
ret = 4. * sum([job.value for job in jobs]) / float(nbFutures * trys)
span = time.time() - ts
print "time spend", span
return ret
print calcPi(3000, 4000)
Eventlet (伪线程)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
aus dem Mathe-Importhypot
aus dem Zufallsimport
Eventlet importieren
Zeitpunkt importieren
def test(tries):
return sum(hypot(random(), random()) < ; 1 for _ in range(tries))
def calcPi(nbFutures, trys):
ts = time.time()
pool = eventlet.GreenPool()
result = pool.imap(test, [tries] * nbFutures)
ret = 4. * sum(result) / float( nbFutures * versuche)
span = time.time() - ts
print "time spend", span
return-ret
print calcPi(3000,4000)
SCOOP
SCOOP中的Future接口符合PEP-3148的定义, 也就是在Python3中提供的Future接口.
在缺省的SCOOP配置环境下(单机,4个Worker),并发的性能有提高,但是不如两个进程池配置的多进程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
aus dem Mathe-Import-Hypot
aus dem Zufallsimport
aus dem Scoop-Import von Futures
Importzeit
def test(tries):
return sum(hypot(random(), random())
def calcPi(nbFutures, Versuche):
ts = time.time()
expr = futures.map(test, [tries] * nbFutures)
ret = 4. * sum(expr) / float(nbFutures * tries)
span = time.time() - ts
print "time spend ", span
return ret
if __name__ == "__main__":
print("pi = {}".format(calcPi(3000, 4000)))
Sellerie
任务代码
1
2
3
4
5
6
7
8
9
10
11
aus Sellerieimport
='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
@ app.task
def test(tries):
return sum(hypot(random(), random())
客户端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
aus der Sellerie-Importgruppe
aus Aufgaben importieren Test
Importzeit
def calcPi(nbFutures, trys):
ts = time.time()
result = group(test.s(tries) for i in xrange(nbFutures))().get()
ret = 4. * sum(result) / float(nbFutures * tries)
span = time.time() - ts
print "time spend", span
return ret
print calcPi(3000, 4000)
Die Ergebnisse des Parallelitätstests mit Celery waren unerwartet (die Umgebung war eine einzelne Maschine, 4frefork-Parallelität und der Nachrichtenbroker war RabbitMQ). Die Reaktionszeit betrug das Fünf- bis Sechsfache ohne Parallelität. Dies kann daran liegen, dass der Aufwand für die Steuerungskoordination zu hoch ist. Für solche Rechenaufgaben ist Celery möglicherweise keine gute Wahl.
asyncoro
Die Testergebnisse von Asyncoro stimmen mit der Nicht-Parallelität überein.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Asyncoro importieren
aus mathematischem Importhypoten
aus zufälligem Import zufällig
Importzeit
def test(tries):
yield sum(hypot(random(), random())
def calcPi(nbFutures, trys):
ts = time.time()
coros = [ asyncoro.Coro(test,t) for t in [tries] * nbFutures]
ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * trys)
span = time.time() - ts
print „time spend“, span
return ret
print calcPi(3000,4000)
IO-intensive
IO-intensive Aufgaben sind ein weiterer häufiger Anwendungsfall, wie zum Beispiel Netzwerk-WEB-Server. Wie viele Anfragen pro Sekunde verarbeitet werden können, ist ein wichtiger Indikator für den WEB-Server.
Nehmen wir das Lesen von Webseiten als einfachstes Beispiel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
aus Mathematik importieren Hypot
Importzeit
import urllib2
urls = ['http://www.google.com' , ' http://www.example.com', 'http://www.python.org']
def test(url):
return urllib2. urlopen(url).read()
def testIO(nbFutures):
ts = time.time()
map(test , urls * nbFutures)
span = time.time() - ts
print „time spend“, span
testIO (10)
Die Codes unter verschiedenen Parallelitätsbibliotheken sind relativ ähnlich, daher werde ich sie nicht einzeln auflisten. Als Referenz können Sie auf den rechenintensiven Code verweisen.
Durch Tests können wir feststellen, dass die Verwendung von Multi-Threads oder Multi-Prozessen die Effizienz des Programms erheblich verbessern kann besser als diejenigen ohne Parallelität, die Reaktionszeit verbesserte sich von 9 Sekunden auf 0,03 Sekunden. Gleichzeitig bietet eventlet/gevent einen nicht blockierenden asynchronen Aufrufmodus, was sehr praktisch ist. Hier empfiehlt sich die Verwendung von Threads bzw. Pseudo-Threads, da Threads und Pseudo-Threads bei ähnlicher Antwortzeit weniger Ressourcen verbrauchen.
Zusammenfassung
Python bietet verschiedene Parallelitätsmethoden. Entsprechend unterschiedlichen Szenarien müssen wir unterschiedliche Methoden für die Parallelität auswählen. Um die geeignete Methode auszuwählen, müssen Sie nicht nur die Prinzipien der Methode verstehen, sondern auch einige Tests und Experimente durchführen. Daten sind die beste Referenz für Ihre Wahl.
Das Obige ist der Inhalt der gleichzeitigen Programmierung mit Python. Weitere verwandte Artikel finden Sie auf der chinesischen PHP-Website (www.php.cn)!