RADICAL-Pilot


RADICAL-Pilot (RP) is a pilot system written in Python and specialized in executing applications composed of many heterogeneous computational tasks on high performance computing (HPC) platforms. As a pilot system, RP separates resource acquisition from using those resources to execute application tasks. Resources are acquired by submitting a job to the batch system of an HPC machine. Once the job is scheduled on the requested resources, RP can directly schedule and launch application tasks on those resources. Thus, tasks are not scheduled via the batch system of the HPC platform, but directly on the acquired resources with the maximum degree of concurrency they afford. See our Brief Introduction to RADICAL-Pilot for an overview of how RP works on an HPC platform.

RP offers unique features when compared to other pilot systems: (1) concurrent and sequential execution of heterogeneous tasks on one or more pilots, e.g., single/multi-core, single/multi-GPU, MPI/OpenMP; (2) describing executable tasks and Python function tasks; (3) support of all the major HPC batch systems, e.g., slurm, torque, pbs, lsf, etc.; (4) support of more than 16 methods to launch tasks, e.g., ssh, mpirun, aprun, jsrun, prrte, etc.; and (5) a general purpose distributed architecture.

Getting Started

This notebook walks you through executing a hello_world application written with RADICAL-Pilot (RP) and executed locally on a GNU/Linux operating system. The application consists of a Bag of Tasks with heterogeneous requirements: different numbers of CPU cores/GPUs and different execution times for each task. In this simple application, tasks have no data requirements, but see data staging for how to manage data in RP.

Warning: We assume you understand what a pilot is and how it enables the concurrent and sequential execution of compute tasks on its resources. See our Brief Introduction to RP video to familiarize yourself with the architectural concepts and execution model of a pilot.

Installation

Warning: RP must be installed in a Python environment. RP will not work properly when installed as a system-wide package. You must create and activate a virtual environment before installing RP.

You can create a Python environment suitable for RP using Virtualenv, Venv or Conda. Once you have created and activated a virtual environment, RP can be installed as a Python module via pip, Conda, or Spack.

Note: Please see using virtual environments with RP for more options and detailed information. That will be especially useful when executing RP on supported high performance computing (HPC) platforms.

Virtualenv

virtualenv ~/.ve/radical-pilot
. ~/.ve/radical-pilot/bin/activate
pip install radical.pilot

Venv

python -m venv ~/.ve/radical-pilot
. ~/.ve/radical-pilot/bin/activate
pip install radical.pilot

Conda

If Conda is not pre-installed, here is a distilled set of commands to install Miniconda on a GNU/Linux x86_64 OS. Find more (and possibly updated) information in the official Conda documentation.

wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ./miniconda.sh
chmod +x ./miniconda.sh
./miniconda.sh -b -p ./conda
source ./conda/bin/activate

Once Conda is available:

conda create -y -n radical-pilot
conda activate radical-pilot
conda install -y -c conda-forge radical.pilot

Spack

If Spack is not pre-installed, here is a distilled set of commands to install Spack on a GNU/Linux x86_64 OS. Find more (and possibly updated) information in the official Spack documentation.

git clone https://github.com/spack/spack.git
. spack/share/spack/setup-env.sh

Once Spack is available:

spack env create ve.rp
spack env activate ve.rp
spack install py-radical-pilot

Note: Simplified stack for local usage: When using RP on a local machine or cluster, there is no remote file staging or job submission. In that situation, RP can fall back to a simplified software stack. If this is the case, run pip install psij-python (or the equivalent for Conda or Spack), and RP will transparently switch to PSI/J for local job submission.
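For example, assuming RP was installed with pip into the active virtual environment, the simplified stack can be enabled like this (the Conda and Spack package names may differ):

pip install psij-python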

Check the installed version

Often, we need to know what version of RP we installed. For example, you will need to know that when opening a support ticket with the RADICAL development team.

RP installs a command that prints information about all the installed RADICAL-Cybertools:

[1]:
!radical-stack

  python               : /home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/envs/devel/bin/python3
  pythonpath           :
  version              : 3.7.17
  virtualenv           :

  radical.gtod         : 1.52.0
  radical.pilot        : v1.52.1-18-g270fc0c@HEAD-detached-at-origin-devel
  radical.utils        : 1.52.0

Write your first application

RP executes in batch mode:

  • Write an application using RP API.

  • Launch that application.

  • Wait a variable amount of time doing something else.

  • When the application exits, come back, collect and check the results.

Each RP application has a distinctive pattern (a compact sketch of the whole pattern follows this list):

  1. Create a session

  2. Create a pilot manager

  3. Describe the pilot on which you want to run your application tasks:

    • Define the platform on which you want to execute the application

    • Define the amount/type of resources you want to use to run your application tasks.

  4. Assign the pilot description to the pilot manager

  5. Create a task manager

  6. Describe the computational tasks that you want to execute:

    • Executable launched by the task

    • Arguments to pass to the executable command if any

    • Amount of each type of resource used by the executable, e.g., CPU cores and GPUs

    • When the executable uses MPI/OpenMP: the number of ranks for the whole task and the number of cores or GPUs used by each rank

    • Many other parameters. See the API specification for full details

  7. Assign the task descriptions to the task manager

  8. Submit tasks for execution

  9. Wait for tasks to complete execution
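Put together, this pattern translates into a compact application like the sketch below. The resource label, resource amounts, and task payload are placeholders; the remainder of this notebook walks through each step in detail.

import radical.pilot as rp

session = rp.Session()                                      # 1. create a session
pmgr    = rp.PilotManager(session=session)                  # 2. create a pilot manager

pdesc   = rp.PilotDescription({'resource': 'local.localhost',
                               'cores'   : 4,
                               'runtime' : 30})             # 3. describe the pilot
pilot   = pmgr.submit_pilots(pdesc)                         # 4. submit the pilot description

tmgr    = rp.TaskManager(session=session)                   # 5. create a task manager
tmgr.add_pilots(pilot)                                      #    and register the pilot with it

tds     = [rp.TaskDescription({'executable': '/bin/date'})  # 6. describe the tasks
           for _ in range(10)]
tasks   = tmgr.submit_tasks(tds)                            # 7./8. assign the descriptions and submit them
tmgr.wait_tasks()                                           # 9. wait for tasks to complete

session.close(cleanup=True)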

Some of RP's behavior can be configured via environment variables. RP's progress bar does not work properly in Jupyter notebooks, so you may want to set RADICAL_REPORT_ANIME to FALSE.

[2]:
%env RADICAL_REPORT_ANIME=FALSE
env: RADICAL_REPORT_ANIME=FALSE

As with every Python application, first you import all the required modules.

[3]:
import radical.pilot as rp

Enable user feedback

As RP implements a batch programming model, by default, it returns a minimal amount of information. After submitting the tasks for execution, RP will remain silent until all the tasks have completed. In practice, when developing and debugging your application, you will want more feedback. We wrote a reporter module that you can use with RP and all the other RADICAL-Cybertools.

To use the reporter:

  • Configure RP by exporting a shell environment variable.

    export RADICAL_PILOT_REPORT=True
    
  • Import radical.utils, create a reporter and start to use it to print meaningful messages about the state of the application execution.

Note: See our tutorial about Profiling a RADICAL-Pilot Application for a guide on how to trace and profile your application.

Note: See our tutorial about Debugging a RADICAL-Pilot Application for a guide on how to debug your application.

[4]:
import radical.utils as ru

report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)

================================================================================
 Getting Started (RP version v1.52.1)
================================================================================


Creating a session

radical.pilot.Session is the root object of all the other objects of RP.

[5]:
session = rp.Session()

Creating a pilot manager

You need to manage the resources you will acquire with a pilot either locally or, more commonly and usefully, on a supported HPC platform. An instance of radical.pilot.PilotManager attached to your session will do that for you.

Note: One radical.pilot.PilotManager can manage multiple pilots. See our tutorial about Using Multiple Pilots with RADICAL-Pilot to see why and how.

[6]:
pmgr = rp.PilotManager(session=session)

Configuring pilot resources

You can use a dictionary to specify the location, amount and other properties of the resources you want to acquire with a pilot, and use that dictionary to initialize a radical.pilot.PilotDescription object. See the radical.pilot.PilotDescription API for a full list of properties you can specify for each pilot.

In this example, we want to run our hello_world application on our local GNU/Linux machine for no more than 30 minutes, using 4 cores.

Warning: We choose a 30-minute runtime, but the application could take less or more time to complete. 30 minutes is the upper bound, but RP will exit as soon as all the tasks have reached a final state (DONE, CANCELED, FAILED). Conversely, RP will always exit once the runtime expires, even if some tasks still need to be executed.

Note: We could choose to use as many CPU cores as we have available on our local machine. RP will allocate all of them, but it will use only the cores required by the application tasks. If all the tasks together require fewer cores than those available, the remaining cores will go unused. Conversely, if there are more tasks than cores, RP will schedule each task as soon as the required amount of cores becomes available. In this way, RP will keep the available resources as busy as possible, and the application tasks will run both concurrently and sequentially, depending on resource availability.

Note: 'exit_on_error': False allows us to compile this notebook without errors. You should probably not use it with a stand-alone RP application.

[7]:
pd_init = {'resource'     : 'local.localhost',
           'runtime'      : 30,  # pilot runtime in minutes
           'project'      : None,
           'queue'        : None,
           'cores'        : 4,
           'gpus'         : 0,
           'exit_on_error': False}
pdesc = rp.PilotDescription(pd_init)

Submitting the pilot

We now have a pilot manager, we know how many resources we want and on what platform. We are ready to submit our request!

Note: On a local machine, RP acquires the requested resources as soon as we submit the pilot. On a supported HPC platform, our request will queue a job into the platform’s batch system. The actual resources will become available only when the batch system schedules the job. This is not under the control of RP and, barring reservation, the actual queue time will be unknown.

We use the submit_pilots method of our pilot manager and pass it the pilot description.

[8]:
report.header('submit pilot')
pilot = pmgr.submit_pilots(pdesc)

--------------------------------------------------------------------------------
submit pilot


Creating a task manager

We have acquired the resources we asked for (or we are waiting in a queue to get them), so now we need to do something with those resources, i.e., execute our application tasks :-) First, we create a radical.pilot.TaskManager and associate it to our session. That manager will take our task descriptions and send them to our pilot so that it can execute those tasks on the allocated resources.

[9]:
tmgr = rp.TaskManager(session=session)

Registering the pilot with the task manager

We tell the task manager what pilot it should use to execute its tasks.

[10]:
tmgr.add_pilots(pilot)

Describing the application tasks

In this example, we want to run simple tasks that require different numbers of CPU cores and run for variable amounts of time. Thus, we use the executable radical-pilot-hello.sh, which we crafted to occupy a configurable amount of resources for a configurable amount of time.

Each task is an instance of radical.pilot.TaskDescription with some properties defined (for a complete list of task properties, see the TaskDescription API):

  • executable: the name of the executable we want to launch with the task

  • arguments: the arguments to pass to the executable; in this case, the number of seconds it needs to run.

  • ranks: the number of ranks (processes) launched for the task. Here it is set to 1 as we are running single-process tasks on our local computer. See Describing tasks in RADICAL-Pilot and the details about executing tasks that use the message passing interface (MPI).

  • cores_per_rank: the number of cores that each (rank of the) task utilizes. In our case, each task will randomly use either 1 or 2 of the 4 cores we have requested.

Warning: Executing MPI tasks (i.e., ones with radical.pilot.TaskDescription.ranks > 1) requires an MPI implementation to be available on the machine on which you will run the task. That is usually taken care of by the system administrator, but if you are managing your own cluster, you will have to install and make available one of the many MPI distributions available for GNU/Linux.

We run 10 tasks: enough to see both concurrent and sequential execution on the amount of resources we requested, but not enough to clutter the example.

Note: We use the reporter to produce a progress bar while we loop over the task descriptions.

[11]:
import os
import random

n = 10

report.progress_tgt(n, label='create')
tds = list()
for i in range(n):

    td = rp.TaskDescription()
    td.executable     = 'radical-pilot-hello.sh'
    td.arguments      = [random.randint(1, 10)]
    td.ranks          =  1
    td.cores_per_rank =  random.randint(1, 2)

    tds.append(td)
    report.progress()

report.progress_done()
create: ########################################################################

Submitting tasks for execution

Now that we have all the elements of the application we can execute its tasks. We submit the list of application tasks to the task manager that, in turn, will submit them to the indicated pilot for execution. Upon receiving the list of task descriptions, the pilot will schedule those tasks on its available resources and then execute them.

Note: For RP, tasks are black boxes, i.e., it knows nothing about the code executed by the task. RP just knows that a task has been launched on the requested amount of resources, and it will wait until the task exits. In that way, RP is agnostic towards task details like the language used for its implementation, the type of scientific computation it performs, how it uses data, etc. This is why RP can serve a wide range of scientists, independent of their scientific domain.

[12]:
report.header('submit %d tasks' % n)
tmgr.submit_tasks(tds)

--------------------------------------------------------------------------------
submit 10 tasks


[12]:
[<Task object, uid task.000000>,
 <Task object, uid task.000001>,
 <Task object, uid task.000002>,
 <Task object, uid task.000003>,
 <Task object, uid task.000004>,
 <Task object, uid task.000005>,
 <Task object, uid task.000006>,
 <Task object, uid task.000007>,
 <Task object, uid task.000008>,
 <Task object, uid task.000009>]

Waiting for the tasks to complete

Wait for all tasks to reach a final state (DONE, CANCELED or FAILED). This is a blocking call: the application will wait without exiting and, thus, the shell from which you launched the application must not exit either. Do not close your laptop or drop a remote connection without first leaving the shell running in the background or using a terminal multiplexer like tmux.

Note: After the wait call returns, you can describe and/or submit more tasks/pilots as your RP session will still be open.

Note: You can wait for the execution of a subset of the tasks you defined. See Describing tasks in RADICAL-Pilot for more information.
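For example, assuming you captured the task objects returned by submit_tasks, a minimal sketch of waiting for only a subset of the tasks looks like this:

tasks = tmgr.submit_tasks(tds)

# block until the first five tasks have reached a final state
tmgr.wait_tasks([task.uid for task in tasks[:5]])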

[13]:
tmgr.wait_tasks()
[13]:
['DONE',
 'DONE',
 'DONE',
 'DONE',
 'DONE',
 'DONE',
 'DONE',
 'DONE',
 'DONE',
 'DONE']

Once the wait is finished, let us know and exit!

[14]:
report.header('finalize')
session.close(cleanup=True)

--------------------------------------------------------------------------------
finalize


Generated Output

RP is a distributed system, even when all its components run on a single machine as in this example. RP has two main components (Client and Agent), each of which stores its output in a sandbox at a specific filesystem location:

  • Client sandbox: A directory created within the working directory from where the RP application was launched. The sandbox is named after the session ID, e.g., rp.session.nodename.username.018952.0000.

  • Agent sandbox: A directory created at a different location, depending on the machine on which the application executes. The Agent sandbox, named radical.pilot.sandbox, contains the following nested directories: <session_sandbox_ID>/<pilot_sandbox_ID>/<task_sandbox_ID>.

When running RP locally, the Agent sandbox is located at $HOME/radical.pilot.sandbox. When using a supported HPC platform, the location of the Agent sandbox depends on the filesystem capabilities of the platform. You can see the pre-configured location for the Agent sandbox in the RP git repository, at src/radical/pilot/configs/resource_*.json.

Warning: When executing RP on a supported HPC platform, the output file(s) of each task are saved in the task_sandbox_ID of that task. Without specific staging instructions (see our tutorial Staging Data with RADICAL-Pilot), you will have to manually retrieve those files. When executing locally, you can retrieve them from $HOME/radical.pilot.sandbox/<session_sandbox_ID>/<pilot_sandbox_ID>/<task_sandbox_ID>.
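As a hedged sketch, assuming a hypothetical remote host hpc.example.org and the default Agent sandbox location under the remote $HOME, a task's output could be copied back with scp:

mkdir -p ./results
scp user@hpc.example.org:'radical.pilot.sandbox/rp.session.*/pilot*/task.000000/task.000000.out' ./results/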

Note: When enabling debugging (see our tutorial Debugging a RADICAL-Pilot Application) and/or tracing (see our tutorial Tracing and Profiling a RADICAL-Pilot Application), RP writes the debug and/or trace files in the Client and Agent sandbox. On large/production runs, RP can produce hundreds of debug files. Please contact the RADICAL development team if you need further assistance.

Here are the output files of task.000000 of the application we just executed in this notebook:

[15]:
! ls $HOME/radical.pilot.sandbox/rp.session.*/pilot*/task.000000/
task.000000.err      task.000000.launch.out  task.000000.out
task.000000.exec.sh  task.000000.launch.sh   task.000000.prof

Here is the “result” produced by task.000000:

[16]:
! cat `ls $HOME/radical.pilot.sandbox/rp.session.*/pilot*/task.000000/task.000000.out`
0 : PID     : 3621
0 : NODE    : build-24250560-project-13481-radicalpilot
0 : CPUS    : 00
0 : GPUS    :
0 : RANK    : 0
0 : THREADS : 1
0 : SLEEP   : 8

Tutorials

This set of tutorials explores some of RADICAL-Pilot's capabilities. Each tutorial focuses on a specific topic, covering simple and more advanced details. Before delving into these tutorials, you should be comfortable with our getting started example application.

Tutorials can be run via our self-contained Docker container or independently. To run the tutorials in our Docker container:

  1. Clone the tutorials repository:

git clone git@github.com:radical-cybertools/tutorials.git

  2. Follow the instructions in the README.md, choosing method A or B.

  3. After following the instructions, you will be given a URI to copy and paste into your browser to access the Jupyter Notebook server running in the container.

  4. Load and execute each tutorial in the Jupyter Notebook server in your browser.

  5. Once finished, stop all the containers you started to execute the tutorial.

Configuration System

RADICAL-Pilot (RP) uses a configuration system to set control and management parameters for the initialization of its components and to define resource entry points for the target platform.

It includes:

  • Run description

    • Resource label for a target platform configuration file;

    • Project allocation name (i.e., account/project, specific for HPC platforms);

    • Job queue name (i.e., queue/partition, specific for HPC platforms);

    • Amount of the resources (e.g., cores, gpus, memory) to allocate for the runtime period;

    • Mode to access the target platform (e.g., local, ssh, batch/interactive).

  • Target platform description

    • Batch system (e.g., SLURM, LSF, etc.);

    • Provided launch methods (e.g., SRUN, MPIRUN, etc.);

    • Environment setup (including package manager, working directory, etc.);

    • Entry points: batch system URL, file system URL.

Run description

Users have to describe at least one pilot in each RP application. That is done by instantiating a radical.pilot.PilotDescription object. Among that object's attributes, resource is mandatory and is referred to as a resource label (or platform ID), which corresponds to a target platform configuration file (see the section Platform description). Users need to know what ID corresponds to the HPC platform on which they want to execute their RP application.

Allocation parameters

Every run should explicitly state the project name (i.e., allocation account), the preferred queue for job submission, and the amount of required resources, unless it is a local run that does not access any batch system.

import radical.pilot as rp

pd = rp.PilotDescription({
    'resource': 'ornl.frontier',  # platform ID
    'project' : 'XYZ000',         # allocation account
    'queue'   : 'debug',          # optional (default value might be set in the platform description)
    'cores'   : 32,               # amount of CPU slots
    'gpus'    : 8,                # amount of GPU slots
    'runtime' : 15                # maximum runtime for a pilot (in minutes)
})
Resource access schema

The resource access schema (pd.access_schema) is provided as part of a platform description; when a platform defines more than one access schema, users can select a specific one in radical.pilot.PilotDescription (see the example after this list). Check schema availability per target platform:

  • local: launch RP application from the target platform (e.g., login nodes of the specific machine).

  • ssh: launch RP application outside the target platform and use ssh protocol and corresponding SSH client to access the platform remotely.

  • gsissh: launch RP application outside the target platform and use GSI-enabled SSH to access the platform remotely.

  • interactive: launch RP application from the target platform within the interactive session after being placed on allocated resources (e.g., batch or compute nodes).

  • batch: launch RP application by a submitted batch script at the target platform.
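For example, a minimal sketch of selecting an access schema in the pilot description (assuming the platform configuration for ornl.frontier defines an interactive schema):

import radical.pilot as rp

pd = rp.PilotDescription({
    'resource'     : 'ornl.frontier',   # platform ID
    'project'      : 'XYZ000',          # allocation account (placeholder)
    'cores'        : 32,
    'runtime'      : 15,
    'access_schema': 'interactive'      # one of the schemas listed above
})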

Note: For details on submission of applications on HPC see the tutorial Using RADICAL-Pilot on HPC Platforms.

Platform description

RADICAL-Pilot uses configuration files to keep track of supported platforms. Each configuration file identifies a facility (e.g., ACCESS, TACC, ORNL, ANL, etc.), is written in JSON and is named following the resource_<facility_name>.json convention. Each facility configuration file contains a set of platform names/labels with corresponding configuration parameters. The resource label (or platform ID) follows the <facility_name>.<platform_name> convention, and users set it as the resource attribute of their radical.pilot.PilotDescription object.

Predefined configurations

The RADICAL-Pilot development team maintains a growing set of pre-defined configuration files for supported HPC platforms (see the list of platform descriptions in RP's GitHub repo).

For example, if users want to execute their RP application on Frontera, they will have to search for the resource_tacc.json file and, inside that file, for the key(s) that start with the name frontera. The file resource_tacc.json contains the keys frontera, frontera_rtx, and frontera_prte. Each key identifies a specific set of configuration parameters: frontera offers a general-purpose set of configuration parameters; frontera_rtx enables the use of the rtx queue for GPU nodes; and frontera_prte enables the use of the PRTE-based launch method to execute the application’s tasks. Thus, for Frontera, the value for resource will be tacc.frontera, tacc.frontera_rtx or tacc.frontera_prte.
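For example, a pilot description selecting the general-purpose Frontera configuration might look like the sketch below (the allocation name is a placeholder):

import radical.pilot as rp

pd = rp.PilotDescription({
    'resource': 'tacc.frontera',   # or 'tacc.frontera_rtx' / 'tacc.frontera_prte'
    'project' : 'XYZ000',          # placeholder allocation account
    'cores'   : 56,
    'runtime' : 30
})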

Customizing a predefined configuration

Users can customize existing platform configuration files by overwriting existing key/value pairs with ones from configuration files that have the same names but are located in user space. The default location of user-defined configuration files is $HOME/.radical/pilot/configs/.

Note: To change the location of user-defined platform configuration files, use the environment variable RADICAL_CONFIG_USER_DIR, which is used instead of HOME in the location path above. Make sure that the corresponding path exists before creating configuration files there.
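For example, a short sketch of pointing RP at a custom configuration directory (the path is a placeholder):

export RADICAL_CONFIG_USER_DIR=$HOME/my_rp_configs
mkdir -p "$RADICAL_CONFIG_USER_DIR/.radical/pilot/configs/"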

Two examples of customized configurations are shown below: (i) one for ornl.summit that changes the parameter system_architecture.options, and (ii) one for tacc.frontera that sets MPIEXEC as the default launch method. With these files in place, every pilot description using 'resource': 'ornl.summit' or 'resource': 'tacc.frontera' would use the new values. The changed parameters are described in the following section.

resource_ornl.json

{
    "summit": {
        "system_architecture": {
            "options": ["gpumps", "gpudefault"]
        }
    }
}

resource_tacc.json

{
    "frontera": {
        "launch_methods": {
            "order"  : ["MPIEXEC"],
            "MPIEXEC": {}
        }
    }
}

User-defined configuration

Users can write a whole new configuration for an existing or a new platform with an arbitrary platform ID. For example, below you will create a custom platform configuration entry resource_tacc.json locally. That file will be loaded into RP's radical.pilot.Session object alongside the other configurations for TACC-related platforms.

[1]:
resource_tacc_tutorial = \
{
    "frontera_tutorial":
    {
        "description"                 : "Short description of the resource",
        "notes"                       : "Notes about resource usage",

        "default_schema"              : "local",
        "schemas"                     : {
            "local"                   : {
                "job_manager_endpoint": "slurm://frontera.tacc.utexas.edu/",
                "filesystem_endpoint" : "file://frontera.tacc.utexas.edu/"
            },
            "ssh"                     : {
                "job_manager_endpoint": "slurm+ssh://frontera.tacc.utexas.edu/",
                "filesystem_endpoint" : "sftp://frontera.tacc.utexas.edu/"
            },
            "batch"                   : {
                "job_manager_endpoint": "fork://localhost/",
                "filesystem_endpoint" : "file://localhost/"
            },
            "interactive"             : {
                "job_manager_endpoint": "fork://localhost/",
                "filesystem_endpoint" : "file://localhost/"
            },
        },

        "default_queue"               : "production",
        "resource_manager"            : "SLURM",

        "cores_per_node"              : 56,
        "gpus_per_node"               : 0,
        "system_architecture"         : {
                                         "smt"           : 1,
                                         "options"       : ["nvme", "intel"],
                                         "blocked_cores" : [],
                                         "blocked_gpus"  : []
                                        },

        "agent_config"                : "default",
        "agent_scheduler"             : "CONTINUOUS",
        "agent_spawner"               : "POPEN",
        "default_remote_workdir"      : "$HOME",

        "pre_bootstrap_0"             : [
                                        "module unload intel impi",
                                        "module load   intel impi",
                                        "module load   python3/3.9.2"
                                        ],
        "launch_methods"              : {
                                         "order"  : ["MPIRUN"],
                                         "MPIRUN" : {
                                             "pre_exec_cached": [
                                                 "module load TACC"
                                             ]
                                         }
                                        },

        "python_dist"                 : "default",
        "virtenv_mode"                : "local"
    }
}

The definition of each field:

  • description (optional) - human-readable description of the platform.

  • notes (optional) - information needed to form valid pilot descriptions, such as what parameters are required, etc.

  • schemas - allowed values for the pd.access_schema attribute of the pilot description. The first schema in the list is used by default. For each schema, a subsection is needed, which specifies job_manager_endpoint and filesystem_endpoint.

  • job_manager_endpoint - access URL for pilot submission (interpreted by RADICAL-SAGA).

  • filesystem_endpoint - access URL for file staging (interpreted by RADICAL-SAGA).

  • default_queue (optional) - queue name to be used for pilot submission to a corresponding batch system (see job_manager_endpoint).

  • resource_manager - the type of job management system. Valid values are: CCM, COBALT, FORK, LSF, PBSPRO, SLURM, TORQUE, YARN.

  • cores_per_node (optional) - number of available CPU cores per compute node. If not provided then it will be discovered by RADICAL-SAGA and by Resource Manager in RADICAL-Pilot.

  • gpus_per_node (optional) - number of available GPUs per compute node. If not provided then it will be discovered by RADICAL-SAGA and by Resource Manager in RADICAL-Pilot.

  • system_architecture (optional) - set of options that describe platform features:

    • smt - Simultaneous MultiThreading (i.e., threads per physical core). If not provided, the default value 1 is used. It can be overridden with the environment variable RADICAL_SMT, exported before running the RADICAL-Pilot application. RADICAL-Pilot uses cores_per_node x smt to calculate all available cores/CPUs per node.

    • options - list of job management system specific attributes/constraints, which are provided to RADICAL-SAGA.

      • COBALT uses option --attrs for configuring location as filesystems=home,grand, mcdram as mcdram=flat, numa as numa=quad;

      • LSF uses option -alloc_flags to support gpumps, nvme;

      • PBSPRO uses option -l for configuring location as filesystems=grand:home, placement as place=scatter;

      • SLURM uses option --constraint for compute nodes filtering.

    • blocked_cores - list of core/CPU indices that the Scheduler in RADICAL-Pilot will not use for task assignment.

    • blocked_gpus - list of GPU indices that the Scheduler in RADICAL-Pilot will not use for task assignment.

  • agent_config - configuration file for RADICAL-Pilot Agent (default value is default for a corresponding file agent_default.json).

  • agent_scheduler - Scheduler in RADICAL-Pilot (default value is CONTINUOUS).

  • agent_spawner - Executor in RADICAL-Pilot, which spawns task execution processes (default value is POPEN).

  • default_remote_workdir (optional) - directory for agent sandbox (see the tutorials Getting Started and Staging Data with RADICAL-Pilot). If not provided then the current directory is used ($PWD).

  • forward_tunnel_endpoint (optional) - name of the host, which can be used to create ssh tunnels from the compute nodes to the outside of the platform.

  • pre_bootstrap_0 (optional) - list of commands to execute for the bootstrapping process to launch RADICAL-Pilot Agent.

  • pre_bootstrap_1 (optional) - list of commands to execute to initialize sub-agents, which are used to run additional instances of RADICAL-Pilot components such as Executor and Stager.

  • launch_methods - set of supported launch methods. Valid values are APRUN, CCMRUN, FLUX, FORK, IBRUN, JSRUN (JSRUN_ERF), MPIEXEC (MPIEXEC_MPT), MPIRUN (MPIRUN_CCMRUN, MPIRUN_DPLACE, MPIRUN_MPT, MPIRUN_RSH), PRTE, RSH, SRUN, SSH. For each launch method, a subsection is needed, which specifies pre_exec_cached with list of commands to be executed to configure the launch method, and method related options (e.g., dvm_count for PRTE).

    • order - sets the order of launch methods to be selected for the task placement (the first value in the list is a default launch method).

  • python_dist - python distribution. Valid values are default and anaconda.

  • virtenv_mode - how the bootstrapping process sets up the environment for the RADICAL-Pilot Agent:

    • create - create a python virtual environment from scratch;

    • recreate - delete the existing virtual environment and build it from scratch, if not found then create;

    • use - use the existing virtual environment, if not found then create;

    • update - update the existing virtual environment, if not found then create (default);

    • local - use the client existing virtual environment (environment from where RADICAL-Pilot application was launched).

  • virtenv (optional) - path to the existing virtual environment or its name with the pre-installed RCT stack; use it only when virtenv_mode=use.

  • rp_version - RADICAL-Pilot installation or reuse process:

    • local - install from tarballs, from client existing environment (default);

    • release - install the latest released version from PyPI;

    • installed - do not install, target virtual environment has it.

Examples

Note: In our examples, we will not show a progress bar while waiting for some operation to complete, e.g., while waiting for a pilot to stop. That is because the progress bar offered by RP's reporter does not work within a notebook. You could use it when executing an RP application as a standalone Python script.

[2]:
%env RADICAL_REPORT_ANIME=FALSE
env: RADICAL_REPORT_ANIME=FALSE
[3]:
# ensure that the location for user-defined configurations exists
!mkdir -p "${RADICAL_CONFIG_USER_DIR:-$HOME}/.radical/pilot/configs/"
[4]:
import os

import radical.pilot as rp
import radical.utils as ru

In the next steps, you will save the configuration created earlier for a target platform into the file resource_tacc.json, located in user space. You will then read that file back and print some of its attributes to confirm that they are in place.

[5]:
# save earlier defined platform configuration into the user-space
ru.write_json(resource_tacc_tutorial, os.path.join(os.path.expanduser('~'), '.radical/pilot/configs/resource_tacc.json'))
[6]:
tutorial_cfg = rp.utils.get_resource_config(resource='tacc.frontera_tutorial')

for attr in ['schemas', 'resource_manager', 'cores_per_node', 'system_architecture']:
    print('%-20s : %s' % (attr, tutorial_cfg[attr]))
schemas              : {'batch': Config: {'filesystem_endpoint': 'file://localhost/', 'job_manager_endpoint': 'fork://localhost/'}, 'interactive': Config: {'filesystem_endpoint': 'file://localhost/', 'job_manager_endpoint': 'fork://localhost/'}, 'local': Config: {'filesystem_endpoint': 'file://frontera.tacc.utexas.edu/', 'job_manager_endpoint': 'slurm://frontera.tacc.utexas.edu/'}, 'ssh': Config: {'filesystem_endpoint': 'sftp://frontera.tacc.utexas.edu/', 'job_manager_endpoint': 'slurm+ssh://frontera.tacc.utexas.edu/'}}
resource_manager     : SLURM
cores_per_node       : 56
system_architecture  : {'blocked_cores': [], 'blocked_gpus': [], 'options': ['nvme', 'intel'], 'smt': 1}
[7]:
print('job_manager_endpoint : ', rp.utils.get_resource_job_url(resource='tacc.frontera_tutorial', schema='ssh'))
print('filesystem_endpoint  : ', rp.utils.get_resource_fs_url (resource='tacc.frontera_tutorial', schema='ssh'))
job_manager_endpoint :  slurm+ssh://frontera.tacc.utexas.edu/
filesystem_endpoint  :  sftp://frontera.tacc.utexas.edu/
[8]:
session = rp.Session()
pmgr    = rp.PilotManager(session=session)
[9]:
tutorial_cfg = session.get_resource_config(resource='tacc.frontera_tutorial', schema='batch')
for attr in ['label', 'launch_methods', 'job_manager_endpoint', 'filesystem_endpoint']:
    print('%-20s : %s' % (attr, ru.as_dict(tutorial_cfg[attr])))
label                : tacc.frontera_tutorial
launch_methods       : {'MPIRUN': {'pre_exec_cached': ['module load TACC']}, 'order': ['MPIRUN']}
job_manager_endpoint : fork://localhost/
filesystem_endpoint  : file://localhost/

The platform description created above is also available within the radical.pilot.Session object. Let's confirm that the newly created resource description is within the session. The Session object holds all provided platform configurations (both pre- and user-defined), so for a pilot you just need to select a particular configuration and a corresponding access schema (as part of the pilot description).

[10]:
pd = rp.PilotDescription({
    'resource'     : 'tacc.frontera_tutorial',
    'project'      : 'XYZ000',
    'queue'        : 'production',
    'cores'        : 56,
    'runtime'      : 15,
    'access_schema': 'batch',
    'exit_on_error': False
})

pilot = pmgr.submit_pilots(pd)
[11]:
from pprint import pprint

pprint(pilot.as_dict())
{'client_sandbox': '/home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/checkouts/devel/docs/source/tutorials',
 'description': {'access_schema': 'batch',
                 'app_comm': [],
                 'candidate_hosts': [],
                 'cleanup': False,
                 'cores': 56,
                 'exit_on_error': False,
                 'gpus': 0,
                 'input_staging': [],
                 'job_name': None,
                 'layout': 'default',
                 'memory': 0,
                 'nodes': 0,
                 'output_staging': [],
                 'prepare_env': {},
                 'project': 'XYZ000',
                 'queue': 'production',
                 'resource': 'tacc.frontera_tutorial',
                 'runtime': 15,
                 'sandbox': None,
                 'services': [],
                 'uid': None},
 'endpoint_fs': 'file://localhost/',
 'js_hop': 'fork://localhost/',
 'js_url': 'fork://localhost/',
 'log': None,
 'pilot_sandbox': 'file://localhost/home/docs/radical.pilot.sandbox/rp.session.7c57390e-080a-11ef-9100-0242ac110002/pilot.0000/',
 'pmgr': 'pmgr.0000',
 'resource': 'tacc.frontera_tutorial',
 'resource_details': None,
 'resource_sandbox': 'file://localhost/home/docs/radical.pilot.sandbox',
 'resources': None,
 'session': 'rp.session.7c57390e-080a-11ef-9100-0242ac110002',
 'session_sandbox': 'file://localhost/home/docs/radical.pilot.sandbox/rp.session.7c57390e-080a-11ef-9100-0242ac110002',
 'state': 'PMGR_LAUNCHING',
 'stderr': None,
 'stdout': None,
 'type': 'pilot',
 'uid': 'pilot.0000'}

After exploring the pilot setup and configuration, we close the session.

[12]:
session.close(cleanup=True)

Debugging

RADICAL-Pilot is a complex runtime system which employs multiple distributed components to orchestrate workload execution. It is also research software, funded by research grants, and as such it is not quite comparable to commercially supported software systems.

Also, RADICAL-Pilot mostly targets academic HPC environments and high-end machines which are usually at the cutting edge of hardware and software development. Those machines thus often have their own custom, and sometimes peculiar and evolving, system environments.

All that is to say that it might be necessary to investigate various possible failure modes, both failures related to the execution of your workload tasks, and also possibly failures related to RADICAL-Pilot’s own operation.

This notebook attempts to guide you through different means of investigating possible failure modes. That is not necessarily an intuitive process, but it hopefully covers the most common problems. We encourage you to seek support from the RCT developer community (see 'Ask for Help from the RADICAL Team' below) if the presented means prove insufficient.

Setting the stage

We run a simple RP application which triggers specific failures on purpose - the resulting session will be used as a demonstrator for the remainder of this notebook. We submit one task which succeeds (task.000000) and one with an invalid executable which fails (task.000001).

[1]:

import radical.pilot as rp
import radical.utils as ru

client_sandbox = None
pilot_sandbox  = None

with rp.Session() as session:

    pmgr  = rp.PilotManager(session=session)
    pilot = pmgr.submit_pilots(rp.PilotDescription({'resource': 'local.localhost',
                                                    'cores'   : 4,
                                                    'runtime' : 10}))
    tmgr  = rp.TaskManager(session=session)
    tmgr.add_pilots(pilot)

    td_1  = rp.TaskDescription({'executable': 'date'})
    td_2  = rp.TaskDescription({'executable': 'data'})

    tasks = tmgr.submit_tasks([td_1, td_2])
    tmgr.wait_tasks()

    for task in tasks:
        print('%s: %s' % (task.uid, task.state))

    client_sandbox = ru.Url(pilot.client_sandbox).path + '/' + session.uid
    pilot_sandbox  = ru.Url(pilot.pilot_sandbox).path

print('client sandbox: %s' % client_sandbox)
print('pilot  sandbox: %s' % pilot_sandbox)
task.000000: DONE
task.000001: FAILED
client sandbox: /home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/checkouts/devel/docs/source/tutorials/rp.session.81a9bd46-080a-11ef-a72b-0242ac110002
pilot  sandbox: /home/docs/radical.pilot.sandbox/rp.session.81a9bd46-080a-11ef-a72b-0242ac110002/pilot.0000/

Investigating Task Failures

You created a task description, submitted your task, and it ended up in FAILED state. On the API level, you can inspect the task's stdout and stderr values as follows:

[2]:
import os

import radical.pilot as rp

for task in tasks:
    if task.state == rp.FAILED:
        print('%s stderr: %s' % (task.uid, task.stderr))
    elif task.state == rp.DONE:
        print('%s stdout: %s' % (task.uid, task.stdout))
task.000000 stdout: Wed May  1 22:31:37 UTC 2024

task.000001 stderr: /home/docs/radical.pilot.sandbox/rp.session.81a9bd46-080a-11ef-a72b-0242ac110002//pilot.0000//task.000001/task.000001.exec.sh: 49: data: not found

Note though that both values are truncated to 1024 characters. If that is insufficient, you can still inspect the complete values on the file system of the target resource. For that, you would navigate to the task sandbox (whose path can be inspected via task.task_sandbox).

That sandbox usually has a set of files similar to the example shown below. The <task.uid>.out and <task.uid>.err files will have captured the task’s stdout and stderr streams, respectively:

[3]:
tid = tasks[1].uid

%cd $pilot_sandbox/$tid

!ls -l
!cat "$tid".err

/home/docs/radical.pilot.sandbox/rp.session.81a9bd46-080a-11ef-a72b-0242ac110002/pilot.0000/task.000001
total 16
-rw-r--r-- 1 docs docs  147 May  1 22:31 task.000001.err
-rwxr--r-- 1 docs docs 1927 May  1 22:31 task.000001.exec.sh
-rw-r--r-- 1 docs docs    0 May  1 22:31 task.000001.launch.out
-rwxr--r-- 1 docs docs 2033 May  1 22:31 task.000001.launch.sh
-rw-r--r-- 1 docs docs    0 May  1 22:31 task.000001.out
-rw-r--r-- 1 docs docs  909 May  1 22:31 task.000001.prof
/home/docs/radical.pilot.sandbox/rp.session.81a9bd46-080a-11ef-a72b-0242ac110002//pilot.0000//task.000001/task.000001.exec.sh: 49: data: not found

A very common cause of task failures is an invalid environment setup: scientific applications frequently require software modules to be loaded, virtual environments to be activated, etc. Those actions are specified in the task description's pre_exec statements. You may want to investigate <task.uid>.exec.sh in the task sandbox to check whether the environment setup is indeed as you expect it to be.
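For example, a hedged sketch of a task description that prepares its execution environment via pre_exec (the module and path names are hypothetical):

import radical.pilot as rp

td = rp.TaskDescription()
td.executable = 'my_simulation'                        # hypothetical application
td.pre_exec   = ['module load my_simulation/1.0',      # hypothetical module
                 'source $HOME/ve/sim/bin/activate']   # hypothetical virtual environment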

Investigate RADICAL-Pilot Failures

If the investigation of the task sandbox did not yield any clues as to the origin of the failure, but your task still ends up in FAILED state or RP itself fails in any other way, we suggest the following sequence of commands, in that order, to investigate the problem further.

First, check the client side session sandbox for any ERROR log messages or error messages in general:

[4]:

%cd $client_sandbox

! grep 'ERROR' *log
! ls -l *.out *.err
[Errno 2] No such file or directory: '/home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/checkouts/devel/docs/source/tutorials/rp.session.81a9bd46-080a-11ef-a72b-0242ac110002'
/home/docs/radical.pilot.sandbox/rp.session.81a9bd46-080a-11ef-a72b-0242ac110002/pilot.0000/task.000001
grep: *log: No such file or directory
-rw-r--r-- 1 docs docs 147 May  1 22:31 task.000001.err
-rw-r--r-- 1 docs docs   0 May  1 22:31 task.000001.launch.out
-rw-r--r-- 1 docs docs   0 May  1 22:31 task.000001.out

You would expect no ERROR lines to show up in the log files, and all stdout/stderr files of the RP components to be empty.

The next step is to repeat that process in the pilot sandbox:

[5]:

%cd $pilot_sandbox

! grep 'ERROR' *log
! ls -l *.out *.err
/home/docs/radical.pilot.sandbox/rp.session.81a9bd46-080a-11ef-a72b-0242ac110002/pilot.0000
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_0.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_0.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_collecting_queue.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_collecting_queue.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_executing.0000.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_executing.0000.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_executing_queue.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_executing_queue.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_schedule_pubsub.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_schedule_pubsub.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_scheduling.0000.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_scheduling.0000.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_scheduling_queue.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_scheduling_queue.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_staging_input.0000.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_staging_input.0000.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_staging_input_queue.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_staging_input_queue.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_staging_output.0000.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_staging_output.0000.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_staging_output_queue.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_staging_output_queue.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_unschedule_pubsub.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 agent_unschedule_pubsub.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 bootstrap_0.err
-rw-r--r-- 1 docs docs 14182 May  1 22:31 bootstrap_0.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 control_pubsub.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 control_pubsub.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 raptor_scheduling_queue.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 raptor_scheduling_queue.out
-rw-r--r-- 1 docs docs     0 May  1 22:31 state_pubsub.err
-rw-r--r-- 1 docs docs     0 May  1 22:31 state_pubsub.out

Here you will always find bootstrap_0.out to be populated with the output of RP’s shell bootstrapper. If no other errors in the log or stdio files show up, you may want to look at that bootstrap_0.out output to see if and why the pilot bootstrapping failed.

Ask for Help from the RADICAL Team

If neither of the above steps provided any insight into the causes of the observed failures, please execute the following steps:

  • create a tarball of the client sandbox

  • create a tarball of the session sandbox

  • open an issue at https://github.com/radical-cybertools/radical.pilot/issues/new and attach both tarballs

  • describe the observed problem and include the following additional information:

    • output of the radical-stack command

    • information of any change to the resource configuration of the target resource

We will likely be able to infer the causes of the problem from the provided sandbox tarballs and will be happy to help you in correcting those, or we will ask for further information about the environment your application is running in.
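A hedged sketch of creating those tarballs for a local run, using the sandbox locations shown earlier in this notebook:

# client sandbox: created in the directory from which the application was launched
tar czf client_sandbox.tgz  rp.session.*

# session (agent) sandbox: located under $HOME/radical.pilot.sandbox for local runs
tar czf session_sandbox.tgz -C $HOME/radical.pilot.sandbox rp.session.*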

Describing Tasks

The notion of tasks is fundamental in RADICAL-Pilot as tasks define the work to be executed on a supported HPC platform. This notebook will guide the user through the various task types available in RADICAL-Pilot, and how to specify their respective payload. It will also show some means to inspect tasks after (successful or failed) execution.

Warning: We assume that you are familiar with deploying, configuring and using RADICAL-Pilot, for example by taking the getting started introduction tutorial.

Warning: All examples in this notebook are executed locally on a GNU/Linux host. The host needs to have MPI installed - OpenMPI, MPICH, MVAPICH or any other MPI flavor is supported as long as it provides a standard-compliant mpiexec command. See the documentation of your GNU/Linux distribution on how to install MPI.
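For example, on a Debian or Ubuntu host, OpenMPI could be installed as follows (the package names are an assumption and may differ on your distribution):

sudo apt-get install -y openmpi-bin libopenmpi-dev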

Let's quickly check that an MPI launch method is installed.

[1]:
import radical.utils as ru

mpi_lm_exists = bool(ru.which(['mpirun', 'mpiexec']))

First, some preparatory work for the tutorial. We import some modules and set some variables. Note that we import radical.pilot as rp to abbreviate future API calls.

[2]:
import os
import sys
import pprint

# do not use animated output in notebooks
os.environ['RADICAL_REPORT_ANIME'] = 'False'

import radical.pilot as rp

# determine the path of the currently active virtualenv to simplify some examples below
ve_path = os.path.dirname(os.path.dirname(ru.which('python3')))
display(ve_path)
'/home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/envs/devel'

Initial setup and Pilot Submission

As shown in the introductory tutorials, we first configure the reporter output, then set up a RADICAL-Pilot session, create pilot and task manager instances, and run a small local pilot with 32 cores and 1 GPU assigned to it.

[3]:
# configure reporter output
report = ru.Reporter(name='radical.pilot')
report.title('Tutorial: Describing Tasks (RP version %s)' % rp.version)

# create session and managers
session = rp.Session()
pmgr    = rp.PilotManager(session)
tmgr    = rp.TaskManager(session)

# submit a pilot
pilot = pmgr.submit_pilots(rp.PilotDescription({'resource'     : 'local.localhost',
                                                'runtime'      : 60,
                                                'cores'        : 32,
                                                'gpus'         : 1,
                                                'exit_on_error': True}))

# add the pilot to the task manager and wait for the pilot to become active
tmgr.add_pilots(pilot)
pilot.wait(rp.PMGR_ACTIVE)
report.info('pilot state: %s' % pilot.state)

================================================================================
 Tutorial: Describing Tasks (RP version v1.52.1)
================================================================================

pilot state: PMGR_ACTIVE

Task execution

At this point we have the system set up and ready to execute our workload. To do so we describe the tasks of which the workload is comprised and submit them for execution. The goal of this tutorial is to introduce the various attributes available for describing tasks, to explain the execution process in some detail, and to describe how completed or failed tasks can be inspected.

RP Executable Tasks vs. Raptor Tasks

RADICAL-Pilot is, in the most general sense, a pilot-based task execution backend. Its implementation focuses on executable tasks, i.e., tasks which are described by an executable, its command line arguments, its input and output files, and its execution environment.

A more general task execution engine called 'Raptor' is also provided as part of RADICAL-Pilot. Raptor can additionally execute function tasks, i.e., tasks which are defined by a function code entry point, function parameters and return values. The tutorial you are reading right now focuses on executable tasks; the task types supported by Raptor are the topic of the tutorial Raptor: executing Python functions at scale.

Task Descriptions

The rp.TaskDescription class is, as the name suggests, the basis for all task descriptions in RADICAL-Pilot. Its most important attribute is mode: for executable tasks the mode must be set to rp.TASK_EXECUTABLE, which is the default setting.

Executable tasks have exactly one additional required attribute: executable, i.e., the name of the executable. That can be either an absolute path to the executable on the file system of the target HPC platform, or a plain executable name which is known at runtime in the task's execution environment (we will cover the execution environment setup further below).

[4]:
# create a minimal executable task
td   = rp.TaskDescription({'executable': '/bin/date'})
task = tmgr.submit_tasks(td)

The task will be scheduled for execution on the pilot we created above. We now wait for the task to complete, i.e., to reach one of the final states DONE, CANCELED or FAILED:

[5]:
tmgr.wait_tasks()
[5]:
['DONE']

Congratulations, you successfully executed a RADICAL-Pilot task!

Task Inspection

Once completed, we can inspect the tasks for details of their execution: we print a summary for all tasks and then inspect one of them in more detail. The output shows a number of task attributes which can be set by the task description. Those are specifically:

  • uid: a unique string identifying the task. If not defined in the task description, RP will generate an ID which is unique within the scope of the current session.

  • name: a common name for the task which has no meaning to RP itself but can be used by the application to identify or classify certain tasks. The task name is not required to be unique.

  • metadata: any user defined data. The only requirement is that the data are serializable via msgpack, which RP internally uses as serialization format. Note that metadata are communicated along with the task itself and, as such, they should usually be very small bits of data to not deteriorate performance.

Which task attributes are useful is very application dependent: you may not need most of them in your specific application. For example, task.stdout and task.stderr provide a quick and easy way to scan the task results without the need for explicit data staging, and task.task_sandbox is useful if your application employs out-of-band data management and needs access to the task output files.
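For reference, a minimal sketch of setting these description-level attributes when creating a task (all values are placeholders):

import radical.pilot as rp

td = rp.TaskDescription()
td.executable = '/bin/date'
td.uid        = 'task.custom.0000'           # placeholder; must be unique within the session
td.name       = 'date_probe'                 # free-form, not required to be unique
td.metadata   = {'experiment': 'demo_run'}   # small, msgpack-serializable payload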

[6]:
report.plain('uid             : %s\n' % task.uid)
report.plain('tmgr            : %s\n' % task.tmgr.uid)
report.plain('pilot           : %s\n' % task.pilot)
report.plain('name            : %s\n' % task.name)
report.plain('executable      : %s\n' % task.description['executable'])
report.plain('state           : %s\n' % task.state)
report.plain('exit_code       : %s\n' % task.exit_code)
report.plain('stdout          : %s\n' % task.stdout.strip())
report.plain('stderr          : %s\n' % task.stderr)
report.plain('return_value    : %s\n' % task.return_value)
report.plain('exception       : %s\n' % task.exception)
report.plain('\n')
report.plain('endpoint_fs     : %s\n' % task.endpoint_fs)
report.plain('resource_sandbox: %s\n' % task.resource_sandbox)
report.plain('session_sandbox : %s\n' % task.session_sandbox)
report.plain('pilot_sandbox   : %s\n' % task.pilot_sandbox)
report.plain('task_sandbox    : %s\n' % task.task_sandbox)
report.plain('client_sandbox  : %s\n' % task.client_sandbox)
report.plain('metadata        : %s\n' % task.metadata)
uid             : task.000000
tmgr            : tmgr.0000
pilot           : pilot.0000
name            :
executable      : /bin/date
state           : DONE
exit_code       : 0
stdout          : Wed May  1 22:32:16 UTC 2024
stderr          :
return_value    : None
exception       : None

endpoint_fs     : file://localhost/
resource_sandbox: file://localhost/home/docs/radical.pilot.sandbox
session_sandbox : file://localhost/home/docs/radical.pilot.sandbox/rp.session.975cedb6-080a-11ef-818b-0242ac110002
pilot_sandbox   : file://localhost/home/docs/radical.pilot.sandbox/rp.session.975cedb6-080a-11ef-818b-0242ac110002/pilot.0000/
task_sandbox    : file://localhost/home/docs/radical.pilot.sandbox/rp.session.975cedb6-080a-11ef-818b-0242ac110002/pilot.0000/task.000000/
client_sandbox  : /home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/checkouts/devel/docs/source/tutorials
metadata        : {'exec_pid': [9042], 'rank_pid': [9049], 'launch_pid': 9035}

All applications can fail, often for reasons out of the user's control. A task is no different: it can fail as well. Many non-trivial applications will need a way to handle failing tasks. Detecting the failure is the first and necessary step to do so, and RP makes that part easy: RP's task state model defines that a failing task will immediately go into the FAILED state, and that state information is available via the task.state property.

Note: Depending on when the failure happens, the task may also have a value for the task.stderr property, which enables you to further inspect the causes of the failure. task.stderr will only be available if the task reached the EXECUTING state before failing. See the task state model for more information.

Let us submit a new set of tasks and inspect the failure modes. We will scan /bin/date for acceptable single letter arguments:

[7]:
import string
letters = string.ascii_lowercase + string.ascii_uppercase

report.progress_tgt(len(letters), label='create')

tds = list()
for letter in letters:
    tds.append(rp.TaskDescription({'executable': '/bin/date',
                                   'arguments': ['-' + letter]}))
    report.progress()

report.progress_done()

tasks = tmgr.submit_tasks(tds)
create: ########################################################################

This time, we wait only for the newly submitted tasks. We then find which ones succeeded and check their resulting output. Spoiler alert: We will find 3 valid single-letter options.

[8]:
tmgr.wait_tasks([task.uid for task in tasks])

for task in tasks:
    if task.state == rp.DONE:
        print('%s: %s: %s' % (task.uid, task.description['arguments'], task.stdout.strip()))

task.000021: ['-u']: Wed May  1 22:32:17 UTC 2024
task.000035: ['-I']: 2024-05-01
task.000044: ['-R']: Wed, 01 May 2024 22:32:18 +0000

By changing the state we check for from rp.DONE to rp.FAILED, we can inspect the error messages for the various tested flags (in task.stderr):

[9]:
tmgr.wait_tasks([task.uid for task in tasks])

for task in tasks:
    if task.state == rp.FAILED:
        print('%s: %s: %s' % (task.uid, task.description['arguments'], task.stderr.strip()))
task.000001: ['-a']: /bin/date: invalid option -- 'a'
Try '/bin/date --help' for more information.
task.000002: ['-b']: /bin/date: invalid option -- 'b'
Try '/bin/date --help' for more information.
task.000003: ['-c']: /bin/date: invalid option -- 'c'
Try '/bin/date --help' for more information.
task.000004: ['-d']: /bin/date: option requires an argument -- 'd'
Try '/bin/date --help' for more information.
task.000005: ['-e']: /bin/date: invalid option -- 'e'
Try '/bin/date --help' for more information.
task.000006: ['-f']: /bin/date: option requires an argument -- 'f'
Try '/bin/date --help' for more information.
task.000007: ['-g']: /bin/date: invalid option -- 'g'
Try '/bin/date --help' for more information.
task.000008: ['-h']: /bin/date: invalid option -- 'h'
Try '/bin/date --help' for more information.
task.000009: ['-i']: /bin/date: invalid option -- 'i'
Try '/bin/date --help' for more information.
task.000010: ['-j']: /bin/date: invalid option -- 'j'
Try '/bin/date --help' for more information.
task.000011: ['-k']: /bin/date: invalid option -- 'k'
Try '/bin/date --help' for more information.
task.000012: ['-l']: /bin/date: invalid option -- 'l'
Try '/bin/date --help' for more information.
task.000013: ['-m']: /bin/date: invalid option -- 'm'
Try '/bin/date --help' for more information.
task.000014: ['-n']: /bin/date: invalid option -- 'n'
Try '/bin/date --help' for more information.
task.000015: ['-o']: /bin/date: invalid option -- 'o'
Try '/bin/date --help' for more information.
task.000016: ['-p']: /bin/date: invalid option -- 'p'
Try '/bin/date --help' for more information.
task.000017: ['-q']: /bin/date: invalid option -- 'q'
Try '/bin/date --help' for more information.
task.000018: ['-r']: /bin/date: option requires an argument -- 'r'
Try '/bin/date --help' for more information.
task.000019: ['-s']: /bin/date: option requires an argument -- 's'
Try '/bin/date --help' for more information.
task.000020: ['-t']: /bin/date: invalid option -- 't'
Try '/bin/date --help' for more information.
task.000022: ['-v']: /bin/date: invalid option -- 'v'
Try '/bin/date --help' for more information.
task.000023: ['-w']: /bin/date: invalid option -- 'w'
Try '/bin/date --help' for more information.
task.000024: ['-x']: /bin/date: invalid option -- 'x'
Try '/bin/date --help' for more information.
task.000025: ['-y']: /bin/date: invalid option -- 'y'
Try '/bin/date --help' for more information.
task.000026: ['-z']: /bin/date: invalid option -- 'z'
Try '/bin/date --help' for more information.
task.000027: ['-A']: /bin/date: invalid option -- 'A'
Try '/bin/date --help' for more information.
task.000028: ['-B']: /bin/date: invalid option -- 'B'
Try '/bin/date --help' for more information.
task.000029: ['-C']: /bin/date: invalid option -- 'C'
Try '/bin/date --help' for more information.
task.000030: ['-D']: /bin/date: invalid option -- 'D'
Try '/bin/date --help' for more information.
task.000031: ['-E']: /bin/date: invalid option -- 'E'
Try '/bin/date --help' for more information.
task.000032: ['-F']: /bin/date: invalid option -- 'F'
Try '/bin/date --help' for more information.
task.000033: ['-G']: /bin/date: invalid option -- 'G'
Try '/bin/date --help' for more information.
task.000034: ['-H']: /bin/date: invalid option -- 'H'
Try '/bin/date --help' for more information.
task.000036: ['-J']: /bin/date: invalid option -- 'J'
Try '/bin/date --help' for more information.
task.000037: ['-K']: /bin/date: invalid option -- 'K'
Try '/bin/date --help' for more information.
task.000038: ['-L']: /bin/date: invalid option -- 'L'
Try '/bin/date --help' for more information.
task.000039: ['-M']: /bin/date: invalid option -- 'M'
Try '/bin/date --help' for more information.
task.000040: ['-N']: /bin/date: invalid option -- 'N'
Try '/bin/date --help' for more information.
task.000041: ['-O']: /bin/date: invalid option -- 'O'
Try '/bin/date --help' for more information.
task.000042: ['-P']: /bin/date: invalid option -- 'P'
Try '/bin/date --help' for more information.
task.000043: ['-Q']: /bin/date: invalid option -- 'Q'
Try '/bin/date --help' for more information.
task.000045: ['-S']: /bin/date: invalid option -- 'S'
Try '/bin/date --help' for more information.
task.000046: ['-T']: /bin/date: invalid option -- 'T'
Try '/bin/date --help' for more information.
task.000047: ['-U']: /bin/date: invalid option -- 'U'
Try '/bin/date --help' for more information.
task.000048: ['-V']: /bin/date: invalid option -- 'V'
Try '/bin/date --help' for more information.
task.000049: ['-W']: /bin/date: invalid option -- 'W'
Try '/bin/date --help' for more information.
task.000050: ['-X']: /bin/date: invalid option -- 'X'
Try '/bin/date --help' for more information.
task.000051: ['-Y']: /bin/date: invalid option -- 'Y'
Try '/bin/date --help' for more information.
task.000052: ['-Z']: /bin/date: invalid option -- 'Z'
Try '/bin/date --help' for more information.

MPI Tasks and Task Resources

So far, we have run single-core tasks. The most common way for applications to utilize multiple cores and nodes on HPC machines is to use MPI as a communication layer that coordinates multiple application processes, i.e., MPI ranks. In fact, the notion of ranks is central to RP’s TaskDescription class. All MPI ranks will be near-exact copies of each other: they run in the same working directory and the same environment, are defined by the same executable and arguments, get the same amount of resources allocated, etc. Notable exceptions are:

  • rank processes may run on different nodes;

  • rank processes can communicate via MPI;

  • each rank process obtains a unique rank ID.

It is up to the underlying MPI implementation to determine the exact value of the process’ rank ID. The MPI implementation may also set a number of additional environment variables for each process.

It is important to understand that only applications which make use of MPI should have more than one rank – otherwise identical copies of the same application instance are launched which will compute the same results, thus wasting resources for all ranks but one. Worse: I/O-routines of these non-MPI ranks can interfere with each other and invalidate those results.

Also: applications with a single rank cannot make effective use of MPI - depending on the specific resource configuration, RP may launch those tasks without providing an MPI communicator.

The following rank-related attributes are supported by RADICAL-Pilot:

  • ranks: the number of MPI ranks (application processes) to start

  • cores_per_rank: the number of cores each rank can use for spawning additional threads or processes

  • gpus_per_rank: the number of GPUs each rank can utilize

  • mem_per_rank: the size of memory (in Megabytes) which is available to each rank

  • lfs_per_rank: the amount of node-local file storage which is available to each rank

  • threading_type: how to inform the application about available resources to run threads on

    • rp.OpenMP: define OMP_NUM_THREADS in the task environment

  • gpu_type: how to inform the application about available GPU resources

    • rp.CUDA: define CUDA_VISIBLE_DEVICES in the task environment
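For illustration only, here is a minimal sketch of a task description combining several of these attributes; the executable name ./my_mpi_app is a placeholder, not part of this tutorial:

td = rp.TaskDescription({'executable'    : './my_mpi_app',  # placeholder MPI application
                         'ranks'         : 4,               # 4 MPI ranks
                         'cores_per_rank': 2,               # 2 cores per rank, e.g., for OpenMP threads
                         'gpus_per_rank' : 1,               # 1 GPU per rank
                         'threading_type': rp.OpenMP,       # defines OMP_NUM_THREADS for the task
                         'gpu_type'      : rp.CUDA})        # defines CUDA_VISIBLE_DEVICES for the task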

The next example uses the radical-pilot-hello.sh command as a test to report on rank creation.

Note: No core pinning is performed on localhost. Thus, tasks see all CPU cores as available to them. However, the THREADS information still reports the correct number of assigned CPU cores.

Note: If there is no MPI launch method installed, then we will proceed with a single rank.

[10]:
tds = list()
for n in range(4):
    ranks = (n + 1) if mpi_lm_exists else 1
    tds.append(rp.TaskDescription({'executable'    : ve_path + '/bin/radical-pilot-hello.sh',
                                   'arguments'     : [n + 1],
                                   'ranks'         : ranks,
                                   'cores_per_rank': (n + 1),
                                   'threading_type': rp.OpenMP}))
    report.progress()

report.progress_done()

tasks = tmgr.submit_tasks(tds)
tmgr.wait_tasks([task.uid for task in tasks])

for task in tasks:
    print('--- %s:\n%s\n' % (task.uid, task.stdout.strip()))
....

--- task.000053:
0 : PID     : 10694
0 : NODE    : build-24250560-project-13481-radicalpilot
0 : CPUS    : 00
0 : GPUS    :
0 : RANK    : 0
0 : THREADS : 1
0 : SLEEP   : 1

--- task.000054:
0 : PID     : 10661
0 : NODE    : build-24250560-project-13481-radicalpilot
0 : CPUS    : 00
0 : GPUS    :
0 : RANK    : 0
0 : THREADS : 2
0 : SLEEP   : 2
1 : PID     : 10682
1 : NODE    : build-24250560-project-13481-radicalpilot
1 : CPUS    : 00
1 : GPUS    :
1 : RANK    : 1
1 : THREADS : 2
1 : SLEEP   : 2

--- task.000055:
0 : PID     : 10637
0 : NODE    : build-24250560-project-13481-radicalpilot
0 : CPUS    : 00
0 : GPUS    :
0 : RANK    : 0
0 : THREADS : 3
0 : SLEEP   : 3
1 : PID     : 10642
1 : NODE    : build-24250560-project-13481-radicalpilot
1 : CPUS    : 00
1 : GPUS    :
1 : RANK    : 1
1 : THREADS : 3
1 : SLEEP   : 3
2 : PID     : 10646
2 : NODE    : build-24250560-project-13481-radicalpilot
2 : CPUS    : 00
2 : GPUS    :
2 : RANK    : 2
2 : THREADS : 3
2 : SLEEP   : 3

--- task.000056:
0 : PID     : 10514
0 : NODE    : build-24250560-project-13481-radicalpilot
0 : CPUS    : 00
0 : GPUS    :
0 : RANK    : 0
0 : THREADS : 4
0 : SLEEP   : 4
1 : PID     : 10511
1 : NODE    : build-24250560-project-13481-radicalpilot
1 : CPUS    : 00
1 : GPUS    :
1 : RANK    : 1
1 : THREADS : 4
1 : SLEEP   : 4
2 : PID     : 10533
2 : NODE    : build-24250560-project-13481-radicalpilot
2 : CPUS    : 00
2 : GPUS    :
2 : RANK    : 2
2 : THREADS : 4
2 : SLEEP   : 4
3 : PID     : 10550
3 : NODE    : build-24250560-project-13481-radicalpilot
3 : CPUS    : 00
3 : GPUS    :
3 : RANK    : 3
3 : THREADS : 4
3 : SLEEP   : 4

Task Data Management

The TaskDescription supports diverse means to specify the task’s input/output data and data-related properties:

  • stdout: path of the file to store the task’s standard output in

  • stderr: path of the file to store the task’s standard error in

  • input_staging: list of file staging directives to stage task input data

  • output_staging: list of file staging directives to stage task output data

Let us run an example task which uses those 4 attributes: we run a word count on /etc/passwd (which we stage as an input file) and store the result in an output file (which we fetch back). We will also stage back the files in which standard output and standard error are stored (although in this simple example both are expected to be empty).

[11]:

td = rp.TaskDescription({'executable'    : '/bin/sh',
                         'arguments'     : ['-c', 'cat input.dat | wc > output.dat'],
                         'stdout'        : 'task_io.out',
                         'stderr'        : 'task_io.err',
                         'input_staging' : [{'source': '/etc/passwd',
                                             'target': 'input.dat'}],
                         'output_staging': [{'source': 'output.dat',
                                             'target': '/tmp/output.test.dat'},
                                            {'source': 'task_io.out',
                                             'target': '/tmp/output.test.out'},
                                            {'source': 'task_io.err',
                                             'target': '/tmp/output.test.err'}]
                        })
task = tmgr.submit_tasks(td)
tmgr.wait_tasks([task.uid])

# let's check the resulting output files
print(ru.sh_callout('ls -la /tmp/output.test.*', shell=True)[0])
print(ru.sh_callout('cat /tmp/output.test.dat')[0])
-rw-r--r-- 1 docs docs 24 May  1 22:32 /tmp/output.test.dat
-rw-r--r-- 1 docs docs  0 May  1 22:32 /tmp/output.test.err
-rw-r--r-- 1 docs docs  0 May  1 22:32 /tmp/output.test.out

     24      34    1265

RADICAL-Pilot data staging capabilities go beyond what is captured in the example above:

  • Data can be transferred, copied, moved and linked;

  • data can refer to absolute paths, or can be specified relative to the system’s root file system, or to RP’s resource sandbox, session sandbox, pilot sandbox or task sandbox;

  • data staging can be performed not only for tasks, but also for the overall workflow (for example, when many tasks share the same input data).

Find a detailed explanation of RADICAL-Pilot data staging capabilities in our Data Staging with RADICAL-Pilot tutorial.

Task Execution Environment

On HPC platforms, it is common to provide application executables via environment modules. But task execution environments are also frequently used for scripting languages such as Python (e.g., virtualenv, venv or conda). RADICAL-Pilot supports the setup of the task execution environment in the following ways:

  1. environment dictionary

  2. use pre_exec directives to customize task specific environments

  3. prepare and reuse named environments for tasks

We will cover these options in the next three examples.

Environment Dictionary

Environment variables can be set explicitly in the task description via the environment attribute. When that attribute is not specified, tasks are executed in the default environment that the pilot found on the compute nodes. If the environment attribute is defined, then the default environment is augmented with the settings specified in environment. Useful variables to export might be PATH, LD_LIBRARY_PATH, etc., or any application-specific environment variables used by your tasks.

Note: As demonstrated below, a number of custom environment variables are always provided, such as the various sandbox locations known to RADICAL-Pilot.

[12]:
td = rp.TaskDescription({'executable' : '/bin/sh',
                         'arguments'  : ['-c', 'printf "FOO=$FOO\nBAR=$BAR\nSHELL=$SHELL\n"; env | grep RP_ | sort'],
                         'environment': {'FOO': 'foo', 'BAR': 'bar'}
                        })
task = tmgr.submit_tasks(td)
tmgr.wait_tasks([task.uid])
print(task.stdout)
[... CONTENT SHORTENED ...]
ilot.sandbox/rp.session.975cedb6-080a-11ef-818b-0242ac110002//pilot.0000//gtod
RP_PILOT_ID=pilot.0000
RP_PILOT_SANDBOX=/home/docs/radical.pilot.sandbox/rp.session.975cedb6-080a-11ef-818b-0242ac110002//pilot.0000/
RP_PROF=/home/docs/radical.pilot.sandbox/rp.session.975cedb6-080a-11ef-818b-0242ac110002//pilot.0000//prof
RP_PROF_TGT=/home/docs/radical.pilot.sandbox/rp.session.975cedb6-080a-11ef-818b-0242ac110002//pilot.0000//task.000058/task.000058.prof
RP_RANK=0
RP_RANKS=1
RP_REGISTRY_ADDRESS=tcp://172.17.0.2:10002
RP_RESOURCE=local.localhost
RP_RESOURCE_SANDBOX=/home/docs/radical.pilot.sandbox
RP_SESSION_ID=rp.session.975cedb6-080a-11ef-818b-0242ac110002
RP_SESSION_SANDBOX=/home/docs/radical.pilot.sandbox/rp.session.975cedb6-080a-11ef-818b-0242ac110002/
RP_TASK_ID=task.000058
RP_TASK_NAME=task.000058
RP_TASK_SANDBOX=/home/docs/radical.pilot.sandbox/rp.session.975cedb6-080a-11ef-818b-0242ac110002//pilot.0000//task.000058
RP_VENV_PATH=/home/docs/radical.pilot.sandbox/ve.local.localhost.v1.52.1
RP_VENV_TYPE=venv

Environment Setup with pre_exec

The pre_exec attribute of the task description can be used to specify a set of shell commands which will be executed before the task’s executable is launched. pre_exec can be used to prepare the task’s runtime environment, for example to:

  • Load a system module;

  • export some environment variable;

  • run a shell script or shell commands;

  • activate some virtual environment.

The example shown below activates the virtual environment this notebook is running in (in ve_path) so that it is usable for the task itself. We run another pre_exec command to install the pyyaml module in it. The actual task then runs pip show to check that the module is indeed available.

Warning: The first pre_exec command assumes that this is a virtual environment, not a Conda environment. You may need to change that command if your notebook runs in a Conda environment.

[13]:
td = rp.TaskDescription({'pre_exec'   : ['. %s/bin/activate' % ve_path,
                                         'pip install pyyaml'],
                         'executable' : '/bin/sh',
                         'arguments'  : ['-c', 'which python3; pip show pyyaml'],
                        })
task = tmgr.submit_tasks(td)
tmgr.wait_tasks([task.uid])
print(task.stdout)
Requirement already satisfied: pyyaml in /home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/envs/devel/lib/python3.7/site-packages (6.0.1)
/home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/envs/devel/bin/python3
Name: PyYAML
Version: 6.0.1
Summary: YAML parser and emitter for Python
Home-page: https://pyyaml.org/
Author: Kirill Simonov
Author-email: xi@resolvent.net
License: MIT
Location: /home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/envs/devel/lib/python3.7/site-packages
Requires:
Required-by: myst-parser

Environment Setup with named_env

When the same environment is used for many tasks, the collective sum of the pre_exec activities can create a significant runtime overhead, both on the shared filesystem and on the system load. named_env addresses that problem: applications can prepare a task environment and then use the named_env attribute to activate it for the task. This process is very lightweight in terms of system load and runtime overhead and is thus the recommended way to set up task environments which are shared among many tasks. Any setup step which needs to be run individually for each task, though, such as the creation of task-specific input files, should still be added to the task’s pre_exec directives.

Note: If you don’t need to create a new environment, but want to ensure that tasks use the same environment in which the RP Agent runs (rp), then you can set it for each task: td.named_env = 'rp'.

[14]:

pilot.prepare_env(env_name='test_env',
                  env_spec={'type' : 'venv',
                            'setup': ['psutil']})

td = rp.TaskDescription({'executable' : '/bin/sh',
                         'arguments'  : ['-c', 'which python3; pip list | grep psutil'],
                         'named_env'  : 'test_env'
                        })
task = tmgr.submit_tasks(td)
tmgr.wait_tasks([task.uid])
print(task.stdout)
/home/docs/radical.pilot.sandbox/rp.session.975cedb6-080a-11ef-818b-0242ac110002/pilot.0000/env/rp_named_env.test_env/bin/python3
psutil             5.9.8

[15]:
report.header('finalize')
session.close()

--------------------------------------------------------------------------------
finalize


Using Multiple Pilots

RADICAL-Pilot supports managing multiple pilots during a single run, with the workload (a bag of computing tasks) distributed among all available pilots. radical.pilot.TaskManager dispatches tasks to pilots according to a scheduling algorithm (radical.pilot.SCHEDULER_ROUND_ROBIN, radical.pilot.SCHEDULER_BACKFILLING), which is provided during its initialization, e.g., TaskManager(session=session, scheduler=SCHEDULER_BACKFILLING).

Note: RADICAL-Pilot enacts a second scheduling step within each launched pilot: once the Agent takes ownership of the tasks that the radical.pilot.TaskManager scheduler assigned to it, the Agent Scheduler places those tasks on the set of resources (CPU cores, GPUs) that the Agent is managing. The Agent Scheduler can be configured via resource configuration files (see the tutorial RADICAL-Pilot Configuration System).

  • Round-Robin Scheduler, RR (SCHEDULER_ROUND_ROBIN, default) fairly distributes arriving tasks over the set of known pilots, independent of task state, expected workload, pilot state or pilot lifetime. As such, RR is a fairly simplistic, but also a very fast scheduler, which does not impose any additional communication round trips between radical.pilot.TaskManager and pilot agents.

  • Backfilling Scheduler, BF (SCHEDULER_BACKFILLING) provides better load balancing, but at the cost of additional communication round trips. Whether that load balancing is beneficial depends on the actual application workload. It is most beneficial for a large number of pilots and for relatively long-running tasks (e.g., when the task runtime is significantly longer than the communication round-trip time between radical.pilot.TaskManager and the pilot agents).

    • It is not recommended to use BF for: (i) a single pilot, and/or (ii) many short-running tasks.

    • BF will only dispatch tasks to pilot agents once the pilot agent is in the radical.pilot.PMGR_ACTIVE state. The tasks will thus get executed even if one of the pilots never reaches that state: the load is distributed among the pilots which do become active (radical.pilot.PMGR_ACTIVE).

    • BF will only dispatch as many tasks to the Agent as can be executed concurrently. No tasks will be waiting in the Agent Scheduler queue. BF reacts to task termination events and then backfills (!) the Agent with any remaining tasks. The Agent will remain under-utilized during that communication.

Examples

Note: In our examples, we will not show a progression bar while waiting for some operation to complete, e.g., while waiting for a pilot to stop. That is because the progression bar offered by RP’s reporter does not work within a notebook. You could use it when executing an RP application as a standalone Python script.

[1]:
%env RADICAL_REPORT_ANIME=False
env: RADICAL_REPORT_ANIME=False
[2]:
import radical.pilot as rp
import radical.utils as ru
[3]:
session = rp.Session()
pmgr    = rp.PilotManager(session=session)
tmgr    = rp.TaskManager(session=session)  # -> rp.TaskManager(session=session, scheduler=rp.SCHEDULER_ROUND_ROBIN)

We run multiple pilots using a description for a “local” platform (local.localhost), which is provided by the built-in configuration file resource_local.json and mimics a configuration for CPU cores and GPUs.

Note: The amount of resources is arbitrary, and it does not reflect the number of physical resources available.

Note: Before running multiple pilots on a single HPC platform, ensure that its scheduling policy conforms to the use case, since each pilot represents a batch job and the HPC’s scheduling policy might limit the number of jobs running simultaneously per user. For remote submission, see the tutorial Using RADICAL-Pilot on HPC Platforms.

[4]:
pd0 = rp.PilotDescription({
    'resource': 'local.localhost',
    'cores'   : 4,
    'runtime' : 10
})
pd1 = rp.PilotDescription({
    'resource': 'local.localhost',
    'cores'   : 2,
    'runtime' : 5
})

pilots = pmgr.submit_pilots([pd0, pd1])

After pilots are submitted, they should be added to radical.pilot.TaskManager for task dispatching.

[5]:
tmgr.add_pilots(pilots)
tmgr.list_pilots()  # lists all pilots available for TaskManager
[5]:
['pilot.0000', 'pilot.0001']
[6]:
N_TASKS = 4
tds     = []  # list of task descriptions

for _ in range(N_TASKS):

    td = rp.TaskDescription({
        'executable': '/bin/echo',
        'arguments' : ['pilot_id=$RP_PILOT_ID']
    })

    tds.append(td)

tasks = tmgr.submit_tasks(tds)
tmgr.wait_tasks()
[6]:
['DONE', 'DONE', 'DONE', 'DONE']

Each task was assigned to a pilot according to the default scheduling algorithm radical.pilot.SCHEDULER_ROUND_ROBIN, but it is possible to assign a task to a particular pilot explicitly with the radical.pilot.TaskDescription.pilot attribute. Thus, in the example below, we submit another pilot and assign a new task to it.

[7]:
pd3 = rp.PilotDescription({
    'resource': 'local.localhost',
    'cores'   : 2,
    'runtime' : 5
})

pilot = pmgr.submit_pilots(pd3)
tmgr.add_pilots(pilot)

td = rp.TaskDescription()
td.executable = '/bin/echo'
td.arguments  = ['task is assigned to $RP_PILOT_ID']
td.pilot      = pilot.uid

task = tmgr.submit_tasks(td)
tmgr.wait_tasks(uids=task.uid)

tasks.append(task)

If a task is assigned to a pilot which wasn’t added to radical.pilot.TaskManager, then that pilot is unknown for task dispatching and the task remains pending until a corresponding pilot arrives. In the example below, we set a timeout for the radical.pilot.TaskManager.wait_tasks() method: by default, that method waits for tasks to reach their final state, but this task will stay in the radical.pilot.TMGR_SCHEDULING state until the session is closed (since we will not submit an associated pilot).

[8]:
td = rp.TaskDescription()
td.executable = '/bin/echo'
td.arguments  = ['pilot_id=$RP_PILOT_ID']
td.pilot      = 'unknown_pilot_id'

task = tmgr.submit_tasks(td)
tmgr.wait_tasks(uids=task.uid, timeout=15)
[8]:
'TMGR_SCHEDULING'

As a final step, we go through all task outputs to see which pilot was assigned to each task.

[9]:
for task in tasks:
    stdout = task.stdout.strip()[:35]
    print('%s - output: %-30s (in task description: pilot="%s")' %
          (task.uid, stdout, task.description['pilot']))
task.000000 - output: pilot_id=pilot.0000            (in task description: pilot="")
task.000001 - output: pilot_id=pilot.0001            (in task description: pilot="")
task.000002 - output: pilot_id=pilot.0000            (in task description: pilot="")
task.000003 - output: pilot_id=pilot.0001            (in task description: pilot="")
task.000004 - output: task is assigned to pilot.0002 (in task description: pilot="pilot.0002")
[10]:
session.close(cleanup=True)

Tracing and Profiling

Warning: We assume that you are familiar with deploying, configuring and using RADICAL-Pilot, for example by taking the getting started introduction tutorial.

Tracing may help users gather information about the behavior of RADICAL-Pilot (RP) and of their application’s tasks, during or after execution. RP uses dedicated files to log tracing information. Those files end with the *.prof extension and are stored inside sandboxes. For details about RP’s sandboxes, see Generated Output in our Getting Started tutorial.

In RP lingo, a trace is a collection of timestamps, where each timestamp is associated with an event that, in turn, originates within a specific RP component in a given context. That is a lot of information. In order to organize it, we created an event model, a state model and a dedicated format for each record in a trace.

You must distinguish between tracing the behavior of RP and tracing the behavior of your application’s tasks. RP does not trace the inner behavior of a task. For RP, each task is a black box: once launched, the behavior of a task is unknown to RP. This is by design and helps maintain a rigid separation of concerns between RP and each task, enabling the concurrent execution of heterogeneous tasks. If you need to trace the behavior of your tasks, you can use tools like TAU or other tracers/profilers.

Event and state models

RP’s event model defines the events triggered by each RP component, alongside their description and the *.prof file where they are recorded. Note that, as reported in the event model’s table, not all events are guaranteed to be fired by each configuration of RP. In order to use RP’s tracing capabilities to monitor and analyze the execution of your application, you will have to study the description of each event, understand what information it gives you about either RP or your tasks, and then make an informed decision on which events fit your purpose.

Understanding all the above requires a careful consideration of RP’s architecture and an understanding of where and when each component executes. Sounds too complicated? It is. Go and see how to use RADICAL-Analytics to simplify tracing and profiling, albeit only after your application has completed its execution.

In RP’s lingo, a state is a special type of event. States have two properties that are relevant to tracing:

  1. Each state is guaranteed to fire in every possible configuration of RP, i.e., independent of the platform on which it executes, the number and types of resources, the specific virtual environment used for its Agent, and the myriad of other possible RP configurations.

  2. States are guaranteed to always fire in a precise sequence, i.e., each state is part of a well-defined, immutable sequence.

When tracing your application, states trade off granularity for reliability. As such, they are also easier to use as they require a less granular and specific understanding of RP. To see the list of states, ordered in their guaranteed sequence, see RP’s state model.

Record format

Each *.prof file uses the following format:

time,event,comp,thread,uid,state,msg

Where:

  • time = (mandatory) timestamp expressed in Unix Time (epoch)

  • event = (mandatory) the name of the timestamped event

  • comp = (optional) the name of the component that triggered the event

  • thread = (mandatory) the ID of the thread in which the event was triggered within comp

  • uid = (mandatory) the ID of the entity to which that event belongs (e.g., task or pilot)

  • state = (optional) the name given to the event when it is also a state, i.e., guaranteed to be triggered in every configuration of RP

  • msg = (optional) an arbitrary message associated to that event.

For example, here is the record of the launch_start event for a task:

1684787127.2228520,launch_start,,MainThread,task.000000,AGENT_EXECUTING,

As you will see later in this tutorial, you can use launch_start to monitor when a task is/was launched to be executed. Here is another example, this time for the schedule_ok event triggered by RP’s Agent Scheduler:

1684787127.1614347,schedule_ok,agent_scheduling.0000,Thread-1 (_worker_thread),task.000000,,

First, note that this event is not a state, e.g., it would not fire in a configuration of RP where the scheduler is bypassed (a corner case: for all practical purposes, this event will also fire). As described in the event model, schedule_ok indicates the time at which the scheduler’s search for task resources succeeded. Tracing this event for each task gives you a time series of when your tasks were/are scheduled by RP for execution.

Note: Order matters! States are guaranteed to always be triggered in the same order, while only a subset of events have a partial order within each module. Consider that partial order when consulting the event model.

RADICAL Analytics

Parsing and filtering the *.prof files requires a solid understanding of:

  1. RP’s event/state model in order to select only the information you actually need;

  2. the format in which the information is encoded within the *.prof file;

  3. in which file or files RP recorded the information you need;

  4. bash commands/scripts to parse and filter those files.

That is both difficult and cumbersome.

In order to facilitate using RP traces to profile the execution of your application, we created RADICAL Analytics (RA). Please see the linked documentation and consider using RA whenever you need postmortem profiling. In the following, we discuss how you can perform some task-related tracing at runtime or postmortem, i.e., while RP is executing your application or once it finished. This is useful while developing your application and/or for spot-checks while executing a large run. For production profiling, you should use RA.

Tracing at runtime

In a real-life scenario, you will:

  1. Start the execution of your application on a supported HPC platform;

  2. while the application is running, execute shell commands to parse and filter across multiple *.prof files.

The format of this tutorial does not allow for concurrently running both your application and the shell commands you need to spot-check the behavior of your tasks. Thus, we first run a sample application and then show some useful shell commands.

Warning: In real-life scenarios, your shell commands will operate on multiple small files created on a filesystem shared by all the compute nodes of the HPC platform. Even with as little as a few hundred tasks, your shell commands could deteriorate the performance of the shared file systems. You should exercise extreme caution as the whole HPC machine could grind to a halt. That would affect all the users of the machine and make the system administrators very unhappy; ask us how we know…

Here we:

  1. Use the RADICAL_PROFILE environment variable to enable RP tracing;

  2. run a condensed version of the application we used for the Getting Started.

Warning: By default, RP sets RADICAL_PROFILE to TRUE. When set to FALSE, RP will not create the *.prof files.

[1]:
%env RADICAL_PROFILE=True
%env RADICAL_REPORT_ANIME=False

import os
import random
import radical.pilot as rp

session = rp.Session()
pmgr = rp.PilotManager(session=session)

pd_init = {'resource'     : 'local.localhost',
           'runtime'      : 30,  # pilot max runtime in minutes
           'cores'        : 4,
           'gpus'         : 0,
           'exit_on_error': False}

n = 10
tds = list()

for i in range(n):

    td = rp.TaskDescription()
    td.executable     = 'radical-pilot-hello.sh'
    td.arguments      = [random.randint(1, 10)]
    td.ranks          =  1
    td.cores_per_rank =  random.randint(1, 2)
    tds.append(td)

pdesc = rp.PilotDescription(pd_init)
pilot = pmgr.submit_pilots(pdesc)

tmgr = rp.TaskManager(session=session)
tmgr.add_pilots(pilot)
tmgr.submit_tasks(tds)
tmgr.wait_tasks()

session.close()
env: RADICAL_PROFILE=True
env: RADICAL_REPORT_ANIME=False
Task state

Typically, you will want to know in which state your tasks are during runtime. Did my task start to execute? Here is an example of how you could answer that question using bash commands and RP’s traces:

First, we get the name of the session you are running/have just run. We save it to a Python variable, and we also export it to a shell variable so that we can access it both within this notebook and from the shell commands.

[2]:
%env SESSION_ID=$session.uid
env: SESSION_ID=rp.session.03a4ac98-080b-11ef-80ff-0242ac110002

Second, for each task, we parse its *.prof file, filtering for the event app_start. That will tell us whether and when each task started to execute.

Note: How do you know that you need the event app_start? You read the definition of each event in the RP’s event model.

Note: How do you know that the event app_start is recorded in the file task.*.prof? You read in the RP’s event model in which file the app_start event is written.

Here is the list of events recorded for task.000000.

[3]:
!cat ~/radical.pilot.sandbox/$SESSION_ID/pilot.0000/task.000000/task.000000.prof
1714602924.3953731,launch_start,,MainThread,task.000000,AGENT_EXECUTING,
1714602924.4044149,launch_pre,,MainThread,task.000000,AGENT_EXECUTING,
1714602924.4067240,launch_submit,,MainThread,task.000000,AGENT_EXECUTING,
1714602924.4221361,exec_start,,MainThread,task.000000,AGENT_EXECUTING,
1714602924.4449739,exec_pre,,MainThread,task.000000,AGENT_EXECUTING,
1714602924.4516921,rank_start,,MainThread,task.000000,AGENT_EXECUTING,
1714602924.4647529,app_start,,MainThread,task.000000,AGENT_EXECUTING,
1714602929.6260891,app_stop,,MainThread,task.000000,AGENT_EXECUTING,
1714602929.6290460,rank_stop,,MainThread,task.000000,AGENT_EXECUTING,RP_EXEC_PID=20014:RP_RANK_PID=20049
1714602929.6324749,exec_post,,MainThread,task.000000,AGENT_EXECUTING,
1714602929.6371989,exec_stop,,MainThread,task.000000,AGENT_EXECUTING,
1714602929.6399641,launch_collect,,MainThread,task.000000,AGENT_EXECUTING,RP_LAUNCH_PID=20006
1714602929.6435180,launch_post,,MainThread,task.000000,AGENT_EXECUTING,
1714602929.6478310,launch_stop,,MainThread,task.000000,AGENT_EXECUTING,

You can filter only the information you care about from each list of traces. Further, you may want to convert each timestamp from Unix Time (epoch) to a human-readable date/time.

[4]:
!for trace in `find ~/radical.pilot.sandbox/$SESSION_ID/pilot.0000/ -type f -name  "task.*.prof"`; do \
    time_stamp=`grep 'app_start' $trace`      ; \
    IFS=, read -a fields <<<"$time_stamp"     ; \
    start_time=`date -d  @${fields[0]} +'%c'` ; \
    echo ${fields[4]}: $start_time            ; \
done
/usr/bin/sh: 1: Syntax error: redirection unexpected

We also want to know whether and when each task finished.

[5]:
!for trace in `find ~/radical.pilot.sandbox/$SESSION_ID/pilot.0000/ -type f -name  "task.*.prof"`; do \
    start=`grep 'app_start' $trace`; stop=`grep 'app_stop' $trace`      ; \
    IFS=, read -a fields_start <<<"$start"                              ; \
    IFS=, read -a fields_stop <<<"$stop"                                ; \
    start_time=`date -d  @${fields_start[0]} +'%r'`                     ; \
    stop_time=`date -d  @${fields_stop[0]} +'%r'`                       ; \
    echo ${fields_start[4]} \-\> start: $start_time \| stop: $stop_time ; \
done
/usr/bin/sh: 1: Syntax error: redirection unexpected

In a shell environment, you could save the list of traces in a variable so as to limit the number of times you hit the shared filesystem of the HPC platform. Further, you could define a set of aliases to make those for loops more compact and easier to issue. At runtime you could use the watch command to “monitor” the progression of the tasks at regular intervals. In the presence of a large number of files, you could avoid printing an unreadable list of files and just print an aggregate count of the files that are done. Again, you should exercise extreme caution to avoid overloading the shared file system.
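The same kind of spot-check can also be done from within the notebook. The following is a minimal Python sketch only: it assumes the sandbox layout and the SESSION_ID environment variable shown above, and count_event is a hypothetical helper, not part of the RP API:

import os
import glob

def count_event(event):
    # hypothetical helper: count how many task profiles already contain 'event'
    pattern = os.path.expanduser('~/radical.pilot.sandbox/%s/pilot.*/task.*/task.*.prof'
                                 % os.environ['SESSION_ID'])
    profs  = glob.glob(pattern)
    n_done = 0
    for prof in profs:
        with open(prof) as fin:
            if any(',%s,' % event in line for line in fin):
                n_done += 1
    return n_done, len(profs)

# print an aggregate count instead of a long file listing; wrap this in a loop
# (or use `watch` from a shell) for periodic monitoring
done, total = count_event('app_stop')
print('%d / %d tasks reached app_stop' % (done, total))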

Tracing heterogeneous tasks

Concurrently executing heterogeneous tasks on heterogeneous resources is RP’s most distinctive and useful capability. Thus, it is likely that you will execute different types of tasks and that you may want to trace tasks based on their type. Looking at the examples above, you will notice that tasks are named task.xxxxxx, which gives no information about the type of executable each task ran.

You can introduce arbitrary information about each task using the uid property of the TaskDescription object. This creates an arbitrary UID for each task, in which you can codify distinctive properties like, for example, simulation, analysis, machine_learning, etc. Once you do that, you will be able to parse and filter the task profiles looking for the UIDs you defined. Go ahead, try this out by downloading the container of this tutorial, executing it on your local machine via Docker, and editing the code to introduce your own UIDs and create your own shell commands to trace their execution.
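For example (a sketch only; the UID value is illustrative and should be unique within the session):

td            = rp.TaskDescription()
td.executable = '/bin/date'
td.uid        = 'simulation.000042'   # illustrative UID encoding the task type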

Executing Tasks with RAPTOR

This notebook introduces you to RAPTOR, a high-throughput RADICAL-Pilot subsystem that executes function tasks and non-MPI executable tasks at scale on supported HPC platforms. This tutorial guides you through the setup and configuration of RAPTOR and through the specification and execution of a variety of task types.

Warning: We assume you have already worked through our getting started and describing tasks tutorials.

Warning: All examples in this notebook are executed locally on your machine. You need to have installed MPI before executing these examples. RADICAL-Pilot and RAPTOR support OpenMPI, MPICH, MVAPICH or any other MPI flavor that provides a standards compliant mpiexec command.

When Using RAPTOR

Use RAPTOR when you want to concurrently execute free/serialized Python functions, Python class methods and shell commands. For example, you want to concurrently execute up to 10^5 machine learning functions across thousands of GPUs and CPUs. RAPTOR supports single, multi-process and MPI Python functions.

You should also use RAPTOR when your application requires non-MPI tasks which execute for less than 5 minutes. You could use RADICAL-Pilot without RAPTOR for that workload, but you would incur high scheduling and launching overheads.

What is RAPTOR

RAPTOR is a subsystem of RP, thus you have to execute RP in order to use RAPTOR. RAPTOR launches a configurable number of masters and workers on the resources you acquired via RP. Once up and running, each RAPTOR master will receive task execution requests from RP. In turn, each master dispatches those requests to the workers, which are optimized to execute small, short-running tasks at scale.

Different from RP’s ‘normal’ tasks, RAPTOR can execute a variety of task types:

  • executables: similar to RP’s native task execution, but without MPI support

  • free Python functions

  • Python class methods

  • serialized Python functions

  • plain Python code

  • shell commands

Importantly, all function invocations can make use of MPI by defining multiple ranks.

RAPTOR has a number of advanced capabilities, such as:

  • new task types can be added by applications

  • the RAPTOR Master class can be overloaded by applications

  • the RAPTOR Worker class can be overloaded by applications

  • Master and Worker layout can be tuned in a variety of ways

  • different Worker implementations are available with different capabilities and scaling properties

  • workload execution can mix RAPTOR task execution and ‘normal’ RP task execution

Those topics will not be covered in this basic tutorial.

Prepare a RP pilot to host RAPTOR

We will launch a pilot with sufficient resources to run both the RAPTOR master (using 1 core) and two worker instances (using 8 cores each):

[1]:
import os

# do not use animated output in notebooks
os.environ['RADICAL_REPORT_ANIME'] = 'False'

import radical.pilot as rp
import radical.utils as ru

# determine the path of the currently active ve to simplify some examples below
ve_path = os.path.dirname(os.path.dirname(ru.which('python3')))

# create session and managers
session = rp.Session()
pmgr    = rp.PilotManager(session)
tmgr    = rp.TaskManager(session)

# submit a pilot
pilot = pmgr.submit_pilots(rp.PilotDescription({'resource'     : 'local.localhost',
                                                'runtime'      : 60,
                                                'cores'        : 17,
                                                'exit_on_error': False}))

# add the pilot to the task manager and wait for the pilot to become active
tmgr.add_pilots(pilot)
pilot.wait(rp.PMGR_ACTIVE)
print('pilot is up and running')

pilot is up and running

We now have an active pilot with sufficient resources and can start executing the RAPTOR master and worker instances. Both the master and the workers need to run in an environment which has radical.pilot installed, so we place them in the pilot agent environment rp (the named_env attribute is covered by the tutorial describing tasks):

[2]:
master_descr = {'mode'     : rp.RAPTOR_MASTER,
                'named_env': 'rp'}
worker_descr = {'mode'     : rp.RAPTOR_WORKER,
                'named_env': 'rp'}

raptor  = pilot.submit_raptors( [rp.TaskDescription(master_descr)])[0]
workers = raptor.submit_workers([rp.TaskDescription(worker_descr),
                                 rp.TaskDescription(worker_descr)])
Task execution

At this point we have the pilot set up and running and we have started the master task; upon initialization, the master will start the worker tasks: the RAPTOR overlay is now ready to execute a Python function. Note that a function must be decorated with the rp.pythontask decorator for it to be callable as a RAPTOR function.

[3]:
# function for raptor to execute
@rp.pythontask
def msg(val: int):
    if val % 2 == 0:
        print('Regular message')
    else:
        print(f'This is a very odd message: {val}')

# create a minimal function task
td   = rp.TaskDescription({'mode'    : rp.TASK_FUNCTION,
                           'function': msg(3)})
task = raptor.submit_tasks([td])[0]

The task will be scheduled for execution on the pilot we created above. We now wait for the task to complete, i.e., to reach one of the final states DONE, CANCELED or FAILED:

[4]:
print(task)
tmgr.wait_tasks([task.uid])
print('id: %s [%s]:\n    out:\n%s\n    ret: %s\n'
     % (task.uid, task.state, task.stdout, task.return_value))
['task.000000', '', 'AGENT_STAGING_INPUT_PENDING']
id: task.000000 [DONE]:
    out:
This is a very odd message: 3

    ret: None

[5]:
session.close()

Executing MPI Tasks with RAPTOR

This notebook will walk you through setting up and using the RAPTOR subsystem to execute MPI function tasks. To execute MPI functions with RAPTOR, we need to specify the type and size of the worker to be deployed by RAPTOR. The primary reason to use RAPTOR for MPI functions is its capability to construct and deliver heterogeneous (differently sized) private MPI communicators to the functions at execution time. In the example below, we will execute an MPI function that requires 4 MPI ranks, and for that we will deploy a single master and a single worker.

Warning: We assume you have already worked through our RAPTOR tutorial.

Prepare an RP pilot to host RAPTOR

We will launch a pilot with sufficient resources to run both the RAPTOR master (using 1 core) and a single worker instance (using 10 cores):

[1]:
import os

# do not use animated output in notebooks
os.environ['RADICAL_REPORT_ANIME'] = 'False'

import radical.pilot as rp
import radical.utils as ru

# determine the path of the currently active ve to simplify some examples below
ve_path = os.path.dirname(os.path.dirname(ru.which('python3')))

# create session and managers
session = rp.Session()
pmgr    = rp.PilotManager(session)
tmgr    = rp.TaskManager(session)

# submit a pilot
pilot = pmgr.submit_pilots(rp.PilotDescription({'resource'     : 'local.localhost',
                                                'runtime'      : 60,
                                                'cores'        : 17,
                                                'exit_on_error': False}))

# add the pilot to the task manager and wait for the pilot to become active
tmgr.add_pilots(pilot)
pilot.wait(rp.PMGR_ACTIVE)
print('pilot is up and running')
pilot is up and running

The pilot is now in an ACTIVE state, and the resource is acquired.

Prepare RAPTOR environment

RAPTOR mainly depends on mpi4py to launch its masters and workers. Thus, to use RAPTOR, mpi4py must be installed in the Python/conda environment in use. RP offers two options: creating a Python/conda environment with mpi4py installed in it, or using an existing environment by specifying its path. In this tutorial, we instruct RP to create a new Python virtual environment named raptor_ve:

[2]:
pilot.prepare_env(env_name='raptor_ve',
                  env_spec={'type' : 'venv',
                            'path' : '/tmp/raptor_ve',
                            'setup': ['radical.pilot',
                                      'mpi4py']})
print('raptor_ve is ready')
raptor_ve is ready

The Python virtual environment is now ready, and the pilot can launch the masters/workers and execute the tasks.

Create a master and an MPI worker by specifying raptor_class: MPIWorker in the worker description below. This value instructs RAPTOR to start the MPI worker rather than the DefaultWorker. Note that we also set the number of ranks (cores) in the worker description to a number larger than the number of ranks required by the designated MPI task function, so that the function can be executed on that worker.

Warning: The number of master(s) and worker(s) depends on the workload specification and the use case that you are trying to execute. Sometimes, you might be required to deploy two masters and workers instead of one for more efficient load balancing.

[3]:
master_descr = {'mode': rp.RAPTOR_MASTER,
                'named_env': 'raptor_ve'}
worker_descr = {'mode': rp.RAPTOR_WORKER,
                'ranks': 10,
                'named_env': 'raptor_ve',
                'raptor_class': 'MPIWorker'}

raptor = pilot.submit_raptors( [rp.TaskDescription(master_descr)])[0]
worker = raptor.submit_workers([rp.TaskDescription(worker_descr),
                                rp.TaskDescription(worker_descr)])

Define a Python function that requires running in an MPI environment and use the rp.pythontask decorator so the function can be serialized by RP. Note that this function takes a comm argument, representing the MPI communicator that this function needs.

[4]:
@rp.pythontask
def func_mpi(comm, msg, sleep=2):
    import time
    print('hello %d/%d: %s' % (comm.rank, comm.size, msg))
    time.sleep(sleep)
    return 'func_mpi retval'

Create a corresponding rp.TaskDescription for func_mpi, specifying the function object and the number of ranks required to run the function (the number of ranks also determines the size of the comm object passed to the function at execution time).

[5]:
# create a minimal MPI function task
td = rp.TaskDescription({'mode': rp.TASK_FUNCTION,
                         'ranks': 4,
                         'function': func_mpi(None, msg='mpi_task.0')})
mpi_task = raptor.submit_tasks([td])[0]

Wait for the task status to be reported back.

[6]:
print(mpi_task)
tmgr.wait_tasks([mpi_task.uid])
print('id: %s [%s]:\n    out:\n%s\n    ret: %s\n'
     % (mpi_task.uid, mpi_task.state, mpi_task.stdout, mpi_task.return_value))
['task.000000', '', 'AGENT_STAGING_INPUT_PENDING']
id: task.000000 [DONE]:
    out:
['hello 2/4: mpi_task.0\n', 'hello 0/4: mpi_task.0\n', 'hello 1/4: mpi_task.0\n', 'hello 3/4: mpi_task.0\n']
    ret: ['func_mpi retval', 'func_mpi retval', 'func_mpi retval', 'func_mpi retval']

[7]:
session.close()

Staging Data

RADICAL-Pilot (RP) provides capabilities for moving data from its client side to its agent side (on the HPC platform) and back, and within the agent space, i.e., between the sandboxes for session, pilots and tasks. To ensure that a task finds the data it needs on the HPC platform where it runs, RP provides a mechanism to stage the task’s input data before its execution. Further, RP allows users to stage the output data generated by tasks back to their workstations or to any other file system location on the HPC platform. Users can then collect their data from that location after their RP application has terminated.

The rest of the tutorial shows how the user can describe data staging and what that means for the running application.

Note: Often, HPC platforms have a shared file system, which allows users to avoid defining staging directives for the tasks. That is because: (i) tasks will be able to access data saved on the shared file system from all the compute nodes allocated to a pilot; and (ii) users can define the paths to those data directly in each task description, without the need to stage (i.e., copy/link) those data inside each task’s sandbox.

Staging directives

Format

The staging directives are specified using a dict type with the following form:

staging_directive = {'source': str, 'target': str, 'action': str, 'flags': int}
  • Source - data files (or directory) that need to be staged (see the section Locations);

  • Target - the target path for the staged data (see the section Locations);

  • Action - defines how the provided data should be staged (see the section Actions);

  • Flags - sets the options applied for a corresponding action (see the section Flags)
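For example, a fully specified directive may look like the following sketch (paths are illustrative; the location schemas, actions and flags are described in the sections below):

staging_directive = {'source': 'client:///input.dat',        # file in the client's working directory
                     'target': 'task:///input.dat',          # staged into the task sandbox
                     'action': radical.pilot.TRANSFER,        # default action
                     'flags' : radical.pilot.CREATE_PARENTS}  # create missing target directories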

Locations

Source and Target locations can be given as strings or radical.utils.Url instances. Strings containing :// are converted into URLs, while strings without :// are considered absolute or relative paths, and are thus interpreted in the context of the client’s working directory (see the section Simplified directive format for examples).

Special URL schemas are relative to certain locations:

  • client:// - client’s working directory (./);

  • endpoint:// - root of the file system on the target platform;

  • resource:// - agent sandbox (radical.pilot.sandbox/) on the target platform;

  • session:// - session sandbox on the target platform (within the agent sandbox);

  • pilot:// - pilot sandbox on the target platform (within the session sandbox);

  • task:// - task sandbox on the target platform (within the pilot sandbox).

All locations are interpreted as directories, not as files. We treat each schema as a namespace (i.e., schema:// = schema:///) and not as a location qualified by a hostname. The hostname element of the URL is expected to be empty, and the path is always considered relative to the locations specified above (even though URLs usually don’t have a notion of relative paths).

endpoint:// is based on the filesystem_endpoint attribute of the platform config (see the tutorial RADICAL-Pilot Configuration System) and points to the file system accessible via that URL. Note that the notion of root depends on the access protocol and the providing service implementation.

We saw sandboxes in the Getting Started tutorial. Here we expand upon that initial introduction. The hierarchy of the sandboxes is the following:

<default_remote_workdir>/radical.pilot.sandbox/<session_sandbox_ID>/<pilot_sandbox_ID>/<task_sandbox_ID>

where default_remote_workdir is the attribute of the platform config (see the tutorial RADICAL-Pilot Configuration System) and, if it is not provided then the current directory is used ($PWD). Sandboxes for session, pilot and task are named with their unique IDs (uid).

Examples of the expanded locations:

# assumptions for the examples below
#   - client's working directory
#        /home/user
#   - agent's sandboxes hierarchy
#        /tmp/radical.pilot.sandbox/rp.session.0000/pilot.0000/task.0000

in : 'client:///tmp/input_data'
out: '/home/user/tmp/input_data'

in : 'task:///test.txt'
out: '/tmp/radical.pilot.sandbox/rp.session.0000/pilot.0000/task.0000/test.txt'
Actions
  • radical.pilot.TRANSFER (default) - remote file transfer from source URL to target URL;

  • radical.pilot.COPY - local file copy (i.e., not crossing host boundaries);

  • radical.pilot.MOVE - local file move;

  • radical.pilot.LINK - local file symlink.

Using appropriate data actions helps to improve the application runtime. It is known that I/O operations are expensive and can negatively impact the total execution time of an application. Thus, RP applications should be built considering that:

  • the most expensive I/O operations (TRANSFER, MOVE, COPY) should be applied for staging between the client:// location and corresponding paths on the target platform, since they are performed outside of the allocated resources and thus no resources are left idling (the pilot job is not yet launched at this point);

  • task staging between sandboxes should minimize the usage of such actions as MOVE and COPY, and use the LINK action if possible, since these operations will be executed within the allocated resources.

In the example from the section Examples, we demonstrate that if all tasks have the same input data, then this data can be located in a shared space (e.g., staged to the pilot:// location) and be linked into each task’s sandbox (e.g., a link per input file within the task:// location).
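As a sketch of that pattern (paths are illustrative, and the usual import radical.pilot as rp is assumed), a task could link a file previously staged into the pilot sandbox instead of copying it:

td = rp.TaskDescription({'executable'   : '/bin/sh',
                         'arguments'    : ['-c', 'wc -l input.dat'],
                         'input_staging': [{'source': 'pilot:///input_data/input.dat',
                                            'target': 'task:///input.dat',
                                            'action': rp.LINK}]})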

Flags

Flags are set automatically, but a user can also set them explicitly.

  • radical.pilot.CREATE_PARENTS - create the directory hierarchy for targets on the fly;

  • radical.pilot.RECURSIVE - if source is a directory, handles it recursively.

Simplified directive format

RP gives some flexibility in the description of staging between the client side and the sandboxes for pilot and task. Thus, if a user provides just names (absolute or relative paths, e.g., names of files or directories), then RP expands them into corresponding directives.

  • If a string directive is a single path, then after expanding it, the source will be a provided path within the client:// location, while the target will be a base name from a provided path within the pilot:// or the task:// location for radical.pilot.PilotDescription or radical.pilot.TaskDescription respectively.

  • Having directional characters >, < within a string directive defines the direction of the staging between corresponding paths:

    • Input staging: source > target, the source defines a path within the client:// location, and the target defines a path within the pilot:// or the task:// location for radical.pilot.PilotDescription or radical.pilot.TaskDescription respectively.

    • Output staging: target < source (applied for radical.pilot.TaskDescription only), the source defines a path within the task:// location, and the target defines a path within the client:// location.

Examples of the staging directives being expanded:

radical.pilot.PilotDescription.input_staging

in : [ '/tmp/input_data/' ]
out: [{'source' : 'client:///tmp/input_data',
       'target' : 'pilot:///input_data',
       'action' : radical.pilot.TRANSFER,
       'flags'  : radical.pilot.CREATE_PARENTS|radical.pilot.RECURSIVE}]
in : [ 'input.dat > staged.dat' ]
out: [{'source' : 'client:///input.dat',
       'target' : 'pilot:///staged.dat',
       'action' : radical.pilot.TRANSFER,
       'flags'  : radical.pilot.CREATE_PARENTS}]

radical.pilot.TaskDescription.input_staging

in : [ '/tmp/task_input.txt' ]
out: [{'source' : 'client:///tmp/task_input.txt',
       'target' : 'task:///task_input.txt',
       'action' : radical.pilot.TRANSFER,
       'flags'  : radical.pilot.CREATE_PARENTS}]

radical.pilot.TaskDescription.output_staging

in : [ 'collected.dat < output.txt' ]
out: [{'source' : 'task:///output.txt',
       'target' : 'client:///collected.dat',
       'action' : radical.pilot.TRANSFER,
       'flags'  : radical.pilot.CREATE_PARENTS}]

Examples

Note: In these examples, we will not show a progression bar while waiting for some operation to complete, e.g., while waiting for a pilot to stop. That is because the progression bar offered by RP’s reporter does not work well within a notebook. You could use the reporter’s progression bar when executing your RP application as a standalone Python script.

[1]:
%env RADICAL_REPORT_ANIME=FALSE
env: RADICAL_REPORT_ANIME=FALSE
[2]:
import radical.pilot as rp
import radical.utils as ru
[3]:
session = rp.Session()
pmgr    = rp.PilotManager(session=session)
tmgr    = rp.TaskManager(session=session)

For this example, create a new directory input_dir within the current working directory, and place a file into this directory. That file will be the input data for every task (this input file is referenced in the radical.pilot.TaskDescription.arguments attribute).

Warning: You need to ensure that the directory where your script will create the data for staging is writable. Also, you are responsible for cleaning up that data after it is staged.

[4]:
import os

input_dir = os.path.join(os.getcwd(), 'input_dir')
os.makedirs(input_dir, exist_ok=True)

with open(input_dir + '/input.txt', 'w') as f:
    f.write('Staged data (task_id=$RP_TASK_ID | pilot_id=$RP_PILOT_ID | session_id=$RP_SESSION_ID)')

You will stage the newly created directory input_dir with all its files into the pilot:// location.

Note: If the path provided for input_staging is not an absolute path, then RP will look for it within the current working directory. Using absolute paths guarantees that the staged data is located correctly.

[5]:
# Staging directives for the pilot.

pd = rp.PilotDescription({
    'resource'     : 'local.localhost',
    'cores'        : 2,
    'runtime'      : 15,
    'input_staging': [input_dir],
    'exit_on_error': False
})

# The staging directive above lists a single directory name.
# This will automatically be expanded to:
#
#    {'source' : 'client:///input_dir',
#     'target' : 'pilot:///input_dir',
#     'action' : rp.TRANSFER,
#     'flags'  : rp.CREATE_PARENTS|rp.RECURSIVE}

pilot = pmgr.submit_pilots(pd)
tmgr.add_pilots(pilot)

Note: You can define input data staging for a pilot within the radical.pilot.PilotDescription object or as an input parameter in the radical.pilot.Pilot.stage_in() method. Importantly, you can only use the radical.pilot.Pilot.stage_out() method to define output data staging.
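
For example, a minimal sketch of calling stage_in() on an already submitted pilot (the file name is hypothetical; the directive format is the same as in the descriptions):

# stage one extra file into the pilot sandbox, after pilot submission
pilot.stage_in([{'source': 'client:///extra_input.txt',
                 'target': 'pilot:///extra_input.txt',
                 'action': rp.TRANSFER}])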

For each task we define directives for input and output staging. We link the file input.txt to the task’s sandbox before executing that task. After its execution, the task produces an output file, which we copy to the pilot sandbox.

[6]:
# Staging directives for tasks.

N_TASKS = 2
tds     = []  # list of task descriptions
outputs = []  # list of file names, which are tasks' outputs

for idx in range(N_TASKS):
    output = 'output.%d.txt' % idx

    td = rp.TaskDescription({
        'executable'    : '/bin/echo',
        'arguments'     : ['$(cat input.txt)'],
        'stdout'        : output,
        # link file from the pilot sandbox to the task sandbox
        'input_staging' : [{'source': 'pilot:///input_dir/input.txt',
                            'target': 'task:///input.txt',
                            'action': rp.LINK}],
        # copy task's output file to the pilot sandbox
        'output_staging': [{'source': 'task:///%s'  % output,
                            'target': 'pilot:///%s' % output,
                            'action': rp.COPY}]
    })

    tds.append(td)
    outputs.append(output)

tmgr.submit_tasks(tds)
tmgr.wait_tasks()
[6]:
['DONE', 'DONE']

You can perform output data staging even after the pilot runtime has finished (i.e., pilot.state=DONE), but always before closing your session object.

[7]:
# Staging data from the pilot sandbox to the client working directory

pilot.stage_out([{'source': 'pilot:///%s'             % output,
                  'target': 'client:///output_dir/%s' % output,
                  'action': rp.TRANSFER} for output in outputs])
[7]:
['/home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/checkouts/devel/docs/source/tutorials/output_dir/output.0.txt',
 '/home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/checkouts/devel/docs/source/tutorials/output_dir/output.1.txt']
[8]:
!cat output_dir/*
Staged data (task_id=$RP_TASK_ID | pilot_id=$RP_PILOT_ID | session_id=$RP_SESSION_ID)
Staged data (task_id=$RP_TASK_ID | pilot_id=$RP_PILOT_ID | session_id=$RP_SESSION_ID)
[9]:
session.close(cleanup=True)

Using HPC Platforms

RADICAL-Pilot consists of a client and a so-called agent. Client and agent can execute on two different machines, e.g., the former on your workstation, the latter on an HPC platform’s compute node. Alternatively, the client can execute on the login node of an HPC platform and the agent on a compute node, or both client and agent can execute on a compute node.

How to deploy RADICAL-Pilot depends on the platform’s policies that regulate access to the platform (ssh, DUO, hardware token), and the amount and type of resources that can be used on a login node (usually minimal). Further, each HPC platform will require a specific resource configuration file (provided with RADICAL-Pilot) and, in some cases, some user-dependent configuration.

RADICAL-Pilot (RP) provides three ways to use supported HPC platforms to execute workloads:

  • Remote submission: users can execute their RP application from their workstation, and then RP accesses the HPC platform via ssh.

  • Interactive submission: users can submit an interactive or batch job on the HPC platform, and then execute their RP application from a compute node.

  • Login submission: users can ssh into the login node of the HPC platform, and then launch their RP application from that shell.

Remote submission

Warning: Remote submission does not work with two-factor authentication. Target HPC platforms need to support passphrase-protected ssh keys as a login method, without the use of a second authentication factor. Usually, the user needs to reach an agreement with the system administrators of the platform in order to allow ssh connections from a specific IP address. Putting such an agreement in place ranges from difficult to impossible, and it requires a fixed IP address.

Warning: Remote submissions require an ssh connection to be alive for the entire duration of the application. If the ssh connection fails while the application executes, the application will fail. This has the potential of leaving an orphan RP agent running on the HPC platform, consuming allocation and failing to properly execute any new application task. Remote submissions should not be attempted from a laptop on a Wi-Fi connection, and the risk of interrupting the ssh connection increases with the time taken by the application to complete.

If you can manually ssh into the target HPC platform, RADICAL-Pilot can do the same. You will have to set up an ssh key; see, for example, this guide if you need to become more familiar with the process.

Note: RADICAL-Pilot will not work without a configured ssh-agent, and it will require entering the user's ssh key passphrase to access the HPC platform.

After setting up and configuring ssh, you will be able to instruct RP to run its client on your local workstation and its agent on one or more HPC platforms. With the remote submission mode, you:

  1. Create a pilot description object;

  2. Specify the RP resource ID of the supported HPC platform;

  3. Specify the access schema you want to use to access that platform.

[1]:
import radical.pilot as rp

session = rp.Session()
pd_init = {'resource'     : 'tacc.frontera',
           'access_schema': 'ssh'
          }

pdesc = rp.PilotDescription(pd_init)

Note: For a list of supported HPC platforms, see List of Supported Platforms. Resource configuration files are located at radical/pilot/configs/ in the RADICAL-Pilot git repository.

Interactive Submission

Users can perform an interactive submission of an RP application on a supported HPC platform in two ways:

  • Submitting an interactive job to the batch system to acquire a shell and then executing the RP application from that shell.

  • Submitting a batch script to the batch system that, once scheduled, will execute the RP application.

Note: The command to acquire an interactive job and the script language to write a batch job depends on the batch system deployed on the HPC platform and on its configuration. That means that you may have to use different commands or scripts depending on the HPC platform that you want to use. See the guide for each supported HPC platform for more details.

Configuring an RP application for interactive submission

You will need to set the access_schema in your pilot description to interactive. All the other parameters of your application remain the same and are independent of how you execute your RP application. For example, assume that your application requires 4096 cores, will terminate in 10 hours, and you want to execute it on TACC Frontera. To run it from an interactive job, you will have to use the following pilot description:

[2]:
pd_init = {'resource'     : 'tacc.frontera',
           'access_schema': 'interactive',
           'runtime'      : 600,
           'exit_on_error': True,
           'project'      : 'myproject',
           'queue'        : 'normal',
           'cores'        : 4096,
           'gpus'         : 0
          }

pdesc = rp.PilotDescription(pd_init)
session.close(cleanup=True)
Submitting an interactive job

To run an RP application with that pilot description in interactive computing mode, you must request the amount and type of resources needed to execute your application. That means that, if your application requires N cores and M GPUs, you will have to submit an interactive job requesting enough nodes to provide those N cores and M GPUs. Consult the user guide of the resource you want to use to find out how many cores/GPUs each compute node has.

For our example application, you will need to do the following:

  1. ssh into Frontera's login node. To find out Frontera's FQDN, check its user guide.

  2. Check how many nodes you need on Frontera to get at least 4096 cores. Following the user guide, each Cascade Lake (CLX) compute node of Frontera has 56 cores. Thus, you will need 74 nodes (you may want to consider whether your application could scale to use all the available 4144 cores; see the sketch at the end of this subsection).

  3. Find on Frontera’s user guide the command and the options required to submit an interactive job.

  4. Issue the appropriate command, in our case, assuming that your application will take no more than 10 hours to complete:

idev -p normal -N 74 -n 56 -m 600
  5. Once your job is scheduled and returns a shell, execute your RP application from that shell, e.g. with:

python3 -m venv ~/ve/my_rp_ve
. ~/ve/my_rp_ve/bin/activate
python3 my_application.py
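
As a quick sanity check for step 2, the node count can also be computed programmatically. A small sketch, assuming the 4096-core requirement and the 56 cores per CLX node from the example above:

import math

cores_required = 4096
cores_per_node = 56                       # Frontera CLX compute node

nodes = math.ceil(cores_required / cores_per_node)
print(nodes)                              # -> 74
print(nodes * cores_per_node)             # -> 4144 cores in the allocation
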
Submitting a batch job

To run RP in a batch job, you must create a batch script that specifies your resource requirements, application execution time, and the RP application that you want to execute. Following the example given above, the following script could be used on TACC Frontera:

#!/bin/bash
#SBATCH -J myjob           # Job name
#SBATCH -o myjob.o%j       # Name of stdout output file
#SBATCH -e myjob.e%j       # Name of stderr error file
#SBATCH -p normal          # Queue (partition) name
#SBATCH -N 74              # Total # of nodes
#SBATCH -n 56              # Total # of mpi tasks
#SBATCH -t 10:00:00        # Run time (hh:mm:ss)
#SBATCH --mail-type=all    # Send email at begin and end of job
#SBATCH -A myproject       # Project/Allocation name (req'd if you have more than 1)

python my_application.py

Once saved into myjobscript.sbatch, you can submit your batch job on Frontera with:

sbatch myjobscript.sbatch

Login submission

Warning: Very likely, login submission will break the login node usage policies and will be killed by the system administrators. Login submission should be used as a last resort, only when neither a remote nor an interactive submission is available.

To run your RP application on the login node of a supported HPC platform, you will need to ssh into the login node, load the Python environment, and execute your RP application. For the example application above, you would do the following:

ssh username@frontera.tacc.utexas.edu
python3 -m venv ~/ve/my_rp_ve
. ~/ve/my_rp_ve/bin/activate
python3 my_application.py

But you would be breaching the login node usage policies on Frontera.

Supported HPC Platforms

RADICAL-Pilot (RP) supports a variety of high performance computing (HPC) platforms across the Department of Energy and the National Science Foundation. RP uses JSON configuration files to store the parameters specific to each platform. See the configuration tutorial (https://radicalpilot.readthedocs.io/en/stable/tutorials/configuration.html#Platform-description) for more information about how to write a configuration file for a new platform. You can also see the current configuration file for each of the supported platforms in the tables below.

Department of Energy (DOE) HPC Platforms

Name       FQDN                      Launch Method               Configuration File
Andes      andes.olcf.ornl.gov       srun                        resource_ornl.json
Aurora     aurora.alcf.anl.gov       mpiexec                     resource_anl.json
Cheyenne   cheyenne.ucar.edu         fork, mpirun, mpiexec_mpt   resource_ncar.json
Crusher    crusher.olcf.ornl.gov     srun                        resource_ornl.json
Frontier   frontier.olcf.ornl.gov    srun                        resource_ornl.json
Lassen     lassen.llnl.gov           jsrun                       resource_llnl.json
Polaris    polaris.alcf.anl.gov      mpiexec                     resource_anl.json
Summit     summit.olcf.ornl.gov      jsrun, mpirun, ssh, prte    resource_ornl.json

National Science Foundation (NSF) HPC Platforms

Name        FQDN                        Launch Method              Configuration File
Bridges2    bridges2.psc.edu            srun                       resource_access.json
Expanse     login.expanse.sdsc.edu      mpirun                     resource_access.json
Frontera    frontera.tacc.utexas.edu    mpirun, ssh, srun, prte    resource_tacc.json
Stampede2   stampede2.tacc.utexas.edu   fork, ibrun, mpirun, ssh   resource_access.json

Campus HPC Platforms

Name       FQDN                       Launch Method   Configuration File
Amarel     amarel.rutgers.edu         srun            resource_rutgers.json
Tiger      tiger.princeton.edu        srun            resource_princeton.json
Traverse   traverse.princeton.edu     srun, mpirun    resource_princeton.json
Rivanna    rivanna.hpc.virginia.edu   mpirun          resource_uva.json

Guides

Amarel (Rutgers)

Platform user guide

https://sites.google.com/view/cluster-user-guide

General description
  • Resource manager - SLURM

  • Launch methods (per platform ID)

    • rutgers.amarel - SRUN

  • Configuration per node (per queue)

    • main queue (388 nodes)

      • 12-56 CPU cores

      • 64-256 GB of memory

    • gpu queue (38 nodes)

      • 12-52 CPU cores

      • 2-8 GPUs

      • 64-1500 GB of memory

    • mem queue (7 nodes)

      • 28-56 CPU cores

      • 512-1500 GB of memory

  • Other available queues

    • nonpre

      • jobs will not be preempted by higher-priority or owner jobs

    • graphical

      • specialized partition for jobs submitted by the OnDemand system

    • cmain

      • “main” partition for the Amarel resources located in Camden

Note

In order to access the Amarel cluster, you must be connected to the Rutgers Virtual Private Network (VPN) with a valid Rutgers NetID.

Note

Amarel uses the --constraint option in SLURM to specify node features (SLURM constraint). RADICAL-Pilot allows providing such features within a corresponding configuration file. For example, if you want to select nodes with the "skylake" and "oarc" features (see the list of Available compute hardware), follow these steps:

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_rutgers.json <<EOF
{
    "amarel": {
        "system_architecture": {"options": ["skylake", "oarc"]}
    }
}
EOF
Setup execution environment
Python virtual environment

Create a virtual environment with venv:

export PYTHONNOUSERSITE=True
module load python
# OR
#   module use /projects/community/modulefiles
#   module load python/3.9.6-gc563
python3.9 -m venv ve.rp
source ve.rp/bin/activate

Install RADICAL-Pilot after activating a corresponding virtual environment:

pip install radical.pilot
Launching script example

The launching script (e.g., rp_launcher.sh) for a RADICAL-Pilot application includes the setup steps to activate the execution environment and the command to launch the application itself.

#!/bin/sh

# - pre run -
module load python
source ve.rp/bin/activate

export RADICAL_PROFILE=TRUE
# for debugging purposes
export RADICAL_LOG_LVL=DEBUG

# - run -
python <rp_application>

Execute launching script as ./rp_launcher.sh or run it in the background:

nohup ./rp_launcher.sh > OUTPUT 2>&1 </dev/null &
# check the status of the script running:
#   jobs -l

Note

If you find any inaccuracy in this description, please, report back to us by opening a ticket.

Aurora (ALCF/ANL)

ACCESS IS CURRENTLY ENABLED FOR ESP and ECP TEAMS ONLY

Platform user guide

https://docs.alcf.anl.gov/aurora/getting-started-on-aurora/

General description
  • Resource manager - PBSPRO

  • Launch methods (per platform ID)

    • anl.aurora - MPIEXEC

  • Configuration per node (10,624 nodes in total)

    • 2 CPUs with 52 cores each, 2 threads per core (SMT=2)

    • 6 GPUs (Intel Data Center Max 1550 Series)

    • 512 GB of memory per CPU

Note

RADICAL-Pilot can manage the -l option (resource selection qualifier) for PBSPRO and sets the default values in a corresponding configuration file. If you need a different setup, follow these steps:

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_anl.json <<EOF
{
    "aurora": {
        "system_architecture": {"options": ["filesystems=grand:home",
                                            "place=scatter"]}
    }
}
EOF
Setup execution environment
Python virtual environment

Create a virtual environment with venv:

export PYTHONNOUSERSITE=True
module load cray-python/3.9.13.1
python3 -m venv ve.rp
source ve.rp/bin/activate

OR create a virtual environment with conda:

module use /soft/modulefiles
module load frameworks
conda create -y -n ve.rp python=3.9
conda activate ve.rp
# OR clone base environment
#   conda create -y -p $HOME/ve.rp --clone $CONDA_PREFIX
#   conda activate $HOME/ve.rp

Note

Using conda requires a different environment setup for RADICAL-Pilot when running on the target platform. To configure it correctly, follow the steps below:

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_anl.json <<EOF
{
    "aurora": {
        "pre_bootstrap_0" : ["module use /soft/modulefiles",
                             "module load frameworks"]
    }
}
EOF

Install RADICAL-Pilot after activating a corresponding virtual environment:

pip install radical.pilot
# OR in case of conda environment
conda install -c conda-forge radical.pilot
Launching script example

The launching script (e.g., rp_launcher.sh) for a RADICAL-Pilot application includes the setup steps to activate the execution environment and the command to launch the application itself. In this example, we use a virtual environment created with venv.

#!/bin/sh

# - pre run -
module load cray-python
source ve.rp/bin/activate

export RADICAL_PROFILE=TRUE
# for debugging purposes
export RADICAL_LOG_LVL=DEBUG
export RADICAL_REPORT=TRUE

# - run -
python <rp_application>

Execute launching script as ./rp_launcher.sh or run it in the background:

nohup ./rp_launcher.sh > OUTPUT 2>&1 </dev/null &
# check the status of the script running:
#   jobs -l

Note

If you find any inaccuracy in this description, please, report back to us by opening a ticket.

Bridges2 (ACCESS)

Platform user guide

https://www.psc.edu/resources/bridges-2/user-guide-2-2/

General description
  • Resource manager - SLURM

  • Launch methods (per platform ID)

    • access.bridges2 - SRUN

  • Configuration per node (per queue)

    • Regular Memory allocation:

      • RM or RM-512 queues (50 nodes):

        • 128 CPU cores (1 thread per core)

        • 256 GB or 512 GB of memory respectively

      • RM-shared queue (50 nodes):

        • 128 CPU cores (1 thread per core)

        • 512 GB of memory

    • Extreme Memory allocation:

      • EM queue (100 nodes):

        • 96 CPU cores (1 thread per core)

        • 4 TB of memory

    • GPU allocation:

      • GPU queue (33 nodes):

        • 40 CPU cores (1 thread per core)

        • 8 GPUs (NVIDIA Tesla V100-32 * 32 GB)

        • 8 GPUs (NVIDIA Tesla V100-16 * 16 GB)

        • 512 GB of memory

      • GPU-shared queue (1 node):

        • 48 CPU cores (1 thread per core)

        • 16 GPUs (NVIDIA V100-32 * 32 GB)

        • 1.5 TB of memory

Setup execution environment
Python virtual environment

Create a virtual environment with venv:

export PYTHONNOUSERSITE=True
module load python
python3 -m venv ve.rp
source ve.rp/bin/activate

OR create a virtual environment with conda:

module load anaconda3
conda create --name conda.rp
conda activate conda.rp

Install RADICAL-Pilot after activating a corresponding virtual environment:

pip install radical.pilot
# OR in case of conda environment
conda install -c conda-forge radical.pilot
Launching script example

The launching script (e.g., rp_launcher.sh) for a RADICAL-Pilot application includes the setup steps to activate the execution environment and the command to launch the application itself.

#!/bin/sh

# - pre run -
module load python
source ve.rp/bin/activate

export RADICAL_PROFILE=TRUE
# for debugging purposes
export RADICAL_LOG_LVL=DEBUG

# - run -
python <rp_application>

Execute launching script as ./rp_launcher.sh or run it in the background:

nohup ./rp_launcher.sh > OUTPUT 2>&1 </dev/null &
# check the status of the script running:
#   jobs -l

Note

If you find any inaccuracy in this description, please, report back to us by opening a ticket.

Delta (NCSA)

Platform user guide

https://wiki.ncsa.illinois.edu/display/DSC/Delta+User+Guide

General description
  • Resource manager - SLURM

  • Launch methods (per platform ID)

    • ncsa.delta* - SRUN

  • Configuration per node (per platform ID)

    • ncsa.delta (132 nodes)

      • 128 CPU cores, each core has 1 thread

      • 256 GiB of memory

    • ncsa.delta_gpu_a40 (100 nodes)

      • 64 CPU cores, each core has 1 thread

      • 4 GPUs (NVIDIA A40)

      • 256 GiB of memory

    • ncsa.delta_gpu_a100_4way (100 nodes)

      • 64 CPU cores, each core has 1 thread

      • 4 GPUs (NVIDIA A100)

      • 256 GiB of memory

    • ncsa.delta_gpu_a100_8way (6 nodes)

      • 128 CPU cores, each core has 1 thread

      • 8 GPUs (NVIDIA A100)

      • 2,048 GiB of memory

    • ncsa.delta_gpu_mi100 (1 node)

      • 128 CPU cores, each core has 1 thread

      • 8 GPUs (AMD MI100)

      • 2,048 GiB of memory

Note

Use the accounts command to list the accounts available for charging (Local Account Charging).

Note

Use the quota command to view your use of the file systems and use by your projects (Quota Usage).

Setup execution environment
Python virtual environment

Create a virtual environment with venv:

export PYTHONNOUSERSITE=True
module load python
python3 -m venv ve.rp
source ve.rp/bin/activate

Install RADICAL-Pilot after activating a corresponding virtual environment:

pip install radical.pilot

Note

Delta does not provide virtual environments with conda.

Launching script example

The launching script (e.g., rp_launcher.sh) for a RADICAL-Pilot application includes the setup steps to activate the execution environment and the command to launch the application itself.

#!/bin/sh

# - pre run -
module load python
source ve.rp/bin/activate

export RADICAL_PROFILE=TRUE
# for debugging purposes
export RADICAL_LOG_LVL=DEBUG

# - run -
python <rp_application>

Execute launching script as ./rp_launcher.sh or run it in the background:

nohup ./rp_launcher.sh > OUTPUT 2>&1 </dev/null &
# check the status of the script running:
#   jobs -l

Note

If you find any inaccuracy in this description, please, report back to us by opening a ticket.

SDSC Expanse

Frontera (TACC)

Platform user guide

https://frontera-portal.tacc.utexas.edu/user-guide/

General description
  • Resource manager - SLURM

  • Launch methods (per platform ID)

    • tacc.frontera - MPIRUN

    • tacc.frontera_rtx - SRUN

    • tacc.frontera_prte - PRTE (PRRTE/PMIx)

  • Configuration per node (per platform ID)

    • tacc.frontera, tacc.frontera_prte

      • Cascade Lake (CLX) compute nodes (8,368 nodes)

        • 56 CPU cores

        • 192 GB of memory

      • Large memory nodes (16 nodes)

        • 112 CPU cores

        • 2.1 TB of memory (NVDIMM)

    • tacc.frontera_rtx (90 nodes)

      • 32 CPU cores

      • 4 GPUs (NVIDIA RTX 5000)

      • 128 GB of memory

Setup execution environment
Python virtual environment

Create a virtual environment with venv:

export PYTHONNOUSERSITE=True
module load python3
python3 -m venv ve.rp
source ve.rp/bin/activate

Install RADICAL-Pilot after activating a corresponding virtual environment:

pip install radical.pilot
Launching script example

The launching script (e.g., rp_launcher.sh) for a RADICAL-Pilot application includes the setup steps to activate the execution environment and the command to launch the application itself.

#!/bin/sh

# - pre run -
module load python3
source ve.rp/bin/activate

export RADICAL_PROFILE=TRUE
# for debugging purposes
export RADICAL_LOG_LVL=DEBUG

# - run -
python <rp_application>

Execute launching script as ./rp_launcher.sh or run it in the background:

nohup ./rp_launcher.sh > OUTPUT 2>&1 </dev/null &
# check the status of the script running:
#   jobs -l

Note

If you find any inaccuracy in this description, please, report back to us by opening a ticket.

Frontier (OLCF/ORNL)

Platform user guide

https://docs.olcf.ornl.gov/systems/frontier_user_guide.html

General description
  • Resource manager - SLURM

  • Launch methods (per platform ID)

    • ornl.frontier - SRUN

    • ornl.frontier_flux - FLUX

  • Configuration per node (9,408 nodes in total)

    • 64 CPU cores, each core has 2 threads (SMT=2)

    • 8 GPUs (AMD MI250X)

    • 512 GiB of memory

Note

Frontier uses the --constraint option in SLURM to specify node features (SLURM constraint). RADICAL-Pilot allows providing such features within a corresponding configuration file. For example, follow these steps to set the NVMe constraint (NVMe Usage):

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_ornl.json <<EOF
{
    "frontier": {
        "system_architecture": {"options": ["nvme"]}
    }
}
EOF

Note

RADICAL-Pilot follows the default setting of Frontier SLURM core specialization, which reserves one core from each L3 cache region, leaving 56 allocatable cores out of the available 64.

If you need to change the core specialization to use all 64 cores (i.e., constraining all system processes to core 0), then follow these steps:

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_ornl.json <<EOF
{
   "frontier": {
      "system_architecture" : {"blocked_cores" : []}
   }
}
EOF

If you need to change only the SMT level (=1), but keep the default setting (8 cores for system processes), then follow these steps:

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_ornl.json <<EOF
{
   "frontier": {
      "system_architecture" : {"smt"           : 1,
                               "blocked_cores" : [0, 8, 16, 24, 32, 40, 48, 56]}
   }
}
EOF

Note

Changes in the "system_architecture" parameters can be combined.

Setup execution environment
Python virtual environment

Create a virtual environment with venv:

export PYTHONNOUSERSITE=True
module load cray-python
python3 -m venv ve.rp
source ve.rp/bin/activate

Install RADICAL-Pilot after activating a corresponding virtual environment:

pip install radical.pilot

Note

Frontier does not provide virtual environments with conda.

Launching script example

The launching script (e.g., rp_launcher.sh) for a RADICAL-Pilot application includes the setup steps to activate the execution environment and the command to launch the application itself.

#!/bin/sh

# - pre run -
module load cray-python
source ve.rp/bin/activate

export RADICAL_PROFILE=TRUE
# for debugging purposes
export RADICAL_LOG_LVL=DEBUG
export RADICAL_REPORT=TRUE

# - run -
python <rp_application>

Execute launching script as ./rp_launcher.sh or run it in the background:

nohup ./rp_launcher.sh > OUTPUT 2>&1 </dev/null &
# check the status of the script running:
#   jobs -l

Note

If you find any inaccuracy in this description, please, report back to us by opening a ticket.

Lassen (LLNL)

Platform user guide

https://hpc.llnl.gov/documentation/tutorials/using-lc-s-sierra-systems

General description
  • Resource manager - LSF

  • Launch methods (per platform ID)

    • llnl.lassen - JSRUN

  • Configuration per node (788 nodes in total)

    • 44 CPU cores (Power9), each core has 4 hardware-threads (SMT=4)

      • 4 cores are blocked for users (reserved for system processes)

    • 4 GPUs (NVIDIA Tesla V100)

    • 256 GB of memory

Note

Changing the number of hardware-threads per core available for an application could be done either with export RADICAL_SMT=1 (before running the application) or by following the steps below:

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_llnl.json <<EOF
{
    "lassen": {
        "system_architecture": {"smt": 1}
    }
}
EOF

Note

Lassen uses the -alloc_flags option in LSF to specify node features. RADICAL-Pilot allows providing such features within a corresponding configuration file. Allowed features are: atsdisable, autonumaoff, cpublink, ipisolate. For example, follow these steps to add some of those features:

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_llnl.json <<EOF
{
    "lassen": {
        "system_architecture": {"options": ["autonumaoff", "cpublink"]}
    }
}
EOF

Note

Changes in the "system_architecture" parameters can be combined.

Setup execution environment
Python virtual environment

Create a virtual environment with venv:

export PYTHONNOUSERSITE=True
module load python/3.8.2
python3 -m venv ve.rp
source ve.rp/bin/activate
pip install -U pip setuptools wheel

Install RADICAL-Pilot after activating a corresponding virtual environment:

pip install radical.pilot

Note

Lassen does not provide virtual environments with conda.

Launching script example

The launching script (e.g., rp_launcher.sh) for a RADICAL-Pilot application includes the setup steps to activate the execution environment and the command to launch the application itself.

#!/bin/sh

# - pre run -
module load python/3.8.2
source ve.rp/bin/activate

export RADICAL_PROFILE=TRUE
# for debugging purposes
export RADICAL_LOG_LVL=DEBUG

# - run -
python <rp_application>

Execute launching script as ./rp_launcher.sh or run it in the background:

nohup ./rp_launcher.sh > OUTPUT 2>&1 </dev/null &
# check the status of the script running:
#   jobs -l

Note

If you find any inaccuracy in this description, please, report back to us by opening a ticket.

Perlmutter (NERSC)

Platform user guide

https://docs.nersc.gov/systems/perlmutter/

General description
  • Resource manager - SLURM

  • Launch methods (per platform ID)

    • nersc.perlmutter* - SRUN

  • Configuration per node (per platform ID)

    • nersc.perlmutter (3,072 nodes)

      • 128 CPU cores, each core has 2 threads (SMT=2)

      • 512 GiB of memory

    • nersc.perlmutter_gpu (1,792 nodes in total)

      • 64 CPU cores, each core has 2 threads (SMT=2)

      • 4 GPUs (NVIDIA A100)

        • 1,536 nodes with 40 GiB of HBM per GPU

        • 256 nodes with 80 GiB of HBM per GPU

      • 256 GiB of memory

Note

Perlmutter uses the --constraint option in SLURM to specify node features (SLURM constraint). RADICAL-Pilot allows providing such features within a corresponding configuration file. For example, Perlmutter allows requesting up to 256 GPU nodes that have 80 GiB of GPU-attached memory instead of 40 GiB (Specify a constraint during resource allocation); thus, the corresponding configuration should be updated as follows:

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_nersc.json <<EOF
{
    "perlmutter_gpu": {
        "system_architecture": {"options": ["gpu", "hbm80g"]}
    }
}
EOF
Setup execution environment
Python virtual environment

Using Python at NERSC

Create a virtual environment with venv:

export PYTHONNOUSERSITE=True
module load python
python3 -m venv ve.rp
source ve.rp/bin/activate

OR create a virtual environment with conda:

module load python
conda create -y -n ve.rp python=3.9
conda activate ve.rp

Install RADICAL-Pilot after activating a corresponding virtual environment:

pip install radical.pilot
# OR in case of conda environment
conda install -c conda-forge radical.pilot
Launching script example

The launching script (e.g., rp_launcher.sh) for a RADICAL-Pilot application includes the setup steps to activate the execution environment and the command to launch the application itself.

#!/bin/sh

# - pre run -
module load python
source ve.rp/bin/activate

export RADICAL_PROFILE=TRUE
# for debugging purposes
export RADICAL_LOG_LVL=DEBUG

# - run -
python <rp_application>

Execute launching script as ./rp_launcher.sh or run it in the background:

nohup ./rp_launcher.sh > OUTPUT 2>&1 </dev/null &
# check the status of the script running:
#   jobs -l

Note

If you find any inaccuracy in this description, please, report back to us by opening a ticket.

Polaris (ALCF/ANL)

Platform user guide

https://docs.alcf.anl.gov/polaris/getting-started

General description
  • Resource manager - PBSPRO

  • Launch methods (per platform ID)

    • anl.polaris - MPIEXEC

  • Configuration per node (560 nodes in total)

    • 32 CPU cores, each core has 2 threads (SMT=2)

    • 4 GPUs (NVIDIA A100)

    • 512 GiB of memory

Note

RADICAL-Pilot can manage the -l option (resource selection qualifier) for PBSPRO and sets the default values in a corresponding configuration file. If you need a different setup, follow these steps:

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_anl.json <<EOF
{
    "polaris": {
        "system_architecture": {"options": ["filesystems=grand:home",
                                            "place=scatter"]}
    }
}
EOF

Note

Binding MPI ranks to GPUs: If you want to control GPU assignment per task, then the following code snippet provides an example of setting CUDA_VISIBLE_DEVICES for each MPI rank on Polaris:

import radical.pilot as rp

td = rp.TaskDescription()
td.pre_exec.append('export CUDA_VISIBLE_DEVICES=$((3 - $PMI_LOCAL_RANK % 4))')
td.gpu_type = ''  # reset GPU type, thus RP will not set "CUDA_VISIBLE_DEVICES"
Setup execution environment
Python virtual environment

Create a virtual environment with venv:

export PYTHONNOUSERSITE=True
python3 -m venv ve.rp
source ve.rp/bin/activate

OR create a virtual environment with conda:

module load conda; conda activate
conda create -y -n ve.rp python=3.9
conda activate ve.rp
# OR clone base environment
#   conda create -y -p $HOME/ve.rp --clone $CONDA_PREFIX
#   conda activate $HOME/ve.rp

Install RADICAL-Pilot after activating a corresponding virtual environment:

pip install radical.pilot
# OR in case of conda environment
conda install -c conda-forge radical.pilot
Launching script example

The launching script (e.g., rp_launcher.sh) for a RADICAL-Pilot application includes the setup steps to activate the execution environment and the command to launch the application itself. In this example, we use a virtual environment created with conda.

#!/bin/sh

# - pre run -
module load conda
eval "$(conda shell.posix hook)"
conda activate ve.rp

export RADICAL_PROFILE=TRUE
# for debugging purposes
export RADICAL_LOG_LVL=DEBUG

# - run -
python <rp_application>

Execute launching script as ./rp_launcher.sh or run it in the background:

nohup ./rp_launcher.sh > OUTPUT 2>&1 </dev/null &
# check the status of the script running:
#   jobs -l

Monitoring page: https://status.alcf.anl.gov/#/polaris


Note

If you find any inaccuracy in this description, please, report back to us by opening a ticket.

Rivanna (UVA)

Platform user guide

https://www.rc.virginia.edu/userinfo/rivanna/overview/

General description
  • Resource manager - SLURM

  • Launch methods (per platform ID)

    • uva.rivanna - SRUN

  • Configuration per node

    • CPU-only nodes (520 nodes in total)

      • 16-48 CPU cores

      • 128-1500 GB of memory

    • GPU nodes (47 nodes in total)

      • 28-128 CPU cores

      • 4-10 GPUs (A100, P100, V100, K80, RTX2080Ti, RTX3090)

      • 128-2000 GB of memory

  • Available queues

    • standard

    • parallel

    • largemem

    • gpu

    • dev

Note

Rivanna nodes are heterogeneous and have different node configurations. Please refer to this link for more information about the resources per node.

Note

If you run RADICAL-Pilot in the "interactive" mode (pilot_description.access_schema = 'interactive'), make sure that you use the --exclusive option (SLURM exclusive) in your batch script or in the command that starts an interactive session.
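
For reference, a minimal sketch of a pilot description that uses the interactive schema on this platform (queue, runtime, and cores values are placeholders):

import radical.pilot as rp

pd = rp.PilotDescription({'resource'     : 'uva.rivanna',
                          'access_schema': 'interactive',
                          'queue'        : 'standard',    # placeholder
                          'runtime'      : 60,            # placeholder (minutes)
                          'cores'        : 16})           # placeholder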

Setup execution environment
Python virtual environment

Create a virtual environment with venv:

export PYTHONNOUSERSITE=True
module load python
python3 -m venv ve.rp
source ve.rp/bin/activate

Install RADICAL-Pilot after activating a corresponding virtual environment:

pip install radical.pilot

Note

Rivanna does not provide virtual environments with conda.

Launching script example

The launching script (e.g., rp_launcher.sh) for a RADICAL-Pilot application includes the setup steps to activate the execution environment and the command to launch the application itself.

#!/bin/sh

# - pre run -
module load python
source ve.rp/bin/activate

export RADICAL_PROFILE=TRUE
# for debugging purposes
export RADICAL_LOG_LVL=DEBUG

# - run -
python <rp_application>

Execute launching script as ./rp_launcher.sh or run it in the background:

nohup ./rp_launcher.sh > OUTPUT 2>&1 </dev/null &
# check the status of the script running:
#   jobs -l

Note

If you find any inaccuracy in this description, please, report back to us by opening a ticket.

Summit (OLCF/ORNL)

Platform user guide

https://docs.olcf.ornl.gov/systems/summit_user_guide.html

General description
  • Resource manager - LSF

  • Launch methods (per platform ID)

    • ornl.summit - MPIRUN

    • ornl.summit_jsrun - JSRUN

    • ornl.summit_prte - PRTE (PRRTE/PMIx)

    • ornl.summit_interactive - MPIRUN, JSRUN

  • Configuration per node

    • Regular nodes (4,674 nodes)

      • 44 CPU cores (Power9), each core has 4 hardware-threads (SMT=4)

        • 2 cores are blocked for users (reserved for system processes)

      • 6 GPUs (NVIDIA Tesla V100)

      • 512 GB of memory

    • “High memory” nodes (54 nodes)

      • Basic configuration is the same as for regular nodes

      • 2 TB of memory

Note

The MPIRUN launch method can only see one hardware thread per core; thus, make sure that the SMT level is set to 1 for the corresponding platform ID, either with export RADICAL_SMT=1 (before running the application) or by following the steps below:

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_ornl.json <<EOF
{
    "summit": {
        "system_architecture": {"smt": 1}
    }
}
EOF

Launch methods JSRUN and PRTE support the following values for the SMT level: 1, 2, 4 (see Hardware Threads).

Note

Summit uses the -alloc_flags option in LSF to specify node features (Allocation-wide Options). RADICAL-Pilot allows providing such features within a corresponding configuration file. For example, follow these steps to enable the Multi-Process Service (MPS):

mkdir -p ~/.radical/pilot/configs
cat > ~/.radical/pilot/configs/resource_ornl.json <<EOF
{
    "summit_jsrun": {
        "system_architecture": {"options": ["gpumps"]}
    }
}
EOF

Note

Changes in the "system_architecture" parameters can be combined.

Setup execution environment
Python virtual environment

Create a virtual environment with venv:

export PYTHONNOUSERSITE=True
module load python/3.11.6
# OR with old modules
#    module load DefApps-2023
#    module load python/3.8-anaconda3
python3 -m venv ve.rp
source ve.rp/bin/activate

OR create a virtual environment with conda (using old modules):

module load DefApps-2023
module load python/3.8-anaconda3
conda create -y -n ve.rp python=3.9
eval "$(conda shell.posix hook)"
conda activate ve.rp

OR clone a conda virtual environment from the base environment (using old modules):

module load DefApps-2023
module load python/3.8-anaconda3
eval "$(conda shell.posix hook)"
conda create -y -p $HOME/ve.rp --clone $CONDA_PREFIX
conda activate $HOME/ve.rp

Install RADICAL-Pilot after activating a corresponding virtual environment:

pip install radical.pilot
# OR in case of conda environment
conda install -c conda-forge radical.pilot
Launching script example

The launching script (e.g., rp_launcher.sh) for a RADICAL-Pilot application includes the setup steps to activate the execution environment and the command to launch the application itself.

#!/bin/sh

# - pre run -
module load python/3.11.6
source ve.rp/bin/activate

export RADICAL_PROFILE=TRUE
# for debugging purposes
export RADICAL_LOG_LVL=DEBUG
export RADICAL_REPORT=TRUE

# - run -
python <rp_application>

Execute launching script as ./rp_launcher.sh or run it in the background:

nohup ./rp_launcher.sh > OUTPUT 2>&1 </dev/null &
# check the status of the script running:
#   jobs -l

Note

If you find any inaccuracy in this description, please, report back to us by opening a ticket.

Environment Variables

Several aspects of RADICAL-Pilot (RP) behavior can/must be configured via environment variables. Those variables are exported in the shell from which you will launch your RP application. Usually, environment variables can be set using the export command.

Note

All the variables are optional. You can set them to change/enhance the behavior of RP, depending on your requirements.
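
Besides exporting them in the shell, you can also set these variables from within a Python script; a minimal sketch, setting them before radical.pilot is imported (the safest ordering, since some values may be read early):

import os

# configure RP behavior before importing radical.pilot
os.environ['RADICAL_LOG_LVL'] = 'DEBUG'   # stack-wide log level
os.environ['RADICAL_PROFILE'] = 'TRUE'    # enable profiling/tracing

import radical.pilot as rp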

Warning

Tables are large, scroll to the right to see the whole table.

End user

Name                          Default value     Description
RADICAL_BASE                  $HOME/.radical/   Root directory where to save temporary state files
RADICAL_UTILS_NTPHOST         0.pool.ntp.org    NTP host used for profile syncing
RADICAL_PILOT_BULK_CB         FALSE             Enables bulk callbacks to boost performance; this changes the callback signature
RADICAL_PILOT_STRICT_CANCEL   {NOT_SET}         Limits task cancelation by not forcing the state "CANCELLED" on the Task Manager
RADICAL_DEFAULT_LOG_TGT       .                 Log target
RADICAL_DEFAULT_LOG_DIR       $PWD              Log directory
RADICAL_DEFAULT_LOG_LVL       ERROR             The default log level when not explicitly set
RADICAL_DEFAULT_REPORT        TRUE              Flag to turn reporting on [TRUE/1] or off [FALSE/0/OFF]
RADICAL_DEFAULT_REPORT_TGT    stderr            List of comma separated targets [0/null, 1/stdout, 2/stderr, ./{report_name/path}]
RADICAL_DEFAULT_REPORT_DIR    $PWD              Directory used by the reporter module
RADICAL_DEFAULT_PROFILE       TRUE              Flag to turn profiling/tracing on [TRUE/1] or off [FALSE/0/OFF]
RADICAL_DEFAULT_PROFILE_DIR   $PWD              Directory where to store profiles/traces
RADICAL_PILOT_PROXY_URL       {NOT_SET}         Proxy to facilitate communication between the client machine, i.e., the host where the application created this Session instance, and the target resource, i.e., the host where the pilot agent/s is/are running and where the workload is being executed

Logger

ru.Logger instances have a name and a name space.

Name           Default value                Description
<NS>_LOG_LVL   Refer to RADICAL_DEFAULT_*   Logging level ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
<NS>_LOG_TGT   Refer to RADICAL_DEFAULT_*   Used for the log targets

Note

The name space is used to derive environmental variable names for log levels and targets. If no name space is given, it is derived from the variable name. For example, the name radical.pilot becomes RADICAL_PILOT.

Note

<NS>_LOG_LVL controls the debug output for a corresponding namespace (NS), where NS can be applied as for a specific package (e.g., RADICAL_PILOT_LOG_LVL or RADICAL_UTILS_LOG_LVL) or for a whole stack (e.g., RADICAL_LOG_LVL).

Reporter

ru.Reporter instances are very similar to ru.Logger instances: the same schema is used for names and name spaces.

Name           Default value   Description
<NS>_REPORT    TRUE            Boolean to turn the Reporter on and off
<NS>_LOG_TGT   {NOT_SET}       Where to report to

Note

<NS>_LOG_TGT is a list of comma separated targets [“0”/”null”, “1”/”stdout”, “2”/”stderr”, “.”/”<log_name>”] where to write the debug output for a corresponding namespace (NS).

Developers

Name                         Default value   Description
RADICAL_UTILS_NO_ATFORK      {NOT_SET}       Disables monkeypatching
RADICAL_DEBUG                {NOT_SET}       Enables scattered debug facilities; this will likely slow down and even destabilize the code
RU_RAISE_ON_*                {NOT_SET}       Related to RADICAL_DEBUG, triggers specific exceptions
RADICAL_DEBUG_HELPER         {NOT_SET}       Related to RADICAL_DEBUG, enables a persistent debug helper class in the code and installs some signal handlers for extra debug output
RADICAL_DEBUG_VERBOSE        {NOT_SET}       Related to RADICAL_DEBUG, enables verbose messages for debugging; controls the "debug" module to collect stack traces; the verbose flag sets the level of detail for output messages
*_PROFILE                    {NOT_SET}       Profiler is similar to Logger and Reporter
RADICAL_PILOT_PRUN_VERBOSE   FALSE           Increases the verbosity of prun output
UMS_OMPIX_PRRTE_DIR          {NOT_SET}       Installation directory for PMIx/PRRTE used in the RP PRTE launch method (optional, to be obsolete)
RADICAL_SAGA_SMT             1               Sets SMT settings on some resources; usually configured via resource config options
RP_PROF_DEBUG                {NOT_SET}       Enables additional debug messages on profile extraction

SAGA

Name                              Default value                                    Description
RADICAL_SAGA_PTY_SSH_PROMPT       [\$#%>\]]\s*$                                    Prompt pattern; use this regex to detect shell prompts
RADICAL_SAGA_PTY_SSH_COPYMODE     options: 'sftp', 'scp', 'rsync+ssh', 'rsync'     Use the specified protocol for pty-level file transfer
RADICAL_SAGA_PTY_SSH_SHAREMODE    options: 'auto', 'no' (use 'no' on CentOS)       Use the specified mode as flag for the ssh ControlMaster
RADICAL_SAGA_PTY_SSH_TIMEOUT      10.0                                             Connection timeout (in seconds) for the SAGA PTY layer; should be set to 60 or more
RADICAL_SAGA_PTY_CONN_POOL_SIZE   10                                               Maximum number of connections kept in a connection pool
RADICAL_SAGA_PTY_CONN_POOL_TTL    600                                              Minimum time a connection is kept alive in a connection pool
RADICAL_SAGA_PTY_CONN_POOL_WAIT   600                                              Maximum number of seconds to wait for any connection in the connection pool to become available before raising a timeout error

Deprecated

Name                    Description
RP_ENABLE_OLD_DEFINES   Enables backward compatibility for old state defines

Contributing

This section offers useful information to developers who want to join the RADICAL-Cybertools (RCT) community and contribute to RADICAL-Pilot (RP) development. Here, developers can find technical and organizational information about how and when RP is released, the code style to adopt when opening a pull request, and how to behave when participating in community activities.

Code of Conduct

This code of conduct regulates the activities of the maintainers and contributors to the RADICAL-Pilot (RP) project.

Our Pledge

In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, sex characteristics, gender identity and expression, sexual identity and orientation, level of experience, education, socio-economic status, nationality, personal appearance, race, or religion.

Our Standards

Examples of behavior that contributes to creating a positive environment include:

  • Using welcoming and inclusive language;

  • being respectful of differing viewpoints and experiences;

  • gracefully accepting constructive criticism;

  • focusing on what is best for the community as opposed to personal agendas and preferences;

  • showing empathy and exercising emotional responsibility towards other community members;

Examples of unacceptable behavior by participants include:

  • Trolling, insulting/derogatory comments, and personal or political attacks

  • the use of sexualized language or imagery and unwelcome sexual attention or advances;

  • public or private harassment;

  • publishing others’ private information, such as a physical or electronic address, without explicit permission;

  • other conduct which could reasonably be considered inappropriate in a professional setting, as defined by the ACM Code of Ethics and Professional Conduct.

Our Responsibilities

Maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior.

Maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful.

Scope

This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, acting as an appointed representative at an online or offline event, or publishing material related to the project. Representation of a project may be further defined and clarified by project maintainers.

Enforcement

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project’s Principal Investigators (PIs) at shantenu.jha@rutgers.edu and matteo.turilli@rutgers.edu. All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances. The PIs are obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately.

Attribution

This Code of Conduct is adapted from the Contributor Covenant, version 1.4, and from the Contributor Covenant Code of Conduct.

Coding Guidelines

We use PEP.8 as the base of our coding guidelines, with some additions and changes. Those changes are mostly due to the personal preferences of the current developers of RADICAL-Pilot (RP). Such preferences can be discussed, revised and agreed upon during RP development meetings. Deviations from PEP.8 pertain to:

  • Enabling visual alignment of code;

  • preferring ' over " (but use " when that avoids escapes);

  • preferring %-based string expansion over string.format;

  • allowing short names for loop variables.

In all other regards, we expect PEP.8 compliance.

Example:

#!/usr/bin/env python3

import sys
import time
import pprint

import radical.utils as ru
import radical.saga  as rs
import radical.pilot as rp


# ------------------------------------------------------------------------------
# constants
# (we use delimiting lines like above to separate code sections, classes, etc.)
#
STATE_ONE   = 'one'
STATE_TWO   = 'two'
STATE_THREE = 'three'


# ------------------------------------------------------------------------------
#
# this class is nothing special, but it's mine.
#
class MyClass(object):
    '''
    This class demonstrates which deviations from PEP.8 are accepted in RP.
    '''

    # --------------------------------------------------------------------------
    #
    def __init__(self):

        self._state   = STATE_ONE
        self._created = time.time()
        self._worker  = ru.Thread()
        self._config  = {'title'         : 'hello darkness',
                         'subtitle'      : 'my old friend',
                         'super_category': 'example'}


    # --------------------------------------------------------------------------
    #
    def method(self, arg_1='default_1', arg_2='default_2',
                     arg_3='default_3', arg_4='default_4'):
        '''
        method documentation
        '''

        if arg_1 == 'default_1' and \
           arg_2 != 'default_2':
            print('this is unexpectedly indented (%s: %s)' % (arg_1, arg_2))

        # we allow one letter names for temporary loop variables
        for i in range(4):
            print("It's simply i [%d]" % i)

        # strangely indented clauses
        if   arg_3 == arg_4: print('haleluja')               # because why not
        elif arg_2 == arg_4: print('praise the space')       # loooots of space
        else               : print('clickediclickediclick')  # clickedispace!

        # illustrative call: keyword arguments are visually aligned
        pprint.some_formatting_method(mode='radical', align='fancy',
                                      enforce=False,  encourage=True)


# ------------------------------------------------------------------------------

This is our flake8 configuration, which should transfer to other pep8 linters:

[flake8]
exclude          = .git,__pycache__,docs,build,dist,setup.py
max-complexity   = 10
max-line-length  = 80
per-file-ignores =
     # imported but unused, star-import, unmatched F*
     __init__.py: F401, F403, X100
ignore           =
     # module level import location
     E402,
     # space before/after operator
     E221, E222,E251,
     # multiple spaces after/after keyword
     E271, E272,
     # line too long
     E501,
     # whitespace before/after ...
     E203, E202, E231, E241, E211, E116, E127,
     # same indent
     E129,
     # comment indent not mod(4)
     E114,
     # cont line indentation
     E128, E126, E124, E125, E131,
     # blank lines
     W391, E301, E303,
     # multiple statements on one line
     E701, E704,
     # space before bracket
     C0326,
     # trailing whitespace
     W291,
     # Complex methods
     C901,
     # Do not use bare 'except'
     E722,
     # allow line breaks after binary operators
     W504,
     # allow lambda assignment (used for partial callbacks)
     E731

Testing Guidelines

Unit and integration tests are required for any major change to the code. Unit tests for most new classes and methods are also mandatory.

Unit Tests

When coding unit tests, the following guidelines are in effect. They are based on Python's unittest module (docs), information gathered from the Python Guide docs, the Pytest docs, online examples, and the personal preferences of the development team. Such preferences can be discussed, revised and agreed upon during RP development meetings.

Tests organization

Under the /tests folder, for each module there is a folder named test_*, where * is the component under test. A unit test for a specific class includes the name of the tested class in its filename, after test_. For example:

tests/
│
├── test_scheduler/
│   └── __init__.py
|   └── test_base.py

Each unit test has a single class. This class is named after the component it tests, for example TestContinuous for testing the continuous scheduler of RP. In addition, the following guidelines are in place:

  • Unit tests should import the TestCase class from unittest, for example:

    from unittest import TestCase
    
  • Unit tests should inherit from TestCase:

    class SimpleTestClass(TestCase):
    
  • Each test class method should test a single method of the class under test. All other methods should be mocked, for example:

    # --------------------------------------------------------------------------
    #
    @mock.patch.object(Continuous, '__init__', return_value=None)
    @mock.patch.object(Continuous, '_configure', return_value=None)
    def test_find_resources(self, mocked_configure, mocked_init):
    
  • Each unit test can define a setUp method and utilize it to get its test cases from the test_cases folder. This method can be generalized between tests of the same component.

  • Each unit test is responsible for tearing down the test and removing any logs, profiles or other files created by the test, by defining a tearDown method (a combined setUp/tearDown sketch follows this list).

  • Each unit test defines a main as follows:

    if __name__ == '__main__':
    
        tc = SimpleTestClass()
        tc.test_find_resources()
    
  • Unit test methods should use the assertion methods defined in the TestCase documentation rather than plain assert statements. The test case organization proposed above will evolve, and the current guidelines will be extended accordingly.
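
A minimal sketch combining the setUp and tearDown guidelines above (the test_cases path and file names are assumptions, not part of the guidelines):

import glob
import os

from unittest import TestCase

import radical.utils as ru


class SimpleTestClass(TestCase):

    # --------------------------------------------------------------------------
    #
    def setUp(self):
        # load the test cases shipped with this test
        base = os.path.dirname(__file__)
        self._test_cases = ru.read_json(os.path.join(base, 'test_cases',
                                                     'test_base.json'))

    # --------------------------------------------------------------------------
    #
    def tearDown(self):
        # remove logs, profiles and any other files created by the test
        for fname in glob.glob('*.log') + glob.glob('*.prof'):
            os.unlink(fname)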

Executing tests

We run unit tests using pytest from the repo’s root folder, for example:

pytest -vvv tests/

Testing methods with no return

Each method in RP’s components does one of three things: it returns a value, changes the state of an object, or executes a callback. In the latter two cases the method either has no return value or always returns True. When a method changes an object, the new values should be checked. When a method calls a callback, the callback should be mocked.

Object change

Some methods change the state of either an input object or of the component itself (self). For example:

...
def foo(self, unit, value):
    unit['new_entry'] = value

In this case the test should assert for the new value, for example:

def test_foo(self):
    component.foo(unit, value)
    self.assertEqual(unit['new_entry'], value)

Similarly, when a method changes the state of a component that should be asserted. For example:

def configure(self):
    self._attribute = something

The test should look like:

def test_configure(self):
    component.configure()

    self.assertEqual(component._attribute, something)

Mocking callbacks

RP uses callbacks to move information between components. As a result, several methods do not return specific values or objects, which makes it difficult to write unit tests for them.

The following code shows an example of how such methods can be mocked so that a unit test can receive the necessary information:

# --------------------------------------------------------------------------
#
@mock.patch.object(Default, '__init__',   return_value=None)
@mock.patch('radical.utils.raise_on')
def test_work(self, mocked_raise_on, mocked_init):

    global_things = []
    global_state = []

    # ------------------------------------------------------------------------------
    #
    def _advance_side_effect(things, state, publish, push):
        nonlocal global_things
        nonlocal global_state
        global_things.append(things)
        global_state.append(state)

    # ------------------------------------------------------------------------------
    #
    def _handle_unit_side_effect(unit, actionables):
        _advance_side_effect(unit, actionables, False, False)


    tests = self.setUp()
    component = Default(cfg=None, session=None)
    component._handle_unit = mock.MagicMock(side_effect=_handle_unit_side_effect)
    component.advance = mock.MagicMock(side_effect=_advance_side_effect)
    component._log = ru.Logger('dummy')

    for test in tests:
        global_things = []
        global_state = []
        component._work([test[0]])
        self.assertEqual(global_things, test[1][0])
        self.assertEqual(global_state, test[1][1])

The method under test (MUT) checks whether a unit has staging input directives and pushes it either to _handle_units or to advance. Finally, _handle_units calls advance. It is important to mock both calls. For that reason two local functions are defined, _advance_side_effect and _handle_unit_side_effect, which are used as side_effects of MagicMock. When the MUT calls the mocked methods, our side-effect code is executed instead.

We also want to capture the input of the side effects. This is done with the global_things and global_state variables. It is important that these two variables are updated from within the mock functions and keep their new values, which is achieved by declaring them as nonlocal in those functions.

Branching Model

RADICAL-Pilot (RP) uses git-flow as its branching model, with some simplifications. We follow semantic version numbering.

  • Release candidates and releases are tagged in the master branch (we do not use dedicated release branches at this point).

  • A release is prepared by:

    • Tagging a release candidate on devel (e.g. v1.23RC4);

    • testing that RC;

    • problems are fixed in devel, toward a new release candidate;

    • once the RC is found stable, devel is merged to master, the release is tagged on master (e.g. v1.23) and shipped to PyPI.

  • Urgent hotfix releases:

    • Branch from master to hotfix/problem_name;

    • fix the problem in that branch;

    • test that branch;

    • merge back to master and prepare release candidate for hotfix release.

  • Normal bug fixes:

    • Branch off devel, naming convention: fix/issue_1234 (referencing the GitHub issue);

    • fix in that branch, and test;

    • create pull request toward devel;

    • code review, then merge.

  • Major development activities go into feature branches:

    • Branch devel into feature/feature_name;

    • work on that feature branch;

    • on completion, merge devel into the feature branch (that should be done frequently if possible, to avoid large deviation (== pain) of the branches);

    • test the feature branch;

    • create a pull request for merging the feature branch into devel (that should be a fast-forward now);

    • merging of feature branches into devel should be discussed with the group before they are performed, and only after code review.

  • Documentation changes are handled like fix or feature branches, depending on size and impact, similar to code changes.

Branch Naming

  • devel, master: never commit directly to those;

  • feature/abc: development of new features;

  • fix/abc_123: referring to ticket 123;

  • hotfix/abc_123: referring to ticket 123, to be released right after merge to master;

  • experiment/sc16: experiments toward a specific publication etc. Cannot be merged, they will be converted to tags after experiments conclude;

  • project/xyz: branch for a dedicated group of people, usually contains unreleased features/fixes, and is not expected to be merged back;

  • tmp/abc: temporary branch, will be deleted soon;

  • test/abc: test some integration, like a merge of two feature branches.

For the latter: assume you want to test how feature/a works in combination with feature/b, then:

  • git checkout feature/a;

  • git checkout -b test/a_b;

  • git merge feature/b;

  • do tests.

Branching Policies

All branches are ideally short-lived. To support this, only a limited number of branches should be open at any point in time, e.g., only N branches for fixes and M << N branches for features should be open for each developer - other features / issues have to wait.

Some additional rules

  • Commits, in particular for bug fixes, should be self-contained to make it easy to use git cherry-pick, so that bug fixes can quickly be transferred to other branches (such as hotfixes).

  • Do not use git rebase, unless you really know what you are doing.

  • You may not want to use the tools available for git-flow – those have given us inconsistent results in the past, partially because they used rebase.

Release Procedure

Release Manager: Andre Merzky

Preconditions for release

  1. If release is a milestone release: no open tickets for milestone;

  2. all tests on devel pass.

Preparing a regular Release

git prep
git release
git bump minor

git prep is a git script for the following steps:

git co master              --> checkout master
git pa                     --> pull all branches from remote
git gone -a                --> remove all stale branches
git merge devel            --> merge the release candidate
git change >> CHANGES.md   --> draft changelog from new commits in master
gvim -o VERSION CHANGES.md --> set new version if needed, make CHANGELOG human readable

After that last manual intervention, the actual release itself, git release, runs:

git ci -am 'version bump'       --> commit VERSION and CHANGELOG changes
git tag \"v$(cat VERSION)\"     --> tag the release
git push                        --> push master to origin
git push --tags                 --> push the release tag
make upload                     --> push to pypi
git co devel                    --> checkout devel
git merge master                --> merge the release into devel
git bump minor                  --> bump minor version
git ci -am 'devel version bump' --> commit minor version bump on devel
git dsync -a                    --> sync all branches to be in sync with devel

That last step is hard to automate as it involves resolving conflicts in all branches. But it is also important, as it has saved us over the last years from branches running out of sync with devel. git-dsync is a custom script which, essentially, does the following (pseudo-code):

git checkout devel
git pull
for branch in $(git branch -a)
do
    git checkout $branch
    git pull
    git merge devel
    git push
done

Preparing a hotfix release

  1. Create branch from latest master: e.g. git checkout master; git pull; git checkout -b hotfix/issue_123;

  2. update version: echo "0.1.2" > VERSION;

  3. make modifications to the branch: either by $EDITOR or git cherry-pick abcsuperdupercommit890 (the latter is preferred);

  4. update release notes: $EDITOR CHANGES.md;

  5. commit and push: git commit -a; git push;

  6. create a pull request (https://github.com/radical-cybertools/radical.pilot/pulls) of the hotfix branch to master;

  7. wait on and/or nudge other developers to review and test;

  8. if not approved, GOTO 3.

Perform a Release

  1. If approved, move to master branch and pull in merged content: git checkout master, then git pull;

  2. create tag: git tag -a v0.1.2 -m "release v0.1.2";

  3. push tag to github: git push --tags;

  4. release on PyPI: python setup.py sdist; twine upload --skip-existing dist/radical.xyz-0.1.2.tar.gz;

  5. verify PyPI version on https://pypi.python.org/pypi/radical.xyz;

  6. GOTO "Post Release".

Post Release

  1. Merge master into devel branch: git checkout devel; git merge master; git push;

  2. merge devel into all open development branches: for b in $branches; do git checkout $b; git merge devel; done.

Testing twine and PyPI release

  1. Register at PyPI;

  2. create the test release: python setup.py sdist;

  3. Upload your test release to test.pypi: twine upload -r testpypi --skip-existing dist/radical.xyz-0.1.2.tar.gz;

  4. Check/test your release. More information at Using test PyPI.

Glossary

This is the main terminology used in RADICAL-Pilot (RP)’s documentation and application programming interface.

Workload

Workloads are sets of tasks. RP assumes no priority among the tasks of a workload, i.e., workloads are different from workflows. RP makes no assumption about when tasks of a workload are provided. RP schedules, places and launches the tasks that are available at the moment in which resources are available. For RP, it makes no difference if new tasks arrive while other tasks are executing.

Task

Tasks are self-contained portions of an application which RP can execute independently of each other. Those tasks are usually application executables (such as Gromacs or NAMD) whose execution is further parameterized by command line arguments, input files, environment settings, etc. But those tasks can also be individual function calls or small snippets of Python code. In that case, the execution is parameterized by function arguments, Python execution context, etc.

It is important to note that RP considers tasks independent, i.e., they don’t execute with a shared memory space.

For more details, see the API documentation.
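
For illustration, a minimal sketch of an executable task description (the executable and arguments are placeholders; see the API reference for the full set of attributes):

import radical.pilot as rp

td            = rp.TaskDescription()
td.executable = '/bin/echo'
td.arguments  = ['hello from an independent task']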

Pilot

As an abstraction, a pilot is a placeholder for resources on a given platform and is capable of executing tasks of a workload on those resources. As a system, a pilot is a type of middleware that implements the pilot abstraction.

RP is a pilot system, capable of: (1) acquiring resources by submitting jobs to HPC platforms; (2) managing those resources on the user’s (or application’s) behalf; and (3) executing sets and sequences of Tasks on those resources.

Usually, applications written with RP: (1) define one or more pilots; (2) define the HPC platform where each pilot should be submitted; (3) specify the type and amount of resources that each pilot should acquire on that platform; and (4) specify the time for which each pilot’s resources should be available (i.e., walltime). Once each pilot is defined, the application can schedule Tasks for execution on it.

Design and Implementation

RADICAL-Pilot is a distributed system that executes both a client and an agent component. The client component executes on the same machine and in the same Python interpreter as the application written against the RADICAL-Pilot API. The agent executes either locally in a separate Python interpreter or, most often, on a remote HPC machine in a dedicated Python interpreter.

Figure 1 shows a high-level representation of RP architecture (yellow boxes) when deployed on two HPC platforms (Resource A and B), executing an application (Application) with 5 pilots (green boxes) and 36 tasks (red circles). Application contains pilot and Task descriptions; RP Client has two components: Pilot Manager and Task Manager. Pilot descriptions are passed to the Pilot Manager and Task descriptions to the Task Manager. The Pilot Manager uses Pilot Launcher to launch 2 of the 5 described pilots. One pilot is submitted to the local Resource Management (RM) system of Resource A, the other pilot to the RM of Resource B. Once instantiated, each pilot becomes available for Task execution. At that point, RP Task Manager sends 2 tasks to Resource A and 5 tasks to Resource B.

RP architecture

Figure 1. High-level view of RP architecture when deployed on a simplified view of two HPC platforms.

State Model

Pilots and Tasks progress through linear state models. The state names indicate which RP module and component operate on the specific Pilot or Task entity. Specifically, a Pilot or Task is, at any point in time, either owned by an RP component or waiting in a queue to be communicated between components.

Pilot

Pilot States

| State Name              | Module        | Component      | Action | Sandbox | Prof file |
|-------------------------|---------------|----------------|--------|---------|-----------|
| NEW                     | Pilot Manager |                | Creating a pilot | client | pmgr.0000.prof |
| PMGR_LAUNCHING_PENDING  | Pilot Manager | Launcher queue | Pilot waits for submission | client | pmgr.0000.prof |
| PMGR_LAUNCHING          | Pilot Manager | Pilot Launcher | Submit a pilot to the batch system | client | pmgr.0000.prof, pmgr_launching.0000.prof |
| PMGR_ACTIVE_PENDING     | LRM           |                | Pilot is waiting in the batch queue or bootstrapping | client | pmgr.0000.prof, pmgr_launching.0000.prof |
| PMGR_ACTIVE             | LRM           |                | Pilot is active on the cluster resources | client | pmgr.0000.prof, pmgr_launching.0000.prof |
| DONE                    | Pilot Manager |                | Pilot marked as done. Final state | client | pmgr_launching.0000.prof |
| CANCELED                | Pilot Manager |                | Pilot marked as cancelled. Final state | client | pmgr_launching.0000.prof |
| FAILED                  | Pilot Manager |                | Pilot marked as failed. Final state | client | pmgr_launching.0000.prof |

Task

Task States

| State Name                   | Module       | Component        | Action | Sandbox | Prof file |
|------------------------------|--------------|------------------|--------|---------|-----------|
| NEW                          | Task Manager |                  | Creating a task | client | tmgr.0000.prof |
| TMGR_SCHEDULING_PENDING      | Task Manager | Scheduler queue  | Task queued for scheduling on a pilot | client | tmgr.0000.prof |
| TMGR_SCHEDULING              | Task Manager | Scheduler        | Assigning task to a pilot | client | tmgr_scheduling.0000.prof |
| TMGR_STAGING_INPUT_PENDING   | Task Manager | Stager In queue  | Task queued for data staging | client | tmgr_scheduling.0000.prof |
| TMGR_STAGING_INPUT           | Task Manager | Stager In        | Staging task’s files to the target platform (if any) | client | tmgr_staging_input.0000.prof |
| AGENT_STAGING_INPUT_PENDING  | Agent        | Stager In queue  | Task waiting to be picked up by Agent | client, agent | tmgr_staging_input.0000.prof, agent_0.prof |
| AGENT_STAGING_INPUT          | Agent        | Stager In        | Staging task’s files inside the target platform, making available within the task sandbox | agent | agent_staging_input.0000.prof |
| AGENT_SCHEDULING_PENDING     | Agent        | Scheduler queue  | Task waiting for scheduling on resources, i.e., cores and/or GPUs | agent | agent_staging_input.0000.prof |
| AGENT_SCHEDULING             | Agent        | Scheduler        | Assign cores and/or GPUs to the task | agent | agent_scheduling.0000.prof |
| AGENT_EXECUTING_PENDING      | Agent        | Executor queue   | Cores and/or GPUs are assigned, wait for execution | agent | agent_scheduling.0000.prof |
| AGENT_EXECUTING              | Agent        | Executor         | Executing tasks on assigned cores and/or GPUs. Available resources are utilized | agent | agent_executing.0000.prof |
| AGENT_STAGING_OUTPUT_PENDING | Agent        | Stager Out queue | Task completed and waits for output staging | agent | agent_executing.0000.prof |
| AGENT_STAGING_OUTPUT         | Agent        | Stager Out       | Staging out task files within the platform (if any) | agent | agent_staging_output.0000.prof |
| TMGR_STAGING_OUTPUT_PENDING  | Task Manager | Stager Out queue | Waiting for Task Manager to pick up Task again | agent | agent_0.prof, agent_staging_output.0000.prof |
| TMGR_STAGING_OUTPUT          | Task Manager | Stager Out       | Task’s files staged from remote to local resource (if any) | client | tmgr_staging_output.0000.prof |
| DONE                         | Task Manager |                  | Task marked as done. Final state | client | tmgr_staging_output.0000.prof |
| CANCELED                     | Task Manager |                  | Task marked as cancelled. Final state | client | tmgr_staging_output.0000.prof |
| FAILED                       | Task Manager |                  | Task marked as failed. Final state | client | tmgr_staging_output.0000.prof |

Event Model

Events marked as optional depend on the content of task descriptions etc.; all other events will usually be present in ‘normal’ runs. All events have an event name, a timestamp, and a component (which recorded the event) defined - all other fields (uid, state, msg) are optional. The names of the actual component IDs depend on the exact RP configuration and startup sequence.

The exact order and multiplicity of events is ill-defined, as they depend on many boundary conditions: system properties, system noise, system synchronization, RP API call order, application timings, RP configuration, and resource configuration. However, while a global event model is thus hard to define, the order presented in the lists below gives a basic indication of event ordering within each individual component.

Format of this file

event_name          : semantic event description (details on 'uid', 'msg', 'state' fields) -- Sandbox (prof file name)

All Components

get                 : component receives an entity               (uid: eid, state: estate)               -- agent; client (agent_scheduling_queue.get.0000.prof, pmgr_launching_queue.get.0000.prof, stager_request_queue.get.0000.prof; tmgr_scheduling_queue.get.0000.prof)
advance             : component advances  entity state           (uid: eid, state: estate)               -- agent; client (agent_0.prof, agent_executing.0000.prof, agent_scheduling.0000.prof, agent_staging_input.0000.prof, agent_staging_output.0000.prof; pmgr.0000.prof, pmgr_launching.0000.prof, tmgr.0000.prof, tmgr_scheduling.0000.prof, tmgr_staging_input.0000.prof, tmgr_staging_output.0000.prof)
publish             : component publishes entity state           (uid: eid, state: estate)               --  ()
put                 : component pushes an entity out             (uid: eid, state: estate, msg: channel) --  ()
lost                : component lost   an entity (state error)   (uid: eid, state: estate)               --  ()
drop                : component drops  an entity (final state)   (uid: eid, state: estate)               -- client (tmgr_staging_output.0000.prof)
component_init      : component child  initializes after start() ()                                      -- agent; client (agent_0.prof, agent_executing.0000.prof, agent_scheduling.0000.prof, agent_staging_input.0000.prof, agent_staging_output.0000.prof; pmgr.0000.prof, pmgr_launching.0000.prof, stager.0000.prof, tmgr.0000.prof, tmgr_scheduling.0000.prof, tmgr_staging_input.0000.prof, tmgr_staging_output.0000.prof)
component_final     : component finalizes ()                                                             -- client (pmgr.0000.prof, pmgr_launching.0000.prof, stager.0000.prof, tmgr.0000.prof, tmgr_scheduling.0000.prof, tmgr_staging_input.0000.prof, tmgr_staging_output.0000.prof)

partial orders
* per component     : component_init, *, component_final
* per entity        : get, advance, publish, put

Session (Component)

session_start       : session is being created (not reconnected) (uid: sid)        -- agent; client (rp.session.*.prof)
session_close       : session close is requested                 (uid: sid)        --  ()
session_stop        : session is closed                          (uid: sid)        --  ()
session_fetch_start : start fetching logs/profs/json after close (uid: sid, [API]) --  ()
session_fetch_stop  : stops fetching logs/profs/json after close (uid: sid, [API]) --  ()

partial orders
* per session       : session_start, config_parser_start, \
                      config_parser_stop, session_close,  \
                      session_stop,  session_fetch_start, \
                      session_fetch_stop

PilotManager (Component)

setup_done          : manager has bootstrapped                   (uid: pmgr) -- client (pmgr.0000.prof)

PMGRLaunchingComponent (Component)

staging_in_start    : pilot sandbox staging starts               (uid: pilot) -- client (pmgr_launching.0000.prof)
staging_in_stop     : pilot sandbox staging stops                (uid: pilot) -- client (pmgr_launching.0000.prof)
submission_start    : pilot job submission starts                (uid: pilot) -- client (pmgr_launching.0000.prof)
submission_stop     : pilot job submission stops                 (uid: pilot) -- client (pmgr_launching.0000.prof)

partial orders
* per pilot         : staging_in_start, staging_in_stop, \
                      submission_start, submission_stop

Pilot (in session profile, all optional)

staging_in_start    : pilot level staging request starts         (uid: pilot, msg: did, [PILOT-DS]) -- agent; client (rp.session.*.prof)
staging_in_fail     : pilot level staging request failed         (uid: pilot, msg: did, [PILOT-DS]) -- agent; client (rp.session.*.prof)
staging_in_stop     : pilot level staging request stops          (uid: pilot, msg: did, [PILOT-DS]) -- agent; client (rp.session.*.prof)

partial orders
* per file          : staging_in_start, (staging_in_fail | staging_in_stop)

TaskManager (Component)

setup_done          : manager has bootstrapped                   (uid: tmgr)                       -- client (tmgr.0000.prof)
get                 : tasks   received from application          (uid: tmgr, msg: 'bulk size: %d') --  ()
get                 : task    received from application          (uid: task)                       --  ()

TMGRSchedulingComponent (Component)

TMGRStagingInputComponent (Component)

create_sandbox_start: create_task_sandbox starts                 (uid: task, [Task-DS])           --  ()
create_sandbox_stop : create_task_sandbox stops                  (uid: task, [Task-DS])           --  ()
staging_in_start    : staging request starts                     (uid: task, msg: did, [Task-DS]) -- client (pmgr_launching.0000.prof)
staging_in_stop     : staging request stops                      (uid: task, msg: did, [Task-DS]) -- client (pmgr_launching.0000.prof)
staging_in_tar_start: tar optimization starts                    (uid: task, msg: did, [Task-DS]) --  ()
staging_in_tar_stop : tar optimization stops                     (uid: task, msg: did, [Task-DS]) --  ()

partial orders
* per task          : create_sandbox_start, create_sandbox_stop,
                      (staging_in_start | staging_in_stop)*
* per file          : staging_in_start, staging_in_stop

bootstrap_0.sh

bootstrap_0_start   : pilot bootstrapper 1 starts                (uid: pilot)          -- agent (bootstrap_0.prof)
tunnel_setup_start  : setting up tunnel    starts                (uid: pilot)
tunnel_setup_stop   : setting up tunnel    stops                 (uid: pilot, [CFG-R])
ve_setup_start      : pilot ve setup       starts                (uid: pilot)          -- agent (bootstrap_0.prof)
ve_create_start     : pilot ve creation    starts                (uid: pilot, [CFG-R]) -- agent (bootstrap_0.prof)
ve_activate_start   : pilot ve activation  starts                (uid: pilot, [CFG-R]) -- agent (bootstrap_0.prof)
ve_activate_stop    : pilot ve activation  stops                 (uid: pilot, [CFG-R]) -- agent (bootstrap_0.prof)
ve_update_start     : pilot ve update      starts                (uid: pilot, [CFG-R])
ve_update_stop      : pilot ve update      stops                 (uid: pilot, [CFG-R])
ve_create_stop      : pilot ve creation    stops                 (uid: pilot, [CFG-R]) -- agent (bootstrap_0.prof)
rp_install_start    : rp stack install     starts                (uid: pilot, [CFG-R]) -- agent (bootstrap_0.prof)
rp_install_stop     : rp stack install     stops                 (uid: pilot, [CFG-R]) -- agent (bootstrap_0.prof)
ve_setup_stop       : pilot ve setup       stops                 (uid: pilot, [CFG-R]) -- agent (bootstrap_0.prof)
ve_activate_start   : pilot ve activation  starts                (uid: pilot, [CFG-R]) -- agent (bootstrap_0.prof)
ve_activate_stop    : pilot ve activation  stops                 (uid: pilot)          -- agent (bootstrap_0.prof)
cleanup_start       : sandbox deletion     starts                (uid: pilot)
cleanup_stop        : sandbox deletion     stops                 (uid: pilot)
bootstrap_0_stop    : pilot bootstrapper 1 stops                 (uid: pilot)          -- agent (bootstrap_0.prof)

partial orders
* as above

agent_0 (Component)

hostname            : host or nodename for agent_0               (uid: pilot)                              -- agent (agent_0.prof)
cmd                 : command received from pmgr                 (uid: pilot, msg: command, [API])         -- agent (agent_0.prof)
get                 : tasks   received from task manager         (uid: pilot, msg: 'bulk size: %d')        --  ()
get                 : task    received from task manager         (uid: task)                               --  ()
dvm_start           : DVM startup by launch method               (uid: pilot, msg: 'dvm_id=%d', [CFG-DVM]) --  ()
dvm_uri             : DVM URI is set successfully                (uid: pilot, msg: 'dvm_id=%d', [CFG-DVM]) --  ()
dvm_ready           : DVM is ready for execution                 (uid: pilot, msg: 'dvm_id=%d', [CFG-DVM]) --  ()
dvm_stop            : DVM terminated                             (uid: pilot, msg: 'dvm_id=%d', [CFG-DVM]) --  ()
dvm_fail            : DVM termination failed                     (uid: pilot, msg: 'dvm_id=%d', [CFG-DVM]) --  ()


partial orders
* per instance      : hostname, (cmd | get)*
* per instance      : dvm_start, dvm_uri, dvm_ready, (dvm_stop | dvm_fail)

AgentStagingInputComponent (Component)

staging_in_start    : staging request starts                     (uid: task, msg: did, [Task-DS]) -- client (pmgr_launching.0000.prof)
staging_in_skip     : staging request is not handled here        (uid: task, msg: did, [Task-DS]) --  ()
staging_in_fail     : staging request failed                     (uid: task, msg: did, [Task-DS]) --  ()
staging_in_stop     : staging request stops                      (uid: task, msg: did, [Task-DS]) -- client (pmgr_launching.0000.prof)

partial orders
* per file          : staging_in_skip
                    | (staging_in_start, (staging_in_fail | staging_in_stop))

AgentSchedulingComponent (Component)

schedule_try        : search for task resources starts           (uid: task)            --  ()
schedule_fail       : search for task resources failed           (uid: task, [RUNTIME]) --  ()
schedule_ok         : search for task resources succeeded        (uid: task)            -- agent (agent_scheduling.0000.prof)
unschedule_start    : task resource freeing starts               (uid: task)            -- agent (agent_scheduling.0000.prof)
unschedule_stop     : task resource freeing stops                (uid: task)            -- agent (agent_scheduling.0000.prof)

partial orders
* per task          : schedule_try, schedule_fail*, schedule_ok, \
                      unschedule_start, unschedule_stop

AgentExecutingComponent: (Component)

task_start          : task handling process starts               (uid: task)                  -- agent (agent_executing.0000.prof)
task_mkdir          : creation of sandbox requested              (uid: task)                  -- agent (agent_executing.0000.prof)
task_mkdir_done     : creation of sandbox completed              (uid: task)                  -- agent (agent_executing.0000.prof)
task_run_start      : pass to executing layer (ssh, mpi...)      (uid: task)                  -- agent (agent_executing.0000.prof)
task_run_ok         : executing layer accepted task              (uid: task)                  -- agent (agent_executing.0000.prof)
launch_start        : task launch script: starts                 (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
launch_pre          : task launch script: pre-submission         (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
launch_submit       : task launch script: launch method starts   (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
exec_start          : task exec script: starts [per rank]        (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
exec_pre            : task exec script: pre-exec starts          (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
rank_start          : task exec script: executable started       (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
app_start           : application executable started             (uid: task, [APP])           -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
app_*               : application specific events                (uid: task, [APP], optional) --  ()
app_stop            : application executable stops               (uid: task, [APP])           -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
rank_stop           : task exec script: executable stopped       (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
exec_post           : task exec script: post-exec starts         (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
exec_stop           : task exec script: stopped                  (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
launch_collect      : task launch script: launch method returned (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
launch_post         : task launch script: post-submission        (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
launch_stop         : task launch script: completes              (uid: task)                  -- agent (task.<TASK_NUMBER>.prof, agent_staging_output.0000.prof)
task_run_stop       : exec layer passed back control             (uid: task)                  -- agent (agent_staging_output.0000.prof)
task_run_cancel_start: try to cancel task via exec layer (kill)  (uid: task, [API])           --  ()
task_run_cancel_stop : did cancel    task via exec layer (kill)  (uid: task, [API])           --  ()

partial orders
* per task          : task_start, task_run_start, task_run_ok,
                      launch_start, launch_pre, launch_submit, exec_start,
                      exec_pre, rank_start, app_start, app_*, app_stop,
                      rank_stop, exec_post, exec_stop, launch_collect,
                      launch_post, launch_stop, task_run_stop
* per task          : task_run_cancel_start, task_run_cancel_stop

NOTE: raptor tasks will not log the complete set of events - they will miss
the launch_* events (raptor has no separate launcher), the exec_pre and
exec_post events (pre and post exec are not supported), and the task_mkdir_*
events (raptor tasks don't have individual sandboxes).

AgentStagingOutputComponent (Component)

staging_stdout_start: reading task stdout starts                 (uid: task)                      -- agent (agent_staging_output.0000.prof)
staging_stdout_stop : reading task stdout stops                  (uid: task)                      -- agent (agent_staging_output.0000.prof)
staging_stderr_start: reading task stderr starts                 (uid: task)                      -- agent (agent_staging_output.0000.prof)
staging_stderr_stop : reading task stderr stops                  (uid: task)                      -- agent (agent_staging_output.0000.prof)
staging_uprof_start : reading task profile starts                (uid: task, [APP])               -- agent (agent_staging_output.0000.prof)
staging_uprof_stop  : reading task profile stops                 (uid: task, [APP])               -- agent (agent_staging_output.0000.prof)
staging_out_start   : staging request starts                     (uid: task, msg: did, [Task-DS]) --  ()
staging_out_skip    : staging request is not handled here        (uid: task, msg: did, [Task-DS]) --  ()
staging_out_fail    : staging request failed                     (uid: task, msg: did, [Task-DS]) --  ()
staging_out_stop    : staging request stops                      (uid: task, msg: did, [Task-DS]) --  ()

partial orders
* per task          : staging_stdout_start, staging_stdout_stop,
                      staging_stderr_start, staging_stderr_stop,
                      staging_uprof_start,  staging_uprof_stop,
* per file          : staging_out_skip \
                    | (staging_out_start, (staging_out_fail | staging_out_stop))

TMGRStagingOutputComponent (Component)

staging_out_start   : staging request starts                     (uid: task, msg: did, [Task-DS]) --  ()
staging_out_stop    : staging request stops                      (uid: task, msg: did, [Task-DS]) --  ()

partial orders
* per file          : staging_out_start, staging_out_stop

UpdateWorker (Component)

update_request      : a state update is requested                (uid: task, msg: state)           --  ()
update_pushed       : bulk state update has been sent            (           msg: 'bulk size: %d') --  ()
update_pushed       : a state update has been send               (uid: task, msg: state)           --  ()

partial orders
* per state update  : update_request, update_pushed

All profiles

sync_abs            : sets an absolute, NTP synced time stamp               ([INTERNAL])
END                 : last entry, profiler is being closed

partial orders
* per profile       : sync_abs, *, END

Conditional events

- [API]           - only for corresponding RP API calls
- [CFG]           - only for some RP configurations
  - [CFG-R]       - only for some bootstrapping configurations
  - [CFG-DVM]     - only for launch methods which use a DVM
- [Task]          - only for some Task descriptions
  - [Task-DS]     - only for tasks specifying data staging directives
  - [Task-PRE]    - only for tasks specifying pre-exec directives
  - [Task-POST]   - only for tasks specifying post-exec directives
- [PILOT]         - only for certain pilots
- [APP]           - only for applications writing compatible profiles
- [RUNTIME]       - only on certain runtime decisions and system configuration
- [INTERNAL]      - only for certain internal states

API Reference

Sessions

class radical.pilot.Session(proxy_url: Optional[str] = None, uid: Optional[str] = None, cfg: Optional[dict] = None, _role: Optional[str] = 'primary', _reg_addr: Optional[str] = None, **close_options)[source]

Root of RP object hierarchy for an application instance.

A Session is the root object of all RP objects in an application instance: it holds radical.pilot.PilotManager and radical.pilot.TaskManager instances which in turn hold radical.pilot.Pilot and radical.pilot.Task instances, and several other components which operate on those stateful entities.

__init__(proxy_url: Optional[str] = None, uid: Optional[str] = None, cfg: Optional[dict] = None, _role: Optional[str] = 'primary', _reg_addr: Optional[str] = None, **close_options)[source]

Create a new session.

A new Session instance is created and stored in the database.

Any RP Session will require an RP Proxy to facilitate communication between the client machine (i.e., the host where the application created this Session instance) and the target resource (i.e., the host where the pilot agent/s is/are running and where the workload is being executed).

A proxy_url can be specified which then must point to an RP Proxy Service instance which this session can use to establish a communication proxy. If proxy_url is not specified, the session will check for the environment variable RADICAL_PILOT_PROXY_URL and interpret it as above. If that information is not available, the session will instantiate a proxy service on the local host. Note that any proxy service instantiated by the session itself will be terminated once the session instance is closed or goes out of scope and is garbage collected; it should thus not be used by other session instances.

Note: an RP proxy will have to be accessible by both the client and the target hosts to facilitate communication between both parties. That implies access to the respective ports. Proxies started by the session itself will use the first free port larger than 10000.
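
For example, a minimal sketch of pointing a session at an already running proxy service (the URL below is a placeholder):

import os
import radical.pilot as rp

# assumption: a proxy service is reachable at this address
os.environ['RADICAL_PILOT_PROXY_URL'] = 'tcp://proxy.example.org:10001'

session = rp.Session()          # picks up RADICAL_PILOT_PROXY_URL
# ... create managers, submit pilots and tasks ...
session.close(download=True)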

Parameters
  • proxy_url (str, optional) – proxy service URL - points to an RP proxy service which is used to establish an RP communication proxy for this session.

  • uid (str, optional) – Create a session with this UID. Session UIDs MUST be unique - otherwise they will lead to communication conflicts, resulting in undefined behaviours.

  • cfg (str | dict, optional) – a named or instantiated configuration to be used for the session.

  • _role (str) – only PRIMARY sessions, created by the original application process (via rp.Session()), will create proxies and Registry Services. AGENT sessions will also create a Registry but no proxies. All other DEFAULT session instances are instantiated internally in processes spawned (directly or indirectly) by the initial session, for example in some of its components, or by the RP agent. Those sessions will inherit the original session ID, but will not attempt to create new proxies or registries.

  • **close_options (optional) – If additional keyword arguments are provided, they will be used as the default arguments to Session.close(). This can be useful when the Session is used as a Python context manager, such that close() is called automatically at the end of a with block (see the sketch after this parameter list).

  • _reg_addr (str, optional) – Non-primary sessions will connect to the registry at that endpoint and pull session config and resource configurations from there.
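
A minimal sketch of the context manager usage mentioned above, passing a close option at construction time:

import radical.pilot as rp

with rp.Session(download=True) as session:
    pmgr = rp.PilotManager(session=session)
    tmgr = rp.TaskManager(session=session)
    # ... submit pilots and tasks ...
# session.close(download=True) is invoked automatically here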

as_dict()[source]

Returns a Python dictionary representation of the object.

close(**kwargs)[source]

Close the session.

All subsequent attempts to access objects attached to the session will result in an error. If cleanup is set to True, the session data is removed from the database.

Parameters
  • terminate (bool, optional) – Shut down all pilots associated with the session.

  • download (bool, optional) – Fetch pilot profiles and database entries.

get_pilot_managers(pmgr_uids=None)[source]

Get known PilotManager(s).

Parameters

pmgr_uids (str | Iterable[str], optional) – uids of the PilotManagers we want.

Returns

One or more radical.pilot.PilotManager objects.

Return type

radical.pilot.PilotManager | list[radical.pilot.PilotManager]

get_resource_config(resource, schema=None)[source]

Returns a dictionary of the requested resource config.

get_task_managers(tmgr_uids=None)[source]

Get known TaskManager(s).

Parameters

tmgr_uids (str | list[str]) – uids of the TaskManagers we want

Returns

One or more radical.pilot.TaskManager objects.

Return type

radical.pilot.TaskManager | list[radical.pilot.TaskManager]

inject_metadata(metadata)[source]

Insert (experiment) metadata into an active session.

RP stack version info always gets added.

list_pilot_managers()[source]

Get PilotManager instances.

Lists the unique identifiers of all radical.pilot.PilotManager instances associated with this session.

Returns

A list of radical.pilot.PilotManager uids.

Return type

list[str]

list_resources()[source]

Get list of known resource labels.

Returns a list of known resource labels which can be used in a pilot description.

list_task_managers()[source]

Get TaskManager identifiers.

Lists the unique identifiers of all radical.pilot.TaskManager instances associated with this session.

Returns

A list of radical.pilot.TaskManager uids.

Return type

list[str]

Pilots and PilotManagers

PilotManagers

class radical.pilot.PilotManager(session, cfg='default')[source]

Manage Pilot instances.

A PilotManager manages rp.Pilot instances that are submitted via the radical.pilot.PilotManager.submit_pilots() method. It is possible to attach one or more HPC resources to a PilotManager to outsource machine specific configuration parameters to an external configuration file.

Example:

s = rp.Session()

pm = rp.PilotManager(session=s)

pd = rp.PilotDescription()
pd.resource = "futuregrid.alamo"
pd.cores = 16

p1 = pm.submit_pilots(pd)  # create first  pilot with 16 cores
p2 = pm.submit_pilots(pd)  # create second pilot with 16 cores

# Create a workload of 128 '/bin/sleep' tasks
tasks = []
for task_count in range(0, 128):
    t = rp.TaskDescription()
    t.executable = "/bin/sleep"
    t.arguments = ['60']
    tasks.append(t)

# Combine the two pilots, the workload and a scheduler via
# a TaskManager.
tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
tm.add_pilots([p1, p2])
tm.submit_tasks(tasks)

The pilot manager can issue notification on pilot state changes. Whenever state notification arrives, any callback registered for that notification is fired.

Note

State notifications can arrive out of order wrt the pilot state model!

__init__(session, cfg='default')[source]

Creates a new PilotManager and attaches it to the session.

Parameters
  • session (rp.Session) – The session instance to use.

  • uid (str) – ID for pilot manager, to be used for reconnect

  • cfg (dict, str) – The configuration or name of configuration to use.

Returns

A new PilotManager object.

Return type

rp.PilotManager

as_dict()[source]

Returns a dictionary representation of the PilotManager object.

cancel_pilots(uids=None, _timeout=None)[source]

Cancel one or more rp.Pilots.

Parameters

uids (str | list[str], optional) – The IDs of the pilots to cancel.

close(terminate=True)[source]

Shut down the PilotManager and all its components.

Parameters

terminate (bool) –

cancel non-final pilots if True (default)

Note

Pilots cannot be reconnected to after termination.

control_cb(topic, msg)[source]

This callback can be overloaded by the component to handle any control message which is not already handled by the component base class.

get_pilots(uids=None)[source]

Returns one or more pilots identified by their IDs.

Parameters

uids (str | list[str]) – The IDs of the pilot objects to return.

Returns

A list of rp.Pilot objects.

Return type

list

kill_pilots(uids=None, _timeout=None)[source]

Kill one or more rp.Pilots

Parameters

uids (str | list[str], optional) – The IDs of the pilots to cancel.

list_pilots()[source]

Get the UIDs of the managed rp.Pilots.

Returns

A list of rp.Pilot UIDs.

Return type

list[str]

register_callback(cb, cb_data=None, metric='PILOT_STATE')[source]

Registers a new callback function with the PilotManager.

Manager-level callbacks get called if the specified metric changes. The default metric PILOT_STATE fires the callback if any of the Pilots managed by the PilotManager change their state.

All callback functions need to have the same signature:

def cb(obj, value, cb_data)

where obj is a handle to the object that triggered the callback, value is the value of the metric, and cb_data is the data provided on callback registration. In the example of PILOT_STATE above, the object would be the pilot in question, and the value would be the new state of the pilot.

Available metrics are

  • PILOT_STATE: fires when the state of any of the pilots which are managed by this pilot manager instance is changing. It communicates the pilot object instance and the pilot’s new state.
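
A minimal callback sketch for the default PILOT_STATE metric (assuming a Session object named session already exists):

def pilot_state_cb(pilot, state, cb_data=None):
    # print every pilot state transition
    print('pilot %s is now in state %s' % (pilot.uid, state))

pmgr = rp.PilotManager(session=session)
pmgr.register_callback(pilot_state_cb)    # metric defaults to PILOT_STATE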

submit_pilots(descriptions)[source]

Submit one or more rp.Pilot instances to the pilot manager.

Parameters

descriptions (rp.PilotDescription | list[rp.PilotDescription]) – The description of the pilot instance(s) to create.

Returns

A list of rp.Pilot objects.

Return type

list[rp.Pilot]

property uid

The unique id.

Type

str

wait_pilots(uids=None, state=None, timeout=None)[source]

Block for state transition.

Returns when one or more rp.Pilots reach a specific state.

If pilot_uids is None, wait_pilots returns when all Pilots reach the state defined in state. This may include pilots which have previously terminated or have already been waited upon.

Example

# TODO – add example

Parameters
  • uids (str | list[str], optional) – If set, only the Pilots with the specified uids are considered. If None (default), all Pilots are considered.

  • state (str | list[str]) –

    The state that Pilots have to reach in order for the call to return.

    By default wait_pilots waits for the Pilots to reach a terminal state, which can be one of the following:

    • rp.rps.DONE

    • rp.rps.FAILED

    • rp.rps.CANCELED

  • timeout (float, optional) – Timeout in seconds before the call returns regardless of Pilot state changes. The default value None waits forever.
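
A minimal wait_pilots sketch (pd_1 and pd_2 are hypothetical pilot descriptions, and rp.PMGR_ACTIVE refers to the active state from the pilot state model):

pilots = pmgr.submit_pilots([pd_1, pd_2])
uids   = [pilot.uid for pilot in pilots]

# wait (at most 10 minutes) until all submitted pilots become active
pmgr.wait_pilots(uids=uids, state=rp.PMGR_ACTIVE, timeout=600)

# wait until all pilots reach a final state (DONE, FAILED or CANCELED)
pmgr.wait_pilots(uids=uids)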

PilotDescription

class radical.pilot.PilotDescription(from_dict=None)[source]

Specify a requested Pilot.

A PilotDescription object describes the requirements and properties of a radical.pilot.Pilot and is passed as a parameter to radical.pilot.PilotManager.submit_pilots() to instantiate and run a new pilot.

Note

A PilotDescription MUST define at least resource, cores or nodes, and runtime.

Example:

pm = radical.pilot.PilotManager(session=s)
pd = radical.pilot.PilotDescription()
pd.resource = "local.localhost"
pd.cores    = 16
pd.runtime  = 5  # minutes

pilot = pm.submit_pilots(pd)

uid

A unique ID for the pilot. A unique ID will be assigned by RP if the field is not set.

Type

str, optional

job_name

The name of the job / pilot as provided to the batch system. If not set then uid will be used instead.

Type

str, optional

resource

The key of a platform description entry. If the key exists, the machine-specific configuration is loaded from the config file once the PilotDescription is passed to radical.pilot.PilotManager.submit_pilots(). If the key doesn’t exist, a ValueError exception is raised.

Type

str

access_schema

The key of an access mechanism to use. The valid access mechanism is defined in the resource configuration. See Configuration System. The first schema defined in the resource configuration is used by default, if no access_schema is specified.

Type

str, optional

runtime

The maximum run time (wall-clock time) in minutes of the pilot. Default 10.

Type

int, optional

sandbox

The working (“sandbox”) directory of the pilot agent. This parameter is optional and if not set, it defaults to radical.pilot.sandbox in your home or login directory. Default None.

Warning

If you define a pilot on an HPC cluster and you want to set sandbox manually, make sure that it points to a directory on a shared filesystem that can be reached from all compute nodes.

Type

str, optional

nodes

The number of nodes the pilot should allocate on the target resource. This parameter could be set instead of cores and gpus (and memory). Default 1.

Note

If nodes is specified, gpus and cores must not be specified.

Type

int, optional

cores

The number of cores the pilot should allocate on the target resource. This parameter could be set instead of nodes.

Note

For local pilots, you can set a number larger than the physical machine limit (corresponding resource configuration should have the attribute “fake_resources”).

Note

If cores is specified, nodes must not be specified.

Type

int, optional

gpus

The number of gpus the pilot should allocate on the target resource.

Note

If gpus is specified, nodes must not be specified.

Type

int, optional

memory

The total amount of physical memory the pilot (and its related job) requires.

Type

int, optional

queue

The name of the job queue the pilot should get submitted to. If queue is set in the resource configuration (resource), defining queue will override it explicitly.

Type

str, optional

project

The name of the project / allocation to charge for used CPU time. If project is set in the resource configuration (resource), defining project will override it explicitly.

Type

str, optional

candidate_hosts

The list of host names on which this pilot is allowed to start.

Type

list[str], optional

app_comm

The list of names is interpreted as communication channels to start within the pilot agent, for the purpose of application communication, i.e., tasks running on that pilot are able to use those channels to communicate amongst each other.

The names are expected to end in _queue or _pubsub, indicating the type of channel to create. Once created, tasks will find environment variables of the name RP_%s_IN and RP_%s_OUT, where %s is replaced with the given channel name (uppercased), and IN/OUT indicate the respective endpoint addresses for the created channels.

Type

list[str], optional

input_staging

The list of files to be staged into the pilot sandbox.

Type

list, optional

output_staging

The list of files to be staged from the pilot sandbox.

Type

list, optional

cleanup

If cleanup is set to True, the pilot will delete its entire sandbox upon termination. This includes individual Task sandboxes and all generated output data. Only log files will remain in the sandbox directory. Default False.

Type

bool, optional

exit_on_error

Flag to trigger app termination in case of the pilot failure. Default True.

Type

bool, optional

services

A list of commands which get started on a separate service compute node right after bootstrapping, and before any RP task is launched. That service compute node will not be used for any other tasks.

Type

list[TaskDescription], optional

layout

Point to a json file or an explicit (dict) description of the pilot layout: number and size of partitions and their configuration. Default “default”.

Type

str | dict, optional

Pilots

class radical.pilot.Pilot(pmgr: PilotManager, descr)[source]

Represent a resource overlay on a local or remote resource.

Note

A Pilot cannot be created directly. The factory method radical.pilot.PilotManager.submit_pilots() has to be used instead.

Example:

pm = radical.pilot.PilotManager(session=s)
pd = radical.pilot.PilotDescription()

pd.resource = "local.localhost"
pd.cores    = 2
pd.runtime  = 5 # minutes

pilot = pm.submit_pilots(pd)

as_dict()[source]

Dictionary representation.

Returns

a Python dictionary representation of the object.

Return type

dict

cancel()[source]

Cancel the pilot.

property description

The description the pilot was started with, as a dictionary.

Type

dict

property endpoint_fs

The URL which is internally used to access the target resource’s root file system.

Type

radical.utils.Url

property log

A list of human readable [timestamp, string] tuples describing various events during the pilot’s lifetime. Those strings are not normative, only informative!

Type

list[tuple]

property pilot_sandbox

The full sandbox URL of this pilot, if that is already known, or ‘None’ otherwise.

Type

str

property pmgr

The pilot’s manager.

Type

PilotManager

prepare_env(env_name, env_spec)[source]

Prepare a virtual environment.

Request the preparation of a task or worker environment on the target resource. This call will block until the env is created.

Parameters
  • env_name (str) – name of the environment to prepare.

  • env_spec (dict) –

    specification of the environment to prepare, like:

    {'type'    : 'venv',
     'version' : '3.7',
     'pre_exec': ['module load python'],
     'setup'   : ['radical.pilot==1.0', 'pandas']},
    
    {'type'    : 'conda',
     'version' : '3.8',
     'setup'   : ['numpy']}
    
    {'type'   : 'conda',
     'version': '3.8',
     'path'   : '/path/to/ve',
     'setup'  : ['numpy']}
    

    where the type specifies the environment type, version specifies the Python version to deploy, and setup specifies how the environment is to be prepared. If path is specified the env will be created at that path. If path is not specified, RP will place the named env in the pilot sandbox (under env/named_env_name). If a VE exists at that path, it will be used as is (an update is not performed). pre_exec commands are executed before env creation and setup are attempted.

Note

The optional version specifier is only interpreted up to the minor version; subminor and below are ignored.
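
A minimal usage sketch (environment name and packages are only illustrative):

pilot.prepare_env(env_name='numpy_env',
                  env_spec={'type' : 'venv',
                            'setup': ['numpy']})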

register_callback(cb, metric='PILOT_STATE', cb_data=None)[source]

Add callback for state changes.

Registers a callback function that is triggered every time the pilot’s state changes.

All callback functions need to have the same signature:

def cb(obj, state)

where obj is a handle to the object that triggered the callback and state is the new state of that object. If cb_data is given, then the cb signature changes to

def cb(obj, state, cb_data)

and cb_data are passed along.

property resource

The resource tag of this pilot.

Type

str

property resource_details

Agent-level resource information.

Type

dict

property resource_sandbox

The full URL of the path that RP considers the resource sandbox, i.e., the sandbox on the target resource’s file system which is shared by all sessions which access that resource.

Type

radical.utils.Url

property resources

The amount of resources used by this pilot.

Type

str

rpc(cmd, *args, rpc_addr=None, **kwargs)[source]

Remote procedure call.

Send an RPC command and arguments to the pilot and wait for the response. This is a synchronous operation at this point, and it is not thread-safe to have multiple concurrent RPC calls.

property session

The pilot’s session.

Type

Session

property session_sandbox

The full URL of the path that RP considers the session sandbox on the target resource’s file system which is shared by all pilots which access that resource in the current session.

Type

radical.utils.Url

stage_in(sds)[source]

Stage files “in”.

Stages the content of the staging_directives to the pilot sandbox.

Please note the documentation of radical.pilot.staging_directives.complete_url() for details on path and sandbox semantics.
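
A minimal stage_in sketch; the directive keys, the rp.TRANSFER action and the client:// / pilot:// URL schemas follow the staging directives documentation and are assumptions here, not part of this API reference:

# stage a local file into the pilot sandbox (file names are placeholders)
pilot.stage_in({'source': 'client:///input.dat',
                'target': 'pilot:///input.dat',
                'action': rp.TRANSFER})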

stage_out(sds=None)[source]

Stage data “out”.

Fetches the content of the staging_directives from the pilot sandbox.

Please note the documentation of radical.pilot.staging_directives.complete_url() for details on path and sandbox semantics.

property state

The current state of the pilot.

Type

str

property stderr

A snapshot of the pilot’s STDERR stream.

If this property is queried before the pilot has reached ‘DONE’ or ‘FAILED’ state it will return None.

Warning

This can be inefficient. Output may be incomplete and/or filtered.

Type

str

property stdout

A snapshot of the pilot’s STDOUT stream.

If this property is queried before the pilot has reached ‘DONE’ or ‘FAILED’ state it will return None.

Warning

This can be inefficient. Output may be incomplete and/or filtered.

Type

str

property uid

The pilot’s unique identifier within a PilotManager.

Type

str

wait(state=None, timeout=None)[source]

Block for state change.

Returns when the pilot reaches a specific state or when an optional timeout is reached.

Parameters
  • state (list[str]) –

    The state(s) that pilot has to reach in order for the call to return.

    By default wait waits for the pilot to reach a final state, which can be one of the following:

    • radical.pilot.states.DONE

    • radical.pilot.states.FAILED

    • radical.pilot.states.CANCELED

  • timeout (float) – Optional timeout in seconds before the call returns regardless whether the pilot has reached the desired state or not. The default value None never times out.
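
A short usage sketch (rp.PMGR_ACTIVE denotes the state in which the pilot's resources are available):

# wait at most 5 minutes for the pilot to become active
pilot.wait(state=rp.PMGR_ACTIVE, timeout=300)

# wait for the pilot to reach any final state
pilot.wait()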

Tasks and TaskManagers

TaskManager

class radical.pilot.TaskManager(session, cfg='default', scheduler=None)[source]

A TaskManager manages radical.pilot.Task instances which represent the executable workload in RADICAL-Pilot. A TaskManager connects the Tasks with one or more Pilot instances (which represent the workload executors in RADICAL-Pilot) and a scheduler which determines which Task gets executed on which Pilot.

Example:

s = rp.Session()

pm = rp.PilotManager(session=s)

pd = rp.PilotDescription()
pd.resource = "futuregrid.alamo"
pd.cores = 16

p1 = pm.submit_pilots(pd) # create first pilot with 16 cores
p2 = pm.submit_pilots(pd) # create second pilot with 16 cores

# Create a workload of 128 '/bin/sleep' tasks
tasks = []
for task_count in range(0, 128):
    t = rp.TaskDescription()
    t.executable = "/bin/sleep"
    t.arguments = ['60']
    tasks.append(t)

# Combine the two pilots, the workload and a scheduler via
# a TaskManager.
tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
tm.add_pilots([p1, p2])
tm.submit_tasks(tasks)

The task manager can issue notifications on task state changes. Whenever a state notification arrives, any callback registered for that notification is fired.

Note

State notifications can arrive out of order with respect to the task state model!

__init__(session, cfg='default', scheduler=None)[source]

Create a new TaskManager and attach it to the session.

Parameters
  • session (radical.pilot.Session) – The session instance to use.

  • cfg (dict | str) – The configuration or name of configuration to use.

  • scheduler (str) – The name of the scheduler plug-in to use.

  • uid (str) – ID of the task manager, to be used for reconnect

Returns

A new TaskManager object.

Return type

radical.pilot.TaskManager

add_pilots(pilots)[source]

Associates one or more pilots with the task manager.

Parameters

pilots (radical.pilot.Pilot | list[radical.pilot.Pilot]) – The pilot objects that will be added to the task manager.

as_dict()[source]

Returns a dictionary representation of the TaskManager object.

cancel_tasks(uids=None)[source]

Cancel one or more radical.pilot.Task s.

Note that cancellation of tasks is immediate, i.e., their state is immediately set to CANCELED, even if some RP component may still operate on the tasks. Specifically, other state transitions, including other final states (DONE, FAILED), can occur after cancellation. This is a side effect of an optimization: we consider this an acceptable tradeoff in the sense of "Oh, that task was DONE at the point of cancellation – ok, we can use the results, sure!".

If that behavior is not wanted, set the environment variable:

export RADICAL_PILOT_STRICT_CANCEL=True
Parameters

uids (str | list[str]) – The IDs of the tasks to cancel.
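
A short sketch, assuming the task handles were returned by an earlier submit_tasks() call:

tasks = tmgr.submit_tasks(descriptions)
tmgr.cancel_tasks(uids=[task.uid for task in tasks])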

cancel_units(uids=None)[source]

Cancel one or more radical.pilot.Task s.

Deprecated since version 1.5.12: use cancel_tasks()

close()[source]

Shut down the TaskManager and all its components.

control_cb(topic, msg)[source]

This callback can be overloaded by the component to handle any control message which is not already handled by the component base class.

get_pilots()[source]

Get the pilot instances currently associated with the task manager.

Returns

A list of radical.pilot.Pilot instances.

Return type

list[radical.pilot.Pilot]

get_tasks(uids=None)[source]

Get one or more tasks identified by their IDs.

Parameters

uids (str | list[str]) – The IDs of the task objects to return.

Returns

A list of radical.pilot.Task objects.

Return type

list[radical.pilot.Task]

get_units(uids=None)[source]

Get one or more tasks identified by their IDs.

Deprecated since version 1.5.12: use get_tasks()

list_pilots()[source]

Lists the UIDs of the pilots currently associated with the task manager.

Returns

A list of radical.pilot.Pilot UIDs.

Return type

list[str]

list_tasks()[source]

Get the UIDs of the tasks managed by this task manager.

Returns

A list of radical.pilot.Task UIDs.

Return type

list[str]

list_units()[source]

Get Task UIDs.

Deprecated since version 1.5.12: use list_tasks()

pilot_rpc(pid, cmd, *args, rpc_addr=None, **kwargs)[source]

Remote procedure call.

Send an RPC command and arguments to the pilot and wait for the response. This is a synchronous operation at this point, and it is not thread safe to have multiple concurrent RPC calls.

register_callback(cb, cb_data=None, metric=None, uid=None)[source]

Registers a new callback function with the TaskManager.

Manager-level callbacks get called if the specified metric changes. The default metric TASK_STATE fires the callback if any of the Tasks managed by the TaskManager change their state.

All callback functions need to have the same signature:

def cb(obj, value) -> None:
    ...

where obj is a handle to the object that triggered the callback and value is the new value of the monitored metric. In the example of TASK_STATE above, the object would be the task in question and the value would be the new state of the task.

If cb_data is given, then the cb signature changes to

def cb(obj, state, cb_data) -> None:
    ...

and cb_data is passed along unchanged.

If uid is given, the callback will be invoked only for the specified task.

Available metrics are

  • TASK_STATE: fires when the state of any of the tasks managed by this task manager instance changes. It communicates the task object instance and the task's new state.

  • WAIT_QUEUE_SIZE: fires when the number of unscheduled tasks (i.e. of tasks which have not been assigned to a pilot for execution) changes.
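
A minimal sketch of registering a callback for the default TASK_STATE metric:

def task_state_cb(task, state):
    print('task %s is now in state %s' % (task.uid, state))

tmgr.register_callback(task_state_cb)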

remove_pilots(pilot_ids, drain=False)[source]

Disassociates one or more pilots from the task manager.

After a pilot has been removed from a task manager, it won’t process any of the task manager’s tasks anymore. Calling remove_pilots doesn’t stop the pilot itself.

Parameters

drain (bool) – Drain determines what happens to the tasks which are managed by the removed pilot(s). If True, all tasks currently assigned to the pilot are allowed to finish execution. If False (the default), then non-final tasks will be canceled.

property scheduler

The scheduler name.

Type

str

submit_raptors(descriptions, pilot_id=None)[source]

Submit RAPTOR master tasks.

Submits one or more radical.pilot.TaskDescription instances to the task manager, where the TaskDescriptions have the mode RAPTOR_MASTER set.

This is a thin wrapper around submit_tasks.

Parameters

descriptions (radical.pilot.TaskDescription) – descriptions of the Raptor master tasks to submit.

Returns

A list of radical.pilot.Task objects.

Return type

list[radical.pilot.Task]

submit_tasks(descriptions)[source]

Submit tasks for execution.

Submits one or more radical.pilot.Task instances to the task manager.

Parameters

descriptions (radical.pilot.TaskDescription | list[radical.pilot.TaskDescription]) – The description of the task instance(s) to create.

Returns

A list of radical.pilot.Task objects.

Return type

list[radical.pilot.Task]

submit_units(descriptions)[source]

Submit tasks for execution.

Deprecated since version 1.5.12: use submit_tasks()

submit_workers(descriptions)[source]

Submit RAPTOR workers.

Submits one or more radical.pilot.TaskDescription instances to the task manager, where the TaskDescriptions have the mode RAPTOR_WORKER set.

This method is a thin wrapper around submit_tasks.

Parameters

descriptions (radical.pilot.TaskDescription) – descriptions of the workers to submit.

Returns

A list of radical.pilot.Task objects.

Return type

list[radical.pilot.Task]

property uid

The unique id.

Type

str

wait_tasks(uids=None, state=None, timeout=None)[source]

Block for state transition.

Returns when one or more radical.pilot.Tasks reach a specific state.

If uids is None, wait_tasks returns when all Tasks reach the state defined in state. This may include tasks which have previously terminated or have already been waited upon.

Example:

# TODO -- add example
Parameters
  • uids (str | list[str]) – If uids is set, only the Tasks with the specified uids are considered. If uids is None (default), all Tasks are considered.

  • state (str) –

    The state that Tasks have to reach in order for the call to return.

    By default wait_tasks waits for the Tasks to reach a terminal state, which can be one of the following.

    • radical.pilot.rps.DONE

    • radical.pilot.rps.FAILED

    • radical.pilot.rps.CANCELED

  • timeout (float) – Timeout in seconds before the call returns regardless of Pilot state changes. The default value None waits forever.
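
A hedged usage sketch (task handles are assumed to come from an earlier submit_tasks() call; rp.AGENT_EXECUTING is the state in which tasks are running):

tasks = tmgr.submit_tasks(descriptions)

# wait for a subset of the tasks to reach a final state
tmgr.wait_tasks(uids=[t.uid for t in tasks[:10]])

# wait at most 60 seconds for all tasks to start executing
tmgr.wait_tasks(state=rp.AGENT_EXECUTING, timeout=60)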

wait_units(uids=None, state=None, timeout=None)[source]

Block for state transition.

Deprecated since version 1.5.12: use wait_tasks()

TaskDescription

class radical.pilot.TaskDescription(from_dict=None)[source]

Describe the requirements and properties of a Task.

A TaskDescription object describes the requirements and properties of a radical.pilot.Task and is passed as a parameter to radical.pilot.TaskManager.submit_tasks() to instantiate and run a new task.

uid

A unique ID for the task. This attribute is optional, a unique ID will be assigned by RP if the field is not set.

Type

str, optional

name

A descriptive name for the task. This attribute can be used to map individual tasks back to application level workloads.

Type

str, optional

mode

The execution mode to be used for this task. Default “executable”. The following modes are accepted.

  • TASK_EXECUTABLE: the task is spawned as an external executable via a resource specific launch method (srun, aprun, mpiexec, etc).

    • required attributes: executable

    • related attributes: arguments

  • TASK_FUNCTION: the task references a python function to be called.

    • required attributes: function

    • related attributes: args

    • related attributes: kwargs

  • TASK_METHOD: the task references a raptor worker method to be called.

    • required attributes: method

    • related attributes: args

    • related attributes: kwargs

  • TASK_EVAL: the task is a code snippet to be evaluated.

    • required attributes: code

  • TASK_EXEC: the task is a code snippet to be exec’ed.

    • required attributes: code

  • TASK_SHELL: the task is a shell command line to be run.

    • required attributes: command

  • TASK_PROC: the task is a single core process to be executed.

    • required attributes: executable

    • related attributes: arguments

  • TASK_RAPTOR_MASTER: the task references a raptor master to be instantiated.

    • related attributes: raptor_file

    • related attributes: raptor_class

  • TASK_RAPTOR_WORKER: the task references a raptor worker to be instantiated.

    • related attributes: raptor_file

    • related attributes: raptor_class

There is a certain overlap between TASK_EXECUTABLE, TASK_SHELL and TASK_PROC modes. As a general rule, TASK_SHELL and TASK_PROC should be used for short running tasks which require a single core and no additional resources (gpus, storage, memory). TASK_EXECUTABLE should be used for all other tasks and is in fact the default. TASK_SHELL should only be used if the command to be run requires shell specific functionality (e.g., pipes, I/O redirection) which cannot easily be mapped to other task attributes.

TASK_RAPTOR_MASTER and TASK_RAPTOR_WORKER are special types of tasks that define RAPTOR’s master(s) and worker(s) components and their resource requirements. They are launched by the Agent on one or more nodes, depending on their requirements.

Type

str, optional

executable

The executable to launch. The executable is expected to be either available via $PATH on the target resource, or to be an absolute path.

Type

str

arguments

The command line arguments for the given executable (list of strings).

Type

list[str]

code

The code to run. This field is expected to contain valid python code which is executed when the task mode is TASK_EXEC or TASK_EVAL.

Type

str

function

The function to run. This field is expected to contain a python function name which can be resolved in the scope of the respective RP worker implementation (see documentation there). The task mode must be set to TASK_FUNCTION. args and kwargs are passed as function parameters.

Type

str

args

Positional arguments to be passed to the function (see above). This field will be serialized with msgpack and can thus contain any serializable data types.

Type

list, optional

kwargs

Named arguments to be passed to the function (see above). This field will be serialized with msgpack and can thus contain any serializable data types.

Type

dict, optional

command

A shell command to be executed. This attribute is used for the TASK_SHELL mode.

Type

str

use_mpi

Flag indicating if the task should be provided an MPI communicator. Defaults to True if more than 1 rank is requested (see ranks), otherwise defaults to False. Set this to True if you want to enforce an MPI communicator on single-ranked tasks.

Type

bool, optional

ranks

The number of application processes to start on CPU cores. Default 1.

For two ranks or more, an MPI communicator will be available to the processes.

ranks replaces the deprecated attribute cpu_processes. The attribute cpu_process_type was previously used to signal the need for an MPI communicator - that attribute is now also deprecated and will be ignored.

Type

int, optional

cores_per_rank

The number of cpu cores each process will have available to start its own threads or processes on. By default, core refers to a physical CPU core - but if the pilot has been launched with SMT-settings > 1, core will refer to a virtual core or hyperthread instead (the name depends on the CPU vendor).

cores_per_rank replaces the deprecated attribute cpu_threads.

Type

int, optional

threading_type

The thread type, influences startup and environment (<empty>/POSIX, OpenMP).

threading_type replaces the deprecated attribute cpu_thread_type.

Type

str, optional

gpus_per_rank

The number of GPUs made available to each rank. If a GPU is shared among several ranks, then a fraction should be provided (e.g., if 2 ranks share a GPU, then gpus_per_rank=0.5).

gpus_per_rank replaces the deprecated attribute gpu_processes. The attributes gpu_threads and gpu_process_type are also deprecated and will be ignored.

Type

float, optional

gpu_type

The type of GPU environment to provide to the ranks (<empty>, CUDA, ROCm).

gpu_type replaces the deprecated attribute gpu_thread_type.

Type

str, optional

lfs_per_rank

Local File Storage per rank - amount of data (MB) required on the local file system of the node.

lfs_per_rank replaces the deprecated attribute lfs_per_process.

Type

int, optional

mem_per_rank

Amount of physical memory required per rank.

mem_per_rank replaces the deprecated attribute mem_per_process.

Type

int, optional

environment

Environment variables to set in the task's environment before execution (i.e., before the selected LaunchMethod launches the task).

Type

dict, optional

named_env

A named virtual environment as prepared by the pilot. The task will remain in AGENT_SCHEDULING state until that environment gets created.

Type

str, optional

sandbox

This specifies the working directory of the task. It will be created if it does not exist. By default, the sandbox has the name of the task’s uid and is relative to the pilot’s sandbox.

Type

str, optional

stdout

The name of the file to store stdout. If not set then uid.out will be used.

Type

str, optional

stderr

The name of the file to store stderr. If not set then uid.err will be used.

Type

str, optional

input_staging

The files that need to be staged before the execution (list of staging directives, see below).

Type

list, optional

output_staging

The files that need to be staged after the execution (list of staging directives, see below).

Type

list, optional

stage_on_error

Output staging is normally skipped on FAILED or CANCELED tasks, but if this flag is set, staging is attempted either way. This may, however, lead to additional errors if the task did not manage to produce the expected output files to stage. Default False.

Type

bool, optional

pre_launch

Actions (shell commands) to perform before launching (i.e., before LaunchMethod is submitted), potentially on a batch node which is different from the node the task is placed on.

Note that the set of shell commands given here are expected to load environments, check for work directories and data, etc. They are not expected to consume any significant amount of CPU time or other resources! Deviating from that rule will likely result in reduced overall throughput.

Type

list, optional

post_launch

Actions (shell commands) to perform after launching (i.e., after LaunchMethod is executed).

Precautions are the same as for pre_launch actions.

Type

list, optional

pre_exec

Actions (shell commands) to perform before the task starts (LaunchMethod is submitted, but no actual task running yet). Each item could be either a string (str), which represents an action applied to all ranks, or a dictionary (dict), which represents a list of actions applied to specified ranks (key is a rankID and value is a list of actions to be performed for this rank).

The actions/commands are executed on the respective nodes where the ranks are placed, and the actual rank startup will be delayed until all pre_exec commands have completed.

Precautions are the same as for pre_launch actions.

No assumption should be made as to where these commands are executed (although RP attempts to perform them in the task’s execution environment).

No assumption should be made on the specific shell environment the commands are executed in other than a POSIX shell environment.

Errors in executing these commands will result in the task to enter FAILED state, and no execution of the actual workload will be attempted.

Type

list, optional

pre_exec_sync

Flag indicating that rank execution should be synchronized: the startup of each individual rank is delayed until the pre_exec actions of all ranks have completed. Default False.

Type

bool, optional

post_exec

Actions (shell commands) to perform after the task finishes. The same remarks as for pre_exec apply, including the point on error handling, which again will cause the task to fail, even if the actual execution was successful.

Type

list, optional

restartable

If the task starts to execute on a pilot, but cannot finish because the pilot fails or is canceled, the task can be restarted. Default False.

Type

bool, optional

tags

Configuration-specific tags, which influence task scheduling and execution (e.g., task co-location).

Type

dict, optional

scheduler

deprecated in favor of raptor_id.

Type

str, optional

raptor_id

Raptor master ID this task is associated with.

Type

str, optional

worker_class

deprecated in favor of raptor_class.

Type

str, optional

raptor_class

Class name to instantiate for this Raptor master or worker task.

Type

str, optional

worker_file

deprecated in favor of raptor_file.

Type

str, optional

raptor_file

Optional application supplied Python source file to load raptor_class from.

Type

str, optional

metadata

User defined metadata. Default None.

Type

Any, optional

timeout

Any timeout larger than 0 will result in the task process being killed after the specified number of seconds. The task will then end up in CANCELED state.

Type

float, optional

cleanup

If cleanup flag is set, the pilot will delete the entire task sandbox upon termination. This includes all generated output data in that sandbox. Output staging will be performed before cleanup. Note that task sandboxes are also deleted if the pilot’s own cleanup flag is set. Default False.

Type

bool, optional

pilot

Pilot uid. If specified, the task is submitted to the pilot with the given ID. If that pilot is not known to the TaskManager, an exception is raised.

Type

str, optional
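
Taken together, a hedged example of a task description using several of the attributes above (executable, module, and file names are illustrative):

td = rp.TaskDescription()
td.executable     = 'my_simulation'
td.arguments      = ['--steps', '1000']
td.ranks          = 4                     # 4 MPI ranks
td.cores_per_rank = 8
td.gpus_per_rank  = 1
td.pre_exec       = ['module load cuda']  # illustrative environment setup
td.output_staging = [{'source': 'task:///out.dat',
                      'target': 'client:///out.dat',
                      'action': rp.TRANSFER}]

task = tmgr.submit_tasks(td)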

Task Ranks

The notion of ranks is central to RP’s TaskDescription class. We here use the same notion as MPI, in that the number of ranks refers to the number of individual processes to be spawned by the task execution backend. These processes will be near-exact copies of each other: they run in the same workdir and the same environment, are defined by the same executable and arguments, get the same amount of resources allocated, etc. Notable exceptions are:

  • Rank processes may run on different nodes;

  • rank processes can communicate via MPI;

  • each rank process obtains a unique rank ID.

It is up to the underlying MPI implementation to determine the exact value of the process’ rank ID. The MPI implementation may also set a number of additional environment variables for each process.

It is important to understand that only applications which make use of MPI should have more than one rank – otherwise identical copies of the same application instance are launched which will compute the same results, thus wasting resources for all ranks but one. Worse: I/O-routines of these non-MPI ranks can interfere with each other and invalidate those results.

Also, applications with a single rank cannot in general rely on MPI: depending on the specific resource configuration, RP may launch those tasks without providing an MPI communicator (see the use_mpi attribute to enforce one).

Task Environment

RP tasks are expected to be executed in isolation, meaning that their runtime environment is completely independent from the environment of other tasks, independent from the launch mechanism used to start the task, and also independent from the environment of the RP stack itself.

The task description provides several hooks to help setting up the environment in that context. It is important to understand the way those hooks interact with respect to the environments mentioned above.

  • pre_launch directives are set and executed before the task is passed on to the task launch method. As such, pre_launch directives are usually executed on the node where RP’s agent is running, and not on the task's target node. Executing pre_launch directives for many tasks can thus negatively impact RP’s performance (*). Note also that pre_launch directives can in some cases interfere with the launch method.

    Use pre_launch directives for rare, heavy-weight operations which prepare the runtime environment for multiple tasks: fetch data from a remote source, unpack input data, create global communication channels, etc.

  • pre_exec directives are set and executed after the launch method placed the task on the compute nodes and are thus running on the target node. Note that for MPI tasks, the pre_exec directives are executed once per rank. Running large numbers of pre_exec directives concurrently can lead to system performance degradation (*), for example when those directives concurrently hit the shared file system (for loading modules or Python virtualenvs, etc.).

    Use pre_exec directives for task environment setup such as module load, virtualenv activate, or export, whose effects are expected to be applied either to all task ranks or to specified ranks (a short sketch follows after this section). Avoid file staging operations at this point (files would be redundantly staged multiple times - once per rank).

(*) The performance impact of repeated concurrent access to the system’s shared file system can be significant and can pose a major bottleneck for your application. Specifically, module load and virtualenv activate operations and the like are heavy on file system I/O, and executing those for many tasks is ill-advised. Having said that: RP attempts to optimize those operations: if it identifies that identical pre_exec directives are shared between multiple tasks, RP will execute the directives exactly once and will cache the resulting environment settings - those cached settings are then applied to all other tasks with the same directives, without executing the directives again.
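
A hedged sketch of the pre_launch and pre_exec hooks, including the per-rank dictionary form of pre_exec (commands and module names are illustrative; using string rank IDs as dictionary keys is an assumption here):

td = rp.TaskDescription()
td.executable = 'my_app'
td.ranks      = 2
td.pre_launch = ['tar xf input_bundle.tar']      # executed once, before launch
td.pre_exec   = ['module load my_module',        # applied to all ranks
                 {'0': ['export ROLE=leader'],   # applied to rank 0 only
                  '1': ['export ROLE=worker']}]  # applied to rank 1 only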

Staging Directives

The Staging Directives are specified using a dict in the following form:

staging_directive = {
   'source'  : None, # see 'Location' below
   'target'  : None, # see 'Location' below
   'action'  : None, # See 'Action operators' below
   'flags'   : None, # See 'Flags' below
   'priority': 0     # Control ordering of actions (unused)
}

Locations

source and target locations can be given as strings or ru.Url instances. Strings containing :// are converted into URLs immediately. Otherwise, they are considered absolute or relative paths and are then interpreted in the context of the client’s working directory.

Special URL schemas

  • client:// : relative to the client’s working directory

  • resource:// : relative to the RP sandbox on the target resource

  • pilot:// : relative to the pilot sandbox on the target resource

  • task:// : relative to the task sandbox on the target resource

In all these cases, the hostname element of the URL is expected to be empty, and the path is always considered relative to the locations specified above (even though URLs usually don’t have a notion of relative paths).

For more details on path and sandbox handling check the documentation of radical.pilot.staging_directives.complete_url().

Action operators

  • rp.TRANSFER : remote file transfer from source URL to target URL

  • rp.COPY : local file copy, i.e., not crossing host boundaries

  • rp.MOVE : local file move

  • rp.LINK : local file symlink

Flags

  • rp.CREATE_PARENTS: create the directory hierarchy for targets on the fly

  • rp.RECURSIVE: if source is a directory, handles it recursively
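
For example, a directive which transfers an input archive from the client's working directory into the task sandbox, creating missing target directories on the fly (file names are illustrative):

td.input_staging = [{'source': 'client:///data/input.tar.gz',
                     'target': 'task:///input.tar.gz',
                     'action': rp.TRANSFER,
                     'flags' : rp.CREATE_PARENTS}]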

Task

class radical.pilot.Task(tmgr, descr, origin)[source]

Represent a ‘task’ that is executed on a Pilot.

A Task instance allows the application to control and query the state of that task.

Note

A task cannot be created directly. The factory method rp.TaskManager.submit_tasks() has to be used instead.

Example:

tmgr = rp.TaskManager(session=s)

ud = rp.TaskDescription()
ud.executable = "/bin/date"

task = tmgr.submit_tasks(ud)
as_dict()[source]

Returns a Python dictionary representation of the object.

cancel()[source]

Cancel the task.

property client_sandbox

The full URL of the client sandbox, which is usually the same as the current working directory of the Python script in which the RP Session is instantiated.

Note that the URL may not be usable to access that sandbox from another machine: RP in general knows nothing about available access endpoints on the local host.

Type

radical.utils.Url

property description

The description the task was started with, as a dictionary.

Type

dict

property endpoint_fs

The URL which is internally used to access the target resource’s root file system.

Type

radical.utils.Url

property exception

A string representation (__repr__) of the exception which caused the task’s FAILED state, if one was raised while managing or executing the task.

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.

Type

str

property exception_detail

Additional information about the exception which caused this task to enter FAILED state.

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.

Type

str

property exit_code

The exit code of the task, if that is already known, or ‘None’ otherwise.

Type

int

property metadata

The metadata field of the task’s description.

property mode

The task mode.

Type

str

property name

The task’s application specified name.

Type

str

property origin

Indicates where the task was created.

Type

str

property pilot

The pilot ID of this task, if that is already known, or ‘None’ otherwise.

Type

str

property pilot_sandbox

The full URL of the path that RP considers the pilot sandbox on the target resource’s file system which is shared by all tasks which are executed by that pilot.

Type

radical.utils.Url

register_callback(cb, cb_data=None, metric=None)[source]

Add a state-change callback.

Registers a callback function that is triggered every time a task’s state changes.

All callback functions need to have the same signature:

def cb(obj, state) -> None:
    ...

where obj is a handle to the object that triggered the callback and state is the new state of that object. If cb_data is given, then the cb signature changes to

def cb(obj, state, cb_data) -> None:
    ...

and cb_data are passed unchanged.

property resource_sandbox

The full URL of the path that RP considers the resource sandbox, i.e., the sandbox on the target resource’s file system that is shared by all sessions which access that resource.

Type

radical.utils.Url

property return_value

The return value for tasks which represent a function call (or None otherwise).

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.

Type

Any

property sandbox

An alias for task_sandbox.

Type

str

property session

The task’s session.

Type

radical.pilot.Session

property session_sandbox

The full URL of the path that RP considers the session sandbox on the target resource’s file system which is shared by all pilots which access that resource in the current session.

Type

radical.utils.Url

property state

The current state of the task.

Type

str

property stderr

A snapshot of the executable’s STDERR stream.

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will return None.

Warning

This can be inefficient. Output may be incomplete and/or filtered.

Type

str

property stdout

A snapshot of the executable’s STDOUT stream.

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will return None.

Warning

This can be inefficient. Output may be incomplete and/or filtered.

Type

str

property task_sandbox

The full sandbox URL of this task, if that is already known, or ‘None’ otherwise.

Type

radical.utils.Url

property tmgr

The task’s manager.

Type

radical.pilot.TaskManager

property uid

The task’s unique identifier within a TaskManager.

Type

str

wait(state=None, timeout=None)[source]

Block for state change.

Returns when the task reaches a specific state or when an optional timeout is reached.

Parameters
  • state (str | list[str], optional) –

    The state(s) that task has to reach in order for the call to return.

    By default wait waits for the task to reach a final state, which can be one of the following.

    • rp.states.DONE

    • rp.states.FAILED

    • rp.states.CANCELED

  • timeout (float, optional) – Optional timeout in seconds before the call returns regardless whether the task has reached the desired state or not. The default value None never times out.

Raptor

class radical.pilot.raptor.Master(cfg: Optional[Config] = None)[source]

Raptor Master class

The rp.raptor.Master instantiates and orchestrates a set of workers which are used to rapidly and efficiently execute function tasks. As such the raptor master acts as an RP executor: it hooks into the RP agent communication channels to receive tasks from the RP agent scheduler in order to execute them. Once completed, tasks are pushed toward the agent output staging component and then continue their life cycle like all other tasks.

alive()[source]

check if the main work thread of this master is running

control_cb(topic, msg)[source]

listen for worker_register, worker_unregister, worker_rank_heartbeat and rpc_req messages.

join()[source]

wait until the main work thread of this master completes

request_cb(tasks)[source]

A raptor master implementation can overload this cb to filter all newly submitted tasks: it receives a list of tasks and returns a potentially different list of tasks which are then executed. It is up to the master implementation to ensure proper state transition for any tasks which are passed as argument but are not returned by the call and thus are not submitted for execution.

Parameters

tasks (List[Dict[str, Any]]) – list of tasks which this master received for execution

Returns

A possibly different list of tasks than received.

Return type

tasks (List[Dict[str, Any]])

result_cb(tasks)[source]

A raptor master implementation can overload this cb, which gets called when raptor tasks complete execution.

Parameters

tasks (List[Dict[str, Any]]) – list of tasks which this master executed

start()[source]

start the main work thread of this master

stop()[source]

stop the main work thread of this master

submit_tasks(tasks) None[source]

Submit a list of tasks to the task queue. We expect to get either TaskDescription instances, which will then get converted into task dictionaries and pushed out, or task dictionaries which are used as is. Either way, self.request_cb will be called for all tasks submitted here.

Parameters

tasks (List[TaskDescription]) – description of tasks to be submitted

submit_workers(descriptions: List[TaskDescription]) List[str][source]

Submit one raptor worker per given description element and pass the raptor queue info as configuration. Do not wait for the workers to come up - they are expected to register via the control channel.

The task descriptions specifically support the following keys:

  • raptor_class: str, type name of worker class to execute

  • raptor_file: str, optional

    Module file from which raptor_class may be imported, if a custom RP worker class is used

Note that only one worker rank (presumably rank 0) should register with the master - the workers are expected to synchronize their ranks as needed.

Parameters

descriptions (List[TaskDescription]) – a list of worker descriptions

Returns

list of uids for submitted worker tasks

Return type

List[str]

terminate()[source]

terminate all workers and the master’s own work loop.

wait_workers(count=None, uids=None)[source]

Wait for n workers, or for workers with given UIDs, or for all workers to become available, then return. A worker is considered available when it has registered with this master and its status flag is set to ACTIVE.

Parameters
  • count (int) – number of workers to wait for

  • uids (List[str]) – set of worker UIDs to wait for

worker_state_cb(worker_dict, state)[source]

This callback can be overloaded - it will be invoked whenever the master receives a state update for a worker it is connected to.

Parameters
  • worker_dict (Dict[str, Any]) – a task dictionary representing the worker whose state was updated

  • state (str) – new state of the worker

property workers

task dictionaries representing all currently registered workers

class radical.pilot.raptor.Worker(manager, rank, raptor_id)[source]

Implement the Raptor protocol for dispatching multiple Tasks on persistent resources.

get_dispatcher(name)[source]

Query a registered execution mode.

Parameters

name (str) – name of execution mode to query for

Returns

the dispatcher method for that execution mode

Return type

Callable

get_master()[source]

The worker can submit tasks back to the master - this method will return a small shim class to provide that capability. That class has a single method run_task which accepts a single rp.TaskDescription from which a rp.Task is created and executed. The call then waits for the task’s completion before returning it in a dict representation, the same as when passed to the master’s result_cb.

Note: the run_task call is running in a separate thread and will thus not block the master’s progress.

Returns

a shim class with only one method: run_task(td), where td is a TaskDescription to run.

Return type

Master

join()[source]

Wait until the worker’s main work loop has completed.

register_mode(name, dispatcher) None[source]

Register a new task execution mode that this worker can handle. The specified dispatcher callable should accept a single argument: the task to execute.

Parameters
  • name (str) – name of the mode to register

  • dispatcher (callable) – function which implements the execution mode

start()[source]

Start the worker’s main work loop.

stop()[source]

Signal the worker to stop its main work loop.

A radical.pilot.Task managing a radical.pilot.raptor.Master instance is created with radical.pilot.TaskDescription.mode set to rp.RAPTOR_MASTER, or through submit_raptors(). The object returned to the client is a Task subclass with additional features.

class radical.pilot.raptor_tasks.Raptor(tmgr, descr, origin)[source]

RAPTOR (‘RAPid Task executOR’) is a task executor which, unlike other RADICAL-Pilot executors, can handle function tasks.

A Raptor must be submitted to a pilot. It will be associated with RaptorWorker instances on that pilot and use those workers to rapidly execute tasks. Raptors excel at high throughput execution for large numbers of short running tasks. However, they have limited capabilities with respect to managing task data dependencies, multinode tasks, MPI executables, and tasks with heterogeneous resource requirements.

rpc(cmd, *args, **kwargs)[source]

Send a raptor command, wait for the response, and return the result.

Parameters
  • cmd (str) – name of the rpc call to invoke

  • *args (*List[Any]) – unnamed arguments

  • **kwargs (**Dict[str, Any])) – named arguments

Returns

the returned dictionary has the following fields:
  • out: captured standard output

  • err: captured standard error

  • ret: return value of the call (can be any serializable type)

  • exc: tuple of exception type (str) and error message (str)

Return type

Dict[str, Any]

submit_tasks(descriptions: List[TaskDescription]) List[Task][source]

Submit a set of tasks to this Raptor instance.

Parameters

descriptions (List[TaskDescription]) – the task descriptions to submit to this Raptor instance.

Returns

a list of rp.Task instances, one for each task.

Return type

List[Task]

The tasks might not get executed until a worker is available for this Raptor instance.

submit_workers(descriptions: List[TaskDescription]) List[Task][source]

Submit a set of workers for this Raptor instance.

Parameters

descriptions (List[TaskDescription]) – launch a raptor worker for each provided description.

Returns

a list of rp.Task instances, one for each created worker task

Return type

List[Task]

The method will return immediately without waiting for actual task instantiation. The submitted tasks will operate solely on behalf of the Raptor master this method is being called on.
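
A hedged end-to-end sketch of the client-side Raptor workflow (the function name 'hello' is illustrative and assumed to be resolvable in the worker's scope):

master_td = rp.TaskDescription({'mode': rp.RAPTOR_MASTER})
raptor    = tmgr.submit_raptors([master_td])[0]

worker_td = rp.TaskDescription({'mode': rp.RAPTOR_WORKER, 'ranks': 4})
raptor.submit_workers([worker_td])

func_td = rp.TaskDescription({'mode'    : rp.TASK_FUNCTION,
                              'function': 'hello',
                              'args'    : ['world']})
tasks = raptor.submit_tasks([func_td])
tmgr.wait_tasks([t.uid for t in tasks])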

Utilities and helpers

class radical.pilot.agent.scheduler.base.AgentSchedulingComponent(cfg, session)[source]
control_cb(topic, msg)[source]

listen on the control channel for raptor queue registration commands

slot_status(msg=None, uid=None)[source]

Returns a multi-line string corresponding to the status of the node list

unschedule_cb(topic, msg)[source]

release (for whatever reason) all slots allocated to this task

work(tasks)[source]

This is the main callback of the component, which is called for any incoming (set of) task(s). Tasks arriving here must always be in AGENT_SCHEDULING_PENDING state, and must always leave in either AGENT_EXECUTING_PENDING or in a FINAL state (FAILED or CANCELED). While handled by this component, the tasks will be in AGENT_SCHEDULING state.

This method takes care of initial state change to AGENT_SCHEDULING, and then puts them forward onto the queue towards the actual scheduling process (self._schedule_tasks).

class radical.pilot.agent.scheduler.continuous.Continuous(cfg, session)[source]

The Continuous scheduler attempts to place threads and processes of a task onto nodes in the cluster.

_configure()[source]

Configure this scheduler instance

  • scattered: This is the continuous scheduler, because it attempts to allocate a continuous set of cores/nodes for a task. It does, however, also allow the allocation to be scattered over discontinuous nodes if this option is set. This implementation is not optimized for the scattered mode! The default is ‘False’.

_find_resources(node, find_slots, ranks_per_slot, cores_per_slot, gpus_per_slot, lfs_per_slot, mem_per_slot, partial)[source]

Find up to the requested number of slots, where each slot features the respective x_per_slot resources. This call will return a list of slots of the following structure:

{
  'node_name': 'node_name',
  'node_id'  : '1',
  'core_map' : [[1, 2, 4, 5], [6, 7, 8, 9]],
  'gpu_map'  : [[1, 3], [1, 3]],
  'lfs'      : 1234,
  'mem'      : 4321
}

The call will not change the allocation status of the node, atomicity must be guaranteed by the caller.

We don’t care about continuity within a single node - cores [1, 5] are assumed to be as close together as cores [1, 2].

When partial is set, the method CAN return less than find_slots number of slots - otherwise, the method returns the requested number of slots or None.

FIXME: SMT handling: we should assume that hardware threads of the same physical core cannot host different executables, so HW threads can only account for thread placement, not process placement. This might best be realized by internally handling SMT as minimal thread count and using physical core IDs for process placement?

_iterate_nodes()[source]

The scheduler iterates through the node list for each task placement. However, we want to avoid starting from node zero every time, as tasks have likely been placed on that node previously - instead, we generally want to pick up where the last task placement succeeded. This iterator preserves that state.

Note that the first index is yielded twice, so that the respective node can function as first and last node in an allocation.

schedule_task(task)[source]

Find an available set of slots, potentially across node boundaries (in the MPI case).

A slot is here considered the amount of resources required by a single MPI rank. Those resources need to be available on a single node - but slots can be distributed across multiple nodes. Resources for non-MPI tasks will always need to be placed on a single node.

By default, we only allow for partial allocations on the first and last node - but all intermediate nodes MUST be completely used (this is the ‘CONTINUOUS’ scheduler after all).

If the scheduler is configured with scattered=True, then that constraint is relaxed, and any set of slots (be it continuous across nodes or not) is accepted as valid allocation.

No matter the mode, we always make sure that we allocate slots in chunks of cores, gpus, lfs and mem required per process - otherwise the application processes would not be able to acquire the requested resources on the respective node.

unschedule_task(tasks)[source]

This method is called when previously acquired resources are not needed anymore. slots are the resource slots as previously returned by schedule_task().

class radical.pilot.utils.component.AgentComponent(cfg, session)[source]
advance(things, state=None, publish=True, push=False, qname=None, ts=None, fwd=True, prof=True)[source]

Things which have been operated upon are pushed down into the queues again, only to be picked up by the next component, according to their state model. This method will update the thing state, and push it into the output queue registered as target for that state.

  • things: list of things to advance

  • state: new state to set for the things

  • publish: determine if state update notifications should be issued

  • push: determine if things should be pushed to outputs

  • fwd: determine if notifications are forwarded to the ZMQ bridge

  • prof: determine if state advance creates a profile event (publish, and push are always profiled)

‘Things’ are expected to be a dictionary, and to have ‘state’, ‘uid’ and optionally ‘type’ set.

If ‘thing’ contains an ‘$all’ key, the complete dict is published; otherwise, only the state is published.

This is evaluated in self.publish.

class radical.pilot.utils.component.BaseComponent(cfg, session)[source]

This class provides the basic structure for any RP component which operates on stateful things. It provides means to:

  • define input channels on which to receive new things in certain states

  • define work methods which operate on the things to advance their state

  • define output channels to which to send the things after working on them

  • define notification channels over which messages with other components can be exchanged (publish/subscriber channels)

All low level communication is handled by the base class. Deriving classes will register the respective channels, valid state transitions, and work methods. When a ‘thing’ is received, the component is assumed to have full ownership over it, and that no other component will change the ‘thing’s state during that time.

The main event loop of the component – work() – is executed on run() and will not terminate on its own, unless it encounters a fatal error.

Components inheriting this class should attempt not to use shared resources. That will ensure that multiple instances of the component can coexist for higher overall system throughput. Should access to shared resources be necessary, it will require some locking mechanism across process boundaries.

This approach should ensure that

  • ‘thing’s are always in a well defined state;

  • components are simple and focus on the semantics of ‘thing’ state

    progression;

  • no state races can occur on ‘thing’ state progression;

  • only valid state transitions can be enacted (given correct declaration

    of the component’s semantics);

  • the overall system is performant and scalable.

Inheriting classes SHOULD overload the following methods:

  • initialize():
    • set up the component state for operation

    • register input/output/notification channels

    • register work methods

    • register callbacks to be invoked on state notification

    • the component will terminate if this method raises an exception.

  • work()
    • called in the main loop of the component process, on all entities arriving on input channels. The component will not terminate if this method raises an exception. For termination, terminate() must be called.

  • finalize()
    • tear down the component (close threads, unregister resources, etc).

Inheriting classes MUST call the constructor:

class StagingComponent(rpu.BaseComponent):
    def __init__(self, cfg, session):
        super().__init__(cfg, session)

A component thus must be passed a configuration (either as a path pointing to a file name to be opened as ru.Config, or as a pre-populated ru.Config instance). That config MUST contain a session ID (sid) for the session under which to run this component, and a uid for the component itself which MUST be unique within the scope of the given session.

All components and the component managers will continuously send heartbeat messages on the control pubsub - missing heartbeats will by default lead to component termination.

Further, the class must implement the registered work methods, with a signature of:

work(self, things)

The method is expected to change the state of the ‘thing’s given. ‘Thing’s will not be pushed to outgoing channels automatically – to do so, the work method has to call (see call documentation for other options):

self.advance(thing)

Until that method is called, the component is considered the sole owner of the ‘thing’s. After that method is called, the ‘thing’s are considered disowned by the component. If, however, components return from the work methods without calling advance on the given ‘thing’s, then the component keeps ownership of the ‘thing’s to advance it asynchronously at a later point in time. That implies that a component can collect ownership over an arbitrary number of ‘thing’s over time, and they can be advanced at the component’s discretion.

The component process is a stand-alone daemon process which runs outside of Python’s multiprocessing domain. As such, it can freely use Python’s multithreading (and it extensively does so by default) - but developers should be aware that spawning additional processes in this component is discouraged, as Python’s process management does not play well with its multithreading implementation.

advance(things, state=None, publish=True, push=False, qname=None, ts=None, fwd=False, prof=True)[source]

Things which have been operated upon are pushed down into the queues again, only to be picked up by the next component, according to their state model. This method will update the thing state, and push it into the output queue registered as target for that state.

  • things: list of things to advance

  • state: new state to set for the things

  • publish: determine if state update notifications should be issued

  • push: determine if things should be pushed to outputs

  • fwd: determine if notifications are forwarded to the ZMQ bridge

  • prof: determine if state advance creates a profile event (publish, and push are always profiled)

‘Things’ are expected to be a dictionary, and to have ‘state’, ‘uid’ and optionally ‘type’ set.

If ‘thing’ contains an ‘$all’ key, the complete dict is published; otherwise, only the state is published.

This is evaluated in self.publish.

control_cb(topic, msg)[source]

This callback can be overloaded by the component to handle any control message which is not already handled by the component base class.

get_input_ep(qname)[source]

return an input endpoint

get_output_ep(qname)[source]

return an output endpoint

output(things, state=None)[source]

this pushes the given things to the output queue registered for the given state

publish(pubsub, msg, topic=None)[source]

push information into a publication channel

register_input(states, queue, cb=None, qname=None, path=None)[source]

Using this method, the component can be connected to a queue on which things are received to be worked upon. The given set of states (which can be a single state or a list of states) will trigger an assert check upon thing arrival.

This method will further associate a thing state with a specific worker callback cb. Upon thing arrival, the thing state will be used to lookup the respective worker, and the thing will be handed over. Workers should call self.advance(thing), in order to push the thing toward the next component. If, for some reason, that is not possible before the worker returns, the component will retain ownership of the thing, and should call advance() asynchronously at a later point in time.

Worker invocation is synchronous, i.e., the main event loop will only check for the next thing once the worker method returns.

register_output(states, qname)[source]

Using this method, the component can be connected to a queue to which things are sent after being worked upon. The given set of states (which can be a single state or a list of states) will trigger an assert check upon thing departure.

If a state but no output is specified, we assume that the state is final, and the thing is then considered ‘dropped’ on calling advance() on it. The advance() will trigger a state notification though, and then mark the drop in the log. No other component should ever again work on such a final thing. It is the responsibility of the component to make sure that the thing is in fact in a final state.

register_publisher(pubsub)[source]

Using this method, the component can register itself as a publisher of notifications on the given pubsub channel.

register_subscriber(pubsub, cb)[source]

This method is complementary to the register_publisher() above: it registers a subscription to a pubsub channel. If a notification is received on that channel, the registered callback will be invoked. The callback MUST have the signature:

callback(topic, msg)

where ‘topic’ is set to the name of the pubsub channel.

The subscription will be handled in a separate thread, which implies that the callback invocation will also happen in that thread. It is the caller’s responsibility to ensure thread safety during callback invocation.

register_timed_cb(cb, cb_data=None, timer=None)[source]

Idle callbacks are invoked at regular intervals – they are guaranteed to not be called more frequently than ‘timer’ seconds, no promise is made on a minimal call frequency. The intent for these callbacks is to run lightweight work in semi-regular intervals.

rpc(cmd, *args, rpc_addr=None, **kwargs)[source]

Remote procedure call.

Send an RPC command and arguments to the control pubsub and wait for the response. This is a synchronous operation at this point, and it is not thread safe to have multiple concurrent RPC calls.

stop()[source]

We need to terminate and join all threads, close all communication channels, etc. But we rely on the correct invocation of the finalizers to do all this, and thus only forward the stop request to the base class here.

unregister_input(states, qname, worker)[source]

This method is the inverse of the register_input() method.

unregister_output(states)[source]

this removes any outputs registered for the given states.

unregister_timed_cb(cb)[source]

This method reverts the register_timed_cb() above: it removes an idler from the component and will terminate the respective thread.

work_cb()[source]

This is the main routine of the component, as it runs in the component process. It will first initialize the component in the process context. Then it will attempt to get new things from all input queues (round-robin). For each thing received, it will route that thing to the respective worker method. Once the thing is worked upon, the next attempt at getting a thing is made.

class radical.pilot.utils.component.ClientComponent(cfg, session)[source]
advance(things, state=None, publish=True, push=False, qname=None, ts=None, fwd=False, prof=True)[source]

Things which have been operated upon are pushed down into the queues again, only to be picked up by the next component, according to their state model. This method will update the thing state, and push it into the output queue registered as target for that state.

  • things: list of things to advance

  • state: new state to set for the things

  • publish: determine if state update notifications should be issued

  • push: determine if things should be pushed to outputs

  • fwd: determine if notifications are forwarded to the ZMQ bridge

  • prof: determine if state advance creates a profile event (publish, and push are always profiled)

‘Things’ are expected to be a dictionary, and to have ‘state’, ‘uid’ and optionally ‘type’ set.

If ‘thing’ contains an ‘$all’ key, the complete dict is published; otherwise, only the state is published.

This is evaluated in self.publish.

radical.pilot.utils.prof_utils.get_consumed_resources(session, rtype='cpu', tdurations=None)[source]

For all ra.pilot or ra.task entities, return the amount and time of resources consumed. A consumed resource is characterized by:

  • a resource type (we know about cores and gpus)

  • a metric name (what the resource was used for)

  • a list of 4-tuples of the form: [t0, t1, r0, r1]

The tuples are formed so that t0 to t1 and r0 to r1 are continuous:

  • t0: time, begin of resource consumption

  • t1: time, end of resource consumption

  • r0: int, index of resources consumed (min)

  • r1: int, index of resources consumed (max)

An entity can consume different resources under different metrics, but the returned consumption specs will never overlap. Thus, any resource is accounted for exactly one metric at any point in time. The returned structure has the following overall form:

{
  'metric_1' : {
      uid_1 : [[t0, t1, r0, r1],
               [t2, t3, r2, r3],
               ...
              ],
      uid_2 : ...
  },
  'metric_2' : ...
}
radical.pilot.utils.prof_utils.get_hostmap(profile)[source]

We abuse the profile combination to also derive a pilot-host map, which will tell us on what exact host each pilot has been running. To do so, we check for the PMGR_ACTIVE advance event in agent_0.prof, and use the NTP sync info to associate a hostname.

radical.pilot.utils.prof_utils.get_hostmap_deprecated(profiles)[source]

This method mangles combine_profiles and get_hostmap, and is deprecated. At this point it only returns the hostmap.

radical.pilot.utils.prof_utils.get_provided_resources(session, rtype='cpu')[source]

For all ra.pilots, return the amount and time of the type of resources provided. This computes sets of 4-tuples of the form: [t0, t1, r0, r1] where:

  • t0: time, begin of resource provision

  • t1: time, end of resource provision

  • r0: int, index of resources provided (min)

  • r1: int, index of resources provided (max)

The tuples are formed so that t0 to t1 and r0 to r1 are continuous.

radical.pilot.utils.prof_utils.get_resource_timelines(task, transitions)[source]

For each specific task, return a set of tuples of the form:

[start, stop, metric]

which reports what metric has been used during what time span.

radical.pilot.utils.prof_utils.get_session_description(sid, src=None)[source]

This will return a description which is usable for radical.analytics evaluation. It informs about:

  • set of stateful entities

  • state models of those entities

  • event models of those entities (maybe)

  • configuration of the application / module

If src is given, it is interpreted as path to search for session information (json dump). src defaults to $PWD/$sid.

The serializer should be able to (de)serialize information that we want to send over the wire from the client side to the agent side via:

  • ZMQ

  • MongoDB

We expect:

  • callables with and without dependencies

  • non-callables like classes and other Python objects

radical.pilot.utils.serializer.deserialize_file(fname)[source]

Deserialize object from file.

radical.pilot.utils.serializer.deserialize_obj(data)[source]

Deserialize object from str.

radical.pilot.utils.serializer.serialize_file(obj, fname=None)[source]

Serialize object to file.

radical.pilot.utils.serializer.serialize_obj(obj)[source]

Serialize object.
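
A minimal round-trip sketch (assuming serialize_obj returns a blob that deserialize_obj accepts unchanged, and that locally defined callables are supported):

from radical.pilot.utils import serializer

def hello(name):
    return 'hello %s' % name

# serialize the callable, then restore it and call the copy
blob = serializer.serialize_obj(hello)
func = serializer.deserialize_obj(blob)

assert func('world') == 'hello world'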

radical.pilot.utils.session.fetch_filetype(ext, name, sid, src=None, tgt=None, access=None, skip_existing=False, fetch_client=False, log=None, rep=None)[source]
Parameters
  • ext (-) – file extension to fetch

  • name (-) – full name of filetype for log messages etc

  • sid (-) – session for which all files are fetched

  • src (-) – dir to look for client session files ($src/$sid/*.ext)

  • tgt (-) – dir to store the files in ($tgt/$sid/*.ext, $tgt/$sid/$pid/*.ext)

Returns

list of file names (fetched and/or cached)

Return type

list[str]

radical.pilot.utils.misc.create_tar(tgt: str, dnames: List[str]) → None[source]

Create a tarball on the file system which contains all given directories
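
A usage sketch (the directory names are invented; they are created first only so that the call has something to pack):

import os

from radical.pilot.utils.misc import create_tar

# illustrative directories to be bundled
for d in ['pilot.0000', 'pilot.0001']:
    os.makedirs(d, exist_ok=True)

create_tar('session.tar', ['pilot.0000', 'pilot.0001'])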

radical.pilot.utils.misc.get_resource_config(resource: str) → Union[None, Config][source]

For the given resource label, return the resource configuration used by radical.pilot.

Parameters

resource (str) – resource label for which to return the cfg

Returns

the resource configuration

The method returns None if no resource config is found for the specified resource label.

Return type

radical.utils.Config
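
A usage sketch, using ‘local.localhost’ as an illustrative resource label (the call returns None for labels without a configuration):

from radical.pilot.utils.misc import get_resource_config

cfg = get_resource_config('local.localhost')
if cfg is not None:
    print(cfg)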

radical.pilot.utils.misc.get_resource_configs() → Config[source]

Return all resource configurations used by radical.pilot.

Configurations for the individual resources are organized as sites and resources:

cfgs  = get_resource_configs()
sites = cfgs.keys()
for site in sites:
    resource_names = cfgs[site].keys()

Returns

the resource configurations

Return type

radical.utils.Config

radical.pilot.utils.misc.get_resource_fs_url(resource: str, schema: Optional[str] = None) → Union[None, Url][source]

For the given resource label, return the contact URL of the resource’s file system. This corresponds to the filesystem_endpoint setting in the resource config.

For example, rs.filesystem.directory(get_resource_fs_url(…)).change_dir(‘/’) is equivalent to the base endpoint:/// URL available for use in a staging_directive.

Parameters
  • resource (str) – resource label for which to return the url

  • schema (str, optional) – access schema to use for resource access. Defaults to the default access schema as defined in the resources config files.

Returns

the file system URL

The method returns None if no resource config is found for the specified resource label and access schema.

Return type

radical.utils.Url

radical.pilot.utils.misc.get_resource_job_url(resource: str, schema: Optional[str] = None) → Union[None, Url][source]

For the given resource label, return the contact URL of the resource’s job manager.

Parameters
  • resource (str) – resource label for which to return the url

  • schema (str, optional) – access schema to use for resource access. Defaults to the default access schema as defined in the resources config files.

Returns

the job manager URL

The method returns None if no resource config is found for the specified resource label and access schema.

Return type

radical.utils.Url
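
A combined usage sketch, again with ‘local.localhost’ as an illustrative resource label (both calls return None for unknown labels or access schemas):

from radical.pilot.utils.misc import get_resource_fs_url, get_resource_job_url

fs_url  = get_resource_fs_url ('local.localhost')
job_url = get_resource_job_url('local.localhost')

print(fs_url, job_url)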

Release Notes

For a list of open issues and known problems, see: https://github.com/radical-cybertools/radical.pilot/issues/

Current

This is the latest release - if uncertain, use this release.


1.52.1 Hotfix Release 2024-04-18

  • more fixes for setuptools upgrade

  • remove sdist staging

  • simplify bootstrapper


1.52.0 Release 2024-04-15

  • fix for setuptools upgrade

  • added resource configuration for Flux on Frontier

  • use srun for flux startup


1.49.2 Hotfix Release 2024-04-10

  • fix #3162: missing advance on failed agent staging, create target dir


1.49.0 Release 2024-04-05

  • MPIRUN/SRUN documentation update

  • inherit environment for local executor


1.48.0 Release 2024-03-24

  • remove SAGA as hard dependency

  • add named_env check to TD verification

  • add dragon readme

  • add link for a monitoring page for Polaris

  • add supported platform Aurora (ALCF)

  • add SRUN to Bridges2 config and make it default

  • compensate job env isolation for tasks

  • disable env inheritance in PSIJ launcher

  • ensure string type on stdout/stderr

  • fix a pilot cancellation race

  • fix external proxy handling

  • fix non-integer gpu count for flux

  • fix staging scheme expansion

  • fix state reporting from psij

  • fix tarball staging for session

  • refactoring moves launch and exec script creation to executor base

  • removed Theta placeholder (Theta is decommissioned)

  • set original env as a base for LM env

  • update flake8, remove unneeded dependency

  • update ANL config file (added Aurora and removed obsolete platforms)

  • update doc page for Summit (rollback using conda env)

  • update resource configs for Summit

  • update table for supported platforms

  • use physical cores only (no threads) for Aurora for now


1.47.0 Release 2024-02-08

  • binder tutorial

  • expand / fix documentation and README, update policies

  • docs for psij deployment

  • raptor example now terminates on worker failures

  • fix raptor worker registration

  • fix flux startup

  • fix type for LFS_SIZE_PER_NODE

  • update HB config parameters for raptor


1.46.2 Hotfix-Release 2024-02-02

  • fix detection of failed tasks


1.46.1 Hotfix-Release 2024-01-17

  • fix type for LFS_SIZE_PER_NODE


1.46.0 Release 2024-01-11

  • pypi fix

  • configurable raptor hb timeouts


1.43.0 Release 2024-01-10

  • add bragg prediction example

  • add initial agent scheduler documentation

  • add JSRUN_ERF setup for Summit’s config

  • add mechanism to determine batch/non-batch RP starting

  • collect task PIDs through launch- and exec-scripts

  • ensure mpi4py for raptor example

  • fix ERF creation for JSRUN LM (and updated tests accordingly)

  • fix Popen test

  • fix Task._update method (description attribute)

  • fix _get_exec in Popen (based on provided comments)

  • fix parsing PIDs procedure (based on provided comments)

  • fix profiling in Popen (based on provided comments)

  • fix resource manager handling in get_resource_config

  • fix tasks handling in prof_utils

  • fix test for launch-/exec-scripts

  • follow-up on comments

  • forward after scheduling

  • keep pids dict empty if no ranks are provided

  • moved collecting EXEC_PID into exec-script

  • preserve process id for tasks with executable mode

  • switch raptor to use the agent ve

  • update metadata within task description


1.42.0 Release 2023-12-04

  • AgentComponent forwards all state notifications

  • document event locations

  • MPI tutorial for RAPTOR

  • add mpi4py to the ci requirements

  • add bulk_size for the executing queue (for sub-agents)

  • add option --ppn for PALS flavor in MPIEXEC LM

  • amarel cfg

  • current version requires RU v1.43

  • fix Profiling tutorial (fails when executed outside from its directory)

  • collect service related data in registry

  • fix multi pilot example

  • move agent config generation to session init

  • remove obsolete Worker class

  • remove MongoDB module load from the Perlmutter config

  • remove mpi4py from doc requirements

  • save sub-agent config into Registry

  • sub-agents are no daemons anymore

  • update documentation for Polaris (GPUs assignment)

  • update launcher for Agent_N

  • update sub-agent config (in sync with the agent default config)

  • update “Describing Tasks” tutorial

  • use RMInfo details for LM options


1.41.0 Release 2023-10-17

  • fix RTD

  • replace MongoDB with ZMQ messaging

  • adapt resource config for ccs.mahti to the new structure

  • add description about input staging data

  • add method to track startup file with service URL (special case - SOMA)

  • add package mpich into CU and docs dependencies

  • add resource_description class

  • check agent sandbox existence

  • clean RPC handling

  • clean raptor RPC

  • deprecated python.system_packages

  • enable testing of all notebooks

  • enable tests for all devel-branches

  • fix heartbeat management

  • fix LM config initialization

  • fix RM LSF for Lassen (+ add platform config)

  • fix Session close options

  • fix TMGR Staging Input

  • fix pilot_state in bootstrapping

  • fix task_pre_exec configurable parameter for Popen

  • fix bootstrapping for sub-agents

  • keep pilot RPCs local

  • raptor worker: one profile per rank

  • let raptor use registry

  • shield against missing mpi

  • sub-schema for schemas

  • switch to registry configs instead of config files

  • update tests

  • update handling of the service startup process

  • upload session when testing notebooks

  • use hb msg class type

  • version RP devel/nodb temporary


1.37.0 Release 2023-09-23

  • fix default_remote_workdir for csc.mahti platform

  • add README to description for pypi

  • link config tutorial

  • add raptor to API docs

  • add MPI flavor MPI_FLAVOR_PALS

  • add cpu-binding for LM MPIEXEC with the MPI_FLAVOR_PALS flavor

  • clean up Polaris config

  • fix raptor master hb_freq and hb_timeout

  • fix test for MPIRUN LM

  • fix tests for MPIEXEC LM

  • add csc.mahti resource config

  • add slurm inspection test


1.36.0 Release 2023-08-01

  • added pre-defined pre_exec for Summit (preserve LD_LIBRARY_PATH from LM)

  • fixed GPU discovery from SLURM env variables

  • increase raptor’s heartbeat time


1.35.0 Release 2023-07-11

  • Improve links to resource definitions.

  • Improve typing in Session.get_pilot_managers

  • Provide a target for Sphinx :py:mod: role.

  • Un-hide “Utilities and helpers” section in API reference.

  • Use a universal and unique identifier for registered callbacks.

  • added option --exact for Rivanna (SRun LM)

  • fixes tests for PRs from forks (#2969)


1.34.0 Release 2023-06-22

  • major documentation overhaul

  • Fixes ticket #1577

  • Fixes ticket #2553

  • added tests for PilotManager methods (cancel_pilots, kill_pilots)

  • fixed configuration for Perlmutter

  • fixed env dumping for RP Agent

  • move timeout into kill_pilots method to delay forced termination

  • re-introduce a use_mpi flag


1.33.0 Release 2023-04-25

  • add a resource definition for rivanna at UVa.

  • add documentation for missing properties

  • add an exception for RAPTOR workers regarding GPU sharing

  • add an exception in case GPU sharing is used in SRun or MPIRun LMs

  • add configuration discovery for gpus_per_node (Slurm)

  • add PMI_ID env variable (related to Hydra)

  • add rank env variable for MPIExec LM

  • add resource config for Frontier@OLCF

  • add service task description verification

  • add interactive config to UVA

  • add raptor tasks to the API doc

  • add rank documentation

  • allow access to full node memory by default

  • changed type for task['resources'], let RADICAL-Analytics handle it

  • changed type of gpus_per_rank attribute in TaskDescription (from int to float)

  • enforce correct task mode for raptor master/workers

  • ensure result_cb for executable tasks

  • ensure session._get_task_sandbox for raptor tasks

  • ensure that wait_workers raises RuntimeError during stop

  • ensure worker termination on raptor shutdown

  • fix CUDA env variable(s) setup for pre_exec (in POPEN executor)

  • fix gpu_map in Scheduler and its usage

  • fix ranks calculation

  • fix slots estimation process

  • fix tasks binding (e.g., bind task to a certain number of cores)

  • fix the process of requesting a correct number of cores/gpus (in case of blocked cores/gpus)

  • Fix task sandbox path

  • fix wait_workers

  • google style docstrings.

  • use parameter new_session_per_task within resource description to control input parameter start_new_session in subprocess.Popen

  • keep virtualenv as fallback if venv is missing

  • let SRun LM get info about GPUs from configured slots

  • make slot dumps dependent on debug level

  • master rpc handles stop request

  • move from custom virtualenv version to venv module

  • MPI worker sync

  • Reading resources from created task description

  • reconcile different worker submission paths

  • recover bootstrap_0_stop event

  • recover task description dump for raptor

  • removed codecov from test requirements (codecov is represented by GitHub actions)

  • removed gpus_per_node - let SAGA handle GPUs

  • removed obsolete configs (FUNCS leftover)

  • re-order worker initialization steps, time out on registration

  • support sandboxes for raptor tasks

  • sync JSRun LM options according to defined slots

  • update JSRun LM according to GPU sharing

  • update slots estimation and core/gpu_map creation

  • worker state update cb

Past

Use past releases to reproduce earlier experiments.


1.21.0 Release 2023-02-01

  • add worker rank heartbeats to raptor

  • ensure descr defaults for raptor worker submission

  • move blocked_cores/gpus under system_architecture in resource config

  • fix blocked_cores/gpus parameters in configs for ACCESS and ORNL resources

  • fix core-option in JSRun LM

  • fix inconsistency in launching order if some LMs failed to be created

  • fix thread-safety of PilotManager staging operations.

  • add ANL’s polaris and polaris_interactive support

  • refactor raptor dispatchers to worker base class


1.20.1 Hotfix Release 2023-01-07

  • fix task cancellation call


1.20.0 Release 2022-12-16

  • interactive amarel cfg

  • add docstring for run_task, remove sort

  • add option -r (number of RS per node) in case of GPU tasks

  • add TaskDescription attribute pre_exec_sync

  • add test for Master.wait

  • add test for tasks cancelling

  • add test for TMGR StagingIn

  • add comment for config addition Fixes #2089

  • add TASK_BULK_MKDIR_THRESHOLD as configurable Fixes #2089

  • agent does not need to pull failed tasks

  • bump python test env to 3.7

  • cleanup error reporting

  • document attributes as attr, not data.

  • extended tests for RM PBSPro

  • fix allocated_cores/gpus in PMGR Launching

  • fix commands per rank (either a single string command or list of commands)

  • fix JSRun test

  • fix nodes indexing (node_id)

  • fix option -b (--bind)

  • fix setup procedure for agent staging test(s)

  • fix executor test

  • fix task cancellation if task is waiting in the scheduler wait queue

  • fix Sphinx syntax.

  • fix worker state statistics

  • implement task timeout for popen executor

  • refactor popen task cancellation

  • removed pre_rank and post_rank from Popen executor

  • rename XSEDE to ACCESS #2676

  • reorder env setup per rank (by RP) and consider (enforce) CPU/GPU types

  • reorganized task/rank-execution processes and synced that with launch processes

  • support schema aliases in resource configs

  • task attribute slots is not required in an executor

  • unify raptor and non-raptor prof traces

  • update amarel cfg

  • update RM Fork

  • update RM PBSPro

  • update SRun option cpus-per-task - set the option if cpu_threads > 0

  • update test for PMGR Launching

  • update test for Popen (for pre/post_rank transformation)

  • update test for RM Fork

  • update test for JSRun (w/o ERF)

  • update test for RM PBSPro

  • update profile events for raptor tasks


1.18.1 Hotfix Release 2022-11-01

  • fix Amarel configuration


1.18.0 Release 2022-10-11

  • move raptor profiles and logfiles into sandboxes

  • consistent use of task modes

  • derive etypes from task modes

  • clarify and troubleshoot raptor.py example

  • docstring update

  • make sure we issue a bootstrap_0_stop event

  • raptor tasks now create rank_start/ranks_stop events

  • report allocated resources for RA

  • set MPIRun as default LM for Summit

  • task manager cancel won't block: fixes #2336

  • update task description (focus on ranks)


1.17.0 Release 2022-09-15

  • add docker compose recipe.

  • add option -gpu for IBM Spectrum MPI

  • add comet resource config

  • add doc of env variable

  • add interactive schema to frontera config

  • add rcfg inspection utilities

  • also tarball log files, simplify code

  • clarify semantics on file and pwd schemas

  • document programmatic inspection of resource definitions

  • ensure RADICAL_SMT setting, document for end user

  • fixed session cache (resolved cachedir)

  • fix ornl resource sbox and summit interactive mode

  • fix session test cleanup

  • keep Spock’s resource config in sync with Crusher’s config

  • make pilot launch and bootstrap CWD-independent

  • make staging schemas consistent for pilot and task staging

  • only use major and minor version for prep_env spec version

  • pilot profiles and logfiles are now transferred as tarball #2663

  • fix scheduler termination

  • remove deprecated FUNCS executor

  • support RP within interactive jobs

  • simple attempt on api level reconnect

  • stage_in.target fix for absolute path Fixes #2590

  • update resource config for Crusher@ORNL

  • use current working tree for docker rp source.


1.16.0 Release 2022-08-15

  • add check for exception message

  • add test for Agent_0

  • fix cpu_threads for special tasks (service, sub-agent)

  • fix task['resources'] value

  • fix uid generation for components (use shared file for counters)

  • fix master task tmgr

  • fix raptor tests

  • fix rp serializer unittest

  • fix sub_agent keyerror

  • keep agent’s config with sub-agents in sync with default one

  • remove confusion of task attribute names (slots vs. resources)

  • set default values for agent and service tasks descriptions

  • set env variable (RP_PILOT_SANDBOX) for agent and service tasks launchers

  • update exec profile events

  • update headers for mpirun- and mpiexec-modules

  • update LM env setup for MPIRun and MPIExec special case (MPT=true)

  • update LM IBRun

  • update mpi-info extraction


1.15.1 Hotfix Release 2022-07-04

  • fix syntactic error in env prep script


1.15.0 Release 2022-07-04

  • added tests for PRTE LM

  • added tests for rank_cmd (IBRun and SRun LMs)

  • adding TMGR stats

  • adding xsede.expanse to the resource config

  • always interpret prep_env version request

  • anaconda support for prepare_env

  • Checking input staging exists before tar-ing Fixes #2483

  • ensure pip in venv mode

  • fixed _rm_info in IBRun LM

  • fixed status callback for SAGA Launcher

  • fixed type in ornl.summit_prte config

  • fix Ibrun set rank env

  • fix raptor env vals

  • use os.path to check if file exists Fixes #2483

  • remove node names duplication in SRun LM command

  • hide node-count from saga job description

  • ‘state_history’ is no longer supported

  • support existing VEs for prepare_env

  • updated installation of dependencies in bootstrapper

  • updated PRTE LM setup and config (including new release of PRRTE on Summit)

  • updating PMGR/AGENT stats - see #2401


1.14.0 Release 2022-04-13

  • support for MPI function tasks

  • support different RAPTOR workers

  • simplify / unify task and function descriptions

  • refactor resource acquisition

  • pilot submission via PSIJ or SAGA

  • added resource config for Crusher@OLCF/ORNL

  • support for execution of serialized function

  • pilot size can now be specified in number of nodes

  • support for PARSL integration

  • improved SMT handling

  • fixed resource configuration for jsrun

  • fix argument escapes

  • raptor consistently reports exceptions now


1.13.0 Release 2022-03-21

  • fix slurm nodefile/nodelist

  • clean temporary setup files

  • fixed test for LM Srun

  • local execution needs to check FORK first

  • fix Bridges-2 resource config


1.12.0 Release 2022-02-28

  • fix callback unregistration

  • fix capturing of task exit code

  • fix srun version command

  • fix metric setup / lookup in tmgr

  • get backfilling scheduler back in sync

  • re-introduced LM to handle aprun

  • Remove task log and the state_history

  • ru.Description -> ru.TypedDict

  • set LM’s initial env with activated VE

  • updated LSF handling cores indexing for LM JSRun

  • use proper shell quoting

  • use ru.TypedDict for Munch, fix tests


1.11.2 Hotfix Release 2022-01-21

  • for non-mpi tasks, ensure that $RP_RANK is set to 0


1.11.0 Release 2022-01-19

  • improve environment isolation for tasks and RCT components

  • add test for LM Srun

  • add resource manager instance to Executor base class

  • add test for blocked cores and gpus parameters (RM base)

  • add unittest to test LM base class initialization from Registry

  • add raptor test

  • add prepare_env example

  • add raptor request and result cb registration

  • avoid shebang use during bootstrap, pip sometimes screws it up

  • detect slurm version and use node file/list

  • enable nvme on summit

  • ensure correct out/err file paths

  • extended GPU handling

  • fix configs to be aligned with env isolation setup

  • fix LM PRTE rank setup command

  • fix cfg.task_environment handling

  • simplify BS env setup

  • forward resource reqs for raptor tasks

  • iteration on flux executor integration

  • limit pymongo version

  • provision radical-gtod

  • reconcile named env with env isolation

  • support Spock

  • support ALCF/JLSE Arcticus and Iris testbeds

  • fix staging behavior under stage_on_error

  • removed dead code


1.10.2 Hotfix Release 2021-12-14

  • constrain mongodb version dependency


1.10.0 Release 2021-11-22

  • Add fallback for ssh tunnel on ifconfig-less nodes

  • cleanup old resources

  • removed OSG leftovers

  • updating test cases

  • fix recursive flag


1.9.2 Hotfix Release 2021-10-27

  • fix shell escaping for task arguments


1.9.0 Release 2021-10-18

  • amarel cfg


1.8.0 Release 2021-09-23

  • fixed pilot staging for input directories

  • clean up configs

  • disabled os.setsid in Popen executor/spawner (in subprocess.Popen)

  • refreshed module list for Summit

  • return virtenv setup parameters

  • Support for :py:mod:radical.pilot.X links. (@eirrgang)

  • use local virtual env (either venv or conda) for Summit


1.6.8 Hotfix Release 2021-08-24

  • adapt flux integration to changes in flux event model

  • fix a merge problem on flux termination handling


1.6.7 Release 2021-07-09

  • artifact upload for RA integration test

  • encapsulate kwargs handling for Session.close().

  • ensure state updates

  • fail tasks which can never be scheduled

  • fixed jsrun resource_set_file to use cpu_index_using: logical

  • separate cpu/gpu utilization

  • fix error handling in data stager

  • use methods from the new module host within RU (>=1.6.7)


1.6.6 Release 2021-05-18

  • added flags to keep prun aware of gpus (PRTE2 LM)

  • add service node support

  • Bridges mpiexec config fix

  • task level profiling now python independent

  • executor errors should not affect task bulks

  • revive ibrun support, include layout support

  • MPI standard prescribes -H, not -host

  • remove pilot staging area

  • reduce profiling verbosity

  • restore original env before task execution

  • scattered repex staging fixes

  • slurm env fixes

  • updated documentation for PilotDescription and TaskDescription


1.6.5 Release 2021-04-14

  • added flag exclusive for tags (in task description, default False)

  • Adding Bridges2 and Comet

  • always specify GPU number on srun

  • apply RP+* env vars to raptor tasks

  • avoid a termination race

  • Summit LFS config and JSRUN integration tests

  • gh workflows and badges

  • ensure that RU lock names are unique

  • fixed env creation command and updated env setup check processes

  • fixed launch command for PRTE2 LM

  • fix missing event updates

  • fix ve isolation for prep_env

  • keep track of tagged nodes (no nodes overlapping between different tags)

  • ensure conda activate works

  • allow output staging on failed tasks

  • python 2 -> 3 fix for shebangs

  • remove support for add_resource_config

  • Stampede2 migrates to work2 filesystem

  • update setup module (use python3)


1.6.3 Hotfix Release 2021-04-03

  • fix uid assignment for managers


1.6.2 Hotfix Release 2021-03-26

  • switch to pep-440 for sdist and wheel versioning, to keep pip happy


1.6.1 Release 2021-03-09

  • support for Andes@ORNL, obsolete Rhea@ORNL

  • add_pilot() also accepts pilot dict

  • fixed conda activation for PRTE2 config (Summit@ORNL)

  • fixed partitions handling in LSF_SUMMIT RM

  • reorganized DVM start process (prte2)

  • conf fixes for comet

  • updated events for PRTE2 LM

  • integration test for Bridges2

  • prepare partitioning


1.6.0 Release 2021-02-13

  • rename ComputeUnit -> Task

  • rename ComputeUnitDescription -> TaskDescription

  • rename ComputePilot -> Pilot

  • rename ComputePilotDescription -> PilotDescription

  • rename UnitManager -> TaskManager

  • related renames to state and constant names etc

  • backward compatibility for now deprecated names

  • preparation for agent partitioning (RM)

  • multi-DVM support for PRTE.v1 and PRTE.v2

  • RM class tests

  • Bridges2 support

  • fix to co-scheduling tags

  • fix handling of IP variable in bootstrap

  • doc and test updates, linter fixes, etc

  • update scheduler tag types


1.5.12 Release 2021-02-02

  • multi-dvm support

  • cleanup of raptor

  • fix for bootstrap profiling

  • fix help string in bin/radical-pilot-create-static-ve

  • forward compatibility for tags

  • fix data stager for multi-pilot case

  • parametric integration tests

  • scattered fixes for raptor and sub-agent profiling

  • support new resource utilization plots


1.5.11 Release 2021-01-19

  • cleanup pypi tarball


1.5.10 Release 2021-01-18

  • gpu related fixes (summit)

  • avoid a race condition during termination

  • fix bootstrapper timestamps

  • fixed traverse config

  • fix node counting for FORK rm

  • fix staging context

  • move staging ops into separate worker

  • use C locale in bootstrapper


1.5.8 Release 2020-12-09

  • improve test coverage

  • add env isolation prototype and documentation

  • change agent launcher to ssh for bridges

  • fix sub agent init

  • fix Cheyenne support

  • define an intel-friendly bridges config

  • add environment preparation to pilot

  • example fixes

  • fixed procedure of adding resource config to the session

  • fix mpiexec_mpt LM

  • silence scheduler log

  • removed resource aliases

  • updated docs for resource config

  • updated env variable RADICAL_BASE for a job description

  • work around pip problem on Summit


1.5.7 Release 2020-10-30

  • Adding init files in all test folders

  • document containerized tasks

  • Fix #2221

  • Fix read_config

  • doc fixes / additions

  • adding unit tests, component tests

  • remove old examples

  • fixing rp_analytics #2114

  • inject workers as MPI task

  • remove debug prints

  • mpirun configs for traverse, stampede2

  • ru.Config is responsible to pick configs from correct paths

  • test agent execution/base

  • unit test for popen/spawn #1881


1.5.4 Release 2020-10-01

  • fix jsrun GPU mapping


1.5.4 Release 2020-09-14

  • Arbitrary durations for consumed resources

  • Fix unit tests

  • Fix python stack on Summit

  • add module test

  • added PRTE2 for PRRTEv2

  • added attribute for SAGA job description using env variable (SMT)

  • added config for PRRTE launch method at Frontera

  • added test for PRTE2

  • added test for rcfg parameter SystemArchitecture

  • allow virtenv_mode=local to reuse client ve

  • bulk communication for task overlay

  • fixed db close/disconnect method

  • fixed tests and pylint

  • PRTE fixes / updates

  • remove “debug” rp_version remnant


1.5.2 Hotfix Release 2020-08-11

  • add/fix RA prof metrics

  • clean dependencies

  • fix RS file system cache


1.5.1 Hotfix Release 2020-08-05

  • added config parameter for MongoDB tunneling

  • applied exception chaining

  • filtering for login/batch nodes that should not be considered (LSF RM)

  • fix for Resource Set file at JSRUN LM

  • support memory required per node at the RP level

  • added Profiler instance into Publisher and Subscriber (zmq.pubsub)

  • tests added and fixed

  • configs for Lassen, Frontera

  • radical-pilot-resources tool

  • document event model

  • comm bulking

  • example cleanup

  • fix agent base dir

  • Fix durations and add defaults for app durations

  • fixed flux import

  • fixing inconsistent nodelist error

  • iteration on task overlay

  • hide passwords on dburl reports / logs

  • multi-master load distribution

  • pep8

  • RADICAL_BASE_DIR -> RADICAL_BASE

  • remove private TMPDIR export - this fixes #2158

  • Remove SKIP_FAILED (unused)

  • support for custom batch job names

  • updated cuda hook for JSRUN LM

  • updated license file

  • updated readme

  • updated version requirement for python (min is 3.6)


1.4.1 Hotfix Release 2020-06-09

  • fix tmpdir misconfiguration for summit / prrte


1.4.0 Release 2020-05-12

  • merge #2122: fixed n_nodes for the case when slots are set

  • merge #2123: fix #2121

  • merge #2124: fixed conda-env path definition

  • merge #2127: bootstrap env fix

  • merge #2133, #2138: IBRun fixes

  • merge #2134: agent stage_in test1

  • merge #2137: agent_0 initialization fix

  • merge #2142: config update

  • add deactivate support for tasks

  • add cancellation example

  • added comet_mpirun to resource_xsede.json

  • added test for launch method “srun”

  • adding cobalt test

  • consistent process counting

  • preliminary FLUX support

  • fix RA utilization in case of no agent nodes

  • fix queue naming, prte tmp dir and process count

  • fix static ve location

  • fixed version discovery (srun)

  • cleanup bootstrap_0.sh

  • separate tacc and xsede resources

  • support for Princeton’s Traverse cluster

  • updated IBRun tests

  • updated LM IBRun


1.3.0 Release 2020-04-10

  • task overlay + docs

  • iteration on srun placement

  • add env support to srun

  • theta config

  • clean up launcher termination guard against lower level termination errors

  • cobalt rm

  • optional output stager

  • revive ibrun support

  • switch comet FS


1.2.1 Hotfix Release 2020-02-11

  • scattered fixes for summit


1.2.0 Release 2020-02-11

  • support for bulk callbacks

  • fixed package paths for launch methods (radical.pilot.agent.launch_method)

  • updated documentation references

  • raise minimum Python version to 3.6

  • local submit configuration for Frontera

  • switch frontera to default agent cfg

  • fix cray agent config

  • fix issue #2075 part 2


1.1.1 Hotfix Release 2020-02-11

  • fix dependency version for radical.utils


1.1 Release 2020-02-11

  • code cleanup


1.0.0 Release 2019-12-24

  • transition to Python3

  • migrate Rhea to Slurm

  • ensure PATH setting for sub-agents

  • CUDA is now handled by LM

  • fix / improve documentation

  • Sched optimization: task lookup in O(1)

  • Stampede2 prun config

  • testing, flaking, linting and travis fixes

  • add pilot.stage_out (symmetric to pilot.stage_in)

  • add noop sleep executor

  • improve prrte support

  • avoid state publish during idle times

  • cheyenne support

  • default to cont scheduler

  • configuration system revamp

  • heartbeat based process management

  • faster termination

  • support for Frontera

  • lockfree scheduler base class

  • switch to RU ZMQ layer


0.90.1 Release 2019-10-12

  • port pubsub hotfix


0.90.0 Release 2019-10-07

  • transition to Python3


0.73.1 Release 2019-10-07

  • Stampede-2 support


0.72.2 Hotfix Release 2019-09-30

  • fix sandbox setting on absolute paths


0.72.0 Release 2019-09-11

  • implement function executor

  • implement / improve PRTE launch method

  • PRTE profiling support (experimental)

  • agent scheduler optimizations

  • summit related configuration and fixes

  • initial frontera support

  • archive ORTE

  • increase bootstrap timeouts

  • consolidate MPI related launch methods

  • unit testing and linting

  • archive ORTE, issue #1915

  • fix get_mpi_info for Open MPI

  • base classes to raise notimplemented. issue #1920

  • remove outdated resources

  • ensure that pilot env reaches func executor

  • ensure ID uniqueness across processes

  • fix inconsistencies in task sandbox handling

  • fix gpu placement alg

  • fix issue #1910

  • fix torque nodefile name and path

  • add metric definitions in RA support

  • make DB comm bulkier

  • expand resource configs with pilot description keys

  • better tiger support

  • add NOOP scheduler

  • add debug executor


0.70.3 Hotfix Release 2019-08-02

  • fix example and summit configuration


0.70.2 Hotfix Release 2019-07-31

  • fix static ve creation for Tiger (Princeton)


0.70.1 Hotfix Release 2019-07-30

  • fix configuration for Tiger (Princeton)


0.70.0 Release 2019-07-07

  • support summitdev, summit @ ORNL (JSRUN, PRTE, RS, ERF, LSF, SMT)

  • support tiger @ princeton (JSRUN)

  • implement NOOP scheduler

  • backport application communicators from v2

  • ensure session close on some tests

  • continuous integration: pep8, travis, increasing test coverage

  • fix profile settings for several LMs

  • fix issue #1827

  • fix issue #1790

  • fix issue #1759

  • fix HOMBRE scheduler

  • remove cprof support

  • unify mpirun / mpirun_ccmrun

  • unify mpirun / mpirun_dplace

  • unify mpirun / mpirun_mpt

  • unify mpirun / mpirun_rsh


0.63.0 Release 2019-06-25

  • support for summit (experimental, jsrun + ERF)

  • PRRTE support (experimental, summit only)

  • many changes to the test setup (pytest, pylint, flake8, coverage, travis)

  • support for Tiger (adds SRUN launch method)

  • support NOOP scheduler

  • support application level communication

  • support ordered scheduling of tasks

  • partial code cleanup (coding guidelines)

  • simplifying MPI base launch methods

  • support for resource specific SMT settings

  • resource specific ranges of cores/threads can now be blocked from use

  • ORTE support is discontinued

  • fixes in hombre scheduler

  • improvements on GPU support

  • fix in continuous scheduler which caused underutilization on heterogeneous tasks

  • fixed: #1758, #1764, #1792, #1790, #1827, #187


0.62.0 Release 2019-06-08

  • add unit test

  • trigger tests

  • remove obsolete fifo scheduler (use the ordered scheduler instead)

  • add ordered scheduler

  • add tiger support

  • add ssh access to cheyenne

  • cleanup examples

  • fix dplace support

  • support app specified task sandboxes

  • fix pilot statepush over tunnels

  • fix titan ve creation, add new static ve

  • fix for cheyenne


0.61.0 Release 2019-05-07

  • add travis support, test cleanup

  • ensure safe bootstrapper termination on faulty installs

  • push node_list to mongodb for analytics

  • fix default dburl

  • fix imports in tests

  • remove deprecated special case in bootstrapper


0.60.1 Hotfix 2019-04-12

  • work around a pip install problem


0.60.0 Release 2019-04-10

  • add issue template

  • rename RP_PILOT_SBOX to RP_PILOT_STAGING and expose to tasks

  • fix bridges default partition (#1816)

  • fix #1826

  • fix off-by-one error on task state check

  • ignore failing DB disconnect

  • follow rename of saga-python to radical.saga


0.50.23 Release 2019-03-20

  • hotfix: use popen spawner for localhost


0.50.22 Release 2019-02-11

  • another fix LSF var expansion


0.50.21 Release 2018-12-19

  • fix LSF var expansion


0.50.20 Release 2018-11-25

  • fix Titan OMPI installation

  • support metadata for tasks

  • fix git error detection during setup


0.50.19 Release 2018-11-15

  • ensure profile fetching on empty tarballs


0.50.18 Release 2018-11-13

  • support for data locality aware scheduling


0.50.17 Release 2018-10-31

  • improve event documentation

  • support Task level metadata


0.50.16 Release 2018-10-26

  • add new shell spawner as popen replacement


0.50.15 Release 2018-10-24

  • fix recursive pilot staging


0.50.14 Release 2018-10-24

  • add Cheyenne support - thanks Vivek!


0.50.13 Release 2018-10-16

  • survive if SAGA does not support job.name (#1744)


0.50.12 Release 2018-10-12

  • fix stacksize usage on BW


0.50.11 Release 2018-10-09

  • fix ‘getting_started’ example (no MPI)


0.50.10 Release 2018-09-29

  • ensure the correct code path in SAGA for Blue Waters


0.50.9 Release 2018-09-28

  • fix examples

  • fix issue #1715 (#1716)

  • remove Stampede’s resource configs. issue #1711

  • supermic does not like curl -1 (#1723)


0.50.8 Release 2018-08-03

  • make sure that CUD values are not None (#1688)

  • don’t limit pymongo version anymore (#1687)


0.50.7 Release 2018-08-01

  • fix bwpy handling


0.50.6 Release 2018-07-31

  • fix curl tssl negotiation problem (#1683)


0.50.5 Release 2018-07-30

  • fix default values for process and thread types (#1681)

  • fix outdated links in ompi deploy script

  • fix/issue 1671 (#1680)

  • fix scheduler config checks (#1677)


0.50.4 Release 2018-07-13

  • set oversubscribe default to True


0.50.3 Release 2018-07-11

  • disable rcfg expansion


0.50.2 Release 2018-07-08

  • fix relative tarball unpack paths


0.50.1 Release 2018-07-05

  • GPU support

  • many bug fixes


0.47.14 Release 2018-06-13

  • fix recursive output staging


0.47.13 Release 2018-06-02

  • catch up with RU log, rep and prof settings


0.47.12 Release 2018-05-19

  • ensure that tasks are started in their own process group, to ensure clean cancellation semantics.


0.47.11 Release 2018-05-08

  • fix schemas on BW (local orte, local aprun)


0.47.10 Release 2018-04-19

  • fix #1602


0.47.9 Release 2018-04-18

  • fix default scheduler for localhost


0.47.8 Release 2018-04-16

  • hotfix to catch up with pypi upgrade


0.47.7 Release 2018-04-15

  • bugfix related to radical.entk #255


0.47.6 Release 2018-04-12

  • bugfix related to #1590


0.47.5 Release 2018-04-12

  • make sure a dict object exists even on empty env settings (#1590)


0.47.4 Release 2018-03-20

  • fifo agent scheduler (#1537)

  • hombre agent scheduler (#1536)

  • Fix/issue 1466 (#1544)

  • Fix/issue 1501 (#1541)

  • switch to new OMPI deployment on titan (#1529)

  • add agent configuration doc (#1540)


0.47.3 Release 2018-03-20

  • add resource limit test

  • add tmp cheyenne config

  • api rendering proposal for partitions

  • fix bootstrap sequence (BW)

  • tighten bootstrap process, add documentation


0.47.2 Release 2018-02-28

  • fix issue 1538

  • fix issue 1554

  • expose profiler to LM hooks (#1522)

  • fix bin names (#1549)

  • fix event docs, add an event for symmetry (#1527)

  • name attribute has been changed to uid, fixes issue #1518

  • make use of flags consistent between RP and RS (#1547)

  • add support for recursive data staging (#1513. #1514) (JD, VB, GC)

  • change staging flags to integers (inherited from RS)

  • add support for bulk data transfer (#1512) (IP, SM)


0.47 Release 2017-11-19

  • Correctly added ‘lm_info.cores_per_node’ SLURM

  • Torque RM now respects config settings for cpn

  • Update events.md

  • add SIGUSR2 for clean termination on SGE

  • add information about partial event orders

  • add issue demonstrators

  • add some notes on cpython issue demonstrators

  • add xsede.supermic_orte configuration

  • add xsede.supermic_ortelib configuration

  • apply RU’s managed process to termination stress test

  • attempt to localize aprun tasks

  • better hops for titan

  • better integration of Task script and app profs

  • catch up with config changes for local testing

  • centralize URL derivation for pilot job service endpoints, hops, and sandboxes

  • clarify use of namespace vs. full qualified URL in the context of RP file staging

  • clean up config management, inheritance

  • don’t fetch json twice

  • ensure that profiles are flushed and packed correctly

  • fail missing pilots on termination

  • fix AGENT_ACTIVE profile timing

  • fix close-session purge mode

  • fix cray agent config, avoid termination race

  • fix duplicated transition events

  • fix osg config

  • fix #1283

  • fixing error from bootstrapper + aprun parsing error

  • force profile flush earlier

  • get cpn for ibrun

  • implement session.list_resources() per #1419

  • make sure a canceled pilot stays canceled

  • make cb return codes consistent

  • make sure profs are flushed on termination

  • make sure the tmgr only pulls tasks it is interested in

  • profile mkdir

  • publish resource_details (incl. lm_info) again

  • re-add a profile flag to advance()

  • remove old controllers

  • remove old files

  • remove uid clashes for sub-agent components and components in general

  • setup number of cores per node on stampede2

  • smaller default pilot size for supermic

  • switch to ibrun for comet_ssh

  • track task drops

  • use js hop for untar

  • using new process class

  • GPU/CPU pinning test is now complete, needs some env settings in the launchers


0.46.2 Release 2017-09-02

  • hotfix for #1426 - thanks Iannis!


0.46.1 Release 2017-08-23

  • hotfix for #1415


Version 0.46 2017-08-11

  • TODO


0.45.3 Release 2017-05-09

  • Documentation update for the BW tutorial


0.45.1 Release 2017-03-05

  • NOTE: OSG and ORTE_LIB on titan are considered unsupported. You can enable those resources for experiments by setting the enabled keys in the respective config entries to true.

  • hotfix the configurations markers above


0.45 Release 2017-02-28

  • NOTE: OSG and ORTE_LIB on titan are considered unsupported. You can enable those resources for experiments by removing the comment markers from the respective resource configs.

  • Adapt to new orte-submit interface.

  • Add orte-cffi dependency to bootstrapper.

  • Agent based staging directives.

  • Fixes to various resource configs

  • Change orte-submit to orterun.

  • Conditional importing of executors. Fixes #926.

  • Config entries for orte lib on Titan.

  • Corrected environment export in executing POPEN

  • Extend virtenv lock timeout, use private rp_installs by default

  • Fix non-mpi execution analogous to #975.

  • Fix/issue 1226 (#1232)

  • Fresh orte installation for bw.

  • support more OSG sites

  • Initial version of ORTE lib interface.

  • Make cprofiling of scheduler conditional.

  • Make list of cprofile subscribers configurable.

  • Move env safekeeping until after the pre bootstrap.

  • Record OSG site name in mongodb.

  • Remove bash’isms from shell script.

  • pylint motivated cleanups

  • Resolving issue #1211.

  • Resource and example config for Shark at LUMC.

  • SGE changes for non-homogeneous nodes.

  • Use ru.which

  • add allegro.json config file for FUB allegro cluster

  • add rsh launch method

  • switch to gsissh on wrangler

  • use new ompi installation on comet (#1228)

  • add a simple/stupid ompi deployment helper

  • updated Config for Stampede and YARN

  • fix state transition to UNSCHEDULED to avoid repetition and invalid state ordering


0.44.1 Release 2016-11-01

  • add an agent config for cray/aprun all on mom node

  • add anaconda config for examples

  • gsissh as default for wrangler, stampede, supermic

  • add conf for spark n wrangler, comet

  • add docs to the cu env inject

  • expose spark’s master url

  • fix Task env setting (stampede)

  • configuration for spark and anaconda

  • resource config entries for titan

  • disable PYTHONHOME setting in titan_aprun

  • dynamic configuration of spark_env

  • fix for gordon config

  • hardcode the netifaces version until it is fixed upstream.

  • implement NON_FATAL for staging directives.

  • make resource config available to agent

  • rename scripts

  • update installation.rst

  • analytics backport

  • use profiler from RU

  • when calling a task state callback, missed states also trigger callbacks


0.43.1 Release 2016-09-09

  • hotfix: fix netifaces to version 0.10.4 to avoid trouble on BlueWaters


0.43 Release 2016-09-08

  • Add aec_handover for orte.

  • add a local configuration for bw

  • add early binding example for osg

  • add greenfield config (only works for single-node runs at the moment)

  • add PYTHONPATH to the vars we reset for Task envs

  • allow overloading of agent config

  • fix #1071

  • fix synapse example

  • avoid profiling of empty state transitions

  • Check of YARN start-all script. Raising Runtime error in case of error.

  • disable hwm altogether

  • drop clones before push

  • enable scheduling time measurements.

  • First commit for multinode YARN cluster

  • fix getip

  • fix iface detection

  • fix reordering of states for some update sequences

  • fix task cancellation

  • improve ve create script

  • make orte-submit aware of non-mpi CUs

  • move env preservation to an earlier point, to avoid pre-exec stuff

  • Python distribution mandatory to all confs

  • Remove temp agent config directory.

  • Resolving #1107

  • Schedule behind the real task and support multicore.

  • SchedulerContinuous -> AgentSchedulingComponent.

  • Take ccmrun out of bootstrap_2.

  • Tempfile is not a tempfile so requires explicit removal.

  • resolve #1001

  • Unbreak CCM.

  • use high water mark for ZMQ to avoid message drops on high loads


0.42 Release 2016-08-09

  • change examples to use 2 cores on localhost

  • Iterate documentation

  • Manual cherry pick fix for getip.


0.41 Release 2016-07-15

  • address some of error messages and type checks

  • add scheduler documentation; simplify interpretation of BF oversubscription; fix a log message

  • fix logging problem reported by Ming and Vivek

  • global default url, sync profile/logfile/db fetching tools

  • make staging path resilient against cwd changes

  • Switch SSH and ORTE for Comet

  • sync session cleanup tool with rpu

  • update allocation IDs


0.40.4 Release 2016-05-18

  • point release with more tutorial configurations


0.40.3 Release 2016-05-17

  • point release with tutorial configurations


0.40.2 Release 2016-05-13

  • hotfix to fix vnode parsing on archer


0.40.1 Release 2016-02-11

  • hotfix which makes sure agents don’t report FAILED on cancel()


0.40 Release 2016-02-03

  • Really numerous changes, fixes and features, most prominently:

  • OSG support

  • Yarn support

  • new resource supported

  • ORTE used for more resources

  • improved examples, profiling

  • communication cleanup

  • large Task support

  • lrms hook fixes

  • agent code splitup


0.38 Release 2015-12-22

  • fix busy mongodb pull


0.37.10 Release 2015-10-20

  • config fix


0.37.9 Release 2015-10-20

  • Example fix


0.37.8 Release 2015-10-20

  • Allocation fix


0.37.7 Release 2015-10-20

  • Allocation fix


0.37.6 Release 2015-10-20

  • Documentation


0.37.5 Release 2015-10-19

  • timing fix to ensure task state ordering


0.37.3 Release 2015-10-19

  • small fixes, doc changes


0.37.2 Release 2015-10-18

  • fix example installation


0.37.1 Release 2015-10-18

  • update of documentation and examples

  • some small fixes on shutdown installation


0.37 Release 2015-10-15

  • change default spawner to POPEN

  • use hostlist to avoid mpirun* limitations

  • support default callbacks on tasks and pilots

  • use a config for examples

  • add lrms shutdown hook for ORTE LM

  • various updates to examples and documentation

  • create logfile and profile tarballs on the fly

  • export some RP env vars to tasks

  • Fix a mongodb race

  • internally unregister pilot cbs on shutdown

  • move agent.stop to finally clause, to correctly react on signals

  • remove RADICAL_DEBUG, use proper logger in queue, pubsub

  • small changes to getting_started

  • add APRUN entry for ARCHER.

  • Updated APRUN config for ARCHER. Thanks Vivek!

  • Use designated termination procedure for ORTE.

  • Use statically compiled and linked OMPI/ORTE.

  • Wait for its component children on termination

  • make localhost (ForkLRMS) behave like a resource with an infinite number of cores


0.36 Release 2015-10-08

(the release notes also cover some changes from 0.34 to 0.35)

  • simplify agent process tree, process naming

  • improve session and agent termination

  • several fixes and changes to the task state model (refer to documentation!)

  • fix POPEN state reporting

  • split agent component into individual, relocatable processes

  • improve and generalize agent bootstrapping

  • add support for dynamic agent layout over compute nodes

  • support for ORTE launch method on CRAY (and others)

  • add a watcher thread for the ORTE DVM

  • improves profiling support, expand to RP module

  • add various profiling analysis tools

  • add support for profile fetching from remote pilot sandbox

  • synchronize and recombine profiles from different pilots

  • add a simple tool to run a recorded session.

  • add several utility classes: component, queue, pubsub

  • clean configuration passing from module to agent.

  • clean tunneling support

  • support different data frame formats for profiling

  • use agent infrastructure (LRMS, LM) for spawning sub-agents

  • allow LM to specify env vars to be unset.

  • allow agent on mom node to use tunnel.

  • fix logging to avoid log leakage from lower layers

  • avoid some file system bottlenecks

  • several resource specific configuration fixes (mostly stampede, archer, bw)

  • backport stdout/stderr/log retrieval

  • better logging of clone/drops, better error handling for configs

  • fix, improve profiling of Task execution

  • make profile an object

  • use ZMQ pubsub and queues for agent/sub-agent communication

  • decouple launch methods from scheduler for most LMs NOTE: RUNJOB remains coupled!

  • detect disappearing orte-dvm when exit code is zero

  • perform node allocation for sub-agents

  • introduce a barrier on agent startup

  • fix some errors on shell spawner (quoting, monitoring delays)

  • make localhost layout configurable via cpn

  • make setup.py report a decent error when being used with python3

  • support nodename lookup on Cray

  • only mkdir in input staging controller when we intend to stage data

  • protect agent cb invocation by lock

  • (re)add command line for profile fetching

  • cleanup of data staging, with better support for different schemas (incl. GlobusOnline)

  • work toward better OSG support

  • Use netifaces for ip address mangling.

  • Use ORTE from the 2.x branch.

  • remove Url class


0.35.1 Release 2015-09-29

  • hotfix to use popen on localhost


0.35 Release 2015-07-14

  • numerous bug fixes and support for new resources


0.34 Release 2015-07-14

  • Hotfix release for an installation issue


0.33 Release 2015-05-27

  • Hotfix release for off-by-one error (#621)


0.32 Release 2015-05-18

  • Hotfix release for MPIRUN_RSH on Stampede (#572).


0.31 Release 2015-04-30

  • version bump to trigger pypi release update


0.30 Release 2015-04-29

  • hotfix to handle broken pip/bash combo on archer


0.29 Release 2015-04-28

- hotfix to handle stale ve locks

0.28 Release 2015-04-16

  • This release contains a very large set of commits, and covers a fundamental overhaul of the RP agent (amongst others). It also includes:

  • support for agent profiling

  • removes a number of state race conditions

  • support for new backends (ORTE, CCM)

  • fixes for other backends

  • revamp of the integration tests


0.26 Release 2015-04-08

- hotfix to cope with API changing pymongo release

0.25 Release 2015-04-01

  • hotfix for a stampede configuration change


0.24 Release 2015-03-30

- More support for URLs in StagingDirectives (#489).
- Create parent directories of staged files.
- Only process entries for Output FTW, fixes #490.
- SuperMUC config change.
- switch from bson to json for session dumps
- fixes #451
- update resources.rst
- remove superfluous `\n`
- fix #438
- add documentation on resource config changes, closes #421
- .ssh/authorized_keys2 is deprecated since 2011
- improved intra-node SSH FAQ item

0.23 Release 2014-12-13

  • fix #455


0.22 Release 2014-12-11

- several state races fixed
- fix to tools for session cleanup and purging
- partial fix for pilot cancelation
- improved shutdown behavior
- improved hopper support
- adapt plotting to changed slothistory format
- make instructions clearer on data staging examples
- addresses issue #216
- be more resilient on pilot shutdown
- take care of cancelling of active pilots
- fix logic error on state check for pilot cancellation
- fix blacklight config (#360)
- attempt to cancel pilots timely...
- as fallback, use PPN information provided by SAGA
- hopper uses torque (thanks Mark!)
- Re-fix blacklight config. Addresses #359 (again).
- allow to pass application data to callbacks
- threads should not be daemons...
- workaround on failing bson encoding...
- report pilot id on cu inspection
- ignore caching errors
- also use staging flags on input staging
- stampede environment fix
- Added missing stampede alias
- adds timestamps to task and pilot logentries
- fix state tags for plots
- fix plot style for waitq
- introduce UNSCHEDULED state as per #233
- selectable terminal type for plot
- document pilot log env
- add faq about VE problems on setuptools upgrade
- allow to specify session cache files
- added configuration for BlueBiou (Thanks Jordane)
- better support for json/bson/timestamp handling; cache mongodb data
  for stats, plots etc
- localize numpy dependency
- retire input_data and output_data
- remove obsolete staging examples
- address #410
- fix another subtle state race

0.21 Release 2014-10-29

  • Documentation of MPI support

  • Documentation of data staging operations

  • correct handling of data transfer exceptions

  • fix handling of non-ascii data in task stdio

  • simplify switching of access schemas on pilot submission

  • disable pilot virtualenv for task execution

  • MPI support for DaVinci

  • performance optimizations on file transfers, task sandbox setup

  • fix ibrun tmp file problem on stampede


0.19 Release September 12. 2014


0.18 Release July 22. 2014


0.17 Release June 18. 2014

  • Bugfix release - fixed file permissions et al. :/


0.16 Release June 17. 2014

  • Bugfix release - fixed file permissions et al.


0.15 Release June 12. 2014

  • Bugfix release

  • fixed distribution MANIFEST: issues #174


0.14 Release June 11. 2014

New Features

  • Experimental pilot-agent for Cray systems

  • New multi-core agent with MPI support

  • New ResourceConfig mechanism does not require the user to add resource configurations explicitly. Resources can be configured programmatically at the API level.

API Changes:

  • TaskDescription.working_dir_priv removed

  • Extended state model

  • resource_configurations parameter removed from PilotManager constructor


0.13 Release May 19. 2014

  • ExTASY demo release

  • Support for project / allocation

  • Updated / simplified resource files

  • Refactored bootstrap mechanism


0.12 Release May 09. 2014


0.11 Release Apr. 29. 2014

  • Fixes error in state history reporting


0.10 Release Apr.  29. 2014

  • Support for state transition introspection via Task/Pilot state_history

  • Cleaned up and streamlined Input and Output file transfer workers

  • Support for interchangeable pilot agents

  • Closed tickets


0.9 Release Apr. 16. 2014

  • Support for output file staging

  • Streamlined data model

  • More loosely coupled components connected via DB queues

  • Closed tickets


0.8 Release Mar. 24. 2014

  • Renamed codebase from sagapilot to radical.pilot

  • Added explicit close() calls to PM, UM and Session.

  • Closed tickets


0.7 Release Feb. 25. 2014

  • Added support for callbacks

  • Added support for input file transfer !


0.6 Release Feb. 24. 2014

  • BROKEN RELEASE


0.5 Release Feb. 06. 2014

  • Tutorial 2 release (Github only)

  • Added support for multiprocessing worker

  • Support for Task stdout and stderr transfer via MongoDB GridFS


0.4 Release

  • Tutorial 1 release (Github only)

  • Consistent naming (sagapilot instead of sinon)


0.1.3 Release

  • Github only release

  • Added logging

  • Added security context handling


0.1.2 Release

  • Github only release

