Home >Database >Mysql Tutorial >python基于mysql实现的容易队列以及跨进程锁

python基于mysql实现的容易队列以及跨进程锁

WBOY
WBOYOriginal
2016-06-07 16:25:421571browse

python基于mysql实现的简单队列以及跨进程锁 在我们做多进程应用开发的过程中,难免会遇到多个进程访问同一个资源(临界资源)的状况,必须通过加一个全局性的锁,来实现资源的同步访问(同一时间只能有一个进程访问资源)。 举个例子: 假设我们用mysql来实

python基于mysql实现的简单队列以及跨进程锁

在我们做多进程应用开发的过程中,难免会遇到多个进程访问同一个资源(临界资源)的状况,必须通过加一个全局性的锁,来实现资源的同步访问(同一时间只能有一个进程访问资源)。

 

举个例子:

假设我们用mysql来实现一个任务队列,实现的过程如下:

1. 在Mysql中创建Job表,用于储存队列任务,如下:

create table jobs(
    id auto_increment not null primary key,
    message text not null,
    job_status not null default 0
);

message 用来存储任务信息,job_status用来标识任务状态,假设只有两种状态,0:在队列中, 1:已出队列 

 

2. 有一个生产者进程,往job表中放新的数据,进行排队

insert into jobs(message) values('msg1');

 

3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id

 

4. 如果没有跨进程的锁,两个消费者进程有可能同时取到重复的消息,导致一个消息被消费多次。这种情况是我们不希望看到的,于是,我们需要实现一个跨进程的锁。

 

这里我贴出非常好的一篇文章,大家可以参照一下:

https://blog.engineyard.com/2011/5-subtle-ways-youre-using-mysql-as-a-queue-and-why-itll-bite-you

 

=========================华丽的分割线=======================================

 

说道跨进程的锁实现,我们主要有几种实现方式:

1. 信号量

2. 文件锁fcntl

3. socket(端口号绑定)

4. signal

这几种方式各有利弊,总体来说前2种方式可能多一点,这里我就不详细说了,大家可以去查阅资料。

 

查资料的时候发现mysql中有锁的实现,适用于对于性能要求不是很高的应用场景,大并发的分布式访问可能会有瓶颈,链接如下:

http://dev.mysql.com/doc/refman/5.0/fr/miscellaneous-functions.html

 

我用python实现了一个demo,如下:

 

文件名:glock.py

#!/usr/bin/env python2.7
#
# -*- coding:utf-8 -*-
#
#   Author  :   yunjianfei
#   E-mail  :   yunjianfei@126.com
#   Date    :   2014/02/25
#   Desc    :
#

import logging, time
import MySQLdb


class Glock:
    def __init__(self, db):
        self.db = db

    def _execute(self, sql):
        cursor = self.db.cursor()
        try:
            ret = None
            cursor.execute(sql)
            if cursor.rowcount != 1:
                logging.error("Multiple rows returned in mysql lock function.")
                ret = None
            else:
                ret = cursor.fetchone()
            cursor.close()
            return ret
        except Exception, ex:
            logging.error("Execute sql \"%s\" failed! Exception: %s", sql, str(ex))
            cursor.close()
            return None

    def lock(self, lockstr, timeout):
        sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout)
        ret = self._execute(sql)

        if ret[0] == 0:
            logging.debug("Another client has previously locked '%s'.", lockstr)
            return False
        elif ret[0] == 1:
            logging.debug("The lock '%s' was obtained successfully.", lockstr)
            return True
        else:
            logging.error("Error occurred!")
            return None

    def unlock(self, lockstr):
        sql = "SELECT RELEASE_LOCK('%s')" % (lockstr)
        ret = self._execute(sql)
        if ret[0] == 0:
            logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).", lockstr)
            return False
        elif ret[0] == 1:
            logging.debug("The lock '%s' the lock was released.", lockstr)
            return True
        else:
            logging.error("The lock '%s' did not exist.", lockstr)
            return None

#Init logging
def init_logging():
    sh = logging.StreamHandler()
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
    sh.setFormatter(formatter)
    logger.addHandler(sh)
    logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel()))

def main():
    init_logging()
    db = MySQLdb.connect(host='localhost', user='root', passwd='')
    lock_name = 'queue'

    l = Glock(db)

    ret = l.lock(lock_name, 10)
    if ret != True:
        logging.error("Can't get lock! exit!")
        quit()
    time.sleep(10)
    logging.info("You can do some synchronization work across processes!")
    ##TODO
    ## you can do something in here ##
    l.unlock(lock_name)

if __name__ == "__main__":
    main()

在main函数里, l.lock(lock_name, 10) 中,10是表示timeout的时间是10秒,如果10秒还获取不了锁,就返回,执行后面的操作。

 

 

在这个demo中,在标记TODO的地方,可以将消费者从job表中取消息的逻辑放在这里。即分割线以上的:

   3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id

 

这样,就能保证多个进程访问临界资源时同步进行了,保证数据的一致性。

 

测试的时候,启动两个glock.py, 结果如下:

Java代码  收藏代码
  1. [@tj-10-47 test]# ./glock.py   
  2. 2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG  
  3. 2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.  
  4. 2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!  
  5. 2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.  

可以看到第一个glock.py是 17:08:50解锁的,下面的glock.py是在17:08:50获取锁的,可以证实这样是完全可行的。

[@tj-10-47 test]# ./glock.py 
2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG
2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.
[@tj-10-47 test]#

 


Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn