Executing MPI Tasks with RAPTOR

This notebook will walk you through setting up and using the RAPTOR subsystem to execute MPI function tasks. To execute MPI functions with RAPTOR, we need to specify the type and size of the worker to be deployed by RAPTOR. The primary purpose of using RAPTOR to execute MPI functions is RAPTOR’s capabilities to construct and deliver heterogeneous (different ranks) private MPI communicators during the execution time to the function. In the example below, we will execute an MPI function that requires 4 MPI ranks, and for that, we will deploy a single master and worker.

Warning: We assume you have already worked through our RAPTOR tutorial.

Prepare an RP pilot to host RAPTOR

We will launch a pilot with sufficient resources to run both the RAPTOR master (using 1 core) and a single worker instance (using 10 cores):

import os

# do not use animated output in notebooks
os.environ['RADICAL_REPORT_ANIME'] = 'False'

import radical.pilot as rp
import radical.utils as ru

# determine the path of the currently active ve to simplify some examples below
ve_path = os.path.dirname(os.path.dirname(ru.which('python3')))

# create session and managers
session = rp.Session()
pmgr    = rp.PilotManager(session)
tmgr    = rp.TaskManager(session)

# submit a pilot
pilot = pmgr.submit_pilots(rp.PilotDescription({'resource'     : 'local.localhost',
                                                'runtime'      : 60,
                                                'cores'        : 17,
                                                'exit_on_error': False}))

# add the pilot to the task manager and wait for the pilot to become active
print('pilot is up and running')
pilot is up and running

The pilot is now in an ACTIVE state, and the resource is acquired.

Prepare RAPTOR environment

RAPTOR mainly depends on mpi4py to launch its masters and workers. Thus, to use RAPTOR, it is required to have mpi4py installed in the used Python/conda environment. RP offers two options: creating a Python/conda environment with mpi4py installed in it, or using an existing environment by specifying the path of the environment. In this tutorial, we will instruct RP to create a new Python virtual environment named raptor_ve:

                  env_spec={'type' : 'venv',
                            'path' : '/tmp/raptor_ve',
                            'setup': ['radical.pilot',
print('raptor_ve is ready')
raptor_ve is ready

The Python virtual environment is now ready, and the pilot can launch the masters/workers and executes the tasks.

Create a master and MPI worker by specifiying the raptor_class: MPIWorker in the worker description below. This value will instruct RAPTOR to start the MPI worker rather than the DefaultWorker. Note that we also specified the number of ranks (cores) in the worker description to a number larger than the required number of ranks for the designated MPI task function in order for the function to be executed on that worker.

Warning: The number of master(s) and worker(s) depends on the workload specifications and the use case that you are trying to execute. Sometimes, you might be required to deploy two masters and workers instead of 1 for more efficient load balancing.

master_descr = {'mode': rp.RAPTOR_MASTER,
                'named_env': 'raptor_ve'}
worker_descr = {'mode': rp.RAPTOR_WORKER,
                'ranks': 10,
                'named_env': 'raptor_ve',
                'raptor_class': 'MPIWorker'}

raptor = pilot.submit_raptors( [rp.TaskDescription(master_descr)])[0]
worker = raptor.submit_workers([rp.TaskDescription(worker_descr),

Define a Python function that requires running in an MPI environment and use the rp.pythontask decorator so the function can be serialized by RP. Note that this function takes a comm argument, representing the MPI communicator that this function needs.

def func_mpi(comm, msg, sleep=2):
    import time
    print('hello %d/%d: %s' % (comm.rank, comm.size, msg))
    return 'func_mpi retval'

Create a corresponding rp.TaskDescription to func_mpi and specify the function object and the number of ranks required to run the function within (the number of ranks also represents the size of the comm object passed to the function during the execution time)

# create a minimal MPI function task
td = rp.TaskDescription({'mode': rp.TASK_FUNCTION,
                         'ranks': 4,
                         'function': func_mpi(None, msg='mpi_task.0')})
mpi_task = raptor.submit_tasks([td])[0]

Wait for the task status to be reported back.

print('id: %s [%s]:\n    out:\n%s\n    ret: %s\n'
     % (mpi_task.uid, mpi_task.state, mpi_task.stdout, mpi_task.return_value))
['task.000000', '', 'AGENT_STAGING_INPUT_PENDING']
id: task.000000 [DONE]:
['hello 1/4: mpi_task.0\n', 'hello 0/4: mpi_task.0\n', 'hello 3/4: mpi_task.0\n', 'hello 2/4: mpi_task.0\n']
    ret: ['func_mpi retval', 'func_mpi retval', 'func_mpi retval', 'func_mpi retval']