# pylint: disable=protected-access,unused-argument
import glob
import json
import os
import sys
import radical.utils as ru
from .. import states as s
from ..task_description import RAPTOR_MASTER, RAPTOR_WORKER, TASK_EXECUTABLE
# Debug switch: any non-empty value of `RP_PROF_DEBUG` enables verbose
# printing of task consumption details.
_debug = os.environ.get('RP_PROF_DEBUG')

# Module level cache mapping node IDs to their position in a node list
# (maintained by `get_node_index`).
_node_index = {}

# Per-user base directory for cached data.
_CACHE_BASEDIR = '/tmp/rp_cache_{}/'.format(os.getuid())
# ------------------------------------------------------------------------------
#
# Pilot and task activities: core hours are derived by multiplying the
# respective time durations with the pilot size / task size.  The 'idle'
# utilization and the 'agent' utilization are derived separately.
#
# Note that the individual durations should add up to the respective `total`
# durations to ensure accounting for the complete task/pilot utilization.
#
# Each duration is a pair `[start, stop]` of timestamp specifications: dicts
# which map profile fields (`ru.EVENT`, `ru.STATE`, `ru.MSG`) to values.  They
# are passed as event filters to the entity's `timestamps()` method (see
# `get_duration`).
#
# An updated list of events is available at docs/source/events.md
PILOT_DURATIONS = {
    # resources provided by the pilot: its complete bootstrap lifetime
    'provide' : {
        'total'     : [{ru.EVENT: 'bootstrap_0_start'},
                       {ru.EVENT: 'bootstrap_0_stop' }]
    },

    # Times between PMGR_ACTIVE and the termination command are not
    # considered pilot specific consumptions.  If some resources remain
    # unused during that time, it is either due to inefficiencies of
    # workload management (accounted for in the task consumption metrics),
    # or the pilot is starving for workload.
    'consume' : {
        'boot'      : [{ru.EVENT: 'bootstrap_0_start'},
                       {ru.EVENT: 'bootstrap_0_ok'   }],
        'setup_1'   : [{ru.EVENT: 'bootstrap_0_ok'   },
                       {ru.STATE: s.PMGR_ACTIVE      }],
        'idle'      : [{ru.STATE: s.PMGR_ACTIVE      },
                       {ru.EVENT: 'cmd',
                        ru.MSG  : 'cancel_pilot'     }],
        'term'      : [{ru.EVENT: 'cmd',
                        ru.MSG  : 'cancel_pilot'     },
                       {ru.EVENT: 'bootstrap_0_stop' }],
        # lifetime of the sub-agents
        'agent'     : [{ru.EVENT: 'sub_agent_start'  },
                       {ru.EVENT: 'sub_agent_stop'   }],
    },

    # FIXME: separate out DVM startup time
    #   'rte'       : [{ru.STATE: s.PMGR_ACTIVE    },
    #                  {ru.STATE: s.PMGR_ACTIVE    }],
    #   'setup_2'   : [{ru.STATE: s.PMGR_ACTIVE    },
    #                  {ru.STATE: s.PMGR_ACTIVE    }],
    #
    # resources on agent nodes are consumed for all of the pilot's lifetime
    'agent' : {
        'total'     : [{ru.EVENT: 'bootstrap_0_start'},
                       {ru.EVENT: 'bootstrap_0_stop' }]
    }
}
# The set of default task durations that are available for every task
# description, default resource configuration, and default scheduler and
# launcher.  The durations are contiguous: each one starts with the event
# which ends the previous one, from `schedule_ok` to `unschedule_stop`.
# These are used by `_get_task_consumption` when no `tdurations` are given.
TASK_DURATIONS_DEFAULT = {
    'consume' : {
        # waiting for the executor after being scheduled
        'exec_queue'  : [{ru.EVENT: 'schedule_ok'     },
                         {ru.STATE: s.AGENT_EXECUTING }],
        'exec_prep'   : [{ru.STATE: s.AGENT_EXECUTING },
                         {ru.EVENT: 'task_run_start'  }],
        'exec_rp'     : [{ru.EVENT: 'task_run_start'  },
                         {ru.EVENT: 'launch_start'    }],
        'exec_sh'     : [{ru.EVENT: 'launch_start'    },
                         {ru.EVENT: 'exec_start'      }],
        # the actual task command
        'exec_cmd'    : [{ru.EVENT: 'exec_start'      },
                         {ru.EVENT: 'exec_stop'       }],
        'term_sh'     : [{ru.EVENT: 'exec_stop'       },
                         {ru.EVENT: 'launch_stop'     }],
        'term_rp'     : [{ru.EVENT: 'launch_stop'     },
                         {ru.EVENT: 'task_run_stop'   }],
        # until the task's resources are released by the scheduler
        'unschedule'  : [{ru.EVENT: 'task_run_stop'   },
                         {ru.EVENT: 'unschedule_stop' }]
    }
}
# The set of default task durations augmented with the durations of the app
# events.  App events are generated by RADICAL Synapse and by the script
# `radical-pilot-hello.sh`.  The latter is useful for testing as a sleep
# command drop-in.  Compared to `TASK_DURATIONS_DEFAULT`, the `exec_cmd`
# span is narrowed to `rank_start` .. `rank_stop`, with `init_app` and
# `term_app` accounting for the time before and after it.
TASK_DURATIONS_APP = {
    'consume' : {
        'exec_queue'  : [{ru.EVENT: 'schedule_ok'     },
                         {ru.STATE: s.AGENT_EXECUTING }],
        'exec_prep'   : [{ru.STATE: s.AGENT_EXECUTING },
                         {ru.EVENT: 'task_run_start'  }],
        'exec_rp'     : [{ru.EVENT: 'task_run_start'  },
                         {ru.EVENT: 'launch_start'    }],
        'exec_sh'     : [{ru.EVENT: 'launch_start'    },
                         {ru.EVENT: 'exec_start'      }],
        'init_app'    : [{ru.EVENT: 'exec_start'      },
                         {ru.EVENT: 'rank_start'      }],
        'exec_cmd'    : [{ru.EVENT: 'rank_start'      },
                         {ru.EVENT: 'rank_stop'       }],
        'term_app'    : [{ru.EVENT: 'rank_stop'       },
                         {ru.EVENT: 'exec_stop'       }],
        'term_sh'     : [{ru.EVENT: 'exec_stop'       },
                         {ru.EVENT: 'launch_stop'     }],
        'term_rp'     : [{ru.EVENT: 'launch_stop'     },
                         {ru.EVENT: 'task_run_stop'   }],
        'unschedule'  : [{ru.EVENT: 'task_run_stop'   },
                         {ru.EVENT: 'unschedule_stop' }]
    }
}

# FIXME: TASK_DURATIONS_PRTE
#        The set of default task durations with the durations generated when
#        using PRRTE as a launch method.
# ----------------------------------------------------------------------------
#
def _convert_sdurations(sdurations):
    '''
    Convert a collection of short form durations into the equivalent
    collection of long form durations.

    A short form duration is a list of timestamp specifications such as
    `{'STATE': s.STATE_NAME}`, `{'EVENT': 'event_name'}` or
    `{'MSG': 'message_name'}`.  An element of that list may itself be a list
    of specifications (used, e.g., to list the possible final states).  Each
    specification is expanded via `_expand_sduration`.

    Args:
        sdurations (dict): duration name -> short form duration

    Returns:
        dict: duration name -> long form duration

    Example:
        {'name': [{'STATE': s.STATE_NAME},
                  [{'EVENT': 'event_name'},
                   {'STATE': s.STATE_NAME}]]}
      becomes
        {'name': [{ru.EVENT: 'state',      ru.STATE: s.STATE_NAME},
                  [{ru.EVENT: 'event_name', ru.STATE: None},
                   {ru.EVENT: 'state',      ru.STATE: s.STATE_NAME}]]}

      and
        {'name': [{'STATE': s.STATE_NAME}, {'MSG': 'message_name'}]}
      becomes
        {'name': [{ru.EVENT: 'state', ru.STATE: s.STATE_NAME},
                  {ru.EVENT: 'cmd',   ru.MSG  : 'message_name'}]}

    Elements which are neither dicts nor lists are silently skipped.
    '''
    ldurations = dict()

    for name, sduration in sdurations.items():
        expanded = list()
        for spec in sduration:
            if isinstance(spec, dict):
                expanded.append(_expand_sduration(spec))
            elif isinstance(spec, list):
                expanded.append([_expand_sduration(alt) for alt in spec])
        ldurations[name] = expanded

    return ldurations
# ----------------------------------------------------------------------------
#
def _expand_sduration(sduration):
    '''
    Expand a timestamp specification from its short form into its long form
    for the timestamp types `STATE`, `EVENT` and `MSG`.

    Definitions:
      - short form: a dict with a single key (`'STATE'`, `'EVENT'` or
        `'MSG'`) holding a state name, event name or message.
      - long form: a dict keyed by profile field constants (`ru.EVENT` plus
        `ru.STATE` or `ru.MSG`).

    Specifications which already have two entries are considered to be in
    long form and are returned unchanged.

    Args:
        sduration (dict): a timestamp specification in short form

    Returns:
        dict: the specification in long form

    Raises:
        Exception: if the specification is empty, has more than two entries,
                   or uses an unknown timestamp type.

    Example:
        {'STATE': s.STATE_NAME}  -> {ru.EVENT: 'state', ru.STATE: s.STATE_NAME}
        {'EVENT': 'event_name'}  -> {ru.EVENT: 'event_name', ru.STATE: None}
        {'MSG'  : 'message_name'} -> {ru.EVENT: 'cmd', ru.MSG: 'message_name'}
    '''
    keys = list(sduration.keys())

    # an empty specification would otherwise silently expand to `None`
    if not keys:
        raise Exception('invalid duration: no timestamp given')

    if len(keys) > 2:
        raise Exception('invalid duration: too many timestamps (%s)' % keys)

    # Allow durations with both ru.EVENT and ru.STATE (already long form).
    if len(keys) == 2:
        return sduration

    ttype = keys[0]
    value = sduration[ttype]

    if ttype == 'STATE':
        return {ru.EVENT: 'state', ru.STATE: value}

    if ttype == 'EVENT':
        return {ru.EVENT: value, ru.STATE: None}

    if ttype == 'MSG':
        return {ru.EVENT: 'cmd', ru.MSG: value}

    raise Exception('unknown timestamp type: %s' % keys)
# Set of default pilot durations for RADICAL-Analytics, in short form (see
# `_convert_sdurations`).  All the durations are contiguous, except for
# `p_agent_runtime`, which spans the complete bootstrap and thus overlaps the
# `p_agent_*` durations.
# NOTE: _init durations are most often 0.
PILOT_DURATIONS_DEBUG_SHORT = {
    'p_pmgr_create'           : [{'STATE': s.NEW                   },
                                 {'STATE': s.PMGR_LAUNCHING_PENDING}],
    'p_pmgr_launching_init'   : [{'STATE': s.PMGR_LAUNCHING_PENDING},
                                 {'STATE': s.PMGR_LAUNCHING        }],
    'p_pmgr_launching'        : [{'STATE': s.PMGR_LAUNCHING        },
                                 {'EVENT': 'staging_in_start'      }],
    'p_pmgr_stage_in'         : [{'EVENT': 'staging_in_start'      },
                                 {'EVENT': 'staging_in_stop'       }],
    'p_pmgr_submission_init'  : [{'EVENT': 'staging_in_stop'       },
                                 {'EVENT': 'submission_start'      }],
    'p_pmgr_submission'       : [{'EVENT': 'submission_start'      },
                                 {'EVENT': 'submission_stop'       }],
    'p_pmgr_scheduling_init'  : [{'EVENT': 'submission_stop'       },
                                 {'STATE': s.PMGR_ACTIVE_PENDING   }],
    # batch system queue time
    'p_pmgr_scheduling'       : [{'STATE': s.PMGR_ACTIVE_PENDING   },
                                 {'EVENT': 'bootstrap_0_start'     }],
    'p_agent_ve_setup_init'   : [{'EVENT': 'bootstrap_0_start'     },
                                 {'EVENT': 've_setup_start'        }],
    'p_agent_ve_setup'        : [{'EVENT': 've_setup_start'        },
                                 {'EVENT': 've_setup_stop'         }],
    'p_agent_ve_activate_init': [{'EVENT': 've_setup_stop'         },
                                 {'EVENT': 've_activate_start'     }],
    'p_agent_ve_activate'     : [{'EVENT': 've_activate_start'     },
                                 {'EVENT': 've_activate_stop'      }],
    'p_agent_install_init'    : [{'EVENT': 've_activate_stop'      },
                                 {'EVENT': 'rp_install_start'      }],
    'p_agent_install'         : [{'EVENT': 'rp_install_start'      },
                                 {'EVENT': 'rp_install_stop'       }],
    'p_agent_launching'       : [{'EVENT': 'rp_install_stop'       },
                                 {'STATE': s.PMGR_ACTIVE           }],
    'p_agent_terminate_init'  : [{'STATE': s.PMGR_ACTIVE           },
                                 {'MSG'  : 'cancel_pilot'          }],
    'p_agent_terminate'       : [{'MSG'  : 'cancel_pilot'          },
                                 {'EVENT': 'bootstrap_0_stop'      }],
    # from bootstrap termination until the pilot reaches one of the listed
    # final states
    'p_agent_finalize'        : [{'EVENT': 'bootstrap_0_stop'      },
                                 [{'STATE': s.DONE                 },
                                  {'STATE': s.CANCELED             },
                                  {'STATE': s.FAILED               }]],
    # total pilot runtime
    'p_agent_runtime'         : [{'EVENT': 'bootstrap_0_start'     },
                                 {'EVENT': 'bootstrap_0_stop'      }]
}

# long form of the above, usable with `get_duration`
PILOT_DURATIONS_DEBUG = _convert_sdurations(PILOT_DURATIONS_DEBUG_SHORT)
# Debug pilot durations tagged with keys that can be used when calculating
# resource utilization ('provide', 'client', 'consume', 'agent').
# TODO: add the 'client' tag to relevant resource utilization methods.
_pdd = PILOT_DURATIONS_DEBUG
PILOT_DURATIONS_DEBUG_RU = {
    'provide' : {
        'p_agent_runtime'         : _pdd['p_agent_runtime']
    },
    # durations spent on the client side (pilot manager), before the pilot
    # bootstraps and after it terminated
    'client'  : {
        'p_pmgr_create'           : _pdd['p_pmgr_create'],
        'p_pmgr_launching_init'   : _pdd['p_pmgr_launching_init'],
        'p_pmgr_launching'        : _pdd['p_pmgr_launching'],
        'p_pmgr_stage_in'         : _pdd['p_pmgr_stage_in'],
        'p_pmgr_submission_init'  : _pdd['p_pmgr_submission_init'],
        'p_pmgr_submission'       : _pdd['p_pmgr_submission'],
        'p_pmgr_scheduling_init'  : _pdd['p_pmgr_scheduling_init'],
        'p_pmgr_scheduling'       : _pdd['p_pmgr_scheduling'],
        'p_agent_finalize'        : _pdd['p_agent_finalize']
    },
    'consume' : {
        'p_agent_ve_setup_init'   : _pdd['p_agent_ve_setup_init'],
        'p_agent_ve_setup'        : _pdd['p_agent_ve_setup'],
        'p_agent_ve_activate_init': _pdd['p_agent_ve_activate_init'],
        'p_agent_ve_activate'     : _pdd['p_agent_ve_activate'],
        'p_agent_install_init'    : _pdd['p_agent_install_init'],
        'p_agent_install'         : _pdd['p_agent_install'],
        'p_agent_launching'       : _pdd['p_agent_launching'],
        'p_agent_terminate_init'  : _pdd['p_agent_terminate_init'],
        'p_agent_terminate'       : _pdd['p_agent_terminate']
    },
    'agent'   : {
        'p_agent_runtime'         : _pdd['p_agent_runtime']
    }
}
# Set of default task durations for RADICAL-Analytics, in short form (see
# `_convert_sdurations`).  All the durations are contiguous, with two
# exceptions: the `t_agent_lm_*` chain restarts at `task_run_start` (see the
# PROBLEM note below), and `t_agent_unschedule` is not part of the chain.
TASK_DURATIONS_DEBUG_SHORT = {
    't_tmgr_create'              : [{'STATE': s.NEW                         },
                                    {'STATE': s.TMGR_SCHEDULING_PENDING     }],
    't_tmgr_schedule_queue'      : [{'STATE': s.TMGR_SCHEDULING_PENDING     },
                                    {'STATE': s.TMGR_SCHEDULING             }],
    't_tmgr_schedule'            : [{'STATE': s.TMGR_SCHEDULING             },
                                    {'STATE': s.TMGR_STAGING_INPUT_PENDING  }],
    # push to mongodb
    't_tmgr_stage_in_queue'      : [{'STATE': s.TMGR_STAGING_INPUT_PENDING  },
                                    {'STATE': s.TMGR_STAGING_INPUT          }],
    # wait in mongodb
    't_tmgr_stage_in'            : [{'STATE': s.TMGR_STAGING_INPUT          },
                                    {'STATE': s.AGENT_STAGING_INPUT_PENDING }],
    # pull from mongodb
    't_agent_stage_in_queue'     : [{'STATE': s.AGENT_STAGING_INPUT_PENDING },
                                    {'STATE': s.AGENT_STAGING_INPUT         }],
    't_agent_stage_in'           : [{'STATE': s.AGENT_STAGING_INPUT         },
                                    {'STATE': s.AGENT_SCHEDULING_PENDING    }],
    't_agent_schedule_queue'     : [{'STATE': s.AGENT_SCHEDULING_PENDING    },
                                    {'STATE': s.AGENT_SCHEDULING            }],
    't_agent_schedule'           : [{'STATE': s.AGENT_SCHEDULING            },
                                    {'STATE': s.AGENT_EXECUTING_PENDING     }],
    't_agent_execute_queue'      : [{'STATE': s.AGENT_EXECUTING_PENDING     },
                                    {'STATE': s.AGENT_EXECUTING             }],
    't_agent_execute_prepare'    : [{'STATE': s.AGENT_EXECUTING             },
                                    {'EVENT': 'task_mkdir'                  }],
    't_agent_execute_mkdir'      : [{'EVENT': 'task_mkdir'                  },
                                    {'EVENT': 'task_mkdir_done'             }],
    't_agent_execute_layer_start': [{'EVENT': 'task_mkdir_done'             },
                                    {'EVENT': 'task_run_start'              }],
    # ssh, mpi, ...
    't_agent_execute_layer'      : [{'EVENT': 'task_run_start'              },
                                    {'EVENT': 'task_run_ok'                 }],
    # PROBLEM: discontinuity - `t_agent_lm_start` starts again at
    #          `task_run_start`, i.e., the `t_agent_lm_*` durations overlap
    #          with `t_agent_execute_layer` above.
    't_agent_lm_start'           : [{'EVENT': 'task_run_start'              },
                                    {'EVENT': 'launch_start'                }],
    't_agent_lm_submit'          : [{'EVENT': 'launch_start'                },
                                    {'EVENT': 'exec_start'                  }],
    't_agent_lm_execute'         : [{'EVENT': 'exec_start'                  },
                                    {'EVENT': 'exec_stop'                   }],
    't_agent_lm_stop'            : [{'EVENT': 'exec_stop'                   },
                                    {'EVENT': 'task_run_stop'               }],
    't_agent_stage_out_start'    : [{'EVENT': 'task_run_stop'               },
                                    {'STATE': s.AGENT_STAGING_OUTPUT_PENDING}],
    't_agent_stage_out_queue'    : [{'STATE': s.AGENT_STAGING_OUTPUT_PENDING},
                                    {'STATE': s.AGENT_STAGING_OUTPUT        }],
    't_agent_stage_out'          : [{'STATE': s.AGENT_STAGING_OUTPUT        },
                                    {'STATE': s.TMGR_STAGING_OUTPUT_PENDING }],
    # push/pull mongodb
    't_agent_push_to_tmgr'       : [{'STATE': s.TMGR_STAGING_OUTPUT_PENDING },
                                    {'STATE': s.TMGR_STAGING_OUTPUT         }],
    # until the task reaches one of the listed final states
    't_tmgr_destroy'             : [{'STATE': s.TMGR_STAGING_OUTPUT         },
                                    [{'STATE': s.DONE                       },
                                     {'STATE': s.CANCELED                   },
                                     {'STATE': s.FAILED                     }]],
    't_agent_unschedule'         : [{'EVENT': 'unschedule_start'            },
                                    {'EVENT': 'unschedule_stop'             }]
}

# long form of the above, usable with `get_duration`
TASK_DURATIONS_DEBUG = _convert_sdurations(TASK_DURATIONS_DEBUG_SHORT)
# Debug task durations tagged with keys that can be used when calculating
# resource utilization ('client' and 'consume').
# TODO: add the 'client' tag to relevant resource utilization methods.
_udd = TASK_DURATIONS_DEBUG
TASK_DURATIONS_DEBUG_RU = {
    # durations spent on the client side (task manager)
    'client'  : {
        't_tmgr_create'              : _udd['t_tmgr_create'],
        't_tmgr_schedule_queue'      : _udd['t_tmgr_schedule_queue'],
        't_tmgr_schedule'            : _udd['t_tmgr_schedule'],
        't_tmgr_stage_in_queue'      : _udd['t_tmgr_stage_in_queue'],
        't_tmgr_stage_in'            : _udd['t_tmgr_stage_in'],
        't_tmgr_destroy'             : _udd['t_tmgr_destroy'],
        't_agent_unschedule'         : _udd['t_agent_unschedule']
    },
    'consume' : {
        't_agent_stage_in_queue'     : _udd['t_agent_stage_in_queue'],
        't_agent_stage_in'           : _udd['t_agent_stage_in'],
        't_agent_schedule_queue'     : _udd['t_agent_schedule_queue'],
        't_agent_schedule'           : _udd['t_agent_schedule'],
        't_agent_execute_queue'      : _udd['t_agent_execute_queue'],
        't_agent_execute_prepare'    : _udd['t_agent_execute_prepare'],
        't_agent_execute_mkdir'      : _udd['t_agent_execute_mkdir'],
        't_agent_execute_layer_start': _udd['t_agent_execute_layer_start'],
        't_agent_execute_layer'      : _udd['t_agent_execute_layer'],
        't_agent_lm_start'           : _udd['t_agent_lm_start'],
        't_agent_lm_submit'          : _udd['t_agent_lm_submit'],
        't_agent_lm_execute'         : _udd['t_agent_lm_execute'],
        't_agent_lm_stop'            : _udd['t_agent_lm_stop'],
        't_agent_stage_out_start'    : _udd['t_agent_stage_out_start'],
        't_agent_stage_out_queue'    : _udd['t_agent_stage_out_queue'],
        't_agent_stage_out'          : _udd['t_agent_stage_out'],
        't_agent_push_to_tmgr'       : _udd['t_agent_push_to_tmgr'],
    }
}
# ------------------------------------------------------------------------------
#
def get_hostmap(profile):
    '''
    Derive a map of entity UIDs to host names from `hostname` profile events.

    Every profile entry whose event is `hostname` contributes one mapping
    from the entry's UID (presumably a pilot ID) to the entry's message (the
    host name).  If a UID reports several `hostname` events, the last one
    wins.

    Args:
        profile (list): combined profile entries, indexable by the
                        `ru.EVENT`, `ru.UID` and `ru.MSG` field constants.

    Returns:
        dict: UID -> host name; empty if no `hostname` events were recorded
              (see `get_hostmap_deprecated` for the legacy mechanism).
    '''
    # FIXME: This should be replaced by proper hostname logging
    #        in `pilot.resource_details`.
    return {entry[ru.UID]: entry[ru.MSG]
            for entry in profile
            if entry[ru.EVENT] == 'hostname'}
# ------------------------------------------------------------------------------
#
def get_hostmap_deprecated(profiles):
    '''
    Legacy variant of `get_hostmap` for sessions which do not record
    `hostname` events.  Deprecated: it only returns the hostmap.

    Only profiles named `agent_0.prof` are inspected.  The message of their
    first entry is expected to be a colon separated record of five fields
    (the NTP sync info), the first two of which are host name and IP.  The
    UID of the first `advance` to `PMGR_ACTIVE` in that profile is then
    mapped to `<host>:<ip>`.

    Args:
        profiles (dict): profile name -> list of profile entries

    Returns:
        dict: pilot UID -> '<host>:<ip>'
    '''
    hostmap = dict()  # map pilot IDs to host names

    for pname, prof in profiles.items():

        # Only agent_0 profiles carry the information we need: skip all
        # others *before* parsing their header, which may have another format.
        if 'agent_0.prof' not in pname:
            continue

        if not prof or not prof[0][ru.MSG]:
            continue

        host, ip, _, _, _ = prof[0][ru.MSG].split(':')
        host_id = '%s:%s' % (host, ip)

        for row in prof:
            if row[ru.EVENT] == 'advance' and \
               row[ru.STATE] == s.PMGR_ACTIVE:
                hostmap[row[ru.UID]] = host_id
                break

    return hostmap
# ------------------------------------------------------------------------------
#
def get_session_profile(sid, src=None):
    '''
    Read, combine and clean all profiles of session `sid`.

    Profiles (`*.prof`) are searched in `src` (default: `$PWD/<sid>`) and in
    its direct sub-directories.  If `src` does not exist, the profiles are
    fetched via `fetch_profiles` first.  Some frequent but uninteresting
    events are filtered out while reading.

    If a pilot never recorded `bootstrap_0_stop` (e.g., because the
    bootstrapper was killed), such an event is appended to the profile at
    the last timestamp seen for that pilot.

    Returns:
        tuple: `(profile, accuracy, hostmap)` where `profile` and `accuracy`
               stem from `ru.combine_profiles` (the profile additionally
               cleaned), and `hostmap` maps pilot UIDs to host names.
    '''
    if not src:
        src = '%s/%s' % (os.getcwd(), sid)

    if os.path.exists(src):
        # profiles are available locally
        prof_files = glob.glob('%s/*.prof' % src) \
                   + glob.glob('%s/**/*.prof' % src)
    else:
        # profiles need to be fetched first
        from .session import fetch_profiles
        prof_files = fetch_profiles(sid=sid, skip_existing=True)

    # filter out some frequent, but uninteresting events
    skipped_events = [
        # 'get',
        'publish',
        'schedule_skip',
        'schedule_fail',
        'staging_stderr_start',
        'staging_stderr_stop',
        'staging_stdout_start',
        'staging_stdout_stop',
        'staging_uprof_start',
        'staging_uprof_stop',
        'update_pushed',
    ]
    efilter = {ru.EVENT: skipped_events}

    profiles          = ru.read_profiles(prof_files, sid, efilter=efilter)
    profile, accuracy = ru.combine_profiles(profiles)
    profile           = ru.clean_profile(profile, sid, s.FINAL, s.CANCELED)
    hostmap           = get_hostmap(profile)

    # We sometimes miss the `bootstrap_0_stop` event when the bootstrapper is
    # killed before being able to terminate nicely.  In that case we use the
    # last timestamp of that pilot for that event.
    # NOTE: entries are indexed positionally: [0] time, [1] event, [4] uid
    last_ts = dict()
    stopped = set()
    for entry in profile:
        uid = entry[4]
        if 'pilot.' not in uid:
            continue
        last_ts[uid] = max(last_ts.get(uid, 0), entry[0])
        if entry[1] == 'bootstrap_0_stop':
            stopped.add(uid)

    unstopped = [pid for pid in last_ts if pid not in stopped]
    for pid in unstopped:
        profile.append([last_ts[pid],
                        'bootstrap_0_stop',
                        'bootstrap_0',
                        'MainThread',
                        pid,
                        'pilot_state',
                        '',
                        'pilot'])

    if not hostmap:
        # FIXME: legacy host notation - deprecated
        hostmap = get_hostmap_deprecated(profiles)

    return profile, accuracy, hostmap
# ------------------------------------------------------------------------------
#
def _read_session_json(sid, src):
    '''
    Read the session registry dump and the tmgr / pmgr json files from `src`.

    Returns:
        dict: with the keys 'session', 'tmgr', 'pmgr', 'pilot' and 'task',
              each holding a list of records.  Task and pilot records are
              moved out of their manager records into the 'task' / 'pilot'
              lists.
    '''
    reg = ru.read_json('%s/%s.reg.json' % (src, sid))

    # resource configurations are not part of the session description (they
    # may be absent in the registry dump)
    reg.pop('rcfgs', None)
    reg['uid'] = sid

    records = {'session': [reg],
               'tmgr'   : [ru.read_json(fname)
                           for fname in glob.glob('%s/tmgr.*.json' % src)],
               'pmgr'   : [ru.read_json(fname)
                           for fname in glob.glob('%s/pmgr.*.json' % src)],
               'pilot'  : list(),
               'task'   : list()}

    for tmgr in records['tmgr']:
        records['task'].extend(tmgr['tasks'].values())
        del tmgr['tasks']

    for pmgr in records['pmgr']:
        records['pilot'].extend(pmgr['pilots'])
        del pmgr['pilots']

    return records


def get_session_description(sid, src=None):
    '''
    Return a description which is usable for radical.analytics evaluation.
    It informs about:

      - the set of stateful entities
      - the state models of those entities
      - the event models of those entities (maybe)
      - the configuration of the application / module

    If `src` is given, it is interpreted as path to search for session
    information (json dumps).  `src` defaults to `$PWD/$sid`.

    Returns:
        dict: with the keys
          - 'entities': state / event models for 'pilot', 'task', 'session'
          - 'tree'    : uid -> entity node (with `uid`, `etype`, `cfg`,
                        `has`, `children`, ...) for the session, its
                        managers, pilots and tasks
          - 'config'  : session config (currently always empty)

    NOTE: tasks which were never assigned to a pilot get no tree node of
          their own, but are still listed as children of their tmgr.
    '''
    if not src:
        src = '%s/%s' % (os.getcwd(), sid)

    # construct session records from registry dump, tmgr and pmgr json files,
    # and the pilot and task records contained therein
    records = _read_session_json(sid, src)

    tree = dict()
    tree[sid] = {'uid'      : sid,
                 'etype'    : 'session',
                 'cfg'      : records['session'][0]['cfg'],
                 'has'      : ['tmgr', 'pmgr'],
                 'children' : list()
                }

    for pmgr in sorted(records['pmgr'], key=lambda k: k['uid']):
        uid = pmgr['uid']
        tree[sid]['children'].append(uid)
        tree[uid] = {'uid'      : uid,
                     'etype'    : 'pmgr',
                     'cfg'      : pmgr.get('cfg', {}),
                     'has'      : ['pilot'],
                     'children' : list()
                    }

    for tmgr in sorted(records['tmgr'], key=lambda k: k['uid']):
        uid = tmgr['uid']
        tree[sid]['children'].append(uid)
        tree[uid] = {'uid'         : uid,
                     'etype'       : 'tmgr',
                     'cfg'         : tmgr.get('cfg', {}),
                     'has'         : ['task'],
                     'children'    : list(),
                     # no tmgr description is recorded: inject an empty one
                     'description' : dict()
                    }

    for pilot in sorted(records['pilot'], key=lambda k: k['uid']):
        pid  = pilot['uid']
        pmgr = pilot['pmgr']

        # Merge pilot description and resource details into one dict which is
        # made available as `cfg`, `cfg['resource_details']` and
        # `cfg['resource_details']['rm_info']` alike, so that lookups of the
        # form `cfg['resource_details']['rm_info'][key]` resolve to
        # `details[key]`.
        # NOTE: this makes `details` self-referential (circular).
        details = ru.dict_merge(pilot['description'],
                                pilot['resource_details'])
        pilot['cfg']                = details
        details['resource_details'] = details
        details['rm_info']          = details

        tree[pmgr]['children'].append(pid)
        tree[pid] = {'uid'        : pid,
                     'etype'      : 'pilot',
                     'cfg'        : pilot['cfg'],
                     'resources'  : pilot['resources'],
                     'description': pilot['description'],
                     'has'        : ['task'],
                     'children'   : list()
                    }

    for task in sorted(records['task'], key=lambda k: k['uid']):
        uid  = task['uid']
        tmgr = task.get('tmgr')

        if tmgr:
            tree[tmgr]['children'].append(uid)

        if 'pilot' not in task:
            # the task was never assigned to a pilot (e.g., the session got
            # terminated before the task could be scheduled)
            continue

        pid = task['pilot']
        tree[pid]['children'].append(uid)

        td = task['description']
        if 'resources' not in task:
            task['resources'] = {'cpu': td['ranks'] * td['cores_per_rank'],
                                 'gpu': td['ranks'] * td['gpus_per_rank']}

        # An explicit `etype` wins, otherwise we determine the entity type by
        # the task mode.
        etype = task.get('etype')
        if etype:
            mode = td.get('mode')
        else:
            mode = td['mode']
            if mode in [RAPTOR_MASTER, RAPTOR_WORKER]:
                etype = mode
            elif mode in [TASK_EXECUTABLE]:
                etype = 'task'
            else:
                etype = 'raptor.task'

        if _debug:
            print('%-30s [%-30s] -> %-30s' % (uid, mode, etype))

        tree[uid] = {'uid'          : uid,
                     'etype'        : etype,
                     'cfg'          : task,
                     'resources'    : task['resources'],
                     'description'  : task['description'],
                     'has'          : list(),
                     'children'     : list()
                    }
        # remove duplicate (the description is kept in the node itself)
        del tree[uid]['cfg']['description']

    ret = dict()
    ret['entities'] = {
        'pilot'  : {'state_model'  : s._pilot_state_values,
                    'state_values' : s._pilot_state_inv_full,
                    'event_model'  : dict()},
        'task'   : {'state_model'  : s._task_state_values,
                    'state_values' : s._task_state_inv_full,
                    'event_model'  : dict()},
        'session': {'state_model'  : None,  # has no states
                    'state_values' : None,
                    'event_model'  : dict()},
    }
    ret['tree']   = tree
    ret['config'] = dict()  # session config goes here

    return ret
# ------------------------------------------------------------------------------
#
def get_node_index(node_list, node_uid, pn):
    '''
    Return the inclusive range `[r0, r1]` of global resource indexes covered
    by node `node_uid`.

    Resources are numbered consecutively over the nodes in `node_list`, with
    `pn` resources (cores or GPUs) per node, so the node at position `i`
    covers `[i * pn, i * pn + pn - 1]`.

    Node positions are cached in the module level `_node_index`.  The cache
    is validated against the given `node_list` on each lookup and rebuilt if
    it does not match (e.g., when pilots with different node lists are
    analyzed), so indexes always refer to the list passed in.

    Raises:
        KeyError: if `node_uid` is not contained in `node_list`.
    '''
    idx = _node_index.get(node_uid)

    if idx is None or idx >= len(node_list) or \
            node_list[idx]['node_id'] != node_uid:
        # cache is empty or stale for this node list: rebuild it
        _node_index.clear()
        for pos, node in enumerate(node_list):
            _node_index[node['node_id']] = pos
        idx = _node_index[node_uid]

    r0 = idx * pn
    r1 = r0 + pn - 1

    return [r0, r1]
# ------------------------------------------------------------------------------
#
def get_duration(thing, dur):
    '''
    Return the time span covered by duration `dur` for entity `thing`.

    `dur` is a pair `[start_spec, stop_spec]` of event filters which are
    passed to `thing.timestamps(event=...)`.  The span starts at the first
    timestamp matching `start_spec` and ends at the last timestamp matching
    `stop_spec`.

    Returns:
        `(t_start, t_stop)`, or `[None, None]` if either filter does not match.

    NOTE: filters which specify `ru.STATE` but no `ru.EVENT` are completed
          *in place* with `ru.EVENT: 'state'`, i.e., `dur` gets modified.
    '''
    for spec in dur:
        if ru.EVENT not in spec and ru.STATE in spec:
            spec[ru.EVENT] = 'state'

    starts = thing.timestamps(event=dur[0])
    stops  = thing.timestamps(event=dur[1])

    if len(starts) == 0 or len(stops) == 0:
        return [None, None]

    return (starts[0], stops[-1])
# ------------------------------------------------------------------------------
#
def cluster_resources(resources):
    '''
    Cluster resource indexes into continuous, inclusive ranges.

    Args:
        resources (list): items are either single resource indexes (`int`,
                          e.g. a single core or GPU) or inclusive `[r0, r1]`
                          ranges of indexes.  Duplicates and overlaps are
                          allowed.

    Returns:
        list: sorted, disjoint `[r0, r1]` ranges (inclusive) which cover
              exactly the given indexes, e.g.
              `[1, 2, 3, 5, [7, 8]]` -> `[[1, 3], [5, 5], [7, 8]]`
    '''
    indexes = set()
    for r in resources:
        if isinstance(r, int):
            indexes.add(r)
        else:
            indexes.update(range(r[0], r[1] + 1))

    ret   = list()
    start = None
    prev  = None

    for i in sorted(indexes):
        if start is None:
            start = prev = i
        elif i == prev + 1:
            prev = i
        else:
            # gap found: close the current range and open a new one at `i`
            ret.append([start, prev])
            start = prev = i

    if start is not None:
        ret.append([start, prev])

    return ret
# ------------------------------------------------------------------------------
#
def _get_pilot_provision(pilot, rtype):
    '''
    Compute the resources provided by `pilot` for each metric listed in
    `PILOT_DURATIONS['provide']`.

    If the duration's events are missing, the first and last recorded pilot
    events are used instead.

    Args:
        pilot: analytics entity of the pilot
        rtype (str): 'gpu' for GPUs, anything else for cores

    Returns:
        dict: metric -> {pilot uid: [[t0, t1, r0, r1], ...]} with one box per
              node of the pilot (including agent nodes).
    '''
    pid = pilot.uid
    ret = dict()

    rnd = 'gpus_per_node' if rtype == 'gpu' else 'cores_per_node'
    pn  = pilot.cfg['resource_details']['rm_info'][rnd]

    nodes, _, _ = _get_nodes(pilot)

    for metric, dur in PILOT_DURATIONS['provide'].items():

        t0, t1 = get_duration(pilot, dur)

        if t0 is None:
            # fall back to the complete recorded pilot lifetime
            t0 = pilot.events[0][ru.TIME]
            t1 = pilot.events[-1][ru.TIME]

        boxes = list()
        for node in nodes:
            r0, r1 = get_node_index(nodes, node['node_id'], pn)
            boxes.append([t0, t1, r0, r1])

        # store the boxes under their own metric name
        ret[metric] = {pid: boxes}

    return ret
# ------------------------------------------------------------------------------
#
def get_provided_resources(session, rtype='cpu'):
    '''
    For all ra.pilots, return the amount and time of the type of resources
    provided.  This computes sets of 4-tuples of the form: [t0, t1, r0, r1]
    where:

      - t0: time, begin of resource provision
      - t1: time, end of resource provision
      - r0: int, index of resources provided (min)
      - r1: int, index of resources provided (max)

    The tuples are formed so that t0 to t1 and r0 to r1 are continuous.

    Args:
        session: analytics session providing the pilot entities
        rtype (str): resource type, 'cpu' or 'gpu'

    Returns:
        dict: metric -> {pilot uid: [[t0, t1, r0, r1], ...]}

    Raises:
        Exception: for an unknown resource type.
    '''
    if rtype not in ['cpu', 'gpu']:
        raise Exception('unknown resource type: %s' % rtype)

    provided = dict()
    for pilot in session.get(etype='pilot'):
        data = _get_pilot_provision(pilot, rtype)
        for metric, boxes_by_uid in data.items():
            provided.setdefault(metric, dict()).update(boxes_by_uid)

    return provided
# ------------------------------------------------------------------------------
#
def get_consumed_resources(session, rtype='cpu', tdurations=None):
    '''
    For all ra.pilot or ra.task entities, return the amount and time of
    resources consumed.  A consumed resource is characterized by:

      - a resource type (we know about cores and gpus)
      - a metric name (what the resource was used for)
      - a list of 4-tuples of the form: [t0, t1, r0, r1]

    The tuples are formed so that t0 to t1 and r0 to r1 are continuous:

      - t0: time, begin of resource consumption
      - t1: time, end of resource consumption
      - r0: int, index of resources consumed (min)
      - r1: int, index of resources consumed (max)

    An entity can consume different resources under different metrics; the
    consumption specs are intended not to overlap, so that any resource is
    accounted for exactly one metric at any point in time.  The returned
    structure has the following overall form::

        {
            'metric_1' : {
                uid_1 : [[t0, t1, r0, r1],
                         [t2, t3, r2, r3],
                         ...
                        ],
                uid_2 : ...
            },
            'metric_2' : ...
        }

    Besides the metrics reported for pilots and tasks, three metrics are
    derived for all (non-agent) resources of each pilot:

      - 'warm' : from pilot start until a task first uses the resource
      - 'drain': from the last task use of the resource until pilot end
      - 'idle' : the complete pilot lifetime, for resources no task used

    Args:
        session: analytics session providing pilot and task entities
        rtype (str): 'cpu' or 'gpu'
        tdurations (dict): task durations to use (default:
                           `TASK_DURATIONS_DEFAULT`).  For 'warm' / 'drain',
                           their 'consume' section should define
                           'exec_queue' and 'unschedule', which delimit the
                           time span a task holds its resources; tasks for
                           which those cannot be resolved are ignored there.
    '''
    log = ru.Logger('radical.pilot.utils')

    consumed = dict()
    for e in session.get(etype=['pilot', 'task']):

        if e.etype == 'pilot':
            data = _get_pilot_consumption(e, rtype)
        elif e.etype == 'task':
            data = _get_task_consumption(session, e, rtype, tdurations)
        else:
            continue

        for metric, boxes_by_uid in data.items():
            consumed.setdefault(metric, dict()).update(boxes_by_uid)

    # same selection as used in `_get_task_consumption`
    if tdurations:
        task_durations = tdurations
    else:
        task_durations = TASK_DURATIONS_DEFAULT

    # We define three additional metrics, 'warm', 'drain' and 'idle', which
    # are defined for all resources of the pilot.  `warm` is the time from
    # when the pilot starts to the time the resource is first consumed by a
    # task.  `drain` is the inverse: the time from when any task last
    # consumed the resource to the time when the pilot terminates.  `idle`
    # covers resources which were never consumed by any task.
    for pilot in session.get(etype='pilot'):

        pid = pilot.uid

        log.debug('timestamps:')
        for ts in pilot.timestamps():
            log.debug(' %10.2f %-20s %-15s %-15s %-15s %-15s %s',
                      ts[0], ts[1], ts[2], ts[3], ts[4], ts[5], ts[6])

        p_min, p_max = _get_pilot_span(pilot)
        log.debug('pmin, pmax: %10.2f / %10.2f', p_min, p_max)

        rnd  = 'cores_per_node'
        rmap = 'core_map'
        if rtype == 'gpu':
            rnd  = 'gpus_per_node'
            rmap = 'gpu_map'

        pn = pilot.cfg['resource_details']['rm_info'][rnd]
        nodes, _, pnodes = _get_nodes(pilot)

        scopes = _get_resource_scopes(session, pid, nodes, pnodes, pn, rmap,
                                      task_durations)
        boxes_warm, boxes_drain, boxes_idle = _get_gap_boxes(scopes,
                                                             p_min, p_max)

        consumed.setdefault('warm',  dict())[pid] = boxes_warm
        consumed.setdefault('drain', dict())[pid] = boxes_drain
        consumed.setdefault('idle',  dict())[pid] = boxes_idle

    return consumed


def _get_pilot_span(pilot):
    '''
    Return `(p_min, p_max)`: the time span during which the pilot holds its
    resources, i.e., from `bootstrap_0_start` to `bootstrap_0_stop`.

    Fallbacks for missing bootstrap events: `p_min` becomes the first
    `PMGR_ACTIVE` timestamp (or the first pilot event), `p_max` becomes the
    last recorded pilot event.
    '''
    def _first(**kwargs):
        # first matching timestamp, or None if nothing matches
        ts = pilot.timestamps(**kwargs)
        return ts[0] if len(ts) else None

    p_min = _first(event={ru.EVENT: 'bootstrap_0_start'})
    p_max = _first(event={ru.EVENT: 'bootstrap_0_stop'})

    if p_min is None:
        p_min = _first(state=s.PMGR_ACTIVE)
    if p_min is None:
        p_min = pilot.events[0][ru.TIME]
    if p_max is None:
        p_max = pilot.events[-1][ru.TIME]

    return p_min, p_max


def _get_resource_scopes(session, pid, nodes, pnodes, pn, rmap,
                         task_durations):
    '''
    Map each resource index on the pilot's (non-agent) nodes to
    `[t_min, t_max]`: the earliest start and latest end of any task of pilot
    `pid` holding that resource, or `[None, None]` if no task used it.
    '''
    scopes = dict()
    for pnode in pnodes:
        r0, r1 = get_node_index(nodes, pnode['node_id'], pn)
        for idx in range(r0, r1 + 1):
            scopes[idx] = [None, None]

    t_consume = task_durations['consume']

    for task in session.get(etype='task'):

        if task.cfg.get('pilot') != pid:
            continue

        try:
            ranks    = task.cfg['slots']['ranks']
            tts      = task.timestamps
            task_min = tts(event=t_consume['exec_queue'][0])[0]
            task_max = tts(event=t_consume['unschedule'][1])[-1]
        except (KeyError, IndexError, TypeError):
            # task never got scheduled, did not complete its execution cycle,
            # or the durations lack the delimiting metrics
            continue

        for rank in ranks:
            r0, _ = get_node_index(nodes, rank['node_id'], pn)
            for resource_map in rank[rmap]:
                for resource in resource_map:
                    idx = r0 + resource
                    if idx not in scopes:
                        # not a resource on the pilot's compute nodes
                        continue
                    t_min, t_max = scopes[idx]
                    if t_min is None or t_min > task_min: t_min = task_min
                    if t_max is None or t_max < task_max: t_max = task_max
                    scopes[idx] = [t_min, t_max]

    return scopes


def _get_gap_boxes(scopes, p_min, p_max):
    '''
    Turn per-resource usage scopes into 'warm', 'drain' and 'idle' boxes
    `[t0, t1, r0, r1]`.  Resources sharing the same first (or last) use time
    are clustered into continuous ranges to minimize the number of boxes.
    '''
    bucket_min  = dict()
    bucket_max  = dict()
    bucket_none = list()

    for idx, (t_min, t_max) in scopes.items():
        if t_min is None:
            bucket_none.append(idx)
        else:
            bucket_min.setdefault(t_min, list()).append(idx)
            bucket_max.setdefault(t_max, list()).append(idx)

    boxes_warm  = [[p_min, t_min, r[0], r[1]]
                   for t_min, idxs in bucket_min.items()
                   for r in cluster_resources(idxs)]
    boxes_drain = [[t_max, p_max, r[0], r[1]]
                   for t_max, idxs in bucket_max.items()
                   for r in cluster_resources(idxs)]
    boxes_idle  = [[p_min, p_max, r[0], r[1]]
                   for r in cluster_resources(bucket_none)]

    return boxes_warm, boxes_drain, boxes_idle
# ------------------------------------------------------------------------------
#
def _get_nodes(pilot):
    '''
    Return the node lists of `pilot` as a tuple `(nodes, anodes, pnodes)`:

      - `pnodes`: the pilot's node list (`rm_info['node_list']`, not copied)
      - `anodes`: the agent nodes (the values of `rm_info['agent_nodes']`)
      - `nodes` : a new list holding `pnodes` followed by `anodes`
    '''
    rm_info = pilot.cfg['resource_details']['rm_info']
    pnodes  = rm_info['node_list']
    agents  = rm_info.get('agent_nodes', [])
    anodes  = [agents[name] for name in agents]

    return pnodes + anodes, anodes, pnodes
# ------------------------------------------------------------------------------
#
def _get_pilot_consumption(pilot, rtype):
    '''
    Compute the resources consumed by the pilot itself.

    Pilots consume resources in different ways:

      - the pilot needs to bootstrap and initialize before becoming active,
        i.e., before it can begin to manage the workload, and needs to
        terminate and clean up during shutdown;
      - the pilot may block one or more nodes or cores for its own components
        (sub-agents), and those components are not available for workload
        execution;
      - the pilot may perform operations while managing the workload.

    This method computes the first two contributions and part of the third.
    It does *not* account for those parts of the third which are performed
    while specific resources are blocked for the affected workload element
    (task) - that resource consumption is considered to be a consumption *of
    that task*, which allows us to compute task specific resource
    utilization overheads.

    Returns:
        dict: metric -> {pilot uid: [[t0, t1, r0, r1], ...]} for the 'agent'
              metric and the metrics in `PILOT_DURATIONS['consume']`, except
              'idle' (which is derived in `get_consumed_resources`).
    '''
    pid = pilot.uid
    ret = dict()

    rnd = 'cores_per_node'
    if rtype == 'gpu':
        rnd = 'gpus_per_node'

    pn = pilot.cfg['resource_details']['rm_info'][rnd]

    # Account for agent resources.  Agents use full nodes, i.e., cores and
    # GPUs.  We happen to know that agents use the first nodes in the
    # allocation and their resource tuples thus start at index `0`, but for
    # completeness we ensure that by inspecting the pilot cfg.
    # Duration is for all of the pilot runtime.  This is not precise really,
    # since several bootstrapping phases happen before the agents exist - but
    # we consider the nodes blocked for the sub-agents from the get-go.
    t0, t1 = get_duration(pilot, PILOT_DURATIONS['agent']['total'])
    boxes  = list()

    # Subtract agent nodes from the nodelist, so that we correctly attribute
    # other global pilot metrics to the remaining nodes.
    nodes, anodes, pnodes = _get_nodes(pilot)

    if anodes and t0 is not None:
        for anode in anodes:
            r0, r1 = get_node_index(nodes, anode['node_id'], pn)
            boxes.append([t0, t1, r0, r1])

    ret['agent'] = {pid: boxes}

    # account for all other pilot metrics
    for metric, dur in PILOT_DURATIONS['consume'].items():

        # 'idle' is derived in `get_consumed_resources`, and the 'agent'
        # utilization was accounted for above on the agent nodes: computing
        # it here again would overwrite those boxes with boxes on the compute
        # nodes, where they would overlap with task consumption.
        if metric in ('idle', 'agent'):
            continue

        boxes  = list()
        t0, t1 = get_duration(pilot, dur)

        if t0 is not None:
            for node in pnodes:
                r0, r1 = get_node_index(nodes, node['node_id'], pn)
                boxes.append([t0, t1, r0, r1])

        ret[metric] = {pid: boxes}

    return ret
# ------------------------------------------------------------------------------
#
def _get_task_consumption(session, task, rtype, tdurations=None):
    '''
    Compute the resource consumption of a single task.

    Only the resources the task was scheduled on (`task.cfg['slots']`) are
    accounted for.  For each metric in `tdurations['consume']` (default:
    `TASK_DURATIONS_DEFAULT`), the metric's time span `[t0, t1]` is combined
    with the task's (clustered) resource ranges into boxes of the form
    `[t0, t1, r0, r1]`.

    Args:
        session: profile session, used to look up the task's pilot.
        task: the task entity to inspect.
        rtype (str): `'gpu'` to account for GPUs, anything else accounts for
            CPU cores.
        tdurations (dict, optional): duration specs with a `'consume'`
            section mapping metric names to `[start, stop]` event specs.

    Returns:
        dict: `{metric: {task_uid: [[t0, t1, r0, r1], ...]}}`.  The dict is
        empty if the task was never assigned to a pilot or never scheduled;
        metrics whose start time cannot be determined map to an empty box
        list.
    '''
    # we need to know what pilot the task ran on. If we don't find a
    # designated pilot, no resources were consumed
    uid = task.uid
    pid = task.cfg.get('pilot')
    if not pid:
        return dict()

    # tasks consume only those resources they are scheduled on - check this
    # before doing any pilot lookups
    if 'slots' not in task.cfg:
        return dict()

    # get the pilot for inspection
    pilot = session.get(uid=pid)
    if isinstance(pilot, list):
        assert len(pilot) == 1
        pilot = pilot[0]

    # FIXME: it is inefficient to query those values again and again
    nodes, _, _ = _get_nodes(pilot)

    if rtype == 'gpu':
        rnd, rmap = 'gpus_per_node', 'gpu_map'
    else:
        rnd, rmap = 'cores_per_node', 'core_map'
    pn = pilot.cfg['resource_details']['rm_info'][rnd]

    # translate the per-node resource IDs of all ranks into pilot-wide
    # resource indexes by offsetting them with the node's first index (as
    # returned by `get_node_index`)
    resources = list()
    for rank in task.cfg['slots']['ranks']:
        r0, _ = get_node_index(nodes, rank['node_id'], pn)
        for resource_map in rank[rmap]:
            resources.extend(r0 + resource for resource in resource_map)

    # find continuous stretches of resources to minimize number of boxes
    resources = cluster_resources(resources)

    task_durations = tdurations or TASK_DURATIONS_DEFAULT

    if _debug:
        print()

    ret = dict()
    for metric, spec in task_durations['consume'].items():
        boxes = list()
        t0, t1 = get_duration(task, spec)
        if t0 is not None:
            if _debug:
                print('%s: %-15s : %10.3f - %10.3f = %10.3f'
                      % (task.uid, metric, t1, t0, t1 - t0))
            for r in resources:
                boxes.append([t0, t1, r[0], r[1]])

        elif _debug:
            print('%s: %-15s : -------------- ' % (task.uid, metric))
            print(spec)
            # specs which name a state but no event get the event name
            # 'state' for the timestamp lookup.  Work on copies: mutating
            # `spec` in place would alter the shared duration definitions
            # (e.g., TASK_DURATIONS_DEFAULT), so enabling debug output would
            # change the results computed for subsequent tasks.
            events = list()
            for e in spec:
                e = dict(e)
                if ru.STATE in e and ru.EVENT not in e:
                    e[ru.EVENT] = 'state'
                events.append(e)
            print(task.timestamps(event=events[0]))
            print(task.timestamps(event=events[1]))
            for e in task.events:
                print('\t'.join([str(x) for x in e]))

        ret[metric] = {uid: boxes}

    return ret
# ------------------------------------------------------------------------------
#
def _find_transition_entry(transitions, event):
    '''
    Return the `[event, from, to]` entry of `transitions` whose event spec
    equals `event`, or `None` if no such entry is registered yet.
    '''
    for entry in transitions:
        if entry[0] == event:
            return entry
    return None


def _metrics_to_transitions(metrics):
    '''
    Convert `{metric: [start_event, stop_event]}` into a list of transitions
    `[event, from_metric, to_metric]`.

    A start event is registered as the transition *into* its metric, a stop
    event as the transition *out of* its metric.  If an event is shared by
    two metrics (the stop of one and the start of the other), both ends are
    filled in on the same entry.  Ends which no metric claims stay `None`.
    '''
    transitions = list()
    for metric, spec in metrics.items():

        # register the start event (or complete a known transition)
        entry = _find_transition_entry(transitions, spec[0])
        if entry is None:
            transitions.append([spec[0], None, metric])
        else:
            assert entry[2] is None
            entry[2] = metric

        # the inverse for the stop event (in case transitions are not
        # causally ordered)
        entry = _find_transition_entry(transitions, spec[1])
        if entry is None:
            transitions.append([spec[1], metric, None])
        else:
            assert entry[1] is None
            entry[1] = metric

    return transitions


def get_resource_transitions(pilot, task_metrics=None, pilot_metrics=None):
    '''
    Find the events at which resource usage moves from one metric to another.

    For example, consider this metric:

        'exec_queue' : [{ru.EVENT: 'schedule_ok'    },
                        {ru.STATE: s.AGENT_EXECUTING}],
        'exec_prep'  : [{ru.STATE: s.AGENT_EXECUTING},
                        {ru.EVENT: 'exec_start'     }],
        'exec_cmd'   : [{ru.EVENT: 'exec_start'     },
                        {ru.EVENT: 't_start'        }],

    which is converted into the following structure:

        [
          # event                          from          to
          [{ru.EVENT: 'schedule_ok'    },  'idle',       'exec_queue'],
          [{ru.STATE: s.AGENT_EXECUTING},  'exec_queue', 'exec_prep' ],
          [{ru.EVENT: 'exec_start'     },  'exec_prep',  'exec_cmd'  ],
          [{ru.EVENT: 't_start'        },  'exec_cmd',   'idle'      ],
        ]

    to be used like this:

      - go through all events for a task
      - if an event is in the list above
        - reduce resources in the `from` metric
        - increase resources in the `to` metric

    Task transitions without a `from` or `to` metric take resources from /
    give resources to the pilot's idle pool, so the missing ends are named
    `'idle'`.  For pilot transitions the missing ends are named `'system'`
    (where the pilot obtains its resources from), except for the `agent`
    metric: agent resources are taken from `setup_1` and returned to `term`.

    Args:
        pilot: unused, kept for interface compatibility.
        task_metrics (dict, optional): task duration specs with a `'consume'`
            section (default: `TASK_DURATIONS_DEFAULT`).
        pilot_metrics (dict, optional): pilot duration specs with a
            `'consume'` section (default: `PILOT_DURATIONS`).

    Returns:
        tuple: `(pilot_transitions, task_transitions)`, each a list of
        `[event_spec, from_metric, to_metric]` entries.
    '''
    task_metrics  = task_metrics  or TASK_DURATIONS_DEFAULT
    pilot_metrics = pilot_metrics or PILOT_DURATIONS

    # task transitions which miss start or stop metrics add / remove
    # resources from the pilot's idle pool.
    task_transitions = _metrics_to_transitions(task_metrics['consume'])
    for t in task_transitions:
        if t[1] is None: t[1] = 'idle'
        if t[2] is None: t[2] = 'idle'

    # same for the pilot, which obtains its resources from the system.  The
    # `agent` metric covers resources used by the agent (which are thus not
    # usable for tasks): those are taken from and returned to the pilot.
    pilot_transitions = _metrics_to_transitions(pilot_metrics['consume'])
    for t in pilot_transitions:
        if t[1] is None:
            t[1] = 'setup_1' if t[2] == 'agent' else 'system'
        if t[2] is None:
            t[2] = 'term'    if t[1] == 'agent' else 'system'

    return pilot_transitions, task_transitions
# ------------------------------------------------------------------------------
#
def get_resource_timelines(task, transitions):
    '''
    For each specific task, return a list of entries of the form:

        [start, stop, metric, task_uid]

    which report what metric has been used during what time span.

    Args:
        task: the task entity to inspect.
        transitions (dict): duration specs with a `'consume'` section that
            maps metric names to `[start, stop]` event specs (the same layout
            as the task duration definitions).

    Returns:
        list: one `[t0, t1, metric, uid]` entry per metric, where `t0` / `t1`
        are the values returned by `get_duration` (`t0` may be `None` if the
        span was not found).  The list is empty if the task was never
        assigned to a pilot or never scheduled.
    '''
    # we need to know what pilot the task ran on. If we don't find a
    # designated pilot, no resources were consumed
    tid = task.uid
    pid = task.cfg.get('pilot')

    if not pid:
        # task was never assigned to a pilot
        return list()

    if 'slots' not in task.cfg:
        # the task was never scheduled
        return list()

    ret = list()
    for metric, spec in transitions['consume'].items():
        t0, t1 = get_duration(task, spec)
        ret.append([t0, t1, metric, tid])

    return ret
# ------------------------------------------------------------------------------
#
def get_session_json(sid, cachedir=None):
    '''
    Load the JSON document of session `sid`, using a per-user cache.

    The session document is first looked up in `<cachedir>/<name>.json`,
    where `<name>` is the last path component of `sid` and `cachedir`
    defaults to `/tmp/rp_cache_<uid>/`.  If no usable cache file exists, the
    first file matching `<sid>/r[ep].session.*.json` is loaded.  Each pilot
    document is then extended with a `task_ids` list naming the tasks which
    ran on that pilot, and the result is written to the cache (best effort).

    Args:
        sid (str): session ID / path of the session directory.
        cachedir (str, optional): cache directory to use instead of the
            default location.

    Returns:
        dict: the session document.
    '''
    if not cachedir:
        cachedir = _CACHE_BASEDIR

    # use only the last path component of `sid` for the cache file name:
    # `os.path.join` discards `cachedir` if `sid` is an absolute path, and
    # relative paths would point into non-existing cache subdirectories.
    sname = os.path.basename(os.path.normpath(sid))
    cache = os.path.join(cachedir, '%s.json' % sname)

    try:
        if os.path.isfile(cache):
            return ru.read_json(cache)
    except Exception as e:
        # continue w/o cache
        sys.stderr.write('cannot read session cache from %s: %s\n' % (cache, e))

    # escape the session path so that glob meta-characters in it are taken
    # literally
    pattern = '%s/r[ep].session.*.json' % glob.escape(os.path.abspath(sid))
    jsons   = glob.glob(pattern)
    assert jsons, 'session missed, check <sid>/<sid>.json'

    with ru.ru_open(jsons[0], 'r') as f:
        json_data = json.load(f)

    # we want to add a list of handled tasks to each pilot doc - index the
    # tasks by pilot once instead of scanning all tasks for every pilot
    tids_by_pilot = dict()
    for task in json_data.get('task', []):
        tids_by_pilot.setdefault(task.get('pilot'), list()).append(task['uid'])

    for pilot in json_data['pilot']:
        pilot['task_ids'] = list(tids_by_pilot.get(pilot['uid'], []))

    try:
        os.makedirs(cachedir, exist_ok=True)
        ru.write_json(json_data, cache)
    except Exception as e:
        # we can live without cache, no problem...
        sys.stderr.write('cannot write session cache to %s: %s\n' % (cache, e))

    return json_data
# ------------------------------------------------------------------------------