Python crawler: multithreading with Queue as the task queue, but the consumer threads cannot get data out of the queue.

高洛峰, 2889 days ago, 458

All replies (2)

  • 大家讲道理

    2017-04-18 09:28:42

    Try calling join() on the consumer threads, and add a condition check: when the queue is empty, break out of the loop.
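A minimal sketch of this suggestion, written for Python 3 (where the module is named queue rather than Queue). The names consume, tasks, and results are made up for illustration, and the sketch assumes every task is already in the queue before the consumers start. If a producer is still adding work, an empty queue does not mean the work is finished.

```python
import queue
import threading

def consume(tasks, results):
    """Take tasks until the queue is empty, then leave the loop."""
    while True:
        try:
            # get_nowait() raises queue.Empty instead of blocking, which
            # avoids the race between checking empty() and calling get().
            task = tasks.get_nowait()
        except queue.Empty:
            break
        results.append(task * 2)  # stand-in for real work

tasks = queue.Queue()
for i in range(30):
    tasks.put(i)

results = []
consumers = [threading.Thread(target=consume, args=(tasks, results))
             for _ in range(3)]
for t in consumers:
    t.start()
for t in consumers:
    t.join()  # wait for every consumer to finish

print(len(results))
```

Each task is taken by exactly one consumer, so all 30 results are present once the join() calls return.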

  • 阿神

    2017-04-18 09:28:42

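    If self.Queue is empty, self.Queue.get() raises Queue.Empty when it is called with a timeout or with block=False (a plain blocking get() simply waits). If nothing catches that exception, the thread exits, and at that point there are no worker threads left.
    Try adding try...except inside the while loop of the run() method to catch the exception. The code looks roughly like this: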

    while True:
        try:
            # Choose a timeout that suits your workload
            task = self.Queue.get(timeout=5)
        except Queue.Empty:  # spelled queue.Empty in Python 3
            # The task queue is empty: end this thread
            break
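To make the behavior of get() concrete, here is a small Python 3 sketch (the module is named queue there and Queue in Python 2). It shows that get() on an empty queue raises Empty only when it is non-blocking or has a timeout. The helper name try_get is made up for this example.

```python
import queue

def try_get(q, **kwargs):
    """Return ('ok', item) or ('empty', None) for a single get() call."""
    try:
        return ('ok', q.get(**kwargs))
    except queue.Empty:
        return ('empty', None)

q = queue.Queue()

# Empty queue: both calls give up and raise queue.Empty.
no_wait = try_get(q, block=False)
with_timeout = try_get(q, timeout=0.1)

# Once an item is present, the same call returns it.
q.put('task-1')
after_put = try_get(q, timeout=0.1)

print(no_wait, with_timeout, after_put)
# A plain q.get() with no timeout would wait forever on an empty queue.
```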
            

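    =======================================================================
    First: regarding your claim that MySQL does not support multithreaded writes, I wrote a simple verification program, and the conclusion is that MySQL does support writes from multiple threads. (Note: in real work, writing to the database from multiple threads requires a locking mechanism; it is left out here for simplicity.) The code is as follows: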

    
    # coding: utf-8

    import MySQLdb
    import MySQLdb.cursors
    import threading

    class MySql(object):
        def __init__(self, host, user, pwd, db_name, port=3306):
            self.host = host
            self.user = user
            self.pwd = pwd
            self.db_name = db_name
            self.port = port
            self.conn = None
            self.cursor = None
            self.connect()

        def connect(self):
            try:
                self.conn = MySQLdb.connect(host=self.host, user=self.user, passwd=self.pwd, db=self.db_name,
                                            port=self.port)
                self.cursor = self.conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
            except Exception as err:
                print('connect: %s' % err)
            return self.conn

        def execute(self, sql):
            rs = ()
            try:
                self.cursor.execute(sql)
                rs = self.cursor.fetchall()
            except Exception as err:
                # Report the error instead of silently swallowing it
                print('execute: %s' % err)
            return rs

        def exec_and_result(self, sql, params=None):
            # Run a write statement, commit it, and return the new row id (-1 on failure)
            ret_id = -1
            try:
                self.cursor.execute(sql, params)
                self.conn.commit()
                ret_id = self.cursor.lastrowid
            except Exception as err:
                print('exec_and_result: %s' % err)
            return ret_id

        def close(self):
            try:
                self.cursor.close()
                self.conn.close()
            except Exception as err:
                print('close: %s' % err)

    # Placeholders: fill in your own connection settings
    db = {
        'ip': 'xxx.xxx.xxx.xxx',
        'port': xxx,
        'user': 'xxx',
        'pwd': 'xxx',
        'db_name': 'xxx'
    }
    # All threads share this one connection and cursor (no locking, as noted above)
    mysql = MySql(db['ip'], db['user'], db['pwd'], db['db_name'], int(db['port']))
    threads = []

    def do(name):
        # %s is a query parameter filled in by the driver, which avoids SQL injection
        sql = "insert into site(name, status, create_time, update_time, update_user_account, comment) values(%s, 0, NOW(), NOW(), 'daiyapeng', 'test');"
        rid = mysql.exec_and_result(sql, (name,))
        print(rid)

    for i in ['test-0', 'test-1', 'test-2']:
        t = threading.Thread(target=do, args=(i, ))
        threads.append(t)

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    mysql.close()
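The locking mentioned in the note can be sketched as follows. This is an illustration only: it swaps in the standard-library sqlite3 module in place of MySQLdb so that it runs without a database server, and the names write_lock, insert_site, and row_ids are made up. The pattern is to hold a threading.Lock around execute and commit on a shared connection. Giving each thread its own connection is another common option.

```python
import sqlite3
import threading

# One connection shared by all threads. check_same_thread=False lets other
# threads use it, so the lock below has to serialize access.
conn = sqlite3.connect(':memory:', check_same_thread=False)
conn.execute('CREATE TABLE site (id INTEGER PRIMARY KEY, name TEXT)')
write_lock = threading.Lock()
row_ids = []

def insert_site(name):
    # Only one thread at a time may execute and commit on the connection.
    with write_lock:
        cur = conn.execute('INSERT INTO site (name) VALUES (?)', (name,))
        conn.commit()
        row_ids.append(cur.lastrowid)

threads = [threading.Thread(target=insert_site, args=('test-%d' % i,))
           for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

count = conn.execute('SELECT COUNT(*) FROM site').fetchone()[0]
print(count, sorted(row_ids))
conn.close()
```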
                
                
                

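    Also: because I don't know the details of your code, I can't pinpoint the problem. I wrote a simulation program myself and could not reproduce the situation you describe. I hope it helps. The code is as follows: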

    
    # coding: utf-8

    import queue  # this module is named Queue in Python 2
    import threading

    class MyThread(threading.Thread):
        def __init__(self, task_queue):
            threading.Thread.__init__(self)
            self.daemon = True
            self.queue = task_queue

        def run(self):
            while True:
                try:
                    task = self.queue.get(timeout=2)
                except queue.Empty:
                    # No task arrived within 2 seconds: end this thread
                    break
                print('task: %s' % task)
                # Process the task here

    if __name__ == '__main__':
        threads = []
        q = queue.Queue()
        for i in range(3):
            thread = MyThread(q)
            threads.append(thread)

        for t in threads:
            t.start()

        for i in range(30):
            q.put(i)

        # Each worker exits once the queue has stayed empty for 2 seconds
        for t in threads:
            t.join()

        print('====== done ======')
                
