__copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu"
__license__ = "MIT"
import os
import sys
import time
import queue
import threading as mt
import radical.utils as ru
from . import utils as rpu
from . import states as rps
from . import constants as rpc
from .task_description import TaskDescription, RAPTOR_MASTER, RAPTOR_WORKER
from .raptor_tasks import RaptorMaster, RaptorWorker
# bulk callbacks are implemented, but are currently neither used nor exposed.
_USE_BULK_CB = False
if os.environ.get('RADICAL_PILOT_BULK_CB', '').lower() in ['true', 'yes', '1']:
_USE_BULK_CB = True
# ------------------------------------------------------------------------------
#
# make sure deprecation warning is shown only once per type
#
_seen = list()
def _warn(old_type, new_type):
if old_type not in _seen:
_seen.append(old_type)
sys.stderr.write('%s is deprecated - use %s\n' % (old_type, new_type))
# ------------------------------------------------------------------------------
#
class TaskManager(rpu.ClientComponent):
"""
A TaskManager manages :class:`radical.pilot.Task` instances which
represent the **executable** workload in RADICAL-Pilot. A TaskManager
connects the Tasks with one or more :class:`Pilot` instances (which
represent the workload **executors** in RADICAL-Pilot) and a **scheduler**
which determines which :class:`Task` gets executed on which
:class:`Pilot`.
Example::
s = rp.Session()
pm = rp.PilotManager(session=s)
pd = rp.PilotDescription()
pd.resource = "futuregrid.alamo"
pd.cores = 16
p1 = pm.submit_pilots(pd) # create first pilot with 16 cores
p2 = pm.submit_pilots(pd) # create second pilot with 16 cores
# Create a workload of 128 '/bin/sleep' tasks
tasks = []
        for _ in range(128):
t = rp.TaskDescription()
t.executable = "/bin/sleep"
t.arguments = ['60']
tasks.append(t)
# Combine the two pilots, the workload and a scheduler via
# a TaskManager.
        tm = rp.TaskManager(session=s, scheduler=rp.SCHEDULER_ROUND_ROBIN)
        tm.add_pilots([p1, p2])
        tm.submit_tasks(tasks)
    The task manager can issue notifications on task state changes. Whenever
    a state notification arrives, any callback registered for that
    notification is fired.

    Note:
        State notifications can arrive out of order with respect to the task
        state model!
"""
# --------------------------------------------------------------------------
#
    def __init__(self, session, cfg='default', scheduler=None):
        """Create a new TaskManager and attach it to the session.
Arguments:
session (radical.pilot.Session): The session instance to use.
cfg (dict | str): The configuration or name of configuration to use.
scheduler (str): The name of the scheduler plug-in to use.
Returns:
radical.pilot.TaskManager: A new `TaskManager` object.
"""
assert session._role == session._PRIMARY, 'tmgr needs primary session'
# initialize the base class (with no intent to fork)
self._uid = ru.generate_id('tmgr.%(item_counter)04d',
ru.ID_CUSTOM, ns=session.uid)
if not scheduler:
scheduler = rpc.SCHEDULER_ROUND_ROBIN
self._pilots = dict()
self._pilots_lock = mt.RLock()
self._tasks = dict()
self._tasks_lock = mt.RLock()
self._callbacks = dict()
self._tcb_lock = mt.RLock()
self._terminate = mt.Event()
self._closed = False
self._task_info = dict()
for m in rpc.TMGR_METRICS:
self._callbacks[m] = dict()
# NOTE: `name` and `cfg` are overloaded, the user cannot point to
# a predefined config and name it at the same time. This might
# be ok for the session, but introduces a minor API inconsistency.
#
name = None
if isinstance(cfg, str):
name = cfg
cfg = None
cfg = ru.Config('radical.pilot.tmgr', name=name, cfg=cfg)
cfg.uid = self._uid
cfg.owner = self._uid
cfg.sid = session.uid
cfg.path = session.path
cfg.reg_addr = session.reg_addr
cfg.heartbeat = session.cfg.heartbeat
cfg.client_sandbox = session._get_client_sandbox()
super().__init__(cfg, session=session)
self.start()
self._log.info('started tmgr %s', self._uid)
self._rep = self._session._get_reporter(name=self._uid)
self._rep.info('<<create task manager')
        # overwrite the scheduler in the config with the constructor argument
if 'tmgr_scheduling' in self._cfg.components:
self._cfg.components.tmgr_scheduling.scheduler = scheduler
        # create tmgr bridges and components, use session cmgr for that
self._cmgr = rpu.ComponentManager(cfg.sid, cfg.reg_addr, self._uid)
self._cmgr.start_bridges(self._cfg.bridges)
self._cmgr.start_components(self._cfg.components)
# let session know we exist
self._session._register_tmgr(self)
# The output queue is used to forward submitted tasks to the
# scheduling component.
self.register_output(rps.TMGR_SCHEDULING_PENDING,
rpc.TMGR_SCHEDULING_QUEUE)
# the tmgr will also collect tasks from the agent again, for output
# staging and finalization
if self._cfg.bridges.tmgr_staging_output_queue:
self._has_sout = True
self.register_output(rps.TMGR_STAGING_OUTPUT_PENDING,
rpc.TMGR_STAGING_OUTPUT_QUEUE)
else:
self._has_sout = False
# also listen to the state pubsub for task state changes
self.register_subscriber(rpc.STATE_PUBSUB, self._state_sub_cb)
# hook into the control pubsub for rpc handling
self._rpc_queue = queue.Queue()
ctrl_addr_pub = self._session._reg['bridges.control_pubsub.addr_pub']
self._ctrl_pub = ru.zmq.Publisher(rpc.CONTROL_PUBSUB, url=ctrl_addr_pub,
log=self._log, prof=self._prof)
self._prof.prof('setup_done', uid=self._uid)
self._rep.ok('>>ok\n')
# --------------------------------------------------------------------------
#
def initialize(self):
# the manager must not carry bridge and component handles across forks
ru.atfork(self._atfork_prepare, self._atfork_parent, self._atfork_child)
# --------------------------------------------------------------------------
#
# EnTK forks, make sure we don't carry traces of children across the fork
#
def _atfork_prepare(self): pass
def _atfork_parent(self) : pass
def _atfork_child(self) :
self._bridges = dict()
self._components = dict()
# --------------------------------------------------------------------------
#
def finalize(self):
self._cmgr.close()
# --------------------------------------------------------------------------
#
    def close(self):
"""Shut down the TaskManager and all its components."""
# we do not cancel tasks at this point, in case any component or pilot
# wants to continue to progress task states, which should indeed be
# independent from the tmgr life cycle.
if self._closed:
return
self._terminate.set()
self._rep.info('<<close task manager')
# disable callbacks during shutdown
with self._tcb_lock:
self._callbacks = dict()
for m in rpc.TMGR_METRICS:
self._callbacks[m] = dict()
self._cmgr.close()
        self._log.info('Closed TaskManager %s.', self._uid)
self._closed = True
self._rep.ok('>>ok\n')
# dump json
json = self.as_dict()
# json['_id'] = self.uid
json['type'] = 'tmgr'
json['uid'] = self.uid
json['tasks'] = self._task_info
tgt = '%s/%s.json' % (self._session.path, self.uid)
ru.write_json(json, tgt)
# --------------------------------------------------------------------------
#
    def as_dict(self):
"""Returns a dictionary representation of the TaskManager object."""
ret = {
'uid': self.uid,
'cfg': self.cfg
}
return ret
# --------------------------------------------------------------------------
#
def __str__(self):
"""Returns a string representation of the TaskManager object."""
return str(self.as_dict())
# --------------------------------------------------------------------------
#
def _pilot_state_cb(self, pilots, state=None):
if self._terminate.is_set():
return False
# we register this callback for pilots added to this tmgr. It will
# specifically look out for pilots which complete, and will make sure
# that all tasks are pulled back into tmgr control if that happens
# prematurely.
#
# If we find tasks which have not completed the agent part of the task
# state model, we declare them FAILED. If they can be restarted, we
# resubmit an identical task, which then will get a new task ID. This
# avoids state model confusion (the state model is right now expected to
# be linear), but is not intuitive for the application (FIXME).
#
# FIXME: there is a race with the tmgr scheduler which may, just now,
# and before being notified about the pilot's demise, send new
# tasks to the pilot.
# we only look into pilot states when the tmgr is still active
# FIXME: note that there is a race in that the tmgr can be closed while
# we are in the cb.
# FIXME: `self._closed` is not an `mt.Event`!
if self._closed:
self._log.debug('tmgr closed, ignore pilot cb %s',
['%s:%s' % (p.uid, p.state) for p in pilots])
return True
if not isinstance(pilots, list):
pilots = [pilots]
for pilot in pilots:
pid = pilot.uid
state = pilot.state
if state in rps.FINAL:
self._log.debug('pilot %s is final', pilot.uid)
# FIXME: MongoDB
# TODO: fail all non-final tasks which were assigned to that
# pilot
continue
## for task in tasks:
##
## task['exception'] = 'RuntimeError("pilot died")'
## task['exception_detail'] = 'pilot %s is final' % pid
## task['state'] = rps.FAILED
##
## # final tasks are not pushed
## self.advance(tasks, publish=True, push=False)
# keep cb registered
return True
# --------------------------------------------------------------------------
#
def _state_sub_cb(self, topic, msg):
if self._terminate.is_set():
return False
cmd = msg.get('cmd')
arg = msg.get('arg')
if cmd != 'update':
self._log.debug('ignore state cb msg with cmd %s', cmd)
return True
things = ru.as_list(arg)
tasks = [thing for thing in things if thing.get('type') == 'task']
self._update_tasks(tasks)
return True
# --------------------------------------------------------------------------
#
def _update_tasks(self, task_dicts):
# return information about needed callback and advance activities, so
# that we don't break bulks here.
# note however that individual task callbacks are still being called on
# each task (if any are registered), which can lead to arbitrary,
# application defined delays.
to_notify = list()
with self._tasks_lock:
for task_dict in task_dicts:
uid = task_dict['uid']
# we don't care about tasks we don't know
task = self._tasks.get(uid)
if not task:
self._log.debug('tmgr: task unknown: %s', uid)
continue
# only update on state changes
current = task.state
target = task_dict['state']
if current == target:
continue
target, passed = rps._task_state_progress(uid, current, target)
if target in [rps.CANCELED, rps.FAILED]:
# don't replay intermediate states
passed = passed[-1:]
for s in passed:
task_dict['state'] = s
self._tasks[uid]._update(task_dict)
to_notify.append([task, s])
self._task_info[uid] = task_dict
if to_notify:
if _USE_BULK_CB:
self._bulk_cbs(set([task for task,_ in to_notify]))
else:
for task, state in to_notify:
self._task_cb(task, state)
# --------------------------------------------------------------------------
#
def _task_cb(self, task, state):
with self._tcb_lock:
uid = task.uid
cb_dicts = list()
metric = rpc.TASK_STATE
# get wildcard callbacks
cb_dicts += self._callbacks[metric].get('*', {}).values()
cb_dicts += self._callbacks[metric].get(uid, {}).values()
for cb_dict in cb_dicts:
cb = cb_dict['cb']
cb_data = cb_dict['cb_data']
try:
if cb_data: cb(task, state, cb_data)
else : cb(task, state)
except:
self._log.exception('cb error (%s)', cb.__name__)
# --------------------------------------------------------------------------
#
def _bulk_cbs(self, tasks, metrics=None):
if not metrics: metrics = [rpc.TASK_STATE]
else : metrics = ru.as_list(metrics)
cbs = dict() # bulked callbacks to call
with self._tcb_lock:
for metric in metrics:
# get wildcard callbacks
                cb_dicts = self._callbacks[metric].get('*', {})
for cb_name in cb_dicts:
cbs[cb_name] = {'cb' : cb_dicts[cb_name]['cb'],
'cb_data': cb_dicts[cb_name]['cb_data'],
'tasks' : set(tasks)}
# add task specific callbacks if needed
for task in tasks:
uid = task.uid
if uid not in self._callbacks[metric]:
continue
cb_dicts = self._callbacks[metric].get(uid, {})
for cb_name in cb_dicts:
if cb_name in cbs:
cbs[cb_name]['tasks'].add(task)
else:
cbs[cb_name] = {'cb' : cb_dicts[cb_name]['cb'],
'cb_data': cb_dicts[cb_name]['cb_data'],
'tasks' : set([task])}
for cb_name in cbs:
cb = cbs[cb_name]['cb']
cb_data = cbs[cb_name]['cb_data']
objs = cbs[cb_name]['tasks']
if cb_data: cb(list(objs), cb_data)
else : cb(list(objs))
# --------------------------------------------------------------------------
#
# FIXME: this needs to go to the scheduler
def _default_wait_queue_size_cb(self, tmgr, wait_queue_size):
# FIXME: this needs to come from the scheduler?
if self._terminate.is_set():
return False
self._log.info("[Callback]: wait_queue_size: %s.", wait_queue_size)
# --------------------------------------------------------------------------
#
@property
def uid(self):
"""str: The unique id."""
return self._uid
# --------------------------------------------------------------------------
#
@property
def scheduler(self):
"""str: The scheduler name."""
return self._cfg.get('scheduler')
# --------------------------------------------------------------------------
#
    def add_pilots(self, pilots):
"""Associates one or more pilots with the task manager.
Arguments:
pilots (radical.pilot.Pilot | list[radical.pilot.Pilot]):
The pilot objects that will be added to the task manager.
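
        Example (a minimal sketch, assuming `pmgr` is a `PilotManager` in the
        same session and `pd` is a valid `PilotDescription`)::

            pilot = pmgr.submit_pilots(pd)
            tmgr.add_pilots(pilot)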
"""
if not isinstance(pilots, list):
pilots = [pilots]
        if len(pilots) == 0:
            raise ValueError('cannot add an empty list of pilots')
pilot_docs = list()
with self._pilots_lock:
# sanity check, and keep pilots around for inspection
for pilot in pilots:
if isinstance(pilot, dict):
pilot_dict = pilot
else:
# let pilot know we own it and subscribe for state updates
# FIXME: this is not working for pilot dicts (ENTK)
pilot.attach_tmgr(self)
pilot_dict = pilot.as_dict()
pilot.register_callback(self._pilot_state_cb)
pid = pilot_dict['uid']
if pid in self._pilots:
raise ValueError('pilot %s already added' % pid)
self._pilots[pid] = pilot
pilot_docs.append(pilot_dict)
# publish to the command channel for the scheduler to pick up
self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'add_pilots',
'arg' : {'pilots': pilot_docs,
'tmgr' : self.uid}})
# --------------------------------------------------------------------------
#
    def list_pilots(self):
"""Lists the UIDs of the pilots currently associated with the task manager.
Returns:
list[str]: A list of :class:`radical.pilot.Pilot` UIDs.
"""
with self._pilots_lock:
return list(self._pilots.keys())
# --------------------------------------------------------------------------
#
    def get_pilots(self):
"""Get the pilot instances currently associated with the task manager.
Returns:
list[radical.pilot.Pilot]: A list of :class:`radical.pilot.Pilot` instances.
"""
with self._pilots_lock:
return list(self._pilots.values())
# --------------------------------------------------------------------------
#
    def remove_pilots(self, pilot_ids, drain=False):
"""Disassociates one or more pilots from the task manager.
After a pilot has been removed from a task manager, it won't process
any of the task manager's tasks anymore. Calling `remove_pilots`
doesn't stop the pilot itself.
        Arguments:
            pilot_ids (str | list[str]): The IDs of the pilot(s) to remove.
            drain (bool): Determines what happens to the tasks which are
                managed by the removed pilot(s). If `True`, all tasks
                currently assigned to the pilot are allowed to finish
                execution. If `False` (the default), then non-final tasks
                will be canceled.
"""
# TODO: Implement 'drain'.
        # NOTE: the actual removal of pilots from the scheduler is asynchronous!
if drain:
raise RuntimeError("'drain' is not yet implemented")
if not isinstance(pilot_ids, list):
pilot_ids = [pilot_ids]
        if len(pilot_ids) == 0:
            raise ValueError('cannot remove an empty list of pilots')
with self._pilots_lock:
# sanity check, and keep pilots around for inspection
for pid in pilot_ids:
if pid not in self._pilots:
                    raise ValueError('pilot %s not known' % pid)
del self._pilots[pid]
# publish to the command channel for the scheduler to pick up
self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'remove_pilots',
'arg' : {'pids' : pilot_ids,
'tmgr' : self.uid}})
# --------------------------------------------------------------------------
#
# FIXME RPC
    def control_cb(self, topic, msg):
cmd = msg['cmd']
arg = msg['arg']
if cmd == 'rpc_res':
self._log.debug('rpc res: %s', arg)
self._rpc_queue.put(arg)
# --------------------------------------------------------------------------
#
    def pilot_rpc(self, pid, cmd, *args, rpc_addr=None, **kwargs):
        '''Remote procedure call.

        Send an RPC command and arguments to the pilot and wait for the
        response. This is a synchronous operation at this point, and it is
        not thread-safe to issue multiple concurrent RPC calls.
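
        Example (illustrative; `my_cmd` is a hypothetical command name which
        would need to be registered with the pilot's RPC handler)::

            # blocks until the pilot returns the command's result
            result = tmgr.pilot_rpc(pilot.uid, 'my_cmd', 'some_arg')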
'''
if pid not in self._pilots:
raise ValueError('tmgr does not know pilot %s' % pid)
return self._pilots[pid].rpc(cmd, *args, rpc_addr=rpc_addr, **kwargs)
# --------------------------------------------------------------------------
#
    def list_units(self):
"""Get Task UIDs.
.. deprecated:: 1.5.12
use :func:`list_tasks()`
"""
_warn(self.list_units, self.list_tasks)
return self.list_tasks()
# --------------------------------------------------------------------------
#
    def list_tasks(self):
"""Get the UIDs of the tasks managed by this task manager.
Returns:
list[str]: A list of :class:`radical.pilot.Task` UIDs.
"""
        with self._tasks_lock:
return list(self._tasks.keys())
# --------------------------------------------------------------------------
#
    def submit_units(self, descriptions):
"""Submit tasks for execution.
.. deprecated:: 1.5.12
use :py:func:`submit_tasks()`
"""
_warn(self.submit_units, self.submit_tasks)
return self.submit_tasks(descriptions=descriptions)
# --------------------------------------------------------------------------
#
    def submit_raptors(self, descriptions, pilot_id=None):
        """Submit RAPTOR master tasks.

        Submits one or more :class:`radical.pilot.TaskDescription` instances
        to the task manager, where the `TaskDescriptions` have the mode
        `RAPTOR_MASTER` set.

        This is a thin wrapper around `submit_tasks`.

        Arguments:
            descriptions: (radical.pilot.TaskDescription) description of the
                masters to submit.
Returns:
list[radical.pilot.Task]: A list of :class:`radical.pilot.Task`
objects.
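
        Example (a sketch; `my_master.py` and `MyMaster` are placeholders for
        an application-provided master module and class)::

            td = rp.TaskDescription({'mode'        : rp.RAPTOR_MASTER,
                                     'raptor_file' : 'my_master.py',
                                     'raptor_class': 'MyMaster'})
            master = tmgr.submit_raptors(td)[0]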
"""
descriptions = ru.as_list(descriptions)
for td in descriptions:
            if td.mode != RAPTOR_MASTER:
raise ValueError('unexpected task mode [%s]' % td.mode)
raptor_file = td.get('raptor_file') or ''
raptor_class = td.get('raptor_class') or 'Master'
td.pilot = pilot_id
td.arguments = [raptor_file, raptor_class]
td.environment['PYTHONUNBUFFERED'] = '1'
if not td.get('uid'):
td.uid = ru.generate_id('raptor.%(item_counter)04d',
ru.ID_CUSTOM, ns=self._session.uid)
if not td.get('executable'):
td.executable = 'radical-pilot-raptor-master'
if not td.get('named_env'):
td.named_env = 'rp'
# ensure that defaults and backward compatibility kick in
td.verify()
return self.submit_tasks(descriptions)
# --------------------------------------------------------------------------
#
    def submit_workers(self, descriptions):
        """Submit RAPTOR workers.

        Submits one or more :class:`radical.pilot.TaskDescription` instances
        to the task manager, where the `TaskDescriptions` have the mode
        `RAPTOR_WORKER` set.

        This method is a thin wrapper around `submit_tasks`.

        Arguments:
            descriptions: (radical.pilot.TaskDescription) description of the
                workers to submit.
Returns:
list[radical.pilot.Task]: A list of :class:`radical.pilot.Task`
objects.
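
        Example (a sketch, assuming `master` is a task obtained via
        `submit_raptors`)::

            td = rp.TaskDescription({'mode'     : rp.RAPTOR_WORKER,
                                     'raptor_id': master.uid})
            workers = tmgr.submit_workers(td)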
"""
descriptions = ru.as_list(descriptions)
for td in descriptions:
            if td.mode != RAPTOR_WORKER:
raise ValueError('unexpected task mode [%s]' % td.mode)
raptor_id = td.get('raptor_id')
raptor_file = td.get('raptor_file') or ''
raptor_class = td.get('raptor_class') or 'DefaultWorker'
if not raptor_id:
raise ValueError('RAPTOR_WORKER descriptions need `raptor_id`')
if not td.get('uid'):
td.uid = ru.generate_id(raptor_id + '.%(item_counter)04d',
ru.ID_CUSTOM, ns=self._session.uid)
if not td.get('executable'):
td.executable = 'radical-pilot-raptor-worker'
if not td.get('named_env'):
td.named_env = 'rp'
td.raptor_id = raptor_id
td.arguments = [raptor_file, raptor_class, raptor_id]
td.environment['PYTHONUNBUFFERED'] = '1'
# ensure that defaults and backward compatibility kick in
td.verify()
return self.submit_tasks(descriptions)
# --------------------------------------------------------------------------
#
    def submit_tasks(self, descriptions):
"""Submit tasks for execution.
Submits one or more :class:`radical.pilot.Task` instances to the task
manager.
Arguments:
descriptions (radical.pilot.TaskDescription | list
[radical.pilot.TaskDescription]):
The description of the task instance(s) to create.
Returns:
list[radical.pilot.Task]: A list of :class:`radical.pilot.Task`
objects.
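
        Example (illustrative; `/bin/sleep` stands in for any executable)::

            tds = list()
            for _ in range(128):
                td            = rp.TaskDescription()
                td.executable = '/bin/sleep'
                td.arguments  = ['60']
                tds.append(td)

            tasks = tmgr.submit_tasks(tds)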
"""
from .task import Task
if not descriptions:
return []
ret_list = True
        if not isinstance(descriptions, list):
ret_list = False
descriptions = [descriptions]
# we return a list of tasks
tasks = list()
ret = list()
self._rep.progress_tgt(len(descriptions), label='submit')
for td in descriptions:
mode = td.mode
if mode == RAPTOR_MASTER:
task = RaptorMaster(tmgr=self, descr=td, origin='client')
elif mode == RAPTOR_WORKER:
task = RaptorWorker(tmgr=self, descr=td, origin='client')
else:
task = Task(tmgr=self, descr=td, origin='client')
tasks.append(task)
self._rep.progress()
if len(tasks) >= 1024:
# submit this bulk
                task_docs = [t.as_dict() for t in tasks]
self.advance(task_docs, rps.TMGR_SCHEDULING_PENDING,
publish=True, push=True)
ret += tasks
tasks = list()
# submit remaining bulk (if any)
if tasks:
task_docs = [t.as_dict() for t in tasks]
self.advance(task_docs, rps.TMGR_SCHEDULING_PENDING,
publish=True, push=True)
ret += tasks
# keep tasks around
with self._tasks_lock:
for task in ret:
self._tasks[task.uid] = task
self._rep.progress_done()
if ret_list: return ret
else : return ret[0]
# --------------------------------------------------------------------------
#
def _reconnect_tasks(self):
"""Re-associate tasks on reconnect.
When reconnecting, we need to dig information about all tasks from the
DB for which this tmgr is responsible.
"""
from .task import Task
from .task_description import TaskDescription
# FIXME MongoDB
# task_docs = self._session._dbs.get_tasks(tmgr_uid=self.uid)
#
# with self._tasks_lock:
#
# for doc in task_docs:
#
# td = TaskDescription(doc['description'])
# td.uid = doc['uid']
#
# task = Task(tmgr=self, descr=td, origin='client')
# task._update(doc, reconnect=True)
#
# self._tasks[task.uid] = task
# --------------------------------------------------------------------------
#
    def get_units(self, uids=None):
"""Get one or more tasks identified by their IDs.
.. deprecated:: 1.5.12
use :py:func:`get_tasks()`
"""
_warn(self.get_units, self.get_tasks)
return self.get_tasks(uids=uids)
# --------------------------------------------------------------------------
#
    def get_tasks(self, uids=None):
"""Get one or more tasks identified by their IDs.
Arguments:
uids (str | list[str]): The IDs of the task objects to return.
Returns:
list[radical.pilot.Task]:
A list of :class:`radical.pilot.Task` objects.
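
        Example (with an illustrative task uid)::

            task = tmgr.get_tasks(uids='task.000000')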
"""
if not uids:
with self._tasks_lock:
ret = list(self._tasks.values())
return ret
ret_list = True
        if not isinstance(uids, list):
ret_list = False
uids = [uids]
ret = list()
with self._tasks_lock:
for uid in uids:
if uid not in self._tasks:
raise ValueError('task %s not known' % uid)
ret.append(self._tasks[uid])
if ret_list: return ret
else : return ret[0]
# --------------------------------------------------------------------------
#
    def wait_units(self, uids=None, state=None, timeout=None):
"""Block for state transition.
.. deprecated:: 1.5.12
use :py:func:`wait_tasks()`
"""
_warn(self.wait_units, self.wait_tasks)
return self.wait_tasks(uids=uids, state=state, timeout=timeout)
# --------------------------------------------------------------------------
#
    def wait_tasks(self, uids=None, state=None, timeout=None):
"""Block for state transition.
        Returns when one or more :class:`radical.pilot.Task` instances reach
        a specific state.

        If `uids` is `None`, `wait_tasks` returns when **all** Tasks reach
        the state defined in `state`. This may include tasks which have
        previously terminated or have been waited upon.
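
        Example (a minimal sketch, assuming `tds` is a list of
        `TaskDescription` instances)::

            tasks  = tmgr.submit_tasks(tds)

            # block until all submitted tasks reach a final state
            # (DONE, FAILED or CANCELED)
            states = tmgr.wait_tasks([t.uid for t in tasks])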
Arguments:
uids (str | list[str]): If uids is set, only the Tasks with the
specified uids are considered. If uids is `None` (default), all
Tasks are considered.
state (str): The state that Tasks have to reach in order for the call
to return.
By default `wait_tasks` waits for the Tasks to
reach a terminal state, which can be one of the following.
* :data:`radical.pilot.rps.DONE`
* :data:`radical.pilot.rps.FAILED`
* :data:`radical.pilot.rps.CANCELED`
            timeout (float):
                Timeout in seconds before the call returns regardless of Task
                state changes. The default value **None** waits forever.
"""
if not uids:
with self._tasks_lock:
uids = list()
for uid,task in self._tasks.items():
if task.state not in rps.FINAL:
uids.append(uid)
if not state : states = rps.FINAL
elif not isinstance(state, list): states = [state]
else : states = state
# we simplify state check by waiting for the *earliest* of the given
# states - if the task happens to be in any later state, we are sure the
# earliest has passed as well.
check_state_val = rps._task_state_values[rps.FINAL[-1]]
for state in states:
check_state_val = min(check_state_val,
rps._task_state_values[state])
ret_list = True
if not isinstance(uids, list):
ret_list = False
uids = [uids]
start = time.time()
to_check = None
with self._tasks_lock:
to_check = [self._tasks[uid] for uid in uids]
# We don't want to iterate over all tasks again and again, as that would
# duplicate checks on tasks which were found in matching states. So we
# create a list from which we drop the tasks as we find them in
# a matching state
self._rep.progress_tgt(len(to_check), label='wait')
while to_check and not self._terminate.is_set():
# check timeout
if timeout and (timeout <= (time.time() - start)):
self._log.debug ("wait timed out")
break
time.sleep (0.1)
# FIXME: print percentage...
# print 'wait tasks: %s' % [[u.uid, u.state] for u in to_check]
check_again = list()
for task in to_check:
# we actually don't check if a task is in a specific (set of)
# state(s), but rather check if it ever *has been* in any of
# those states
if task.state not in rps.FINAL and \
rps._task_state_values[task.state] < check_state_val:
# this task does not match the wait criteria
check_again.append(task)
else:
# stop watching this task
if task.state in [rps.FAILED]:
self._rep.progress() # (color='error', c='-')
elif task.state in [rps.CANCELED]:
self._rep.progress() # (color='warn', c='*')
else:
self._rep.progress() # (color='ok', c='+')
to_check = check_again
self._rep.progress_done()
# grab the current states to return
state = None
with self._tasks_lock:
states = [self._tasks[uid].state for uid in uids]
sdict = {state: states.count(state) for state in set(states)}
for state in sorted(set(states)):
self._rep.info('\t%-10s: %5d\n' % (state, sdict[state]))
if to_check: self._rep.warn('>>timeout\n')
else : self._rep.ok ('>>ok\n')
# done waiting
if ret_list: return states
else : return states[0]
# --------------------------------------------------------------------------
#
    def cancel_units(self, uids=None):
"""Cancel one or more :class:`radical.pilot.Task` s.
.. deprecated:: 1.5.12
use :py:func:`cancel_tasks()`
"""
_warn(self.cancel_units, self.cancel_tasks)
return self.cancel_tasks(uids=uids)
# --------------------------------------------------------------------------
#
    def cancel_tasks(self, uids=None):
"""Cancel one or more :class:`radical.pilot.Task` s.
Note that cancellation of tasks is *immediate*, i.e. their state is
immediately set to `CANCELED`, even if some RP component may still
operate on the tasks. Specifically, other state transitions, including
other final states (`DONE`, `FAILED`) can occur *after* cancellation.
        This is a side effect of an optimization: we consider this an
        acceptable tradeoff in the sense of "Oh, that task was DONE at the
        point of cancellation -- ok, we can use the results, sure!".
If that behavior is not wanted, set the environment variable::
export RADICAL_PILOT_STRICT_CANCEL=True
Arguments:
uids (str | list[str]): The IDs of the tasks to cancel.
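
        Example (illustrative)::

            tasks = tmgr.submit_tasks(tds)
            tmgr.cancel_tasks([t.uid for t in tasks])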
"""
if not uids:
with self._tasks_lock:
uids = list(self._tasks.keys())
else:
if not isinstance(uids, list):
uids = [uids]
        # NOTE: We advance all tasks to CANCELED and send a cancellation
        #       control command. If that command is picked up *after* some
        #       state progression, we'll see state transitions after cancel.
        #       For non-final states that is not a problem, as it is
        #       equivalent to a state update message race, which our state
        #       collapse mechanism accounts for. For an eventual non-canceled
        #       final state, we do get an invalid state transition. That is
        #       also corrected eventually in the state collapse, but the
        #       point remains that the state model is temporarily violated.
        #       We consider this a side effect of the fast-cancel
        #       optimization.
        #
        #       The env variable 'RADICAL_PILOT_STRICT_CANCEL == True' will
        #       disable this optimization.
#
# FIXME: the effect of the env var is not well tested
if 'RADICAL_PILOT_STRICT_CANCEL' not in os.environ:
with self._tasks_lock:
tasks = [self._tasks[uid] for uid in uids ]
task_docs = [task.as_dict() for task in tasks]
self.advance(task_docs, state=rps.CANCELED, publish=True, push=True)
# we *always* issue the cancellation command to the local components
self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'cancel_tasks',
'arg' : {'uids' : uids,
'tmgr' : self.uid}})
        # we also inform all pilots about the cancellation request
# FIXME: MongoDB
# self._session._dbs.pilot_command(cmd='cancel_tasks', arg={'uids':uids})
# In the default case of calling 'advance' above, we just set the state,
# so we *know* tasks are canceled.
#
        # We do not wait for, nor block the call until, all the tasks are
        # marked as canceled. This means that, when inspecting task states
        # right after a cancellation request, we may observe an old state
        # instead of CANCELED.
        #
        # This is done so that cyclic state changes do not deadlock: if
        # a task is changing state while the user requests its cancellation,
        # the cancellation would otherwise hang because the previous state
        # change operation is still ongoing.
# --------------------------------------------------------------------------
#
# TODO: `metric` -> `metrics`, for consistency with `unregister_callback()`
#
    def register_callback(self, cb, cb_data=None, metric=None, uid=None):
        """Registers a new callback function with the TaskManager.

        Manager-level callbacks get called if the specified metric changes.
        The default metric `TASK_STATE` fires the callback if any of the
        Tasks managed by the TaskManager change their state.

        All callback functions need to have the same signature::

            def cb(obj, value) -> None:
                ...

        where ``obj`` is a handle to the object that triggered the callback
        and ``value`` is the metric. In the example of `TASK_STATE` above,
        the object would be the task in question, and the value would be the
        new state of the task.
If ``cb_data`` is given, then the ``cb`` signature changes to
::
def cb(obj, state, cb_data) -> None:
...
        and ``cb_data`` is passed unchanged.
        If ``uid`` is given, the callback will be invoked only for the
        specified task.
        Available metrics are:

        * `TASK_STATE`: fires when the state of any of the tasks which are
          managed by this task manager instance changes. It communicates
          the task object instance and the task's new state.

        * `WAIT_QUEUE_SIZE`: fires when the number of unscheduled tasks
          (i.e., of tasks which have not been assigned to a pilot for
          execution) changes.
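
        Example (a minimal sketch; the callback merely echoes task state
        changes)::

            def state_cb(task, state):
                print('%s: %s' % (task.uid, state))

            # `metric` defaults to `TASK_STATE`
            tmgr.register_callback(state_cb)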
"""
# FIXME: the signature should be (self, metrics, cb, cb_data)
if not metric:
metric = rpc.TASK_STATE
metrics = ru.as_list(metric)
if not uid:
uid = '*'
elif uid not in self._tasks:
raise ValueError('no such task %s' % uid)
with self._tcb_lock:
for metric in metrics:
if metric not in rpc.TMGR_METRICS:
raise ValueError("invalid tmgr metric '%s'" % metric)
cb_id = id(cb)
if metric not in self._callbacks:
self._callbacks[metric] = dict()
if uid not in self._callbacks[metric]:
self._callbacks[metric][uid] = dict()
self._callbacks[metric][uid][cb_id] = {'cb' : cb,
'cb_data': cb_data}
# --------------------------------------------------------------------------
#
def unregister_callback(self, cb=None, metrics=None, uid=None):
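        """Unregister a callback which was registered via `register_callback`.

        If `cb` is `None`, all callbacks registered for the given metrics and
        uid are unregistered. If `metrics` is not given, `TASK_STATE` is
        assumed.
        """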
if not metrics:
metrics = rpc.TASK_STATE
metrics = ru.as_list(metrics)
if not uid:
uid = '*'
elif uid not in self._tasks:
raise ValueError('no such task %s' % uid)
for metric in metrics:
if metric not in rpc.TMGR_METRICS:
raise ValueError("invalid tmgr metric '%s'" % metric)
with self._tcb_lock:
for metric in metrics:
if metric not in self._callbacks:
raise ValueError("cb metric '%s' invalid" % metric)
if uid not in self._callbacks[metric]:
raise ValueError("cb target '%s' invalid" % uid)
if cb:
to_delete = [id(cb)]
else:
to_delete = list(self._callbacks[metric][uid].keys())
for cb_id in to_delete:
if cb_id not in self._callbacks[metric][uid]:
raise ValueError("cb %s not registered" % cb_id)
del self._callbacks[metric][uid][cb_id]
# ------------------------------------------------------------------------------