首頁  >  文章  >  後端開發  >  python 開發的三種運作模式詳細介紹

python 開發的三種運作模式詳細介紹

高洛峰
高洛峰原創
2017-02-14 14:03:191379瀏覽

這篇文章主要介紹了python 開發的三種運行模式詳細介紹的相關資料,需要的朋友可以參考下

Python 三種運行模式

  Python作為一門腳本語言,使用的範圍很廣。有的同學用來演算法開發,有的用來驗證邏輯,還有的作為膠水語言,用它來黏合整個系統的流程。不管怎麼說,怎麼使用python既取決於你自己的業務場景,也取決於你自己的python應用能力。就我個人而言,我覺得python作為既可以用來進行業務的開發,也可以進行產品原型的開發.一般來說,python的運行主要下面這三種模式。

1.單循環模式

  單循環模式使用的最多,也最簡單,當然也最穩定。為什麼呢,因為單循環本來程式碼就寫的很少,出錯的機會就更少,所以一般只要寫對了接口,犯錯誤的機會還是很低的。當然,我們不是說單循環就沒什麼用,恰恰相反。單循環模式是我們最常使用的一種模式。這種開發對於一些小工具、小應用、小場景特別合適。

#!/usr/bin/python
import os
import sys
import re
import signal
import time

g_exit = 0

def sig_process(sig, frame):
  global g_exit
  g_exit = 1
  print 'catch signal'

def main():
  global g_exit
  signal.signal(signal.SIGINT, sig_process)
  while 0 == g_exit:
    time.sleep(1)

    '''
    module process code
    ''' 

if __name__ == '__main__':
  main()

2.多執行緒模式

  多執行模式常用在那些容易阻塞的場合。例如多執行緒客戶端讀寫,多執行緒web存取等等。這裡的多執行緒有個特點,那就是每個執行緒都是按照客戶端創建的。簡單的舉例就是伺服器socket,來一個socket建立一個thread,這樣如果存在多個使用者的話,就有多個thread並發連線。這種方式比較簡單,用起來很快,缺點就是所有業務都有可能並發執行,全域資料保護起來很麻煩。

#!/usr/bin/python
import os
import sys
import re
import signal
import time
import threading

g_exit=0

def run_thread():
  global g_exit
  while 0 == g_exit:
    time.sleep(1)

    '''
    do jobs per thread
    '''

def sig_process(sig, frame):
  global g_exit
  g_exit = 1

def main():

  global g_exit

  signal.signal(signal.SIGINT, sig_process)
  g_threads = []
  for i in range(4):
    td = threading.Thread(target = run_thread)
    td.start()
    g_threads.append(td)

  while 0 == g_exit:
    time.sleep(1)

  for i in range(4):
    g_threads[i].join()


if __name__ == '__main__':
  main()

3.reactor模式

  reactor模式,不復雜,簡單的來說,就是利用多執行緒來處理每一個業務。如果一個業務已經被某一個thread處理了,那麼其他的thread就不能再處理這個業務了。這樣,它就相當於解決了一個問題,也就是我們在前面所說的鎖的問題。因此,對於這種模式的開發者來說,編寫業務其實是一件簡單的事情,因為他所要關注的只是自己的一畝三分地就可以了。之前雲風同學寫的skynet就是這麼一種模式,只不過它使用了c+lua來開發的。其實只要了解了reactor模式本身,用什麼語言開發不重要,關鍵在於理解reactor的精髓就可以了。 

python 开发的三种运行模式详细介绍

  如果寫成code,那應該是這樣的,

#!/usr/bin/python

import os
import sys
import re
import time
import signal
import threading

g_num = 4
g_exit =0
g_threads = []
g_sem = []
g_lock = threading.Lock()
g_event = {}

def add_event(name, data):
  global g_lock
  global g_event

  if '' == name:
    return

  g_lock.acquire()
  if name in g_event:
    g_event[name].append(data)
    g_lock.release()
    return

  g_event[name] = []

  '''
  0 means idle, 1 means busy
  '''
  g_event[name].append(0)
  g_event[name].append(data)
  g_lock.release()

def get_event(name):
  global g_lock
  global g_event

  g_lock.acquire()
  if '' != name:
    if [] != g_event[name]:
      if 1 != len(g_event[name]):
        data = g_event[name][1]
        del g_event[name][1]
        g_lock.release()
        return name, data
      else:
        g_event[name][0] = 0

  for k in g_event:
    if 1 == len(g_event[k]):
      continue

    if 1 == g_event[k][0]:
      continue

    g_event[k][0] =1
    data = g_event[k][1]
    del g_event[k][1]
    g_lock.release()
    return k, data

  g_lock.release()
  return '', -1

def sig_process(sig, frame):
  global g_exit
  g_exit =1
  print 'catch signal'

def run_thread(num):
  global g_exit
  global g_sem
  global g_lock

  name = ''
  data = -1

  while 0 == g_exit:
    g_sem[num].acquire()

    while True: 
      name, data = get_event(name)
      if '' == name:
        break

      g_lock.acquire()
      print name, data
      g_lock.release()


def test_thread():
  global g_exit

  while 0 == g_exit:
    for i in range(100):
      add_event(&#39;1&#39;, (i << 2) + 0)
      add_event(&#39;2&#39;, (i << 2) + 1)
      add_event(&#39;3&#39;, (i << 2) + 2)
      add_event(&#39;4&#39;, (i << 2) + 3)

    time.sleep(1)


def main():
  global g_exit
  global g_num
  global g_threads
  global g_sem

  signal.signal(signal.SIGINT, sig_process)
  for i in range(g_num):
    sem = threading.Semaphore(0)
    g_sem.append(sem)
    td = threading.Thread(target=run_thread, args=(i,))
    td.start()
    g_threads.append(td)

  &#39;&#39;&#39;
  test thread to give data
  &#39;&#39;&#39;
  test = threading.Thread(target=test_thread)
  test.start()

  while 0 == g_exit:
    for i in range(g_num):
      g_sem[i].release()
    time.sleep(1)

  &#39;&#39;&#39;
  call all thread to close
  &#39;&#39;&#39;
  for i in range(g_num):
    g_sem[i].release()

  for i in range(g_num):
    g_threads[i].join()

  test.join()
  print &#39;exit now&#39;

&#39;&#39;&#39;
entry
&#39;&#39;&#39;
if __name__ == &#39;__main__&#39;:
  main()

感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!

更多python 開發的三種運行模式詳細介紹相關文章請關注PHP中文網!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn