API Reference

Sessions

class radical.pilot.Session(proxy_url: Optional[str] = None, uid: Optional[str] = None, cfg: Optional[dict] = None, _role: Optional[str] = 'primary', _reg_addr: Optional[str] = None, **close_options)[source]

Root of RP object hierarchy for an application instance.

A Session is the root object of all RP objects in an application instance: it holds radical.pilot.PilotManager and radical.pilot.TaskManager instances which in turn hold radical.pilot.Pilot and radical.pilot.Task instances, and several other components which operate on those stateful entities.

__init__(proxy_url: Optional[str] = None, uid: Optional[str] = None, cfg: Optional[dict] = None, _role: Optional[str] = 'primary', _reg_addr: Optional[str] = None, **close_options)[source]

Create a new session.

A new Session instance is created and stored in the database.

Any RP Session requires an RP Proxy to facilitate communication between the client machine (i.e., the host where the application created this Session instance) and the target resource (i.e., the host where the pilot agents run and where the workload is executed).

A proxy_url can be specified, which must then point to an RP Proxy Service instance that this session can use to establish a communication proxy. If proxy_url is not specified, the session checks the environment variable RADICAL_PILOT_PROXY_URL and interprets its value in the same way. If that information is not available either, the session instantiates a proxy service on the local host. Note that any proxy service instantiated by the session itself is terminated once the session instance is closed, or goes out of scope and is garbage collected; such a proxy should therefore not be used by other session instances.

Note: an RP proxy has to be accessible by both the client and the target hosts to facilitate communication between both parties. That implies access to the respective ports. Proxies started by the session itself will use the first free port above 10,000.
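The lookup order described above (explicit argument, then environment variable, then a locally started proxy) can be sketched in plain Python. This is an illustration of the documented order only, not RP's implementation: the helper name resolve_proxy_url and the convention of returning None for "start a local proxy" are assumptions made for this example.

```python
import os
from typing import Mapping, Optional

# Environment variable named in the text above.
PROXY_ENV_VAR = "RADICAL_PILOT_PROXY_URL"


def resolve_proxy_url(proxy_url: Optional[str] = None,
                      env: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Return the proxy URL to use, or None if a local proxy would be started.

    Hypothetical helper: it mirrors the documented lookup order only.
    """
    if env is None:
        env = os.environ
    if proxy_url:
        return proxy_url            # 1. an explicit argument wins
    from_env = env.get(PROXY_ENV_VAR)
    if from_env:
        return from_env             # 2. otherwise use the environment variable
    return None                     # 3. otherwise a local proxy would be started
```

Passing the environment as a mapping keeps the helper easy to exercise without touching the real process environment.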

Parameters
  • proxy_url (str, optional) – proxy service URL - points to an RP proxy service which is used to establish an RP communication proxy for this session.

  • uid (str, optional) – Create a session with this UID. Session UIDs MUST be unique - otherwise they will lead to communication conflicts, resulting in undefined behaviours.

  • cfg (str | dict, optional) – a named or instantiated configuration to be used for the session.

  • _role (str, optional) – only PRIMARY sessions, created by the original application process (via rp.Session()), will create proxies and Registry Services. AGENT sessions will also create a Registry but no proxies. All other DEFAULT session instances are instantiated internally in processes spawned (directly or indirectly) by the initial session, for example in some of its components, or by the RP agent. Those sessions will inherit the original session ID, but will not attempt to create new proxies or registries.

  • **close_options (optional) – If additional keyword arguments are provided, they will be used as the default arguments to Session.close(). This can be useful when the Session is used as a Python context manager, such that close() is called automatically at the end of a with block.

  • _reg_addr (str, optional) – Non-primary sessions will connect to the registry at that endpoint and pull session config and resource configurations from there.
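How **close_options interacts with a with block can be illustrated with a small stand-in class. SketchSession below is hypothetical and is not radical.pilot.Session; it only models the described behaviour, and it assumes that arguments passed explicitly to close() override the stored defaults.

```python
class SketchSession:
    """Stand-in for illustration only; not radical.pilot.Session."""

    def __init__(self, **close_options):
        # Keyword arguments given at construction become defaults for close().
        self._close_options = close_options
        self.closed_with = None

    def close(self, **kwargs):
        # Assumption: explicit arguments override the stored defaults.
        self.closed_with = {**self._close_options, **kwargs}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()        # called automatically at the end of the with block
        return False        # do not suppress exceptions


with SketchSession(download=True) as session:
    pass                    # application code would run here

defaults_used = session.closed_with

session.close(download=False)
override_used = session.closed_with
```

After the with block, defaults_used holds the options given at construction, while the explicit close() call replaces the download default.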

as_dict()[source]

Returns a Python dictionary representation of the object.

close(**kwargs)[source]

Close the session.

All subsequent attempts to access objects attached to the session will result in an error. If cleanup is set to True, the session data is removed from the database.

Parameters
  • terminate (bool, optional) – Shut down all pilots associated with the session.

  • download (bool, optional) – Fetch pilot profiles and database entries.

get_pilot_managers(pmgr_uids=None)[source]

Get known PilotManager(s).

Parameters

pmgr_uids (str | Iterable[str], optional) – uids of the PilotManagers we want.

Returns

One or more radical.pilot.PilotManager objects.

Return type

radical.pilot.PilotManager | list[radical.pilot.PilotManager]

get_resource_config(resource, schema=None)[source]

Returns a dictionary of the requested resource config.

get_task_managers(tmgr_uids=None)[source]

Get known TaskManager(s).

Parameters

tmgr_uids (str | list[str]) – uids of the TaskManagers we want

Returns

One or more radical.pilot.TaskManager objects.

Return type

radical.pilot.TaskManager | list[radical.pilot.TaskManager]

inject_metadata(metadata)[source]

Insert (experiment) metadata into an active session.

RP stack version info is always added.

list_pilot_managers()[source]

Get PilotManager identifiers.

Lists the unique identifiers of all radical.pilot.PilotManager instances associated with this session.

Returns

A list of radical.pilot.PilotManager uids.

Return type

list[str]

list_resources()[source]

Get list of known resource labels.

Returns a list of known resource labels which can be used in a pilot description.

list_task_managers()[source]

Get TaskManager identifiers.

Lists the unique identifiers of all radical.pilot.TaskManager instances associated with this session.

Returns

A list of radical.pilot.TaskManager uids.

Return type

list[str]

Pilots and PilotManagers

PilotManagers

class radical.pilot.PilotManager(session, cfg='default')[source]

Manage Pilot instances.

A PilotManager manages rp.Pilot instances that are submitted via the radical.pilot.PilotManager.submit_pilots() method. It is possible to attach one or more HPC resources to a PilotManager to outsource machine specific configuration parameters to an external configuration file.

Example:

import radical.pilot as rp

s = rp.Session()

pm = rp.PilotManager(session=s)

pd = rp.PilotDescription()
pd.resource = "futuregrid.alamo"
pd.cores = 16

p1 = pm.submit_pilots(pd)  # create first  pilot with 16 cores
p2 = pm.submit_pilots(pd)  # create second pilot with 16 cores

# Create a workload of 128 '/bin/sleep' tasks
tasks = []
for task_count in range(0, 128):
    t = rp.TaskDescription()
    t.executable = "/bin/sleep"
    t.arguments = ['60']
    tasks.append(t)

# Combine the two pilots, the workload and a scheduler via
# a TaskManager.
tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
tm.add_pilots([p1, p2])
tm.submit_tasks(tasks)

The pilot manager can issue notifications on pilot state changes. Whenever a state notification arrives, any callback registered for that notification is fired.

Note

State notifications can arrive out of order with respect to the pilot state model!

__init__(session, cfg='default')[source]

Creates a new PilotManager and attaches it to the session.

Parameters
  • session (rp.Session) – The session instance to use.

  • uid (str) – ID for pilot manager, to be used for reconnect

  • cfg (dict, str) – The configuration or name of configuration to use.

Returns

A new PilotManager object.

Return type

rp.PilotManager

as_dict()[source]

Returns a dictionary representation of the PilotManager object.

cancel_pilots(uids=None, _timeout=None)[source]

Cancel one or more rp.Pilots.

Parameters

uids (str | list[str], optional) – The IDs of the pilots to cancel.

close(terminate=True)[source]

Shut down the PilotManager and all its components.

Parameters

terminate (bool) – Cancel non-final pilots if True (default).

Note

Pilots cannot be reconnected to after termination.

control_cb(topic, msg)[source]

This callback can be overloaded by the component to handle any control message which is not already handled by the component base class.

get_pilots(uids=None)[source]

Returns one or more pilots identified by their IDs.

Parameters

uids (str | list[str]) – The IDs of the pilot objects to return.

Returns

A list of rp.Pilot objects.

Return type

list

kill_pilots(uids=None, _timeout=None)[source]

Kill one or more rp.Pilots.

Parameters

uids (str | list[str], optional) – The IDs of the pilots to kill.

list_pilots()[source]

Get the UIDs of the managed rp.Pilots.

Returns

A list of rp.Pilot UIDs.

Return type

list[str]

register_callback(cb, cb_data=None, metric='PILOT_STATE')[source]

Registers a new callback function with the PilotManager.

Manager-level callbacks get called if the specified metric changes. The default metric PILOT_STATE fires the callback if any of the Pilots managed by the PilotManager change their state.

All callback functions need to have the same signature:

def cb(obj, value, cb_data)

where obj is a handle to the object that triggered the callback, value is the new value of the metric, and cb_data is the data provided on callback registration. In the example of PILOT_STATE above, obj would be the pilot in question, and value would be the new state of the pilot.

Available metrics are

  • PILOT_STATE: fires when the state of any of the pilots which are managed by this pilot manager instance is changing. It communicates the pilot object instance and the pilot’s new state.

submit_pilots(descriptions)[source]

Submit one or more rp.Pilot instances to the pilot manager.

Parameters

descriptions (rp.PilotDescription | list[rp.PilotDescription]) – The description of the pilot instance(s) to create.

Returns

A list of rp.Pilot objects.

Return type

list[rp.Pilot]

property uid

The unique id.

Type

str

wait_pilots(uids=None, state=None, timeout=None)[source]

Block for state transition.

Returns when one or more rp.Pilots reach a specific state.

If uids is None, wait_pilots returns when all Pilots reach the state defined in state. This may include pilots which have previously terminated or been waited upon.


Parameters
  • uids (str | list[str], optional) – If set, only the Pilots with the specified uids are considered. If None (default), all Pilots are considered.

  • state (str | list[str]) –

    The state that Pilots have to reach in order for the call to return.

    By default wait_pilots waits for the Pilots to reach a terminal state, which can be one of the following:

    • rp.rps.DONE

    • rp.rps.FAILED

    • rp.rps.CANCELED

  • timeout (float, optional) – Timeout in seconds before the call returns regardless of Pilot state changes. The default value None waits forever.
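The blocking semantics of the uids, state and timeout parameters can be sketched with a generic polling loop. This is not RP's implementation: wait_for_states, the get_states callable, the polling interval and the returned list of states are all assumptions made for illustration, and the state snapshot is made up.

```python
import time

FINAL_STATES = ("DONE", "FAILED", "CANCELED")


def _as_list(value):
    """Accept a single string or an iterable of strings."""
    return [value] if isinstance(value, str) else list(value)


def wait_for_states(get_states, uids=None, state=None, timeout=None, poll=0.01):
    """Block until all selected entities reach a target state, or until timeout.

    get_states is a hypothetical callable returning a {uid: state} snapshot.
    """
    targets = FINAL_STATES if state is None else _as_list(state)
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        current = get_states()
        selected = list(current) if uids is None else _as_list(uids)
        if all(current[uid] in targets for uid in selected):
            break                                  # every selected entity arrived
        if deadline is not None and time.monotonic() >= deadline:
            break                                  # give up, return what we have
        time.sleep(poll)
    return [current[uid] for uid in selected]


states = {"pilot.0000": "DONE", "pilot.0001": "NEW"}   # made-up snapshot
done_only = wait_for_states(lambda: states, uids="pilot.0000")
timed_out = wait_for_states(lambda: states, timeout=0.05)
```

The first call returns at once because the selected pilot is already final; the second covers all pilots and returns only after the timeout, since one of them never leaves its initial state.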

PilotDescription

class radical.pilot.PilotDescription(from_dict=None)[source]

Specify a requested Pilot.

A PilotDescription object describes the requirements and properties of a radical.pilot.Pilot and is passed as a parameter to radical.pilot.PilotManager.submit_pilots() to instantiate and run a new pilot.

Note

A PilotDescription MUST define at least resource, cores or nodes, and runtime.

Example:

pm = radical.pilot.PilotManager(session=s)
pd = radical.pilot.PilotDescription()

pd.resource = "local.localhost"
pd.cores    = 16
pd.runtime  = 5 # minutes

pilot = pm.submit_pilots(pd)

uid

A unique ID for the pilot. A unique ID will be assigned by RP if the field is not set.

Type

str, optional

job_name

The name of the job / pilot as provided to the batch system. If not set then uid will be used instead.

Type

str, optional

resource

The key of a platform description entry. If the key exists, the machine-specific configuration is loaded from the config file once the PilotDescription is passed to radical.pilot.PilotManager.submit_pilots(). If the key doesn’t exist, an exception ValueError is raised.

Type

str

access_schema

The key of an access mechanism to use. Valid access mechanisms are defined in the resource configuration; see Configuration System. If no access_schema is specified, the first schema defined in the resource configuration is used by default.

Type

str, optional

runtime

The maximum run time (wall-clock time) in minutes of the pilot. Default 10.

Type

int, optional

sandbox

The working (“sandbox”) directory of the pilot agent. This parameter is optional and if not set, it defaults to radical.pilot.sandbox in your home or login directory. Default None.

Warning

If you define a pilot on an HPC cluster and you want to set sandbox manually, make sure that it points to a directory on a shared filesystem that can be reached from all compute nodes.

Type

str, optional

nodes

The number of nodes the pilot should allocate on the target resource. This parameter could be set instead of cores and gpus (and memory). Default 1.

Note

If nodes is specified, gpus and cores must not be specified.

Type

int, optional

cores

The number of cores the pilot should allocate on the target resource. This parameter could be set instead of nodes.

Note

For local pilots, you can set a number larger than the physical machine limit (corresponding resource configuration should have the attribute “fake_resources”).

Note

If cores is specified, nodes must not be specified.

Type

int, optional

gpus

The number of gpus the pilot should allocate on the target resource.

Note

If gpus is specified, nodes must not be specified.

Type

int, optional
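The notes on nodes, cores and gpus amount to a mutual-exclusion rule, which the following sketch expresses as a check. The function names are hypothetical and the check is not part of RP; it assumes that None means "not specified" and also applies the earlier requirement that a description defines cores or nodes.

```python
def check_resource_request(nodes=None, cores=None, gpus=None):
    """Raise ValueError if nodes is mixed with cores/gpus, or if neither
    nodes nor cores is given. None is taken to mean "not specified"."""
    if nodes is not None and (cores is not None or gpus is not None):
        raise ValueError("set either nodes, or cores/gpus, but not both")
    if nodes is None and cores is None:
        raise ValueError("set at least one of nodes or cores")


def is_valid(**request):
    """Convenience wrapper returning True or False instead of raising."""
    try:
        check_resource_request(**request)
    except ValueError:
        return False
    return True
```

For example, is_valid(nodes=2) and is_valid(cores=16, gpus=2) hold, while is_valid(nodes=2, cores=16) does not.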

memory

The total amount of physical memory the pilot (and its associated job) requires.

Type

int, optional

queue

The name of the job queue the pilot should get submitted to. If queue is set in the resource configuration (resource), defining queue will override it explicitly.

Type

str, optional

project

The name of the project / allocation to charge for used CPU time. If project is set in the resource configuration (resource), defining project will override it explicitly.

Type

str, optional

candidate_hosts

The list of host names on which this pilot is allowed to start.

Type

list[str], optional

app_comm

The list of names is interpreted as communication channels to start within the pilot agent for the purpose of application communication, i.e., so that tasks running on that pilot are able to use those channels to communicate with each other.

The names are expected to end in _queue or _pubsub, indicating the type of channel to create. Once created, tasks will find environment variables of the name RP_%s_IN and RP_%s_OUT, where %s is replaced with the given channel name (uppercased), and IN/OUT indicate the respective endpoint addresses for the created channels.

Type

list[str], optional
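The naming rule for the channel environment variables can be written out as a short helper. The function channel_env_names is hypothetical and only applies the RP_%s_IN / RP_%s_OUT pattern described above; the channel name app_queue is an example value.

```python
def channel_env_names(channel: str):
    """Return the (IN, OUT) environment variable names for a channel name."""
    if not channel.endswith(("_queue", "_pubsub")):
        raise ValueError("channel names are expected to end in _queue or _pubsub")
    upper = channel.upper()
    return f"RP_{upper}_IN", f"RP_{upper}_OUT"


env_in, env_out = channel_env_names("app_queue")
```

A task would then look up env_in and env_out in its environment to find the endpoint addresses of the channel.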

input_staging

The list of files to be staged into the pilot sandbox.

Type

list, optional

output_staging

The list of files to be staged from the pilot sandbox.

Type

list, optional

cleanup

If cleanup is set to True, the pilot will delete its entire sandbox upon termination. This includes individual Task sandboxes and all generated output data. Only log files will remain in the sandbox directory. Default False.

Type

bool, optional

exit_on_error

Flag to trigger app termination in case of pilot failure. Default True.

Type

bool, optional

services

A list of commands which get started on a separate service compute node right after bootstrapping, and before any RP task is launched. That service compute node will not be used for any other tasks.

Type

list[TaskDescription], optional

layout

Point to a json file or an explicit (dict) description of the pilot layout: number and size of partitions and their configuration. Default “default”.

Type

str | dict, optional

Pilots

class radical.pilot.Pilot(pmgr: PilotManager, descr)[source]

Represent a resource overlay on a local or remote resource.

Note

A Pilot cannot be created directly. The factory method radical.pilot.PilotManager.submit_pilots() has to be used instead.

Example:

pm = radical.pilot.PilotManager(session=s)
pd = radical.pilot.PilotDescription()

pd.resource = "local.localhost"
pd.cores    = 2
pd.runtime  = 5 # minutes

pilot = pm.submit_pilots(pd)

as_dict()[source]

Dictionary representation.

Returns

a Python dictionary representation of the object.

Return type

dict

cancel()[source]

Cancel the pilot.

property description

The description the pilot was started with, as a dictionary.

Type

dict

property endpoint_fs

The URL which is internally used to access the target resource’s root file system.

Type

radical.utils.Url

property log

A list of human readable [timestamp, string] tuples describing various events during the pilot’s lifetime. Those strings are not normative, only informative!

Type

list[tuple]

property pilot_sandbox

The full sandbox URL of this pilot, if that is already known, or ‘None’ otherwise.

Type

str

property pmgr

The pilot’s manager.

Type

PilotManager

prepare_env(env_name, env_spec)[source]

Prepare a virtual environment.

Request the preparation of a task or worker environment on the target resource. This call will block until the env is created.

Parameters
  • env_name (str) – name of the environment to prepare.

  • env_spec (dict) –

    specification of the environment to prepare, like:

    {'type'    : 'venv',
     'version' : '3.7',
     'pre_exec': ['module load python'],
     'setup'   : ['radical.pilot==1.0', 'pandas']},
    
    {'type'    : 'conda',
     'version' : '3.8',
     'setup'   : ['numpy']}
    
    {'type'   : 'conda',
     'version': '3.8',
     'path'   : '/path/to/ve',
     'setup'  : ['numpy']}
    

    where the type specifies the environment type, version specifies the Python version to deploy, and setup specifies how the environment is to be prepared. If path is specified the env will be created at that path. If path is not specified, RP will place the named env in the pilot sandbox (under env/named_env_name). If a VE exists at that path, it will be used as is (an update is not performed). pre_exec commands are executed before env creation and setup are attempted.

Note

The optional version specifier is only interpreted up to the minor version; the subminor version and anything below it are ignored.
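The effect of this note on a version string can be shown with a small helper. minor_version is a hypothetical name and the code is only a sketch of the truncation rule, not the code RP uses.

```python
def minor_version(version: str) -> str:
    """Keep only the major and minor components of a dotted version string."""
    return ".".join(version.split(".")[:2])


requested = minor_version("3.8.12")
```

Under this rule a specifier such as 3.8.12 is treated like 3.8.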

register_callback(cb, metric='PILOT_STATE', cb_data=None)[source]

Add callback for state changes.

Registers a callback function that is triggered every time the pilot’s state changes.

All callback functions need to have the same signature:

def cb(obj, state)

where obj is a handle to the object that triggered the callback and state is the new state of that object. If cb_data is given, then the cb signature changes to

def cb(obj, state, cb_data)

and cb_data is passed along.

property resource

The resource tag of this pilot.

Type

str

property resource_details

Agent-level resource information.

Type

dict

property resource_sandbox

The full URL of the path that RP considers the resource sandbox, i.e., the sandbox on the target resource’s file system which is shared by all sessions which access that resource.

Type

radical.utils.Url

property resources

The amount of resources used by this pilot.

Type

str

rpc(cmd, *args, rpc_addr=None, **kwargs)[source]

Remote procedure call.

Send an RPC command and arguments to the pilot and wait for the response. This is a synchronous operation at this point, and it is not thread safe to have multiple concurrent RPC calls.

property session

The pilot’s session.

Type

Session

property session_sandbox

The full URL of the path that RP considers the session sandbox on the target resource’s file system which is shared by all pilots which access that resource in the current session.

Type

radical.utils.Url

stage_in(sds)[source]

Stage files “in”.

Stages the content of the staging_directives to the pilot sandbox.

Please note the documentation of radical.pilot.staging_directives.complete_url() for details on path and sandbox semantics.

stage_out(sds=None)[source]

Stage data “out”.

Fetches the content of the staging_directives from the pilot sandbox.

Please note the documentation of radical.pilot.staging_directives.complete_url() for details on path and sandbox semantics.

property state

The current state of the pilot.

Type

str

property stderr

A snapshot of the pilot’s STDERR stream.

If this property is queried before the pilot has reached ‘DONE’ or ‘FAILED’ state it will return None.

Warning

This can be inefficient. Output may be incomplete and/or filtered.

Type

str

property stdout

A snapshot of the pilot’s STDOUT stream.

If this property is queried before the pilot has reached ‘DONE’ or ‘FAILED’ state it will return None.

Warning

This can be inefficient. Output may be incomplete and/or filtered.

Type

str

property uid

The pilot’s unique identifier within a PilotManager.

Type

str

wait(state=None, timeout=None)[source]

Block for state change.

Returns when the pilot reaches a specific state or when an optional timeout is reached.

Parameters
  • state (list[str]) –

    The state(s) that pilot has to reach in order for the call to return.

    By default wait waits for the pilot to reach a final state, which can be one of the following:

    • radical.pilot.states.DONE

    • radical.pilot.states.FAILED

    • radical.pilot.states.CANCELED

  • timeout (float) – Optional timeout in seconds before the call returns regardless of whether the pilot has reached the desired state or not. The default value None never times out.

Tasks and TaskManagers

TaskManager

class radical.pilot.TaskManager(session, cfg='default', scheduler=None)[source]

A TaskManager manages radical.pilot.Task instances which represent the executable workload in RADICAL-Pilot. A TaskManager connects the Tasks with one or more Pilot instances (which represent the workload executors in RADICAL-Pilot) and a scheduler which determines which Task gets executed on which Pilot.

Example:

import radical.pilot as rp

s = rp.Session()

pm = rp.PilotManager(session=s)

pd = rp.PilotDescription()
pd.resource = "futuregrid.alamo"
pd.cores = 16

p1 = pm.submit_pilots(pd) # create first pilot with 16 cores
p2 = pm.submit_pilots(pd) # create second pilot with 16 cores

# Create a workload of 128 '/bin/sleep' tasks
tasks = []
for task_count in range(0, 128):
    t = rp.TaskDescription()
    t.executable = "/bin/sleep"
    t.arguments = ['60']
    tasks.append(t)

# Combine the two pilots, the workload and a scheduler via
# a TaskManager.
tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
tm.add_pilots([p1, p2])
tm.submit_tasks(tasks)

The task manager can issue notifications on task state changes. Whenever a state notification arrives, any callback registered for that notification is fired.

Note

State notifications can arrive out of order with respect to the task state model!

__init__(session, cfg='default', scheduler=None)[source]

Creates a new TaskManager and attaches it to the session.

Parameters
  • session (radical.pilot.Session) – The session instance to use.

  • cfg (dict | str) – The configuration or name of configuration to use.

  • scheduler (str) – The name of the scheduler plug-in to use.

  • uid (str) – ID for the task manager, to be used for reconnect.

Returns

A new TaskManager object.

Return type

radical.pilot.TaskManager

add_pilots(pilots)[source]

Associates one or more pilots with the task manager.

Parameters

pilots (radical.pilot.Pilot | list[radical.pilot.Pilot]) – The pilot objects that will be added to the task manager.

as_dict()[source]

Returns a dictionary representation of the TaskManager object.

cancel_tasks(uids=None)[source]

Cancel one or more radical.pilot.Task instances.

Note that cancellation of tasks is immediate, i.e. their state is immediately set to CANCELED, even if some RP component may still operate on the tasks. Specifically, other state transitions, including other final states (DONE, FAILED), can occur after cancellation. This is a side effect of an optimization: we consider this an acceptable tradeoff in the sense “Oh, that task was DONE at point of cancellation – ok, we can use the results, sure!”.

If that behavior is not wanted, set the environment variable:

export RADICAL_PILOT_STRICT_CANCEL=True

Parameters

uids (str | list[str]) – The IDs of the tasks to cancel.

cancel_units(uids=None)[source]

Cancel one or more radical.pilot.Task instances.

Deprecated since version 1.5.12: use cancel_tasks()

close()[source]

Shut down the TaskManager and all its components.

control_cb(topic, msg)[source]

This callback can be overloaded by the component to handle any control message which is not already handled by the component base class.

get_pilots()[source]

Get the pilot instances currently associated with the task manager.

Returns

A list of radical.pilot.Pilot instances.

Return type

list[radical.pilot.Pilot]

get_tasks(uids=None)[source]

Get one or more tasks identified by their IDs.

Parameters

uids (str | list[str]) – The IDs of the task objects to return.

Returns

A list of radical.pilot.Task objects.

Return type

list[radical.pilot.Task]

get_units(uids=None)[source]

Get one or more tasks identified by their IDs.

Deprecated since version 1.5.12: use get_tasks()

list_pilots()[source]

Lists the UIDs of the pilots currently associated with the task manager.

Returns

A list of radical.pilot.Pilot UIDs.

Return type

list[str]

list_tasks()[source]

Get the UIDs of the tasks managed by this task manager.

Returns

A list of radical.pilot.Task UIDs.

Return type

list[str]

list_units()[source]

Get Task UIDs.

Deprecated since version 1.5.12: use list_tasks()

pilot_rpc(pid, cmd, *args, rpc_addr=None, **kwargs)[source]

Remote procedure call.

Send an RPC command and arguments to the pilot and wait for the response. This is a synchronous operation at this point, and it is not thread safe to have multiple concurrent RPC calls.

register_callback(cb, cb_data=None, metric=None, uid=None)[source]

Registers a new callback function with the TaskManager.

Manager-level callbacks get called if the specified metric changes. The default metric TASK_STATE fires the callback if any of the Tasks managed by the TaskManager change their state.

All callback functions need to have the same signature:

def cb(obj, value) -> None:
    ...

where obj is a handle to the object that triggered the callback and value is the new value of the metric. In the example of TASK_STATE above, obj would be the task in question, and value would be the new state of the task.

If cb_data is given, then the cb signature changes to

def cb(obj, state, cb_data) -> None:
    ...

and cb_data is passed unchanged.

If uid is given, the callback will be invoked only for the specified task.

Available metrics are

  • TASK_STATE: fires when the state of any of the tasks which are managed by this task manager instance is changing. It communicates the task object instance and the task’s new state.

  • WAIT_QUEUE_SIZE: fires when the number of unscheduled tasks (i.e. of tasks which have not been assigned to a pilot for execution) changes.
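The two callback signatures and the effect of uid can be illustrated with a toy dispatcher. CallbackRegistry and its fire method are hypothetical and do not reflect how RP delivers notifications; the task objects are stand-ins that carry only a uid, and the uids and states are example values.

```python
from types import SimpleNamespace


class CallbackRegistry:
    """Toy dispatcher for illustration; not RP's implementation."""

    def __init__(self):
        self._callbacks = []

    def register_callback(self, cb, cb_data=None, uid=None):
        self._callbacks.append((cb, cb_data, uid))

    def fire(self, obj, state):
        for cb, cb_data, uid in self._callbacks:
            if uid is not None and obj.uid != uid:
                continue                    # callback is bound to another task
            if cb_data is None:
                cb(obj, state)              # two-argument form
            else:
                cb(obj, state, cb_data)     # cb_data is passed unchanged


seen_all, seen_one = [], []
registry = CallbackRegistry()
registry.register_callback(lambda obj, state: seen_all.append((obj.uid, state)))
registry.register_callback(lambda obj, state, data: data.append(state),
                           cb_data=seen_one, uid="task.0001")

for task_uid, new_state in [("task.0000", "DONE"), ("task.0001", "FAILED")]:
    registry.fire(SimpleNamespace(uid=task_uid), new_state)
```

The first callback sees both state changes, while the second, registered with a uid, only receives the change of task.0001 and appends it to its cb_data list.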

remove_pilots(pilot_ids, drain=False)[source]

Disassociates one or more pilots from the task manager.

After a pilot has been removed from a task manager, it won’t process any of the task manager’s tasks anymore. Calling remove_pilots doesn’t stop the pilot itself.

Parameters

drain (bool) – Drain determines what happens to the tasks which are managed by the removed pilot(s). If True, all tasks currently assigned to the pilot are allowed to finish execution. If False (the default), then non-final tasks will be canceled.

property scheduler

The scheduler name.

Type

str

submit_raptors(descriptions, pilot_id=None)[source]

Submit RAPTOR master tasks.

Submits one or more radical.pilot.TaskDescription instances to the task manager, where the TaskDescriptions have the mode RAPTOR_MASTER set.

This is a thin wrapper around submit_tasks.

Parameters

descriptions (radical.pilot.TaskDescription) – Description of the masters to submit.

Returns

A list of radical.pilot.Task objects.

Return type

list[radical.pilot.Task]

submit_tasks(descriptions)[source]

Submit tasks for execution.

Submits one or more radical.pilot.Task instances to the task manager.

Parameters

descriptions (radical.pilot.TaskDescription | list[radical.pilot.TaskDescription]) – The description of the task instance(s) to create.

Returns

A list of radical.pilot.Task objects.

Return type

list[radical.pilot.Task]

submit_units(descriptions)[source]

Submit tasks for execution.

Deprecated since version 1.5.12: use submit_tasks()

submit_workers(descriptions)[source]

Submit RAPTOR workers.

Submits one or more radical.pilot.TaskDescription instances to the task manager, where the TaskDescriptions have the mode RAPTOR_WORKER set.

This method is a thin wrapper around submit_tasks.

Parameters

descriptions (radical.pilot.TaskDescription) – Description of the workers to submit.

Returns

A list of radical.pilot.Task objects.

Return type

list[radical.pilot.Task]

property uid

The unique id.

Type

str

wait_tasks(uids=None, state=None, timeout=None)[source]

Block for state transition.

Returns when one or more radical.pilot.Tasks reach a specific state.

If uids is None, wait_tasks returns when all Tasks reach the state defined in state. This may include tasks which have previously terminated or been waited upon.

Parameters
  • uids (str | list[str]) – If uids is set, only the Tasks with the specified uids are considered. If uids is None (default), all Tasks are considered.

  • state (str) –

    The state that Tasks have to reach in order for the call to return.

    By default wait_tasks waits for the Tasks to reach a terminal state, which can be one of the following.

    • radical.pilot.rps.DONE

    • radical.pilot.rps.FAILED

    • radical.pilot.rps.CANCELED

  • timeout (float) – Timeout in seconds before the call returns regardless of Task state changes. The default value None waits forever.

wait_units(uids=None, state=None, timeout=None)[source]

Block for state transition.

Deprecated since version 1.5.12: use wait_tasks()

TaskDescription

class radical.pilot.TaskDescription(from_dict=None)[source]

Describe the requirements and properties of a Task.

A TaskDescription object describes the requirements and properties of a radical.pilot.Task and is passed as a parameter to radical.pilot.TaskManager.submit_tasks() to instantiate and run a new task.

uid

A unique ID for the task. This attribute is optional; a unique ID will be assigned by RP if the field is not set.

Type

str, optional

name

A descriptive name for the task. This attribute can be used to map individual tasks back to application level workloads.

Type

str, optional

mode

The execution mode to be used for this task. Default “executable”. The following modes are accepted.

  • TASK_EXECUTABLE: the task is spawned as an external executable via a resource specific launch method (srun, aprun, mpiexec, etc).

    • required attributes: executable

    • related attributes: arguments

  • TASK_FUNCTION: the task references a python function to be called.

    • required attributes: function

    • related attributes: args

    • related attributes: kwargs

  • TASK_METHOD: the task references a raptor worker method to be called.

    • required attributes: method

    • related attributes: args

    • related attributes: kwargs

  • TASK_EVAL: the task is a code snippet to be evaluated.

    • required attributes: code

  • TASK_EXEC: the task is a code snippet to be exec’ed.

    • required attributes: code

  • TASK_SHELL: the task is a shell command line to be run.

    • required attributes: command

  • TASK_PROC: the task is a single core process to be executed.

    • required attributes: executable

    • related attributes: arguments

  • TASK_RAPTOR_MASTER: the task references a raptor master to be instantiated.

    • related attributes: raptor_file

    • related attributes: raptor_class

  • TASK_RAPTOR_WORKER: the task references a raptor worker to be instantiated.

    • related attributes: raptor_file

    • related attributes: raptor_class

There is a certain overlap between TASK_EXECUTABLE, TASK_SHELL and TASK_PROC modes. As a general rule, TASK_SHELL and TASK_PROC should be used for short running tasks which require a single core and no additional resources (gpus, storage, memory). TASK_EXECUTABLE should be used for all other tasks and is in fact the default. TASK_SHELL should only be used if the command to be run requires shell specific functionality (e.g., pipes, I/O redirection) which cannot easily be mapped to other task attributes.

TASK_RAPTOR_MASTER and TASK_RAPTOR_WORKER are special types of tasks that define RAPTOR’s master(s) and worker(s) components and their resource requirements. They are launched by the Agent on one or more nodes, depending on their requirements.

Type

str, optional
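The required attributes listed per mode above can be checked before a description is submitted. The following is a minimal sketch, assuming the description is held as a plain dict; `REQUIRED` and `missing_required` are hypothetical names and not part of RP, and the mode keys are the constant names written as plain strings (the actual constant values may differ).

```python
# Required attributes per task mode, as listed in the reference above.
# Keys are the constant *names* as plain strings (illustrative assumption).
REQUIRED = {
    'TASK_EXECUTABLE': ['executable'],
    'TASK_FUNCTION'  : ['function'],
    'TASK_METHOD'    : ['method'],
    'TASK_EVAL'      : ['code'],
    'TASK_EXEC'      : ['code'],
    'TASK_SHELL'     : ['command'],
    'TASK_PROC'      : ['executable'],
}


def missing_required(description):
    """Return the required attributes which are unset for the task's mode."""
    # TASK_EXECUTABLE is the documented default mode
    mode = description.get('mode', 'TASK_EXECUTABLE')
    return [attr for attr in REQUIRED.get(mode, [])
            if not description.get(attr)]


print(missing_required({'mode': 'TASK_SHELL', 'command': 'ls | wc -l'}))
print(missing_required({'mode': 'TASK_FUNCTION', 'args': [1, 2]}))
```

The raptor master and worker modes list only related attributes, so the sketch treats them as having no required attributes.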

executable

The executable to launch. The executable is expected to be either available via $PATH on the target resource, or to be an absolute path.

Type

str

arguments

The command line arguments for the given executable (list of strings).

Type

list[str]

code

The code to run. This field is expected to contain valid python code which is executed when the task mode is TASK_EXEC or TASK_EVAL.

Type

str

function

The function to run. This field is expected to contain a python function name which can be resolved in the scope of the respective RP worker implementation (see documentation there). The task mode must be set to TASK_FUNCTION. args and kwargs are passed as function parameters.

Type

str

args

Positional arguments to be passed to the function (see above). This field will be serialized with msgpack and can thus contain any serializable data types.

Type

list, optional

kwargs

Named arguments to be passed to the function (see above). This field will be serialized with msgpack and can thus contain any serializable data types.

Type

dict, optional

command

A shell command to be executed. This attribute is used for the TASK_SHELL mode.

Type

str

use_mpi

Flag indicating whether the task should be provided with an MPI communicator. Defaults to True if more than 1 rank is requested (see ranks), otherwise defaults to False. Set this to True if you want to enforce an MPI communicator on single-ranked tasks.

Type

bool, optional

ranks

The number of application processes to start on CPU cores. Default 1.

For two ranks or more, an MPI communicator will be available to the processes.

ranks replaces the deprecated attribute cpu_processes. The attribute cpu_process_type was previously used to signal the need for an MPI communicator - that attribute is now also deprecated and will be ignored.

Type

int, optional
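The interplay between use_mpi and ranks can be written down as a small rule. This is a sketch of the documented default only; `effective_use_mpi` is a hypothetical helper, and using `None` to mean "not set by the user" is an assumption.

```python
def effective_use_mpi(ranks=1, use_mpi=None):
    """Resolve use_mpi following the documented default.

    None means the attribute was not set (an assumption of this sketch).
    """
    if use_mpi is None:
        # default: MPI communicator only for multi-rank tasks
        return ranks > 1
    return bool(use_mpi)


print(effective_use_mpi(ranks=4))                # multi-rank task
print(effective_use_mpi(ranks=1))                # single-rank task
print(effective_use_mpi(ranks=1, use_mpi=True))  # communicator enforced
```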

cores_per_rank

The number of cpu cores each process will have available to start its own threads or processes on. By default, core refers to a physical CPU core - but if the pilot has been launched with SMT-settings > 1, core will refer to a virtual core or hyperthread instead (the name depends on the CPU vendor).

cores_per_rank replaces the deprecated attribute cpu_threads.

Type

int, optional

threading_type

The thread type, influences startup and environment (<empty>/POSIX, OpenMP).

threading_type replaces the deprecated attribute cpu_thread_type.

Type

str, optional

gpus_per_rank

The number of GPUs made available to each rank. If a GPU is shared among several ranks, a fraction should be given (e.g., if 2 ranks share one GPU, set gpus_per_rank=.5).

gpus_per_rank replaces the deprecated attribute gpu_processes. The attributes gpu_threads and gpu_process_type are also deprecated and will be ignored.

Type

float, optional

gpu_type

The type of GPU environment to provide to the ranks (<empty>, CUDA, ROCm).

gpu_type replaces the deprecated attribute gpu_thread_type.

Type

str, optional

lfs_per_rank

Local File Storage per rank - amount of data (MB) required on the local file system of the node.

lfs_per_rank replaces the deprecated attribute lfs_per_process.

Type

int, optional

mem_per_rank

Amount of physical memory required per rank.

mem_per_rank replaces the deprecated attribute mem_per_process.

Type

int, optional
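Since all resource attributes above are specified per rank, the total request of a task follows by multiplication. The sketch below assumes the per-rank values simply add up across ranks; `total_resources` is a hypothetical helper, and all defaults other than ranks=1 are assumptions.

```python
def total_resources(ranks=1, cores_per_rank=1, gpus_per_rank=0.0,
                    lfs_per_rank=0, mem_per_rank=0):
    """Sum the per-rank resource requests over all ranks of a task."""
    return {
        'cores': ranks * cores_per_rank,
        'gpus' : ranks * gpus_per_rank,   # may be fractional per rank
        'lfs'  : ranks * lfs_per_rank,
        'mem'  : ranks * mem_per_rank,
    }


# four ranks with two cores each, every two ranks sharing one GPU
print(total_resources(ranks=4, cores_per_rank=2, gpus_per_rank=0.5))
```

How those totals are distributed over nodes is decided by the agent scheduler and is not modelled here.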

environment

Environment variables to set before the execution, i.e., before the selected LaunchMethod is launched.

Type

dict, optional

named_env

A named virtual environment as prepared by the pilot. The task will remain in AGENT_SCHEDULING state until that environment gets created.

Type

str, optional

sandbox

This specifies the working directory of the task. It will be created if it does not exist. By default, the sandbox has the name of the task’s uid and is relative to the pilot’s sandbox.

Type

str, optional

stdout

The name of the file to store stdout. If not set then uid.out will be used.

Type

str, optional

stderr

The name of the file to store stderr. If not set then uid.err will be used.

Type

str, optional

input_staging

The files that need to be staged before the execution (list of staging directives, see below).

Type

list, optional

output_staging

The files that need to be staged after the execution (list of staging directives, see below).

Type

list, optional

stage_on_error

Output staging is normally skipped for FAILED or CANCELED tasks, but if this flag is set, staging is attempted either way. This may lead to additional errors if the task did not produce the expected output files. Default False.

Type

bool, optional

pre_launch

Actions (shell commands) to perform before launching (i.e., before LaunchMethod is submitted), potentially on a batch node which is different from the node the task is placed on.

Note that the set of shell commands given here are expected to load environments, check for work directories and data, etc. They are not expected to consume any significant amount of CPU time or other resources! Deviating from that rule will likely result in reduced overall throughput.

Type

list, optional

post_launch

Actions (shell commands) to perform after launching (i.e., after LaunchMethod is executed).

Precautions are the same as for pre_launch actions.

Type

list, optional

pre_exec

Actions (shell commands) to perform before the task starts (LaunchMethod is submitted, but no actual task running yet). Each item could be either a string (str), which represents an action applied to all ranks, or a dictionary (dict), which represents a list of actions applied to specified ranks (key is a rankID and value is a list of actions to be performed for this rank).

The actions/commands are executed on the respective nodes where the ranks are placed, and the actual rank startup will be delayed until all pre_exec commands have completed.

Precautions are the same as for pre_launch actions.

No assumption should be made as to where these commands are executed (although RP attempts to perform them in the task’s execution environment).

No assumption should be made on the specific shell environment the commands are executed in other than a POSIX shell environment.

Errors in executing these commands will cause the task to enter the FAILED state, and no execution of the actual workload will be attempted.

Type

list, optional
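The mixed str/dict form of pre_exec described above can be illustrated by resolving the commands that apply to one rank. This is a sketch only: `commands_for_rank` is a hypothetical helper, the shell commands are placeholders, and whether rank IDs are given as strings or integers is an assumption (the sketch compares them as strings to accept both).

```python
def commands_for_rank(pre_exec, rank):
    """Collect the pre_exec actions which apply to the given rank."""
    cmds = []
    for item in pre_exec:
        if isinstance(item, str):
            # plain string: applies to all ranks
            cmds.append(item)
        elif isinstance(item, dict):
            # dict: rank ID -> action or list of actions for that rank
            for key, actions in item.items():
                if str(key) == str(rank):
                    if isinstance(actions, str):
                        actions = [actions]
                    cmds.extend(actions)
        else:
            raise TypeError('unsupported pre_exec item: %r' % (item,))
    return cmds


pre_exec = ['module load python',                 # placeholder command
            {'0': ['export ROLE=leader'],
             '1': ['export ROLE=follower']}]

print(commands_for_rank(pre_exec, 0))
print(commands_for_rank(pre_exec, 2))
```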

pre_exec_sync

If set, rank execution is synchronized: the start of each individual rank is delayed until the pre_exec actions of all ranks have completed. Default False.

Type

bool, optional

post_exec

Actions (shell commands) to perform after the task finishes. The same remarks as for pre_exec apply, including the point on error handling: errors will again cause the task to fail, even if the actual execution was successful.

Type

list, optional

restartable

If the task starts to execute on a pilot, but cannot finish because the pilot fails or is canceled, the task can be restarted. Default False.

Type

bool, optional

tags

Configuration specific tags, which influence task scheduling and execution (e.g., tasks co-location).

Type

dict, optional

scheduler

deprecated in favor of raptor_id.

Type

str, optional

raptor_id

Raptor master ID this task is associated with.

Type

str, optional

worker_class

deprecated in favor of raptor_class.

Type

str, optional

raptor_class

Class name to instantiate for this Raptor master or worker task.

Type

str, optional

worker_file

deprecated in favor of raptor_file.

Type

str, optional

raptor_file

Optional application supplied Python source file to load raptor_class from.

Type

str, optional

metadata

User defined metadata. Default None.

Type

Any, optional

timeout

Any timeout larger than 0 will cause the task process to be killed after the specified number of seconds. The task will then end up in CANCELED state.

Type

float, optional

cleanup

If cleanup flag is set, the pilot will delete the entire task sandbox upon termination. This includes all generated output data in that sandbox. Output staging will be performed before cleanup. Note that task sandboxes are also deleted if the pilot’s own cleanup flag is set. Default False.

Type

bool, optional

pilot

Pilot uid. If specified, the task is submitted to the pilot with the given ID. If that pilot is not known to the TaskManager, an exception is raised.

Type

str, optional

Task Ranks

The notion of ranks is central to RP’s TaskDescription class. We here use the same notion as MPI, in that the number of ranks refers to the number of individual processes to be spawned by the task execution backend. These processes will be near-exact copies of each other: they run in the same workdir and the same environment, are defined by the same executable and arguments, get the same amount of resources allocated, etc. Notable exceptions are:

  • Rank processes may run on different nodes;

  • rank processes can communicate via MPI;

  • each rank process obtains a unique rank ID.

It is up to the underlying MPI implementation to determine the exact value of the process’ rank ID. The MPI implementation may also set a number of additional environment variables for each process.

It is important to understand that only applications which make use of MPI should have more than one rank – otherwise identical copies of the same application instance are launched which will compute the same results, thus wasting resources for all ranks but one. Worse: I/O-routines of these non-MPI ranks can interfere with each other and invalidate those results.

Also, applications with a single rank cannot make effective use of MPI: depending on the specific resource configuration, RP may launch those tasks without providing an MPI communicator.

Task Environment

RP tasks are expected to be executed in isolation, meaning that their runtime environment is completely independent from the environment of other tasks, independent from the launch mechanism used to start the task, and also independent from the environment of the RP stack itself.

The task description provides several hooks to help setting up the environment in that context. It is important to understand the way those hooks interact with respect to the environments mentioned above.

  • pre_launch directives are set and executed before the task is passed on to the task launch method. As such, pre_launch is usually executed on the node where RP’s agent is running, and not on the task’s target node. Executing pre_launch directives for many tasks can thus negatively impact RP’s performance (*). Note also that pre_launch directives can in some cases interfere with the launch method.

    Use pre_launch directives for rare, heavy-weight operations which prepare the runtime environment for multiple tasks: fetch data from a remote source, unpack input data, create global communication channels, etc.

  • pre_exec directives are set and executed after the launch method has placed the task on the compute nodes, and thus run on the target node. Note that for MPI tasks, the pre_exec directives are executed once per rank. Running large numbers of pre_exec directives concurrently can lead to system performance degradation (*), for example when those directives concurrently hit the shared file system (for loading modules, Python virtualenvs, etc.).

    Use pre_exec directives for task environment setup such as module load, virtualenv activate, export whose effects are expected to be applied either to all task ranks or to specified ranks. Avoid file staging operations at this point (files would be redundantly staged multiple times - once per rank).

(*) The performance impact of repeated concurrent access to the system’s shared file system can be significant and can pose a major bottleneck for your application. Specifically module load and virtualenv activate operations and the like are heavy on file system I/O, and executing those for many tasks is ill advised. Having said that: RP attempts to optimize those operations: if it identifies that identical pre_exec directives are shared between multiple tasks, RP will execute the directives exactly once and will cache the resulting environment settings - those cached settings are then applied to all other tasks with the same directives, without executing the directives again.
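The caching idea described in the note above can be sketched as a lookup keyed by the directive list. This is not RP's implementation: `EnvCache` and `fake_runner` are hypothetical, the runner merely parses `export K=V` lines instead of running a shell, and the sketch assumes all directives are plain strings.

```python
class EnvCache:
    """Run each distinct set of directives once and reuse the resulting env."""

    def __init__(self, runner):
        self._runner = runner      # callable: tuple of directives -> env dict
        self._cache = {}
        self.executions = 0        # how often directives were actually run

    def get_env(self, pre_exec):
        key = tuple(pre_exec)      # identical directive lists share one key
        if key not in self._cache:
            self.executions += 1
            self._cache[key] = self._runner(key)
        return dict(self._cache[key])   # copy, so callers cannot alter the cache


def fake_runner(directives):
    """Stand-in for executing the directives: only parses 'export K=V'."""
    env = {}
    for directive in directives:
        if directive.startswith('export ') and '=' in directive:
            key, val = directive[len('export '):].split('=', 1)
            env[key] = val
    return env


cache = EnvCache(fake_runner)
for _ in range(3):                       # three tasks, same directives
    env = cache.get_env(['export OMP_NUM_THREADS=4'])
print(env, cache.executions)
```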

Staging Directives

The Staging Directives are specified using a dict in the following form:

staging_directive = {
   'source'  : None, # see 'Location' below
   'target'  : None, # see 'Location' below
   'action'  : None, # See 'Action operators' below
   'flags'   : None, # See 'Flags' below
   'priority': 0     # Control ordering of actions (unused)
}

Locations

source and target locations can be given as strings or ru.Url instances. Strings containing :// are converted into URLs immediately. Otherwise, they are considered absolute or relative paths and are then interpreted in the context of the client’s working directory.

Special URL schemas

  • client:// : relative to the client’s working directory

  • resource:// : relative to the RP sandbox on the target resource

  • pilot:// : relative to the pilot sandbox on the target resource

  • task:// : relative to the task sandbox on the target resource

In all these cases, the hostname element of the URL is expected to be empty, and the path is always considered relative to the locations specified above (even though URLs usually don’t have a notion of relative paths).

For more details on path and sandbox handling check the documentation of radical.pilot.staging_directives.complete_url().
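The location rules above can be sketched as a small resolver. This is an illustration and not the logic of complete_url(): `resolve_location` is a hypothetical helper, and the sandbox paths in `SANDBOXES` are made-up examples.

```python
import posixpath

# Hypothetical sandbox locations, for illustration only.
SANDBOXES = {
    'client'  : '/home/user/run',
    'resource': '/scratch/rp',
    'pilot'   : '/scratch/rp/session.0001/pilot.0000',
    'task'    : '/scratch/rp/session.0001/pilot.0000/task.000000',
}


def resolve_location(location, sandboxes):
    """Map a staging location onto a path, following the rules above."""
    if '://' not in location:
        # plain path: interpreted relative to the client's working directory
        # (posixpath.join keeps absolute paths unchanged)
        return posixpath.normpath(
            posixpath.join(sandboxes['client'], location))

    schema, rest = location.split('://', 1)
    if schema in sandboxes:
        # special schema: the path is always relative to the named sandbox
        return posixpath.normpath(
            posixpath.join(sandboxes[schema], rest.lstrip('/')))

    # any other URL is left untouched
    return location


for loc in ['input.dat', 'pilot:///shared/data.tgz', 'task:///out.dat']:
    print(loc, '->', resolve_location(loc, SANDBOXES))
```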

Action operators

  • rp.TRANSFER : remote file transfer from source URL to target URL

  • rp.COPY : local file copy, i.e., not crossing host boundaries

  • rp.MOVE : local file move

  • rp.LINK : local file symlink

Flags

  • rp.CREATE_PARENTS: create the directory hierarchy for targets on the fly

  • rp.RECURSIVE: if source is a directory, handles it recursively

Task

class radical.pilot.Task(tmgr, descr, origin)[source]

Represent a ‘task’ that is executed on a Pilot.

Task instances allow the application to control and query the state of the task.

Note

A task cannot be created directly. The factory method rp.TaskManager.submit_tasks() has to be used instead.

Example:

import radical.pilot as rp

s    = rp.Session()                  # the session the task manager belongs to
tmgr = rp.TaskManager(session=s)

ud = rp.TaskDescription()
ud.executable = "/bin/date"

task = tmgr.submit_tasks(ud)

as_dict()[source]

Returns a Python dictionary representation of the object.

cancel()[source]

Cancel the task.

property client_sandbox

The full URL of the client sandbox, which is usually the same as the current working directory of the Python script in which the RP Session is instantiated.

Note that the URL may not be usable to access that sandbox from another machine: RP in general knows nothing about available access endpoints on the local host.

Type

radical.utils.Url

property description

The description the task was started with, as a dictionary.

Type

dict

property endpoint_fs

The URL which is internally used to access the target resource’s root file system.

Type

radical.utils.Url

property exception

A string representation (__repr__) of the exception which caused the task’s FAILED state, if one was raised while managing or executing the task.

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.

Type

str

property exception_detail

Additional information about the exception which caused this task to enter FAILED state.

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.

Type

str

property exit_code

The exit code of the task, if that is already known, or ‘None’ otherwise.

Type

int

property metadata

The metadata field of the task’s description.

property mode

The task mode.

Type

str

property name

The task’s application specified name.

Type

str

property origin

Indicates where the task was created.

Type

str

property pilot

The pilot ID of this task, if that is already known, or ‘None’ otherwise.

Type

str

property pilot_sandbox

The full URL of the path that RP considers the pilot sandbox on the target resource’s file system which is shared by all tasks which are executed by that pilot.

Type

radical.utils.Url

register_callback(cb, cb_data=None, metric=None)[source]

Add a state-change callback.

Registers a callback function that is triggered every time a task’s state changes.

All callback functions need to have the same signature:

def cb(obj, state) -> None:
    ...

where obj is a handle to the object that triggered the callback and state is the new state of that object. If cb_data is given, then the cb signature changes to

def cb(obj, state, cb_data) -> None:
    ...

and cb_data is passed unchanged.
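The two callback signatures can be exercised without a running session by using a stand-in object. `FakeTask` below is hypothetical and only mimics the documented contract (the metric argument is omitted); it is not how RP dispatches callbacks internally.

```python
class FakeTask:
    """Minimal stand-in which invokes callbacks on state changes."""

    def __init__(self, uid):
        self.uid = uid
        self.state = None
        self._callbacks = []

    def register_callback(self, cb, cb_data=None):
        self._callbacks.append((cb, cb_data))

    def _set_state(self, state):
        self.state = state
        for cb, cb_data in self._callbacks:
            if cb_data is None:
                cb(self, state)            # two-argument signature
            else:
                cb(self, state, cb_data)   # cb_data is passed unchanged


seen = []
log = []


def on_state(obj, state):
    seen.append((obj.uid, state))


def on_state_with_data(obj, state, cb_data):
    cb_data.append(state)


task = FakeTask('task.000000')
task.register_callback(on_state)
task.register_callback(on_state_with_data, cb_data=log)

task._set_state('AGENT_SCHEDULING')
task._set_state('DONE')
print(seen)
print(log)
```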

property resource_sandbox

The full URL of the path that RP considers the resource sandbox, i.e., the sandbox on the target resource’s file system that is shared by all sessions which access that resource.

Type

radical.utils.Url

property return_value

The return value for tasks which represent a function call (or None otherwise).

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.

Type

Any

property sandbox

An alias for task_sandbox.

Type

str

property session

The task’s session.

Type

radical.pilot.Session

property session_sandbox

The full URL of the path that RP considers the session sandbox on the target resource’s file system which is shared by all pilots which access that resource in the current session.

Type

radical.utils.Url

property state

The current state of the task.

Type

str

property stderr

A snapshot of the executable’s STDERR stream.

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will return None.

Warning

This can be inefficient. Output may be incomplete and/or filtered.

Type

str

property stdout

A snapshot of the executable’s STDOUT stream.

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will return None.

Warning

This can be inefficient. Output may be incomplete and/or filtered.

Type

str

property task_sandbox

The full sandbox URL of this task, if that is already known, or ‘None’ otherwise.

Type

radical.utils.Url

property tmgr

The task’s manager.

Type

radical.pilot.TaskManager

property uid

The task’s unique identifier within a TaskManager.

Type

str

wait(state=None, timeout=None)[source]

Block for state change.

Returns when the task reaches a specific state or when an optional timeout is reached.

Parameters
  • state (str | list[str], optional) –

    The state(s) that the task has to reach in order for the call to return.

    By default wait waits for the task to reach a final state, which can be one of the following.

    • rp.states.DONE

    • rp.states.FAILED

    • rp.states.CANCELED

  • timeout (float, optional) – Optional timeout in seconds before the call returns regardless of whether the task has reached the desired state or not. The default value None never times out.
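The semantics of state and timeout can be sketched as a polling loop. This is an illustration only: `wait_for_state` is a hypothetical function, polling is an assumption about the mechanism, and returning the last observed state is a choice made for this sketch.

```python
import time

FINAL = ('DONE', 'FAILED', 'CANCELED')


def wait_for_state(get_state, state=None, timeout=None, poll=0.01):
    """Block until get_state() yields a target state or the timeout expires."""
    if state is None:
        targets = FINAL                 # default: wait for any final state
    elif isinstance(state, str):
        targets = (state,)
    else:
        targets = tuple(state)

    start = time.monotonic()
    while True:
        current = get_state()
        if current in targets:
            return current
        if timeout is not None and time.monotonic() - start >= timeout:
            return current              # timed out before reaching a target
        time.sleep(poll)


progress = iter(['AGENT_SCHEDULING', 'AGENT_EXECUTING_PENDING', 'DONE'])
print(wait_for_state(lambda: next(progress)))
print(wait_for_state(lambda: 'AGENT_SCHEDULING', timeout=0.05))
```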

Raptor

class radical.pilot.raptor.Master(cfg: Optional[Config] = None)[source]

Raptor Master class

The rp.raptor.Master instantiates and orchestrates a set of workers which are used to rapidly and efficiently execute function tasks. As such, the raptor master acts as an RP executor: it hooks into the RP agent communication channels to receive tasks from the RP agent scheduler in order to execute them. Once completed, tasks are pushed toward the agent output staging component and will then continue their life cycle like all other tasks.

alive()[source]

check if the main work thread of this master is running

control_cb(topic, msg)[source]

listen for worker_register, worker_unregister, worker_rank_heartbeat and rpc_req messages.

join()[source]

wait until the main work thread of this master completes

request_cb(tasks)[source]

A raptor master implementation can overload this cb to filter all newly submitted tasks: it receives a list of tasks and returns a potentially different list of tasks which are then executed. It is up to the master implementation to ensure proper state transitions for any tasks which are passed as argument but are not returned by the call and thus are not submitted for execution.

Parameters

tasks (List[Dict[str, Any]]) – list of tasks which this master received for execution

Returns

possibly different list of tasks than received

Return type

List[Dict[str, Any]]

result_cb(tasks)[source]

A raptor master implementation can overload this cb, which gets called when raptor tasks complete execution.

Parameters

tasks (List[Dict[str, Any]]) – list of tasks which this master executed
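The contract of request_cb and result_cb can be sketched with a stand-in class. `FilteringMaster` is hypothetical: a real implementation would subclass rp.raptor.Master. The layout of the task dictionaries (`uid`, `state`, `description`) and the filter criterion are assumptions made for illustration.

```python
class FilteringMaster:
    """Stand-in showing the callback contract, not an actual raptor master."""

    def __init__(self):
        self.rejected = []
        self.finished = []

    def request_cb(self, tasks):
        """Return only those tasks which should be executed."""
        accepted = []
        for task in tasks:
            if task.get('description', {}).get('function'):
                accepted.append(task)
            else:
                # tasks not returned must be handled here: mark them FAILED
                task['state'] = 'FAILED'
                self.rejected.append(task['uid'])
        return accepted

    def result_cb(self, tasks):
        """Record the outcome of completed tasks."""
        for task in tasks:
            self.finished.append((task['uid'], task.get('state')))


master = FilteringMaster()
incoming = [{'uid': 'task.000000', 'description': {'function': 'math.sqrt'}},
            {'uid': 'task.000001', 'description': {}}]

to_run = master.request_cb(incoming)
print([task['uid'] for task in to_run])
print(master.rejected)
```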

start()[source]

start the main work thread of this master

stop()[source]

stop the main work thread of this master

submit_tasks(tasks) None[source]

Submit a list of tasks to the task queue. We expect to get either TaskDescription instances, which will then get converted into task dictionaries and pushed out, or task dictionaries, which are used as is. Either way, self.request_cb will be called for all tasks submitted here.

Parameters

tasks (List[TaskDescription]) – description of tasks to be submitted

submit_workers(descriptions: List[TaskDescription]) List[str][source]

Submit one raptor worker per given description element and pass the raptor queue info as a configuration file. Do not wait for the workers to come up - they are expected to register via the control channel.

The task descriptions specifically support the following keys:

  • raptor_class: str, type name of worker class to execute

  • raptor_file: str, optional, module file from which raptor_class may be imported if a custom RP worker class is used

Note that only one worker rank (presumably rank 0) should register with the master - the workers are expected to synchronize their ranks as needed.

Parameters

descriptions (List[TaskDescription]) – a list of worker descriptions

Returns

list of uids for submitted worker tasks

Return type

List[str]

terminate()[source]

terminate all workers and the master’s own work loop.

wait_workers(count=None, uids=None)[source]

Wait for n workers, or for workers with given UID, or for all workers to become available, then return. A worker is considered available when it has registered with this master and gets its status flag set to ACTIVE.

Parameters
  • count (int) – number of workers to wait for

  • uids (List[str]) – set of worker UIDs to wait for

worker_state_cb(worker_dict, state)[source]

This callback can be overloaded - it will be invoked whenever the master receives a state update for a worker it is connected to.

Parameters
  • worker_dict (Dict[str, Any]) – a task dictionary representing the worker whose state was updated

  • state (str) – new state of the worker

property workers

task dictionaries representing all currently registered workers

class radical.pilot.raptor.Worker(manager, rank, raptor_id)[source]

Implement the Raptor protocol for dispatching multiple Tasks on persistent resources.

get_dispatcher(name)[source]

Query a registered execution mode.

Parameters

name (str) – name of execution mode to query for

Returns

the dispatcher method for that execution mode

Return type

Callable

get_master()[source]

The worker can submit tasks back to the master - this method will return a small shim class to provide that capability. That class has a single method run_task which accepts a single rp.TaskDescription from which a rp.Task is created and executed. The call then waits for the task’s completion before returning it in a dict representation, the same as when passed to the master’s result_cb.

Note: the run_task call is running in a separate thread and will thus not block the master’s progress.

Returns

a shim class with only one method: run_task(td) where td is a TaskDescription to run.

Return type

Master

join()[source]

Wait until the worker’s main work loop completed.

register_mode(name, dispatcher) None[source]

Register a new task execution mode that this worker can handle. The specified dispatcher callable should accept a single argument: the task to execute.

Parameters
  • name (str) – name of the mode to register

  • dispatcher (callable) – function which implements the execution mode
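The relation between register_mode and get_dispatcher amounts to a name-to-callable registry. The sketch below uses a hypothetical stand-in class `ModeRegistry` rather than an actual raptor worker; the mode name `upper` and the task dictionary layout are made up for illustration.

```python
class ModeRegistry:
    """Stand-in for the mode bookkeeping of a worker."""

    def __init__(self):
        self._modes = {}

    def register_mode(self, name, dispatcher):
        # the dispatcher takes a single argument: the task to execute
        self._modes[name] = dispatcher

    def get_dispatcher(self, name):
        return self._modes[name]


def dispatch_upper(task):
    """Toy execution mode: upper-case the task's payload."""
    return task['description']['payload'].upper()


worker = ModeRegistry()
worker.register_mode('upper', dispatch_upper)

task = {'uid': 'task.000000',
        'description': {'mode': 'upper', 'payload': 'hello'}}
dispatcher = worker.get_dispatcher(task['description']['mode'])
print(dispatcher(task))
```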

start()[source]

Start the worker’s main work loop.

stop()[source]

Signal the worker to stop its main work loop.

A radical.pilot.Task managing a radical.pilot.raptor.Master instance is created using radical.pilot.TaskDescription.mode rp.RAPTOR_MASTER, or through submit_raptors(). The object returned to the client is a Task subclass with additional features.

class radical.pilot.raptor_tasks.Raptor(tmgr, descr, origin)[source]

RAPTOR (‘RAPid Task executOR’) is a task executor which, unlike other RADICAL-Pilot executors, can handle function tasks.

A Raptor must be submitted to a pilot. It will be associated with RaptorWorker instances on that pilot and use those workers to rapidly execute tasks. Raptors excel at high throughput execution for large numbers of short running tasks. However, they have limited capabilities with respect to managing task data dependencies, multinode tasks, MPI executables, and tasks with heterogeneous resource requirements.

rpc(cmd, *args, **kwargs)[source]

Send a raptor command, wait for the response, and return the result.

Parameters
  • cmd (str) – name of the rpc call to invoke

  • *args (*List[Any]) – unnamed arguments

  • **kwargs (**Dict[str, Any]) – named arguments

Returns

the returned dictionary has the following fields:
  • out: captured standard output

  • err: captured standard error

  • ret: return value of the call (can be any serializable type)

  • exc: tuple of exception type (str) and error message (str)

Return type

Dict[str, Any]
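A caller will typically inspect the exc field before using ret. The helper below is a sketch over the documented dictionary layout; `unwrap_rpc_result` is hypothetical, and the sketch assumes that exc is None (or otherwise empty) when the call succeeded.

```python
def unwrap_rpc_result(result):
    """Return result['ret'], or raise if the call reported an exception."""
    exc = result.get('exc')
    if exc:
        exc_type, exc_msg = exc        # (exception type, error message)
        raise RuntimeError('%s: %s' % (exc_type, exc_msg))
    return result.get('ret')


ok = {'out': 'done\n', 'err': '', 'ret': 42, 'exc': None}
print(unwrap_rpc_result(ok))

failed = {'out': '', 'err': '', 'ret': None,
          'exc': ('ValueError', 'bad argument')}
try:
    unwrap_rpc_result(failed)
except RuntimeError as e:
    print('rpc failed:', e)
```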

submit_tasks(descriptions: List[TaskDescription]) List[Task][source]

Submit a set of tasks to this Raptor instance.

Parameters

descriptions (List[TaskDescription]) – submit a task for each provided description.

Returns

a list of rp.Task instances, one for each task.

Return type

List[Task]

The tasks might not get executed until a worker is available for this Raptor instance.

submit_workers(descriptions: List[TaskDescription]) List[Task][source]

Submit a set of workers for this Raptor instance.

Parameters

descriptions (List[TaskDescription]) – launch a raptor worker for each provided description.

Returns

a list of rp.Task instances, one for each created worker task

Return type

List[Task]

The method will return immediately without waiting for actual task instantiation. The submitted tasks will operate solely on behalf of the Raptor master this method is being called on.

Utilities and helpers

class radical.pilot.agent.scheduler.base.AgentSchedulingComponent(cfg, session)[source]

control_cb(topic, msg)[source]

listen on the control channel for raptor queue registration commands

slot_status(msg=None, uid=None)[source]

Returns a multi-line string corresponding to the status of the node list

unschedule_cb(topic, msg)[source]

release (for whatever reason) all slots allocated to this task

work(tasks)[source]

This is the main callback of the component, which is called for any incoming (set of) task(s). Tasks arriving here must always be in AGENT_SCHEDULING_PENDING state, and must always leave in either AGENT_EXECUTING_PENDING or in a FINAL state (FAILED or CANCELED). While handled by this component, the tasks will be in AGENT_SCHEDULING state.

This method takes care of initial state change to AGENT_SCHEDULING, and then puts them forward onto the queue towards the actual scheduling process (self._schedule_tasks).

class radical.pilot.agent.scheduler.continuous.Continuous(cfg, session)[source]

The Continuous scheduler attempts to place threads and processes of a task onto nodes in the cluster.

_configure()[source]

Configure this scheduler instance

  • scattered: This is the continuous scheduler because it attempts to allocate a continuous set of cores/nodes for a task. It does, however, also allow the allocation to be scattered over discontinuous nodes if this option is set. This implementation is not optimized for the scattered mode! The default is ‘False’.

_find_resources(node, find_slots, ranks_per_slot, cores_per_slot, gpus_per_slot, lfs_per_slot, mem_per_slot, partial)[source]

Find up to the requested number of slots, where each slot features the respective x_per_slot resources. This call will return a list of slots of the following structure:

{
    'node_name': 'node_name',
    'node_id'  : '1',
    'core_map' : [[1, 2, 4, 5], [6, 7, 8, 9]],
    'gpu_map'  : [[1, 3], [1, 3]],
    'lfs'      : 1234,
    'mem'      : 4321
}

The call will not change the allocation status of the node, atomicity must be guaranteed by the caller.

We don’t care about continuity within a single node - cores [1, 5] are assumed to be as close together as cores [1, 2].

When partial is set, the method CAN return less than find_slots number of slots - otherwise, the method returns the requested number of slots or None.

FIXME: SMT handling: we should assume that hardware threads of the same physical core cannot host different executables, so HW threads can only account for thread placement, not process placement. This might best be realized by internally handling SMT as minimal thread count and using physical core IDs for process placement?
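The slot search on a single node, including the meaning of partial, can be sketched for cores only. This is a simplified illustration and not the scheduler's code: `find_core_slots` is a hypothetical function which ignores GPUs, lfs and memory and takes the free core IDs as a plain list.

```python
def find_core_slots(free_cores, n_slots, cores_per_slot, partial=False):
    """Find up to n_slots slots of cores_per_slot cores each.

    The list of free cores is not modified: marking cores as used is left
    to the caller, as stated above.
    """
    slots = []
    for start in range(0, len(free_cores), cores_per_slot):
        chunk = free_cores[start:start + cores_per_slot]
        if len(chunk) < cores_per_slot or len(slots) == n_slots:
            break
        # core IDs need not be adjacent: [1, 2, 4, 5] is a valid slot
        slots.append(chunk)

    if len(slots) < n_slots and not partial:
        return None          # all or nothing unless partial is set
    return slots


free = [1, 2, 4, 5, 6, 7, 8, 9]
print(find_core_slots(free, n_slots=2, cores_per_slot=4))
print(find_core_slots(free, n_slots=3, cores_per_slot=4))
print(find_core_slots(free, n_slots=3, cores_per_slot=4, partial=True))
```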

_iterate_nodes()[source]

The scheduler iterates through the node list for each task placement. However, we want to avoid starting from node zero every time, as tasks have likely been placed on that node previously. Instead, we generally want to pick up where the last task placement succeeded. This iterator preserves that state.

Note that the first index is yielded twice, so that the respective node can function as first and last node in an allocation.
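The iteration order described above can be sketched as a generator. `iterate_nodes` is a hypothetical stand-in which takes the start index as an argument, whereas the scheduler's own iterator keeps that state internally.

```python
def iterate_nodes(nodes, start):
    """Yield (index, node) pairs, beginning at start and wrapping around.

    The start index is yielded a second time at the end, so that node can
    serve as both the first and the last node of an allocation.
    """
    count = len(nodes)
    if count == 0:
        return
    for offset in range(count + 1):     # + 1: revisit the start node
        idx = (start + offset) % count
        yield idx, nodes[idx]


nodes = ['node.0', 'node.1', 'node.2', 'node.3']
print([idx for idx, _ in iterate_nodes(nodes, start=2)])
```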

schedule_task(task)[source]

Find an available set of slots, potentially across node boundaries (in the MPI case).

A slot is here considered the amount of resources required by a single MPI rank. Those resources need to be available on a single node - but slots can be distributed across multiple nodes. Resources for non-MPI tasks will always need to be placed on a single node.

By default, we only allow for partial allocations on the first and last node - but all intermediate nodes MUST be completely used (this is the ‘CONTINUOUS’ scheduler after all).

If the scheduler is configured with scattered=True, then that constraint is relaxed, and any set of slots (be it continuous across nodes or not) is accepted as valid allocation.

No matter the mode, we always make sure that we allocate slots in chunks of cores, gpus, lfs and mem required per process - otherwise the application processes would not be able to acquire the requested resources on the respective node.

unschedule_task(tasks)[source]

This method is called when previously acquired resources are not needed anymore. slots are the resource slots as previously returned by schedule_task().

class radical.pilot.utils.component.AgentComponent(cfg, session)[source]

advance(things, state=None, publish=True, push=False, qname=None, ts=None, fwd=True, prof=True)[source]

Things which have been operated upon are pushed down into the queues again, only to be picked up by the next component, according to their state model. This method will update the thing state, and push it into the output queue registered as target for that state.

  • things: list of things to advance

  • state: new state to set for the things

  • publish: determine if state update notifications should be issued

  • push: determine if things should be pushed to outputs

  • fwd: determine if notifications are forwarded to the ZMQ bridge

  • prof: determine if state advance creates a profile event (publish, and push are always profiled)

‘Things’ are expected to be a dictionary, and to have ‘state’, ‘uid’ and optionally ‘type’ set.

If ‘thing’ contains an ‘$all’ key, the complete dict is published; otherwise, only the state is published.

This is evaluated in self.publish.

class radical.pilot.utils.component.BaseComponent(cfg, session)[source]

This class provides the basic structure for any RP component which operates on stateful things. It provides means to:

  • define input channels on which to receive new things in certain states

  • define work methods which operate on the things to advance their state

  • define output channels to which to send the things after working on them

  • define notification channels over which messages with other components can be exchanged (publish/subscriber channels)

All low level communication is handled by the base class. Deriving classes will register the respective channels, valid state transitions, and work methods. When a ‘thing’ is received, the component is assumed to have full ownership over it, and that no other component will change the ‘thing’s state during that time.

The main event loop of the component – work() – is executed on run() and will not terminate on its own, unless it encounters a fatal error.

Components inheriting this class should attempt not to use shared resources. That will ensure that multiple instances of the component can coexist for higher overall system throughput. Should access to shared resources be necessary, it will require some locking mechanism across process boundaries.

This approach should ensure that

  • ‘thing’s are always in a well defined state;

  • components are simple and focus on the semantics of ‘thing’ state progression;

  • no state races can occur on ‘thing’ state progression;

  • only valid state transitions can be enacted (given correct declaration of the component’s semantics);

  • the overall system is performant and scalable.

Inheriting classes SHOULD overload the following methods:

  • initialize():
    • set up the component state for operation

    • register input/output/notification channels

    • register work methods

    • register callbacks to be invoked on state notification

    • the component will terminate if this method raises an exception.

  • work():
    • called in the main loop of the component process, on all entities arriving on input channels. The component will not terminate if this method raises an exception. For termination, terminate() must be called.

  • finalize():
    • tear down the component (close threads, unregister resources, etc).

Inheriting classes MUST call the constructor:

class StagingComponent(rpu.BaseComponent):
    def __init__(self, cfg, session):
        super().__init__(cfg, session)

A component thus must be passed a configuration (either as a path pointing to a file name to be opened as ru.Config, or as a pre-populated ru.Config instance). That config MUST contain a session ID (sid) for the session under which to run this component, and a uid for the component itself which MUST be unique within the scope of the given session.

All components and the component managers will continuously send heartbeat messages on the control pubsub - missing heartbeats will by default lead to component termination.

Further, the class must implement the registered work methods, with a signature of:

work(self, things)

The method is expected to change the state of the ‘thing’s given. ‘Thing’s will not be pushed to outgoing channels automatically – to do so, the work method has to call (see call documentation for other options):

self.advance(thing)

Until that method is called, the component is considered the sole owner of the ‘thing’s. After that method is called, the ‘thing’s are considered disowned by the component. If, however, components return from the work methods without calling advance on the given ‘thing’s, then the component keeps ownership of the ‘thing’s to advance it asynchronously at a later point in time. That implies that a component can collect ownership over an arbitrary number of ‘thing’s over time, and they can be advanced at the component’s discretion.
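The ownership rule can be pictured with a toy stand-in. The class below is not the RP BaseComponent: its attributes, the `ready` flag and the state name `DONE` are assumptions made only to show how a work method may advance some things immediately and retain others for later.

```python
class ToyComponent:
    """Stand-in for a component; NOT the RP BaseComponent."""

    def __init__(self):
        self.owned = {}      # uid -> thing, currently owned by this component
        self.advanced = []   # things handed on to the next component

    def advance(self, thing, state):
        # update the state, then give up ownership of the thing
        thing['state'] = state
        self.owned.pop(thing['uid'], None)
        self.advanced.append(thing)

    def work(self, things):
        for thing in things:
            self.owned[thing['uid']] = thing
            if thing.get('ready', True):
                self.advance(thing, 'DONE')
            # otherwise: return without advancing and keep ownership


comp = ToyComponent()
comp.work([{'uid': 't.0', 'state': 'NEW'},
           {'uid': 't.1', 'state': 'NEW', 'ready': False}])
held = sorted(comp.owned)                  # things still owned after work()

# asynchronous advance at a later point in time
comp.advance(comp.owned['t.1'], 'DONE')
```

After `work()` returns, `t.1` is still owned by the component; it is only disowned once `advance()` is called on it.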

The component process is a stand-alone daemon process which runs outside of Python’s multiprocessing domain. As such, it can freely use Python’s multithreading (and it extensively does so by default) - but developers should be aware that spawning additional processes in this component is discouraged, as Python’s process management does not play well with its multithreading implementation.

advance(things, state=None, publish=True, push=False, qname=None, ts=None, fwd=False, prof=True)[source]

Things which have been operated upon are pushed down into the queues again, only to be picked up by the next component, according to their state model. This method will update the thing state, and push it into the output queue registered as target for that state.

  • things: list of things to advance

  • state: new state to set for the things

  • publish: determine if state update notifications should be issued

  • push: determine if things should be pushed to outputs

  • fwd: determine if notifications are forwarded to the ZMQ bridge

  • prof: determine if state advance creates a profile event (publish, and push are always profiled)

‘Things’ are expected to be a dictionary, and to have ‘state’, ‘uid’ and optionally ‘type’ set.

If ‘thing’ contains an ‘$all’ key, the complete dict is published; otherwise, only the state is published.

This is evaluated in self.publish.
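The ‘$all’ rule might be pictured as below. This is a hedged sketch, not the code of `self.publish`: the helper name is hypothetical, and including ‘uid’ and ‘type’ next to the state in the reduced message is an assumption made so that a receiver could identify the thing.

```python
def state_update(thing: dict) -> dict:
    """Select what to publish for a thing (illustrative only)."""
    if '$all' in thing:
        # publish the complete dict
        return dict(thing)
    # assumption: the reduced update carries uid, state and optional type
    update = {'uid': thing['uid'], 'state': thing['state']}
    if 'type' in thing:
        update['type'] = thing['type']
    return update


full = state_update({'uid': 't.0', 'state': 'DONE', 'stdout': 'ok', '$all': True})
short = state_update({'uid': 't.1', 'state': 'DONE', 'stdout': 'ok'})
```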

control_cb(topic, msg)[source]

This callback can be overloaded by the component to handle any control message which is not already handled by the component base class.

get_input_ep(qname)[source]

return an input endpoint

get_output_ep(qname)[source]

return an output endpoint

output(things, state=None)[source]

This pushes the given things to the output queue registered for the given state.

publish(pubsub, msg, topic=None)[source]

push information into a publication channel

register_input(states, queue, cb=None, qname=None, path=None)[source]

Using this method, the component can be connected to a queue on which things are received to be worked upon. The given set of states (which can be a single state or a list of states) will trigger an assert check upon thing arrival.

This method will further associate a thing state with a specific worker callback cb. Upon thing arrival, the thing state will be used to lookup the respective worker, and the thing will be handed over. Workers should call self.advance(thing), in order to push the thing toward the next component. If, for some reason, that is not possible before the worker returns, the component will retain ownership of the thing, and should call advance() asynchronously at a later point in time.

Worker invocation is synchronous, i.e., the main event loop will only check for the next thing once the worker method returns.
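The state-based worker lookup can be sketched with a toy dispatcher. The class and method names below are hypothetical and deliberately differ from the RP API; only the behavior described above is modeled (state check on arrival, lookup by state, synchronous invocation).

```python
class ToyInputs:
    """Toy model of state -> worker routing; NOT the RP implementation."""

    def __init__(self):
        self._workers = {}

    def register(self, states, cb):
        # accept a single state or a list of states
        if not isinstance(states, list):
            states = [states]
        for state in states:
            self._workers[state] = cb

    def dispatch(self, thing):
        state = thing['state']
        # arrival check: the thing must be in one of the registered states
        assert state in self._workers, 'unexpected state: %s' % state
        # synchronous call: returns only once the worker is done
        self._workers[state](thing)


seen = []
inputs = ToyInputs()
inputs.register(['STAGING_INPUT', 'STAGING_OUTPUT'], seen.append)
inputs.dispatch({'uid': 't.0', 'state': 'STAGING_INPUT'})
```

The state names used here are placeholders; a real component would register the states of its own state model.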

register_output(states, qname)[source]

Using this method, the component can be connected to a queue to which things are sent after being worked upon. The given set of states (which can be a single state or a list of states) will trigger an assert check upon thing departure.

If a state but no output is specified, we assume that the state is final, and the thing is then considered ‘dropped’ on calling advance() on it. The advance() will trigger a state notification though, and then mark the drop in the log. No other component should ever again work on such a final thing. It is the responsibility of the component to make sure that the thing is in fact in a final state.

register_publisher(pubsub)[source]

Using this method, the component can register itself as a publisher of notifications on the given pubsub channel.

register_subscriber(pubsub, cb)[source]

This method is complementary to the register_publisher() above: it registers a subscription to a pubsub channel. If a notification is received on that channel, the registered callback will be invoked. The callback MUST have the signature:

callback(topic, msg)

where ‘topic’ is set to the name of the pubsub channel.

The subscription will be handled in a separate thread, which implies that the callback invocation will also happen in that thread. It is the caller’s responsibility to ensure thread safety during callback invocation.
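A callback which is safe to invoke from the subscriber thread could look like the sketch below. The class and the channel name `example_pubsub` are hypothetical; the threads started here only simulate invocations from outside the main thread and are not how RP delivers messages.

```python
import threading


class MessageCounter:
    """Count messages per topic; safe to call from several threads."""

    def __init__(self):
        self._lock = threading.Lock()
        self.counts = {}

    def callback(self, topic, msg):
        # guard the shared dict: the callback runs in the subscriber thread
        with self._lock:
            self.counts[topic] = self.counts.get(topic, 0) + 1


counter = MessageCounter()

# simulate concurrent callback invocations
threads = [threading.Thread(target=counter.callback,
                            args=('example_pubsub', {'n': i}))
           for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

A bound method such as `counter.callback` matches the `callback(topic, msg)` signature and could be passed as `cb`.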

register_timed_cb(cb, cb_data=None, timer=None)[source]

Idle callbacks are invoked at regular intervals: they are guaranteed not to be called more frequently than every ‘timer’ seconds, but no promise is made on a minimal call frequency. The intent for these callbacks is to run lightweight work in semi-regular intervals.
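The timing guarantee (an upper bound on call frequency, no lower bound) can be modeled as follows. This is a sketch and not the RP implementation: the class name, the `poll()` method and the injectable clock are assumptions made to keep the example deterministic.

```python
import time


class TimedCallback:
    """Invoke `cb` at most once per `timer` seconds (illustrative only)."""

    def __init__(self, cb, timer, clock=time.monotonic):
        self._cb = cb
        self._timer = timer
        self._clock = clock
        self._last = None    # time of the last invocation

    def poll(self):
        """Call `cb` if enough time has passed; return True if it ran."""
        now = self._clock()
        if self._last is not None and now - self._last < self._timer:
            return False
        self._last = now
        self._cb()
        return True


calls = []
ticks = iter([0.0, 0.5, 1.0, 1.2, 2.5])        # fake clock readings
tc = TimedCallback(lambda: calls.append(1), timer=1.0,
                   clock=lambda: next(ticks))
results = [tc.poll() for _ in range(5)]
```

How often `poll()` itself runs is up to the caller, which is why no minimal call frequency can be promised.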

rpc(cmd, *args, rpc_addr=None, **kwargs)[source]

Remote procedure call.

Send an RPC command and arguments to the control pubsub and wait for the response. This is a synchronous operation at this point, and it is not thread safe to have multiple concurrent RPC calls.

stop()[source]

We need to terminate and join all threads, close all communication channels, etc. But we trust on the correct invocation of the finalizers to do all this, and thus here only forward the stop request to the base class.

unregister_input(states, qname, worker)[source]

This method is the inverse of the ‘register_input()’ method.

unregister_output(states)[source]

This removes any outputs registered for the given states.

unregister_timed_cb(cb)[source]

This method reverts the register_timed_cb() above: it removes an idler from the component, and will terminate the respective thread.

work_cb()[source]

This is the main routine of the component, as it runs in the component process. It will first initialize the component in the process context. Then it will attempt to get new things from all input queues (round-robin). For each thing received, it will route that thing to the respective worker method. Once the thing is worked upon, the next attempt on getting a thing is up.

class radical.pilot.utils.component.ClientComponent(cfg, session)[source]
advance(things, state=None, publish=True, push=False, qname=None, ts=None, fwd=False, prof=True)[source]

Things which have been operated upon are pushed down into the queues again, only to be picked up by the next component, according to their state model. This method will update the thing state, and push it into the output queue registered as target for that state.

  • things: list of things to advance

  • state: new state to set for the things

  • publish: determine if state update notifications should be issued

  • push: determine if things should be pushed to outputs

  • fwd: determine if notifications are forwarded to the ZMQ bridge

  • prof: determine if state advance creates a profile event (publish, and push are always profiled)

‘Things’ are expected to be a dictionary, and to have ‘state’, ‘uid’ and optionally ‘type’ set.

If ‘thing’ contains an ‘$all’ key, the complete dict is published; otherwise, only the state is published.

This is evaluated in self.publish.

radical.pilot.utils.prof_utils.get_consumed_resources(session, rtype='cpu', tdurations=None)[source]

For all ra.pilot or ra.task entities, return the amount and time of resources consumed. A consumed resource is characterized by:

  • a resource type (we know about cores and gpus)

  • a metric name (what the resource was used for)

  • a list of 4-tuples of the form: [t0, t1, r0, r1]

The tuples are formed so that t0 to t1 and r0 to r1 are continuous:

  • t0: time, begin of resource consumption

  • t1: time, end of resource consumption

  • r0: int, index of resources consumed (min)

  • r1: int, index of resources consumed (max)

An entity can consume different resources under different metrics, but the returned consumption specs will never overlap. Thus, any resource is accounted for exactly one metric at any point in time. The returned structure has the following overall form:

{
  'metric_1' : {
      uid_1 : [[t0, t1, r0, r1],
               [t2, t3, r2, r3],
               ...
              ],
      uid_2 : ...
  },
  'metric_2' : ...
}
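As an illustration of how this structure might be consumed, the sketch below sums resource-seconds per metric. It assumes that r0 and r1 denote an inclusive index range, so that a tuple covers r1 - r0 + 1 resources; the metric names and uids in the sample data are made up.

```python
def resource_seconds(consumed: dict) -> dict:
    """Sum (duration * number of resources) per metric."""
    totals = {}
    for metric, per_uid in consumed.items():
        total = 0.0
        for specs in per_uid.values():
            for t0, t1, r0, r1 in specs:
                # assumption: r0..r1 is an inclusive range of indexes
                total += (t1 - t0) * (r1 - r0 + 1)
        totals[metric] = total
    return totals


# hypothetical sample in the documented layout
consumed = {
    'exec': {'task.0': [[0.0, 10.0, 0, 3]],
             'task.1': [[5.0, 15.0, 4, 5]]},
    'idle': {'pilot.0': [[0.0, 5.0, 4, 5]]},
}
totals = resource_seconds(consumed)
```

Because the consumption specs never overlap, the per-metric totals can be added up without double counting.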
radical.pilot.utils.prof_utils.get_hostmap(profile)[source]

We abuse the profile combination to also derive a pilot-host map, which will tell us on what exact host each pilot has been running. To do so, we check for the PMGR_ACTIVE advance event in agent_0.prof, and use the NTP sync info to associate a hostname.

radical.pilot.utils.prof_utils.get_hostmap_deprecated(profiles)[source]

This method mangles combine_profiles and get_hostmap, and is deprecated. At this point it only returns the hostmap.

radical.pilot.utils.prof_utils.get_provided_resources(session, rtype='cpu')[source]

For all ra.pilots, return the amount and time of the type of resources provided. This computes sets of 4-tuples of the form: [t0, t1, r0, r1] where:

  • t0: time, begin of resource provision

  • t1: time, end of resource provision

  • r0: int, index of resources provided (min)

  • r1: int, index of resources provided (max)

The tuples are formed so that t0 to t1 and r0 to r1 are continuous.

radical.pilot.utils.prof_utils.get_resource_timelines(task, transitions)[source]

For each specific task, return a set of tuples of the form:

[start, stop, metric]

which reports what metric has been used during what time span.
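A timeline of this form can be reduced to a total duration per metric, for example as sketched below. The helper name and the metric names in the sample data are hypothetical.

```python
from collections import defaultdict


def time_per_metric(timeline) -> dict:
    """Add up the time spans of a [start, stop, metric] timeline."""
    totals = defaultdict(float)
    for start, stop, metric in timeline:
        totals[metric] += stop - start
    return dict(totals)


# made-up timeline for a single task
timeline = [[0.0, 2.0, 'schedule'],
            [2.0, 9.0, 'exec'],
            [9.0, 10.0, 'schedule']]
spent = time_per_metric(timeline)
```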

radical.pilot.utils.prof_utils.get_session_description(sid, src=None)[source]

This will return a description which is usable for radical.analytics evaluation. It informs about:

  • set of stateful entities

  • state models of those entities

  • event models of those entities (maybe)

  • configuration of the application / module

If src is given, it is interpreted as path to search for session information (json dump). src defaults to $PWD/$sid.

The serializer should be able to (de)serialize information that we want to send over the wire from the client side to the agent side via:

  • ZMQ

  • MongoDB

We expect:

  • Callables with and without dependencies.

  • Non-callables like classes and other Python objects.

radical.pilot.utils.serializer.deserialize_file(fname)[source]

Deserialize object from file

radical.pilot.utils.serializer.deserialize_obj(data)[source]

Deserialize object from str.

radical.pilot.utils.serializer.serialize_file(obj, fname=None)[source]

serialize object to file

radical.pilot.utils.serializer.serialize_obj(obj)[source]

serialize object

radical.pilot.utils.session.fetch_filetype(ext, name, sid, src=None, tgt=None, access=None, skip_existing=False, fetch_client=False, log=None, rep=None)[source]
Parameters
  • ext (-) – file extension to fetch

  • name (-) – full name of filetype for log messages etc

  • sid (-) – session for which all files are fetched

  • src (-) – dir to look for client session files ($src/$sid/*.ext)

  • tgt (-) – dir to store the files in ($tgt/$sid/*.ext, $tgt/$sid/$pid/*.ext)

Returns

list of file names (fetched and/or cached)

Return type

list[str]

radical.pilot.utils.misc.create_tar(tgt: str, dnames: List[str]) None[source]

Create a tarball on the file system which contains all given directories

radical.pilot.utils.misc.get_resource_config(resource: str) Union[None, Config][source]

For the given resource label, return the resource configuration used by radical.pilot.

Parameters

resource (str) – resource label for which to return the cfg

Returns

the resource configuration

The method returns None if no resource config is found for the specified resource label.

Return type

radical.utils.Config

radical.pilot.utils.misc.get_resource_configs() Config[source]

Return all resource configurations used by radical.pilot.

Configurations for the individual resources are organized as sites and resources:

cfgs = get_resource_configs()
sites = cfgs.keys()
for site in sites:
    resource_names = cfgs[site].keys()

Returns

the resource configurations

Return type

radical.utils.Config
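The site/resource organization can be walked with a small helper like the one below. This is a sketch which works on any nested mapping; joining site and resource name into a `site.resource` label is an assumption, and the sample configuration is made up rather than taken from RP.

```python
def resource_labels(cfgs) -> list:
    """Flatten a site -> resource mapping into sorted 'site.resource' labels."""
    labels = []
    for site in cfgs.keys():
        for name in cfgs[site].keys():
            # assumption: labels are formed as '<site>.<resource>'
            labels.append('%s.%s' % (site, name))
    return sorted(labels)


# stand-in for the object returned by get_resource_configs()
cfgs = {'local': {'localhost': {}},
        'site_a': {'cluster_1': {}, 'cluster_2': {}}}
labels = resource_labels(cfgs)
```

With RP installed, the return value of `get_resource_configs()` could be passed in place of the sample dict, provided it supports the `keys()` access shown in the description above.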

radical.pilot.utils.misc.get_resource_fs_url(resource: str, schema: Optional[str] = None) Union[None, Url][source]

For the given resource label, return the contact URL of the resource’s file system. This corresponds to the filesystem_endpoint setting in the resource config.

For example, rs.filesystem.directory(get_resource_fs_url(…)).change_dir(‘/’) is equivalent to the base endpoint:/// URL available for use in a staging_directive.

Parameters
  • resource (str) – resource label for which to return the url

  • schema (str, optional) – access schema to use for resource access. Defaults to the default access schema as defined in the resources config files.

Returns

the file system URL

The method returns None if no resource config is found for the specified resource label and access schema.

Return type

radical.utils.Url

radical.pilot.utils.misc.get_resource_job_url(resource: str, schema: Optional[str] = None) Union[None, Url][source]

For the given resource label, return the contact URL of the resource’s job manager.

Parameters
  • resource (str) – resource label for which to return the url

  • schema (str, optional) – access schema to use for resource access. Defaults to the default access schema as defined in the resources config files.

Returns

the job manager URL

The method returns None if no resource config is found for the specified resource label and access schema.

Return type

radical.utils.Url