5. API Reference

5.1. Sessions and Security Contexts

5.1.1. Sessions

class radical.pilot.Session(dburl=None, uid=None, cfg=None, _primary=True, **close_options)[source]

A Session is the root object of all RP objects in an application instance: it holds radical.pilot.PilotManager and radical.pilot.TaskManager instances which in turn hold radical.pilot.Pilot and radical.pilot.Task instances, and several other components which operate on those stateful entities.

__init__(dburl=None, uid=None, cfg=None, _primary=True, **close_options)[source]

Creates a new session. A new Session instance is created and stored in the database.

Arguments:
  • dburl (string): The MongoDB URL. If none is given, RP uses the environment variable RADICAL_PILOT_DBURL. If that is not set, an error will be raised.
  • cfg (str or dict): a named or instantiated configuration to be used for the session.
  • uid (string): Create a session with this UID. Session UIDs MUST be unique - otherwise they will lead to conflicts in the underlying database, resulting in undefined behaviours (or worse).
  • _primary (bool): only sessions created by the original application process (via rp.Session()) will connect to the DB. Secondary session instances are instantiated internally in processes spawned (directly or indirectly) by the initial session, for example in some of its components. A secondary session will inherit the original session ID, but will not attempt to create a new DB collection; if a DB connection is needed, the component needs to establish it on its own.

If additional keyword arguments are provided, they will be used as the default arguments to Session.close(). (This can be useful when the Session is used as a Python context manager, such that close() is called automatically at the end of a with block.)
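As a minimal sketch of this pattern, the stand-in class below (hypothetical, not RP's implementation) stores the extra keyword arguments at construction time and applies them as defaults when close() runs at the end of a with block. It assumes that arguments passed explicitly to close() take precedence over the stored defaults.

```python
class SketchSession:
    """Hypothetical stand-in that mimics the close_options pattern."""

    def __init__(self, **close_options):
        # keyword arguments become the defaults for close()
        self._close_options = dict(close_options)
        self.closed_with = None

    def close(self, **kwargs):
        # explicit arguments override the stored defaults (assumption)
        options = dict(self._close_options)
        options.update(kwargs)
        self.closed_with = options

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # called automatically at the end of the `with` block
        self.close()
        return False


with SketchSession(cleanup=False, terminate=True) as session:
    pass  # application code would run here

print(session.closed_with)
```

The printed dictionary contains the options that close() received without the application calling close() itself.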

as_dict()[source]

Returns a Python dictionary representation of the object.

close(**kwargs)[source]

Closes the session. All subsequent attempts to access objects attached to the session will result in an error. If cleanup is set to True, the session data is removed from the database.

Arguments:
  • cleanup (bool): Remove session from MongoDB (implies terminate)
  • terminate (bool): Shut down all pilots associated with the session.
  • download (bool): Fetch pilot profiles and database entries.
closed

Returns the time of closing.

connected

Returns the time when the session connected to the DB.

created

Returns the UTC date and time the session was created.

get_pilot_managers(pmgr_uids=None)[source]

Returns known PilotManager(s).

Arguments:

  • pmgr_uids [string]: unique identifier of the PilotManager we want
Returns:
get_resource_config(resource, schema=None)[source]

Returns a dictionary of the requested resource config.

get_task_managers(tmgr_uids=None)[source]

Returns known TaskManager(s).

Arguments:

  • tmgr_uids [string]: unique identifier of the TaskManager we want
Returns:
inject_metadata(metadata)[source]

Inserts (experiment) metadata into an active session. RP stack version info always gets added.

list_pilot_managers()[source]

Lists the unique identifiers of all radical.pilot.PilotManager instances associated with this session.

Returns:
list_resources()[source]

Returns a list of known resource labels which can be used in a pilot description.

list_task_managers()[source]

Lists the unique identifiers of all radical.pilot.TaskManager instances associated with this session.

Returns:

5.1.2. Security Contexts

class radical.pilot.Context(ctype, thedict=None)[source]
__init__(ctype, thedict=None)[source]

Arguments:
  • ctype (string)
Returns:
  • None

classmethod from_dict(thedict)[source]

Creates a new object instance from a dictionary, such that c.from_dict(x.as_dict()) == x.

5.2. Pilots and PilotManagers

5.2.1. PilotManagers

class radical.pilot.PilotManager(session, cfg='default')[source]

A PilotManager manages rp.Pilot instances that are submitted via the radical.pilot.PilotManager.submit_pilots() method.

It is possible to attach one or more resource configurations (see Using Local and Remote HPC Resources) to a PilotManager to outsource machine-specific configuration parameters to an external configuration file.

Example:

import radical.pilot as rp

# DBURL is the MongoDB URL, defined by the application
s = rp.Session(dburl=DBURL)

pm = rp.PilotManager(session=s)

pd = rp.PilotDescription()
pd.resource = "futuregrid.alamo"
pd.cores = 16
pd.runtime = 10  # minutes

p1 = pm.submit_pilots(pd)  # create first  pilot with 16 cores
p2 = pm.submit_pilots(pd)  # create second pilot with 16 cores

# Create a workload of 128 '/bin/sleep' tasks
tasks = []
for task_count in range(0, 128):
    t = rp.TaskDescription()
    t.executable = "/bin/sleep"
    t.arguments = ['60']
    tasks.append(t)

# Combine the two pilots, the workload and a scheduler via
# a TaskManager.
tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
tm.add_pilots([p1, p2])
tm.submit_tasks(tasks)

The pilot manager can issue notifications on pilot state changes. Whenever a state notification arrives, any callback registered for that notification is fired.

NOTE: State notifications can arrive out of order with respect to the pilot state model!

__init__(session, cfg='default')[source]

Creates a new PilotManager and attaches it to the session.

Arguments:
  • session [rp.Session]: The session instance to use.
  • cfg (dict or string): The configuration or name of configuration to use.
Returns:
  • A new PilotManager object [rp.PilotManager].
as_dict()[source]

Returns a dictionary representation of the PilotManager object.

cancel_pilots(uids=None, _timeout=None)[source]

Cancel one or more rp.Pilots.

Arguments:
  • uids [string or list of strings]: The IDs of the pilot objects to cancel.
close(terminate=True)[source]

Shut down the PilotManager and all its components.

Arguments:
  • terminate [bool]: cancel non-final pilots if True (default)
get_pilots(uids=None)[source]

Returns one or more pilots identified by their IDs.

Arguments:
  • uids [string or list of strings]: The IDs of the pilot objects to return.
Returns:
  • A list of rp.Pilot objects.
list_pilots()[source]

Returns the UIDs of the rp.Pilots managed by this pilot manager.

Returns:
  • A list of rp.Pilot UIDs [string].
register_callback(cb, cb_data=None, metric='PILOT_STATE')[source]

Registers a new callback function with the PilotManager. Manager-level callbacks get called if the specified metric changes. The default metric PILOT_STATE fires the callback if any of the Pilots managed by the PilotManager change their state.

All callback functions need to have the same signature:

def cb(obj, value, cb_data)

where obj is a handle to the object that triggered the callback, value is the new value of the metric, and cb_data is the data provided on callback registration. In the example of PILOT_STATE above, obj would be the pilot in question, and value would be the new state of the pilot.

Available metrics are:

  • PILOT_STATE: fires when the state of any of the pilots managed by this pilot manager instance changes. It communicates the pilot object instance and the pilot’s new state.
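A minimal sketch of a callback with the three-argument signature follows. The pilot is replaced by a stand-in object that only has a uid attribute, and the callback is invoked by hand; in an application, a call such as pmgr.register_callback(record_state, cb_data=history) would let the PilotManager invoke it instead. The names record_state and history are illustrative.

```python
from types import SimpleNamespace


def record_state(obj, value, cb_data):
    """Append (uid, new state) to the list given as cb_data."""
    cb_data.append((obj.uid, value))


history = []  # passed as cb_data on registration

# stand-ins for rp.Pilot objects (hypothetical uids)
pilot_a = SimpleNamespace(uid='pilot.0000')
pilot_b = SimpleNamespace(uid='pilot.0001')

# simulate two PILOT_STATE notifications
record_state(pilot_a, 'DONE', history)
record_state(pilot_b, 'FAILED', history)

print(history)
```

Because notifications can arrive out of order, a callback like this should not assume that the recorded sequence follows the state model.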
submit_pilots(descriptions)[source]

Submits one or more rp.Pilot instances to the pilot manager.

Arguments:
  • descriptions [rp.PilotDescription or list of rp.PilotDescription]: The description of the pilot instance(s) to create.
Returns:
  • A list of rp.Pilot objects.
uid

Returns the unique id.

wait_pilots(uids=None, state=None, timeout=None)[source]

Returns when one or more rp.Pilots reach a specific state.

If uids is None, wait_pilots returns when all Pilots reach the state defined in state. This may include pilots which have previously terminated or have already been waited upon.

Arguments:

  • uids [string or list of strings] If uids is set, only the Pilots with the specified uids are considered. If uids is None (default), all Pilots are considered.

  • state [string] The state that Pilots have to reach in order for the call to return.

    By default wait_pilots waits for the Pilots to reach a terminal state, which can be one of the following:

    • rp.states.DONE
    • rp.states.FAILED
    • rp.states.CANCELED
  • timeout [float] Timeout in seconds before the call returns regardless of Pilot state changes. The default value None waits forever.

5.2.2. PilotDescription

class radical.pilot.PilotDescription(from_dict=None)[source]

A PilotDescription object describes the requirements and properties of a radical.pilot.Pilot and is passed as a parameter to radical.pilot.PilotManager.submit_pilots() to instantiate and run a new pilot.

Note

A PilotDescription MUST define at least resource, cores and runtime.

Example:

pm = radical.pilot.PilotManager(session=s)
pd = radical.pilot.PilotDescription()
pd.resource = "local.localhost"
pd.cores    = 16
pd.runtime  = 5 # minutes

pilot = pm.submit_pilots(pd)

uid

[type: str | default: None] A unique ID for the pilot. This attribute is optional; a unique ID will be assigned by RP if the field is not set.

job_name

[type: str | default: None] The name of the job / pilot, which will be provided to radical.saga.job.Description. If not set then uid will be used instead.

resource

[type: str | default: None] [mandatory] The key of a resource configuration entry (see Using Local and Remote HPC Resources). If the key exists, the machine-specific configuration is loaded from the config file once the PilotDescription is passed to radical.pilot.PilotManager.submit_pilots(). If the key doesn’t exist, a ValueError exception is raised.

access_schema

[type: str | default: None] The key of an access mechanism to use. The valid access mechanisms are defined in the resource configuration, see Using Local and Remote HPC Resources. The first one defined there is used by default if no other is specified.

runtime

[type: int | default: 10] [mandatory] The maximum run time (wall-clock time) in minutes of the pilot.

sandbox

[type: str | default: None] The working (“sandbox”) directory of the pilot agent. This parameter is optional and if not set, it defaults to radical.pilot.sandbox in your home or login directory.

Warning

If you define a pilot on an HPC cluster and you want to set sandbox manually, make sure that it points to a directory on a shared filesystem that can be reached from all compute nodes.

nodes

[type: int | default: 0] [NOT IN USE] The number of nodes the pilot should allocate on the target resource. This parameter is optional and could be set instead of cores and gpus (and memory).

Note

Either cores or nodes must be specified. If nodes are specified, gpus must not be specified.

cores

[type: int | default: 1] [mandatory] The number of cores the pilot should allocate on the target resource.

Note

For local pilots, you can set a number larger than the physical machine limit when setting RADICAL_PILOT_PROFILE in your environment (corresponding resource configuration should have the attribute “fake_resources”).

gpus

[type: int | default: 0] The number of GPUs the pilot should allocate on the target resource.

memory

[type: int | default: 0] The total amount of physical memory the pilot (and its associated job) requires. This parameter translates into TotalPhysicalMemory in radical.saga.job.Description.

queue

[type: str | default: None] The name of the job queue the pilot should get submitted to. If queue is set in the resource configuration (resource), defining queue will override it explicitly.

project

[type: str | default: None] The name of the project / allocation to charge for used CPU time. If project is set in the resource configuration (resource), defining project will override it explicitly.

candidate_hosts

[type: list | default: []] The list of host names on which this pilot is allowed to start.

app_comm

[type: list | default: []] The list of names is interpreted as communication channels to start within the pilot agent for the purpose of application communication, i.e., tasks running on that pilot can use those channels to communicate with each other.

The names are expected to end in _queue or _pubsub, indicating the type of channel to create. Once created, tasks will find environment variables named RP_%s_IN and RP_%s_OUT, where %s is replaced with the given channel name (uppercased), and IN/OUT indicate the respective endpoint addresses for the created channels.
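The naming rule above can be sketched as follows. The helpers channel_env_names and channel_endpoints are illustrative names, not part of RP; they only apply the documented rule (uppercased channel name between RP_ and _IN/_OUT) and read the resulting variables from the environment, as a task might do.

```python
import os


def channel_env_names(channel):
    """Return the (IN, OUT) environment variable names for a channel."""
    if not channel.endswith(('_queue', '_pubsub')):
        raise ValueError('channel name must end in _queue or _pubsub: %r'
                         % channel)
    key = channel.upper()
    return 'RP_%s_IN' % key, 'RP_%s_OUT' % key


def channel_endpoints(channel, environ=None):
    """Look up both endpoint addresses; a value is None if it is unset."""
    environ = os.environ if environ is None else environ
    name_in, name_out = channel_env_names(channel)
    return environ.get(name_in), environ.get(name_out)


print(channel_env_names('app_pubsub'))
```

For a channel named app_pubsub in app_comm, a task would thus look for RP_APP_PUBSUB_IN and RP_APP_PUBSUB_OUT.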

input_staging

[type: list | default: []] The list of files to be staged into the pilot sandbox.

output_staging

[type: list | default: []] The list of files to be staged from the pilot sandbox.

cleanup

[type: bool | default: False] If cleanup is set to True, the pilot will delete its entire sandbox upon termination. This includes individual Task sandboxes and all generated output data. Only log files will remain in the sandbox directory.

exit_on_error

[type: bool | default: True] Flag to trigger application termination in case of pilot failure.

services

[type: list of str | default: []] [optional] A list of commands which get started on a separate service compute node right after bootstrapping, and before any RP task is launched. That service compute node will not be used for any other tasks.

layout

[type: str or dict | default: “default”] Points to a JSON file or an explicit (dict) description of the pilot layout: number and size of partitions and their configuration.

prepare_env

[type: dict | default: {}] Specification of task environments to be prepared by the pilot. The parameter is expected to be a dictionary of the form:

{
   'env_1' : {'type'   : 'virtualenv',
              'version': '3.6',
              'setup'  : ['radical.pilot==1.0', 'pandas']},
   'env_N' : {'type'   : 'conda',
              'version': '3.8',
              'setup'  : ['numpy']}
}

where the type specifies the environment type, version specifies the env version to deploy, and setup specifies how the environment is to be prepared.

At this point, the implementation only accepts virtualenv type requests, where version specifies the Python version to use, and setup is expected to be a list of module specifiers which need to be installed into the environment.
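The constraints described above can be checked before a description is submitted. The following is a sketch only: check_env_spec is a hypothetical helper, and treating all three keys as required is an assumption drawn from the example dictionary, not a documented rule.

```python
REQUIRED_KEYS = ('type', 'version', 'setup')  # assumed to be required


def check_env_spec(spec):
    """Return a list of problems found in a prepare_env dictionary."""
    problems = []
    for name, env in spec.items():
        missing = [key for key in REQUIRED_KEYS if key not in env]
        if missing:
            problems.append('%s: missing %s' % (name, ', '.join(missing)))
            continue
        if env['type'] != 'virtualenv':
            # only virtualenv requests are accepted at this point
            problems.append('%s: type %r is not accepted yet'
                            % (name, env['type']))
        if not isinstance(env['setup'], list):
            problems.append('%s: setup must be a list' % name)
    return problems


spec = {
    'env_1': {'type': 'virtualenv', 'version': '3.6',
              'setup': ['radical.pilot==1.0', 'pandas']},
    'env_N': {'type': 'conda', 'version': '3.8',
              'setup': ['numpy']},
}
print(check_env_spec(spec))
```

Applied to the example dictionary, the check flags only the conda entry, which matches the statement that virtualenv is the only type currently accepted.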

5.2.3. Pilots

class radical.pilot.Pilot(pmgr, descr)[source]

A Pilot represents a resource overlay on a local or remote resource.

Note

A Pilot cannot be created directly. The factory method radical.pilot.PilotManager.submit_pilots() has to be used instead.

Example:

pm = radical.pilot.PilotManager(session=s)
pd = radical.pilot.PilotDescription()

pd.resource = "local.localhost"
pd.cores    = 2
pd.runtime  = 5 # minutes

pilot = pm.submit_pilots(pd)

as_dict()[source]

Returns a Python dictionary representation of the object.

cancel()[source]

Cancel the pilot.

description

Returns the description the pilot was started with, as a dictionary.

Returns:
  • description (dict)
log

Returns a list of human readable [timestamp, string] tuples describing various events during the pilot’s lifetime. Those strings are not normative, only informative!

Returns:
  • log (list of [timestamp, string] tuples)
pilot_sandbox

Returns the full sandbox URL of this pilot, if that is already known, or ‘None’ otherwise.

Returns:
  • A string
pmgr

Returns the pilot’s manager.

Returns:
prepare_env(env_spec)[source]

Requests the preparation of a task or worker environment on the target resource. This call will return immediately, and the request will be enacted asynchronously. Any task or worker depending on the named environment will be delayed until the env preparation has completed, or will fail if the env preparation failed.

Format: see PilotDescription

register_callback(cb, metric='PILOT_STATE', cb_data=None)[source]

Registers a callback function that is triggered every time the pilot’s state changes.

All callback functions need to have the same signature:

def cb(obj, state)

where obj is a handle to the object that triggered the callback and state is the new state of that object. If ‘cb_data’ is given, then the ‘cb’ signature changes to

def cb(obj, state, cb_data)

and ‘cb_data’ is passed along.

resource

Returns the resource tag of this pilot.

Returns:
  • A resource tag (string)
resource_details

Returns agent-level resource information.

rpc(rpc, args)[source]

Send a pilot command, wait for the response, and return the result. This is basically an RPC into the pilot.

session

Returns the pilot’s session.

Returns:
stage_in(sds)[source]

Stages the content of the staging_directives into the pilot’s staging area.

stage_out(sds=None)[source]

Fetches files (default: staging_output.tgz) from the pilot sandbox.

See radical.pilot.staging_directives.

state

Returns the current state of the pilot.

Returns:
  • state (string enum)
stderr

Returns a snapshot of the pilot’s STDERR stream.

If this property is queried before the pilot has reached ‘DONE’ or ‘FAILED’ state it will return None.

Returns:
  • stderr (string)
stdout

Returns a snapshot of the pilot’s STDOUT stream.

If this property is queried before the pilot has reached ‘DONE’ or ‘FAILED’ state it will return None.

Returns:
  • stdout (string)
uid

Returns the pilot’s unique identifier.

The uid identifies the pilot within a PilotManager.

Returns:
  • A unique identifier (string).
wait(state=None, timeout=None)[source]

Returns when the pilot reaches a specific state or when an optional timeout is reached.

Arguments:

  • state [list of strings] The state(s) that the pilot has to reach in order for the call to return.

    By default wait waits for the pilot to reach a final state, which can be one of the following:

    • radical.pilot.states.DONE
    • radical.pilot.states.FAILED
    • radical.pilot.states.CANCELED
  • timeout [float] Optional timeout in seconds before the call returns, regardless of whether the pilot has reached the desired state or not. The default value None never times out.

5.3. Tasks and TaskManagers

5.3.1. TaskManager

class radical.pilot.TaskManager(session, cfg='default', scheduler=None)[source]

A TaskManager manages radical.pilot.Task instances which represent the executable workload in RADICAL-Pilot. A TaskManager connects the Tasks with one or more Pilot instances (which represent the workload executors in RADICAL-Pilot) and a scheduler which determines which Task gets executed on which Pilot.

Example:

import radical.pilot as rp

# DBURL is the MongoDB URL, defined by the application
s = rp.Session(dburl=DBURL)

pm = rp.PilotManager(session=s)

pd = rp.PilotDescription()
pd.resource = "futuregrid.alamo"
pd.cores = 16
pd.runtime = 10  # minutes

p1 = pm.submit_pilots(pd) # create first pilot with 16 cores
p2 = pm.submit_pilots(pd) # create second pilot with 16 cores

# Create a workload of 128 '/bin/sleep' tasks
tasks = []
for task_count in range(0, 128):
    t = rp.TaskDescription()
    t.executable = "/bin/sleep"
    t.arguments = ['60']
    tasks.append(t)

# Combine the two pilots, the workload and a scheduler via
# a TaskManager.
tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
tm.add_pilots([p1, p2])
tm.submit_tasks(tasks)

The task manager can issue notifications on task state changes. Whenever a state notification arrives, any callback registered for that notification is fired.

NOTE: State notifications can arrive out of order with respect to the task state model!

__init__(session, cfg='default', scheduler=None)[source]

Creates a new TaskManager and attaches it to the session.

Arguments:
  • session [radical.pilot.Session]: The session instance to use.
  • cfg (dict or string): The configuration or name of configuration to use.
  • scheduler (string): The name of the scheduler plug-in to use.
Returns:
add_pilots(pilots)[source]

Associates one or more pilots with the task manager.

Arguments:

as_dict()[source]

Returns a dictionary representation of the TaskManager object.

cancel_tasks(uids=None)[source]

Cancel one or more radical.pilot.Tasks.

Note that cancellation of tasks is immediate, i.e., their state is immediately set to CANCELED, even if some RP component may still operate on the tasks. Specifically, other state transitions, including other final states (DONE, FAILED), can occur after cancellation. This is a side effect of an optimization: we consider this an acceptable tradeoff in the sense “Oh, that task was DONE at point of cancellation – ok, we can use the results, sure!”.

If that behavior is not wanted, set the environment variable:

export RADICAL_PILOT_STRICT_CANCEL=True

Arguments:
  • uids [string or list of strings]: The IDs of the task objects to cancel.
cancel_units(uids=None)[source]

deprecated - use cancel_tasks()

close()[source]

Shut down the TaskManager and all its components.

get_pilots()[source]

Returns the pilot instances currently associated with the task manager.

Returns:
get_tasks(uids=None)[source]

Returns one or more tasks identified by their IDs.

Arguments:
  • uids [string or list of strings]: The IDs of the task objects to return.
Returns:
get_units(uids=None)[source]

deprecated - use get_tasks()

list_pilots()[source]

Lists the UIDs of the pilots currently associated with the task manager.

Returns:
list_tasks()[source]

Returns the UIDs of the radical.pilot.Tasks managed by this task manager.

Returns:
list_units()[source]

deprecated - use list_tasks()

register_callback(cb, cb_data=None, metric=None, uid=None)[source]

Registers a new callback function with the TaskManager. Manager-level callbacks get called if the specified metric changes. The default metric TASK_STATE fires the callback if any of the Tasks managed by the TaskManager change their state.

All callback functions need to have the same signature:

def cb(obj, value)

where obj is a handle to the object that triggered the callback and value is the new value of the metric. In the example of TASK_STATE above, obj would be the task in question, and value would be the new state of the task.

If ‘cb_data’ is given, then the ‘cb’ signature changes to

def cb(obj, state, cb_data)

and ‘cb_data’, the data provided on callback registration, is passed unchanged.

If ‘uid’ is given, the callback will be invoked only for the specified task.

Available metrics are:

  • TASK_STATE: fires when the state of any of the tasks managed by this task manager instance changes. It communicates the task object instance and the task’s new state.
  • WAIT_QUEUE_SIZE: fires when the number of unscheduled tasks (i.e. of tasks which have not been assigned to a pilot for execution) changes.
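Since the callback signature depends on whether cb_data was given at registration, one option is to write the callback with an optional third parameter. The sketch below tallies TASK_STATE notifications in a Counter; the tasks are stand-in objects and the callback is invoked by hand. In an application, tmgr.register_callback(on_task_state, cb_data=tally) would presumably take the place of the manual calls. The names on_task_state and tally are illustrative.

```python
from collections import Counter
from types import SimpleNamespace


def on_task_state(obj, state, cb_data=None):
    """Tally notifications per state when a Counter is passed as cb_data."""
    if cb_data is not None:
        cb_data[state] += 1


tally = Counter()  # passed as cb_data on registration

# simulate notifications for three stand-in tasks (hypothetical uids)
notifications = [('task.0000', 'DONE'),
                 ('task.0001', 'FAILED'),
                 ('task.0002', 'DONE')]
for uid, state in notifications:
    on_task_state(SimpleNamespace(uid=uid), state, tally)

# the two-argument form is accepted as well and changes nothing
on_task_state(SimpleNamespace(uid='task.0003'), 'DONE')

print(dict(tally))
```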
remove_pilots(pilot_ids, drain=False)[source]

Disassociates one or more pilots from the task manager.

After a pilot has been removed from a task manager, it won’t process any of the task manager’s tasks anymore. Calling remove_pilots doesn’t stop the pilot itself.

Arguments:

  • drain [boolean]: Drain determines what happens to the tasks which are managed by the removed pilot(s). If True, all tasks currently assigned to the pilot are allowed to finish execution. If False (the default), then non-final tasks will be canceled.
scheduler

Returns the scheduler name.

submit_tasks(descriptions)[source]

Submits one or more radical.pilot.Task instances to the task manager.

Arguments:
Returns:
submit_units(descriptions)[source]

deprecated - use submit_tasks()

uid

Returns the unique id.

wait_tasks(uids=None, state=None, timeout=None)[source]

Returns when one or more radical.pilot.Tasks reach a specific state.

If uids is None, wait_tasks returns when all Tasks reach the state defined in state. This may include tasks which have previously terminated or have already been waited upon.

Arguments:

  • uids [string or list of strings] If uids is set, only the Tasks with the specified uids are considered. If uids is None (default), all Tasks are considered.

  • state [string] The state that Tasks have to reach in order for the call to return.

    By default wait_tasks waits for the Tasks to reach a terminal state, which can be one of the following:

    • radical.pilot.states.DONE
    • radical.pilot.states.FAILED
    • radical.pilot.states.CANCELED
  • timeout [float] Timeout in seconds before the call returns regardless of Task state changes. The default value None waits forever.
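One way to use the timeout is to wait for a bounded time and then cancel whatever has not finished. The sketch below combines wait_tasks, get_tasks and cancel_tasks for that purpose. It assumes that get_tasks returns a list of task objects when given a list of uids, and it runs against a hypothetical stub manager (StubTaskManager) rather than a real rp.TaskManager.

```python
from types import SimpleNamespace

FINAL_STATES = ('DONE', 'FAILED', 'CANCELED')


def wait_or_cancel(tmgr, uids, timeout):
    """Wait up to `timeout` seconds, then cancel tasks that are not final.

    Returns the uids of the tasks that were canceled.
    """
    tmgr.wait_tasks(uids=uids, timeout=timeout)  # default: final states
    pending = [task.uid for task in tmgr.get_tasks(uids=uids)
               if task.state not in FINAL_STATES]
    if pending:
        tmgr.cancel_tasks(uids=pending)
    return pending


class StubTaskManager:
    """Hypothetical stand-in exposing only the three calls used above."""

    def __init__(self, states):
        self._tasks = {uid: SimpleNamespace(uid=uid, state=state)
                       for uid, state in states.items()}

    def wait_tasks(self, uids=None, state=None, timeout=None):
        pass  # the stub returns at once, as if the timeout had expired

    def get_tasks(self, uids=None):
        return [self._tasks[uid] for uid in uids]

    def cancel_tasks(self, uids=None):
        for uid in uids:
            self._tasks[uid].state = 'CANCELED'


stub = StubTaskManager({'task.0000': 'DONE',
                        'task.0001': 'AGENT_SCHEDULING'})
print(wait_or_cancel(stub, ['task.0000', 'task.0001'], timeout=5.0))
```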

wait_units(uids=None, state=None, timeout=None)[source]

deprecated - use wait_tasks()

5.3.2. TaskDescription

class radical.pilot.TaskDescription(from_dict=None)[source]

A TaskDescription object describes the requirements and properties of a radical.pilot.Task and is passed as a parameter to radical.pilot.TaskManager.submit_tasks() to instantiate and run a new task.

Note

A TaskDescription MUST define at least an executable or kernel – all other elements are optional.

uid

[type: str | default: “”] A unique ID for the task. This attribute is optional; a unique ID will be assigned by RP if the field is not set.

name

[type: str | default: “”] A descriptive name for the task. This attribute can be used to map individual tasks back to application level workloads.

executable

[type: str | default: “”] The executable to launch. The executable is expected to be either available via $PATH on the target resource, or to be an absolute path.

cpu_processes

[type: int | default: 1] The number of application processes to start on CPU cores.

cpu_threads

[type: int | default: 1] The number of threads each process will start on CPU cores.

cpu_process_type

[type: str | default: “”] The process type, determines startup method (<empty>/POSIX, MPI).

cpu_thread_type

[type: str | default: “”] The thread type, influences startup and environment (<empty>/POSIX, OpenMP).

gpu_processes

[type: int | default: 0] The number of application processes to start on GPU cores.

gpu_threads

[type: int | default: 1] The number of threads each process will start on GPU cores.

gpu_process_type

[type: str | default: “”] The process type, determines startup method (<empty>/POSIX, MPI).

gpu_thread_type

[type: str | default: “”] The thread type, influences startup and environment (<empty>/POSIX, OpenMP, CUDA).

lfs_per_process

[type: int | default: 0] Local File Storage per process - amount of data (MB) required on the local file system of the node.

mem_per_process

[type: int | default: 0] Amount of physical memory required per process.

arguments

[type: list | default: []] The command line arguments for the given executable (list of strings).

environment

[type: dict | default: {}] Environment variables to set before execution (i.e., before the selected LaunchMethod is launched).

named_env

[type: str | default: “”] A named virtual environment as prepared by the pilot. The task will remain in AGENT_SCHEDULING state until that environment gets created.

sandbox

[type: str | default: “”] This specifies the working directory of the task. That directory MUST be relative to the pilot sandbox. It will be created if it does not exist. By default, the sandbox has the name of the task’s uid.

stdout

[type: str | default: “”] The name of the file to store stdout. If not set, the name <uid>.out is used.

stderr

[type: str | default: “”] The name of the file to store stderr. If not set, the name <uid>.err is used.

input_staging

[type: list | default: []] The files that need to be staged before the execution (list of staging directives, see below).

output_staging

[type: list | default: []] The files that need to be staged after the execution (list of staging directives, see below).

stage_on_error

[type: bool | default: False] Output staging is normally skipped on FAILED or CANCELED tasks, but if this flag is set, staging is attempted either way. This may, however, lead to additional errors if the tasks did not manage to produce the expected output files to stage.

pre_exec

[type: list | default: []] Actions (shell commands) to perform before this task starts. Note that the shell commands given here are expected to load environments, check for work directories and data, etc. They are not expected to consume any significant amount of CPU time or other resources! Deviating from that rule will likely result in reduced overall throughput.

No assumption should be made as to where these commands are executed (although RP attempts to perform them in the task’s execution environment).

No assumption should be made on the specific shell environment the commands are executed in.

Errors in executing these commands will cause the task to enter the FAILED state, and no execution of the actual workload will be attempted.

post_exec

[type: list | default: []] Actions (shell commands) to perform after this task finishes. The same remarks as for pre_exec apply, including the point on error handling: errors will cause the task to fail, even if the actual execution was successful.

kernel

[type: str | default: “”] Name of a simulation kernel, which expands to description attributes once the task is scheduled to a pilot and resource. TODO: explain in detail, referencing EnTK.

restartable

[type: bool | default: False] If the task starts to execute on a pilot, but cannot finish because the pilot fails or is canceled, the task can be restarted.

tags

[type: dict | default: {}] Configuration-specific tags, which influence task scheduling and execution (e.g., task co-location).

metadata

[type: ANY | default: None] User defined metadata.

cleanup

[type: bool | default: False] If cleanup flag is set, the pilot will delete the entire task sandbox upon termination. This includes all generated output data in that sandbox. Output staging will be performed before cleanup. Note that task sandboxes are also deleted if the pilot’s own cleanup flag is set.

pilot

[type: str | default: “”] Pilot uid. If specified, the task is submitted to the pilot with the given ID. If that pilot is not known to the TaskManager, an exception is raised.
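To illustrate how the attributes above fit together, the sketch below writes a description as a plain dictionary using the documented attribute names; such a dictionary could presumably be passed as from_dict to rp.TaskDescription. The helper cores_requested is hypothetical and assumes that every thread of every process occupies one CPU core.

```python
def cores_requested(td):
    """CPU cores implied by a description dict (processes x threads)."""
    # both attributes default to 1 when they are not set
    return td.get('cpu_processes', 1) * td.get('cpu_threads', 1)


td = {
    'executable'      : '/bin/sleep',
    'arguments'       : ['60'],
    'cpu_processes'   : 4,
    'cpu_process_type': 'MPI',
    'cpu_threads'     : 2,
    'cpu_thread_type' : 'OpenMP',
    'environment'     : {'OMP_NUM_THREADS': '2'},
    'stage_on_error'  : True,
}

print(cores_requested(td))
```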

The Staging Directives are specified using a dict in the following form:

staging_directive = {
   'source'  : None, # see 'Location' below
   'target'  : None, # see 'Location' below
   'action'  : None, # See 'Action operators' below
   'flags'   : None, # See 'Flags' below
   'priority': 0     # Control ordering of actions (unused)
}

source and target locations can be given as strings or ru.Url instances. Strings containing :// are converted into URLs immediately. Otherwise they are considered absolute or relative paths and are then interpreted in the context of the client’s working directory.

Special URL schemas:

  • client:// : relative to the client’s working directory
  • resource:// : relative to the RP sandbox on the target resource
  • pilot:// : relative to the pilot sandbox on the target resource
  • task:// : relative to the task sandbox on the target resource

In all these cases, the hostname element of the URL is expected to be empty, and the path is always considered relative to the locations specified above (even though URLs usually don’t have a notion of relative paths).

Action operators:

  • rp.TRANSFER : remote file transfer from source URL to target URL
  • rp.COPY : local file copy, i.e., not crossing host boundaries
  • rp.MOVE : local file move
  • rp.LINK : local file symlink

Flags:

  • rp.CREATE_PARENTS : create the directory hierarchy for targets on the fly
  • rp.RECURSIVE : if source is a directory, handles it recursively
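The location rules described above can be sketched with the standard library alone. The function resolve_location is a hypothetical helper, not RP's own resolution code: it reports which base a source or target string refers to and the path relative to that base, and it rejects special-schema URLs with a non-empty hostname.

```python
from urllib.parse import urlsplit

SPECIAL_SCHEMAS = ('client', 'resource', 'pilot', 'task')


def resolve_location(location):
    """Return (base, path) for a staging source or target string."""
    if '://' not in location:
        # plain paths are interpreted in the client's working directory
        return 'client', location
    url = urlsplit(location)
    if url.scheme not in SPECIAL_SCHEMAS:
        return 'url', location  # ordinary URL, used as given
    if url.netloc:
        raise ValueError('hostname must be empty in %r' % location)
    # the path is always relative to the base named by the schema
    return url.scheme, url.path.lstrip('/')


for loc in ('input.dat', 'pilot:///data/in.dat', 'sftp://host/tmp/x'):
    print(loc, '->', resolve_location(loc))
```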

5.3.3. Task

class radical.pilot.Task(tmgr, descr)[source]

A Task represents a ‘task’ that is executed on a Pilot. A Task object allows the application to control and query the state of that task.

Note

A task cannot be created directly. The factory method rp.TaskManager.submit_tasks() has to be used instead.

Example:

tmgr = rp.TaskManager(session=s)

ud = rp.TaskDescription()
ud.executable = "/bin/date"

task = tmgr.submit_tasks(ud)

as_dict()[source]

Returns a Python dictionary representation of the object.

cancel()[source]

Cancel the task.

description

Returns the description the task was started with, as a dictionary.

Returns:
  • description (dict)
exit_code

Returns the exit code of the task, if that is already known, or ‘None’ otherwise.

Returns:
  • exit code (int)
metadata

Returns the metadata field of the task’s description.

name

Returns the task’s application-specified name.

Returns:
  • A name (string).
pilot

Returns the pilot ID of this task, if that is already known, or ‘None’ otherwise.

Returns:
  • A pilot ID (string)
register_callback(cb, cb_data=None, metric=None)[source]

Registers a callback function that is triggered every time a task’s state changes.

All callback functions need to have the same signature:

def cb(obj, state)

where obj is a handle to the object that triggered the callback and state is the new state of that object. If ‘cb_data’ is given, then the ‘cb’ signature changes to

def cb(obj, state, cb_data)

and ‘cb_data’ is passed unchanged.

session

Returns the task’s session.

Returns:
state

Returns the current state of the task.

Returns:
  • state (string enum)
stderr

Returns a snapshot of the executable’s STDERR stream.

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will return None.

Returns:
  • stderr (string)
stdout

Returns a snapshot of the executable’s STDOUT stream.

If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will return None.

Returns:
  • stdout (string)
task_sandbox

Returns the full sandbox URL of this task, if that is already known, or ‘None’ otherwise.

Returns:
  • A URL (radical.utils.Url).
tmgr

Returns the task’s manager.

Returns:
uid

Returns the task’s unique identifier.

The uid identifies the task within a TaskManager.

Returns:
  • A unique identifier (string).
wait(state=None, timeout=None)[source]

Returns when the task reaches a specific state or when an optional timeout is reached.

Arguments:

  • state [list of strings] The state(s) that the task has to reach in order for the call to return.

    By default wait waits for the task to reach a final state, which can be one of the following:

    • rp.states.DONE
    • rp.states.FAILED
    • rp.states.CANCELED
  • timeout [float] Optional timeout in seconds before the call returns, regardless of whether the task has reached the desired state or not. The default value None never times out.