Executing MPI Tasks with RAPTOR

This notebook walks you through setting up and using the RAPTOR subsystem to execute MPI function tasks. To execute MPI functions with RAPTOR, we need to specify the type and size of the worker that RAPTOR deploys. The primary reason to use RAPTOR for MPI functions is its ability to construct private MPI communicators of heterogeneous sizes (different numbers of ranks) and deliver them to the function at execution time. In the example below, we execute an MPI function that requires 2 MPI ranks, and for that, we deploy a single master and two MPI workers.

Warning: We assume you have already worked through our RAPTOR tutorial.

Prepare an RP pilot to host RAPTOR

We will launch a pilot to host the RAPTOR master (using 1 core) and the MPI worker ranks (one core per rank). The pilot description below requests 4 cores:

[1]:
%env RADICAL_REPORT=TRUE
# do not use animated output in notebooks
%env RADICAL_REPORT_ANIME=FALSE
env: RADICAL_REPORT=TRUE
env: RADICAL_REPORT_ANIME=FALSE
[2]:
import os

import radical.pilot as rp
import radical.utils as ru

# determine the path of the currently active ve to simplify some examples below
ve_path = os.path.dirname(os.path.dirname(ru.which('python3')))

# create session and managers
session = rp.Session()
pmgr    = rp.PilotManager(session)
tmgr    = rp.TaskManager(session)

# submit a pilot
pilot = pmgr.submit_pilots(rp.PilotDescription({'resource'     : 'local.localhost',
                                                'cores'        : 4,
                                                'runtime'      : 30,
                                                'exit_on_error': False}))

# add the pilot to the task manager and wait for the pilot to become active
tmgr.add_pilots(pilot)
pilot.wait(rp.PMGR_ACTIVE)
new session: [rp.session.f5e1b2f4-4693-11f0-930f-764078d63b85]                 \
zmq proxy  : [tcp://172.17.0.2:10001]                                         ok
create pilot manager                                                          ok
create task manager                                                           ok
submit 1 pilot(s)
        pilot.0000   local.localhost           4 cores       0 gpus           ok

[2]:
'PMGR_ACTIVE'

The pilot is now in an ACTIVE state, and the resource is acquired.
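
The core budget for a pilot can be estimated before the pilot is submitted. The following is a minimal sketch, not part of the RP API: the helper names `required_cores` and `fits_on_pilot` are hypothetical, and the sketch assumes one core per master and one core per worker rank, as described above.

```python
def required_cores(n_masters, worker_ranks, cores_per_master=1):
    """Return the core count needed for the given masters and workers.

    Assumption: each master uses `cores_per_master` cores and each worker
    rank uses one core. `worker_ranks` lists the rank count per worker.
    """
    return n_masters * cores_per_master + sum(worker_ranks)


def fits_on_pilot(pilot_cores, n_masters, worker_ranks):
    """Return True if the masters and workers fit on the pilot's cores."""
    return required_cores(n_masters, worker_ranks) <= pilot_cores


# one master plus one 2-rank MPI worker on a 4-core pilot
cores_needed = required_cores(1, [2])
print(cores_needed)                  # 3
print(fits_on_pilot(4, 1, [2]))      # True
```

Running such a check before calling `pmgr.submit_pilots` makes it easier to spot a pilot description that is too small for the planned masters and workers.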

Prepare RAPTOR environment

RAPTOR mainly depends on mpi4py to launch its masters and workers, so mpi4py must be installed in the Python/conda environment in which RAPTOR runs. RP offers two options: creating a new Python/conda environment with mpi4py installed, or using an existing environment by specifying its path. In this tutorial, we assume that the environment in which radical.pilot is installed also has mpi4py installed, and we reference it as 'named_env': 'rp' later in the task descriptions.

To create a different named environment at runtime, the following call instructs the pilot to do so:

pilot.prepare_env(env_name='raptor_ve',
                  env_spec={'type' : 'venv',
                            'path' : '/tmp/raptor_ve',
                            'setup': ['radical.pilot',
                                      'mpi4py']})
print('raptor_ve is ready')

That environment would then be referenced by 'named_env': 'raptor_ve'.

For us, the Python virtual environment is already prepared, so the pilot can launch the masters and workers and execute the tasks.

Create a master and MPI workers by specifying 'raptor_class': 'MPIWorker' in the worker description below. This value instructs RAPTOR to start an MPI worker rather than the DefaultWorker. Note that the number of ranks (cores) in the worker description must be at least as large as the number of ranks required by the MPI task function, so that the function can be executed on that worker.

Warning: The number of masters and workers depends on the workload specifications and the use case that you are trying to execute. Sometimes you may need to deploy two masters and workers instead of one for more efficient load balancing.

[3]:
raptor_descr = {'mode'        : rp.RAPTOR_MASTER,
                'named_env'   : 'rp'}
worker_descr = {'mode'        : rp.RAPTOR_WORKER,
                'ranks'       : 2,
                'named_env'   : 'rp',
                'raptor_class': 'MPIWorker'}

raptor = pilot.submit_raptors([rp.TaskDescription(raptor_descr)])[0]
raptor.submit_workers([rp.TaskDescription(worker_descr),
                       rp.TaskDescription(worker_descr)])
submit: ########################################################################
submit: ########################################################################

[3]:
[<radical.pilot.raptor_tasks.RaptorWorker at 0x7e48a2a5d580>,
 <radical.pilot.raptor_tasks.RaptorWorker at 0x7e488c7aa7f0>]
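
The rank requirement on the worker description can be checked on plain dictionaries before anything is submitted. This is a minimal sketch, not RP functionality: `workers_for_task` is a hypothetical helper, it assumes that a function task must fit entirely on one worker, and it treats a missing 'ranks' entry as 1 purely for illustration.

```python
def workers_for_task(task_ranks, worker_descriptions):
    """Return the indices of workers with enough ranks for the task.

    Assumption: a task runs on a single worker, so the worker needs at
    least `task_ranks` ranks. A missing 'ranks' entry is treated as 1.
    """
    return [idx for idx, wd in enumerate(worker_descriptions)
            if wd.get('ranks', 1) >= task_ranks]


# plain dicts mirroring the 'ranks' entry of the worker description above
workers = [{'ranks': 2}, {'ranks': 2}]

print(workers_for_task(2, workers))   # [0, 1]
print(workers_for_task(4, workers))   # []
```

An empty result indicates that, under the stated assumption, no deployed worker is large enough for the task.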

Define a Python function that requires running in an MPI environment and use the rp.pythontask decorator so the function can be serialized by RP. Note that this function takes a comm argument, representing the MPI communicator that this function needs.

[4]:
@rp.pythontask
def func_mpi(comm, msg, sleep=2):
    import time
    print('hello %d/%d: %s' % (comm.rank, comm.size, msg))
    time.sleep(sleep)
    return 'func_mpi retval'
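
Because the function only reads `comm.rank` and `comm.size`, its logic can be tried locally without MPI or RAPTOR. The sketch below is an illustration under assumptions: `func_mpi_plain` is an undecorated copy of the function body, and `fake_comm` is a hypothetical stand-in object, not a real MPI communicator.

```python
from types import SimpleNamespace


def func_mpi_plain(comm, msg, sleep=0):
    # undecorated copy of func_mpi, used for local testing only
    import time
    print('hello %d/%d: %s' % (comm.rank, comm.size, msg))
    time.sleep(sleep)
    return 'func_mpi retval'


# hypothetical stand-in for the communicator: only rank and size are set
fake_comm = SimpleNamespace(rank=0, size=1)

result = func_mpi_plain(fake_comm, msg='local test')
print(result)
```

This only exercises the function body. Anything that calls real communicator methods would still need an actual MPI environment.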

Create an rp.TaskDescription for func_mpi that specifies the function object and the number of ranks required to run it. The number of ranks also determines the size of the comm object passed to the function at execution time; the None passed for comm in the call below is a placeholder for that communicator.

[5]:
# create a minimal MPI function task
td = rp.TaskDescription({'mode'    : rp.TASK_FUNCTION,
                         'ranks'   : 2,
                         'function': func_mpi(None, msg='mpi_task.0')})
mpi_task = raptor.submit_tasks([td])[0]
submit: ########################################################################

Wait for the task status to be reported back.

[6]:
tmgr.wait_tasks([mpi_task.uid])
print('id: %s [%s]:\n    out: %s\n    ret: %s\n'
     % (mpi_task.uid, mpi_task.state, mpi_task.stdout, mpi_task.return_value))
wait  : ########################################################################
     DONE      :     1
                                                                              ok

id: task.000000 [DONE]:
    out: ['hello 0/2: mpi_task.0\n', 'hello 1/2: mpi_task.0\n']
    ret: ['func_mpi retval', 'func_mpi retval']
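
The output above contains one stdout entry and one return value per rank. The following minimal sketch groups them by rank. It uses the values printed above as literal input, and it assumes that the two lists are ordered consistently, so that entry i of each list belongs to the same rank. `by_rank` is a hypothetical helper, not part of RP.

```python
import re

# values copied from the task output shown above
stdout  = ['hello 0/2: mpi_task.0\n', 'hello 1/2: mpi_task.0\n']
retvals = ['func_mpi retval', 'func_mpi retval']


def by_rank(stdout, retvals):
    """Map each rank to its output line and return value.

    The rank is parsed from the 'hello <rank>/<size>: ' prefix printed by
    func_mpi. Lines without that prefix are skipped.
    """
    pattern = re.compile(r'hello (\d+)/(\d+): ')
    results = {}
    for line, ret in zip(stdout, retvals):
        match = pattern.match(line)
        if match is None:
            continue
        results[int(match.group(1))] = {'out': line.strip(), 'ret': ret}
    return results


per_rank = by_rank(stdout, retvals)
for rank in sorted(per_rank):
    print(rank, per_rank[rank])
```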

[7]:
session.close()
closing session rp.session.f5e1b2f4-4693-11f0-930f-764078d63b85                \
close task manager                                                            ok
close pilot manager                                                            \
wait for 1 pilot(s)
                                                                              ok
                                                                              ok
session lifetime: 25.5s                                                       ok