__copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu"
__license__ = "MIT"
import copy
import time
import radical.utils as ru
from . import states as rps
from . import constants as rpc
from .staging_directives import expand_description
from .task_description import TaskDescription
_uids = list()
# ------------------------------------------------------------------------------
#
def _check_uid(uid):
# ensure that uid is not yet known
if uid in _uids:
return False
else:
_uids.append(uid)
return True
# ------------------------------------------------------------------------------
#
[docs]class Task(object):
"""Represent a 'task' that is executed on a Pilot.
Tasks allow to control and query the state of this task.
Note:
A task cannot be created directly. The factory method
:func:`rp.TaskManager.submit_tasks` has to be used instead.
Example::
tmgr = rp.TaskManager(session=s)
ud = rp.TaskDescription()
ud.executable = "/bin/date"
task = tmgr.submit_tasks(ud)
"""
# --------------------------------------------------------------------------
# In terms of implementation, a Task is not much more than a dict whose
# content are dynamically updated to reflect the state progression through
# the TMGR components. As a Task is always created via a TMGR, it is
# considered to *belong* to that TMGR, and all activities are actually
# implemented by that TMGR.
#
# Note that this implies that we could create CUs before submitting them
# to a TMGR, w/o any problems. (FIXME?)
# --------------------------------------------------------------------------
# --------------------------------------------------------------------------
#
def __init__(self, tmgr, descr, origin):
# ensure that the description is viable
descr.verify()
# 'static' members
self._tmgr = tmgr
self._descr = descr.as_dict()
self._origin = origin
# initialize state
self._session = self._tmgr.session
self._uid = self._descr.get('uid')
self._state = rps.NEW
self._log = tmgr._log
self._exit_code = None
self._stdout = str()
self._stderr = str()
self._return_value = None
self._exception = None
self._exception_detail = None
self._pilot = descr.get('pilot')
self._endpoint_fs = None
self._resource_sandbox = None
self._session_sandbox = None
self._pilot_sandbox = None
self._task_sandbox = None
self._client_sandbox = None
self._callbacks = dict()
# ensure uid is unique
if self._uid:
if not _check_uid(self._uid):
raise ValueError('uid %s is not unique' % self._uid)
else:
self._uid = ru.generate_id('task.%(item_counter)06d', ru.ID_CUSTOM,
ns=self._session.uid)
for m in rpc.TMGR_METRICS:
self._callbacks[m] = dict()
# we always invke the default state cb
self._callbacks[rpc.TASK_STATE][self._default_state_cb.__name__] = {
'cb' : self._default_state_cb,
'cb_data' : None}
# If staging directives exist, expand them to the full dict version. Do
# not, however, expand any URLs as of yet, as we likely don't have
# sufficient information about pilot sandboxes etc.
expand_description(self._descr)
self._tmgr.advance(self.as_dict(), rps.NEW, publish=False, push=False)
# --------------------------------------------------------------------------
#
def __repr__(self):
return '<%s object, uid %s>' % (self.__class__.__qualname__, self._uid)
# --------------------------------------------------------------------------
#
def __str__(self):
return str([self.uid, self.pilot, self.state])
# --------------------------------------------------------------------------
#
def _default_state_cb(self, task, state=None):
self._log.info("[Callback]: task %s state: %s.", self.uid, self.state)
# --------------------------------------------------------------------------
#
def _update(self, task_dict, reconnect=False):
"""State change updater.
This will update the facade object after state changes etc, and is
invoked by whatever component receiving that updated information.
"""
assert task_dict['uid'] == self.uid, 'update called on wrong instance'
# this method relies on state updates to arrive in order
current = self.state
target = task_dict['state']
if not reconnect:
if target not in [rps.FAILED, rps.CANCELED]:
s_tgt = rps._task_state_value(target)
s_cur = rps._task_state_value(current)
if s_tgt - s_cur != 1:
self._log.error('%s: invalid state transition %s -> %s',
self.uid, current, target)
raise RuntimeError('invalid state transition %s: %s -> %s'
% (self.uid, current, target))
# we update all fields
# FIXME: well, not all really :/
# FIXME: setattr is ugly... we should maintain all state in a dict.
for key in ['state', 'stdout', 'stderr', 'exit_code', 'return_value',
'endpoint_fs', 'resource_sandbox', 'session_sandbox',
'pilot', 'pilot_sandbox', 'task_sandbox', 'client_sandbox',
'exception', 'exception_detail']:
val = task_dict.get(key, None)
if val is not None:
setattr(self, "_%s" % key, val)
# # RP's internal processes may update metadata
# if 'description' not in task_dict:
# # this should not happen!
# import pprint
# self._log.debug('invalid task dict: %s', pprint.pformat(task_dict))
if task_dict.get('description', {}).get('metadata'):
self._descr['metadata'] = task_dict['description']['metadata']
# callbacks are not invoked here, but are bulked in the tmgr
# --------------------------------------------------------------------------
#
[docs] def as_dict(self):
"""Returns a Python dictionary representation of the object."""
ret = {
'type': 'task',
'tmgr': self.tmgr.uid,
'uid': self.uid,
'name': self.name,
'state': self.state,
'origin': self.origin,
'exit_code': self.exit_code,
'stdout': self.stdout,
'stderr': self.stderr,
'return_value': self.return_value,
'exception': self.exception,
'exception_detail': self.exception_detail,
'pilot': self.pilot,
'endpoint_fs': self.endpoint_fs,
'resource_sandbox': self.resource_sandbox,
'session_sandbox': self.session_sandbox,
'pilot_sandbox': self.pilot_sandbox,
'task_sandbox': self.task_sandbox,
'client_sandbox': self.client_sandbox,
'description': self.description # this is a deep copy
}
return ret
# --------------------------------------------------------------------------
#
@property
def session(self):
"""radical.pilot.Session: The task's session."""
return self._session
# --------------------------------------------------------------------------
#
@property
def tmgr(self):
"""radical.pilot.TaskManager: The task's manager."""
return self._tmgr
# --------------------------------------------------------------------------
#
@property
def uid(self):
"""str: The task's unique identifier within a :class:`TaskManager`."""
return self._uid
# --------------------------------------------------------------------------
#
@property
def name(self):
"""str: The task's application specified name."""
return self._descr.get('name')
# --------------------------------------------------------------------------
#
@property
def mode(self):
"""str: The task mode."""
return self._descr['mode']
# --------------------------------------------------------------------------
#
@property
def origin(self):
"""str: Indicates where the task was created."""
return self._origin
# --------------------------------------------------------------------------
#
@property
def state(self):
"""str: The current state of the task."""
return self._state
# --------------------------------------------------------------------------
#
@property
def exit_code(self):
"""int: The exit code of the task, if that is already known, or
'None' otherwise.
"""
return self._exit_code
# --------------------------------------------------------------------------
#
@property
def stdout(self):
"""str: A snapshot of the executable's STDOUT stream.
If this property is queried before the task has reached
'DONE' or 'FAILED' state it will return None.
Warning:
This can be inefficient. Output may be incomplete and/or filtered.
"""
return self._stdout
# --------------------------------------------------------------------------
#
@property
def stderr(self):
"""str: A snapshot of the executable's STDERR stream.
If this property is queried before the task has reached
'DONE' or 'FAILED' state it will return None.
Warning:
This can be inefficient. Output may be incomplete and/or filtered.
"""
return self._stderr
# --------------------------------------------------------------------------
#
@property
def return_value(self):
"""Any: The return value for tasks which represent function call (or
None otherwise).
If this property is queried before the task has reached
'DONE' or 'FAILED' state it will always return None.
"""
return self._return_value
# --------------------------------------------------------------------------
#
@property
def exception(self):
"""str: A string representation (`__repr__`) of the exception which
caused the task's `FAILED` state if such one was raised while managing
or executing the task.
If this property is queried before the task has reached
'DONE' or 'FAILED' state it will always return None.
"""
return self._exception
# --------------------------------------------------------------------------
#
@property
def exception_detail(self):
"""str: Additional information about the exception which caused this
task to enter FAILED state.
If this property is queried before the task has reached
'DONE' or 'FAILED' state it will always return None.
"""
return self._exception_detail
# --------------------------------------------------------------------------
#
@property
def pilot(self):
"""str: The pilot ID of this task, if that is already known, or
'None' otherwise.
"""
return self._pilot
# --------------------------------------------------------------------------
#
@property
def sandbox(self):
"""str: An alias for :attr:`~radical.pilot.Task.task_sandbox`."""
return self.task_sandbox
@property
def task_sandbox(self):
"""radical.utils.Url: The full sandbox URL of this task, if that is already
known, or 'None' otherwise.
"""
return self._task_sandbox
@property
def endpoint_fs(self):
"""radical.utils.Url: The URL which is internally used to access the
target resource's root file system.
"""
return self._endpoint_fs
@property
def resource_sandbox(self):
"""radical.utils.Url: The full URL of the path that RP considers the
resource sandbox, i.e., the sandbox on the target resource's file system
that is shared by all sessions which access that resource.
"""
return self._resource_sandbox
@property
def session_sandbox(self):
"""radical.utils.Url: The full URL of the path that RP considers the
session sandbox on the target resource's file system which is shared
by all pilots which access that resource in the current session.
"""
return self._session_sandbox
@property
def pilot_sandbox(self):
"""radical.utils.Url: The full URL of the path that RP considers the
pilot sandbox on the target resource's file system which is shared
by all tasks which are executed by that pilot.
"""
return self._pilot_sandbox
@property
def client_sandbox(self):
"""radical.utils.Url: The full URL of the client sandbox, which is
usually the same as the current working directory of the Python script
in which the RP Session is instantiated.
Note that the URL may not be usable to access
that sandbox from another machine: RP in general knows nothing about
available access endpoints on the local host.
"""
return self._client_sandbox
# --------------------------------------------------------------------------
#
@property
def description(self):
"""dict: The description the task was started with, as a dictionary."""
return copy.deepcopy(self._descr)
# --------------------------------------------------------------------------
#
@property
def metadata(self):
"""The metadata field of the task's description."""
return copy.deepcopy(self._descr.get('metadata'))
# --------------------------------------------------------------------------
#
[docs] def register_callback(self, cb, cb_data=None, metric=None):
"""Add a state-change callback.
Registers a callback function that is triggered every time a
task's state changes.
All callback functions need to have the same signature::
def cb(obj, state) -> None:
...
where ``obj`` is a handle to the object that triggered the callback
and ``state`` is the new state of that object. If ``cb_data`` is given,
then the ``cb`` signature changes to
::
def cb(obj, state, cb_data) -> None:
...
and ``cb_data`` are passed unchanged.
"""
if not metric:
metric = rpc.TASK_STATE
self._tmgr.register_callback(cb, cb_data, metric=metric, uid=self._uid)
# --------------------------------------------------------------------------
#
[docs] def wait(self, state=None, timeout=None):
"""Block for state change.
Returns when the task reaches a specific state or
when an optional timeout is reached.
Arguments:
state (str | list[str], optional): The state(s) that task has to reach
in order for the call to return.
By default `wait` waits for the task to reach a **final**
state, which can be one of the following.
* :data:`rp.states.DONE`
* :data:`rp.states.FAILED`
* :data:`rp.states.CANCELED`
timeout (float, optional): Optional timeout in seconds before the
call returns regardless whether the task has reached the desired
state or not. The default value **None** never times out.
"""
if not state:
states = rps.FINAL
if not isinstance(state, list):
states = [state]
else:
states = state
if self.state in rps.FINAL:
# we will never see another state progression. Raise an error
# (unless we waited for this)
if self.state in states:
return self.state
# FIXME: do we want a raise here, really? This introduces a race,
# really, on application level
# raise RuntimeError("can't wait on a task in final state")
return self.state
start_wait = time.time()
while self.state not in states:
time.sleep(0.1)
if timeout and (timeout <= (time.time() - start_wait)):
break
if self._tmgr._terminate.is_set():
break
return self.state
# --------------------------------------------------------------------------
#
[docs] def cancel(self):
"""Cancel the task."""
self._tmgr.cancel_tasks(self.uid)
# ------------------------------------------------------------------------------
#
class TaskDict(ru.TypedDict):
"""Dictionary encoded Task.
rp.Task is an API level object and as that is not a useful internal
representation of an task on the level of RP components and message
channels. Instead, a task is there represented as a dictionary. To
facilitate a minimum of documentation and type consistency, this class
defines such task dictionaries as `TypedDict` objects.
"""
_schema = {
# where the task got created
# CLIENT | AGENT
'origin' : str,
# original task description used to create the task
'description' : TaskDescription,
# # what scope should be notified for task state changes
# # CLIENT, AGENT, RAPTOR - can be empty
# 'notification': [str],
# # what exact resources were used to execute the task
# # [[node_uid:core_id:gpu_id, ...]]
# 'resources' : [None],
}
_defaults = {
'origin' : None,
'description' : None,
# 'notification': [],
# 'resources' : [],
}
# ------------------------------------------------------------------------------