ホームページ >バックエンド開発 >Python チュートリアル >mysqlをベースにPythonで実装したシンプルなキューとクロスプロセスロックの例を詳しく解説

mysqlをベースにPythonで実装したシンプルなキューとクロスプロセスロックの例を詳しく解説

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBオリジナル
2016-06-16 08:43:221200ブラウズ

通常、マルチプロセス アプリケーションを開発する場合、複数のプロセスが同じリソース (重要なリソース) にアクセスする状況が必然的に発生します。このとき、リソース割り当てを実現するために、グローバル ロックを追加する必要があります。 1 つのプロセスが同時にリソースにアクセスできます)。

例は次のとおりです:

mysql を使用してタスクキューを実装するとします。実装プロセスは次のとおりです。

1. 次のように、キュー タスク を保存するジョブ テーブルを MySQL に作成します。

create table jobs(
  id auto_increment not null primary key,
  message text not null,
  job_status not null default 0
);

message はタスク情報を格納するために使用され、job_status はタスクのステータスを識別するために使用されます。ステータスは 0: キュー内、1: キュー外です



2. 新しいデータ をジョブ テーブルに入れてキューに入れるプロデューサー プロセスがあります。

insert into jobs(message) values('msg1');
3. 複数のコンシューマ プロセスがあると仮定し、ジョブ テーブルからキュー情報

を取得します。実行される操作は次のとおりです:

select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id
4. プロセス間ロックがない場合、2 つのコンシューマー プロセスが同時に重複したメッセージを取得し、メッセージが複数回消費される可能性があります。この状況は望ましくないものであるため、クロスプロセス ロックを実装する必要があります。

==========================区切り線=================== = ===================

クロスプロセス ロックの実装に関しては、いくつかの主な実装方法があります。

(1)セマフォ

(2)ファイルロックfcntl

(3)ソケット(ポート番号バインド)
(4)信号
これらの方法にはそれぞれ長所と短所があります。一般的には、最初の 2 つの方法の方が一般的です。ここでは詳しく説明しません。

情報を確認したところ、mysql にはロックの実装があり、大規模な同時分散アクセスがボトルネックになる可能性があるアプリケーション シナリオに適していることがわかりました。
デモは次のように Python を使用して実装されました:

ファイル名: glock.py


メイン関数内:
#!/usr/bin/env python2.7 
# 
# -*- coding:utf-8 -*- 
# 
#  Desc  : 
# 
import logging, time 
import MySQLdb 
class Glock: 
  def __init__(self, db): 
    self.db = db 
  def _execute(self, sql): 
    cursor = self.db.cursor() 
    try: 
      ret = None 
      cursor.execute(sql) 
      if cursor.rowcount != 1: 
        logging.error("Multiple rows returned in mysql lock function.") 
        ret = None 
      else: 
        ret = cursor.fetchone() 
      cursor.close() 
      return ret 
    except Exception, ex: 
      logging.error("Execute sql \"%s\" failed! Exception: %s", sql, str(ex)) 
      cursor.close() 
      return None 
  def lock(self, lockstr, timeout): 
    sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout) 
    ret = self._execute(sql) 
 
    if ret[0] == 0: 
      logging.debug("Another client has previously locked '%s'.", lockstr) 
      return False 
    elif ret[0] == 1: 
      logging.debug("The lock '%s' was obtained successfully.", lockstr) 
      return True 
    else: 
      logging.error("Error occurred!") 
      return None 
  def unlock(self, lockstr): 
    sql = "SELECT RELEASE_LOCK('%s')" % (lockstr) 
    ret = self._execute(sql) 
    if ret[0] == 0: 
      logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).", lockstr) 
      return False 
    elif ret[0] == 1: 
      logging.debug("The lock '%s' the lock was released.", lockstr) 
      return True 
    else: 
      logging.error("The lock '%s' did not exist.", lockstr) 
      return None 
#Init logging 
def init_logging(): 
  sh = logging.StreamHandler() 
  logger = logging.getLogger() 
  logger.setLevel(logging.DEBUG) 
  formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s') 
  sh.setFormatter(formatter) 
  logger.addHandler(sh) 
  logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel())) 
def main(): 
  init_logging() 
  db = MySQLdb.connect(host='localhost', user='root', passwd='') 
  lock_name = 'queue' 
 
  l = Glock(db) 
 
  ret = l.lock(lock_name, 10) 
  if ret != True: 
    logging.error("Can't get lock! exit!") 
    quit() 
  time.sleep(10) 
  logging.info("You can do some synchronization work across processes!") 
  ##TODO 
  ## you can do something in here ## 
  l.unlock(lock_name) 
if __name__ == "__main__": 
  main() 

l.lock(lock_name, 10) の 10 はタイムアウト時間が 10 秒であることを意味し、10 秒以内にロックを取得できなかった場合はリターンして次の処理を実行します。


このデモでは、TODO がマークされており、コンシューマがジョブ テーブルからメッセージを取得するためのロジックをここに配置できます。つまり、境界線
の上です。
2. 複数のコンシューマープロセスがあると仮定し、ジョブテーブルからキュー情報を取得します。実行される操作は次のとおりです。

このようにして、複数のプロセスが重要なリソースに同時にアクセスし、データの一貫性を確保できます。
select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id
テストする場合、2 つの glock.py を起動すると、結果は次のようになります:



最初の glock.py が 17:08:50 にロックを解除し、次の glock.py が 17:08:50 にロックを取得していることがわかります。これは完全に実現可能であることが確認できます。
[@tj-10-47 test]# ./glock.py  
2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG 
2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully. 
2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 
2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released. 

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。