>백엔드 개발 >파이썬 튜토리얼 >Python 멀티스레딩이란 무엇이며 어떻게 사용하나요?

Python 멀티스레딩이란 무엇이며 어떻게 사용하나요?

WBOY
WBOY앞으로
2023-05-13 11:55:14956검색

스레드란 무엇인가요? 왜 그걸 원해요?

기본적으로 Python은 선형 언어이지만 더 많은 처리 능력이 필요할 때 스레딩 모듈이 유용합니다. Python의 스레드는 병렬 CPU 컴퓨팅에 사용할 수 없지만 프로세서가 유휴 상태에서 데이터를 기다리고 있기 때문에 웹 스크래핑과 같은 I/O 작업에 매우 적합합니다.

많은 네트워크/데이터 I/O 관련 스크립트가 원격 소스의 데이터를 기다리는 데 대부분의 시간을 소비하므로 스레드가 게임의 판도를 바꾸고 있습니다. 다운로드가 연결 해제될 수 있으므로(예: 별도의 웹사이트 크롤링) 프로세서는 여러 데이터 소스에서 병렬로 다운로드하고 마지막에 결과를 병합할 수 있습니다. CPU 집약적인 프로세스의 경우 스레드 모듈을 사용해도 이점이 거의 없습니다.

Python 멀티스레딩이란 무엇이며 어떻게 사용하나요?

다행히 스레딩은 표준 라이브러리에 포함되어 있습니다.

import threading
from queue import Queue
import time

target을 호출 가능한 개체로 사용하고 args를 사용하여 함수에 인수를 전달할 수 있습니다. start는 스레드를 시작합니다. target作为可调用对象,使用args将参数传递给函数,并start启动线程。

def testThread(num):
    print num

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=testThread, arg=(i,))
        t.start()

如果你以前从未见过if __name__ == '__main__':,这基本上是一种确保嵌套在其中的代码仅在脚本直接运行(而不是导入)时运行的方法。

同一操作系统进程的线程将计算工作负载分布到多个内核中,如C++和Java等编程语言所示。通常,python只使用一个进程,从该进程生成一个主线程来执行运行时。由于一种称为全局解释器锁(global interpreter lock)的锁定机制,它保持在单个核上,而不管计算机有多少核,也不管产生了多少新线程,这种机制是为了防止所谓的竞争条件。

Python 멀티스레딩이란 무엇이며 어떻게 사용하나요?

提到竞争,我想到了想到 NASCAR 和一级方程式赛车。让我们用这个类比,想象所有一级方程式赛车手都试图同时在一辆赛车上比赛。听起来很荒谬,对吧?,这只有在每个司机都可以使用自己的车的情况下才有可能,或者最好还是一次跑一圈,每次把车交给下一个司机。

这与线程中发生的情况非常相似。线程是从“主”线程“派生”的,每个后续线程都是前一个线程的副本。这些线程都存在于同一进程“上下文”(事件或竞争)中,因此分配给该进程的所有资源(如内存)都是共享的。例如,在典型的python解释器会话中:

>>> a = 8

在这里,a 通过让内存中的某个任意位置暂时保持值 8 来消耗很少的内存 (RAM)。

到目前为止一切顺利,让我们启动一些线程并观察它们的行为,当添加两个数字xy

import time
import threading
from threading import Thread

a = 8

def threaded_add(x, y):
    # simulation of a more complex task by asking
    # python to sleep, since adding happens so quick!
    for i in range(2):
        global a
        print("computing task in a different thread!")
        time.sleep(1)
        #this is not okay! but python will force sync, more on that later!
        a = 10
        print(a)

# the current thread will be a subset fork!
if __name__ != "__main__":
    current_thread = threading.current_thread()


# here we tell python from the main 
# thread of execution make others
if __name__ == "__main__":

    thread = Thread(target = threaded_add, args = (1, 2))
    thread.start()
    thread.join()
    print(a)
    print("main thread finished...exiting")
>>> computing task in a different thread!
>>> 10
>>> computing task in a different thread!
>>> 10
>>> 10
>>> main thread finished...exiting

两个线程当前正在运行。让我们把它们称为thread_onethread_two。如果thread_one想要用值10修改a,而thread_two同时尝试更新同一变量,我们就有问题了!将出现称为数据竞争的情况,并且a的结果值将不一致。

一场你没有看的赛车比赛,但从你的两个朋友那里听到了两个相互矛盾的结果!thread_one告诉你一件事,thread two

a = 8
# spawns two different threads 1 and 2
# thread_one updates the value of a to 10

if (a == 10):
  # a check

#thread_two updates the value of a to 15
a = 15
b = a * 2

# if thread_one finished first the result will be 20
# if thread_two finished first the result will be 30
# who is right?

if __name__ == '__main__': 이전에 본 적이 없다면 이는 기본적으로 Method 내에 중첩되어 있는지 확인하는 코드입니다. 스크립트를 직접 실행할 때만 실행됩니다(가져오지 않음).

잠금

동일한 운영 체제 프로세스의 스레드는 프로그래밍 언어 등 여러 코어에 컴퓨팅 작업 부하를 분산시킵니다. C++, Java 등이 표시됩니다. 일반적으로 Python은 런타임을 실행하기 위해 메인 스레드가 생성되는 하나의 프로세스만 사용합니다. 전역 인터프리터 잠금이라는 잠금 메커니즘으로 인해 컴퓨터의 코어 수나 생성된 새 스레드 수에 관계없이 단일 코어에 유지됩니다. 이는 소위 경쟁 조건을 방지하기 위한 것입니다.

Python 멀티스레딩이란 무엇이며 사용 방법

경쟁이라고 하면 NASCAR와 Formula One이 떠오릅니다. 이 비유를 사용하여 동시에 한 대의 자동차를 타고 경주하려고 하는 모든 Formula 1 드라이버를 상상해 봅시다. 말도 안되는 소리 같죠? 이는 각 운전자가 자신의 차에 접근할 수 있는 경우에만 가능합니다. 또는 더 나은 방법은 한 번에 한 바퀴씩 달리고 매번 다음 운전자에게 차를 넘겨주는 것입니다.

이는 스레드에서 발생하는 것과 매우 유사합니다. 스레드는 "기본" 스레드에서 "분기"되고, 각 후속 스레드는 이전 스레드의 복사본입니다. 이러한 스레드는 모두 동일한 프로세스 "컨텍스트"(이벤트 또는 경합)에 존재하므로 프로세스에 할당된 모든 리소스(예: 메모리)가 공유됩니다. 예를 들어 일반적인 Python 인터프리터 세션에서는

import sys
import gc

hello = "world" #reference to 'world' is 2
print (sys.getrefcount(hello))

bye = "world" 
other_bye = bye 
print(sys.getrefcount(bye)) 
print(gc.get_referrers(other_bye))

여기에서 a는 메모리의 임의 위치에 값 8을 임시로 유지하여 이를 수행합니다. 메모리(RAM)가 거의 없습니다. 🎜

지금까지는 훌륭했습니다. 일부 스레드를 시작하고 두 숫자 xy code>를 추가할 때의 동작을 관찰해 보겠습니다. 🎜<pre class="brush:php;toolbar:false">&gt;&gt;&gt; 4 &gt;&gt;&gt; 6 &gt;&gt;&gt; [['sys', 'gc', 'hello', 'world', 'print', 'sys', 'getrefcount', 'hello', 'bye', 'world', 'other_bye', 'bye', 'print', 'sys', 'getrefcount', 'bye', 'print', 'gc', 'get_referrers', 'other_bye'], (0, None, 'world'), {'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': &lt;_frozen_importlib_external.sourcefileloader&gt;, '__spec__': None, '__annotations__': {}, '__builtins__': &lt;module&gt;, '__file__': 'test.py', '__cached__': None, 'sys': &lt;module&gt;, 'gc': &lt;module&gt;, 'hello': 'world', 'bye': 'world', 'other_bye': 'world'}]&lt;/module&gt;&lt;/module&gt;&lt;/module&gt;&lt;/_frozen_importlib_external.sourcefileloader&gt;</pre>rrree현재 두 개의 스레드가 실행 중입니다. <code>thread_onethread_two라고 부르겠습니다. thread_onea를 값 10으로 수정하려고 하고 thread_two가 동시에 동일한 변수를 업데이트하려고 하면 문제가 있는 것입니다! 데이터 경합이라는 조건이 발생하고 결과적으로 a 값이 일치하지 않게 됩니다. 🎜

보지도 않았지만 친구 두 명에게서 상반된 결과를 들었던 레이싱 이벤트! thread_one은 당신에게 무언가를 말하고, thread two는 그것을 반박합니다! 다음은 설명하기 위한 의사 코드 조각입니다. 🎜

import time, os
from threading import Thread, current_thread
from multiprocessing import current_process

COUNT = 200000000
SLEEP = 10

def io_bound(sec):
   pid = os.getpid()
   threadName = current_thread().name
   processName = current_process().name
   print(f"{pid} * {processName} * {threadName} \
           ---> Start sleeping...")
   time.sleep(sec)
   print(f"{pid} * {processName} * {threadName} \
           ---> Finished sleeping...")

def cpu_bound(n):
   pid = os.getpid()
   threadName = current_thread().name
   processName = current_process().name
   print(f"{pid} * {processName} * {threadName} \
           ---> Start counting...")
   while n>0:
          n -= 1
   print(f"{pid} * {processName} * {threadName} \
       ---> Finished counting...")

 def timeit(function,args,threaded=False):
      start = time.time()
      if threaded:
         t1 = Thread(target = function, args =(args, ))
         t2 = Thread(target = function, args =(args, ))
         t1.start()
         t2.start()
         t1.join()
         t2.join()
      else:
        function(args)
      end = time.time()
      print('Time taken in seconds for running {} on Argument {} is {}s -{}'.format(function,args,end - start,"Threaded" if threaded else "None Threaded"))

if __name__=="__main__":
      #Running io_bound task
      print("IO BOUND TASK NON THREADED")
      timeit(io_bound,SLEEP)

      print("IO BOUND TASK THREADED")
      #Running io_bound task in Thread
      timeit(io_bound,SLEEP,threaded=True)

      print("CPU BOUND TASK NON THREADED")
      #Running cpu_bound task
      timeit(cpu_bound,COUNT)

      print("CPU BOUND TASK THREADED")
      #Running cpu_bound task in Thread
      timeit(cpu_bound,COUNT,threaded=True)
🎜대체 무슨 일이 일어나고 있는 걸까요? 🎜🎜Python은 해석된 언어입니다. 즉, 다른 언어의 소스 코드를 구문 분석하는 프로그램인 해석기가 함께 제공됩니다! Python의 이러한 인터프리터에는 cpython, pypypy, Jpython 및 IronPython이 포함되며, 그중 cpython은 Python의 원래 구현입니다. 🎜🎜CPython은 C 및 기타 프로그래밍 언어와 외부 함수 인터페이스를 제공하는 인터프리터입니다. Python 소스 코드를 CPython 가상 머신에서 해석되는 중간 바이트코드로 컴파일합니다. 지금까지와 앞으로의 논의는 CPython과 환경에서의 동작 이해에 관한 것이었습니다. 🎜

内存模型和锁定机制

编程语言使用程序中的对象来执行操作。这些对象由基本数据类型组成,如stringintegerboolean。它们还包括更复杂的数据结构,如listclasses/objects。程序对象的值存储在内存中,以便快速访问。在程序中使用变量时,进程将从内存中读取值并对其进行操作。在早期的编程语言中,大多数开发人员负责他们程序中的所有内存管理。这意味着在创建列表或对象之前,首先必须为变量分配内存。在这样做时,你可以继续释放以“释放”内存。

在python中,对象通过引用存储在内存中。引用是对象的一种标签,因此一个对象可以有许多名称,比如你如何拥有给定的名称和昵称。引用是对象的精确内存位置。引用计数器用于python中的垃圾收集,这是一种自动内存管理过程。

在引用计数器的帮助下,python通过在创建或引用对象时递增引用计数器和在取消引用对象时递减来跟踪每个对象。当引用计数为0时,对象的内存将被释放。

import sys
import gc

hello = "world" #reference to 'world' is 2
print (sys.getrefcount(hello))

bye = "world" 
other_bye = bye 
print(sys.getrefcount(bye)) 
print(gc.get_referrers(other_bye))
>>> 4
>>> 6
>>> [['sys', 'gc', 'hello', 'world', 'print', 'sys', 'getrefcount', 'hello', 'bye', 'world', 'other_bye', 'bye', 'print', 'sys', 'getrefcount', 'bye', 'print', 'gc', 'get_referrers', 'other_bye'], (0, None, 'world'), {'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <_frozen_importlib_external.sourcefileloader>, '__spec__': None, '__annotations__': {}, '__builtins__': <module>, '__file__': 'test.py', '__cached__': None, 'sys': <module>, 'gc': <module>, 'hello': 'world', 'bye': 'world', 'other_bye': 'world'}]</module></module></module></_frozen_importlib_external.sourcefileloader>

需要保护这些参考计数器变量,防止竞争条件或内存泄漏。以保护这些变量;可以将锁添加到跨线程共享的所有数据结构中。

CPython 的 GIL 通过一次允许一个线程控制解释器来控制 Python 解释器。它为单线程程序提供了性能提升,因为只需要管理一个锁,但代价是它阻止了多线程 CPython 程序在某些情况下充分利用多处理器系统。

当用户编写python程序时,性能受CPU限制的程序和受I/O限制的程序之间存在差异。CPU通过同时执行许多操作将程序推到极限,而I/O程序必须花费时间等待I/O。

因此,只有多线程程序在GIL中花费大量时间来解释CPython字节码;GIL成为瓶颈。即使没有严格必要,GIL也会降低性能。例如,一个用python编写的同时处理IO和CPU任务的程序:

import time, os
from threading import Thread, current_thread
from multiprocessing import current_process

COUNT = 200000000
SLEEP = 10

def io_bound(sec):
   pid = os.getpid()
   threadName = current_thread().name
   processName = current_process().name
   print(f"{pid} * {processName} * {threadName} \
           ---> Start sleeping...")
   time.sleep(sec)
   print(f"{pid} * {processName} * {threadName} \
           ---> Finished sleeping...")

def cpu_bound(n):
   pid = os.getpid()
   threadName = current_thread().name
   processName = current_process().name
   print(f"{pid} * {processName} * {threadName} \
           ---> Start counting...")
   while n>0:
          n -= 1
   print(f"{pid} * {processName} * {threadName} \
       ---> Finished counting...")

 def timeit(function,args,threaded=False):
      start = time.time()
      if threaded:
         t1 = Thread(target = function, args =(args, ))
         t2 = Thread(target = function, args =(args, ))
         t1.start()
         t2.start()
         t1.join()
         t2.join()
      else:
        function(args)
      end = time.time()
      print('Time taken in seconds for running {} on Argument {} is {}s -{}'.format(function,args,end - start,"Threaded" if threaded else "None Threaded"))

if __name__=="__main__":
      #Running io_bound task
      print("IO BOUND TASK NON THREADED")
      timeit(io_bound,SLEEP)

      print("IO BOUND TASK THREADED")
      #Running io_bound task in Thread
      timeit(io_bound,SLEEP,threaded=True)

      print("CPU BOUND TASK NON THREADED")
      #Running cpu_bound task
      timeit(cpu_bound,COUNT)

      print("CPU BOUND TASK THREADED")
      #Running cpu_bound task in Thread
      timeit(cpu_bound,COUNT,threaded=True)
>>> IO BOUND TASK  NON THREADED
>>> 17244 * MainProcess * MainThread            ---> Start sleeping...
>>> 17244 * MainProcess * MainThread            ---> Finished sleeping...
>>> 17244 * MainProcess * MainThread            ---> Start sleeping...
>>> 17244 * MainProcess * MainThread            ---> Finished sleeping...
>>> Time taken in seconds for running <function> on Argument 10 is 20.036664724349976s -None Threaded
>>> IO BOUND TASK THREADED
>>> 10180 * MainProcess * Thread-1            ---> Start sleeping...
>>> 10180 * MainProcess * Thread-2            ---> Start sleeping...
>>> 10180 * MainProcess * Thread-1            ---> Finished sleeping...
>>> 10180 * MainProcess * Thread-2            ---> Finished sleeping...
>>> Time taken in seconds for running <function> on Argument 10 is 10.01464056968689s -Threaded
>>> CPU BOUND TASK NON THREADED
>>> 14172 * MainProcess * MainThread            ---> Start counting...
>>> 14172 * MainProcess * MainThread        ---> Finished counting...
>>> 14172 * MainProcess * MainThread            ---> Start counting...
>>> 14172 * MainProcess * MainThread        ---> Finished counting...
>>> Time taken in seconds for running <function> on Argument 200000000 is 44.90199875831604s -None Threaded
>>> CPU BOUND TASK THEADED
>>> 15616 * MainProcess * Thread-1            ---> Start counting...
>>> 15616 * MainProcess * Thread-2            ---> Start counting...
>>> 15616 * MainProcess * Thread-1        ---> Finished counting...
>>> 15616 * MainProcess * Thread-2        ---> Finished counting...
>>> Time taken in seconds for running <function> on Argument 200000000 is 106.09711360931396s -Threaded</function></function></function></function>

从结果中我们注意到,multithreading在多个IO绑定任务中表现出色,执行时间为10秒,而非线程方法执行时间为20秒。我们使用相同的方法执行CPU密集型任务。好吧,最初它确实同时启动了我们的线程,但最后,我们看到整个程序的执行需要大约106秒!然后发生了什么?这是因为当Thread-1启动时,它获取全局解释器锁(GIL),这防止Thread-2使用CPU。因此,Thread-2必须等待Thread-1完成其任务并释放锁,以便它可以获取锁并执行其任务。锁的获取和释放增加了总执行时间的开销。因此,可以肯定地说,线程不是依赖CPU执行任务的理想解决方案。

这种特性使并发编程变得困难。如果GIL在并发性方面阻碍了我们,我们是不是应该摆脱它,还是能够关闭它?。嗯,这并不容易。其他功能、库和包都依赖于GIL,因此必须有一些东西来取代它,否则整个生态系统将崩溃。这是一个很难解决的问题。

多进程

我们已经证实,CPython使用锁来保护数据不受竞速的影响,尽管这种锁存在,但程序员已经找到了一种显式实现并发的方法。当涉及到GIL时,我们可以使用multiprocessing库来绕过全局锁。多处理实现了真正意义上的并发,因为它在不同CPU核上跨不同进程执行代码。它创建了一个新的Python解释器实例,在每个内核上运行。不同的进程位于不同的内存位置,因此它们之间的对象共享并不容易。在这个实现中,python为每个要运行的进程提供了不同的解释器;因此在这种情况下,为多处理中的每个进程提供单个线程。

import os
import time
from multiprocessing import Process, current_process

SLEEP = 10
COUNT = 200000000

def count_down(cnt):
   pid = os.getpid()
   processName = current_process().name
   print(f"{pid} * {processName} \
           ---> Start counting...")
   while cnt > 0:
       cnt -= 1

def io_bound(sec):
   pid = os.getpid()
   threadName = current_thread().name
   processName = current_process().name
   print(f"{pid} * {processName} * {threadName} \
           ---> Start sleeping...")
   time.sleep(sec)
   print(f"{pid} * {processName} * {threadName} \
           ---> Finished sleeping...")

if __name__ == '__main__':
# creating processes
    start = time.time()

    #CPU BOUND
    p1 = Process(target=count_down, args=(COUNT, ))
    p2 = Process(target=count_down, args=(COUNT, ))

    #IO BOUND
    #p1 = Process(target=, args=(SLEEP, ))
    #p2 = Process(target=count_down, args=(SLEEP, ))

  # starting process_thread
    p1.start()
    p2.start()

  # wait until finished
    p1.join()
    p2.join()

    stop = time.time()
    elapsed = stop - start

    print ("The time taken in seconds is :", elapsed)
>>> 1660 * Process-2            ---> Start counting...
>>> 10184 * Process-1            ---> Start counting...
>>> The time taken in seconds is : 12.815475225448608

可以看出,对于cpu和io绑定任务,multiprocessing性能异常出色。MainProcess启动了两个子进程,Process-1Process-2,它们具有不同的PIDs,每个都执行将COUNT减少到零的任务。每个进程并行运行,使用单独的CPU内核和自己的Python解释器实例,因此整个程序执行只需12秒。

请注意,输出可能以无序的方式打印,因为过程彼此独立。这是因为每个进程都在自己的默认主线程中执行函数。

我们还可以使用asyncio库(上一节我已经讲过了,没看的可以返回到上一节去学习)绕过GIL锁。asyncio的基本概念是,一个称为事件循环的python对象控制每个任务的运行方式和时间。事件循环知道每个任务及其状态。就绪状态表示任务已准备好运行,等待阶段表示任务正在等待某个外部任务完成。在异步IO中,任务永远不会放弃控制,也不会在执行过程中被中断,因此对象共享是线程安全的。

import time
import asyncio

COUNT = 200000000

# asynchronous function defination
async def func_name(cnt):
       while cnt > 0:
           cnt -= 1

#asynchronous main function defination
async def main ():
  # Creating 2 tasks.....You could create as many tasks (n tasks)
  task1 = loop.create_task(func_name(COUNT))
  task2 = loop.create_task(func_name(COUNT))

  # await each task to execute before handing control back to the program
  await asyncio.wait([task1, task2])

if __name__ =='__main__':
  # get the event loop
  start_time = time.time()
  loop = asyncio.get_event_loop()
  # run all tasks in the event loop until completion
  loop.run_until_complete(main())
  loop.close()
  print("--- %s seconds ---" % (time.time() - start_time))
>>> --- 41.74118399620056 seconds ---

我们可以看到,asyncio需要41秒来完成倒计时,这比multithreading的106秒要好,但对于cpu受限的任务,不如multiprocessing的12秒。Asyncio创建一个eventloop和两个任务task1task2,然后将这些任务放在eventloop上。然后,程序await任务的执行,因为事件循环执行所有任务直至完成。

为了充分利用python中并发的全部功能,我们还可以使用不同的解释器。JPython和IronPython没有GIL,这意味着用户可以充分利用多处理器系统。

与线程一样,多进程仍然存在缺点:

  1. 数据在进程之间混洗会产生 I/O 开销

  2. 整个内存被复制到每个子进程中,这对于更重要的程序来说可能是很多开销

위 내용은 Python 멀티스레딩이란 무엇이며 어떻게 사용하나요?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제