
python - With multiprocessing.Process, there is a small chance the child's run method never executes after start() is called

I subclassed multiprocessing.Process to implement a worker class. The parent process enforces a cap of at most N concurrent workers (N = 30 in the environment where the problem occurs).
In production I found that with a probability of roughly 0.02% (currently 46,000+ runs per day, about 11 occurrences), the child process is created but its run method never executes.
Code and logs are below; pay attention to the logging statements.
The parent process starting a child (the parent also contains logic that limits the number of concurrent processes; I can post it if needed):

...
    def run_task(self, task):
        logging.info('execute monitor %s' % task['id'])
        worker = execute_worker.ExecuteWorkerProcess(task)
        logging.debug('execute process start %s' % task['id'])
        worker.start()
        logging.info('worker pid is %s (%s)' % (worker.pid, task['id']))
        logging.debug('execute process started %s' % task['id'])
        self.worker_pool.append(worker)
...

The child process's run method:

class ExecuteWorkerProcess(multiprocessing.Process):
...
    def __init__(self, task):
        super(ExecuteWorkerProcess, self).__init__()
        self.stopping = False
        self.task = task
        self.worker = ExecuteWorker(task)
        if 'task' in task:
            self.routine = False
        else:
            self.routine = True
        self.zk = None
        logging.debug('process created %s' % self.task['id'])
...
    def run(self):
        logging.debug('process start %s' % self.task['id'])
        try:
            logging.debug('process run before %s' % self.task['id'])
            self._run()
            logging.debug('process run after %s' % self.task['id'])
        except:
            logging.exception('')
            title = u'Monitor worker process error'
            text = u'Monitor item id: %s\nError details:\n%s' % (self.task['id'], traceback.format_exc())
            warning.email_warning(title, text, to_dev=True)
        logging.debug('process start done %s' % self.task['id'])
...

Logs from a run that exhibits the problem:

Logs from a normal run:

As you can see, in both the normal and the abnormal case the parent process logged the child's pid, but in the abnormal case the first line of the child's run method was never executed.
Has anyone run into this? Is this a pitfall of multiprocessing.Process, and is there a way to avoid it...

ringa_lee · asked a few days ago · 917 views

All replies (1)

  • ringa_lee

    2017-04-18 09:06:50

    The cause has been found. Because the main process mixes threads with multiprocessing (fork), a deadlock occurs inside logging. The symptom is that the very first logging call in the child process hangs. The problem only occurs on Linux.
    I found how to reproduce it after reading this answer on Stack Overflow, another answer, and the solution
    Reproduction demo:

    #coding=utf-8
    import os
    import time
    import logging
    import threading
    import multiprocessing
    import logging.handlers
    
    
    def init_log(log_path, level=logging.INFO, when="midnight", backup=7,
                 format="%(levelname)s:[%(asctime)s][%(filename)s:%(lineno)d][%(process)s][%(thread)d] %(message)s",
                 datefmt="%Y-%m-%d %H:%M:%S"):
        formatter = logging.Formatter(format, datefmt)
        logger = logging.getLogger()
        logger.setLevel(level)
    
        dir = os.path.dirname(log_path)
        if not os.path.isdir(dir):
            os.makedirs(dir)
    
        handler = logging.handlers.TimedRotatingFileHandler(log_path + ".log",
                                                            when=when,
                                                            backupCount=backup)
        handler.setLevel(level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    
    
    stop = False
    
    class Worker(multiprocessing.Process):
        u"""
        Execute tasks in separate processes (CPU-bound).
        """
        def __init__(self):
            super(Worker, self).__init__()
            self.reset_ts = time.time() + 3
            self.stopping = False
    
        def run(self):
            logging.info('Process pid is %s' % os.getpid())
    
        def check_timeout(self, now):
            u"""
            If the current time is past the reset deadline, flag the process as failed and stop.
            :param now:
            :return:
            """
            global stop
            if now > self.reset_ts and not self.stopping:
                self.stopping = True
                logging.info('error pid is %s' % os.getpid())
                stop = True
    
    
    def main():
        global stop
        worker_pool = []
        while 1:
            if worker_pool:
                now = time.time()
                for worker in worker_pool:
                    worker.check_timeout(now)
                    if stop:
                        logging.error('Process not run, exit!')
                        exit(-1)
    
            alive_workers = [worker for worker in worker_pool if worker.is_alive()]
            over_workers = list(set(alive_workers) ^ set(worker_pool))
            for over_worker in over_workers:
                over_worker.join()
            worker_pool = alive_workers
            if len(worker_pool) < 1000:
                logging.info('create worker')
                worker = Worker()
                worker.start()
                logging.info('worker pid is %s' % worker.pid)
                worker_pool.append(worker)
            time.sleep(0.001)
    
    
    class ExecuteThread(threading.Thread):
        def run(self):
            main()
    
    class ExecuteThread2(threading.Thread):
        def run(self):
            global stop
            while 1:
                logging.info('main thread')
                time.sleep(0.001)
                if stop:
                    exit(-1)
    
    if __name__ == '__main__':
        init_log('/yourpath/timeout')
        #main()
    
        thread = ExecuteThread()
        thread2 = ExecuteThread2()
        thread.start()
        thread2.start()
        thread.join()
        thread2.join()
    

    After reproducing it, don't forget to kill the hung processes....
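    (Editor's sketch, not from the original answer: another way to sidestep this whole class of fork+threads deadlocks is the "spawn" start method, available since Python 3.4. The child starts as a fresh interpreter, so it cannot inherit a logging lock that a parent thread held at fork time:)

```python
# coding=utf-8
import logging
import multiprocessing
import os


def child():
    # With "spawn" the child is a fresh interpreter: locks from the
    # parent (including logging's) are not inherited mid-acquisition.
    logging.basicConfig(level=logging.INFO)
    logging.info('child ran, pid %s', os.getpid())


if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')  # Python 3.4+
    worker = ctx.Process(target=child)
    worker.start()
    worker.join()
    # exitcode 0 means run() completed instead of hanging
    print('exitcode:', worker.exitcode)
```

    Note that "spawn" is slower than "fork" and requires the target and its arguments to be picklable, so it may need code changes beyond the start call.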
