
Concurrent programming with Python

黄舟
2016-12-16 11:52:59

Running computer programs concurrently is a frequently discussed topic. Today I want to go through the various ways of doing concurrency in Python.

Concurrency methods

Thread

Multi-threading is the first tool almost every programmer reaches for in almost any language (JS programmers, look away). Multi-threading can make effective use of CPU resources (Python excepted). However, the program complexity that comes with multi-threading is unavoidable, especially the synchronization of contended resources.

However, because Python uses a Global Interpreter Lock (GIL), Python code cannot run on multiple cores at the same time. In other words, Python's multi-threading is not parallel. Many people find that after adding multi-threading to improve their Python code, the program actually runs less efficiently. What a pain! If you want to know more details, I recommend reading this article. In fact, the multi-threaded programming model is very hard to use, and programmers make mistakes with it easily. This is not the programmer's fault: parallel thinking goes against human nature, most of us think serially (schizophrenia aside), and the computer architecture designed by von Neumann is also based on sequential execution. So if you can never quite get your multi-threaded program right, congratulations, you are a programmer with a normal mind :)

Python provides two sets of thread interfaces. One is the thread module, which provides a basic, low-level interface and uses a function as the body of the thread. The other is the threading module, which provides an easier-to-use object-based interface (similar to Java): you can subclass the Thread object to implement a thread, and it also provides other thread-related objects such as Timer and Lock.
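The synchronization problem mentioned above is what Lock is for. As a minimal sketch, written in Python 3 syntax (unlike the Python 2 listings in this article), several threads increment a shared counter and a threading.Lock keeps each update from interleaving with another. The names increment, counter, N_THREADS and N_PER_THREAD are made up for this illustration.

```python
import threading

N_THREADS = 4
N_PER_THREAD = 10000

counter = 0
counter_lock = threading.Lock()

def increment(times):
    """Add 1 to the shared counter `times` times, holding the lock for each update."""
    global counter
    for _ in range(times):
        with counter_lock:  # only one thread at a time may run this block
            counter += 1

threads = [threading.Thread(target=increment, args=(N_PER_THREAD,))
           for _ in range(N_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every worker before reading the total

print(counter)
```

Without the lock, the read-modify-write behind `counter += 1` may interleave between threads and lose updates, which is the kind of bug that makes multi-threaded code hard to get right.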

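Example using the thread module

import thread  # Python 2 only; renamed _thread in Python 3
import time

def worker():
    """thread worker function"""
    print 'Worker'

# start_new_thread takes the function and a tuple of its arguments
thread.start_new_thread(worker, ())
time.sleep(0.1)  # give the worker a moment to run before the main thread exits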

Example using the threading module

import threading

def worker():
    """thread worker function"""
    print 'Worker'

t = threading.Thread(target=worker)
t.start()

Or, Java style

import threading

class worker(threading.Thread):
    def __init__(self):
        # the base class initializer must run before start() is called
        threading.Thread.__init__(self)

    def run(self):
        """thread worker function"""
        print 'Worker'

t = worker()
t.start()

Process

Because of the global interpreter lock problem mentioned above, a better way to run in parallel in Python is to use multiple processes, which can use CPU resources very effectively and achieve true parallelism. Of course, a process costs more than a thread, so if you want to create a staggering number of concurrent processes, you need to consider whether your machine has a strong enough heart.

Python's multiprocessing module has an interface similar to threading.

from multiprocessing import Process

def worker():
    """process worker function"""
    print 'Worker'

# the guard keeps child processes from re-running this block on import
if __name__ == '__main__':
    p = Process(target=worker)
    p.start()
    p.join()
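The worker above only prints. To hand a result back to the parent, the result has to be sent explicitly, because processes do not share memory. Here is a minimal sketch in Python 3 syntax, using multiprocessing.Queue as the channel; the square function and the variable names are made up for this illustration.

```python
from multiprocessing import Process, Queue

def square(n):
    """The actual work; a stand-in for something expensive."""
    return n * n

def worker(result_queue, n):
    """Run in the child process and send the result back through the queue."""
    result_queue.put((n, square(n)))

if __name__ == "__main__":
    result_queue = Queue()
    procs = [Process(target=worker, args=(result_queue, n)) for n in range(4)]
    for p in procs:
        p.start()
    # Read one result per process before joining, so no child is left
    # blocked while flushing its data to the queue.
    results = dict(result_queue.get() for _ in procs)
    for p in procs:
        p.join()
    print(sorted(results.items()))
```

Results arrive in whatever order the children finish, which is why each message carries its input n alongside the answer.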

Since threads share the same address space and memory, communication between threads is very easy, but communication between processes is more complicated. Common inter-process communication mechanisms include pipes, message queues, and sockets (TCP/IP).

Python's multiprocessing module provides wrapped pipes and queues, which make it easy to pass messages between processes.

Synchronization between Python processes uses locks, the same as for threads.

In addition, Python provides a process pool object, Pool, which makes it easy to manage and control processes.

Remote distributed host (Distributed Node)

With the advent of the big data era, Moore's Law seems to have lost its effect on a single machine. Computing on and processing data at this scale requires a distributed network of computers, with programs running in parallel on multiple host nodes. This is now something software architecture has to take into account.

There are several common methods of inter-process communication between remote hosts:

TCP/IP

TCP/IP is the basis of all remote communication. However, its API is relatively low-level and cumbersome to use, so it is generally not the first choice.

Remote Function Call

RPC (Remote Procedure Call) is an early means of remote inter-process communication. Python has an open source implementation, RPyC.
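To make the idea of a remote call concrete, here is a small sketch that uses the standard library xmlrpc modules from Python 3 instead of RPyC (a different library from the one named above, chosen only because it needs no installation). The server and the client run in the same process on 127.0.0.1 purely for demonstration, and the add function is a made-up example.

```python
import threading
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer

def add(a, b):
    """The function that will be exposed to remote callers."""
    return a + b

# Port 0 asks the operating system for any free port.
server = SimpleXMLRPCServer(("127.0.0.1", 0), logRequests=False)
server.register_function(add, "add")
port = server.server_address[1]
threading.Thread(target=server.serve_forever, daemon=True).start()

# The client calls add() as if it were a local method on the proxy.
proxy = ServerProxy("http://127.0.0.1:{}".format(port))
result = proxy.add(2, 3)
print(result)

server.shutdown()
server.server_close()
```

In a real deployment the server would live on another host; the calling code on the client side would look the same.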

Remote Object

A remote object is a higher-level abstraction: the program operates on a local proxy of the remote object in the same way as it would on a local object. CORBA is the most widely used specification for remote objects. Its biggest benefit is that it can communicate across different languages and platforms. Individual languages and platforms also have their own remote object implementations, such as Java's RMI and Microsoft's DCOM.

Python has many open source implementations that support remote objects:

Dopy

Fnorb (CORBA)

ICE

omniORB (CORBA)

Pyro

YAMI

Message Queue

Compared with RPC or remote objects, messages are a more flexible means of communication. Common messaging systems with Python interfaces are:

RabbitMQ

ZeroMQ

Kafka

AWS SQS + BOTO
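The flexibility of messaging comes from decoupling: the sender only puts messages on a queue and does not need to know who consumes them. As a rough, in-process analogy only (not the API of any of the brokers listed above), the standard library queue.Queue can stand in for the broker. This sketch uses Python 3 syntax, and the producer and consumer names are made up.

```python
import queue
import threading

def producer(q, items):
    """Publish each item, then a None sentinel to signal the end of the stream."""
    for item in items:
        q.put(item)
    q.put(None)

def consumer(q, out):
    """Receive messages until the sentinel arrives, processing each one."""
    while True:
        msg = q.get()
        if msg is None:
            break
        out.append(msg.upper())

inbox = queue.Queue()
received = []
t_cons = threading.Thread(target=consumer, args=(inbox, received))
t_prod = threading.Thread(target=producer, args=(inbox, ["a", "b", "c"]))
t_cons.start()
t_prod.start()
t_prod.join()
t_cons.join()

print(received)
```

A real broker adds what this sketch lacks: delivery across hosts, persistence, and routing to multiple consumers.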

Running concurrently on remote hosts is not very different from local multi-processing: both need to solve inter-process communication. Of course, managing and coordinating remote processes is more complicated than doing so locally.

There are many open source frameworks under Python to support distributed concurrency and provide effective management methods, including:

Celery

Celery is a very mature Python distributed framework. It can execute tasks asynchronously in a distributed system and provides effective management and scheduling functions. Refer here

SCOOP

SCOOP (Scalable COncurrent Operations in Python) provides a simple and easy-to-use distributed calling interface, using the Future interface for concurrency.

Dispy

Compared with Celery and SCOOP, Dispy provides a more lightweight distributed parallel service

PP

PP (Parallel Python) is another lightweight Python parallel service, refer to here

Asyncoro

Asyncoro is another Python framework that uses Generator to achieve distributed concurrency.

Of course there are many other systems; I have not listed them all.

In addition, many distributed systems provide Python interfaces, for example Spark.

Pseudo-Thread

There is another, less common concurrency method that we can call a pseudo-thread: it looks like a thread and uses an interface similar to the thread interface, but it is actually implemented without threads, so the corresponding thread overhead does not exist either.

greenlet

greenlet provides lightweight coroutines to support in-process concurrency.

greenlet is a by-product of Stackless, which uses tasklets to support a technique called micro-threads. Here is an example of pseudo-threading using greenlet

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34

def test2():
    print 56
    gr1.switch()
    print 78

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

Run the above program to get the following results:

12
56
34

Switching to pseudo-thread gr1 prints 12, then the call to gr2.switch() prints 56, then control switches back to gr1, which prints 34. Pseudo-thread gr1 then finishes and the program exits, so 78 is never printed. This example shows that pseudo-threads let us control the program's execution flow precisely, but they provide no real parallelism.
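The same hand-off of control can be sketched with plain Python generators, which need no third-party package. This is a different mechanism from greenlet (a generator can only yield back to its caller, not switch to an arbitrary peer), so a tiny round-robin scheduler does the switching here. The code uses Python 3 syntax, and task and run_round_robin are made-up names.

```python
from collections import deque

def task(name, steps, log):
    """A cooperative task: do one step, then yield control to the scheduler."""
    for i in range(steps):
        log.append((name, i))
        yield

def run_round_robin(tasks):
    """Resume each task in turn until all of them have finished."""
    pending = deque(tasks)
    while pending:
        current = pending.popleft()
        try:
            next(current)  # run the task up to its next yield
        except StopIteration:
            continue  # task finished; do not reschedule it
        pending.append(current)

log = []
run_round_robin([task("a", 2, log), task("b", 2, log)])
print(log)  # [('a', 0), ('b', 0), ('a', 1), ('b', 1)]
```

As with greenlet, only one task runs at any moment; the interleaving is entirely decided by where each task gives up control.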

eventlet, gevent and concurrence all provide concurrency based on greenlet.

eventlet http://eventlet.net/

eventlet is a Python library that provides network call concurrency. Users can call blocking IO operations in a non-blocking manner.

import eventlet
from eventlet.green import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def fetch(url):
    return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()
for body in pool.imap(fetch, urls):
    print("got body", len(body))

The execution results are as follows

('got body', 17629)
('got body', 1270)
('got body', 46949)

To support generator-style operation, eventlet ships a modified urllib2 whose interface is consistent with the standard urllib2. The GreenPool here has an interface consistent with Python's Pool.
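For comparison, the standard library asyncio in Python 3 follows a similar cooperative model: many waits overlap inside a single thread. This is a different technique from eventlet's green urllib2, and the sketch below does not touch the network. asyncio.sleep stands in for the time spent waiting on a response, and the fetch function and its delays are made up.

```python
import asyncio

async def fetch(name, delay):
    """Pretend to fetch a page; asyncio.sleep stands in for waiting on the network."""
    await asyncio.sleep(delay)
    return "{} done".format(name)

async def main():
    # The three waits overlap, so the total time is about the longest delay,
    # not the sum. gather returns results in the order the calls were listed.
    return await asyncio.gather(
        fetch("a", 0.03),
        fetch("b", 0.01),
        fetch("c", 0.02),
    )

results = asyncio.run(main())
print(results)
```

The results come back in argument order even though "b" finishes first, which mirrors how pool.imap above yields bodies in the order of urls.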

gevent

gevent is similar to eventlet. You can refer to this article about their differences.

import gevent
from gevent import socket

urls = ['www.google.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)
print [job.value for job in jobs]

The execution result is as follows:

['206.169.145.226', '93.184.216.34', '23.235.39.223']

concurrence https://github.com/concurrence/concurrence

concurrence is another open source library that provides network concurrency based on greenlet. I have never used it, so you can try it yourself.

Practical Application

There are usually two situations where concurrency is needed. One is compute-intensive work, where your program needs a lot of CPU resources. The other is IO-intensive work, where the program performs a large number of read and write operations, including reading and writing files, sending and receiving network requests, and so on.

Compute-intensive

For compute-intensive applications, we choose the famous Monte Carlo algorithm to calculate the value of pi. The basic principle is as follows.

The Monte Carlo algorithm uses statistics to estimate pi by simulation. In a square, the probability that a random point falls inside the quarter circle (red point) is proportional to its area. That is, p = (Pi * R * R / 4) / (R * R) = Pi / 4, where R is both the side length of the square and the radius of the circle. In other words, the probability is one quarter of pi. Using this result, as long as we can estimate the probability of a point falling inside the quarter circle, we know pi. To get this probability we run a large number of trials: generate many points, check which region each one falls in, and tally the results.

The basic algorithm is as follows:

from math import hypot
from random import random

def test(tries):
    # count the random points whose distance from the origin is less than 1
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

Here the test method runs n (tries) trials and returns the number of points that fall in the quarter circle. The check is the distance from the point to the center of the circle: if it is less than R, the point is inside the circle.

With a large amount of concurrency, we can quickly run multiple tests. The more tests we run, the closer the results will be to the true pi.
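To see the convergence described above, the counting step can be wrapped in a function that turns hits into an estimate of pi. This is a sketch in Python 3 syntax. The names count_hits and estimate_pi are made up, and a fixed seed is used (an addition for this illustration) so that repeated runs give the same numbers.

```python
import math
import random

def count_hits(tries, rng):
    """Count random points in the unit square that land inside the quarter circle."""
    return sum(math.hypot(rng.random(), rng.random()) < 1 for _ in range(tries))

def estimate_pi(tries, seed=0):
    """Estimate pi as 4 * (hits / tries), since hits / tries approximates pi / 4."""
    rng = random.Random(seed)  # private generator with a fixed seed
    return 4.0 * count_hits(tries, rng) / tries

for tries in (100, 10000, 200000):
    estimate = estimate_pi(tries)
    print(tries, estimate, abs(estimate - math.pi))
```

The error shrinks only with the square root of the number of trials, which is why this calculation makes a convenient CPU-bound benchmark.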

Here are the program codes for different concurrency methods

No concurrency

We first run in a single thread and a single process to see how it performs

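from math import hypot
from random import random
import time

def test(tries):
    # count the random points whose distance from the origin is less than 1
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    # run the batches one after another in the current thread
    result = map(test, [tries] * nbFutures)

    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)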

Multi-threading

To use a thread pool, we use the dummy package of multiprocessing, which wraps multi-threading behind the same interface. Note that although the code here never mentions threads, it really is multi-threaded.

Testing confirmed what we expected: with a thread pool of size 1, the running time is the same as without concurrency. When we set the thread pool size to 5, the run takes almost twice as long as without concurrency; in my test it went from 5 seconds to 9 seconds. So for compute-intensive tasks, it is better to give up on multi-threading.

from multiprocessing.dummy import Pool  # thread pool with the Pool interface
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(1)  # thread pool size; change to 5 to compare
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    print("pi = {}".format(calcPi(3000, 4000)))

Multi-process

In theory, multi-process concurrency is the better fit for compute-intensive tasks. In the following example the process pool size is set to 5; change the pool size to see its effect on the result. With a pool of 1, the time taken is about the same as the multi-threaded result, because there is no concurrency at that point. With a pool of 2, the response time improves markedly, to about half of the time without concurrency. Growing the pool further has little effect on performance and may even make it worse. Maybe my MacBook Air's CPU only has two cores?

Be careful: if you set up a very large process pool, you will hit a "Resource temporarily unavailable" error. The system cannot support creating that many processes; resources are limited, after all.

from multiprocessing import Pool
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(5)  # process pool size
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    print("pi = {}".format(calcPi(3000, 4000)))
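Since a pool larger than the number of cores brought no further gain in this test, one cautious option is to cap the pool size at the core count. The helper below is a hypothetical sketch in Python 3 syntax; choose_pool_size is a made-up name, not part of multiprocessing.

```python
import os

def choose_pool_size(requested, cpu_count=None):
    """Cap the requested number of worker processes at the number of CPU cores."""
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
    return max(1, min(requested, cpu_count))

print(choose_pool_size(5, cpu_count=2))  # 2: a two-core machine gets two workers
print(choose_pool_size(5))               # depends on the machine running this
```

The returned value can then be passed as the argument to Pool. Calling Pool() with no argument also sizes the pool from the detected CPU count.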

gevent (pseudo-thread)

Whether it is gevent or eventlet, because there is no actual concurrency, the response time is not much different from that without concurrency. This is consistent with the test results.

import gevent
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    # one greenlet per batch; they run one after another inside a single thread
    jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]
    gevent.joinall(jobs, timeout=2)
    ret = 4. * sum([job.value for job in jobs]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)

   

 

eventlet (pseudo-thread)

from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    pool = eventlet.GreenPool()
    result = pool.imap(test, [tries] * nbFutures)

    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)

   

 

SCOOP

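The Future interface in SCOOP conforms to the definition in PEP-3148, that is, the Future interface provided in Python 3.

Under the default SCOOP configuration (single machine, 4 workers), concurrent performance improves, but not as much as multi-processing with a pool of two processes.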

from math import hypot
from random import random
from scoop import futures

import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    # futures.map distributes the batches across the SCOOP workers
    expr = futures.map(test, [tries] * nbFutures)
    ret = 4. * sum(expr) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == "__main__":
    print("pi = {}".format(calcPi(3000, 4000)))
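Because SCOOP follows PEP-3148, the same calculation can be sketched against the standard library concurrent.futures in Python 3. This is not SCOOP code. The function takes any PEP-3148 executor, and the demonstration uses a ThreadPoolExecutor only so that it runs anywhere; count_hits and calc_pi are made-up names.

```python
from concurrent.futures import ThreadPoolExecutor
from math import hypot
from random import random

def count_hits(tries):
    """Count random points in the unit square that land inside the quarter circle."""
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calc_pi(executor, n_jobs, tries):
    """Submit n_jobs batches to the executor and combine the Future results."""
    futures = [executor.submit(count_hits, tries) for _ in range(n_jobs)]
    hits = sum(f.result() for f in futures)  # result() blocks until the job is done
    return 4.0 * hits / (n_jobs * tries)

with ThreadPoolExecutor(max_workers=4) as executor:
    print("pi ~ {:.3f}".format(calc_pi(executor, 30, 4000)))
```

To spread the work over several cores, a ProcessPoolExecutor can be passed in instead, created under an `if __name__ == "__main__":` guard; calc_pi itself does not change.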

   

 

Celery

Task code

from celery import Celery

from math import hypot
from random import random

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

@app.task
def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

   

 

Client code

from celery import group
from tasks import test

import time

def calcPi(nbFutures, tries):
    ts = time.time()
    # send all the tasks as one group and wait for every result
    result = group(test.s(tries) for i in xrange(nbFutures))().get()

    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000, 4000)

   

The results of the concurrency test with Celery were unexpected (the environment is a single machine, prefork with a concurrency of 4, and RabbitMQ as the message broker): it is the worst of all the test cases, with a response time 5 to 6 times that without concurrency. This may be because the overhead of control and coordination is too high. For compute tasks like this, Celery may not be a good choice.

asyncoro

The test results of Asyncoro are consistent with non-concurrency.

import asyncoro
from math import hypot
from random import random
import time

def test(tries):
    yield sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    coros = [asyncoro.Coro(test, t) for t in [tries] * nbFutures]
    ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)

IO-intensive

IO-intensive tasks are another common use case. A web server is one example: how many requests it can handle per second is an important metric for a web server.

Let’s take web page reading as the simplest example

import time
import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def test(url):
    return urllib2.urlopen(url).read()

def testIO(nbFutures):
    ts = time.time()
    # fetch every URL nbFutures times, one request after another
    map(test, urls * nbFutures)
    span = time.time() - ts
    print "time spend ", span

testIO(10)

The codes under different concurrency libraries are relatively similar, so I won’t list them one by one. You can refer to the computationally intensive code for reference.

Testing shows that for IO-intensive tasks, using multiple threads or multiple processes effectively improves the program's efficiency. The improvement from pseudo-threads is very significant: with eventlet, the response time dropped from 9 seconds without concurrency to 0.03 seconds. eventlet and gevent also provide a non-blocking asynchronous calling mode, which is very convenient. Threads or pseudo-threads are the recommended choice here, because they consume fewer resources for a similar response time.
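As one illustration of the thread-based variant, here is a sketch in Python 3 syntax using concurrent.futures.ThreadPoolExecutor. To stay runnable without network access it does not fetch real pages: fake_fetch is a made-up stand-in that sleeps to imitate latency, and the URLs are placeholders.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_fetch(url):
    """Stand-in for urlopen(url).read(): sleep to imitate network latency."""
    time.sleep(0.05)
    return "body of " + url

# Placeholder URLs for the illustration; nothing is actually requested.
urls = ["http://www.example.com/{}".format(i) for i in range(10)]

start = time.time()
with ThreadPoolExecutor(max_workers=10) as pool:
    # map returns results in the same order as urls
    bodies = list(pool.map(fake_fetch, urls))
span = time.time() - start

print("fetched {} pages in {:.2f}s".format(len(bodies), span))
```

Run one after another, the ten simulated requests would take about 0.5 seconds. With ten threads the waits overlap, because a sleeping (or blocked) thread releases the GIL.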

Summary

Python provides several different concurrency methods, and different scenarios call for different ones. To choose the appropriate method, you must not only understand how each method works, but also run some tests and experiments. Data is the best basis for making your choice.


