python多进程中的坑

最近在线上的测试环境遇到一个偶发的问题,问题表现为docker容器卡死,不执行程序并且也不再消费消息队列中的任务。

线上环境使用docker集群,消息的传递使用消息队列实现。出问题的是一个下游服务,这个服务基本流程是,载入数据,处理之后进行模型训练和预测。奇怪的是,卡死的位置不固定,有时候出现在模型训练过程中,有时候出现在data_loader中,并且不是稳定触发,但是一旦触发,整个容器卡死,只有进行容器重启才能恢复。

debug

首先把代码拉到开发集群,测试之后没有问题,所有测试用例都通过,没有卡死。没办法,只能在线上测试集群debug。

查看代码,发现使用了多进程。原来该服务为了防止出现内存泄漏,每当消费一个消息,就会生成一个子进程完成数据处理和模型训练的功能。而当复现bug之后,查看进程,发现只存在一个python进程。估计是多进程中的子进程挂掉了,没有被父进程正常回收,导致的容器卡死。

其中多进程使用multiprocessing.Pool实现,代码逻辑类似于:

import multiprocessing
import time
import os

def subprocesses(argv):
    print('sub-process id: 'os.getpid())
    time.sleep(10)  # 子进程处理中
    re = {"result": argv}
    return re

if __name__ == '__main__':
    print(os.getpid())
    pool = multiprocessing.Pool(processes=1)
    results = []
    msg = "test"
    sub_res = pool.map(subprocesses, (msg,))
    results.append(sub_res)
    pool.close()  # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用

读者可以使用上述代码进行测试,当子进程执行时,使用kill命令(windows使用taskkill命令)杀死子进程,会发现主程序卡死,永远不会执行完成。

所以问题出现在子进程执行过程中,子进程意外挂掉。

进一步查看日志发现,集群测试时,使用了很大的数据规模,并且给batch_size设置了很大的值,导致在处理数据或者模型训练时,内存远远超过集群对单个服务的限制,从而导致子进程被系统kill。

进一步测试

按照逻辑,子进程意外终止,那么父进程应该接收到信号。然而实际上,即使将子进程写入try/except中,也捕获不了,因为

The signals SIGKILL and SIGSTOP cannot be caught, blocked, or ignored.

所以即使子进程中增加异常捕获也是不起作用的,例如下面的写法:

def subprocesses(argv):
    print(os.getpid())
    try:
        time.sleep(10)
    except:
        print('Error')
    re = {"result": argv}
    return re

或者将map方法换成apply 、 apply_async、 map_async方法都是一样的结果,因为apply_async、map_async中的error_callback函数同样无法捕获kill信号。

例如:

import multiprocessing
import time
import os

def subprocesses(argv):
    print(os.getpid())
    time.sleep(10)
    re = {"result": argv}
    return re

def collect_result(result):
    print("Error")

if __name__ == '__main__':
    print(os.getpid())
    pool = multiprocessing.Pool(processes=1)
    results = []
    msg = "hello"
    sub_res = pool.map_async(subprocesses, (msg,), error_callback=collect_result)
    results.append(sub_res)
    pool.close()
    pool.join()  # 异步多线程需要加这一行,否则主进程会直接结束

上述代码使用map_async,但是子进程被kill的时候仍然触发不了error_callback回调函数

Pool源码

既然Pool类的几种多进程方法都没有办法解决这个问题,这说明是Pool这个类的实现不支持子进程意外挂掉之后通知主进程,说以先来看看这个Pool的源码是怎么实现的。

以下述代码为例,实例化进程池,并指定子进程数量为1:

pool = multiprocessing.Pool(processes=1)
sub_res = pool.apply_async(subprocesses, error_callback=collect_result)  #以apply_async为例,apply,map,map_async结构类似

第一行为Pool的实例化,在第一行代码执行之后,会生成三个队列_taskqueue, _inqueue, _outqueue,并生成一个_cache字典,之后生成指定数量的子进程加入到进程池中,最后生成三个线程_worker_handler, _task_handler, _result_handler

以下为精简后Pool类的init函数

self._pool = []
self._setup_queues()  # 在这里构造_inqueue, _outqueue队列
self._taskqueue = queue.SimpleQueue()  # _taskqueue队列
self._cache = _PoolCache(notifier=self._change_notifier)  # 构造_cache

try:  # 为进程池构造子进程
    self._repopulate_pool()
except Exception:
    for p in self._pool:
        if p.exitcode is None:
            p.terminate()
    for p in self._pool:
        p.join()
    raise

self._worker_handler = threading.Thread(  # _worker_handler线程
    target=Pool._handle_workers,
    args=(self._cache, self._taskqueue, self._ctx, self.Process,
          self._processes, self._pool, self._inqueue, self._outqueue,
          self._initializer, self._initargs, self._maxtasksperchild,
          self._wrap_exception, sentinels, self._change_notifier)
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
self._worker_handler.start()

self._task_handler = threading.Thread(  # _task_handler线程
    target=Pool._handle_tasks,
    args=(self._taskqueue, self._quick_put, self._outqueue,
          self._pool, self._cache)
)
self._task_handler.daemon = True
self._task_handler._state = RUN
self._task_handler.start()

self._result_handler = threading.Thread(  # _result_handler线程
    target=Pool._handle_results,
    args=(self._outqueue, self._quick_get, self._cache)
)
self._result_handler.daemon = True
self._result_handler._state = RUN
self._result_handler.start()

在第二行代码中,apply_async会将要执行的函数和函数参数打包放入_taskqueue,子进程获取任务执行。

apply_async函数代码:

def apply_async(self, func, args=(), kwds={}, callback=None,
        error_callback=None):
    self._check_running()
    result = ApplyResult(self, callback, error_callback)  # 打包用户传入的任务函数,同时在_cache中添加对应记录
    self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))  # 任务函数等信息送入_taskqueue队列
    return result

其中

_taskqueue: 存放用户指定子进程去执行的函数以及函数参数信息;
_inqueue: 该队列与子进程交互,从_taskqueue取出的task,放到_inqueue,由子进程从队列中取出任务执行;
_outqueue: 该队列与子进程交互,保存子进程的结果;
_cache: 缓存字典,以job_id为key。

_worker_handler: 维护进程数量的子线程,例如指定进程池大小为4,任务数为10,一个子进程完成任务之后销毁,进程池中进程为3,此时由这个线程生成新的进程加入到进程池;
_task_handler: 从_taskqueue队列取任务到_inqueue队列的子线程;
_result_handler: 从_outqueue取出结果的子线程,并将_cache中对应的记录删除。

分析完类里面的主要变量,下面分析一下过程:

  1. 实例化Pool类时,生成上述主要变量,生成进程池;
  2. 在apply_async函数中,将用户传入的函数和函数变量打包为ApplyResult类,放入_taskqueue,并在_cache字典中添加这条任务的记录;
  3. _task_handler线程遍历_taskqueue队列,将任务放入_inqueue队列;
  4. 子进程执行任务,将结果放入_outqueue队列;
  5. _result_handler从_outqueue队列中取出子进程的运算结果,并删除_cache中对应的记录。
  6. _result_handler检查_cache,当_cache才会停止_result_handler线程

另外,Pool类的一些重要方法如下:

pool.join()是用来等待进程池中的worker进程执行完毕,防止主进程在worker进程结束前结束。但pool.join()必须使用在pool.close()或者pool.terminate()之后。

close()跟terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。

因此,很显然,之前的问题是卡在了第5步:一旦某个子进程卡死,那么这个子进程将不会产生结果,则_result_handler就不会删除这个任务对应的_cache记录,_cache不空则_result_handler不会结束,这样整个进程池都不会结束。

看到这里,可以明白,使用进程池必须要能够获取到子进程的结果,而子进程被kill的时候不会产生结果,也不会通知进程池,因此这个问题对于使用进程池来实现多进程是无解的。

因此,当子进程任务比较复杂,存在被操作系统自动kill的可能,那么最好不要使用Pool

解决方法

上面的问题归根到底是通过Pool的方式实现多进程时,主进程不知道子进程是否已经挂掉,因此如果非要使用进程池来解决这个问题,可以从这里下手,例如下面这个不大优雅的实现:

import multiprocessing
import time
import os

def subprocesses(argv):
    print('sub-process id: 'os.getpid())
    time.sleep(10)  # 子进程处理中
    re = {"result": argv}
    return re

if __name__ == '__main__':
    print(os.getpid())
    pool = multiprocessing.Pool(processes=1)
    results = []
    msg = "test"
    sub_res = pool.map(subprocesses, (msg,))
    results.append(sub_res)
    pool.close()  # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用
    
    while True:  # 主进程轮询子进程状态,代替pool.join()
        finish_flag = True  # Pool is finish(True) or not
        for sub_p in pool._pool:
            if sub_p.is_alive():
                finish_flag = False
                break
        if finish_flag:
            error_flag = False  # error(True) or not
            for sub_p in pool._pool:
                if sub_p.exitcode != 0:
                    print('Sub-processes exit error')
                    pool.terminate()
                    flag = True
                    break
            break

The exit code or exit status is a number returned by a process created by a command line indicating if there was an error or not.

  • 0 means that there is no error
  • an other value means that there was an error.

上面的代码用主进程轮询子进程状态,代替pool.join(),先通过进程的is_alive()方法判断进程是否存活,当进程池中所有进程都已经退出,则通过进程的exitcode变量判断是否是正常退出,若存在异常退出的子进程,则手动terminate进程池,确保不会卡死。

Process解决

直接不用Pool实现多进程,使用multiprocessing.Process实现,子进程被kill的时候会自动结束:

import multiprocessing
import time
import os

def subprocesses(argv):
    print('sub-process id: 'os.getpid())
    time.sleep(10)  # 子进程处理中
    re = {"result": argv}
    return re

if __name__ == '__main__':    process_list = []
    for i in range(2):
        p = multiprocessing.Process(target=subprocesses, args=('test_process',))
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()

  本篇
python多进程中的坑 python多进程中的坑
线上环境docker容器中使用python多进程开发时遇到另一个小坑,做一下记录。问题表现为docker容器卡死,不执行程序并且也不再消费消息队列中的任务。
2022-02-15
下一篇 
Don't stop pretraining Don't stop pretraining
如今很多NLP任务都会在Bert等预训练模型的基础上进行fine-tuning,可能很多人没有意识到在任务领域上进行新的预训练会进一步提高模型效果,下面这篇论文就从多个实践角度证明了这种方法的可行性。
2021-12-11
   目录