ホームページ >バックエンド開発 >Python チュートリアル >Pythonスレッドの学習記録について
このシナリオを考えてみましょう。処理するデータは 10,000 個ありますが、各データの読み取りには 0.1 秒しかかかりません。他の。どうすれば最短時間で実行できるでしょうか?
マルチスレッド (MT)プログラミングが登場する前は、コンピューター プログラムの実行は実行シーケンスで構成されていました。 、実行シーケンスは「シーケンスはホストの中央処理装置 (CPU) で実行されます。タスク自体が逐次実行を必要とするか、プログラム全体が複数のサブタスクで構成されているかに関係なく、サブタスクが独立している場合でも、プログラムはこの方法で実行されます。」 (つまり、1 つのサブタスクの結果が他のサブタスクの結果に影響を与えない場合にも同じことが当てはまります) 上記の問題について、完了するために実行シーケンスを使用すると、約 10000*0.1 + かかります。 10000 = 11000 秒は明らかに長すぎます。
計算を実行しながらデータを取得することはできますか? 可能であれば、タスクの効率が大幅に向上します。本質的に非同期であり、複数の同時トランザクションが必要であり、各トランザクションの実行順序が不確実でランダムで予測不可能な問題の場合、マルチスレッドが理想的なソリューションであり、それぞれが達成する目標を持って複数の実行ストリームに分割されます。次に、結果を結合して最終結果を取得します
スレッドとプロセス
プロセスとは
スレッド ステータス に示すように。図では
スレッドには開始、順次実行、終了の 3 つの部分があります。スレッドの実行はプリエンプト (中断) されるか、一時的に中断されます。 (スリープとも呼ばれます)、これは、プロセス内の各スレッドが同じデータ空間を共有することと呼ばれます。そのため、スレッド間でデータを共有し、相互に通信する方が簡単です。もちろん、このような共有には危険がまったくないわけではありません。複数のスレッドが同じデータに同時にアクセスすると、データ アクセスの順序が異なるために問題が発生し、これが競合と呼ばれる原因となります。 はマルチスレッド用に特別な変更を行わないと完了前にブロックされるため、この「貪欲な」関数は CPU 時間の割り当てを傾けます。その結果、各スレッドに割り当てられる実行時間が同じにならない可能性があり、これは不公平です。Python、スレッド、およびグローバル インタープリター ロック
グローバル インタープリター ロック (GIL)
まず明確にしておきたいのは、GIL は Python の機能ではなく、Python パーサー (CPython) を実装するときに導入された概念であるということです。 C++ が一連の言語 (文法) 標準であるのと同様に、さまざまなコンパイラを使用して実行可能コードにコンパイルできます。 CPython、PyPy、Psyco などの異なる Python 実行環境を通じて、同じコードを実行できます (このうち JPython には GIL がありません)。
それでは、CPython 実装の GIL とは何でしょうか? GIL の正式名は Global Interpreter Lock です。誤解を避けるために、公式の説明を見てみましょう:
CPython では、グローバル インタープリタ ロック (GIL) は、複数のネイティブ スレッドをPython はマルチスレッド プログラミングを完全にサポートしていますが、インタープリターの C 言語 実装部分は、完全に並列実行された場合にはスレッド セーフ ではありません。 実際、インタープリターはグローバル インタープリター ロックによって保護されており、常に 1 つの Python スレッドのみが実行されることが保証されます。
マルチスレッド環境では、Python 仮想マシンは次のように実行されます:
GIL をセットアップ
実行するスレッドに切り替え
実行
バイトコードの数を指定命令
スレッドは積極的に制御を放棄します(time.sleep(0)を呼び出すことができます)
スレッドをスリープ状態に設定します
GILのロックを解除します
上記の手順を繰り返しますもう一度
すべての I/O 指向プログラム (組み込みオペレーティング システムの C コードを呼び出す) では、GIL は I/O 呼び出しの前に解放され、このスレッドが I/O を待機している間に他のスレッドを実行できるようになります。ああ。スレッドが多くの I/O 操作を使用しない場合、スレッドは独自のタイム スライスでプロセッサ (および GIL) を占有します。つまり、I/O 集中型の Python プログラムは、コンピューティング集中型のプログラムよりもマルチスレッド環境を最大限に活用できます。
スレッドは計算を終了すると終了します。スレッドは、thread.exit() などの終了関数を呼び出すことも、sys.exit() などの Python の標準プロセス終了メソッドを使用することも、SystemExit 例外をスローすることもできます。ただし、スレッドを直接「強制終了」することはできません。
Python は、Win32 や Linux、Solaris、MacOS、*BSD などのほとんどの Unix 系システムで実行する場合、マルチスレッド プログラミングをサポートします。 Python は、pthread と呼ばれる POSIX 準拠のスレッドを使用します。
デフォルトでは、インタープリタ
>> import thread
にエラーがない限り、スレッドが使用可能であることを意味します。
Python は、スレッド、スレッド、キューなど、マルチスレッド プログラミング用のいくつかのモジュールを提供します。スレッドおよびスレッド モジュールを使用すると、プログラマはスレッドを作成および管理できます。スレッド モジュールは基本的なスレッドとロックのサポートを提供し、スレッド化はより高レベルで強力なスレッド管理機能を提供します。 Queue モジュールを使用すると、ユーザーは複数のスレッド間でデータを共有するために使用できる queue データ構造を作成できます。
以下の考慮事項により、thread モジュールの使用はお勧めしません。
上位レベルのスレッド モジュールはより高度で、スレッドのより完全なサポートを備えており、スレッド モジュールで 属性を使用すると、スレッドと競合する可能性があります。第 2 に、低レベルのスレッド モジュールには同期プリミティブがほとんどなく (実際には 1 つだけ)、スレッド モジュールには多くの同期プリミティブがあります。
プロセスをいつ終了するかを制御することはできません。メインスレッドが終了すると、すべてのスレッドが警告や通常のクリーンアップなしで強制的に終了されます。前に述べたように、少なくともスレッド化モジュールは、重要な子スレッドが終了した後にプロセスが終了することを保証できます。
スレッドの生成に加えて、スレッドモジュールは基本的な同期データ構造ロックオブジェクト(ロックオブジェクト、プリミティブロック、シンプルロック、ミューテックスロック、ミューテックス、バイナリセマフォとも呼ばれます)も提供します。
スレッドモジュール関数
start_new_thread(function, args, kwargs=None): 新しいスレッドを生成し、指定されたパラメーターとオプションの kwargs を使用して新しいスレッドでこの関数を呼び出します。
allocate_lock(): LockType 型のロックオブジェクトを割り当てます
exit(): スレッドを終了します
acquire(wait=None): ロックオブジェクトの取得を試みます
locked( ): ロック オブジェクトが取得された場合は True を返し、それ以外の場合は False を返します
release(): ロックを解放します
以下はスレッドの使用例です:
import thread from time import sleep, time def loop(num): print('start loop at:', time()) sleep(num) print('loop done at:', time()) def loop1(num): print('start loop 1 at:', time()) sleep(num) print('loop 1 done at:', time()) def main(): print('starting at:', time()) thread.start_new_thread(loop, (4,)) thread.start_new_thread(loop1, (5,)) sleep(6) print('all DONE at:', time()) if name == 'main': main() ('starting at:', 1489387024.886667) ('start loop at:', 1489387024.88705) ('start loop 1 at:', 1489387024.887277) ('loop done at:', 1489387028.888182) ('loop 1 done at:', 1489387029.888904) ('all DONE at:', 1489387030.889918)
start_new_thread() には最初の2 つのパラメータ。したがって、実行したい関数がパラメータを取らない場合でも、空のタプルを渡す必要があります。
なぜ文 sleep(6) を追加するのでしょうか? メインスレッドを停止しないと、メインスレッドは次のステートメントを実行し、「すべて完了」と表示され、実行中のloop() とloop1 ( ) の 2 つのスレッドが終了しました。
我们有没有更好的办法替换使用sleep() 这种不靠谱的同步方式呢?答案是使用锁,使用了锁,我们就可以在两个线程都退出之后马上退出。
#! -*- coding: utf-8 -*- import thread from time import sleep, time loops = [4, 2] def loop(nloop, nsec, lock): print('start loop %s at: %s' % (nloop, time())) sleep(nsec) print('loop %s done at: %s' % (nloop, time())) # 每个线程都会被分配一个事先已经获得的锁,在 sleep()的时间到了之后就释放 相应的锁以通知主线程,这个线程已经结束了。 lock.release() def main(): print('starting at:', time()) locks = [] nloops = range(len(loops)) for i in nloops: # 调用 thread.allocate_lock()函数创建一个锁的列表 lock = thread.allocate_lock() # 分别调用各个锁的 acquire()函数获得, 获得锁表示“把锁锁上” lock.acquire() locks.append(lock) for i in nloops: # 创建线程,每个线程都用各自的循环号,睡眠时间和锁为参数去调用 loop()函数 thread.start_new_thread(loop, (i, loops[i], locks[i])) for i in nloops: # 在线程结束的时候,线程要自己去做解锁操作 # 当前循环只是坐在那一直等(达到暂停主 线程的目的),直到两个锁都被解锁为止才继续运行。 while locks[i].locked(): pass print('all DONE at:', time()) if name == 'main': main()
为什么我们不在创建锁的循环里创建线程呢?有以下几个原因:
我们想到实现线程的同步,所以要让“所有的马同时冲出栅栏”。
获取锁要花一些时间,如果你的 线程退出得“太快”,可能会导致还没有获得锁,线程就已经结束了的情况。
threading 模块不仅提供了 Thread 类,还 供了各 种非常好用的同步机制。
下面是threading 模块里所有的对象:
Thread: 表示一个线程的执行的对象
Lock: 锁原语对象(跟 thread 模块里的锁对象相同)
RLock: 可重入锁对象。使单线程可以再次获得已经获得了的锁(递归锁定)。
Condition: 条件变量对象能让一个线程停下来,等待其它线程满足了某个“条件”。 如,状态的改变或值的改变。
Event: 通用的条件变量。多个线程可以等待某个事件的发生,在事件发生后, 所有的线程都会被激活。
Semaphore: 为等待锁的线程 供一个类似“等候室”的结构
BoundedSemaphore: 与 Semaphore 类似,只是它不允许超过初始值
Timer: 与 Thread 相似,只是,它要等待一段时间后才开始运行。
另一个避免使用 thread 模块的原因是,它不支持守护线程。当主线程退出时,所有的子线程不 论它们是否还在工作,都会被强行退出。有时,我们并不期望这种行为,这时,就引入了守护线程 的概念
threading 模块支持守护线程,它们是这样工作的:守护线程一般是一个等待客户请求的服务器, 如果没有客户 出请求,它就在那等着。如果你设定一个线程为守护线程,就表示你在说这个线程 是不重要的,在进程退出的时候,不用等待这个线程退出。
如果你的主线程要退出的时候,不用等待那些子线程完成,那就设定这些线程的 daemon 属性。 即,在线程开始(调用 thread.start())之前,调用 setDaemon()函数设定线程的 daemon 标志 (thread.setDaemon(True))就表示这个线程“不重要”
如果你想要等待子线程完成再退出,那就什么都不用做,或者显式地调用 thread.setDaemon(False)以保证其 daemon 标志为 False。你可以调用 thread.isDaemon()函数来判 断其 daemon 标志的值。新的子线程会继承其父线程的 daemon 标志。整个 Python 会在所有的非守护 线程退出后才会结束,即进程中没有非守护线程存在的时候才结束。
Thread类提供了以下方法:
run(): 用以表示线程活动的方法。
start():启动线程活动。
join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
is_alive(): 返回线程是否活动的。
name(): 设置/返回线程名。
daemon(): 返回/设置线程的 daemon 标志,一定要在调用 start()函数前设置
用 Thread 类,你可以用多种方法来创建线程。我们在这里介绍三种比较相像的方法。
创建一个Thread的实例,传给它一个函数
创建一个Thread的实例,传给它一个可调用的类对象
从Thread派生出一个子类,创建一个这个子类的实例
下边是三种不同方式的创建线程的示例:
#! -*- coding: utf-8 -*- # 创建一个Thread的实例,传给它一个函数 import threading from time import sleep, time loops = [4, 2] def loop(nloop, nsec, lock): print('start loop %s at: %s' % (nloop, time())) sleep(nsec) print('loop %s done at: %s' % (nloop, time())) # 每个线程都会被分配一个事先已经获得的锁,在 sleep()的时间到了之后就释放 相应的锁以通知主线程,这个线程已经结束了。 def main(): print('starting at:', time()) threads = [] nloops = range(len(loops)) for i in nloops: t = threading.Thread(target=loop, args=(i, loops[i])) threads.append(t) for i in nloops: # start threads threads[i].start() for i in nloops: # wait for all # join()会等到线程结束,或者在给了 timeout 参数的时候,等到超时为止。 # 使用 join()看上去 会比使用一个等待锁释放的无限循环清楚一些(这种锁也被称为"spinlock") threads[i].join() # threads to finish print('all DONE at:', time()) if name == 'main': main()
与传一个函数很相似的另一个方法是在创建线程的时候,传一个可调用的类的实例供线程启动 的时候执行——这是多线程编程的一个更为面向对象的方法。相对于一个或几个函数来说,由于类 对象里可以使用类的强大的功能,可以保存更多的信息,这种方法更为灵活
#! -*- coding: utf-8 -*- # 创建一个 Thread 的实例,传给它一个可调用的类对象 from threading import Thread from time import sleep, time loops = [4, 2] class ThreadFunc(object): def init(self, func, args, name=""): self.name = name self.func = func self.args = args def call(self): # 创建新线程的时候,Thread 对象会调用我们的 ThreadFunc 对象,这时会用到一个特殊函数 call()。 self.func(*self.args) def loop(nloop, nsec): print('start loop %s at: %s' % (nloop, time())) sleep(nsec) print('loop %s done at: %s' % (nloop, time())) def main(): print('starting at:', time()) threads = [] nloops = range(len(loops)) for i in nloops: t = Thread(target=ThreadFunc(loop, (i, loops[i]), loop.name)) threads.append(t) for i in nloops: # start threads threads[i].start() for i in nloops: # wait for all # join()会等到线程结束,或者在给了 timeout 参数的时候,等到超时为止。 # 使用 join()看上去 会比使用一个等待锁释放的无限循环清楚一些(这种锁也被称为"spinlock") threads[i].join() # threads to finish print('all DONE at:', time()) if name == 'main': main()
最后一个例子介绍如何子类化 Thread 类,这与上一个例子中的创建一个可调用的类非常像。使 用子类化创建线程(第 29-30 行)使代码看上去更清晰明了。
#! -*- coding: utf-8 -*- # 创建一个 Thread 的实例,传给它一个可调用的类对象 from threading import Thread from time import sleep, time loops = [4, 2] class MyThread(Thread): def init(self, func, args, name=""): super(MyThread, self).init() self.name = name self.func = func self.args = args def getResult(self): return self.res def run(self): # 创建新线程的时候,Thread 对象会调用我们的 ThreadFunc 对象,这时会用到一个特殊函数 call()。 print 'starting', self.name, 'at:', time() self.res = self.func(*self.args) print self.name, 'finished at:', time() def loop(nloop, nsec): print('start loop %s at: %s' % (nloop, time())) sleep(nsec) print('loop %s done at: %s' % (nloop, time())) def main(): print('starting at:', time()) threads = [] nloops = range(len(loops)) for i in nloops: t = MyThread(loop, (i, loops[i]), loop.name) threads.append(t) for i in nloops: # start threads threads[i].start() for i in nloops: # wait for all # join()会等到线程结束,或者在给了 timeout 参数的时候,等到超时为止。 # 使用 join()看上去 会比使用一个等待锁释放的无限循环清楚一些(这种锁也被称为"spinlock") threads[i].join() # threads to finish print('all DONE at:', time()) if name == 'main': main()
除了各种同步对象和线程对象外,threading 模块还 供了一些函数。
active_count(): 当前活动的线程对象的数量
current_thread(): 返回当前线程对象
enumerate(): 返回当前活动线程的列表
settrace(func): 为所有线程设置一个跟踪函数
setprofile(func): 为所有线程设置一个 profile 函数
原语锁定是一个同步原语,状态是锁定或未锁定。两个方法acquire()和release() 用于加锁和释放锁。
RLock 可重入锁是一个类似于Lock对象的同步原语,但同一个线程可以多次调用。
Lock 不支持递归加锁,也就是说即便在同 线程中,也必须等待锁释放。通常建议改 RLock, 它会处理 "owning thread" 和 "recursion level" 状态,对于同 线程的多次请求锁 为,只累加
计数器。每次调 release() 将递减该计数器,直到 0 时释放锁,因此 acquire() 和 release() 必须 要成对出现。
from time import sleep from threading import current_thread, Thread lock = Rlock() def show(): with lock: print current_thread().name, i sleep(0.1) def test(): with lock: for i in range(3): show(i) for i in range(2): Thread(target=test).start()
事件用于在线程间通信。一个线程发出一个信号,其他一个或多个线程等待。
Event 通过通过 个内部标记来协调多线程运 。 法 wait() 阻塞线程执 ,直到标记为 True。 set() 将标记设为 True,clear() 更改标记为 False。isSet() 用于判断标记状态。
from threading import Event def test_event(): e = Event() def test(): for i in range(5): print 'start wait' e.wait() e.clear() # 如果不调用clear(),那么标记一直为 True,wait()就不会发生阻塞行为 print i Thread(target=test).start() return e e = test_event()
条件变量和 Lock 参数一样,也是一个,也是一个同步原语,当需要线程关注特定的状态变化或事件的发生时使用这个锁定。
可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。
构造方法:
Condition([lock/rlock])
Condition 有以下这些方法:
acquire([timeout])/release(): 调用关联的锁的相应方法。
wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
from threading import Condition, current_thread, Thread con = Condition() def tc1(): with con: for i in range(5): print current_thread().name, i sleep(0.3) if i == 3: con.wait() def tc2(): with con: for i in range(5): print current_thread().name, i sleep(0.1) con.notify() Thread(target=tc1).start() Thread(target=tc2).start() Thread-1 0 Thread-1 1 Thread-1 2 Thread-1 3 # 让出锁 Thread-2 0 Thread-2 1 Thread-2 2 Thread-2 3 Thread-2 4 Thread-1 4 # 重新获取锁,继续执
只有获取锁的线程才能调用 wait() 和 notify(),因此必须在锁释放前调用。
当 wait() 释放锁后,其他线程也可进入 wait 状态。notifyAll() 激活所有等待线程,让它们去抢锁然后完成后续执行。
现在我们用一个经典的(生产者消费者)例子来介绍一下 Queue模块。
生产者消费者的场景是: 生产者生产货物,然后把货物放到一个队列之类的数据结构中,生产货物所要花费的时间无法预先确定。消费者消耗生产者生产的货物的时间也是不确定的。
常用的 Queue 模块的属性:
queue(size): 创建一个大小为size的Queue对象。
qsize(): 返回队列的大小(由于在返回的时候,队列可能会被其它线程修改,所以这个值是近似值)
empty(): 如果队列为空返回 True,否则返回 False
full(): 如果队列已满返回 True,否则返回 False
put(item,block=0): 把item放到队列中,如果给了block(不为0),函数会一直阻塞到队列中有空间为止
get(block=0): 从队列中取一个对象,如果给了 block(不为 0),函数会一直阻塞到队列中有对象为止
Queue 模块可以用来进行线程间通讯,让各个线程之间共享数据。
现在,我们创建一个队列,让 生产者(线程)把新生产的货物放进去供消费者(线程)使用。
#! -*- coding: utf-8 -*- from Queue import Queue from random import randint from time import sleep, time from threading import Thread class MyThread(Thread): def init(self, func, args, name=""): super(MyThread, self).init() self.name = name self.func = func self.args = args def getResult(self): return self.res def run(self): # 创建新线程的时候,Thread 对象会调用我们的 ThreadFunc 对象,这时会用到一个特殊函数 call()。 print 'starting', self.name, 'at:', time() self.res = self.func(*self.args) print self.name, 'finished at:', time() # writeQ()和 readQ()函数分别用来把对象放入队列和消耗队列中的一个对象。在这里我们使用 字符串'xxx'来表示队列中的对象。 def writeQ(queue): print 'producing object for Q...' queue.put('xxx', 1) print "size now", queue.qsize() def readQ(queue): queue.get(1) print("consumed object from Q... size now", queue.qsize()) def writer(queue, loops): # writer()函数只做一件事,就是一次往队列中放入一个对象,等待一会,然后再做同样的事 for i in range(loops): writeQ(queue) sleep(1) def reader(queue, loops): # reader()函数只做一件事,就是一次从队列中取出一个对象,等待一会,然后再做同样的事 for i in range(loops): readQ(queue) sleep(randint(2, 5)) # 设置有多少个线程要被运行 funcs = [writer, reader] nfuncs = range(len(funcs)) def main(): nloops = randint(10, 20) q = Queue(32) threads = [] for i in nfuncs: t = MyThread(funcs[i], (q, nloops), funcs[i].name) threads.append(t) for i in nfuncs: threads[i].start() for i in nfuncs: threads[i].join() print threads[i].getResult() print 'all DONE' if name == 'main': main()
进程(有时被称为重量级进程)是程序的一次 执行。每个进程都有自己的地址空间,内存,数据栈以及其它记录其运行轨迹的辅助数据。
线程(有时被称为轻量级进程)跟进程有些相似,不同的是,所有的线程运行在同一个进程中, 共享相同的运行环境。它们可以想像成是在主进程或“主线程”中并行运行的“迷你进程”。
这篇文章很好的解释了 线程和进程的区别,推荐阅读: http://www.ruanyifeng.com/blo...
由于GIL的缘故,对所有面向 I/O 的(会调用内建的操作系统 C 代码的)程序来说,GIL 会在这个 I/O 调用之 前被释放,以允许其它的线程在这个线程等待 I/O 的时候运行。如果某线程并未使用很多 I/O 操作, 它会在自己的时间片内一直占用处理器(和 GIL)。也就是说,I/O 密集型的 Python 程序比计算密集 型的程序更能充分利用多线程环境的好处。
Python的线程就是C语言的一个pthread,并通过操作系统调度算法进行调度(例如linux是CFS)。为了让各个线程能够平均利用CPU时间,python会计算当前已执行的微代码数量,达到一定阈值后就强制释放GIL。而这时也会触发一次操作系统的线程调度(当然是否真正进行上下文切换由操作系统自主决定)。
伪代码
while True: acquire GIL for i in 1000: do something release GIL /* Give Operating System a chance to do thread scheduling */
这种模式在只有一个CPU核心的情况下毫无问题。任何一个线程被唤起时都能成功获得到GIL(因为只有释放了GIL才会引发线程调度)。
但当CPU有多个核心的时候,问题就来了。从伪代码可以看到,从release GIL到acquire GIL之间几乎是没有间隙的。所以当其他在其他核心上的线程被唤醒时,大部分情况下主线程已经又再一次获取到GIL了。这个时候被唤醒执行的线程只能白白的浪费CPU时间,看着另一个线程拿着GIL欢快的执行着。然后达到切换时间后进入待调度状态,再被唤醒,再等待,以此往复恶性循环。
简单的总结下就是:Python的多线程在多核CPU上,只对于IO密集型计算产生正面效果;而当有至少有一个CPU密集型线程存在,那么多线程效率会由于GIL而大幅下降。
以上がPythonスレッドの学習記録についての詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。