API Reference
Sessions
- class radical.pilot.Session(proxy_url: Optional[str] = None, uid: Optional[str] = None, cfg: Optional[dict] = None, _role: Optional[str] = 'primary', _reg_addr: Optional[str] = None, **close_options)[source]
Root of RP object hierarchy for an application instance.
A Session is the root object of all RP objects in an application instance: it holds radical.pilot.PilotManager and radical.pilot.TaskManager instances, which in turn hold radical.pilot.Pilot and radical.pilot.Task instances, and several other components which operate on those stateful entities.
- __init__(proxy_url: Optional[str] = None, uid: Optional[str] = None, cfg: Optional[dict] = None, _role: Optional[str] = 'primary', _reg_addr: Optional[str] = None, **close_options)[source]
Create a new session.
A new Session instance is created and stored in the database.
Any RP Session will require an RP Proxy to facilitate communication between the client machine (i.e., the host where the application created this Session instance) and the target resource (i.e., the host where the pilot agent/s is/are running and where the workload is being executed).
A proxy_url can be specified which then must point to an RP Proxy Service instance which this session can use to establish a communication proxy. If proxy_url is not specified, the session will check for the environment variable RADICAL_PILOT_PROXY_URL and interpret it in the same way. If that information is not available either, the session will instantiate a proxy service on the local host. Note that any proxy service instantiated by the session itself will be terminated once the session instance is closed or goes out of scope and is garbage collected; such a proxy should therefore not be used by other session instances.
- Note: an RP proxy has to be accessible by both the client and the target hosts to facilitate communication between both parties, which implies access to the respective ports. Proxies started by the session itself will use the first free port larger than 10,000.
- Parameters
proxy_url (str, optional) – proxy service URL - points to an RP proxy service which is used to establish an RP communication proxy for this session.
uid (str, optional) – Create a session with this UID. Session UIDs MUST be unique - otherwise they will lead to communication conflicts, resulting in undefined behaviours.
cfg (str | dict, optional) – a named or instantiated configuration to be used for the session.
_role (str, optional) – only PRIMARY sessions, created by the original application process (via rp.Session()), will create proxies and Registry Services. AGENT sessions will also create a Registry but no proxies. CLIENT sessions will be limited to allow client interactions with RP components. All other DEFAULT session instances are instantiated internally in processes spawned (directly or indirectly) by the initial session, for example in some of its components, or by the RP agent. Those sessions will inherit the original session ID, but will not attempt to create new proxies or registries.
**close_options (optional) – If additional keyword arguments are provided, they will be used as the default arguments to Session.close(). This can be useful when the Session is used as a Python context manager, such that close() is called automatically at the end of a with block.
_reg_addr (str, optional) – Non-primary sessions will connect to the registry at that endpoint and pull the session config and resource configurations from there.
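A minimal usage sketch, assuming cleanup is accepted as a close option (as described for close() below); the managers created here are only illustrative:
    import radical.pilot as rp

    # using the Session as a context manager calls close() automatically;
    # keyword arguments given here become the defaults for that close() call
    with rp.Session(cleanup=True) as session:
        pmgr = rp.PilotManager(session=session)
        tmgr = rp.TaskManager(session=session)
        # ... submit pilots and tasks here ...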
- close(**kwargs)[source]
Close the session.
All subsequent attempts to access objects attached to the session will result in an error. If cleanup is set to True, the session data is removed from the database.
- get_pilot_managers(pmgr_uids=None)[source]
Get known PilotManager(s).
- Parameters
pmgr_uids (str | Iterable[str], optional) – uids of the PilotManagers we want.
- Returns
One or more radical.pilot.PilotManager objects.
- Return type
radical.pilot.PilotManager | list[radical.pilot.PilotManager]
- get_resource_config(resource, schema=None)[source]
Returns a dictionary of the requested resource config.
- get_task_managers(tmgr_uids=None)[source]
Get known TaskManager(s).
- inject_metadata(metadata)[source]
Insert (experiment) metadata into an active session.
RP stack version info always gets added.
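A brief sketch, assuming the method accepts a plain dictionary (the keys shown are illustrative):
    session.inject_metadata({'experiment': 'scaling_run',
                             'trial'     : 42})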
- list_pilot_managers()[source]
Get PilotManager identifiers.
Lists the unique identifiers of all radical.pilot.PilotManager instances associated with this session.
- Returns
A list of radical.pilot.PilotManager uids.
- Return type
list[str]
- list_resources()[source]
Get list of known resource labels.
Returns a list of known resource labels which can be used in a pilot description.
- list_task_managers()[source]
Get TaskManager identifiers.
Lists the unique identifiers of all radical.pilot.TaskManager instances associated with this session.
- Returns
A list of radical.pilot.TaskManager uids.
- Return type
list[str]
Pilots and PilotManagers
PilotManagers
- class radical.pilot.PilotManager(session, cfg='default')[source]
Manage Pilot instances.
A PilotManager manages rp.Pilot instances that are submitted via the radical.pilot.PilotManager.submit_pilots() method. It is possible to attach one or more HPC resources to a PilotManager to outsource machine specific configuration parameters to an external configuration file.
Example:
    s  = rp.Session()
    pm = rp.PilotManager(session=s)

    pd = rp.PilotDescription()
    pd.resource = "futuregrid.alamo"
    pd.cores    = 16

    p1 = pm.submit_pilots(pd)  # create first  pilot with 16 cores
    p2 = pm.submit_pilots(pd)  # create second pilot with 16 cores

    # Create a workload of 128 '/bin/sleep' tasks
    tasks = []
    for task_count in range(0, 128):
        t = rp.TaskDescription()
        t.executable = "/bin/sleep"
        t.arguments  = ['60']
        tasks.append(t)

    # Combine the two pilots, the workload and a scheduler via
    # a TaskManager.
    tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
    tm.add_pilots(p1)
    tm.submit_tasks(tasks)
The pilot manager can issue notifications on pilot state changes. Whenever a state notification arrives, any callback registered for that notification is fired.
Note
State notifications can arrive out of order wrt the pilot state model!
- __init__(session, cfg='default')[source]
Creates a new PilotManager and attaches it to the session.
- close(terminate=True)[source]
Shut down the PilotManager and all its components.
- Parameters
terminate (bool) –
cancel non-final pilots if True (default)
Note
Pilots cannot be reconnected to after termination.
- control_cb(topic, msg)[source]
This callback can be overloaded by the component to handle any control message which is not already handled by the component base class.
- register_callback(cb, cb_data=None, metric='PILOT_STATE')[source]
Registers a new callback function with the PilotManager.
Manager-level callbacks get called if the specified metric changes. The default metric PILOT_STATE fires the callback if any of the Pilots managed by the PilotManager change their state.
All callback functions need to have the same signature:
def cb(obj, value, cb_data)
where obj is a handle to the object that triggered the callback, value is the metric, and cb_data is the data provided on callback registration. In the example of PILOT_STATE above, the object would be the pilot in question, and the value would be the new state of the pilot.
Available metrics are:
- PILOT_STATE: fires when the state of any of the pilots which are managed by this pilot manager instance is changing. It communicates the pilot object instance and the pilot's new state.
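A minimal sketch of registering a pilot state callback on a PilotManager instance pmgr (the callback body is illustrative):
    def pilot_state_cb(pilot, state, cb_data):
        # 'pilot' is the rp.Pilot that changed state, 'state' is its new state
        print('pilot %s is now in state %s' % (pilot.uid, state))

    pmgr.register_callback(pilot_state_cb, metric='PILOT_STATE')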
- wait_pilots(uids=None, state=None, timeout=None)[source]
Block for state transition.
Returns when one or more rp.Pilot instances reach a specific state.
If uids is None, wait_pilots returns when all Pilots reach the state defined in state. This may include pilots which have previously terminated or were waited upon.
Example
# TODO – add example
- Parameters
uids (str | list[str], optional) – If set, only the Pilots with the specified uids are considered. If None (default), all Pilots are considered.
state (str | list[str], optional) – The state that Pilots have to reach in order for the call to return.
By default wait_pilots waits for the Pilots to reach a terminal state, which can be one of the following:
- rp.rps.DONE
- rp.rps.FAILED
- rp.rps.CANCELED
timeout (float, optional) – Timeout in seconds before the call returns regardless of Pilot state changes. The default value None waits forever.
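A short sketch of waiting on pilot state transitions, assuming rp.PMGR_ACTIVE is the active pilot state and pmgr and pilot stem from the example above:
    # wait (up to 5 minutes) for one pilot to become active
    pmgr.wait_pilots(uids=pilot.uid, state=rp.PMGR_ACTIVE, timeout=300)

    # without arguments, block until all known pilots reach a terminal state
    pmgr.wait_pilots()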
PilotDescription
- class radical.pilot.PilotDescription(from_dict=None)[source]
Specify a requested Pilot.
A PilotDescription object describes the requirements and properties of a radical.pilot.Pilot and is passed as a parameter to radical.pilot.PilotManager.submit_pilots() to instantiate and run a new pilot.
- Example:
    pm = radical.pilot.PilotManager(session=s)

    pd = radical.pilot.PilotDescription()
    pd.resource = "local.localhost"
    pd.cores    = 16
    pd.runtime  = 5  # minutes

    pilot = pm.submit_pilots(pd)
- uid
A unique ID for the pilot. A unique ID will be assigned by RP if the field is not set.
- Type
str, optional
- job_name
The name of the job / pilot as provided to the batch system. If not set then uid will be used instead.
- Type
str, optional
- resource
The key of a platform description entry. If the key exists, the machine-specific configuration is loaded from the config file once the PilotDescription is passed to radical.pilot.PilotManager.submit_pilots(). If the key doesn't exist, a ValueError exception is raised.
- Type
str
- access_schema
The key of an access mechanism to use. The valid access mechanisms are defined in the resource configuration. See Configuration System. The first schema defined in the resource configuration is used by default, if no access_schema is specified.
- Type
str, optional
- runtime
The maximum run time (wall-clock time) in minutes of the pilot. Default 10.
- Type
int, optional
- sandbox
The working (“sandbox”) directory of the pilot agent. This parameter is optional and if not set, it defaults to radical.pilot.sandbox in your home or login directory. Default None.
Warning
If you define a pilot on an HPC cluster and you want to set sandbox manually, make sure that it points to a directory on a shared filesystem that can be reached from all compute nodes.
- Type
str, optional
- nodes
The number of nodes the pilot should allocate on the target resource. This parameter could be set instead of cores and gpus (and memory). Default 1.
Note
If nodes is specified, gpus and cores must not be specified.
- Type
int, optional
- cores
The number of cores the pilot should allocate on the target resource. This parameter could be set instead of nodes.
Note
For local pilots, you can set a number larger than the physical machine limit (corresponding resource configuration should have the attribute “fake_resources”).
Note
If cores is specified, nodes must not be specified.
- Type
int, optional
- gpus
The number of gpus the pilot should allocate on the target resource.
Note
If gpus is specified, nodes must not be specified.
- Type
int, optional
- memory
The total amount of physical memory the pilot (and its associated job) requires.
- Type
int, optional
- queue
The name of the job queue the pilot should get submitted to. If queue is set in the resource configuration (resource), defining queue here will explicitly override it.
- Type
str, optional
- project
The name of the project / allocation to charge for used CPU time. If project is set in the resource configuration (resource), defining project here will explicitly override it.
- Type
str, optional
- app_comm
A list of names which are interpreted as communication channels to start within the pilot agent, for the purpose of application communication, i.e., so that tasks running on that pilot can use those channels to communicate amongst each other.
The names are expected to end in _queue or _pubsub, indicating the type of channel to create. Once created, tasks will find environment variables of the name RP_%s_IN and RP_%s_OUT, where %s is replaced with the given channel name (uppercased), and IN/OUT indicate the respective endpoint addresses for the created channels.
- cleanup
If cleanup is set to True, the pilot will delete its entire sandbox upon termination. This includes individual Task sandboxes and all generated output data. Only log files will remain in the sandbox directory. Default False.
- Type
bool, optional
- exit_on_error
Flag to trigger application termination in case of pilot failure. Default True.
- Type
bool, optional
- services
A list of commands which get started on a separate service compute node right after bootstrapping, and before any RP task is launched. That service compute node will not be used for any other tasks.
- Type
list[TaskDescription], optional
- prepare_env
A dictionary of {env_name: env_spec} as documented for the pilot’s prepare_env(env_name, env_spec) method. The given specifications will be enacted during pilot startup and can be used for service tasks.
- Type
dict, optional
- reconfig_src
Name of a data file to be used by the agent's reconfig scheduler.
- Type
string, optional
Pilots
- class radical.pilot.Pilot(pmgr: PilotManager, descr)[source]
Represent a resource overlay on a local or remote resource.
Note
A Pilot cannot be created directly. The factory method radical.pilot.PilotManager.submit_pilots() has to be used instead.
Example:
    pm = radical.pilot.PilotManager(session=s)

    pd = radical.pilot.PilotDescription()
    pd.resource = "local.localhost"
    pd.cores    = 2
    pd.runtime  = 5  # minutes

    pilot = pm.submit_pilots(pd)
- as_dict()[source]
Dictionary representation.
- Returns
a Python dictionary representation of the object.
- Return type
- property endpoint_fs
The URL which is internally used to access the target resource’s root file system.
- Type
radical.utils.Url
- property log
A list of human readable [timestamp, string] tuples describing various events during the pilot’s lifetime. Those strings are not normative, only informative!
- property nodelist
NodeList, describing the nodes the pilot can place tasks on
- property pilot_sandbox
The full sandbox URL of this pilot, if that is already known, or ‘None’ otherwise.
- Type
- property pmgr
The pilot’s manager.
- Type
- prepare_env(env_name, env_spec)[source]
Prepare a virtual environment.
Request the preparation of a task or worker environment on the target resource. This call will block until the env is created.
- Parameters
env_name (str) – name of the environment to prepare.
env_spec (dict) –
specification of the environment to prepare, like:
    {'type'    : 'shell',
     'pre_exec': ['export PATH=$PATH:/opt/bin']}

    {'type'    : 'venv',
     'version' : '3.7',
     'pre_exec': ['module load python'],
     'setup'   : ['radical.pilot==1.0', 'pandas']}

    {'type'    : 'conda',
     'version' : '3.8',
     'setup'   : ['numpy']}

    {'type'    : 'conda',
     'version' : '3.8',
     'path'    : '/path/to/ve',
     'setup'   : ['numpy']}

where type specifies the environment type, version specifies the Python version to deploy, and setup specifies how the environment is to be prepared. If path is specified the env will be created at that path. If path is not specified, RP will place the named env in the pilot sandbox (under env/named_env_name). If a VE exists at that path, it will be used as is (an update is not performed). pre_exec commands are executed before env creation and setup are attempted.
Note
The optional version specifier is only interpreted up to minor version, subminor and less are ignored.
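A brief usage sketch, assuming pilot is an active rp.Pilot and the target machine provides a python module (environment name and packages are illustrative):
    pilot.prepare_env(env_name='ml_env',
                      env_spec={'type'    : 'venv',
                                'version' : '3.9',
                                'pre_exec': ['module load python'],
                                'setup'   : ['numpy', 'scipy']})

    # tasks can then request that environment via TaskDescription.named_env
    td = rp.TaskDescription()
    td.named_env  = 'ml_env'
    td.executable = 'python3'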
- register_callback(cb, metric='PILOT_STATE', cb_data=None)[source]
Add callback for state changes.
Registers a callback function that is triggered every time the pilot’s state changes.
All callback functions need to have the same signature:
def cb(obj, state)
where obj is a handle to the object that triggered the callback and state is the new state of that object.
If cb_data is given, then the cb signature changes to
def cb(obj, state, cb_data)
and cb_data is passed along.
- register_service(uid, info)[source]
Instead of running a service task and injecting the obtained service info into other task environments, we can register an external service with the pilot, and have the pilot inject the service info the same way.
- property resource_sandbox
The full URL of the path that RP considers the resource sandbox, i.e., the sandbox on the target resource’s file system which is shared by all sessions which access that resource.
- Type
radical.utils.Url
- rpc(cmd, *args, rpc_addr=None, **kwargs)[source]
Remote procedure call.
Send an RPC command and arguments to the pilot and wait for the response. This is a synchronous operation at this point, and it is not thread safe to have multiple concurrent RPC calls.
- property session_sandbox
The full URL of the path that RP considers the session sandbox on the target resource’s file system which is shared by all pilots which access that resource in the current session.
- Type
radical.utils.Url
- stage_in(sds)[source]
Stage files “in”.
Stages the content of the staging directives to the pilot sandbox.
Please note the documentation of radical.pilot.staging_directives.complete_url() for details on path and sandbox semantics.
- stage_out(sds=None)[source]
Stage data “out”.
Fetches the content of the staging directives from the pilot sandbox.
Please note the documentation of radical.pilot.staging_directives.complete_url() for details on path and sandbox semantics.
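A short sketch of staging a shared input file into the pilot sandbox and fetching a result back, using the staging directive format and URL schemas documented under 'Staging Directives' below (paths are illustrative):
    pilot.stage_in({'source': 'client:///input/reference.dat',
                    'target': 'pilot:///reference.dat',
                    'action': rp.TRANSFER})

    # later, fetch results which tasks left in the pilot sandbox
    pilot.stage_out({'source': 'pilot:///summary.csv',
                     'target': 'client:///results/summary.csv',
                     'action': rp.TRANSFER})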
- property stderr
A snapshot of the pilot’s STDERR stream.
If this property is queried before the pilot has reached ‘DONE’ or ‘FAILED’ state it will return None.
Warning
This can be inefficient. Output may be incomplete and/or filtered.
- Type
- property stdout
A snapshot of the pilot’s STDOUT stream.
If this property is queried before the pilot has reached ‘DONE’ or ‘FAILED’ state it will return None.
Warning
This can be inefficient. Output may be incomplete and/or filtered.
- Type
- property uid
The pilot's unique identifier within a PilotManager.
- Type
str
- wait(state=None, timeout=None)[source]
Block for state change.
Returns when the pilot reaches a specific state or when an optional timeout is reached.
- Parameters
state (str | list[str], optional) – The state(s) that the pilot has to reach in order for the call to return.
By default wait waits for the pilot to reach a final state, which can be one of the following:
radical.pilot.states.DONE
radical.pilot.states.FAILED
radical.pilot.states.CANCELED
timeout (float) – Optional timeout in seconds before the call returns regardless whether the pilot has reached the desired state or not. The default value None never times out.
Tasks and TaskManagers
TaskManager
- class radical.pilot.TaskManager(session, cfg='default', scheduler=None)[source]
A TaskManager manages radical.pilot.Task instances which represent the executable workload in RADICAL-Pilot. A TaskManager connects the Tasks with one or more Pilot instances (which represent the workload executors in RADICAL-Pilot) and a scheduler which determines which Task gets executed on which Pilot.
Example:
    s  = rp.Session()
    pm = rp.PilotManager(session=s)

    pd = rp.PilotDescription()
    pd.resource = "futuregrid.alamo"
    pd.cores    = 16

    p1 = pm.submit_pilots(pd)  # create first  pilot with 16 cores
    p2 = pm.submit_pilots(pd)  # create second pilot with 16 cores

    # Create a workload of 128 '/bin/sleep' tasks
    tasks = []
    for task_count in range(0, 128):
        t = rp.TaskDescription()
        t.executable = "/bin/sleep"
        t.arguments  = ['60']
        tasks.append(t)

    # Combine the two pilots, the workload and a scheduler via
    # a TaskManager.
    tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
    tm.add_pilots(p1)
    tm.submit_tasks(tasks)
The task manager can issue notifications on task state changes. Whenever a state notification arrives, any callback registered for that notification is fired.
Note
State notifications can arrive out of order wrt the task state model!
- __init__(session, cfg='default', scheduler=None)[source]
Creates a new TaskManager and attaches it to the session.
- Parameters
session (radical.pilot.Session) – The session instance to use.
cfg (dict | str) – The configuration or name of configuration to use.
scheduler (str) – The name of the scheduler plug-in to use.
uid (str) – ID for the task manager, to be used for reconnect
- Returns
A new TaskManager object.
- Return type
- add_pilots(pilots)[source]
Associates one or more pilots with the task manager.
- Parameters
pilots (radical.pilot.Pilot | list[radical.pilot.Pilot]) – The pilot objects that will be added to the task manager.
- cancel_tasks(uids=None)[source]
Cancel one or more radical.pilot.Task instances.
Note that cancellation of tasks is not immediate, i.e., their state is not necessarily CANCELED after this method returns. Instead, the cancellation request is sent to the components which currently manage the tasks and which will then enact the request at their discretion, eventually leading to the state transition to CANCELED.
- get_pilots()[source]
Get the pilot instances currently associated with the task manager.
- Returns
A list of radical.pilot.Pilot instances.
- Return type
- get_tasks(uids=None)[source]
Get one or more tasks identified by their IDs.
- Parameters
uids (str | list[str]) – The IDs of the task objects to return.
- Returns
A list of radical.pilot.Task objects.
- Return type
- list_pilots()[source]
Lists the UIDs of the pilots currently associated with the task manager.
- Returns
A list of radical.pilot.Pilot UIDs.
- Return type
- list_tasks()[source]
Get the UIDs of the tasks managed by this task manager.
- Returns
A list of radical.pilot.Task UIDs.
- Return type
- pilot_rpc(pid, cmd, *args, rpc_addr=None, **kwargs)[source]
Remote procedure call.
Send an RPC command and arguments to the pilot and wait for the response. This is a synchronous operation at this point, and it is not thread safe to have multiple concurrent RPC calls.
- register_callback(cb, cb_data=None, metric=None, uid=None)[source]
Registers a new callback function with the TaskManager.
Manager-level callbacks get called if the specified metric changes. The default metric TASK_STATE fires the callback if any of the Tasks managed by the TaskManager change their state.
All callback functions need to have the same signature:
def cb(obj, value) -> None: ...
where obj is a handle to the object that triggered the callback, value is the metric, and cb_data is the data provided on callback registration. In the example of TASK_STATE above, the object would be the task in question, and the value would be the new state of the task.
If cb_data is given, then the cb signature changes to
def cb(obj, state, cb_data) -> None: ...
and cb_data is passed unchanged.
If uid is given, the callback will be invoked only for the specified task.
Available metrics are:
- TASK_STATE: fires when the state of any of the tasks which are managed by this task manager instance is changing. It communicates the task object instance and the task's new state.
- WAIT_QUEUE_SIZE: fires when the number of unscheduled tasks (i.e., tasks which have not been assigned to a pilot for execution) changes.
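A minimal sketch of a task state callback on a TaskManager instance tmgr, assuming the state constants are exposed as rp.FAILED etc. (the callback body is illustrative):
    def task_state_cb(task, state, cb_data=None):
        # 'task' is the rp.Task that changed state, 'state' is its new state
        if state == rp.FAILED:
            print('task %s failed: %s' % (task.uid, task.stderr))

    tmgr.register_callback(task_state_cb, metric='TASK_STATE')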
- remove_pilots(pilot_ids, drain=False)[source]
Disassociates one or more pilots from the task manager.
After a pilot has been removed from a task manager, it won’t process any of the task manager’s tasks anymore. Calling remove_pilots doesn’t stop the pilot itself.
- Parameters
drain (bool) – Drain determines what happens to the tasks which are managed by the removed pilot(s). If True, all tasks currently assigned to the pilot are allowed to finish execution. If False (the default), then non-final tasks will be canceled.
- submit_raptors(descriptions, pilot_id=None)[source]
Submit RAPTOR master tasks.
Submits one or more radical.pilot.TaskDescription instances to the task manager, where the TaskDescriptions have the mode RAPTOR_MASTER set.
This is a thin wrapper around submit_tasks.
- Parameters
descriptions (radical.pilot.TaskDescription) – description of the raptor masters to submit.
- Returns
A list of radical.pilot.Task objects.
- Return type
- submit_tasks(descriptions)[source]
Submit tasks for execution.
Submits one or more radical.pilot.Task instances to the task manager.
- Parameters
descriptions (radical.pilot.TaskDescription | list[radical.pilot.TaskDescription]) – The description of the task instance(s) to create.
- Returns
A list of radical.pilot.Task objects.
- Return type
- submit_workers(descriptions)[source]
Submit RAPTOR workers.
Submits one or more radical.pilot.TaskDescription instances to the task manager, where the TaskDescriptions have the mode RAPTOR_WORKER set.
This method is a thin wrapper around submit_tasks.
- Parameters
descriptions (radical.pilot.TaskDescription) – description of the workers to submit.
- Returns
A list of radical.pilot.Task objects.
- Return type
- wait_tasks(uids=None, state=None, timeout=None)[source]
Block for state transition.
Returns when one or more radical.pilot.Task instances reach a specific state.
If uids is None, wait_tasks returns when all Tasks reach the state defined in state. This may include tasks which have previously terminated or were waited upon.
Example:
# TODO -- add example
- Parameters
uids (str | list[str]) – If uids is set, only the Tasks with the specified uids are considered. If uids is None (default), all Tasks are considered.
state (str) –
The state that Tasks have to reach in order for the call to return.
By default wait_tasks waits for the Tasks to reach a terminal state, which can be one of the following.
radical.pilot.rps.DONE
radical.pilot.rps.FAILED
radical.pilot.rps.CANCELED
timeout (float) – Timeout in seconds before the call returns regardless of Task state changes. The default value None waits forever.
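A short sketch of a typical submit-and-wait cycle, assuming tmgr already has an active pilot attached and that Task exposes a state property (the workload is illustrative):
    tds   = [rp.TaskDescription({'executable': '/bin/date'}) for _ in range(10)]
    tasks = tmgr.submit_tasks(tds)

    # block until all submitted tasks reach DONE, FAILED or CANCELED
    tmgr.wait_tasks(uids=[t.uid for t in tasks])

    for task in tasks:
        print('%s: %s (exit code: %s)' % (task.uid, task.state, task.exit_code))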
TaskDescription
- class radical.pilot.TaskDescription(from_dict=None)[source]
Describe the requirements and properties of a Task.
A TaskDescription object describes the requirements and properties of a radical.pilot.Task and is passed as a parameter to radical.pilot.TaskManager.submit_tasks() to instantiate and run a new task.
- uid
A unique ID for the task. This attribute is optional, a unique ID will be assigned by RP if the field is not set.
- Type
str, optional
- name
A descriptive name for the task. This attribute can be used to map individual tasks back to application level workloads.
- Type
str, optional
- mode
The execution mode to be used for this task. Default “executable”. The following modes are accepted.
TASK_EXECUTABLE: the task is spawned as an external executable via a resource specific launch method (srun, aprun, mpiexec, etc).
required attributes: executable
related attributes: arguments
TASK_SERVICE: exactly like TASK_EXECUTABLE, but the task is handled differently by the agent. This mode is used to start service tasks whose connection endpoint is made available to other tasks.
NOTE: the mode TASK_SERVICE is only used internally and should not be used by application developers.
required attributes: executable
related attributes: arguments
TASK_FUNCTION: the task references a python function to be called.
required attributes: function
related attributes: args
related attributes: kwargs
TASK_METHOD: the task references a raptor worker method to be called.
required attributes: method
related attributes: args
related attributes: kwargs
TASK_EVAL: the task is a code snippet to be evaluated.
required attributes: code
TASK_EXEC: the task is a code snippet to be exec’ed.
required attributes: code
TASK_SHELL: the task is a shell command line to be run.
required attributes: command
TASK_PROC: the task is a single core process to be executed.
required attributes: executable
related attributes: arguments
TASK_RAPTOR_MASTER: the task references a raptor master to be instantiated.
related attributes: raptor_file
related attributes: raptor_class
TASK_RAPTOR_WORKER: the task references a raptor worker to be instantiated.
related attributes: raptor_file
related attributes: raptor_class
There is a certain overlap between the TASK_EXECUTABLE, TASK_SHELL and TASK_PROC modes. As a general rule, TASK_SHELL and TASK_PROC should be used for short running tasks which require a single core and no additional resources (GPUs, storage, memory). TASK_EXECUTABLE should be used for all other tasks and is in fact the default. TASK_SHELL should only be used if the command to be run requires shell specific functionality (e.g., pipes, I/O redirection) which cannot easily be mapped to other task attributes (see the sketch below).
TASK_RAPTOR_MASTER and TASK_RAPTOR_WORKER are special types of tasks that define RAPTOR’s master(s) and worker(s) components and their resource requirements. They are launched by the Agent on one or more nodes, depending on their requirements.
- Type
str, optional
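A brief sketch of task descriptions in two of the modes listed above (the function name and arguments are illustrative):
    # default mode: TASK_EXECUTABLE
    td_exe = rp.TaskDescription()
    td_exe.executable = '/bin/echo'
    td_exe.arguments  = ['hello world']

    # function mode, to be executed by a raptor worker
    td_fn = rp.TaskDescription()
    td_fn.mode     = rp.TASK_FUNCTION
    td_fn.function = 'my_module.my_function'   # must be resolvable by the worker
    td_fn.args     = [1, 2]
    td_fn.kwargs   = {'verbose': True}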
- executable
The executable to launch. The executable is expected to be either available via $PATH on the target resource, or to be an absolute path.
- Type
- info_pattern
A regular expression pattern to extract service startup information (only for tasks with mode TASK_SERVICE). The pattern is formed like:
‘src:regex’
where src is stdout, stderr, or a file path, and regex is a regular expression to extract the information from the respective source.
- Type
- code
The code to run. This field is expected to contain valid python code which is executed when the task mode is TASK_EXEC or TASK_EVAL.
- Type
- function
The function to run. This field is expected to contain a python function name which can be resolved in the scope of the respective RP worker implementation (see documentation there). The task mode must be set to TASK_FUNCTION. args and kwargs are passed as function parameters.
- Type
- args
Positional arguments to be passed to the function (see above). This field will be serialized with msgpack and can thus contain any serializable data types.
- Type
list, optional
- kwargs
Named arguments to be passed to the function (see above). This field will be serialized with msgpack and can thus contain any serializable data types.
- Type
dict, optional
- use_mpi
Flag to indicate whether the task should be provided an MPI communicator. Defaults to True if more than 1 rank is requested (see ranks), otherwise defaults to False. Set this to True if you want to enforce an MPI communicator on single-ranked tasks.
- Type
bool, optional
- ranks
The number of application processes to start on CPU cores. Default 1.
For two ranks or more, an MPI communicator will be available to the processes.
ranks replaces the deprecated attribute cpu_processes. The attribute cpu_process_type was previously used to signal the need for an MPI communicator - that attribute is now also deprecated and will be ignored.
- Type
int, optional
- ranks_per_node
The number of ranks to start on each node. If not set, the number of ranks per node will be determined by the scheduler depending on resource availability.
- Type
int, optional
- cores_per_rank
The number of cpu cores each process will have available to start its own threads or processes on. By default, core refers to a physical CPU core - but if the pilot has been launched with SMT-settings > 1, core will refer to a virtual core or hyperthread instead (the name depends on the CPU vendor).
cores_per_rank replaces the deprecated attribute cpu_threads.
- Type
int, optional
- threading_type
The thread type, influences startup and environment (<empty>/POSIX, OpenMP).
threading_type replaces the deprecated attribute cpu_thread_type.
- Type
str, optional
- gpus_per_rank
The number of GPUs made available to each rank. If a GPU is shared among several ranks, then a fractional value should be provided (e.g., if 2 ranks share a GPU, then gpus_per_rank=0.5).
gpus_per_rank replaces the deprecated attribute gpu_processes. The attributes gpu_threads and gpu_process_type are also deprecated and will be ignored.
- Type
float, optional
- gpu_type
The type of GPU environment to provide to the ranks (<empty>, CUDA, ROCm).
gpu_type replaces the deprecated attribute gpu_thread_type.
- Type
str, optional
- lfs_per_rank
Local File Storage per rank - amount of data (MB) required on the local file system of the node.
lfs_per_rank replaces the deprecated attribute lfs_per_process.
- Type
int, optional
- mem_per_rank
Amount of physical memory required per rank.
mem_per_rank replaces the deprecated attribute mem_per_process.
- Type
int, optional
- environment
Environment variables to set in the task environment before execution (i.e., before the picked LaunchMethod launches the task).
- Type
dict, optional
- named_env
A named virtual environment as prepared by the pilot. The task will remain in AGENT_SCHEDULING state until that environment gets created.
- Type
str, optional
- sandbox
This specifies the working directory of the task. It will be created if it does not exist. By default, the sandbox has the name of the task’s uid and is relative to the pilot’s sandbox.
- Type
str, optional
- stdout
The name of the file to store stdout. If not set then uid.out will be used.
- Type
str, optional
- stderr
The name of the file to store stderr. If not set then uid.err will be used.
- Type
str, optional
- input_staging
The files that need to be staged before the execution (list of staging directives, see below).
- Type
list, optional
- output_staging
The files that need to be staged after the execution (list of staging directives, see below).
- Type
list, optional
- stage_on_error
Output staging is normally skipped on FAILED or CANCELED tasks, but if this flag is set, staging is attempted either way. This may, however, lead to additional errors if the task did not manage to produce the expected output files to stage. Default False.
- Type
bool, optional
- priority
(int, optional): The priority of the task. Tasks with higher priority will be scheduled first. The default priority is 0. Note that task priorities are not guaranteed to be enforced strictly, under certain conditions the task may be started later than lower priority tasks.
- pre_launch
Actions (shell commands) to perform before launching (i.e., before LaunchMethod is submitted), potentially on a batch node which is different from the node the task is placed on.
Note that the set of shell commands given here are expected to load environments, check for work directories and data, etc. They are not expected to consume any significant amount of CPU time or other resources! Deviating from that rule will likely result in reduced overall throughput.
- Type
list, optional
- post_launch
Actions (shell commands) to perform after launching (i.e., after LaunchMethod is executed).
Precautions are the same as for pre_launch actions.
- Type
list, optional
- pre_exec
Actions (shell commands) to perform before the task starts (LaunchMethod is submitted, but no actual task running yet). Each item could be either a string (str), which represents an action applied to all ranks, or a dictionary (dict), which represents a list of actions applied to specified ranks (key is a rankID and value is a list of actions to be performed for this rank).
The actions/commands are executed on the respective nodes where the ranks are placed, and the actual rank startup will be delayed until all pre_exec commands have completed.
Precautions are the same as for pre_launch actions.
No assumption should be made as to where these commands are executed (although RP attempts to perform them in the task’s execution environment).
No assumption should be made on the specific shell environment the commands are executed in other than a POSIX shell environment.
Errors in executing these commands will cause the task to enter FAILED state, and no execution of the actual workload will be attempted.
- Type
list, optional
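A small sketch of the rank-specific pre_exec form described above (the executable and module names are illustrative, and the exact key type used for rank IDs is an assumption):
    td = rp.TaskDescription()
    td.executable = './my_mpi_app'
    td.ranks      = 4
    td.pre_exec   = [
        'module load gcc',                   # applied to all ranks
        {'0': ['export ROLE=reader'],        # applied to rank 0 only
         '3': ['export ROLE=writer']}        # applied to rank 3 only
    ]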
- pre_exec_sync
Flag to indicate that rank startup needs to be synchronized: individual rank execution is delayed until the pre_exec actions of all ranks have completed. Default False.
- Type
bool, optional
- post_exec
Actions (shell commands) to perform after the task finishes. The same remarks as for pre_exec apply, including the point on error handling, which again will cause the task to fail even if the actual execution was successful.
- Type
list, optional
- restartable
If the task starts to execute on a pilot, but cannot finish because the pilot fails or is canceled, the task can be restarted. Default False.
- Type
bool, optional
- tags
Configuration specific tags, which influence task scheduling and execution (e.g., tasks co-location).
- Type
dict, optional
- raptor_file
Optional application supplied Python source file to load raptor_class from.
- Type
str, optional
- services
List of service names the task wants to use. If the services are up and running, an environment variable RP_INFO_XXX will be set, where XXX is the uppercased service name, and the variable's value is the service information obtained by the agent during service startup.
- Type
[str], optional
- timeout
Any timeout larger than 0 will result in the task process being killed after the specified number of seconds. The task will then end up in CANCELED state.
- Type
float, optional
- startup_timeout
This setting is specific to service tasks: any value larger than 0 will abort the service if no service information has been obtained after that time, i.e., if the service startup takes too long. The service will end up in FAILED state.
- Type
float, optional
- cleanup
If cleanup flag is set, the pilot will delete the entire task sandbox upon termination. This includes all generated output data in that sandbox. Output staging will be performed before cleanup. Note that task sandboxes are also deleted if the pilot’s own cleanup flag is set. Default False.
- Type
bool, optional
- pilot
Pilot uid. If specified, the task is submitted to the pilot with the given ID. If that pilot is not known to the TaskManager, an exception is raised.
- Type
str, optional
- slots
information on where exactly each rank of the task should be placed.
- Type
radical.pilot.Slots, optional
Task Ranks
The notion of ranks is central to RP’s TaskDescription class. We here use the same notion as MPI, in that the number of ranks refers to the number of individual processes to be spawned by the task execution backend. These processes will be near-exact copies of each other: they run in the same workdir and the same environment, are defined by the same executable and arguments, get the same amount of resources allocated, etc. Notable exceptions are:
Rank processes may run on different nodes;
rank processes can communicate via MPI;
each rank process obtains a unique rank ID.
It is up to the underlying MPI implementation to determine the exact value of the process’ rank ID. The MPI implementation may also set a number of additional environment variables for each process.
It is important to understand that only applications which make use of MPI should have more than one rank – otherwise identical copies of the same application instance are launched which will compute the same results, thus wasting resources for all ranks but one. Worse: I/O-routines of these non-MPI ranks can interfere with each other and invalidate those results.
Also, applications with a single rank cannot make effective use of MPI: depending on the specific resource configuration, RP may launch those tasks without providing an MPI communicator.
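A brief sketch of an MPI task description using the rank-related attributes above (the executable is illustrative):
    td = rp.TaskDescription()
    td.executable     = './mpi_simulation'
    td.ranks          = 8          # 8 MPI processes, MPI communicator provided
    td.cores_per_rank = 4          # each rank may start 4 threads
    td.threading_type = 'OpenMP'
    td.gpus_per_rank  = 1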
Task Environment
RP tasks are expected to be executed in isolation, meaning that their runtime environment is completely independent from the environment of other tasks, independent from the launch mechanism used to start the task, and also independent from the environment of the RP stack itself.
The task description provides several hooks to help setting up the environment in that context. It is important to understand the way those hooks interact with respect to the environments mentioned above.
pre_launch directives are set and executed before the task is passed on to the task launch method. As such, pre_launch directives are usually executed on the node where RP's agent is running, and not on the task's target node. Executing pre_launch directives for many tasks can thus negatively impact RP's performance (*). Note also that pre_launch directives can in some cases interfere with the launch method.
Use pre_launch directives for rare, heavy-weight operations which prepare the runtime environment for multiple tasks: fetch data from a remote source, unpack input data, create global communication channels, etc.
pre_exec directives are set and executed after the launch method has placed the task on the compute nodes and are thus running on the target node. Note that for MPI tasks, the pre_exec directives are executed once per rank. Running large numbers of pre_exec directives concurrently can lead to system performance degradation (*), for example when those directives concurrently hit the shared file system (for loading modules or Python virtualenvs, etc.).
Use pre_exec directives for task environment setup such as module load, virtualenv activate, export whose effects are expected to be applied either to all task ranks or to specified ranks. Avoid file staging operations at this point (files would be redundantly staged multiple times - once per rank).
(*) The performance impact of repeated concurrent access to the system’s shared file system can be significant and can pose a major bottleneck for your application. Specifically module load and virtualenv activate operations and the like are heavy on file system I/O, and executing those for many tasks is ill advised. Having said that: RP attempts to optimize those operations: if it identifies that identical pre_exec directives are shared between multiple tasks, RP will execute the directives exactly once and will cache the resulting environment settings - those cached settings are then applied to all other tasks with the same directives, without executing the directives again.
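A small sketch contrasting the two hooks under the guidance above (paths, modules and the virtualenv location are illustrative):
    td = rp.TaskDescription()
    td.executable = 'python3'
    td.arguments  = ['analyze.py']

    # heavy, one-off preparation: executed near the agent, before launch
    td.pre_launch = ['tar xf input_bundle.tar']

    # lightweight, per-rank environment setup: executed on the target node(s)
    td.pre_exec   = ['module load python',
                     'source $HOME/venvs/analysis/bin/activate']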
Staging Directives
The Staging Directives are specified using a dict in the following form:
    staging_directive = {
        'source'  : None,   # see 'Locations' below
        'target'  : None,   # see 'Locations' below
        'action'  : None,   # see 'Action operators' below
        'flags'   : None,   # see 'Flags' below
        'priority': 0       # control ordering of actions (unused)
    }
Locations
source and target locations can be given as strings or ru.Url instances. Strings containing :// are converted into URLs immediately. Otherwise, they are considered absolute or relative paths and are then interpreted in the context of the client’s working directory.
Special URL schemas
- client:// : relative to the client's working directory
- resource:// : relative to the RP sandbox on the target resource
- pilot:// : relative to the pilot sandbox on the target resource
- task:// : relative to the task sandbox on the target resource
In all these cases, the hostname element of the URL is expected to be empty, and the path is always considered relative to the locations specified above (even though URLs usually don’t have a notion of relative paths).
For more details on path and sandbox handling check the documentation of radical.pilot.staging_directives.complete_url().
Action operators
rp.TRANSFER : remote file transfer from source to target URL (client)
rp.COPY : local file copy, i.e., not crossing host boundaries
rp.MOVE : local file move
rp.LINK : local file symlink
rp.DOWNLOAD : fetch remote file from source URL to target URL (agent)
Flags
rp.CREATE_PARENTS: create the directory hierarchy for targets on the fly
rp.RECURSIVE: if source is a directory, handles it recursively
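A short sketch combining the schemas, action operators and flags listed above in the input and output staging directives of a task description (file names are illustrative):
    td = rp.TaskDescription()
    td.executable     = './render'
    td.input_staging  = [{'source': 'client:///data/scene.json',
                          'target': 'task:///scene.json',
                          'action': rp.TRANSFER,
                          'flags' : rp.CREATE_PARENTS}]
    td.output_staging = [{'source': 'task:///frames/',
                          'target': 'client:///results/frames/',
                          'action': rp.TRANSFER,
                          'flags' : rp.RECURSIVE}]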
Task
- class radical.pilot.Task(tmgr, descr, origin)[source]
Represent a ‘task’ that is executed on a Pilot.
A Task allows the application to control and query the state of this task.
Note
A task cannot be created directly. The factory method rp.TaskManager.submit_tasks() has to be used instead.
Example:
    tmgr = rp.TaskManager(session=s)

    td = rp.TaskDescription()
    td.executable = "/bin/date"

    task = tmgr.submit_tasks(td)
- property client_sandbox
The full URL of the client sandbox, which is usually the same as the current working directory of the Python script in which the RP Session is instantiated.
Note that the URL may not be usable to access that sandbox from another machine: RP in general knows nothing about available access endpoints on the local host.
- Type
radical.utils.Url
- property endpoint_fs
The URL which is internally used to access the target resource’s root file system.
- Type
radical.utils.Url
- property exception
A string representation (__repr__) of the exception which caused the task's FAILED state, if such an exception was raised while managing or executing the task.
If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.
- Type
- property exception_detail
Additional information about the exception which caused this task to enter FAILED state.
If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.
- Type
- property exit_code
The exit code of the task, if that is already known, or ‘None’ otherwise.
- Type
- property info
The metadata field of the task’s description.
- property metadata
The metadata field of the task’s description.
- property output_files
A list of output file names.
If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will return None.
Warning
This can be incomplete: the heuristics will not detect files which start with <task_id>., for example. It will also not detect files which are not created in the task sandbox.
- property pilot_sandbox
The full URL of the path that RP considers the pilot sandbox on the target resource’s file system which is shared by all tasks which are executed by that pilot.
- Type
radical.utils.Url
- register_callback(cb, cb_data=None, metric=None)[source]
Add a state-change callback.
Registers a callback function that is triggered every time a task’s state changes.
All callback functions need to have the same signature:
def cb(obj, state) -> None: ...
where obj is a handle to the object that triggered the callback and state is the new state of that object.
If cb_data is given, then the cb signature changes to
def cb(obj, state, cb_data) -> None: ...
and cb_data is passed unchanged.
- property resource_sandbox
The full URL of the path that RP considers the resource sandbox, i.e., the sandbox on the target resource’s file system that is shared by all sessions which access that resource.
- Type
radical.utils.Url
- property return_value
The return value for tasks which represent a function call (or None otherwise).
If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will always return None.
- Type
Any
- property sandbox
An alias for task_sandbox.
- Type
- property session
The task’s session.
- property session_sandbox
The full URL of the path that RP considers the session sandbox on the target resource’s file system which is shared by all pilots which access that resource in the current session.
- Type
radical.utils.Url
- property stderr
A snapshot of the executable’s STDERR stream.
If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will return None.
Warning
This can be inefficient. Output may be incomplete and/or filtered.
- Type
- property stdout
A snapshot of the executable’s STDOUT stream.
If this property is queried before the task has reached ‘DONE’ or ‘FAILED’ state it will return None.
Warning
This can be inefficient. Output may be incomplete and/or filtered.
- Type
- property task_sandbox
The full sandbox URL of this task, if that is already known, or ‘None’ otherwise.
- Type
radical.utils.Url
- property tmgr
The task’s manager.
- property uid
The task's unique identifier within a TaskManager.
- Type
- wait(state=None, timeout=None)[source]
Block for state change.
Returns when the task reaches a specific state or when an optional timeout is reached.
- Parameters
state (str | list[str], optional) –
The state(s) that the task has to reach in order for the call to return.
By default wait waits for the task to reach a final state, which can be one of the following.
rp.states.DONE
rp.states.FAILED
rp.states.CANCELED
timeout (float, optional) – Optional timeout in seconds before the call returns regardless whether the task has reached the desired state or not. The default value None never times out.
Raptor
- class radical.pilot.raptor.Master(cfg: Optional[Config] = None)[source]
Raptor Master class
The rp.raptor.Master instantiates and orchestrates a set of workers which are used to rapidly and efficiently execute function tasks. As such, the raptor master acts as an RP executor: it hooks into the RP agent communication channels to receive tasks from the RP agent scheduler in order to execute them. Once completed, tasks are pushed toward the agent output staging component and will then continue their life cycle as all other tasks.
- control_cb(topic, msg)[source]
listen for worker_register, worker_unregister, worker_rank_heartbeat and rpc_req messages.
- request_cb(tasks)[source]
A raptor master implementation can overload this callback to filter all newly submitted tasks: it receives a list of tasks and returns a potentially different list of tasks which are then executed. It is up to the master implementation to ensure proper state transitions for any tasks which are passed as argument but are not returned by the call and thus are not submitted for execution.
- result_cb(tasks)[source]
A raptor master implementation can overload this callback, which gets called when raptor tasks complete execution.
- Parameters
tasks (List[Dict[str, Any]]) – list of tasks which this master executed
- submit_tasks(tasks) None [source]
Submit a list of tasks to the task queue. We expect to get either TaskDescription instances, which will then be converted into task dictionaries and pushed out, or task dictionaries, which are used as is. Either way, self.request_cb will be called for all tasks submitted here.
- Parameters
tasks (List[TaskDescription]) – description of tasks to be submitted
- submit_workers(descriptions: List[TaskDescription]) List[str] [source]
Submit one raptor worker per given description element and pass the raptor queue info as configuration file. Do not wait for the workers to come up - they are expected to register via the control channel.
The task descriptions specifically support the following keys:
raptor_class: str, type name of worker class to execute
raptor_file: str, optional - module file from which raptor_class may be imported, if a custom RP worker class is used
Note that only one worker rank (presumably rank 0) should register with the master - the workers are expected to synchronize their ranks as needed.
- Parameters
descriptions (List[TaskDescription]) – a list of worker descriptions
- Returns
list of uids for submitted worker tasks
- Return type
List[str]
- wait_workers(count=None, uids=None)[source]
Wait for n workers, or for workers with the given UIDs, or for all workers to become available, then return. A worker is considered available when it has registered with this master and its status flag is set to ACTIVE.
- worker_state_cb(worker_dict, state)[source]
This callback can be overloaded - it will be invoked whenever the master receives a state update for a worker it is connected to.
- property workers
task dictionaries representing all currently registered workers
- class radical.pilot.raptor.Worker(manager, rank, raptor_id)[source]
Implement the Raptor protocol for dispatching multiple Tasks on persistent resources.
- get_dispatcher(name)[source]
Query a registered execution mode.
- Parameters
name (str) – name of execution mode to query for
- Returns
the dispatcher method for that execution mode
- Return type
Callable
- get_master()[source]
The worker can submit tasks back to the master - this method will return a small shim class to provide that capability. That class has a single method run_task which accepts a single rp.TaskDescription from which a rp.Task is created and executed. The call then waits for the task’s completion before returning it in a dict representation, the same as when passed to the master’s result_cb.
- Note: the run_task call is running in a separate thread and will thus
not block the master’s progress.
- Returns
- a shim class with only one method: run_task(td) where
td is a TaskDescription to run.
- Return type
- register_mode(name, dispatcher) None [source]
Register a new task execution mode that this worker can handle. The specified dispatcher callable should accept a single argument: the task to execute.
- Parameters
name (str) – name of the mode to register
dispatcher (callable) – function which implements the execution mode
A radical.pilot.Task managing a radical.pilot.raptor.Master instance is created using radical.pilot.TaskDescription.mode rp.RAPTOR_MASTER, or through submit_raptors().
The object returned to the client is a Task subclass with additional features.
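A brief sketch of creating a raptor master with workers and routing a function task to it, assuming tmgr has an active pilot attached (worker sizes and the function name are illustrative):
    # submit a raptor master task; the returned object is a Raptor (Task subclass)
    raptor = tmgr.submit_raptors([rp.TaskDescription({'mode': rp.RAPTOR_MASTER})])[0]

    # attach two workers to that master
    raptor.submit_workers([rp.TaskDescription({'mode' : rp.RAPTOR_WORKER,
                                               'ranks': 4})
                           for _ in range(2)])

    # submit a function task to be executed by the raptor workers
    td    = rp.TaskDescription({'mode'    : rp.TASK_FUNCTION,
                                'function': 'my_module.compute',
                                'args'    : [42]})
    tasks = raptor.submit_tasks([td])
    tmgr.wait_tasks([t.uid for t in tasks])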
- class radical.pilot.raptor_tasks.Raptor(tmgr, descr, origin)[source]
RAPTOR ('RAPid Task executOR') is a task executor which, unlike other RADICAL-Pilot executors, can handle function tasks.
A Raptor must be submitted to a pilot. It will be associated with RaptorWorker instances on that pilot and use those workers to rapidly execute tasks. Raptors excel at high throughput execution for large numbers of short running tasks. However, they have limited capabilities with respect to managing task data dependencies, multinode tasks, MPI executables, and tasks with heterogeneous resource requirements.
- rpc(cmd, *args, **kwargs)[source]
Send a raptor command, wait for the response, and return the result.
- Parameters
- Returns
- the returned dictionary has the following fields:
out: captured standard output
err: captured standard error
ret: return value of the call (can be any serializable type)
exc: tuple of exception type (str) and error message (str)
- Return type
Dict[str, Any]
- submit_tasks(descriptions: List[TaskDescription]) List[Task] [source]
Submit a set of tasks to this Raptor instance.
- Parameters
descriptions (List[TaskDescription]) – submit one task per provided description.
- Returns
a list of rp.Task instances, one for each task.
- Return type
List[Task]
The tasks might not get executed until a worker is available for this Raptor instance.
- submit_workers(descriptions: List[TaskDescription]) List[Task] [source]
Submit a set of workers for this Raptor instance.
- Parameters
descriptions (List[TaskDescription]) – launch a raptor worker for each provided description.
- Returns
A list of rp.Task instances, one for each created worker task.
- Return type
List[Task]
The method will return immediately without waiting for actual task instantiation. The submitted tasks will operate solely on behalf of the Raptor master this method is being called on.
Utilities and helpers
- class radical.pilot.agent.scheduler.base.AgentSchedulingComponent(cfg, session)[source]
- control_cb(topic, msg)[source]
listen on the control channel for raptor queue registration commands
- slot_status(msg=None, uid=None)[source]
Returns a multi-line string corresponding to the status of the node list
- work(tasks)[source]
This is the main callback of the component, which is called for any incoming (set of) task(s). Tasks arriving here must always be in AGENT_SCHEDULING_PENDING state, and must always leave in either AGENT_EXECUTING_PENDING or in a FINAL state (FAILED or CANCELED). While handled by this component, the tasks will be in AGENT_SCHEDULING state.
This method takes care of initial state change to AGENT_SCHEDULING, and then puts them forward onto the queue towards the actual scheduling process (self._schedule_tasks).
- class radical.pilot.agent.scheduler.continuous.Continuous(cfg, session)[source]
The Continuous scheduler attempts to place the threads and processes of a task onto nodes in the cluster.
- _configure()[source]
Configure this scheduler instance
scattered: This is the continuous scheduler, because it attempts to allocate a continuous set of cores/nodes for a task. It does, however, also allow scattering the allocation over discontinuous nodes if this option is set. This implementation is not optimized for the scattered mode! The default is 'True'.
- _find_resources(node, n_slots, cores_per_slot, gpus_per_slot, lfs_per_slot, mem_per_slot, partial)[source]
Find up to the requested number of slots, where each slot features the respective x_per_slot resources. This call will return a list of slots of the following structure:
    {
        'node_name' : 'node_name',
        'node_index': 1,
        'cores'     : [{'index': 1, 'occupation': 1.0},
                       {'index': 2, 'occupation': 1.0},
                       {'index': 4, 'occupation': 1.0},
                       {'index': 5, 'occupation': 1.0}],
        'gpus'      : [{'index': 1, 'occupation': 1.0},
                       {'index': 2, 'occupation': 1.0}],
        'lfs'       : 1234,
        'mem'       : 4321
    }
The call will not change the allocation status of the node, atomicity must be guaranteed by the caller.
We don’t care about continuity within a single node - cores [1, 5] are assumed to be as close together as cores [1, 2].
When partial is set, the method CAN return less than n_slots number of slots - otherwise, the method returns the requested number of slots or None.
FIXME: SMT handling: we should assume that hardware threads of the same physical core cannot host different executables, so HW threads can only account for thread placement, not process placement. This might best be realized by internally handling SMT as minimal thread count and using physical core IDs for process placement?
- _iterate_nodes()[source]
The scheduler iterates through the node list for each task placement. However, we want to avoid starting from node zero every time, as tasks have likely been placed on that node already - instead, we generally want to pick up where the last task placement succeeded. This iterator preserves that state.
Note that the first index is yielded twice, so that the respective node can function as first and last node in an allocation.
- schedule_task(task)[source]
Find an available set of slots, potentially across node boundaries (in the MPI case).
A slot is here considered the amount of resources required by a single MPI rank. Those resources need to be available on a single node - but slots can be distributed across multiple nodes. Resources for non-MPI tasks will always need to be placed on a single node.
By default, we only allow for partial allocations on the first and last node - but all intermediate nodes MUST be completely used (this is the ‘CONTINUOUS’ scheduler after all).
If the scheduler is configured with scattered=True, then that constraint is relaxed, and any set of slots (be it continuous across nodes or not) is accepted as valid allocation.
No matter the mode, we always make sure that we allocate slots in chunks of cores, gpus, lfs and mem required per process - otherwise the application processes would not be able to acquire the requested resources on the respective node.
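As a hedged illustration of what a ‘slot’ corresponds to on the requesting side (the attribute names follow the current TaskDescription; the values are arbitrary): a task with 8 ranks, 4 cores per rank and 1 GPU per rank asks the scheduler for 8 slots, each of which must fit on a single node, while the set of 8 slots may span several nodes.

import radical.pilot as rp

# an MPI task with 8 ranks; each rank (= one slot) needs 4 cores and 1 GPU
# on a single node, but the 8 slots may be spread over several nodes
td = rp.TaskDescription()
td.ranks          = 8
td.cores_per_rank = 4
td.gpus_per_rank  = 1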
- class radical.pilot.utils.component.AgentComponent(cfg, session)[source]
- advance(things, state=None, publish=True, push=False, qname=None, ts=None, fwd=True, prof=True)[source]
Things which have been operated upon are pushed down into the queues again, only to be picked up by the next component, according to their state model. This method will update the thing state, and push it into the output queue registered as target for that state.
things: list of things to advance
state: new state to set for the things
publish: determine if state update notifications should be issued
push: determine if things should be pushed to outputs
fwd: determine if notifications are forwarded to the ZMQ bridge
prof: determine if state advance creates a profile event (publish, and push are always profiled)
‘Things’ are expected to be a dictionary, and to have ‘state’, ‘uid’ and optionally ‘type’ set.
If ‘thing’ contains an ‘$all’ key, the complete dict is published; otherwise, only the state is published.
This is evaluated in self.publish.
- class radical.pilot.utils.component.BaseComponent(cfg, session)[source]
This class provides the basic structure for any RP component which operates on stateful things. It provides means to:
define input channels on which to receive new things in certain states
define work methods which operate on the things to advance their state
define output channels to which to send the things after working on them
define notification channels over which messages with other components can be exchanged (publish/subscriber channels)
All low level communication is handled by the base class. Deriving classes will register the respective channels, valid state transitions, and work methods. When a ‘thing’ is received, the component is assumed to have full ownership over it, and that no other component will change the ‘thing’s state during that time.
The main event loop of the component – work() – is executed on run() and will not terminate on its own, unless it encounters a fatal error.
Components inheriting this class should attempt not to use shared resources. That will ensure that multiple instances of the component can coexist for higher overall system throughput. Should access to shared resources be necessary, it will require some locking mechanism across process boundaries.
This approach should ensure that
‘thing’s are always in a well defined state;
components are simple and focus on the semantics of ‘thing’ state progression;
no state races can occur on ‘thing’ state progression;
only valid state transitions can be enacted (given correct declaration of the component’s semantics);
the overall system is performant and scalable.
Inheriting classes SHOULD overload the following methods:
- initialize():
set up the component state for operation
register input/output/notification channels
register work methods
register callbacks to be invoked on state notification
the component will terminate if this method raises an exception.
work()
called in the main loop of the component process, on all entities arriving on input channels. The component will not terminate if this method raises an exception. For termination, terminate() must be called.
- finalize()
tear down the component (close threads, unregister resources, etc).
Inheriting classes MUST call the constructor:
class StagingComponent(rpu.BaseComponent):

    def __init__(self, cfg, session):
        super().__init__(cfg, session)
A component thus must be passed a configuration (either as a path pointing to a file name to be opened as ru.Config, or as a pre-populated ru.Config instance). That config MUST contain a session ID (sid) for the session under which to run this component, and a uid for the component itself which MUST be unique within the scope of the given session.
Components will send a startup message to the component manager upon successful initialization.
Further, the class must implement the registered work methods, with a signature of:
work(self, things)
The method is expected to change the state of the ‘thing’s given. ‘Thing’s will not be pushed to outgoing channels automatically – to do so, the work method has to call (see call documentation for other options):
self.advance(thing)
Until that method is called, the component is considered the sole owner of the ‘thing’s. After that method is called, the ‘thing’s are considered disowned by the component. If, however, components return from the work methods without calling advance on the given ‘thing’s, then the component keeps ownership of the ‘thing’s to advance it asynchronously at a later point in time. That implies that a component can collect ownership over an arbitrary number of ‘thing’s over time, and they can be advanced at the component’s discretion.
The component process is a stand-alone daemon process which runs outside of Python’s multiprocessing domain. As such, it can freely use Python’s multithreading (and it extensively does so by default) - but developers should be aware that spawning additional processes in this component is discouraged, as Python’s process management does not play well with its multithreading implementation.
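A hedged sketch of such a component, extending the constructor example above: it registers one input and one output channel and advances each received ‘thing’. The state and queue names used here are illustrative stand-ins; a real component uses the states and queues defined for its place in the RP pipeline.

import radical.pilot.utils as rpu   # assuming BaseComponent is exposed here

class StagingComponent(rpu.BaseComponent):

    def __init__(self, cfg, session):
        super().__init__(cfg, session)

    def initialize(self):
        # illustrative state / queue names
        self.register_input ('STAGING_PENDING',    'staging_queue', self.work)
        self.register_output('SCHEDULING_PENDING', 'scheduling_queue')

    def work(self, things):
        for thing in things:
            # ... operate on the thing, then disown it ...
            self.advance(thing, 'SCHEDULING_PENDING', publish=True, push=True)

    def finalize(self):
        # tear down any resources acquired in initialize()
        pass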
- advance(things, state=None, publish=True, push=False, qname=None, ts=None, fwd=False, prof=True)[source]
Things which have been operated upon are pushed down into the queues again, only to be picked up by the next component, according to their state model. This method will update the thing state, and push it into the output queue registered as target for that state.
things: list of things to advance
state: new state to set for the things
publish: determine if state update notifications should be issued
push: determine if things should be pushed to outputs
fwd: determine if notifications are forwarded to the ZMQ bridge
prof: determine if state advance creates a profile event (publish, and push are always profiled)
‘Things’ are expected to be a dictionary, and to have ‘state’, ‘uid’ and optionally ‘type’ set.
If ‘thing’ contains an ‘$all’ key, the complete dict is published; otherwise, only the state is published.
This is evaluated in self.publish.
- control_cb(topic, msg)[source]
This callback can be overloaded by the component to handle any control message which is not already handled by the component base class.
- output(things, state=None)[source]
This pushes the given things to the output queue registered for the given state.
- register_input(states, queue, cb=None, qname=None, path=None)[source]
Using this method, the component can be connected to a queue on which things are received to be worked upon. The given set of states (which can be a single state or a list of states) will trigger an assert check upon thing arrival.
This method will further associate a thing state with a specific worker callback cb. Upon thing arrival, the thing state will be used to lookup the respective worker, and the thing will be handed over. Workers should call self.advance(thing), in order to push the thing toward the next component. If, for some reason, that is not possible before the worker returns, the component will retain ownership of the thing, and should call advance() asynchronously at a later point in time.
Worker invocation is synchronous, ie. the main event loop will only check for the next thing once the worker method returns.
- register_output(states, qname)[source]
Using this method, the component can be connected to a queue to which things are sent after being worked upon. The given set of states (which can be a single state or a list of states) will trigger an assert check upon thing departure.
If a state but no output is specified, we assume that the state is final, and the thing is then considered ‘dropped’ on calling advance() on it. The advance() will trigger a state notification though, and then mark the drop in the log. No other component should ever again work on such a final thing. It is the responsibility of the component to make sure that the thing is in fact in a final state.
- register_publisher(pubsub)[source]
Using this method, the component can register itself as a publisher of notifications on the given pubsub channel.
- register_subscriber(pubsub, cb)[source]
This method is complementary to the register_publisher() above: it registers a subscription to a pubsub channel. If a notification is received on that channel, the registered callback will be invoked. The callback MUST have the signature:
callback(topic, msg)
where ‘topic’ is set to the name of the pubsub channel.
The subscription will be handled in a separate thread, which implies that the callback invocation will also happen in that thread. It is the caller’s responsibility to ensure thread safety during callback invocation.
- register_timed_cb(cb, cb_data=None, timer=None)[source]
Idle callbacks are invoked at regular intervals – they are guaranteed not to be called more frequently than every ‘timer’ seconds; no promise is made on a minimal call frequency. The intent of these callbacks is to run lightweight work at semi-regular intervals.
- rpc(cmd, *args, rpc_addr=None, **kwargs)[source]
Remote procedure call.
Send an RPC command and arguments to the control pubsub and wait for the response. This is a synchronous operation at this point, and it is not thread safe to have multiple concurrent RPC calls.
- stop()[source]
We need to terminate and join all threads, close all communication channels, etc. We trust the correct invocation of the finalizers to do all this, and thus only forward the stop request to the base class here.
- unregister_input(states, qname, worker)[source]
This method is the inverse of the register_input() method.
- unregister_timed_cb(cb)[source]
This method reverts register_timed_cb() above: it removes an idler from the component and terminates the respective thread.
- work_cb()[source]
This is the main routine of the component, as it runs in the component process. It will first initialize the component in the process context. Then it will attempt to get new things from all input queues (round-robin). Each thing received is routed to the respective worker method. Once the thing has been worked upon, the next attempt at getting a thing begins.
- class radical.pilot.utils.component.ClientComponent(cfg, session)[source]
- advance(things, state=None, publish=True, push=False, qname=None, ts=None, fwd=False, prof=True)[source]
Things which have been operated upon are pushed down into the queues again, only to be picked up by the next component, according to their state model. This method will update the thing state, and push it into the output queue registered as target for that state.
things: list of things to advance
state: new state to set for the things
publish: determine if state update notifications should be issued
push: determine if things should be pushed to outputs
fwd: determine if notifications are forwarded to the ZMQ bridge
prof: determine if state advance creates a profile event (publish, and push are always profiled)
‘Things’ are expected to be a dictionary, and to have ‘state’, ‘uid’ and optionally ‘type’ set.
If ‘thing’ contains an ‘$all’ key, the complete dict is published; otherwise, only the state is published.
This is evaluated in self.publish.
- radical.pilot.utils.prof_utils.get_consumed_resources(session, rtype='cpu', tdurations=None)[source]
For all ra.pilot or ra.task entities, return the amount and time of resources consumed. A consumed resource is characterized by:
a resource type (we know about cores and gpus)
a metric name (what the resource was used for)
a list of 4-tuples of the form: [t0, t1, r0, r1]
The tuples are formed so that t0 to t1 and r0 to r1 are continuous:
t0: time, begin of resource consumption
t1: time, end of resource consumption
r0: int, index of resources consumed (min)
r1: int, index of resources consumed (max)
An entity can consume different resources under different metrics, but the returned consumption specs will never overlap. Thus, any resource is accounted for exactly one metric at any point in time. The returned structure has the following overall form:
{'metric_1': {uid_1: [[t0, t1, r0, r1],
                      [t2, t3, r2, r3], ...],
              uid_2: ...},
 'metric_2': ...}
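A hedged usage sketch of that structure (assuming session is a radical.analytics session object holding the ra.pilot and ra.task entities mentioned above):

from radical.pilot.utils.prof_utils import get_consumed_resources

consumed = get_consumed_resources(session, rtype='cpu')

for metric, per_uid in consumed.items():
    for uid, blocks in per_uid.items():
        for t0, t1, r0, r1 in blocks:
            # resources r0..r1 were used for `metric` between t0 and t1
            print('%s %s: resources %d-%d busy from %.1f to %.1f'
                  % (metric, uid, r0, r1, t0, t1))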
- radical.pilot.utils.prof_utils.get_hostmap(profile)[source]
We abuse the profile combination to also derive a pilot-host map, which will tell us on what exact host each pilot has been running. To do so, we check for the PMGR_ACTIVE advance event in agent_0.prof, and use the NTP sync info to associate a hostname.
- radical.pilot.utils.prof_utils.get_hostmap_deprecated(profiles)[source]
This method mangles combine_profiles and get_hostmap, and is deprecated. At this point it only returns the hostmap
- radical.pilot.utils.prof_utils.get_provided_resources(session, rtype='cpu')[source]
For all ra.pilots, return the amount and time of the type of resources provided. This computes sets of 4-tuples of the form: [t0, t1, r0, r1] where:
t0: time, begin of resource provision
t1: time, end of resource provision
r0: int, index of resources provided (min)
r1: int, index of resources provided (max)
The tuples are formed so that t0 to t1 and r0 to r1 are continuous.
- radical.pilot.utils.prof_utils.get_resource_timelines(task, transitions)[source]
For each specific task, return a set of tuples of the form:
[start, stop, metric]
which reports what metric has been used during what time span.
- radical.pilot.utils.prof_utils.get_session_description(sid, src=None)[source]
This will return a description which is usable for radical.analytics evaluation. It informs about:
set of stateful entities
state models of those entities
event models of those entities (maybe)
configuration of the application / module
If src is given, it is interpreted as path to search for session information (json dump). src defaults to $PWD/$sid.
The serializer should be able to (de)serialize information that we want to send over the wire from the client side to the agent side via:
1. ZMQ
2. MongoDB
We expect:
1. Callables, with and without dependencies.
2. Non-callables, like classes and other Python objects.
- radical.pilot.utils.session.fetch_filetype(ext, name, sid, src=None, tgt=None, access=None, skip_existing=False, fetch_client=False, log=None, rep=None)[source]
- Parameters
ext (-) – file extension to fetch
name (-) – full name of filetype for log messages etc
sid (-) – session for which all files are fetched
src (-) – dir to look for client session files ($src/$sid/*.ext)
tgt (-) – dir to store the files in ($tgt/$sid/*.ext, $tgt/$sid/$pid/*.ext)
- Returns
list of file names (fetched and/or cached)
- Return type
list[str]
- radical.pilot.utils.misc.create_tar(tgt: str, dnames: List[str]) None [source]
Create a tarball on the file system which contains all given directories
- radical.pilot.utils.misc.get_resource_config(resource: str) Union[None, Config] [source]
For the given resource label, return the resource configuration used by radical.pilot.
- Parameters
resource (str) – resource label for which to return the cfg
- Returns
the resource configuration
The method returns None if no resource config is found for the specified resource label.
- Return type
radical.utils.Config
- radical.pilot.utils.misc.get_resource_configs() Config [source]
Return all resource configurations used by radical.pilot.
Configurations for the individual resources are organized as sites and resources:
cfgs = get_resource_configs()
sites = cfgs.keys()
for site in sites:
    resource_names = cfgs[site].keys()
- Returns
the resource configurations
- Return type
radical.utils.Config
- radical.pilot.utils.misc.get_resource_fs_url(resource: str, schema: Optional[str] = None) Union[None, Url] [source]
For the given resource label, return the contact URL of the resource’s file system. This corresponds to the filesystem_endpoint setting in the resource config.
For example, rs.filesystem.directory(get_resource_fs_url(…)).change_dir(‘/’) is equivalent to the base endpoint:/// URL available for use in a staging_directive.
- Parameters
- Returns
the file system URL
The method returns None if no resource config is found for the specified resource label and access schema.
- Return type
radical.utils.Url
- radical.pilot.utils.misc.get_resource_job_url(resource: str, schema: Optional[str] = None) Union[None, Url] [source]
For the given resource label, return the contact URL of the resource’s job manager.
- Parameters
- Returns
the job manager URL
The method returns None if no resource config is found for the specified resource label and access schema.
- Return type
radical.utils.Url
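A hedged usage sketch of these helpers; the resource label and access schema are hypothetical placeholders:

from radical.pilot.utils.misc import (get_resource_config,
                                      get_resource_fs_url,
                                      get_resource_job_url)

label = 'site.resource'                 # hypothetical resource label
cfg   = get_resource_config(label)

if cfg is not None:
    print(get_resource_fs_url (label))         # filesystem_endpoint
    print(get_resource_job_url(label, 'ssh'))  # job manager URL, 'ssh' schema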