# pylint: disable=protected-access
__copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu"
__license__ = "MIT"
import copy
import time
import queue
import threading as mt
import radical.utils as ru
from . import PilotManager
from . import states as rps
from . import constants as rpc
from .messages import RPCRequestMessage, RPCResultMessage
from .staging_directives import complete_url
# ------------------------------------------------------------------------------
#
class Pilot(object):
    """Represent a resource overlay on a local or remote resource.

    Note:
        A Pilot cannot be created directly.  The factory method
        :func:`radical.pilot.PilotManager.submit_pilots` has to be
        used instead.

    Example::

        pm = radical.pilot.PilotManager(session=s)

        pd = radical.pilot.PilotDescription()
        pd.resource = "local.localhost"
        pd.cores    = 2
        pd.runtime  = 5  # minutes

        pilot = pm.submit_pilots(pd)
    """
# --------------------------------------------------------------------------
# In terms of implementation, a Pilot is not much more than a dict whose
# contents are dynamically updated to reflect the state progression through
# the PMGR components. As a Pilot is always created via a PMGR, it is
# considered to *belong* to that PMGR, and all activities are actually
# implemented by that PMGR.
#
# Note that this implies that we could create Pilots before submitting them
# to a PMGR, w/o any problems. (FIXME?)
# --------------------------------------------------------------------------
# --------------------------------------------------------------------------
#
def __init__(self, pmgr: PilotManager, descr):
    """Set up the client side facade for one pilot.

    Validates the description, derives all sandbox URLs from the session
    and hooks the instance into the session's control pubsub so that RPC
    results can be received via `_control_cb`.

    Arguments:
        pmgr (PilotManager): manager which owns this pilot.  Its
            `session`, `_log` and `check_uid()` are used here.
        descr: pilot description.  Must provide `runtime`, `resource`
            and an `as_dict()` method.

    Raises:
        ValueError: if `runtime` is unset or not positive, if `resource`
            is unset, or if the description carries a `uid` which the
            manager reports as not unique.
    """

    # the description must at least name a runtime and a target resource
    if not descr.runtime:
        raise ValueError('pilot runtime must be defined')

    if descr.runtime <= 0:
        raise ValueError('pilot runtime must be positive')

    if not descr.resource:
        raise ValueError('pilot target resource must be defined')

    # basic instance state
    self._pmgr       = pmgr
    self._log        = pmgr._log
    self._session    = self._pmgr.session
    self._prof       = self._session._prof
    self._descr      = descr.as_dict()
    self._uid        = self._descr.get('uid')
    self._state      = rps.NEW
    self._pilot_dict = dict()
    self._cb_lock    = ru.RLock()
    self._tmgr       = None

    # a failing pilot may be configured to terminate the application
    self._exit_on_error = self._descr.get('exit_on_error')

    # user supplied uids must be unique, otherwise we generate one
    if not self._uid:
        self._uid = ru.generate_id('pilot.%(item_counter)04d', ru.ID_CUSTOM,
                                   ns=self._session.uid)
    elif not self._pmgr.check_uid(self._uid):
        raise ValueError('uid %s is not unique' % self._uid)

    # one callback registry per metric; the default state callback is
    # always registered
    self._callbacks = {metric: dict() for metric in rpc.PMGR_METRICS}
    self._callbacks[rpc.PILOT_STATE][self._default_state_cb.__name__] = {
        'cb'      : self._default_state_cb,
        'cb_data' : None}

    # `as_dict()` reads the URL attributes below, so they start out as
    # empty URLs and are replaced by the values the session derives from
    # that dict.
    for attr in ('_pilot_jsurl',      '_pilot_jshop',    '_endpoint_fs',
                 '_resource_sandbox', '_session_sandbox',
                 '_pilot_sandbox',    '_client_sandbox'):
        setattr(self, attr, ru.Url())

    pilot   = self.as_dict()
    session = self._session

    self._pilot_jsurl, self._pilot_jshop = session._get_jsurl(pilot)

    self._endpoint_fs      = session._get_endpoint_fs(pilot)
    self._resource_sandbox = session._get_resource_sandbox(pilot)
    self._session_sandbox  = session._get_session_sandbox(pilot)
    self._pilot_sandbox    = session._get_pilot_sandbox(pilot)
    self._client_sandbox   = session._get_client_sandbox()

    # contexts for staging url expansion: both share all entries but
    # `pwd`, which is the pilot sandbox remotely and the client sandbox
    # locally.
    # NOTE: no task sandboxes defined!
    shared_ctx = {'client'  : self._client_sandbox,
                  'pilot'   : self._pilot_sandbox,
                  'resource': self._resource_sandbox,
                  'session' : self._session_sandbox,
                  'endpoint': self._endpoint_fs}

    self._rem_ctx = dict(pwd=self._pilot_sandbox,  **shared_ctx)
    self._loc_ctx = dict(pwd=self._client_sandbox, **shared_ctx)

    # we need to expand placeholders in the sandboxes
    # FIXME: this code is a duplication from the pilot launcher code
    expand = dict()
    for key, val in pilot['description'].items():

        if val is None:
            val = ''

        is_str = isinstance(val, str)

        expand['pd.%s' % key]         = val
        expand['pd.%s' % key.upper()] = val.upper() if is_str else val
        expand['pd.%s' % key.lower()] = val.lower() if is_str else val

    for url in (self._endpoint_fs,     self._resource_sandbox,
                self._session_sandbox, self._pilot_sandbox):
        url.path = url.path % expand

    # hook into the control pubsub for rpc handling
    self._rpc_reqs = dict()

    reg           = self._session._reg
    ctrl_addr_sub = reg['bridges.control_pubsub.addr_sub']
    ctrl_addr_pub = reg['bridges.control_pubsub.addr_pub']

    self._ctrl_pub = ru.zmq.Publisher(rpc.CONTROL_PUBSUB, url=ctrl_addr_pub,
                                      log=self._log, prof=self._prof)

    ru.zmq.Subscriber(rpc.CONTROL_PUBSUB, url=ctrl_addr_sub,
                      log=self._log, prof=self._prof, cb=self._control_cb,
                      topic=rpc.CONTROL_PUBSUB)
# --------------------------------------------------------------------------
#
def __repr__(self):
return '<%s object, uid %s>' % (self.__class__.__qualname__, self._uid)
# --------------------------------------------------------------------------
#
def __str__(self):
return str([self.uid, self.resource, self.state])
# --------------------------------------------------------------------------
#
def _default_state_cb(self, pilot, state=None):
    """Log the pilot state and react to fatal failures.

    This is the callback which `__init__` registers for the pilot state
    metric.  Both arguments are ignored: uid and state are read from this
    instance.  If the pilot is `FAILED` and `exit_on_error` was set in
    the description, `ru.cancel_main_thread('int')` is called
    (presumably to interrupt the application's main thread).
    """

    pid     = self.uid
    current = self.state

    self._log.info("[Callback]: pilot %s state: %s.", pid, current)

    if current != rps.FAILED or not self._exit_on_error:
        return

    self._log.error("[Callback]: pilot '%s' failed (exit)", pid)

    # There are different ways to tell main (raising a RuntimeError,
    # `os.kill`, `sys.exit`) - we use the main thread cancellation.
    ru.cancel_main_thread('int')
# --------------------------------------------------------------------------
#
def _update(self, pilot_dict):
    """Apply an update to this facade and fire the state callbacks.

    This is invoked by whatever component received updated pilot
    information.  The new state is recorded, the remaining information
    is merged into the instance, and then all pilot specific callbacks
    and the manager level callbacks are invoked.

    Arguments:
        pilot_dict (dict): update for this pilot; `uid` and `state` are
            required.  The dict is modified in place: entries with value
            `None` are removed and `resources['rm_info']` (if set) is
            moved to `resource_details`.

    Returns:
        None

    Raises:
        AssertionError: if the update is addressed to a different uid.
        RuntimeError: if the target state (other than `FAILED` or
            `CANCELED`) skips over intermediate states.
    """

    uid = pilot_dict['uid']
    self._log.debug('update %s', uid)

    if uid != self.uid:
        self._log.error('invalid uid: %s / %s', uid, self.uid)
        # raised explicitly (not via `assert`) so that the check is not
        # stripped when running with `python -O`
        raise AssertionError('update called on wrong instance')

    # NOTE: this method relies on state updates to arrive in order and
    #       without gaps.
    current = self.state
    target  = pilot_dict['state']

    if target not in [rps.FAILED, rps.CANCELED]:

        # ensure valid state transition
        state_diff = rps._pilot_state_value(target) - \
                     rps._pilot_state_value(current)

        if state_diff > 1:
            raise RuntimeError('%s: invalid state transition %s -> %s'
                               % (self.uid, current, target))

    self._state = target

    # FIXME: this is a hack to get the resource details into the pilot
    resources = pilot_dict.get('resources') or {}
    rm_info   = resources.get('rm_info')

    if rm_info:
        del pilot_dict['resources']['rm_info']
        pilot_dict['resource_details'] = rm_info

    # unset values must not overwrite information we already have
    for k in list(pilot_dict.keys()):
        if pilot_dict[k] is None:
            del pilot_dict[k]

    # keep all other information around
    ru.dict_merge(self._pilot_dict, pilot_dict, ru.OVERWRITE)

    # Invoke pilot specific callbacks.  We iterate over a snapshot taken
    # under the callback lock: (un)registering callbacks from another
    # thread (or from within a callback) then cannot break the iteration,
    # and callbacks do not run while the lock is held.
    with self._cb_lock:
        cb_vals = list(self._callbacks[rpc.PILOT_STATE].values())

    for cb_val in cb_vals:

        cb      = cb_val['cb']
        cb_data = cb_val['cb_data']

        self._log.debug('call %s', cb)
        self._log.debug('%s calls cb %s', self.uid, cb)

        if cb_data: cb([self], cb_data)
        else      : cb([self])

    # ask pmgr to invoke any global callbacks
    self._pmgr._call_pilot_callbacks(self)
# --------------------------------------------------------------------------
#
def as_dict(self):
    """Render this pilot as a plain dictionary.

    All URL attributes are converted to strings; `description` is the
    deep copy returned by the `description` property.

    Returns:
        dict: a Python dictionary representation of the object.
    """

    # identity and state
    ret = {'session': self.session.uid,
           'pmgr'   : self.pmgr.uid,
           'uid'    : self.uid,
           'type'   : 'pilot',
           'state'  : self.state}

    # values which are taken verbatim from the respective properties
    for key in ('log', 'stdout', 'stderr', 'resource', 'resources'):
        ret[key] = getattr(self, key)

    # URLs are stored in their string form
    urls = (('endpoint_fs'     , self._endpoint_fs),
            ('resource_sandbox', self._resource_sandbox),
            ('session_sandbox' , self._session_sandbox),
            ('pilot_sandbox'   , self._pilot_sandbox),
            ('client_sandbox'  , self._client_sandbox),
            ('js_url'          , self._pilot_jsurl),
            ('js_hop'          , self._pilot_jshop))

    for key, url in urls:
        ret[key] = str(url)

    ret['description']      = self.description
    ret['resource_details'] = self.resource_details

    return ret
# --------------------------------------------------------------------------
#
@property
def session(self):
    """Session: the session this pilot lives in (that of its manager)."""

    return self._session
# --------------------------------------------------------------------------
#
@property
def pmgr(self):
    """PilotManager: the manager this pilot was created with."""

    return self._pmgr
# -------------------------------------------------------------------------
#
@property
def resource_details(self):
    """dict: agent level resource information.

    This is the `resource_details` entry of the latest pilot update, or
    `None` as long as no update provided it.
    """

    return self._pilot_dict.get('resource_details', None)
# --------------------------------------------------------------------------
#
@property
def uid(self):
    """str: the pilot's unique identifier within a :class:`PilotManager`."""

    return self._uid
# --------------------------------------------------------------------------
#
@property
def state(self):
    """str: the pilot's current :py:mod:`state <radical.pilot.states>`."""

    return self._state
# --------------------------------------------------------------------------
#
@property
def log(self):
    """list[tuple]: human readable [timestamp, string] tuples describing
    various events during the pilot's lifetime.

    Those strings are not normative, only informative!  The value is the
    `log` entry of the latest pilot update, or `None` if no update
    provided one.
    """

    return self._pilot_dict.get('log', None)
# --------------------------------------------------------------------------
#
@property
def stdout(self):
    """str: a snapshot of the pilot's STDOUT stream.

    The value is the `stdout` entry of the latest pilot update, or `None`
    if no update provided one.

    NOTE(review): the original documentation states that this is `None`
    until the pilot reached 'DONE' or 'FAILED' - that depends on when
    updates carry the stream and cannot be confirmed from this method.

    Warning:
        This can be inefficient.  Output may be incomplete and/or filtered.
    """

    return self._pilot_dict.get('stdout', None)
# --------------------------------------------------------------------------
#
@property
def stderr(self):
    """str: a snapshot of the pilot's STDERR stream.

    The value is the `stderr` entry of the latest pilot update, or `None`
    if no update provided one.

    NOTE(review): the original documentation states that this is `None`
    until the pilot reached 'DONE' or 'FAILED' - that depends on when
    updates carry the stream and cannot be confirmed from this method.

    Warning:
        This can be inefficient.  Output may be incomplete and/or filtered.
    """

    return self._pilot_dict.get('stderr', None)
# --------------------------------------------------------------------------
#
@property
def resource(self):
    """str: the resource tag of this pilot, as set in its description."""

    return self._descr.get('resource', None)
# --------------------------------------------------------------------------
#
@property
def resources(self):
    """dict: the resources used by this pilot.

    This is the `resources` entry of the latest pilot update (which
    `_update` handles as a dictionary), or `None` if no update provided
    one.
    """

    return self._pilot_dict.get('resources', None)
# --------------------------------------------------------------------------
#
@property
def pilot_sandbox(self):
    """str: the full sandbox URL of this pilot in its string form, or
    `None` if no (non-empty) sandbox URL is known.
    """

    # NOTE: The pilot has a sandbox property, containing the full sandbox
    #       path, which is used by the pmgr to stage data back and forth.
    #       However, the full path as visible from the pmgr side might not
    #       be what the agent is seeing, specifically in the case of
    #       non-shared filesystems (OSG).  The agent thus uses `$PWD` as
    #       sandbox, with the assumption that this will get mapped to
    #       whatever is returned here as sandbox URL.
    #
    #       There is thus implicit knowledge shared between the RP client
    #       and the RP agent that `$PWD` *is* the sandbox!  The same
    #       implicitly also holds for the staging area, which is relative
    #       to the pilot sandbox.
    sandbox = self._pilot_sandbox

    if not sandbox:
        return None

    return str(sandbox)
@property
def endpoint_fs(self):
    """radical.utils.Url: the URL which is internally used to access the
    target resource's root file system.
    """

    return self._endpoint_fs
@property
def resource_sandbox(self):
    """radical.utils.Url: the full URL of the path that RP considers the
    resource sandbox, i.e., the sandbox on the target resource's file
    system which is shared by all sessions which access that resource.
    """

    return self._resource_sandbox
@property
def session_sandbox(self):
    """radical.utils.Url: the full URL of the path that RP considers the
    session sandbox on the target resource's file system, which is shared
    by all pilots which access that resource in the current session.
    """

    return self._session_sandbox
@property
def client_sandbox(self):
    """The client sandbox as obtained from the session on construction.

    This is used as `pwd` when local staging URLs are expanded.
    """

    return self._client_sandbox
# --------------------------------------------------------------------------
#
@property
def description(self):
    """dict: the description the pilot was started with, as a dictionary.

    A deep copy is returned, so changes to the returned dictionary do not
    affect the pilot.
    """

    descr_copy = copy.deepcopy(self._descr)
    return descr_copy
# --------------------------------------------------------------------------
#
def register_callback(self, cb, metric=rpc.PILOT_STATE, cb_data=None):
    """Register a callback for a pilot metric (default: state changes).

    The callback is stored under `id(cb)`: registering the same callable
    again for the same metric replaces the earlier registration.

    Callbacks registered for the pilot state metric are invoked whenever
    a pilot update is processed.  They are called with a list which holds
    this pilot as only element::

        def cb(pilots)

    If *cb_data* is given (and evaluates to `True`), it is passed along
    as second argument::

        def cb(pilots, cb_data)

    Arguments:
        cb (callable): the callback to register.
        metric (str): one of `rpc.PMGR_METRICS`.
        cb_data: optional data to pass to the callback.

    Raises:
        ValueError: if `metric` is not a known pmgr metric.
    """

    if metric not in rpc.PMGR_METRICS:
        raise ValueError ("invalid pmgr metric '%s'" % metric)

    entry = {'cb'      : cb,
             'cb_data' : cb_data}

    with self._cb_lock:
        self._callbacks[metric][id(cb)] = entry
# --------------------------------------------------------------------------
#
def unregister_callback(self, cb, metric=rpc.PILOT_STATE):
    """Remove callbacks which were registered via `register_callback`.

    Arguments:
        cb (callable): the callback to remove.  If this evaluates to
            `False` (e.g. `None`), *all* callbacks registered for the
            selected metrics are removed.
        metric (str | list[str]): metric(s) to remove the callback from.
            If this evaluates to `False` (e.g. `None`), all pmgr metrics
            are selected.

    Raises:
        ValueError: if any selected metric is not a known pmgr metric, or
            if `cb` is given but is not registered for any of the
            selected metrics.
    """

    # normalize the metric selection *before* validating it - otherwise
    # a list of metrics would always be rejected as invalid metric
    if not metric:
        metrics = rpc.PMGR_METRICS
    elif isinstance(metric, (list, tuple)):
        metrics = metric
    else:
        metrics = [metric]

    for m in metrics:
        if m not in rpc.PMGR_METRICS:
            raise ValueError ("invalid pmgr metric '%s'" % m)

    with self._cb_lock:

        if not cb:
            # no specific callback given: drop all of them
            for m in metrics:
                self._callbacks[m].clear()
            return

        # A callback is usually registered for only some of the selected
        # metrics: remove it wherever it is found, and only complain if
        # it was not found at all.
        cb_id = id(cb)
        found = False

        for m in metrics:
            if cb_id in self._callbacks[m]:
                del self._callbacks[m][cb_id]
                found = True

        if not found:
            raise ValueError("unknown callback '%s'" % cb_id)
# --------------------------------------------------------------------------
#
def wait(self, state=None, timeout=None):
    """Block until the pilot reaches a certain state.

    Returns when the pilot reaches one of the requested states, when it
    reaches a final state (no further state changes will happen then),
    when the optional timeout expires, or when the pilot manager
    terminates - whatever happens first.

    Arguments:
        state (str | list[str]):
            The :py:mod:`state(s) <radical.pilot.states>` the pilot has
            to reach in order for the call to return.  A single state or
            a list, tuple or set of states can be given.

            By default `wait` waits for the pilot to reach a **final**
            state, which can be one of the following:

            * :data:`radical.pilot.states.DONE`
            * :data:`radical.pilot.states.FAILED`
            * :data:`radical.pilot.states.CANCELED`

        timeout (float):
            Optional timeout in seconds before the call returns
            regardless whether the pilot has reached the desired state or
            not.  The default value **None** never times out, a timeout
            of `0` returns immediately.

    Returns:
        str: the state of the pilot at the time the call returns.  This
        is not necessarily one of the requested states.
    """

    if not state:
        states = rps.FINAL
    elif isinstance(state, (list, tuple, set)):
        states = list(state)
    else:
        states = [state]

    start_wait = time.time()

    while True:

        current = self.state

        if current in states:
            break

        # a pilot in a final state will never progress to another state:
        # waiting any further would block forever
        if current in rps.FINAL:
            break

        if timeout is not None and timeout <= (time.time() - start_wait):
            break

        if self._pmgr._terminate.is_set():
            break

        time.sleep(0.1)

    return self.state
# --------------------------------------------------------------------------
#
def cancel(self):
    """Cancel the pilot.

    The request is delegated to the owning pilot manager via
    `cancel_pilots(uid)`.
    """

    pmgr = self._pmgr
    pmgr.cancel_pilots(self._uid)
# --------------------------------------------------------------------------
#
def attach_tmgr(self, tmgr) -> None:
    """Attach this pilot to a task manager.

    The task manager is used by `submit_tasks` and `submit_raptors`.
    A pilot can only be attached once.

    Arguments:
        tmgr: the task manager to attach to.

    Raises:
        RuntimeError: if the pilot is already attached to a task manager.
    """

    attached = self._tmgr

    if attached:
        raise RuntimeError('this pilot is already attached to %s'
                           % attached.uid)

    self._tmgr = tmgr

    # NOTE: submitting tasks / raptors which were collected in wait pools
    #       before the pilot got attached is not implemented.
# --------------------------------------------------------------------------
#
def submit_tasks(self, descriptions):
    """Submit tasks to this pilot via the attached task manager.

    Each description is pinned to this pilot (its `pilot` attribute is
    set to the pilot's uid) before the descriptions are handed to the
    task manager.

    Arguments:
        descriptions (list): the task descriptions to submit.

    Returns:
        whatever the task manager's `submit_tasks` returns.

    Raises:
        RuntimeError: if the pilot is not attached to a task manager.
            The descriptions are left untouched in that case.
    """

    # check attachment first: a failed submission must not modify the
    # caller's descriptions
    if not self._tmgr:
        # FIXME: tasks could be collected in a wait pool instead - but
        #        then we cannot return task handles here
        raise RuntimeError('pilot is not attached to a task manager, yet')

    for descr in descriptions:
        descr.pilot = self.uid

    return self._tmgr.submit_tasks(descriptions)
# --------------------------------------------------------------------------
#
def submit_raptors(self, descriptions):
    """Submit raptor descriptions to this pilot via the task manager.

    Arguments:
        descriptions: passed on unchanged, along with this pilot's uid.

    Returns:
        whatever the task manager's `submit_raptors` returns.

    Raises:
        RuntimeError: if the pilot is not attached to a task manager.
    """

    if self._tmgr:
        return self._tmgr.submit_raptors(descriptions, self.uid)

    # FIXME: raptors could be collected in a wait pool instead - but then
    #        we cannot return task handles here
    raise RuntimeError('pilot is not attached to a task manager, yet')
# --------------------------------------------------------------------------
#
def prepare_env(self, env_name, env_spec):
    """Prepare a virtual environment.

    Request the preparation of a task or worker environment on the target
    resource.  The request is sent as `prepare_env` RPC to the pilot, and
    this call returns once that RPC returned.  The RPC result is not
    passed on to the caller.

    Arguments:
        env_name (str): name of the environment to prepare.
        env_spec (dict): specification of the environment to prepare,
            like::

                {'type'    : 'venv',
                 'version' : '3.7',
                 'pre_exec': ['module load python'],
                 'setup'   : ['radical.pilot==1.0', 'pandas']},

                {'type'    : 'conda',
                 'version' : '3.8',
                 'setup'   : ['numpy']}

                {'type'    : 'conda',
                 'version' : '3.8',
                 'path'    : '/path/to/ve',
                 'setup'   : ['numpy']}

            where the *type* specifies the environment type, *version*
            specifies the Python version to deploy, and *setup* specifies
            how the environment is to be prepared.  If *path* is
            specified the env will be created at that path.  If *path* is
            not specified, RP will place the named env in the pilot
            sandbox (under :file:`env/named_env_{name}`).  If a VE exists
            at that path, it will be used as is (an update is not
            performed).  *pre_exec* commands are executed before env
            creation and setup are attempted.

    Note:
        The optional `version` specifier is only interpreted up to minor
        version, subminor and less are ignored.
    """

    request = {'env_name': env_name,
               'env_spec': env_spec}

    self.rpc('prepare_env', **request)
# --------------------------------------------------------------------------
#
def stage_in(self, sds):
    """Stage files "in".

    Stages the content of the :py:mod:`~radical.pilot.staging_directives`
    to the pilot sandbox.

    Please note the documentation of
    :func:`radical.pilot.staging_directives.complete_url` for details on
    path and sandbox semantics.

    The directives are modified in place: `prof_id` is set to the pilot
    uid, `source` is completed against the local (client side) context
    and `target` against the remote (pilot side) context.

    Arguments:
        sds: one staging directive or a list of staging directives.

    Returns:
        list[str]: the `target` entries of all directives, read after the
        staging request was handed to the pilot manager.
    """

    directives = ru.as_list(sds)

    for directive in directives:

        directive['prof_id'] = self.uid

        src = complete_url(directive['source'], self._loc_ctx, self._log)
        directive['source'] = str(src)

        tgt = complete_url(directive['target'], self._rem_ctx, self._log)
        directive['target'] = str(tgt)

    # ask the pmgr to send the staging requests to the stager
    self._pmgr._pilot_staging_input(directives)

    targets = list()
    for directive in directives:
        targets.append(directive['target'])

    return targets
# --------------------------------------------------------------------------
#
def _control_cb(self, topic, msg_data):
    """Handle messages from the control pubsub: collect RPC results.

    Only `RPCResultMessage` instances are acted upon: if the result
    belongs to a pending request of this pilot (see `rpc`), the result is
    stored with that request and the request's event is set.  All other
    messages are ignored.

    Arguments:
        topic: the topic the message was received on (unused).
        msg_data: the serialized message.
    """

    try:
        msg = ru.zmq.Message.deserialize(msg_data)

        # we only listen for RPC result messages
        if not isinstance(msg, RPCResultMessage):
            return

        self._log.debug_4('handle rpc result %s', msg)

        req = self._rpc_reqs.get(msg.uid)

        if req is not None:
            # store the result *before* waking up the waiting thread
            req['res'] = msg
            req['evt'].set()

    except Exception as e:
        # This handler is best-effort: the subscription covers the whole
        # control topic, so (presumably) messages which are not meant for
        # us or cannot be deserialized are expected here and ignored.  We
        # only catch `Exception` though, so that `KeyboardInterrupt` and
        # `SystemExit` are not swallowed, and leave a trace in the log.
        self._log.debug_4('ignore control message: %r', e)
# --------------------------------------------------------------------------
#
def rpc(self, cmd, *args, rpc_addr=None, **kwargs):
    '''Remote procedure call.

    Send an RPC command and arguments to the pilot and wait for the
    response.  This is a synchronous operation at this point, and it is
    not thread safe to have multiple concurrent RPC calls.

    Arguments:
        cmd (str): name of the command to run.
        *args: positional arguments for the command.
        rpc_addr (str): address of the RPC request.  Defaults to the uid
            of this pilot.
        **kwargs: keyword arguments for the command.

    Returns:
        the value (`val`) of the RPC result.

    Raises:
        RuntimeError: if the pilot is not in `PMGR_ACTIVE` state when the
            request is to be sent, if the pilot reaches a final state
            before a result arrived, or if the result reports an
            exception.
    '''

    # RPC's can only be handled in `PMGR_ACTIVE` state.  `wait` also
    # returns when the pilot became final or when the pmgr terminates -
    # nobody would answer the request then.
    self.wait(rps.PMGR_ACTIVE)

    if self.state != rps.PMGR_ACTIVE:
        raise RuntimeError('pilot %s is in state %s: cannot handle rpc %s'
                           % (self.uid, self.state, cmd))

    if rpc_addr is None:
        rpc_addr = self.uid

    rpc_id  = ru.generate_id('%s.rpc' % self._uid)
    rpc_req = RPCRequestMessage(uid=rpc_id, cmd=cmd, addr=rpc_addr,
                                args=args, kwargs=kwargs)
    evt     = mt.Event()
    entry   = {'req' : rpc_req,
               'res' : None,
               'evt' : evt,
               'time': time.time()}

    # register the request before sending it, so that `_control_cb` can
    # match the result whenever it arrives
    self._rpc_reqs[rpc_id] = entry

    try:
        self._ctrl_pub.put(rpc.CONTROL_PUBSUB, rpc_req)

        while not evt.wait(timeout=60):

            # a pilot in final state will not answer anymore (unless the
            # result arrived just now)
            if self.state in rps.FINAL and not evt.is_set():
                raise RuntimeError('pilot %s is %s: no result for rpc %s'
                                   % (self.uid, self.state, rpc_id))

            self._log.debug('still waiting for rpc request %s', rpc_id)

        rpc_res = entry['res']

    finally:
        # requests are only registered while they are pending
        self._rpc_reqs.pop(rpc_id, None)

    if rpc_res.exc:
        raise RuntimeError('rpc failed: %s' % rpc_res.exc)

    return rpc_res.val
# --------------------------------------------------------------------------
#
def stage_out(self, sds=None):
    """Stage data "out".

    Fetches the content of the :py:mod:`~radical.pilot.staging_directives`
    from the pilot sandbox.

    Please note the documentation of
    :func:`radical.pilot.staging_directives.complete_url` for details on
    path and sandbox semantics.

    The directives are modified in place: `prof_id` is set to the pilot
    uid, `source` is completed against the remote (pilot side) context
    and `target` against the local (client side) context.

    Arguments:
        sds: one staging directive or a list of staging directives.  If
            no directive is given, `pilot:///staging_output.tgz` is
            transferred to `client:///staging_output.tgz`.

    Returns:
        list[str]: the `target` entries of all directives, read after the
        staging request was handed to the pilot manager.
    """

    directives = ru.as_list(sds)

    if not directives:
        # default: fetch the pilot's output tarball
        directives = [dict(source='pilot:///staging_output.tgz',
                           target='client:///staging_output.tgz',
                           action=rpc.TRANSFER)]

    # first pass: tag all directives with the pilot uid
    for directive in directives:
        directive['prof_id'] = self.uid

    # second pass: expand source and target URLs
    for directive in directives:

        src = complete_url(directive['source'], self._rem_ctx, self._log)
        directive['source'] = str(src)

        tgt = complete_url(directive['target'], self._loc_ctx, self._log)
        directive['target'] = str(tgt)

    # ask the pmgr to send the staging requests to the stager
    self._pmgr._pilot_staging_output(directives)

    targets = list()
    for directive in directives:
        targets.append(directive['target'])

    return targets
# ------------------------------------------------------------------------------