Accueil > Questions et réponses > Corps du texte
通过继承 multiprocessing.Process 实现了一个 worker 类;在父进程中,我自己实现了“最多同时运行 N 个子进程”的限制(出问题的环境中 N 为 30)。
实际运行中发现,子进程创建后,其 run 方法有大约万分之二的概率没有执行(当前每天运行 46000 多次,大约出现 11 次)。
代码和日志如下,注意打印日志的语句
父进程启动子进程(父进程里还有一个控制并发进程数量的逻辑,如果需要的话我贴出来):
...
def run_task(self, task):
    """Start an ``ExecuteWorkerProcess`` for *task* and add it to ``self.worker_pool``.

    NOTE(review): ``start()`` forks the current process on Linux.  If other
    threads of this parent may be logging at that moment, the child can
    inherit an already-acquired logging lock and block on its first log call,
    so the child has to reset those locks before logging.
    """
    logging.info('execute monitor %s' % task['id'])
    proc = execute_worker.ExecuteWorkerProcess(task)
    logging.debug('execute process start %s' % task['id'])
    proc.start()
    # The pid only exists once start() has created the child.
    logging.info('worker pid is %s (%s)' % (proc.pid, task['id']))
    logging.debug('execute process started %s' % task['id'])
    pool = self.worker_pool
    pool.append(proc)
...
子进程run方法
class ExecuteWorkerProcess(multiprocessing.Process):
    """Child process that executes one monitor task.

    Only ``__init__`` and ``run`` appear in the original post; the other
    members (including ``_run``, which ``run`` calls) were elided there.
    """

    # ... (members elided in the original post)

    def __init__(self, task):
        """Prepare the process for *task*.

        Args:
            task: Task mapping; must contain an ``'id'`` key.  When it also
                contains a ``'task'`` key the run is marked non-routine.
        """
        super(ExecuteWorkerProcess, self).__init__()
        self.stopping = False
        self.task = task
        self.worker = ExecuteWorker(task)
        # Routine unless the task mapping carries an explicit 'task' entry.
        self.routine = 'task' not in task
        self.zk = None
        logging.debug('process created %s' % self.task['id'])

    # ... (members elided in the original post, e.g. ``_run``)

    @staticmethod
    def _reset_logging_locks():
        """Give the logging module and every attached handler fresh, unheld locks.

        With the fork start method the child gets a copy of the parent's
        memory but only of the thread that called fork().  If another parent
        thread was inside a logging call at that instant, the copied lock stays
        acquired forever and the child's first log call blocks.  That is the
        hang in which ``run`` never seems to start.  Python 3.7+ already
        reinitializes these locks after fork; doing it again is harmless, and
        it is required on older interpreters.
        """
        import threading  # local import: keeps this fix self-contained

        # Module-level lock guarding logging's global state (private attribute).
        if getattr(logging, '_lock', None) is not None:
            logging._lock = threading.RLock()
        loggers = [logging.getLogger()]
        loggers.extend(
            obj for obj in list(logging.Logger.manager.loggerDict.values())
            if isinstance(obj, logging.Logger)
        )
        for logger in loggers:
            for handler in logger.handlers:
                handler.createLock()

    def run(self):
        """Child entry point: run ``_run`` and email a warning if it raises."""
        # Must happen before the first logging call; see _reset_logging_locks.
        self._reset_logging_locks()
        logging.debug('process start %s' % self.task['id'])
        try:
            logging.debug('process run before %s' % self.task['id'])
            self._run()
            logging.debug('process run after %s' % self.task['id'])
        except Exception:
            # A bare ``except`` would also trap SystemExit/KeyboardInterrupt
            # and report a deliberate shutdown as a failure.
            logging.exception('')
            title = u'监控执行进程报错'
            text = u'监控项id:%s\n错误信息:\n%s' % (self.task['id'], traceback.format_exc())
            warning.email_warning(title, text, to_dev=True)
        logging.debug('process start done %s' % self.task['id'])
...
出现问题的进程日志如下:
正常任务日志如下:
可以看到,无论正常还是异常的情况,主进程中都打印出了子进程的 pid,但异常子进程的 run 函数第一行没有执行。
是否有人遇到过?这个是不是multiprocessing.Process的坑,有没有规避办法...
ringa_lee · 2017-04-18 09:06:50
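La cause a été trouvée : le processus principal combine des threads et multiprocessing (fork), ce qui provoque un interblocage (deadlock) dans le module logging. Concrètement, fork() ne copie dans l'enfant que le thread appelant ; si, à cet instant, un autre thread détenait le verrou d'un handler de logging, l'enfant hérite d'un verrou déjà pris que personne ne libérera jamais. Symptôme : le premier appel de journalisation dans le processus enfant reste bloqué. Le problème ne se produit que sous Linux.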
J'ai trouvé comment reproduire le problème grâce à cette réponse sur Stack Overflow ; voir aussi cette autre réponse, ainsi que la solution.
Démo de reproduction :
#coding=utf-8
import logging
import logging.handlers
import multiprocessing
import os
import sys
import threading
import time
def init_log(log_path, level=logging.INFO, when="midnight", backup=7,
             format="%(levelname)s:[%(asctime)s][%(filename)s:%(lineno)d][%(process)s][%(thread)d] %(message)s",
             datefmt="%Y-%m-%d %H:%M:%S"):
    """Attach a time-rotating file handler to the root logger.

    Args:
        log_path: Log file path without extension; ``".log"`` is appended.
            Missing parent directories are created.  A bare file name
            writes into the current directory.
        level: Level set on both the root logger and the new handler.
        when: Rotation interval passed to ``TimedRotatingFileHandler``.
        backup: Number of rotated files to keep (``backupCount``).
        format: Record format string.  The name shadows the builtin but is
            kept for backward compatibility with keyword callers.
        datefmt: ``strftime`` format used for ``%(asctime)s``.
    """
    formatter = logging.Formatter(format, datefmt)
    logger = logging.getLogger()
    logger.setLevel(level)

    log_dir = os.path.dirname(log_path)
    # dirname() is '' for a bare file name: nothing to create, and
    # os.makedirs('') would raise.
    if log_dir and not os.path.isdir(log_dir):
        try:
            os.makedirs(log_dir)
        except OSError:
            # Another process may have created it between the check and here.
            if not os.path.isdir(log_dir):
                raise

    handler = logging.handlers.TimedRotatingFileHandler(log_path + ".log",
                                                        when=when,
                                                        backupCount=backup)
    handler.setLevel(level)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
# Module-level flag: Worker.check_timeout sets it once a worker outlives its
# deadline; main() and ExecuteThread2 poll it to shut the demo down.
stop = False


class Worker(multiprocessing.Process):
    u"""Demo process whose only job is to log its PID.

    A healthy worker exits almost immediately.  One that is still alive
    3 seconds after construction is reported as stuck by ``check_timeout``.
    In this demo it is presumably blocked in its first logging call after
    fork.
    """

    def __init__(self):
        super(Worker, self).__init__()
        # Deadline after which a still-running worker counts as stuck.
        self.reset_ts = time.time() + 3
        # Ensures a stuck worker is reported only once.
        self.stopping = False

    def run(self):
        """Log the PID of the child process this runs in."""
        logging.info('Process pid is %s' % os.getpid())

    def check_timeout(self, now):
        u"""Report this worker as stuck if *now* is past its deadline.

        Called from the parent process (``main``).  The first time *now*
        exceeds ``reset_ts``, it logs the worker's PID and sets the
        module-level ``stop`` flag; later calls do nothing.

        :param now: current time, as returned by ``time.time()``
        :return: None
        """
        global stop
        if now > self.reset_ts and not self.stopping:
            self.stopping = True
            # self.pid is the child's PID.  os.getpid() would give the
            # parent's, because this method runs in the parent.
            logging.info('error pid is %s' % self.pid)
            stop = True
def main():
    """Spawn short-lived ``Worker`` processes until one of them gets stuck.

    Roughly every millisecond this function:

    1. reaps workers that have exited;
    2. asks each remaining worker whether it has outlived its deadline
       (``Worker.check_timeout`` sets the module-level ``stop`` flag);
    3. exits with status -1 once ``stop`` is set;
    4. starts one more worker while fewer than 1000 are running.

    Raises:
        SystemExit: once a stuck worker has been detected.  When ``main``
            runs inside a thread, this ends only that thread.
    """
    worker_pool = []
    while True:
        # Reap finished workers first, so only processes that are genuinely
        # still running are checked against their deadline.
        alive_workers = []
        for worker in worker_pool:
            if worker.is_alive():
                alive_workers.append(worker)
            else:
                worker.join()
        worker_pool = alive_workers

        now = time.time()
        for worker in worker_pool:
            worker.check_timeout(now)
        if stop:
            logging.error('Process not run, exit!')
            # sys.exit, not the site-module ``exit`` helper, which is meant
            # for the interactive interpreter and may be absent (python -S).
            sys.exit(-1)

        if len(worker_pool) < 1000:
            logging.info('create worker')
            worker = Worker()
            worker.start()
            logging.info('worker pid is %s' % worker.pid)
            worker_pool.append(worker)
        time.sleep(0.001)
class ExecuteThread(threading.Thread):
    """Thread whose only job is to drive the worker-spawning loop ``main()``."""

    def run(self):
        # main() loops until it raises SystemExit; raised inside a thread,
        # that ends only this thread.  Thread ignores run()'s return value.
        return main()
class ExecuteThread2(threading.Thread):
    """Thread that logs continuously until the module-level ``stop`` flag is set.

    Logging every ~1 ms keeps this thread frequently inside a logging call.
    In this reproduction, that makes it likely that ``main`` forks a worker
    while this thread holds a logging lock.
    """

    def run(self):
        # ``stop`` is only read here, so no ``global`` declaration is needed.
        while True:
            logging.info('main thread')
            time.sleep(0.001)
            if stop:
                # sys.exit instead of the REPL-only ``exit``; a SystemExit
                # raised in a thread ends only this thread.
                sys.exit(-1)
if __name__ == '__main__':
    # Replace '/yourpath' with a writable directory before running.
    init_log('/yourpath/timeout')
    # main() runs in its own thread rather than being called directly, so the
    # parent forks workers while a second thread (ExecuteThread2) is logging:
    # the threads + fork combination this reproduction needs.
    thread, thread2 = ExecuteThread(), ExecuteThread2()
    for t in (thread, thread2):
        t.start()
    for t in (thread, thread2):
        t.join()
Après la reproduction, n'oubliez pas de tuer (kill) le processus enfant resté bloqué.