import os
import time
from collections import defaultdict
from typing import List
import threading as mt
import radical.utils as ru
from .. import utils as rpu
from .. import states as rps
from .. import constants as rpc
from .. import Session, Task, TaskDescription, TASK_EXECUTABLE
from ..task_description import RAPTOR_WORKER
# ------------------------------------------------------------------------------
#
class Master(rpu.AgentComponent):
    '''
    Raptor Master class

    The `rp.raptor.Master` instantiates and orchestrates a set of workers which
    are used to rapidly and efficiently execute function tasks.  As such the
    raptor master acts as an RP executor: it hooks into the RP agent
    communication channels to receive tasks from the RP agent scheduler in
    order to execute them.  Once completed, tasks are pushed toward the agent
    output staging component and will then continue their life cycle as all
    other tasks.
    '''

    # flags for worker readiness.  These flags have somewhat different meaning
    # than the worker's task state: the worker reaching `AGENT_EXECUTING` is
    # necessary, but the worker also needs to perform some setup steps and
    # needs to hook into the agent's communication channels - only then is the
    # worker considered `ACTIVE` and ready to receive tasks.
    NEW    = 'NEW'
    ACTIVE = 'ACTIVE'
    DONE   = 'DONE'


    # --------------------------------------------------------------------------
    #
    def __init__(self, cfg: ru.Config = None):
        '''
        This raptor master is expected to be hosted in a main thread of a RP
        task instance.  As such the normal `RP_*` environment variables are
        expected to be available (a `KeyError` is raised if one is missing).

        This c'tor will create communication channels which are later used by
        workers to communicate with this master instance.

        Args:
            cfg: session config. fallback: agent config.
                 NOTE: this argument is currently not used by the c'tor - the
                 component config is derived from the `RP_*` environment.
        '''

        self._uid      = os.environ['RP_TASK_ID']
        self._pid      = os.environ['RP_PILOT_ID']
        self._sid      = os.environ['RP_SESSION_ID']
        self._name     = os.environ['RP_TASK_NAME']
        self._sbox     = os.environ['RP_TASK_SANDBOX']
        self._psbox    = os.environ['RP_PILOT_SANDBOX']
        self._ssbox    = os.environ['RP_SESSION_SANDBOX']
        self._rsbox    = os.environ['RP_RESOURCE_SANDBOX']
        self._reg_addr = os.environ['RP_REGISTRY_ADDRESS']

        self._workers    = dict()      # wid: worker
        self._tasks      = dict()      # bookkeeping of submitted requests
        self._exec_tasks = list()      # keep track of executable tasks
        self._term       = mt.Event()  # termination signal
        self._thread     = None        # run loop

        self._session = Session(uid=self._sid, _reg_addr=self._reg_addr,
                                _role=Session._DEFAULT)

        ccfg = ru.Config(from_dict={'uid'     : self._uid,
                                    'sid'     : self._sid,
                                    'owner'   : self._pid,
                                    'reg_addr': self._reg_addr})

        super().__init__(ccfg, self._session)

        # get hb configs (RegistryClient instance is initiated in Session)
        self._hb_freq = self._session.rcfg.raptor.hb_frequency
        self._hb_tout = self._session.rcfg.raptor.hb_timeout

        self._log.debug('hb freq: %s', self._hb_freq)
        self._log.debug('hb tout: %s', self._hb_tout)

        # we never run `self.start()` which is ok - but it means we miss out on
        # some of the component initialization.  Call it manually thus
        self._initialize()

        # register termination handler
        self.register_rpc_handler('raptor_rpc', self._raptor_rpc,
                                  rpc_addr=self.uid)

        # send new worker tasks and agent input staging / agent scheduler
        self.register_output(rps.AGENT_STAGING_INPUT_PENDING,
                             rpc.AGENT_STAGING_INPUT_QUEUE)

        # set up zmq queues between the agent scheduler and this master so that
        # we can receive new requests from RP tasks
        qname     = '%s.input_queue' % self._uid
        input_cfg = ru.Config(cfg={'channel'   : qname,
                                   'type'      : 'queue',
                                   'uid'       : '%s_input' % self._uid,
                                   'path'      : self._sbox,
                                   'stall_hwm' : 0,
                                   'bulk_size' : 1})

        # FIXME: how to pass cfg?
        self._input_queue = ru.zmq.Queue(qname, cfg=input_cfg)
        self._input_queue.start()

        # send completed request tasks to agent output staging / tmgr
        self.register_output(rps.AGENT_STAGING_OUTPUT_PENDING,
                             rpc.AGENT_STAGING_OUTPUT_QUEUE)

        # set up zmq queues between this master and all workers for request
        # distribution and result collection
        req_cfg = ru.Config(cfg={'channel'   : 'raptor_tasks',
                                 'type'      : 'queue',
                                 'uid'       : self._uid + '.req',
                                 'path'      : self._sbox,
                                 'stall_hwm' : 0,
                                 'bulk_size' : 1})

        res_cfg = ru.Config(cfg={'channel'   : 'raptor_results',
                                 'type'      : 'queue',
                                 'uid'       : self._uid + '.res',
                                 'path'      : self._sbox,
                                 'stall_hwm' : 0,
                                 'bulk_size' : 1})

        self._req_queue = ru.zmq.Queue('raptor_tasks',   cfg=req_cfg)
        self._res_queue = ru.zmq.Queue('raptor_results', cfg=res_cfg)

        self._req_queue.start()
        self._res_queue.start()

        self._req_addr_put = str(self._req_queue.addr_put)
        self._req_addr_get = str(self._req_queue.addr_get)

        self._res_addr_put = str(self._res_queue.addr_put)
        self._res_addr_get = str(self._res_queue.addr_get)

        # this master will put requests onto the request queue, and will get
        # responses from the response queue.  Note that the responses will be
        # delivered via an async callback (`self._result_cb`).
        self._req_put = ru.zmq.Putter('raptor_tasks',   self._req_addr_put)
        self._res_get = ru.zmq.Getter('raptor_results', self._res_addr_get,
                                      cb=self._result_cb)

        # also create a ZMQ server endpoint for the workers to
        # send task execution requests back to the master
        self._task_service = ru.zmq.Server()
        self._task_service.register_request('run_task', self._run_task)
        self._task_service.start()
        self._task_service_data = dict()  # task.uid : [mt.Event, task]

        # for the workers it is the opposite: they will get requests from the
        # request queue, and will send responses to the response queue.
        self._info = {'req_addr_get': self._req_addr_get,
                      'res_addr_put': self._res_addr_put,
                      'task_service': self._task_service.addr}

        # make sure the channels are up before allowing to submit requests
        time.sleep(1)

        # begin to receive tasks in that queue.  Keep a reference to the
        # getter so that its lifetime is explicitly tied to this master.
        self._input_getter = ru.zmq.Getter(qname, self._input_queue.addr_get,
                                           cb=self._request_cb)

        # everything is set up - we can serve messages on the pubsubs also
        self.register_subscriber(rpc.STATE_PUBSUB, self._state_cb)

        # and register that input queue with the scheduler
        self._log.debug('registered raptor queue: %s / %s', self._uid, qname)
        self.publish(rpc.CONTROL_PUBSUB,
                     {'cmd': 'register_raptor_queue',
                      'arg': {'name' : self._uid,
                              'queue': qname,
                              'addr' : str(self._input_queue.addr_put)}})

        # all comm channels are set up - begin to work
        self._log.debug('startup complete')


    # --------------------------------------------------------------------------
    #
    @property
    def workers(self):
        '''
        task dictionaries representing all currently registered workers
        '''
        return self._workers


    # --------------------------------------------------------------------------
    #
    def _raptor_rpc(self, *args, **kwargs):
        '''
        RPC handler registered as `raptor_rpc`: a `raptor_cmd='stop'` keyword
        argument stops this master.  Other commands are only logged.
        '''

        self._log.debug('r rpc (%s, %s)', args, kwargs)

        raptor_cmd = kwargs.get('raptor_cmd')

        if raptor_cmd == 'stop':
            self.stop()


    # --------------------------------------------------------------------------
    #
    def control_cb(self, topic, msg):
        '''
        listen for `worker_register`, `worker_unregister` and
        `worker_rank_heartbeat` messages on the control channel.

        Args:
            topic (str): pubsub topic the message was received on (unused)
            msg (dict): control message with keys `cmd` and `arg`
        '''

        cmd = msg['cmd']
        arg = msg.get('arg')

        if cmd == 'worker_register':

            uid   = arg['uid']
            rid   = arg['raptor_id']
            ranks = arg['ranks']

            # only handle workers which belong to this master
            if rid != self._uid:
                return

            now = time.time()
            if uid not in self._workers:
                # worker was not submitted by us (e.g. by the client) - start
                # bookkeeping for it now
                self._workers[uid] = {
                        'uid'        : uid,
                        'status'     : self.NEW,
                        'heartbeats' : {r: now for r in range(ranks)}
                }

            self._workers[uid]['status'] = self.ACTIVE

            # return a message to the worker to inform about the master service
            # endpoints
            info = {'req_addr_get': self._req_addr_get,
                    'res_addr_put': self._res_addr_put,
                    'ts_addr'     : self._task_service.addr}

            self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'worker_registered',
                                              'arg': {'uid' : uid,
                                                      'info': info}})

        elif cmd == 'worker_rank_heartbeat':

            uid  = arg['uid']
            rank = arg['rank']

            self._log.debug('recv rank heartbeat %s:%s', uid, rank)

            if uid not in self._workers:
                return

            self._workers[uid]['heartbeats'][rank] = time.time()

        elif cmd == 'worker_unregister':

            uid = arg['uid']
            self._log.debug('unregister %s', uid)

            if uid not in self._workers:
                return

            self._workers[uid]['status'] = self.DONE


    # --------------------------------------------------------------------------
    #
    def _state_cb(self, topic, msg):
        '''
        listen for state updates for tasks executed by raptor workers, but also
        check for state updates originating directly from our workers.

        Returns:
            bool: always `True` (keep the subscription alive)
        '''

        cmd = msg['cmd']
        arg = msg['arg']

        # state update for tasks created by raptor
        if cmd == 'raptor_state_update':

            # raptor workers can get caught in the `raptor_state_update` hook
            # when they are submitted by the master.  Filter them out - we
            # don't want result callbacks for workers.
            tasks = [task for task in arg
                     if task['description'].get('mode', TASK_EXECUTABLE)
                        != RAPTOR_WORKER]

            for task in tasks:
                self._log.debug('raptor state update: %s : %s',
                                task['uid'], task['state'])

            self._result_cb(tasks)

        # general task state updates -- check if our workers are affected
        elif cmd == 'update':

            for thing in ru.as_list(arg):

                uid   = thing['uid']
                state = thing['state']

                if uid not in self._workers:
                    continue

                if state == rps.AGENT_STAGING_OUTPUT:
                    self._workers[uid]['status'] = self.DONE
                    self._log.info('worker %s final: %s', uid, state)

                # pass the dict of the affected worker, as documented in
                # `worker_state_cb` (not the dict of *all* workers)
                self.worker_state_cb(self._workers[uid], state)

        return True


    # --------------------------------------------------------------------------
    #
    def worker_state_cb(self, worker_dict, state):
        '''
        This callback can be overloaded - it will be invoked whenever the master
        receives a state update for a worker it is connected to.

        Args:
            worker_dict (Dict[str, Any]): a task dictionary representing the
                worker whose state was updated
            state (str): new state of the worker
        '''

        pass


    # --------------------------------------------------------------------------
    #
    def submit_workers(self, descriptions: List[TaskDescription]
                      ) -> List[str]:
        '''
        Submit one raptor worker per given `descriptions` element and pass the
        queue raptor info as configuration file.  Do *not* wait for the workers
        to come up - they are expected to register via the control channel.

        The task `descriptions` specifically support the following keys:

          - raptor_class: str, type name of worker class to execute
          - raptor_file : str, optional
                          Module file from which *raptor_class* may be
                          imported, if a custom RP worker class is used

        Note that only one worker rank (presumably rank 0) should register with
        the master - the workers are expected to synchronize their ranks as
        needed.

        Args:
            descriptions (List[TaskDescription]): a list of worker descriptions

        Returns:
            List[str]: list of uids for submitted worker tasks

        Raises:
            ValueError: if a description's mode is not `RAPTOR_WORKER`
            RuntimeError: if a fractional `gpus_per_rank` is requested
        '''

        tasks = list()
        for td in descriptions:

            if td.mode != RAPTOR_WORKER:
                raise ValueError('unexpected task mode [%s]' % td.mode)

            # sharing GPUs among multiple ranks not yet supported.  Go through
            # `float` so that integer values also work on Python < 3.12 (where
            # `int.is_integer` does not exist).
            if td.gpus_per_rank and not float(td.gpus_per_rank).is_integer():
                raise RuntimeError('GPU sharing for workers is not supported')

            raptor_file  = td.get('raptor_file')  or ''
            raptor_class = td.get('raptor_class') or 'DefaultWorker'

            td.raptor_id = self.uid

            if not td.get('uid'):
                td.uid = '%s.%s' % (self.uid, ru.generate_id('worker',
                                                             ns=self.uid))

            if not td.get('executable'):
                td.executable = 'radical-pilot-raptor-worker'

            # all workers run in the same sandbox as the master by default
            if not td.get('sandbox'):
                td.sandbox = self._sbox

            # this master is obviously running in a suitable python3 env,
            # so we expect that the same env is also suitable for the worker
            # NOTE: shell escaping is a bit tricky here - careful on change!
            td.arguments = [raptor_file, raptor_class, self.uid]

            # ensure that defaults and backward compatibility kick in
            td.verify()

            # the default worker needs its own task description to derive the
            # amount of available resources
            self._reg['raptor.%s.cfg' % td.uid] = td.as_dict()

            task = dict()
            task['origin']            = 'raptor'
            task['description']       = td.as_dict()
            task['state']             = rps.AGENT_STAGING_INPUT_PENDING
            task['status']            = self.NEW
            task['type']              = 'task'
            task['uid']               = td.uid
            task['task_sandbox_path'] = td.sandbox
            task['task_sandbox']      = 'file://localhost/' + td.sandbox
            task['pilot_sandbox']     = self._psbox
            task['session_sandbox']   = self._ssbox
            task['resource_sandbox']  = self._rsbox
            task['pilot']             = self._pid
            task['resources']         = {'cpu': td.ranks * td.cores_per_rank,
                                         'gpu': td.ranks * td.gpus_per_rank}
            tasks.append(task)

            # NOTE: the order of insert / state update relies on that order
            #       being maintained through the component's message push,
            #       the update worker's message receive up to the insertion
            #       order into the update worker's DB bulk op.
            self._log.debug('insert %s', td.uid)
            self.publish(rpc.STATE_PUBSUB, {'cmd': 'insert', 'arg': task})

            now = time.time()
            self._workers[td.uid] = {
                    'uid'        : td.uid,
                    'status'     : self.NEW,
                    'heartbeats' : {r: now for r in range(td.ranks)}
            }

        self.advance(tasks, publish=True, push=True)

        # dump registry with all worker descriptions ("raptor.<worker_uid>.cfg")
        self._reg.dump(self._uid)

        return [task['uid'] for task in tasks]


    # --------------------------------------------------------------------------
    #
    def wait_workers(self, count=None, uids=None):
        '''
        Wait for `n` workers, *or* for workers with given UID, *or* for all
        workers to become available, then return.  A worker is considered
        `available` when it registered with this master and gets its status
        flag set to `ACTIVE`.

        When neither `count` nor `uids` is given, wait for all workers known at
        the time of the call; if no worker is known, return immediately.

        Args:
            count (int): number of workers to wait for
            uids (List[str]): set of worker UIDs to wait for - *all* of them
                must be known to this master and `ACTIVE`

        Raises:
            RuntimeError: if the master terminates while waiting
        '''

        if not uids and not count:
            # wait for all known workers by default
            uids = list(self._workers.keys())
            if not uids:
                self._log.debug('wait ok (no workers known)')
                return

        while True:

            # workers can be submitted by the client and register
            # concurrently via the control channel - we thus re-check on
            # every loop, using a snapshot of the worker dict
            workers = dict(self._workers)

            if uids:
                # requested workers which are not yet known are not ACTIVE
                if all(uid in workers and
                       workers[uid]['status'] == self.ACTIVE
                       for uid in uids):
                    self._log.debug('wait ok')
                    return

            elif count:
                n_active = sum(1 for worker in workers.values()
                                 if worker['status'] == self.ACTIVE)
                if count <= n_active:
                    self._log.debug('wait ok')
                    return

            if self._term.is_set():
                raise RuntimeError('wait interrupted by master termination')

            time.sleep(1)


    # --------------------------------------------------------------------------
    #
    def start(self):
        '''
        start the main work thread of this master
        '''

        self._thread = mt.Thread(target=self._run)
        self._thread.daemon = True
        self._thread.start()


    # --------------------------------------------------------------------------
    #
    def stop(self):
        '''
        stop the main work thread of this master
        '''

        self._log.debug('set term from stop: %s', ru.get_stacktrace())
        self._term.set()

        super().stop()

        self.terminate()


    # --------------------------------------------------------------------------
    #
    def alive(self):
        '''
        check if the main work thread of this master is running

        Returns:
            bool: `False` if the master was never started or was terminated
        '''

        if not self._thread or self._term.is_set():
            return False

        return True


    # --------------------------------------------------------------------------
    #
    def join(self):
        '''
        wait until the main work thread of this master completes
        '''

        if self._thread:
            self._thread.join()


    # --------------------------------------------------------------------------
    #
    def _hb_thread(self):
        '''
        heartbeat monitor thread of this master: every `hb_frequency` seconds,
        check the per-rank heartbeats of all workers which are not `DONE`, and
        for any worker with a rank whose heartbeat is older than `hb_timeout`,
        request worker termination and cancellation of its task.
        '''

        while not self._term.is_set():

            self._log.debug('still alive')

            # check worker heartbeats.  Iterate over a snapshot: workers can
            # register concurrently via the control channel, and mutating the
            # dict during iteration would raise a `RuntimeError`.
            now  = time.time()
            lost = set()
            for uid, worker in list(self._workers.items()):

                # finished workers (or workers already declared lost) are not
                # expected to send heartbeats anymore
                if worker['status'] == self.DONE:
                    continue

                for rank, hb in list(worker['heartbeats'].items()):
                    if hb < now - self._hb_tout:
                        self._log.warning('lost rank %s on worker %s',
                                          rank, uid)
                        lost.add(uid)

            time.sleep(self._hb_freq)

            for uid in lost:

                # do not report the same worker again on the next cycle
                self._workers[uid]['status'] = self.DONE

                self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'worker_terminate',
                                                  'arg': {'uid': uid}})
                self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'cancel_tasks',
                                                  'arg': {'uids': [uid]}})


    # --------------------------------------------------------------------------
    #
    def _run(self):
        '''
        main work thread of this master: start the heartbeat monitor thread
        and idle until termination is requested.
        '''

        hb_thread = mt.Thread(target=self._hb_thread)
        hb_thread.daemon = True
        hb_thread.start()

        while not self._term.is_set():
            time.sleep(1)

        self._log.debug('terminate run loop')


    # --------------------------------------------------------------------------
    #
    def _run_task(self, td):
        '''
        accept a single task request for execution, execute it and wait for its
        completion before returning the call.

        Note: this call is running in a separate thread created by a ZMQ
        Server instance and will thus not block the master's progress.

        Args:
            td (dict): task description of the task to run

        Returns:
            dict: the task dictionary, updated with the result reported via
                  `_result_cb` (if any)
        '''

        # we get a dict but want a proper `TaskDescription` instance
        td = TaskDescription(td)

        # Create a new task ID for the submitted task (we do not allow
        # user-specified IDs in this case as we can't check uniqueness with the
        # client tmgr).  Then submit that task and wait for the `result_cb` to
        # report completion of the task
        tid   = '%s.%s' % (self.uid, ru.generate_id('subtask'))
        event = mt.Event()

        td['uid'] = tid

        task = Task(self, td, origin='raptor').as_dict()

        self._task_service_data[tid] = [event, task]
        self.submit_tasks([task])

        # wait for the result cb to pick up the td again
        event.wait()

        # remove the bookkeeping data and update task info from the result
        # which `_result_cb` appended as third element
        data = self._task_service_data.pop(tid, None)

        if not data or len(data) != 3:
            self._log.error('invalid task_service data: %s', data)

        if data and len(data) >= 3:
            task.update(data[2])

        # the task is completed and we can return it to the caller
        return task


    # --------------------------------------------------------------------------
    #
    def submit_tasks(self, tasks) -> None:
        '''
        submit a list of tasks to the task queue

        We expect to get either `TaskDescription` instances which will then get
        converted into task dictionaries and pushed out, or we get task
        dictionaries which are used as is.  Either way, `self.request_cb` will
        be called for all tasks submitted here.

        Args:
            tasks (List[TaskDescription]): description of tasks to be submitted
        '''

        normalized = list()
        for task in ru.as_list(tasks):
            if isinstance(task, TaskDescription):
                # convert to task dict
                normalized.append(Task(self, task, origin='raptor').as_dict())
            else:
                normalized.append(task)

        self._submit_tasks(self.request_cb(normalized))


    # --------------------------------------------------------------------------
    #
    def _submit_tasks(self, tasks) -> None:
        '''
        This is the internal implementation of `self.submit_tasks` which
        performs the actual submission after the tasks passed through the
        `request_cb`.  Executable tasks are pushed back into the agent, all
        other tasks are sent to the raptor workers.

        Raises:
            ValueError: if a task dict has no `description`
        '''

        if not tasks:
            return

        raptor_tasks     = list()
        executable_tasks = list()

        for task in ru.as_list(tasks):

            if 'description' not in task:
                raise ValueError('task without description: %s' % task)

            self._log.debug('submit: %s', task['uid'])

            mode = task['description'].get('mode', TASK_EXECUTABLE)
            if mode == TASK_EXECUTABLE:
                executable_tasks.append(task)
            else:
                raptor_tasks.append(task)

            # tasks submitted in raptor will miss sandbox completion as
            # performed by the tmgr.scheduler, so we add it here
            dummy = {'pilot_sandbox': self._psbox}
            sbox  = self._session._get_task_sandbox(task, dummy)
            task['task_sandbox']      = str(sbox)
            task['task_sandbox_path'] = ru.Url(sbox).path

        self._submit_executable_tasks(executable_tasks)
        self._submit_raptor_tasks(raptor_tasks)


    # --------------------------------------------------------------------------
    #
    def _submit_raptor_tasks(self, tasks) -> None:
        '''
        advance non-executable tasks to `AGENT_SCHEDULING` (publish only) and
        put them onto the worker request queue.
        '''

        if tasks:

            self.advance(tasks, state=rps.AGENT_SCHEDULING,
                         publish=True, push=False)

            self._req_put.put(tasks)


    # --------------------------------------------------------------------------
    #
    def _submit_executable_tasks(self, tasks) -> None:
        '''
        Submit tasks per given task description to the agent this
        master is running in.
        '''

        if not tasks:
            return

        for task in tasks:

            td    = task['description']
            ranks = td.get('ranks', 1)

            task['state']            = rps.AGENT_STAGING_INPUT_PENDING
            task['pilot_sandbox']    = self._psbox
            task['session_sandbox']  = self._ssbox
            task['resource_sandbox'] = self._rsbox
            task['pilot']            = self._pid
            task['resources']        = {'cpu': ranks *
                                               td.get('cores_per_rank', 1),
                                        'gpu': ranks *
                                               td.get('gpus_per_rank', 0.)}

            # NOTE: the order of insert / state update relies on that order
            #       being maintained through the component's message push,
            #       the update worker's message receive up to the insertion
            #       order into the update worker's DB bulk op.
            self._log.debug('insert %s', task['uid'])
            self.publish(rpc.STATE_PUBSUB, {'cmd': 'insert', 'arg': task})

        self.advance(tasks, state=rps.AGENT_STAGING_INPUT_PENDING,
                     publish=True, push=True)


    # --------------------------------------------------------------------------
    #
    def _request_cb(self, tasks) -> None:
        '''
        This cb will be called for all tasks which are under control of this
        raptor master, upon receival by the master.  If submission fails, all
        received tasks are advanced to `FAILED`.
        '''

        tasks = ru.as_list(tasks)

        for task in tasks:

            self._log.debug('request_cb: %s', task['uid'])

            # executable tasks which come from the scheduler are in danger of
            # entering a loop as we will push them back to the scheduler for
            # execution.  To avoid that loop we set an additional attribute
            # (raptor_seen=True) which the scheduler can filter for
            mode = task['description'].get('mode', TASK_EXECUTABLE)
            if mode == TASK_EXECUTABLE:
                task['raptor_seen'] = True

        try:
            self.submit_tasks(tasks)

        except Exception:
            self._log.exception('request cb failed')
            self.advance(tasks, rps.FAILED, publish=True, push=False)


    # --------------------------------------------------------------------------
    #
    def request_cb(self, tasks):
        '''
        A raptor master implementation can overload this cb to filter all newly
        submitted tasks: it receives a list of tasks and returns a potentially
        different list of tasks which are then executed.  It is up to the master
        implementation to ensure proper state transition for any tasks which
        are passed as argument but are not returned by the call and thus are not
        submitted for execution.

        Args:
            tasks ([List[Dict[str, ANY]]): list of tasks which this master
                received for execution

        Returns:
            tasks ([List[Dict[str, ANY]]): possibly different list of tasks than
                received
        '''

        # FIXME: document task format
        return tasks


    # --------------------------------------------------------------------------
    #
    def _result_cb(self, tasks):
        '''
        As pendant to the `_request_cb`, the `_result_cb` is invoked when raptor
        tasks complete execution.  Tasks without a `target_state` get one
        derived from their `exit_code` (0: DONE, otherwise or missing: FAILED).
        Waiting `_run_task` calls are unblocked, the user `result_cb` is
        invoked, and the tasks are advanced to agent output staging.
        '''

        tasks = ru.as_list(tasks)

        for task in tasks:

            uid = task['uid']

            if not task.get('target_state'):
                ret = task.get('exit_code')
                if ret is None:
                    ret = -1
                if int(ret) == 0: task['target_state'] = rps.DONE
                else            : task['target_state'] = rps.FAILED

            if uid in self._task_service_data:
                # update task info and signal task service thread
                self._log.debug('unlock 2 %s', uid)
                self._task_service_data[uid].append(task)
                self._task_service_data[uid][0].set()

        try:
            self.result_cb(tasks)

        except Exception:
            self._log.exception('result callback failed')

        self.advance(tasks, rps.AGENT_STAGING_OUTPUT_PENDING,
                     publish=True, push=True)


    # --------------------------------------------------------------------------
    #
    def result_cb(self, tasks):
        '''
        A raptor master implementation can overload this cb which gets called
        when raptor tasks complete execution.

        Args:
            tasks ([List[Dict[str, ANY]]): list of tasks which this master
                executed
        '''

        # FIXME: document task format
        pass


    # --------------------------------------------------------------------------
    #
    def terminate(self):
        '''
        terminate all workers and the master's own work loop.
        '''

        # unregister input queue
        self.publish(rpc.CONTROL_PUBSUB, {
                         'cmd': 'unregister_raptor_queue',
                         'arg': {'name' : self._uid,
                                 'queue': self._input_queue.channel}})

        self._log.debug('set term from terminate')
        self._term.set()

        # snapshot: workers may still register concurrently
        for uid in list(self._workers):
            self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'worker_terminate',
                                              'arg': {'uid': uid}})

        # wait for workers to terminate
        # FIXME: TS
        # self.wait()

        self._log.debug('all workers terminated')
# ------------------------------------------------------------------------------