mindspore.multiprocessing

介绍

mindspore.multiprocessing提供了创建多进程的能力,其内部实现继承自Python原生的multiprocessing模块,并通过对部分接口进行重载,确保以fork方式创建多进程时MindSpore框架的正常使用。

当使用mindspore.multiprocessing并以fork方式创建多进程时,框架内部会对线程、锁等资源进行清理和重置,以保障框架功能正常。

  • 在fork发生时,父进程在fork前会先等待内部线程任务执行完毕,并在当前线程主动持有GIL锁,以避免fork后的子进程无法持有GIL锁。

  • 在fork发生后,父进程会释放主动持有的GIL锁。

  • 在fork发生后,子进程会释放主动持有的GIL锁,然后对框架内部的线程等资源进行恢复和清理,并将后端重置为 CPU

当不使用fork方式创建多进程时,上述流程不会被触发。此时使用mindspore.multiprocessing创建多进程的执行流程和使用Python原生的multiprocessing模块的流程完全一致。

由于mindspore.multiprocessing继承自Python原生的multiprocessing模块,其接口用法和原生模块完全兼容,用户可以直接把 import multiprocessing 修改为 import mindspore.multiprocessing 。因此,此处不对原生接口进行重复描述。接口的详细介绍和使用方式请参考multiprocessing

当前仅支持后端为 CPU 的Tensor在进程间共享。

使用说明

一个使用mindspore.multiprocessing创建多进程的样例如下:

from mindspore import Tensor, ops
import mindspore.multiprocessing as multiprocessing

def child_process():
    y = ops.log(Tensor(2.0))
    print("ops.log(Tensor(2.0))=", y)

if __name__ == '__main__':
    p = multiprocessing.Process(target=child_process)
    p.start()
    p.join()

运行结果如下:

ops.log(Tensor(2.0))= 0.6931472

一个使用mindspore.multiprocessing创建进程池的样例如下:

from mindspore import Tensor, ops
import mindspore.multiprocessing as multiprocessing

def child_process(x):
    return ops.log(x)

if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as pool:
        inputs = [Tensor(2.0), Tensor(2.0)]
        outputs = pool.map(child_process, inputs)
        print("ops.log(Tensor(2.0))=", outputs)

运行结果如下:

ops.log(Tensor(2.0))= [Tensor(shape=[], dtype=Float32, value= 0.693147), Tensor(shape=[], dtype=Float32, value= 0.693147)]

当用户在执行计算任务后,再使用fork方式创建多进程时,若使用的模块不是mindspore.multiprocessing,可能会出现框架线程丢失导致的子进程卡住的问题。改为使用mindspore.multiprocessing模块的fork方式创建多进程,可以解决该问题。

仅在POSIX系统下(如Linux和macOS)支持fork方式创建子进程,Windows下不支持该方式

from mindspore import Tensor, ops
# Child process may be stuck when using 'import multiprocessing' .
import mindspore.multiprocessing as multiprocessing

def child_process(q):
    y = ops.log(Tensor(2.0))
    q.put(y)
    return

if __name__ == '__main__':
    multiprocessing.set_start_method("fork", force=True)
    print("parent process:ops.log(Tensor(2.0))=", ops.log(Tensor(2.0)))
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child_process, args=(q,))
    p.start()
    print("child process:ops.log(Tensor(2.0))=", q.get())
    p.join()

运行结果如下:

parent process:ops.log(Tensor(2.0))= 0.6931472
child process:ops.log(Tensor(2.0))= 0.6931472

当后端为 Ascend 时,进程会独占卡资源。若用户在执行计算任务后创建子进程,且子进程使用了父进程的资源执行另一个计算任务,可能会因资源无法访问而报错。修改子进程创建方式为fork后,框架会将子进程的后端重置为 CPU ,以避免资源冲突。

from mindspore import Tensor, ops, context
import mindspore.multiprocessing as multiprocessing

def child_process(q):
    y = ops.log(Tensor(2.0))
    q.put(y)
    return

if __name__ == '__main__':
    context.set_context(device_target="Ascend")
    # Child process may not be able to acquire resources when start_method is set to 'spawn' .
    multiprocessing.set_start_method("fork", force=True)
    print("parent process:ops.log(Tensor(2.0))=", ops.log(Tensor(2.0)))
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child_process, args=(q,))
    p.start()
    print("child process:ops.log(Tensor(2.0))=", q.get())
    p.join()

运行结果如下:

parent process:ops.log(Tensor(2.0))= 0.6931472
child process:ops.log(Tensor(2.0))= 0.6931472

也可以将创建多进程的动作放在执行任何计算任务的前面,并在子进程里手动修改后端。

from mindspore import Tensor, ops, context
import mindspore.multiprocessing as multiprocessing

def child_process(q):
    # Child process may not be able to acquire resources when using the same resources as parent process.
    context.set_context(device_target="CPU")
    y = ops.log(Tensor(2.0))
    q.put(y)
    return

if __name__ == '__main__':
    context.set_context(device_target="Ascend")
    multiprocessing.set_start_method("spawn", force=True)
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child_process, args=(q,))
    p.start()
    print("child process:ops.log(Tensor(2.0))=", q.get())
    p.join()
    # Child process may not be able to acquire resources when this compute task is executed before the child process is created.
    print("parent process:ops.log(Tensor(2.0))=", ops.log(Tensor(2.0)))

运行结果如下:

child process:ops.log(Tensor(2.0))= 0.6931472
parent process:ops.log(Tensor(2.0))= 0.6931472