Source code for radical.pilot.pilot_manager


# pylint: disable=protected-access

__copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu"
__license__   = "MIT"


import os
import time
import threading as mt

import radical.utils as ru

from .  import utils     as rpu
from .  import states    as rps
from .  import constants as rpc

from .staging_directives import expand_staging_directives


# bulk callbacks are implemented, but are currently not used nor exposed.
_USE_BULK_CB = False
if os.environ.get('RADICAL_PILOT_BULK_CB', '').lower() in ['true', 'yes', '1']:
    _USE_BULK_CB = True


# ------------------------------------------------------------------------------
#
[docs]class PilotManager(rpu.ClientComponent): """Manage Pilot instances. A PilotManager manages :class:`rp.Pilot` instances that are submitted via the :func:`radical.pilot.PilotManager.submit_pilots` method. It is possible to attach one or more :ref:`HPC resources </tutorials/configuration.ipynb#Platform-description>` to a PilotManager to outsource machine specific configuration parameters to an external configuration file. Example:: s = rp.Session() pm = rp.PilotManager(session=s) pd = rp.PilotDescription() pd.resource = "futuregrid.alamo" pd.cpus = 16 p1 = pm.submit_pilots(pd) # create first pilot with 16 cores p2 = pm.submit_pilots(pd) # create second pilot with 16 cores # Create a workload of 128 '/bin/sleep' tasks tasks = [] for task_count in range(0, 128): t = rp.TaskDescription() t.executable = "/bin/sleep" t.arguments = ['60'] tasks.append(t) # Combine the two pilots, the workload and a scheduler via # a TaskManager. tm = rp.TaskManager(session=session, scheduler=rp.SCHEDULER_ROUND_ROBIN) tm.add_pilot(p1) tm.submit_tasks(tasks) The pilot manager can issue notification on pilot state changes. Whenever state notification arrives, any callback registered for that notification is fired. Note: State notifications can arrive out of order wrt the pilot state model! """ # -------------------------------------------------------------------------- #
[docs] def __init__(self, session, cfg='default'): """Creates a new PilotManager and attaches is to the session. Arguments: session (rp.Session): The session instance to use. uid (str): ID for pilot manager, to be used for reconnect cfg (dict, str): The configuration or name of configuration to use. Returns: rp.PilotManager: A new `PilotManager` object. """ assert session._role == session._PRIMARY, 'pmgr needs primary session' # initialize the base class (with no intent to fork) self._uid = ru.generate_id('pmgr.%(item_counter)04d', ru.ID_CUSTOM, ns=session.uid) self._uids = list() # known UIDs self._pilots = dict() self._pilots_lock = mt.RLock() self._callbacks = dict() self._pcb_lock = mt.RLock() self._terminate = mt.Event() self._closed = False for m in rpc.PMGR_METRICS: self._callbacks[m] = dict() # NOTE: `name` and `cfg` are overloaded, the user cannot point to # a predefined config and amed it at the same time. This might # be ok for the session, but introduces a minor API inconsistency. # name = None if isinstance(cfg, str): name = cfg cfg = None cfg = ru.Config('radical.pilot.pmgr', name=name, cfg=cfg) cfg.uid = self._uid cfg.owner = self._uid cfg.sid = session.uid cfg.path = session.path cfg.reg_addr = session.reg_addr cfg.heartbeat = session.cfg.heartbeat cfg.client_sandbox = session._get_client_sandbox() super().__init__(cfg, session=session) self.start() self._log.info('started pmgr %s', self._uid) self._rep = self._session._get_reporter(name=self._uid) self._rep.info('<<create pilot manager') # create pmgr bridges and components, use session cmgr for that self._cmgr = rpu.ComponentManager(cfg.sid, cfg.reg_addr, self._uid) self._cmgr.start_bridges(self._cfg.bridges) self._cmgr.start_components(self._cfg.components) self._session._register_pmgr(self) # The output queue is used to forward submitted pilots to the # launching component. self.register_output(rps.PMGR_LAUNCHING_PENDING, rpc.PMGR_LAUNCHING_QUEUE) self._stager = rpu.StagingHelper(self._log, self._prof) # register the state notification pull cb and hb pull cb # FIXME: we may want to have the frequency configurable # FIXME: this should be a tailing cursor in the update worker self.register_timed_cb(self._pilot_heartbeat_cb, timer=self._cfg['db_poll_sleeptime']) # also listen to the state pubsub for pilot state changes self.register_subscriber(rpc.STATE_PUBSUB, self._state_sub_cb) # let session know we exist self._session._register_pmgr(self) self._prof.prof('setup_done', uid=self._uid) self._rep.ok('>>ok\n')
# -------------------------------------------------------------------------- # def initialize(self): # the manager must not carry bridge and component handles across forks ru.atfork(self._atfork_prepare, self._atfork_parent, self._atfork_child) # -------------------------------------------------------------------------- # # EnTK forks, make sure we don't carry traces of children across the fork # def _atfork_prepare(self): pass def _atfork_parent(self) : pass def _atfork_child(self) : self._bridges = dict() self._components = dict() # -------------------------------------------------------------------------- # def finalize(self): self._cmgr.close() self._fail_missing_pilots() # -------------------------------------------------------------------------- #
[docs] def close(self, terminate=True): """Shut down the PilotManager and all its components. Arguments: terminate (bool): cancel non-final pilots if True (default) Note: Pilots cannot be reconnected to after termination. """ if self._closed: return self._rep.info('<<close pilot manager') # disable callbacks during shutdown # FIXME: really? with self._pcb_lock: for m in rpc.PMGR_METRICS: self._callbacks[m] = dict() # If terminate is set, kill all pilots. if terminate: # skip reporting for `wait_pilots` is_rep_enabled = self._rep._enabled self._rep._enabled = False self.cancel_pilots(_timeout=1) self._rep._enabled = is_rep_enabled self.kill_pilots(_timeout=10) self._terminate.set() self._cmgr.close() self._log.info("Closed PilotManager %s." % self._uid) self._closed = True self._rep.ok('>>ok\n') # dump json json = self.as_dict() # json['_id'] = self.uid json['type'] = 'pmgr' json['uid'] = self.uid json['pilots'] = [pilot.as_dict() for pilot in self._pilots.values()] tgt = '%s/%s.json' % (self._session.path, self.uid) ru.write_json(json, tgt)
# -------------------------------------------------------------------------- #
[docs] def as_dict(self): """Returns a dictionary representation of the PilotManager object.""" ret = { 'uid': self.uid, 'cfg': self.cfg } return ret
# -------------------------------------------------------------------------- # def __str__(self): """Returns a string representation of the PilotManager object.""" return str(self.as_dict()) # -------------------------------------------------------------------------- # def _pilot_heartbeat_cb(self): if self._terminate.is_set(): return False # send heartbeat self._pilot_send_hb() return True # -------------------------------------------------------------------------- # def _state_sub_cb(self, topic, msg): if self._terminate.is_set(): return False # self._log.debug('state event: %s', msg) cmd = msg.get('cmd') arg = msg.get('arg') if cmd != 'update': self._log.debug('ignore state cb msg with cmd %s', cmd) return True if isinstance(arg, list): things = arg else : things = [arg] for thing in things: if 'type' in thing and thing['type'] == 'pilot': self._log.debug('state push: %s: %s', thing['uid'], thing['state']) # we got the state update from the state callback - don't # publish it again self._update_pilot(thing, publish=False) return True # -------------------------------------------------------------------------- #
[docs] def control_cb(self, topic, msg): if self._terminate.is_set(): return False cmd = msg['cmd'] arg = msg['arg'] self._log.debug_9('got control cmd %s: %s', cmd, arg) if cmd == 'pilot_activate': pilot = arg['pilot'] self._update_pilot(pilot, publish=True) # store resource json for RA fname = '%s/%s.resources.json' % (self._cfg.path, pilot['uid']) ru.write_json(fname, pilot['resources'])
# -------------------------------------------------------------------------- # def _update_pilot(self, pilot_dict, publish=False, advance=True): # FIXME: this is breaking the bulk! pid = pilot_dict['uid'] # state = pilot_dict['state'] with self._pilots_lock: # we don't care about pilots we don't know if pid not in self._pilots: return # this is not an error # only update on state changes current = self._pilots[pid].state target = pilot_dict['state'] # always update the pilot instance, even if state didn't change if current == target: self._pilots[pid]._update(pilot_dict) return target, passed = rps._pilot_state_progress(pid, current, target) # self._log.debug('%s current: %s', pid, current) # self._log.debug('%s target : %s', pid, target ) # self._log.debug('%s passed : %s', pid, passed ) if target in [rps.CANCELED, rps.FAILED]: # don't replay intermediate states passed = passed[-1:] for s in passed: self._log.debug('%s advance: %s', pid, s ) # we got state from either pubsub or DB, so don't publish again. # we also don't need to maintain bulks for that reason. pilot_dict['state'] = s self._pilots[pid]._update(pilot_dict) if advance: self.advance(pilot_dict, s, publish=publish, push=False) if s in [rps.PMGR_ACTIVE]: self._log.info('pilot %s is %s: %s [%s]', pid, s, pilot_dict.get('lm_info'), pilot_dict.get('lm_detail')) # -------------------------------------------------------------------------- # def _call_pilot_callbacks(self, pilot): state = pilot.state with self._pcb_lock: for cb_dict in self._callbacks[rpc.PILOT_STATE].values(): cb = cb_dict['cb'] cb_data = cb_dict['cb_data'] # print ' ~~~ call PCBS: %s -> %s : %s [%s]' \ # % (self.uid, pilot.state, cb_name, cb_data) self._log.debug('pmgr calls cb %s for %s', pilot.uid, cb) if _USE_BULK_CB: if cb_data: cb([pilot], cb_data) else : cb([pilot]) else: if cb_data: cb(pilot, state, cb_data) else : cb(pilot, state) # -------------------------------------------------------------------------- # def _pilot_send_hb(self, pid=None): self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'pmgr_heartbeat', 'arg' : {'pmgr' : self.uid}}) # -------------------------------------------------------------------------- # def _pilot_staging_input(self, sds): """Run some staging directives for a pilot.""" for sd in expand_staging_directives(sds): self._stager.handle_staging_directive(sd) # -------------------------------------------------------------------------- # def _pilot_staging_output(self, sds): """Run some staging directives for a pilot.""" for sd in expand_staging_directives(sds): self._stager.handle_staging_directive(sd) # -------------------------------------------------------------------------- # @property def uid(self): """str: The unique id.""" return self._uid # -------------------------------------------------------------------------- #
[docs] def list_pilots(self): """Get the UIDs of the managed :class:`rp.Pilots`. Returns: list[str]: A list of :class:`rp.Pilot` UIDs. """ with self._pilots_lock: ret = list(self._pilots.keys()) return ret
# -------------------------------------------------------------------------- #
[docs] def submit_pilots(self, descriptions): """Submit one or more `rp.Pilot` instances to the pilot manager. Arguments: descriptions (rp.PilotDescription | list[rp.PilotDescription]): The description of the pilot instance(s) to create. Returns: list[rp.Pilot]: A list of :class:`rp.Pilot` objects. """ from .pilot import Pilot ret_list = True if not isinstance(descriptions, list): ret_list = False descriptions = [descriptions] if len(descriptions) == 0: raise ValueError('cannot submit no pilot descriptions') self._rep.info('<<submit %d pilot(s)' % len(descriptions)) # create the pilot instance pilots = list() pilot_docs = list() for pd in descriptions : pilot = Pilot(pmgr=self, descr=pd) pilots.append(pilot) pilot_doc = pilot.as_dict() pilot_docs.append(pilot_doc) # keep pilots around with self._pilots_lock: self._pilots[pilot.uid] = pilot if pd.get('nodes'): self._rep.plain('\n\t%s %-20s %6d nodes' % (pilot.uid, pd['resource'], pd['nodes'])) else: self._rep.plain('\n\t%s %-20s %6d cores %6d gpus' % (pilot.uid, pd['resource'], pd.get('cores', 0), pd.get('gpus', 0))) # initial state advance to 'NEW' # FIXME: we should use update_pilot(), but that will not trigger an # advance, since the state did not change. We would then miss # the profile entry for the advance to NEW. So we here basically # only trigger the profile entry for NEW. self.advance(pilot_docs, state=rps.NEW, publish=False, push=False) # immediately send first heartbeat for pilot_doc in pilot_docs: pid = pilot_doc['uid'] self._pilot_send_hb(pid) # Only after the insert/update can we hand the pilots over to the next # components (ie. advance state). for pd in pilot_docs: pd['state'] = rps.PMGR_LAUNCHING_PENDING self._update_pilot(pd, advance=False) self.advance(pilot_docs, publish=True, push=True) self._rep.ok('>>ok\n') if ret_list: return pilots else : return pilots[0]
# -------------------------------------------------------------------------- # def _reconnect_pilots(self): """Restore Pilot info from database. When reconnecting, we need to dig information about all pilots from the DB for which this pmgr is responsible. """ from .pilot import Pilot from .pilot_description import PilotDescription # self.is_valid() # FIXME MONGODB # pilot_docs = self._session._dbs.get_pilots(pmgr_uid=self.uid) # with self._pilots_lock: # for ud in pilot_docs: # descr = PilotDescription(ud['description']) # pilot = Pilot(pmgr=self, descr=descr) # self._pilots[pilot.uid] = pilot # -------------------------------------------------------------------------- #
[docs] def get_pilots(self, uids=None): """Returns one or more pilots identified by their IDs. Arguments: uids (str | list[str]): The IDs of the pilot objects to return. Returns: list: A list of :class:`rp.Pilot` objects. """ if not uids: with self._pilots_lock: ret = list(self._pilots.values()) return ret ret_list = True if (not isinstance(uids, list)) and (uids is not None): ret_list = False uids = [uids] ret = list() with self._pilots_lock: for uid in uids: if uid not in self._pilots: raise ValueError('pilot %s not known' % uid) ret.append(self._pilots[uid]) if ret_list: return ret else : return ret[0]
# -------------------------------------------------------------------------- #
[docs] def wait_pilots(self, uids=None, state=None, timeout=None): """Block for state transition. Returns when one or more :class:`rp.Pilots` reach a specific state. If `pilot_uids` is `None`, `wait_pilots` returns when **all** Pilots reach the state defined in `state`. This may include pilots which have previously terminated or waited upon. Example: # TODO -- add example Arguments: uids (str | list[str], optional): If set, only the Pilots with the specified uids are considered. If `None` (default), all Pilots are considered. state (str | list[str]): The state that Pilots have to reach in order for the call to return. By default `wait_pilots` waits for the Pilots to reach a terminal state, which can be one of the following: * :data:`rp.rps.DONE` * :data:`rp.rps.FAILED` * :data:`rp.rps.CANCELED` timeout (float, optional): Timeout in seconds before the call returns regardless of Pilot state changes. The default value **None** waits forever. """ if not uids: with self._pilots_lock: uids = list() for uid,pilot in self._pilots.items(): if pilot.state not in rps.FINAL: uids.append(uid) if not state: states = rps.FINAL elif isinstance(state, list): states = state else: states = [state] ret_list = True if not isinstance(uids, list): ret_list = False uids = [uids] self._rep.info('<<wait for %d pilot(s)\n\t' % len(uids)) start = time.time() to_check = None with self._pilots_lock: for uid in uids: if uid not in self._pilots: raise ValueError('pilot %s not known' % uid) to_check = [self._pilots[uid] for uid in uids] # We don't want to iterate over all pilots again and again, as that # would duplicate checks on pilots which were found in matching states. # So we create a list from which we drop the pilots as we find them in # a matching state self._rep.idle(mode='start') while to_check and not self._terminate.is_set(): self._rep.idle() to_check = [pilot for pilot in to_check if pilot.state not in states and pilot.state not in rps.FINAL] if to_check: if timeout and (timeout <= (time.time() - start)): self._log.debug ("wait timed out") break time.sleep (0.1) self._rep.idle(mode='stop') if to_check: self._rep.warn('>>timeout\n') else : self._rep.ok ('>>ok\n') # grab the current states to return state = None with self._pilots_lock: states = [self._pilots[uid].state for uid in uids] # done waiting if ret_list: return states else : return states[0]
# -------------------------------------------------------------------------- # def _fail_missing_pilots(self): """Advance remaining pilots to failed state. During termination, fail all pilots for which we did not manage to obtain a final state - we trust that they'll follow up on their cancellation command in due time, if they can """ pass # with self._pilots_lock: # for pid in self._pilots: # pilot = self._pilots[pid] # if pilot.state not in rps.FINAL: # self.advance(pilot.as_dict(), rps.FAILED, # publish=True, push=False) # -------------------------------------------------------------------------- #
[docs] def cancel_pilots(self, uids=None, _timeout=None): """Cancel one or more :class:`rp.Pilots`. Arguments: uids (str | list[str], optional): The IDs of the pilots to cancel. """ if not uids: with self._pilots_lock: uids = list(self._pilots.keys()) if not isinstance(uids, list): uids = [uids] self._log.debug('pilot(s).need(s) cancellation %s', uids) # send the cancellation request to the pilots # FIXME: MongoDB # self._session._dbs.pilot_command('cancel_pilot', [], uids) self._log.debug('issue cancel_pilots for %s', uids) self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'cancel_pilots', 'arg' : {'pmgr' : self.uid, 'uids' : uids}}) # wait for the cancel to be enacted self.wait_pilots(uids=uids, timeout=_timeout) # FIXME: only finalize pilots which actually terminated with self._pilots_lock: for uid in uids: if uid not in self._pilots: raise ValueError('pilot %s not known' % uid)
# -------------------------------------------------------------------------- #
[docs] def kill_pilots(self, uids=None, _timeout=None): """Kill one or more :class:`rp.Pilots` Arguments: uids (str | list[str], optional): The IDs of the pilots to cancel. """ if not uids: with self._pilots_lock: uids = list(self._pilots.keys()) if not isinstance(uids, list): uids = [uids] with self._pilots_lock: for uid in uids: if uid not in self._pilots: raise ValueError('pilot %s not known' % uid) self._log.debug('pilot(s).need(s) killing %s', uids) # inform pmgr.launcher - it will force-kill the pilots self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'kill_pilots', 'arg' : {'pmgr' : self.uid, 'uids' : uids}}) # wait for the kill to be enacted self.wait_pilots(uids=uids, timeout=_timeout)
# -------------------------------------------------------------------------- #
[docs] def register_callback(self, cb, cb_data=None, metric=rpc.PILOT_STATE): """Registers a new callback function with the PilotManager. Manager-level callbacks get called if the specified metric changes. The default metric `PILOT_STATE` fires the callback if any of the Pilots managed by the PilotManager change their state. All callback functions need to have the same signature:: def cb(obj, value, cb_data) where ``object`` is a handle to the object that triggered the callback, ``value`` is the metric, and ``data`` is the data provided on callback registration.. In the example of `PILOT_STATE` above, the object would be the pilot in question, and the value would be the new state of the pilot. Available metrics are * `PILOT_STATE`: fires when the state of any of the pilots which are managed by this pilot manager instance is changing. It communicates the pilot object instance and the pilots new state. """ # FIXME: the signature should be (self, metrics, cb, cb_data) if metric not in rpc.PMGR_METRICS : raise ValueError ("invalid pmgr metric '%s'" % metric) with self._pcb_lock: cb_id = id(cb) self._callbacks[metric][cb_id] = {'cb' : cb, 'cb_data' : cb_data}
# -------------------------------------------------------------------------- # def unregister_callback(self, cb, metric=rpc.PILOT_STATE): if metric and metric not in rpc.PMGR_METRICS : raise ValueError ("invalid pmgr metric '%s'" % metric) if not metric: metrics = rpc.PMGR_METRICS elif isinstance(metric, list): metrics = metric else: metrics = [metric] with self._pcb_lock: for metric in metrics: if cb: to_delete = [id(cb)] else: to_delete = list(self._callbacks[metric].keys()) for cb_id in to_delete: if cb_id not in self._callbacks[metric]: raise ValueError("unknown callback '%s'" % cb_id) del self._callbacks[metric][cb_id] # -------------------------------------------------------------------------- # def check_uid(self, uid): # ensure that uid is not yet known if uid in self._uids: return False else: self._uids.append(uid) return True
# ------------------------------------------------------------------------------