컴퓨터 프로그램을 동시에 실행하는 것은 자주 논의되는 주제입니다. 오늘은 Python에서의 다양한 동시성 방법에 대해 논의하고 싶습니다.
동시성 방법
스레드
멀티스레딩은 거의 모든 프로그래머가 모든 언어를 사용할 때 가장 먼저 떠올리는 도구(JS)입니다. -스레딩은 CPU 리소스를 효과적으로 활용할 수 있습니다(Python 예외). 그러나 멀티스레딩으로 인해 발생하는 프로그램의 복잡성, 특히 경쟁 리소스의 동기화 문제는 불가피합니다.
그러나 Python에서는 GIL(Global Interpretation Lock)을 사용하기 때문에 여러 코어에서 동시에 코드를 실행할 수 없습니다. 즉, Python의 멀티스레딩은 동시에 실행될 수 없습니다. 다중 사용 스레드를 사용하여 Python 코드를 개선한 후에는 프로그램의 실행 효율성이 정말 떨어졌습니다. 더 자세한 내용을 알고 싶으시다면 이 글을 읽어보시길 추천드립니다. 사실, 다중 스레드 프로그래밍 모델을 사용하는 것은 매우 어렵고 프로그래머는 쉽게 실수할 수 있습니다. 이는 프로그래머의 잘못이 아닙니다. 왜냐하면 병렬 사고는 반인간적이고 우리 대부분은 연속적으로 사고하기 때문입니다(정신분열증은 논의되지 않습니다). , von Neumann이 설계한 컴퓨터 아키텍처도 순차 실행을 기반으로 합니다. 따라서 멀티스레드 프로그램을 항상 완료할 수 없다면 축하합니다. 당신은 평범한 프로그래머입니다. :)
Python은 두 가지 스레드 인터페이스 세트를 제공합니다. 하나는 기본 Low를 제공하는 스레드 모듈입니다. 레벨 인터페이스는 Function을 스레드의 실행 본문으로 사용합니다. 또 다른 그룹은 사용하기 쉬운 객체 기반 인터페이스(Java와 유사)를 제공하는 스레딩 모듈입니다. 이 모듈은 Thread 객체를 상속하여 스레드를 구현할 수 있으며 Timer, Lock
스레드 모듈 사용 예 1234 5 스레드 가져오기def Worker(): """스레드 작업자 함수""" PRint 'Worker 'thread.start_new_thread(worker) 스레딩 모듈 사용 예 1 23456 스레딩 가져오기def 작업자(): """스레드 작업자 함수""" print 'Worker't = threading.Thread(target=worker) t.start() 또는 Java 스타일 12345678 910 스레딩 가져오기클래스 작업자(threading.Thread): def __init__(self): 통과 def run(): """스레드 작업자 함수"" 'Worker' 인쇄 t = Worker()t.start() 프로세스(Process) 글로벌 환경으로 인해 위에서 언급한 해석 잠금 문제와 관련하여 Python에서 더 나은 병렬 방법은 CPU 리소스를 매우 효과적으로 사용하고 진정한 동시성을 달성할 수 있는 여러 프로세스를 사용하는 것입니다. 물론 프로세스의 오버헤드는 스레드의 오버헤드보다 크므로, 엄청난 수의 동시 프로세스를 생성하려면 머신의 심장이 강한지 고려해야 합니다. Python의 다중 프로세스 모듈은 스레딩과 유사한 인터페이스를 가지고 있습니다. 12345678 from multiprocessing import Process def Worker(): """스레드 작업자 함수""" print 'Worker'p = Process(target=worker)p.start()p.join() 스레드는 동일한 주소 공간과 메모리를 공유하므로 스레드 간 통신은 매우 쉽습니다. . 의사소통은 좀 더 복잡합니다. 일반적인 프로세스 간 통신에는 파이프, 메시지 큐, 소켓 인터페이스(TCP/IP) 등이 포함됩니다. Python의 다중 프로세스 모듈은 프로세스 간에 메시지를 쉽게 전송할 수 있는 캡슐화된 파이프와 큐를 제공합니다. Python 프로세스 간의 동기화에는 스레드와 동일한 잠금을 사용합니다. 또한 Python은 스레드를 쉽게 관리하고 제어할 수 있는 프로세스 풀 Pool 개체도 제공합니다. 원격 분산호스트(분산노드)
빅 데이터 시대가 도래하면서 무어의 정리는 단일 시스템에서는 그 효과를 잃은 것 같습니다. 데이터 계산 및 처리를 위해서는 분산된 컴퓨터 네트워크가 필요하며, 프로그램은 여러 호스트 노드에서 병렬로 실행됩니다. 이미 현재 소프트웨어 아키텍처에서 고려해야 할 문제입니다.
원격 호스트 간 프로세스 간 통신에는 여러 가지 일반적인 방법이 있습니다.
TCP/IP
TCP/IP는 모든 원격 통신의 기본이지만 API는 상대적으로 상대적으로 사용이 번거로워 일반적으로 고려되지 않습니다
원격 함수 호출
RPC는 원격 프로세스 간 통신의 초기 수단입니다. Python에는 RPyC
Remote Object
Remote object라는 오픈 소스 구현이 있습니다. 이 프로그램은 더 높은 수준의 캡슐화입니다. 프로그램은 원격 개체의 로컬 프록시를 동일한 방식으로 작동할 수 있습니다. 로컬 객체. CORBA는 원격 객체에 가장 널리 사용되는 사양입니다. CORBA의 가장 큰 장점은 다양한 언어와 플랫폼으로 통신할 수 있다는 것입니다. 다양한 언어와 플랫폼에는 Java의 RMI, MS의 DCOM
Python의 오픈 소스 구현과 같은 자체 원격 개체 구현이 있으며 원격 개체에 대한 많은 지원이 있습니다
Dopy
Fnorb(CORBA)
ICE
omniORB(CORBA)
Pyro
YAMI
메시지 대기열
RPC 또는 원격 개체와 비교할 때 메시지는 Python 인터페이스를 지원하는 일반적인 메시지 메커니즘에는
RabbitMQ
ZeroMQ
Kafka
AWS SQS + BOTO원격 호스트에서 동시 실행을 실행하는 것과 로컬 멀티 프로세스를 실행하는 것에는 큰 차이가 없으며 둘 다 프로세스 간 통신 문제를 해결해야 합니다. 물론 원격 프로세스의 관리 및 조정은 로컬 프로세스보다 더 복잡합니다. Python에는 분산 동시성을 지원하고 효과적인 관리 방법을 제공하는 다음과 같은 많은 오픈 소스 프레임워크가 있습니다. CeleryCelery는 매우 성숙한 Python 분산 프레임워크로, 실행 가능 분산 시스템에서 작업을 비동기적으로 수행하고 효과적인 관리 및 스케줄링 기능을 제공합니다. 여기 참조SCOOPSCOOP(Scalable COncurrent Operations in Python)는 동시성을 위해 Future 인터페이스를 사용하여 간단하고 사용하기 쉬운 분산 호출 인터페이스를 제공합니다. DispyDispy는 Celery, SCOOP에 비해 더 가벼운 분산 병렬 서비스를 제공합니다. PPPP(Parallel Python) 또 다른 경량 Python입니다. 병렬 서비스는 여기를 참조하세요AsyncoroAsyncoro는 분산 동시성을 달성하기 위해 Generator를 사용하는 또 다른 Python 프레임워크입니다. 물론 다른 시스템을 나열하지 않은 것이 더 많습니다. 하나씩또한 많은 분산 시스템은 SparkPseudo-Thread와 같은 Python 인터페이스에 대한 지원을 제공합니다. 우리는 또 다른 동시성 방법이 있습니다. 스레드처럼 보이고 스레드 인터페이스와 유사한 인터페이스를 사용하는 의사 스레딩이라고 부를 수 있습니다. 그러나 스레드가 아닌 메서드를 사용하는 경우 해당 스레드 오버헤드가 없습니다. greenletgreenlet은 프로세스 내 동시성을 지원하는 경량 코루틴을 제공합니다. Greenlet은 Tasklet을 사용하여 mirco-thread라는 기술을 지원합니다. 다음은 greenlet을 사용하는 의사 스레드의 예입니다 12345678 91011121314
15 from greenlet import greenlet def test1(): print 12 gr2.switch( ) 인쇄 34 def test2(): 인쇄 56 gr1.switch() 78 인쇄 gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch() 위 프로그램을 실행하면 다음 결과를 얻을 수 있습니다. 12 3 125634 유사 스레드 gr1 스위치가 인쇄됩니다. 12, 그런 다음 gr2 스위치를 호출하여 56을 얻은 다음 gr1로 다시 전환하고 34를 인쇄한 다음 의사 스레드 gr1이 종료되고 프로그램이 종료되므로 78은 인쇄되지 않습니다. 이 예에서 의사 스레드를 사용하면 프로그램의 실행 흐름을 효과적으로 제어할 수 있지만 의사 스레드에는 실제 동시성이 없다는 것을 알 수 있습니다. Eventlet, gevent 및 concurence는 모두 Greenlet을 기반으로 하여 동시성을 제공합니다. eventlet http://eventlet.net/eventlet은 네트워크 호출 동시성을 제공하는 Python 라이브러리입니다. 사용자는 비차단 방식으로 차단 IO 작업을 호출할 수 있습니다. 123456789101112 eventlet 가져오기from eventlet.green import urllib2 urls = ['http://www.google.com', 'http://www. example.com', 'http://www.python.org'] def fetch(url): return urllib2.urlopen(url).read ()
pool = eventlet.GreenPool()
for body in pool.imap(fetch, urls):
print("got body", len(body ))
실행 결과는 다음과 같습니다
1
2
3
('몸이 있어요', 17629)
('몸이 있어요', 1270)
( 'got body', 46949)
eventlet은 생성기 작업을 지원하기 위해 urllib2를 수정했으며 인터페이스는 urllib2와 일치합니다. 여기서 GreenPool은 Python의 Pool 인터페이스와 일치합니다.
gevent
gevent는 eventlet과 유사합니다. 차이점은 이 글을 참조하세요
1
2
3
4
5
6
7
gevent 가져오기
gevent 가져오기 소켓
urls = ['www.google.com', 'www.example.com', 'www.python.org']
jobs = [gevent URL의 URL에 대한 .spawn(socket.gethostbyname, url)]
gevent.joinall(jobs, timeout=2)
인쇄 [job.value for job in jobs]
실행 결과는 다음과 같습니다.
1
[ '206.169.145.226', '93.184.216.34', '23.235.39.223']
동의 https:// github.com /concurrence/concurrence
concurence는 greenlet을 사용하여 네트워크 동시성을 제공하는 또 다른 오픈 소스 라이브러리입니다. 직접 사용해 볼 수 있습니다.
실용 적용
일반적으로 동시성이 필요한 상황은 두 가지입니다. 하나는 계산 집약적입니다. 즉, 프로그램에 많은 CPU 리소스가 필요하고, 다른 하나는 프로그램이 IO 집약적이라는 의미입니다. 파일 읽기 및 쓰기, 네트워크 요청 보내기 및 받기 등을 포함하여 많은 수의 읽기 및 쓰기 작업이 있습니다.
계산 집약적
계산 집약적 애플리케이션에 대응하여 유명한 Monte Carlo 알고리즘을 선택하여 PI 값을 계산합니다. 기본 원리는 다음과 같습니다
몬테카를로 알고리즘은 통계 원리를 사용하여 pi를 시뮬레이션하고 계산합니다. 정사각형에서 임의의 점이 1/4 원 영역( 빨간색 점)의 확률은 해당 영역에 비례합니다. 즉, 확률 p = Pi * R * R / 4: R * R, 여기서 R은 정사각형의 변의 길이와 원의 반지름입니다. 즉, 확률은 pi의 1/4입니다. 이 결론을 사용하여 점이 1/4원에 떨어질 확률을 시뮬레이션하면 이 확률을 얻기 위해 다음을 수행할 수 있습니다. 많은 실험을 하고, 많은 수의 포인트를 생성하고, 포인트가 어느 영역에 있는지 확인하고, 결과를 계산하는 것입니다.
기본 알고리즘은 다음과 같습니다.
1
2
3
4
5
from math import hypot
from 무작위 가져오기 무작위
def 테스트(시도):
return sum(hypot(random(), random())
여기에서 테스트 방법을 확인하세요. n( 시도) 번 수행하고 1/4 원 내에 속하는 포인트 수를 반환합니다. 판단 방법은 점에서 원의 중심까지의 거리를 확인하여 R보다 작으면 원 위에 있는 것입니다.
많은 양의 동시성을 통해 여러 실험을 빠르게 실행할 수 있으며, 실험이 많을수록 결과는 실제 pi에 더 가까워집니다.
다음은 다양한 동시성 방법에 대한 프로그램 코드입니다
비동시성
처음에는 단일 스레드에서 실행하지만 성능이 어떤지 확인하는 과정을 거칩니다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from math import hypot
from 무작위 가져오기 무작위
가져오기 이벤트렛
가져오기 시간
def test(tries):
return sum(hypot(random( ), 무작위())
def calcPi(nbFutures, try):
ts = time.time()
결과 = map(테스트, [시도] * nbFutures)
ret = 4. * sum(결과) / float(nbFutures * 시도)
범위 = time.time() - ts
인쇄 "시간 소비", 범위
반환 ret
인쇄 calcPi(3000,4000 )
멀티 스레드
스레드 풀을 사용하기 위해 멀티프로세싱의 더미 패키지를 사용합니다. 멀티스레딩. 여기 코드에서는 스레드에 대해 전혀 언급하지 않지만 확실히 다중 스레드라는 점에 유의하세요.
테스트를 통해 예상대로 스레드 풀이 1일 때 실행 결과가 동시성이 없을 때와 동일하다는 것을 발견했습니다. 스레드 풀 번호를 5로 설정하면 그렇지 않은 경우보다 거의 두 배의 시간이 걸립니다. 동시성이 있으며 테스트 데이터의 범위는 5초에서 9초입니다. 따라서 계산 집약적인 작업의 경우 멀티스레딩을 포기하는 것이 좋습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing.dummy import Pool
from math import hypot
fromrandomimportrandom
가져오기 시간
def test(tries):
return sum(hypot(random(), random())
def calcPi(nbFutures, try):
ts = time.time()
p = Pool(1)
결과 = p.map(테스트, [시도] * nbFutures)
ret = 4. * 합계(결과) / float(nbFutures * 시도)
범위 = 시간. time() - ts
인쇄 "시간 소비", 범위
return ret
if __name__ == '__main__':
p = Pool()
print("pi = {}".format(calcPi(3000, 4000)))
다중 프로세스 다중 프로세스
이론적으로 컴퓨팅 집약적인 작업의 경우 다중 프로세스 동시성을 사용하는 것이 더 적합합니다. 다음 예에서는 프로세스 풀의 크기가 5로 설정됩니다. , 프로세스 풀의 크기가 수정되어 결과에 미치는 영향을 확인할 수 있습니다. 프로세스 풀을 1로 설정하면 이때 동시성이 없기 때문에 멀티스레딩 결과에 필요한 시간이 비슷해집니다. ; 2로 설정하면 응답 시간이 크게 향상되었습니다. 즉, 이전에는 동시성이 절반도 없었지만 프로세스 풀을 계속 확장해도 성능에 거의 영향을 미치지 않거나 심지어 내 Apple Air의 CPU에 코어가 2개만 있을 수도 있습니다. ?
매우 큰 프로세스 풀을 설정하면 시스템이 너무 많은 프로세스 생성을 지원할 수 없다는 오류가 발생하므로 주의하세요. .
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Pool
from math import hypot
fromrandomimportrandom
가져오기 시간
def 테스트(시도):
return sum(hypot(random(), random())
def calcPi(nbFutures, trys):
ts = time.time()
p = Pool(5)
result = p.map (테스트, [시도] * nbFutures)
ret = 4. * sum(result) / float(nbFutures * trys)
span = time.time() - ts
"시간 소비" 인쇄, 범위
return ret
if __name__ == '__main__':
print("pi = {} ".format(calcPi(3000, 4000)))
gevent(의사 스레드)
여부 is gevent 실제 동시성이 없기 때문에 여전히 이벤트렛이며, 동시성이 없는 경우와 응답 시간이 크게 다르지 않습니다. 이는 테스트 결과와 일치합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
gevent 가져오기
from math import hypot
from random import random
가져오기 시간
def test(tries):
return sum (hypot(random(), random())
def calcPi(nbFutures, trys):
ts = time.time()
jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]
gevent.joinall(jobs, timeout=2)
ret = 4. * sum([jobs의 작업에 대한 job.value]) / float(nbFutures * trys)
span = time.time() - ts
인쇄 "시간 소비", 범위
return ret
인쇄 calcPi(3000, 4000)
eventlet (伪线程)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from math import hypot
from random import random
가져오기 이벤트
가져오기 시간
def 테스트(시도):
return sum(hypot(random(), random()) < ; 1 범위 내(시도)
def calcPi(nbFutures, trys):
ts = time.time()
풀 = eventlet.GreenPool()
결과 = pool.imap(test, [tries] * nbFutures)
ret = 4. * sum(결과) / float( nbFutures * 시도)
span = time.time() - ts
인쇄 "시간 소비", 범위
return ret
calcPi(3000,4000) 인쇄
SCOOP
SCOOP中的Future接口符합PEP-3148적也就是는 Python3中提供的Future接口.
현재 SCOOP配置环境下(单机,4个Worker),并发的性能有提高,但是不如两个进程池配置의 더 많은 정보 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from math import hypot
from random import random
from scoop import futures
가져오기 시간
def 테스트(시도):
return sum(hypot(random(), random())
def calcPi(nbFutures, try):
ts = time.time()
expr = futures.map(test, [tries] * nbFutures)
ret = 4. * sum(expr) / float(nbFutures * tries)
span = time.time() - ts
print "시간 소비", 범위
return ret
if __name__ == "__main__":
print("pi = {}".format(calcPi(3000, 4000)))
셀러리
任务代码
1
2
3
4
5
6
7
8
9
10
11
셀러리 수입 셀러리
from math import hypot
from random import random
app = Celery('tasks', backend='amqp', Broker ='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
@ app.task
def 테스트(시도):
return sum(hypot(random(), random())
客户端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
셀러리 수입 그룹
작업 가져오기 테스트
가져오기 시간
def calcPi(nbFutures, trys):
ts = time.time()
결과 = group(test.s(tries) for i in xrange(nbFutures))().get()
ret = 4. * sum(결과) / float(nbFutures * tries)
span = time.time() - ts
인쇄 "시간 소비", 범위
return ret
calcPi(3000, 4000) 인쇄
Celery를 사용한 동시성 테스트 결과는 예상치 못한 것이었습니다(환경은 단일 머신, 4frefork 동시성, 메시지 브로커는 RabbitMQ였습니다). 응답 시간은 전체 테스트 사례 중 5~6배였습니다. 동시성 없이. 이는 제어 조정의 오버헤드가 너무 높기 때문일 수 있습니다. 이러한 컴퓨팅 작업의 경우 Celery는 좋은 선택이 아닐 수 있습니다.
asyncoro
Asyncoro의 테스트 결과는 비동시성과 일치합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncoro
from math import hypot
from 무작위 가져오기 무작위
가져오기 시간
def 테스트(시도):
Yield sum(hypot(random(), random())
def calcPi(nbFutures, trys):
ts = time.time()
coros = [ asyncoro.Coro(test,t) for t in [tries] * nbFutures]
ret = 4. * sum([coros의 작업에 대한 job.value()]) / float(nbFutures * try)
span = time.time() - ts
"시간 소비", 범위 인쇄
return ret
print calcPi(3000,4000)
IO 집약적
IO 집약적인 작업은 네트워크 웹 서버와 같은 또 다른 일반적인 사용 사례입니다. 초당 처리할 수 있는 요청 수는 웹 서버의 중요한 지표입니다.
가장 간단한 예로 웹 페이지 읽기를 들어보겠습니다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from math import hypot
가져오기 시간
import urllib2
urls = ['http://www.google.com' , ' http://www.example.com', 'http://www.python.org']
def test(url):
return urllib2. urlopen(url).read()
def testIO(nbFutures):
ts = time.time()
map(test , urls * nbFutures)
span = time.time() - ts
인쇄 "시간 소비", 범위
testIO (10)
다른 동시성 라이브러리에 있는 코드는 비교적 유사하므로 하나씩 나열하지는 않겠습니다. 참조용으로 계산 집약적인 코드를 참조할 수 있습니다.
테스트를 통해 IO 집약적인 작업의 경우 다중 스레드 또는 다중 프로세스를 사용하면 의사 스레드를 사용하여 성능을 효과적으로 향상시킬 수 있다는 것을 알 수 있습니다. 동시성이 없는 경우보다 응답 시간이 9초에서 0.03초로 향상되었습니다. 동시에, eventlet/gevent는 매우 편리한 비차단 비동기 호출 모드를 제공합니다. 여기서는 스레드나 의사 스레드를 사용하는 것이 좋습니다. 스레드와 의사 스레드는 응답 시간이 비슷할 때 더 적은 리소스를 소비하기 때문입니다.
요약
Python은 다양한 시나리오에 따라 다양한 동시성 방법을 제공합니다. 적절한 방법을 선택하려면 방법의 원리를 이해해야 할 뿐만 아니라 몇 가지 테스트와 실험도 수행해야 합니다. 데이터는 선택을 위한 가장 좋은 참고 자료입니다.
위 내용은 Python을 이용한 동시 프로그래밍 내용입니다. 더 많은 관련 글은 PHP 중국어 홈페이지(www.php.cn)를 참고해주세요!