最近在线上的测试环境遇到一个偶发的问题,问题表现为docker容器卡死,不执行程序并且也不再消费消息队列中的任务。
线上环境使用docker集群,消息的传递使用消息队列实现。出问题的是一个下游服务,这个服务基本流程是,载入数据,处理之后进行模型训练和预测。奇怪的是,卡死的位置不固定,有时候出现在模型训练过程中,有时候出现在data_loader中,并且不是稳定触发,但是一旦触发,整个容器卡死,只有进行容器重启才能恢复。
debug
首先把代码拉到开发集群,测试之后没有问题,所有测试用例都通过,没有卡死。没办法,只能在线上测试集群debug。
查看代码,发现使用了多进程。原来该服务为了防止出现内存泄漏,每当消费一个消息,就会生成一个子进程完成数据处理和模型训练的功能。而当复现bug之后,查看进程,发现只存在一个python进程。估计是多进程中的子进程挂掉了,没有被父进程正常回收,导致的容器卡死。
其中多进程使用multiprocessing.Pool
实现,代码逻辑类似于:
import multiprocessing
import time
import os
def subprocesses(argv):
print('sub-process id: 'os.getpid())
time.sleep(10) # 子进程处理中
re = {"result": argv}
return re
if __name__ == '__main__':
print(os.getpid())
pool = multiprocessing.Pool(processes=1)
results = []
msg = "test"
sub_res = pool.map(subprocesses, (msg,))
results.append(sub_res)
pool.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用
读者可以使用上述代码进行测试,当子进程执行时,使用kill命令(windows使用taskkill命令)杀死子进程,会发现主程序卡死,永远不会执行完成。
所以问题出现在子进程执行过程中,子进程意外挂掉。
进一步查看日志发现,集群测试时,使用了很大的数据规模,并且给batch_size设置了很大的值,导致在处理数据或者模型训练时,内存远远超过集群对单个服务的限制,从而导致子进程被系统kill。
进一步测试
按照逻辑,子进程意外终止,那么父进程应该接收到信号。然而实际上,即使将子进程写入try/except中,也捕获不了,因为
The signals SIGKILL and SIGSTOP cannot be caught, blocked, or ignored.
所以即使子进程中增加异常捕获也是不起作用的,例如下面的写法:
def subprocesses(argv):
print(os.getpid())
try:
time.sleep(10)
except:
print('Error')
re = {"result": argv}
return re
或者将map方法换成apply 、 apply_async、 map_async
方法都是一样的结果,因为apply_async、map_async
中的error_callback函数同样无法捕获kill信号。
例如:
import multiprocessing
import time
import os
def subprocesses(argv):
print(os.getpid())
time.sleep(10)
re = {"result": argv}
return re
def collect_result(result):
print("Error")
if __name__ == '__main__':
print(os.getpid())
pool = multiprocessing.Pool(processes=1)
results = []
msg = "hello"
sub_res = pool.map_async(subprocesses, (msg,), error_callback=collect_result)
results.append(sub_res)
pool.close()
pool.join() # 异步多线程需要加这一行,否则主进程会直接结束
上述代码使用map_async
,但是子进程被kill的时候仍然触发不了error_callback回调函数。
Pool源码
既然Pool
类的几种多进程方法都没有办法解决这个问题,这说明是Pool
这个类的实现不支持子进程意外挂掉之后通知主进程,说以先来看看这个Pool
的源码是怎么实现的。
以下述代码为例,实例化进程池,并指定子进程数量为1:
pool = multiprocessing.Pool(processes=1)
sub_res = pool.apply_async(subprocesses, error_callback=collect_result) #以apply_async为例,apply,map,map_async结构类似
第一行为Pool的实例化
,在第一行代码执行之后,会生成三个队列_taskqueue, _inqueue, _outqueue
,并生成一个_cache
字典,之后生成指定数量的子进程加入到进程池中,最后生成三个线程_worker_handler, _task_handler, _result_handler
。
以下为精简后的Pool
类的init
函数
self._pool = []
self._setup_queues() # 在这里构造_inqueue, _outqueue队列
self._taskqueue = queue.SimpleQueue() # _taskqueue队列
self._cache = _PoolCache(notifier=self._change_notifier) # 构造_cache
try: # 为进程池构造子进程
self._repopulate_pool()
except Exception:
for p in self._pool:
if p.exitcode is None:
p.terminate()
for p in self._pool:
p.join()
raise
self._worker_handler = threading.Thread( # _worker_handler线程
target=Pool._handle_workers,
args=(self._cache, self._taskqueue, self._ctx, self.Process,
self._processes, self._pool, self._inqueue, self._outqueue,
self._initializer, self._initargs, self._maxtasksperchild,
self._wrap_exception, sentinels, self._change_notifier)
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
self._worker_handler.start()
self._task_handler = threading.Thread( # _task_handler线程
target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue,
self._pool, self._cache)
)
self._task_handler.daemon = True
self._task_handler._state = RUN
self._task_handler.start()
self._result_handler = threading.Thread( # _result_handler线程
target=Pool._handle_results,
args=(self._outqueue, self._quick_get, self._cache)
)
self._result_handler.daemon = True
self._result_handler._state = RUN
self._result_handler.start()
在第二行代码中,apply_async
会将要执行的函数和函数参数打包放入_taskqueue
,子进程获取任务执行。
apply_async
函数代码:
def apply_async(self, func, args=(), kwds={}, callback=None,
error_callback=None):
self._check_running()
result = ApplyResult(self, callback, error_callback) # 打包用户传入的任务函数,同时在_cache中添加对应记录
self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) # 任务函数等信息送入_taskqueue队列
return result
其中
_taskqueue: 存放用户指定子进程去执行的函数以及函数参数信息;
_inqueue: 该队列与子进程交互,从_taskqueue取出的task,放到_inqueue,由子进程从队列中取出任务执行;
_outqueue: 该队列与子进程交互,保存子进程的结果;
_cache: 缓存字典,以job_id为key。
_worker_handler: 维护进程数量的子线程,例如指定进程池大小为4,任务数为10,一个子进程完成任务之后销毁,进程池中进程为3,此时由这个线程生成新的进程加入到进程池;
_task_handler: 从_taskqueue队列取任务到_inqueue队列的子线程;
_result_handler: 从_outqueue取出结果的子线程,并将_cache中对应的记录删除。
分析完类里面的主要变量,下面分析一下过程:
- 实例化Pool类时,生成上述主要变量,生成进程池;
- 在apply_async函数中,将用户传入的函数和函数变量打包为ApplyResult类,放入_taskqueue,并在_cache字典中添加这条任务的记录;
- _task_handler线程遍历_taskqueue队列,将任务放入_inqueue队列;
- 子进程执行任务,将结果放入_outqueue队列;
- _result_handler从_outqueue队列中取出子进程的运算结果,并删除_cache中对应的记录。
- _result_handler检查_cache,当_cache才会停止_result_handler线程
另外,Pool类的一些重要方法如下:
pool.join()是用来等待进程池中的worker进程执行完毕,防止主进程在worker进程结束前结束。但pool.join()必须使用在pool.close()或者pool.terminate()之后。
close()跟terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。
因此,很显然,之前的问题是卡在了第5步:一旦某个子进程卡死,那么这个子进程将不会产生结果,则_result_handler就不会删除这个任务对应的_cache记录,_cache不空则_result_handler不会结束,这样整个进程池都不会结束。
看到这里,可以明白,使用进程池必须要能够获取到子进程的结果,而子进程被kill的时候不会产生结果,也不会通知进程池,因此这个问题对于使用进程池来实现多进程是无解的。
因此,当子进程任务比较复杂,存在被操作系统自动kill的可能,那么最好不要使用Pool
。
解决方法
上面的问题归根到底是通过Pool的方式实现多进程时,主进程不知道子进程是否已经挂掉,因此如果非要使用进程池来解决这个问题,可以从这里下手,例如下面这个不大优雅的实现:
import multiprocessing
import time
import os
def subprocesses(argv):
print('sub-process id: 'os.getpid())
time.sleep(10) # 子进程处理中
re = {"result": argv}
return re
if __name__ == '__main__':
print(os.getpid())
pool = multiprocessing.Pool(processes=1)
results = []
msg = "test"
sub_res = pool.map(subprocesses, (msg,))
results.append(sub_res)
pool.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用
while True: # 主进程轮询子进程状态,代替pool.join()
finish_flag = True # Pool is finish(True) or not
for sub_p in pool._pool:
if sub_p.is_alive():
finish_flag = False
break
if finish_flag:
error_flag = False # error(True) or not
for sub_p in pool._pool:
if sub_p.exitcode != 0:
print('Sub-processes exit error')
pool.terminate()
flag = True
break
break
The exit code or exit status is a number returned by a process created by a command line indicating if there was an error or not.
- 0 means that there is no error
- an other value means that there was an error.
上面的代码用主进程轮询子进程状态,代替pool.join()
,先通过进程的is_alive()
方法判断进程是否存活,当进程池中所有进程都已经退出,则通过进程的exitcode
变量判断是否是正常退出,若存在异常退出的子进程,则手动terminate进程池,确保不会卡死。
Process解决
直接不用Pool实现多进程,使用multiprocessing.Process
实现,子进程被kill的时候会自动结束:
import multiprocessing
import time
import os
def subprocesses(argv):
print('sub-process id: 'os.getpid())
time.sleep(10) # 子进程处理中
re = {"result": argv}
return re
if __name__ == '__main__': process_list = []
for i in range(2):
p = multiprocessing.Process(target=subprocesses, args=('test_process',))
p.start()
process_list.append(p)
for p in process_list:
p.join()