# pylint: disable=access-member-before-definition
#
__copyright__ = 'Copyright 2013-2022, The RADICAL-Cybertools Team'
__license__ = 'MIT'
import radical.utils as ru
# task modes
TASK_EXECUTABLE = 'task.executable'
TASK_FUNCTION = 'task.function'
TASK_FUNC = 'task.function'
TASK_METHOD = 'task.method'
TASK_METH = 'task.method'
TASK_EVAL = 'task.eval'
TASK_EXEC = 'task.exec'
TASK_PROC = 'task.proc'
TASK_SHELL = 'task.shell'
RAPTOR_MASTER = 'raptor.master'
RAPTOR_WORKER = 'raptor.worker'
AGENT_SERVICE = 'agent.service'
# task description attributes
UID = 'uid'
NAME = 'name'
MODE = 'mode'
# mode: TASK_EXECUTABLE
EXECUTABLE = 'executable'
ARGUMENTS = 'arguments'
# mode: TASK_METHOD
METHOD = 'method'
ARGS = 'args'
KWARGS = 'kwargs'
# mode: TASK_FUNC
FUNCTION = 'function'
ARGS = 'args'
KWARGS = 'kwargs'
# mode: TASK_EXEC, TASK_EVAL
CODE = 'code'
# mode: TASK_PROC, TASK_SHELL
COMMAND = 'command'
# mode: RAPTOR_MASTER, RAPTOR_WORKER
SCHEDULER = 'scheduler' # deprecated for `raptor_id`
RAPTOR_ID = 'raptor_id'
WORKER_CLASS = 'worker_class' # deprecated for raptor_class
RAPTOR_CLASS = 'raptor_class'
WORKER_FILE = 'worker_file' # deprecated for raptor_file
RAPTOR_FILE = 'raptor_file'
# environment
ENVIRONMENT = 'environment'
NAMED_ENV = 'named_env'
SANDBOX = 'sandbox'
# resource requirements
USE_MPI = 'use_mpi' # default `True if RANKS > 1`
RANKS = 'ranks' # ranks
CORES_PER_RANK = 'cores_per_rank' # cores per rank
GPUS_PER_RANK = 'gpus_per_rank' # gpus per rank
THREADING_TYPE = 'threading_type' # OpenMP?
GPU_TYPE = 'gpu_type' # CUDA / ROCm?
LFS_PER_RANK = 'lfs_per_rank' # disk space per rank
MEM_PER_RANK = 'mem_per_rank' # memory per rank
# deprecated
CPU_PROCESSES = 'cpu_processes' # ranks
CPU_PROCESS_TYPE = 'cpu_process_type' # n/a
CPU_THREADS = 'cpu_threads' # cores per rank
CPU_THREAD_TYPE = 'cpu_thread_type' # OpenMP?
GPU_PROCESSES = 'gpu_processes' # gpus per rank
GPU_PROCESS_TYPE = 'gpu_process_type' # CUDA?
GPU_THREADS = 'gpu_threads' # part of gpu?
GPU_THREAD_TYPE = 'gpu_thread_type' # ?
LFS_PER_PROCESS = 'lfs_per_process' # disk space per rank
MEM_PER_PROCESS = 'mem_per_process' # memory per rank
# task setup
INPUT_STAGING = 'input_staging'
OUTPUT_STAGING = 'output_staging'
STAGE_ON_ERROR = 'stage_on_error'
PRE_LAUNCH = 'pre_launch'
PRE_EXEC = 'pre_exec'
PRE_EXEC_SYNC = 'pre_exec_sync'
POST_LAUNCH = 'post_launch'
POST_EXEC = 'post_exec'
TIMEOUT = 'timeout'
CLEANUP = 'cleanup'
PILOT = 'pilot'
STDOUT = 'stdout'
STDERR = 'stderr'
RESTARTABLE = 'restartable'
TAGS = 'tags'
METADATA = 'metadata'
# ------------------------------------------------------------------------------
#
[docs]class TaskDescription(ru.TypedDict):
"""Describe the requirements and properties of a Task.
A TaskDescription object describes the requirements and properties of a
:class:`radical.pilot.Task` and is passed as a parameter to
:func:`radical.pilot.TaskManager.submit_tasks` to instantiate and run a new
task.
Attributes:
uid (str, optional): A unique ID for the task. This attribute
is optional, a unique ID will be assigned by RP if the field is not
set.
name (str, optional): A descriptive name for the task. This
attribute can be used to map individual tasks back to application
level workloads.
mode (str, optional): The execution mode to be used for
this task. Default "executable". The following modes are accepted.
- TASK_EXECUTABLE: the task is spawned as an external executable via
a resource specific launch method (srun, aprun, mpiexec, etc).
- required attributes: `executable`
- related attributes: `arguments`
- TASK_FUNCTION: the task references a python function to be called.
- required attributes: `function`
- related attributes: `args`
- related attributes: `kwargs`
- TASK_METHOD: the task references a raptor worker method to be
called.
- required attributes: `method`
- related attributes: `args`
- related attributes: `kwargs`
- TASK_EVAL: the task is a code snippet to be evaluated.
- required attributes: `code`
- TASK_EXEC: the task is a code snippet to be `exec`'ed.
- required attributes: `code`
- TASK_SHELL: the task is a shell command line to be run.
- required attributes: `command`
- TASK_PROC: the task is a single core process to be executed.
- required attributes: `executable`
- related attributes: `arguments`
- TASK_RAPTOR_MASTER: the task references a raptor master to be
instantiated.
- related attributes: `raptor_file`
- related attributes: `raptor_class`
- TASK_RAPTOR_WORKER: the task references a raptor worker to be
instantiated.
- related attributes: `raptor_file`
- related attributes: `raptor_class`
There is a certain overlap between `TASK_EXECUTABLE`, `TASK_SHELL`
and `TASK_PROC` modes. As a general rule, `TASK_SHELL` and
`TASK_PROC` should be used for short running tasks which require a
single core and no additional resources (gpus, storage, memory).
`TASK_EXECUTABLE` should be used for all other tasks and is in fact
the default. `TASK_SHELL` should only be used if the command to be
run requires shell specific functionality (e.g., pipes, I/O
redirection) which cannot easily be mapped to other task attributes.
TASK_RAPTOR_MASTER and TASK_RAPTOR_WORKER are special types of tasks
that define RAPTOR's master(s) and worker(s) components and their
resource requirements. They are launched by the Agent on one or more
nodes, depending on their requirements.
executable (str): The executable to launch. The executable is
expected to be either available via ``$PATH`` on the target
resource, or to be an absolute path.
arguments (list[str]): The command line arguments for the given
`executable` (`list` of `strings`).
code (str): The code to run. This field is expected to contain valid
python code which is executed when the task mode is `TASK_EXEC` or
`TASK_EVAL`.
function (str): The function to run. This field is expected to contain
a python function name which can be resolved in the scope of the
respective RP worker implementation (see documentation there). The
task mode must be set to `TASK_FUNCTION`. `args` and `kwargs` are
passed as function parameters.
args (list, optional): Positional arguments to be passed to the
`function` (see above). This field will be serialized with
`msgpack` and can thus contain any serializable data types.
kwargs (dict, optional): Named arguments to be passed to the `function`
(see above). This field will be serialized with `msgpack` and can
thus contain any serializable data types.
command (str): A shell command to be executed. This attribute is used
for the `TASK_SHELL` mode.
use_mpi (bool, optional): flag if the task should be provided an MPI
communicator. Defaults to `True` if more than 1 rank is requested
(see `ranks`), otherwise defaults to `False`. Set this to `True` if
you want to enfoce an MPI communicator on single-ranked tasks.
ranks (int, optional): The number of application processes to start
on CPU cores. Default 1.
For two ranks or more, an MPI communicator will be available to the
processes.
`ranks` replaces the deprecated attribute `cpu_processes`. The
attribute `cpu_process_type` was previously used to signal the need
for an MPI communicator - that attribute is now also deprecated and
will be ignored.
cores_per_rank (int, optional): The number of cpu cores each process
will have available to start its own threads or processes on. By
default, `core` refers to a physical CPU core - but if the pilot has
been launched with SMT-settings > 1, `core` will refer to a virtual
core or hyperthread instead (the name depends on the CPU vendor).
`cores_per_rank` replaces the deprecated attribute `cpu_threads`.
threading_type (str, optional): The thread type, influences startup and
environment (`<empty>/POSIX`, `OpenMP`).
`threading_type` replaces the deprecated attribute
`cpu_thread_type`.
gpus_per_rank (float, optional): The number of gpus made available to
each rank. If gpu is shared among several ranks, then a fraction of
gpu should be provided (e.g., 2 ranks share a GPU, then
*gpus_per_rank=.5*).
`gpus_per_rank` replaces the deprecated attribute `gpu_processes`.
The attributes `gpu_threads` and `gpu_process_type` are also
deprecated and will be ignored.
gpu_type (str, optional): The type of GPU environment to provide to
the ranks (`<empty>`, `CUDA`, `ROCm`).
`gpu_type` replaces the deprecated attribute `gpu_thread_type`.
lfs_per_rank (int, optional): Local File Storage per rank - amount of
data (MB) required on the local file system of the node.
`lfs_per_rank` replaces the deprecated attribute `lfs_per_process`.
mem_per_rank (int, optional): Amount of physical memory required per
rank.
`mem_per_rank` replaces the deprecated attribute `mem_per_process`.
environment (dict, optional): Environment variables to set in the
environment before the execution (launching picked `LaunchMethod`).
named_env (str, optional): A named virtual environment as prepared by
the pilot. The task will remain in `AGENT_SCHEDULING` state until
that environment gets created.
sandbox (str, optional): This specifies the working directory of the
task. It will be created if it does not exist. By default, the
sandbox has the name of the task's uid and is relative to the
pilot's sandbox.
stdout (str, optional): The name of the file to store stdout. If
not set then :file:`{uid}.out` will be used.
stderr (str, optional): The name of the file to store stderr. If
not set then :file:`{uid}.err` will be used.
input_staging (list, optional): The files that need to be staged before
the execution (`list` of `staging directives`, see below).
output_staging (list, optional): The files that need to be staged after
the execution (`list` of `staging directives`, see below).
stage_on_error (bool, optional): Output staging is normally skipped on
`FAILED` or `CANCELED` tasks, but if this flag is set, staging is
attempted either way. This may though lead to additional errors if
the tasks did not manage to produce the expected output files to
stage. Default False.
pre_launch (list, optional): Actions (shell commands) to perform
before launching (i.e., before LaunchMethod is submitted),
potentially on a batch node which is different from the node the
task is placed on.
Note that the set of shell commands given here are expected to load
environments, check for work directories and data, etc. They are not
expected to consume any significant amount of CPU time or other
resources! Deviating from that rule will likely result in reduced
overall throughput.
post_launch (list, optional): Actions (shell commands) to perform
after launching (i.e., after LaunchMethod is executed).
Precautions are the same as for `pre_launch` actions.
pre_exec (list, optional): Actions (shell commands) to perform
before the task starts (LaunchMethod is submitted, but no actual
task running yet). Each item could be either a string (`str`), which
represents an action applied to all ranks, or a dictionary (`dict`),
which represents a list of actions applied to specified ranks (key
is a rankID and value is a list of actions to be performed for this
rank).
The actions/commands are executed on the respective nodes where the
ranks are placed, and the actual rank startup will be delayed until
all `pre_exec` commands have completed.
Precautions are the same as for `pre_launch` actions.
No assumption should be made as to where these commands are executed
(although RP attempts to perform them in the task's execution
environment).
No assumption should be made on the specific shell environment the
commands are executed in other than a POSIX shell environment.
Errors in executing these commands will result in the task to enter
`FAILED` state, and no execution of the actual workload will be
attempted.
pre_exec_sync (bool, optional): Flag indicates necessary to sync ranks
execution, which enforce to delay individual rank execution, until
all `pre_exec` actions for all ranks are completed. Default False.
post_exec (list, optional): Actions (shell commands) to perform
after the task finishes. The same remarks as on `pre_exec` apply,
inclusive the point on error handling, which again will cause the
task to fail, even if the actual execution was successful.
restartable (bool, optional): If the task starts to execute on a pilot,
but cannot finish because the pilot fails or is canceled, the task
can be restarted. Default False.
tags (dict, optional): Configuration specific tags, which
influence task scheduling and execution (e.g., tasks co-location).
scheduler (str, optional): deprecated in favor of `raptor_id`.
raptor_id (str, optional): Raptor master ID this task is associated
with.
worker_class (str, optional): deprecated in favor of `raptor_class`
master or worker task.
raptor_class (str, optional): Class name to instantiate for this Raptor
master or worker task.
worker_file (str, optional): deprecated in favor of `raptor_class`.
raptor_file (str, optional): Optional application supplied Python
source file to load `raptor_class` from.
metadata (Any, optional): User defined metadata. Default None.
timeout (float, optional): Any timeout larger than 0 will result in
the task process to be killed after the specified amount of seconds.
The task will then end up in `CANCELED` state.
cleanup (bool, optional): If cleanup flag is set, the pilot will
delete the entire task sandbox upon termination. This includes all
generated output data in that sandbox. Output staging will be
performed before cleanup. Note that task sandboxes are also deleted
if the pilot's own `cleanup` flag is set. Default False.
pilot (str, optional): Pilot `uid`. If specified, the task is
submitted to the pilot with the given ID. If that pilot is not known
to the TaskManager, an exception is raised.
**Task Ranks**
The notion of `ranks` is central to RP's `TaskDescription` class. We here
use the same notion as MPI, in that the number of `ranks` refers to the
number of individual processes to be spawned by the task execution backend.
These processes will be near-exact copies of each other: they run in the
same workdir and the same `environment`, are defined by the same
`executable` and `arguments`, get the same amount of resources allocated,
etc. Notable exceptions are:
- Rank processes may run on different nodes;
- rank processes can communicate via MPI;
- each rank process obtains a unique rank ID.
It is up to the underlying MPI implementation to determine the exact value
of the process' rank ID. The MPI implementation may also set a number of
additional environment variables for each process.
It is important to understand that only applications which make use of MPI
should have more than one rank -- otherwise identical copies of the *same*
application instance are launched which will compute the same results, thus
wasting resources for all ranks but one. Worse: I/O-routines of these
non-MPI ranks can interfere with each other and invalidate those results.
Also, applications with a single rank cannot make effective use of MPI---
depending on the specific resource configuration, RP may launch those tasks
without providing an MPI communicator.
**Task Environment**
RP tasks are expected to be executed in isolation, meaning that their
runtime environment is completely independent from the environment of other
tasks, independent from the launch mechanism used to start the task, and
also independent from the environment of the RP stack itself.
The task description provides several hooks to help setting up the
environment in that context. It is important to understand the way those
hooks interact with respect to the environments mentioned above.
- `pre_launch` directives are set and executed before the task is passed on
to the task launch method. As such, `pre_launch` usually executed on the
node where RP's agent is running, and *not* on the tasks target node.
Executing `pre_launch` directives for many tasks can thus negatively
impact RP's performance (*). Note also that `pre_launch` directives can
in some cases interfere with the launch method.
Use `pre_launch` directives for rare, heavy-weight operations which
prepare the runtime environment for multiple tasks: fetch data from a
remote source, unpack input data, create global communication channels,
etc.
- `pre_exec` directives are set and executed *after* the launch method
placed the task on the compute nodes and are thus running on the target
node. Note that for MPI tasks, the `pre_exec` directives are executed
once per rank. Running large numbers of `pre_exec` directives
concurrently can lead to system performance degradation (*), for example
when those directives concurrently hot the shared files system (for
loading modules or Python virtualenvs etc).
Use `pre_exec` directives for task environment setup such as `module
load`, `virtualenv activate`, `export` whose effects are expected to be
applied either to all task ranks or to specified ranks. Avoid file
staging operations at this point (files would be redundantly staged
multiple times - once per rank).
(*) The performance impact of repeated concurrent access to the system's
shared file system can be significant and can pose a major bottleneck for
your application. Specifically `module load` and `virtualenv activate`
operations and the like are heavy on file system I/O, and executing those
for many tasks is ill advised. Having said that: RP attempts to optimize
those operations: if it identifies that identical `pre_exec` directives are
shared between multiple tasks, RP will execute the directives exactly *once*
and will cache the resulting environment settings - those cached settings
are then applied to all other tasks with the same directives, without
executing the directives again.
**Staging Directives**
The Staging Directives are specified using a dict in the following form::
staging_directive = {
'source' : None, # see 'Location' below
'target' : None, # see 'Location' below
'action' : None, # See 'Action operators' below
'flags' : None, # See 'Flags' below
'priority': 0 # Control ordering of actions (unused)
}
*Locations*
`source` and `target` locations can be given as strings or `ru.Url`
instances. Strings containing `://` are converted into URLs immediately.
Otherwise, they are considered absolute or relative paths and are then
interpreted in the context of the client's working directory.
.. rubric:: Special URL schemas
- ``client://`` : relative to the client's working directory
- ``resource://`` : relative to the RP sandbox on the target resource
- ``pilot://`` : relative to the pilot sandbox on the target resource
- ``task://`` : relative to the task sandbox on the target resource
In all these cases, the `hostname` element of the URL is expected to be
empty, and the path is *always* considered relative to the locations
specified above (even though URLs usually don't have a notion of relative
paths).
For more details on path and sandbox handling check the documentation of
:func:`radical.pilot.staging_directives.complete_url`.
*Action operators*
- rp.TRANSFER : remote file transfer from `source` URL to `target` URL
- rp.COPY : local file copy, i.e., not crossing host boundaries
- rp.MOVE : local file move
- rp.LINK : local file symlink
*Flags*
- rp.CREATE_PARENTS: create the directory hierarchy for targets on the fly
- rp.RECURSIVE: if `source` is a directory, handles it recursively
"""
_schema = {
UID : str ,
NAME : str ,
MODE : str ,
EXECUTABLE : str ,
ARGUMENTS : [str] ,
CODE : str ,
FUNCTION : str ,
ARGS : [None] ,
KWARGS : {str: None} ,
COMMAND : str ,
SANDBOX : str ,
ENVIRONMENT : {str: str} ,
NAMED_ENV : str ,
PRE_LAUNCH : [str] ,
PRE_EXEC : [None] ,
PRE_EXEC_SYNC : bool ,
POST_LAUNCH : [str] ,
POST_EXEC : [None] ,
STDOUT : str ,
STDERR : str ,
INPUT_STAGING : [None] ,
OUTPUT_STAGING : [None] ,
STAGE_ON_ERROR : bool ,
USE_MPI : bool ,
RANKS : int ,
CORES_PER_RANK : int ,
GPUS_PER_RANK : float ,
THREADING_TYPE : str ,
GPU_TYPE : str ,
LFS_PER_RANK : int ,
MEM_PER_RANK : int ,
# deprecated
CPU_PROCESSES : int , # RANKS
CPU_PROCESS_TYPE: str , # n/a
CPU_THREADS : int , # CORES_PER_RANK
CPU_THREAD_TYPE : str , # THREADING_TYPE
GPU_PROCESSES : int , # GPUS_PER_RANK
GPU_PROCESS_TYPE: str , # GPU_TYPE
GPU_THREADS : int , # n/a
GPU_THREAD_TYPE : str , # n/a
LFS_PER_PROCESS : int , # LFS_PER_RANK
MEM_PER_PROCESS : int , # MEM_PER_RANK
SCHEDULER : str , # RAPTOR_ID
WORKER_FILE : str , # RAPTOR_FILE
WORKER_CLASS : str , # RAPTOR_CLASS
RESTARTABLE : bool ,
TAGS : {None: None},
RAPTOR_ID : str ,
RAPTOR_FILE : str ,
RAPTOR_CLASS : str ,
METADATA : None ,
TIMEOUT : float ,
CLEANUP : bool ,
PILOT : str ,
}
_defaults = {
UID : '' ,
NAME : '' ,
MODE : TASK_EXECUTABLE,
EXECUTABLE : '' ,
ARGUMENTS : list() ,
CODE : '' ,
FUNCTION : '' ,
ARGS : list() ,
KWARGS : dict() ,
COMMAND : '' ,
SANDBOX : '' ,
ENVIRONMENT : dict() ,
NAMED_ENV : '' ,
PRE_LAUNCH : list() ,
PRE_EXEC : list() ,
PRE_EXEC_SYNC : False ,
POST_LAUNCH : list() ,
POST_EXEC : list() ,
STDOUT : '' ,
STDERR : '' ,
INPUT_STAGING : list() ,
OUTPUT_STAGING : list() ,
STAGE_ON_ERROR : False ,
USE_MPI : None ,
RANKS : 1 ,
CORES_PER_RANK : 1 ,
GPUS_PER_RANK : 0. ,
THREADING_TYPE : '' ,
GPU_TYPE : '' ,
LFS_PER_RANK : 0 ,
MEM_PER_RANK : 0 ,
# deprecated
CPU_PROCESSES : 0 ,
CPU_PROCESS_TYPE: '' ,
CPU_THREADS : 0 ,
CPU_THREAD_TYPE : '' ,
GPU_PROCESSES : 0 ,
GPU_PROCESS_TYPE: '' ,
GPU_THREADS : 0 ,
GPU_THREAD_TYPE : '' ,
LFS_PER_PROCESS : 0 ,
MEM_PER_PROCESS : 0 ,
SCHEDULER : '' ,
WORKER_FILE : '' ,
WORKER_CLASS : '' ,
RESTARTABLE : False ,
TAGS : dict() ,
RAPTOR_ID : '' ,
RAPTOR_FILE : '' ,
RAPTOR_CLASS : '' ,
METADATA : None ,
TIMEOUT : 0.0 ,
CLEANUP : False ,
PILOT : '' ,
}
# --------------------------------------------------------------------------
#
def __init__(self, from_dict=None):
super().__init__(from_dict=from_dict)
# --------------------------------------------------------------------------
#
def _verify(self):
if not self.get('mode'):
self['mode'] = TASK_EXECUTABLE
if self.mode in [TASK_EXECUTABLE, AGENT_SERVICE]:
if not self.get('executable'):
umode = self.mode.upper().replace('.', '_')
raise ValueError("%s Task mode needs 'executable'" % umode)
elif self.mode in [TASK_FUNC, TASK_METH]:
if not self.get('function'):
raise ValueError("TASK_FUNC Task mode needs 'function'")
if self.get('named_env'):
raise ValueError("TASK_FUNC and TASK_METH Task mode does not "
"support 'named_env'")
elif self.mode == TASK_PROC:
if not self.get('executable'):
raise ValueError("TASK_PROC Task mode needs 'executable'")
elif self.mode == TASK_EVAL:
if not self.get('code'):
raise ValueError("TASK_EVAL Task mode needs 'code'")
elif self.mode == TASK_EXEC:
if not self.get('code'):
raise ValueError("TASK_EXEC Task mode needs 'code'")
elif self.mode == TASK_SHELL:
if not self.get('command'):
raise ValueError("TASK_SHELL Task mode needs 'command'")
# backward compatibility for deprecated attributes
if self.cpu_processes:
self.ranks = self.cpu_processes
self.cpu_processes = 0
if self.cpu_threads:
self.cores_per_rank = self.cpu_threads
self.cpu_threads = 0
if self.cpu_thread_type:
self.threading_type = self.cpu_thread_type
self.cpu_thread_type = None
if self.gpu_processes:
self.gpus_per_rank = float(self.gpu_processes)
self.gpu_processes = 0
if self.gpu_process_type:
self.gpu_type = self.gpu_process_type
self.gpu_process_type = None
if self.lfs_per_process:
self.lfs_per_rank = self.lfs_per_process
self.lfs_per_process = 0
if self.mem_per_process:
self.mem_per_rank = self.mem_per_process
self.mem_per_process = 0
if self.scheduler:
self.raptor_id = self.scheduler
self.scheduler = ''
if self.worker_file:
self.raptor_file = self.worker_file
self.worker_file = ''
if self.worker_class:
self.raptor_class = self.worker_class
self.raptor_class = ''
if self.use_mpi is None:
self.use_mpi = bool(self.ranks - 1)
# deprecated and ignored
if self.cpu_process_type: pass
if self.gpu_threads : pass
if self.gpu_thread_type : pass
# if self.mode in [TASK_SHELL, TASK_PROC]:
#
# if self.get('cpu_processes', 1) * self.get('cpu_threads', 1) > 1:
# raise ValueError("TASK_SHELL and TASK_PROC Tasks must be single core")
#
# if self.get('gpu_processes', 0) > 0:
# raise ValueError("TASK_SHELL and TASK_PROC Tasks canont use GPUs")
# ------------------------------------------------------------------------------