suchen

Heim  >  Fragen und Antworten  >  Hauptteil

python - 使用multiprocessing.Process调用start方法后,有较小的几率子进程中run方法未执行

继承multiprocessing.Process实现了一个worker类,在父进程中,自己实现了一个最多启动N的限制(出问题的环境是30个)。
实际运行中发现,大约有万分之二(当前每天运行46000+次,大约出现11次)的概率,子进程创建后run方法未执行。
代码和日志如下,注意打印日志的语句
父进程启动子进程(父进程里还有一个控制并发进程数量的逻辑,如果需要的话我贴出来):

...
    def run_task(self, task):
        logging.info('execute monitor %s' % task['id'])
        worker = execute_worker.ExecuteWorkerProcess(task)
        logging.debug('execute process start %s' % task['id'])
        worker.start()
        logging.info('worker pid is %s (%s)' % (worker.pid, task['id']))
        logging.debug('execute process started %s' % task['id'])
        self.worker_pool.append(worker)
...

子进程run方法

class ExecuteWorkerProcess(multiprocessing.Process):
...
    def __init__(self, task):
        super(ExecuteWorkerProcess, self).__init__()
        self.stopping = False
        self.task = task
        self.worker = ExecuteWorker(task)
        if 'task' in task:
            self.routine = False
        else:
            self.routine = True
        self.zk = None
        logging.debug('process created %s' % self.task['id'])
...
    def run(self):
        logging.debug('process start %s' % self.task['id'])
        try:
            logging.debug('process run before %s' % self.task['id'])
            self._run()
            logging.debug('process run after %s' % self.task['id'])
        except:
            logging.exception('')
            title = u'监控执行进程报错'
            text = u'监控项id:%s\n错误信息:\n%s' % (self.task['id'], traceback.format_exc())
            warning.email_warning(title, text, to_dev=True)
        logging.debug('process start done %s' % self.task['id'])
...

出现问题的进程日志如下:

正常任务日志如下:

可以看到正常和异常的日志主进程中都打印除了子进程的pid,但是异常继承子进程run行数的第一行没有执行。
是否有人遇到过?这个是不是multiprocessing.Process的坑,有没有规避办法...

ringa_leeringa_lee2802 Tage vor902

Antworte allen(1)Ich werde antworten

  • ringa_lee

    ringa_lee2017-04-18 09:06:50

    原因已找到,由于主进程中使用了thread+mutiprocessing(fork),导致logging出现死锁,现象就是遇到子进程里第一句logging就hang住。问题只会发生在Linux下。
    看了stckoverflow这个答案找到的复现方法,另一个回答,解决方案
    复现demo:

    #coding=utf-8
    import os
    import time
    import logging
    import threading
    import multiprocessing
    import logging.handlers
    
    
    def init_log(log_path, level=logging.INFO, when="midnight", backup=7,
                 format="%(levelname)s:[%(asctime)s][%(filename)s:%(lineno)d][%(process)s][%(thread)d] %(message)s",
                 datefmt="%Y-%m-%d %H:%M:%S"):
        formatter = logging.Formatter(format, datefmt)
        logger = logging.getLogger()
        logger.setLevel(level)
    
        dir = os.path.dirname(log_path)
        if not os.path.isdir(dir):
            os.makedirs(dir)
    
        handler = logging.handlers.TimedRotatingFileHandler(log_path + ".log",
                                                            when=when,
                                                            backupCount=backup)
        handler.setLevel(level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    
    
    stop = False
    
    class Worker(multiprocessing.Process):
        u"""
        多进程方式执行任务,使用CPU密集型
        """
        def __init__(self):
            super(Worker, self).__init__()
            self.reset_ts = time.time() + 3
            self.stopping = False
    
        def run(self):
            logging.info('Process pid is %s' % os.getpid())
    
        def check_timeout(self, now):
            u"""
            若当前时间已超过复位时间,则结束进程
            :param now:
            :return:
            """
            global stop
            if now > self.reset_ts and not self.stopping:
                self.stopping = True
                logging.info('error pid is %s' % os.getpid())
                stop = True
    
    
    def main():
        global stop
        worker_pool = []
        while 1:
            if worker_pool:
                now = time.time()
                for worker in worker_pool:
                    worker.check_timeout(now)
                    if stop:
                        logging.error('Process not run, exit!')
                        exit(-1)
    
            alive_workers = [worker for worker in worker_pool if worker.is_alive()]
            over_workers = list(set(alive_workers) ^ set(worker_pool))
            for over_worker in over_workers:
                over_worker.join()
            worker_pool = alive_workers
            if len(worker_pool) < 1000:
                logging.info('create worker')
                worker = Worker()
                worker.start()
                logging.info('worker pid is %s' % worker.pid)
                worker_pool.append(worker)
            time.sleep(0.001)
    
    
    class ExecuteThread(threading.Thread):
        def run(self):
            main()
    
    class ExecuteThread2(threading.Thread):
        def run(self):
            global stop
            while 1:
                logging.info('main thread')
                time.sleep(0.001)
                if stop:
                    exit(-1)
    
    if __name__ == '__main__':
        init_log('/yourpath/timeout')
        #main()
    
        thread = ExecuteThread()
        thread2 = ExecuteThread2()
        thread.start()
        thread2.start()
        thread.join()
        thread2.join()
    

    复现完了记得清掉hang住的进程....

    Antwort
    0
  • StornierenAntwort