Source code for radical.pilot.raptor_tasks


from typing import Dict, List, Any, Optional

import radical.utils as ru

from .task             import Task
from .task_description import TaskDescription


# ------------------------------------------------------------------------------
#
[docs]class Raptor(Task): ''' RAPTOR ('RAPid Task executOR') is a task executor which, other than other RADICAL-Pilot executors can handle function tasks. A `Raptor` must be submitted to a pilot. It will be associated with `RaptorWorker` instances on that pilot and use those workers to rapidly execute tasks. Raptors excel at high throughput execution for large numbers of short running tasks. However, they have limited capabilities with respect to managing task data dependencies, multinode tasks, MPI executables, and tasks with heterogeneous resource requirements. ''' # -------------------------------------------------------------------------- #
[docs] def submit_workers(self, descriptions: List[TaskDescription]) -> List[Task]: ''' Submit a set of workers for this `Raptor` instance. Args: descriptions (List[TaskDescription]): ;aunch a raptor worker for each provided description. Returns: List[Tasks]: a list of `rp.Task` instances, one for each created worker task The method will return immediately without waiting for actual task instantiation. The submitted tasks will operate solely on behalf of the `Raptor` master this method is being called on. ''' descriptions = ru.as_list(descriptions) for td in descriptions: td.raptor_id = self.uid return self._tmgr.submit_workers(descriptions)
# -------------------------------------------------------------------------- #
[docs] def submit_tasks(self, descriptions: List[TaskDescription]) -> List[Task]: ''' Submit a set of tasks to this `Raptor` instance. Args: descriptions (List[TaskDescription]): ;aunch a raptor worker for each provided description. Returns: List[Tasks]: a list of `rp.Task` instances, one for each task. The tasks might not get executed until a worker is available for this Raptor instance. ''' descriptions = ru.as_list(descriptions) for td in descriptions: td.raptor_id = self.uid return self._tmgr.submit_tasks(descriptions)
# -------------------------------------------------------------------------- #
[docs] def rpc(self, cmd, *args, **kwargs): ''' Send a raptor command, wait for the response, and return the result. Args: rpc (str): name of the rpc call to invoke *args (*List[Any]): unnamed arguments **kwargs (**Dict[str, Any])): named arguments Returns: Dict[str, Any]: the returned dictionary has the following fields: - out: captured standard output - err: captured standard error - ret: return value of the call (can be any serializable type) - exc: tuple of exception type (str) and error message (str) ''' if not self._pilot: raise RuntimeError('not assigned to a pilot yet, cannot run rpc') kwargs['raptor_cmd'] = cmd return self._tmgr.pilot_rpc(self._pilot, 'raptor_rpc', *args, rpc_addr=self.uid, **kwargs)
# ------------------------------------------------------------------------------ # RaptorMaster = Raptor ''' The `RaptorMaster` task is an alias for the `Raptor` class and is provided for naming symmetry with `RaptorWorker`. ''' # ------------------------------------------------------------------------------ # class RaptorWorker(Task): ''' A `RaptorWorker` task is a thin wrapper around a normal `rp.Task` instance for the purpose of representing a Raptor worker task. NOTE: this class is not yet used. ''' pass # ------------------------------------------------------------------------------