# Source code for radical.pilot.utils.prof_utils

# pylint: disable=protected-access,unused-argument

import glob
import json
import os
import sys

import radical.utils as ru

from ..                 import states as s
from ..task_description import RAPTOR_MASTER, RAPTOR_WORKER, TASK_EXECUTABLE

# enables verbose entity-type reporting (see `get_session_description`)
_debug      = os.environ.get('RP_PROF_DEBUG')
# maps node uid -> index in the pilot's node list (filled lazily by
# `get_node_index`)
_node_index = dict()

# per-user base directory for cached session data
# NOTE(review): not referenced in this part of the file -- confirm usage
_CACHE_BASEDIR = '/tmp/rp_cache_%d/' % os.getuid()


# ------------------------------------------------------------------------------
#
# pilot and task activities: core hours are derived by multiplying the
# respective time durations with pilot size / task size.  The 'idle'
# utilization and the 'agent' utilization are derived separately.
#
# Note that durations should add up to the `x_total` generations to ensure
# accounting for the complete task/pilot utilization.
#
# An updated list of events is available at docs/source/events.md


PILOT_DURATIONS = {
    # resources as provided over the pilot's full lifetime
    'provide' : {
        'total'     : [{ru.EVENT: 'bootstrap_0_start'},
                       {ru.EVENT: 'bootstrap_0_stop' }]
    },
    # times between PMGR_ACTIVE and the termination command are not
    # considered pilot specific consumptions.  If some resources remain
    # unused during that time, it is either due to inefficiencies of
    # workload management (accounted for in the task consumption metrics),
    # or the pilot is starving for workload.
    'consume' : {
        'boot'      : [{ru.EVENT: 'bootstrap_0_start'},
                       {ru.EVENT: 'bootstrap_0_ok'   }],
        'setup_1'   : [{ru.EVENT: 'bootstrap_0_ok'   },
                       {ru.STATE: s.PMGR_ACTIVE      }],
        'idle'      : [{ru.STATE: s.PMGR_ACTIVE      },
                       {ru.EVENT: 'cmd'              ,
                        ru.MSG  : 'cancel_pilot'     }],
        'term'      : [{ru.EVENT: 'cmd'              ,
                        ru.MSG  : 'cancel_pilot'     },
                       {ru.EVENT: 'bootstrap_0_stop' }],
        'agent'     : [{ru.EVENT: 'sub_agent_start'  },
                       {ru.EVENT: 'sub_agent_stop'   }],
    },
    # FIXME: separate out DVM startup time
    #   'rte'       : [{ru.STATE: s.PMGR_ACTIVE    },
    #                  {ru.STATE: s.PMGR_ACTIVE    }],
    #   'setup_2'   : [{ru.STATE: s.PMGR_ACTIVE    },
    #                  {ru.STATE: s.PMGR_ACTIVE    }],
    #
    # resources on agent nodes are consumed for all of the pilot's lifetime
    'agent' : {
        'total'     : [{ru.EVENT: 'bootstrap_0_start'},
                       {ru.EVENT: 'bootstrap_0_stop' }]
    }
}


# The set of default task durations that are available for every task
# description, default resource configuration, and default scheduler and
# launcher.
TASK_DURATIONS_DEFAULT = {
    # event sequence of a task's execution path (see docs/source/events.md)
    'consume' : {
        'exec_queue'  : [{ru.EVENT: 'schedule_ok'            },
                         {ru.STATE: s.AGENT_EXECUTING        }],
        'exec_prep'   : [{ru.STATE: s.AGENT_EXECUTING        },
                         {ru.EVENT: 'task_run_start'         }],
        'exec_rp'     : [{ru.EVENT: 'task_run_start'         },
                         {ru.EVENT: 'launch_start'           }],
        'exec_sh'     : [{ru.EVENT: 'launch_start'           },
                         {ru.EVENT: 'exec_start'             }],
        'exec_cmd'    : [{ru.EVENT: 'exec_start'             },
                         {ru.EVENT: 'exec_stop'              }],
        'term_sh'     : [{ru.EVENT: 'exec_stop'              },
                         {ru.EVENT: 'launch_stop'            }],
        'term_rp'     : [{ru.EVENT: 'launch_stop'            },
                         {ru.EVENT: 'task_run_stop'          }],
        'unschedule'  : [{ru.EVENT: 'task_run_stop'          },
                         {ru.EVENT: 'unschedule_stop'        }]
    }
}


# The set of default task durations augmented with the durations of the app
# events. App events are generated by RADICAL Synapse and by the script
# `radical-pilot-hello.sh`. The latter is useful for testing as
# a sleep command drop-in.
TASK_DURATIONS_APP = {
    # as TASK_DURATIONS_DEFAULT, but `exec_cmd` is split into the
    # app-generated `init_app` / `exec_cmd` / `term_app` phases
    'consume' : {
        'exec_queue'  : [{ru.EVENT: 'schedule_ok'            },
                         {ru.STATE: s.AGENT_EXECUTING        }],
        'exec_prep'   : [{ru.STATE: s.AGENT_EXECUTING        },
                         {ru.EVENT: 'task_run_start'         }],
        'exec_rp'     : [{ru.EVENT: 'task_run_start'         },
                         {ru.EVENT: 'launch_start'           }],
        'exec_sh'     : [{ru.EVENT: 'launch_start'           },
                         {ru.EVENT: 'exec_start'             }],
        'init_app'    : [{ru.EVENT: 'exec_start'             },
                         {ru.EVENT: 'rank_start'             }],
        'exec_cmd'    : [{ru.EVENT: 'rank_start'             },
                         {ru.EVENT: 'rank_stop'              }],
        'term_app'    : [{ru.EVENT: 'rank_stop'              },
                         {ru.EVENT: 'exec_stop'              }],
        'term_sh'     : [{ru.EVENT: 'exec_stop'              },
                         {ru.EVENT: 'launch_stop'            }],
        'term_rp'     : [{ru.EVENT: 'launch_stop'            },
                         {ru.EVENT: 'task_run_stop'          }],
        'unschedule'  : [{ru.EVENT: 'task_run_stop'          },
                         {ru.EVENT: 'unschedule_stop'        }]
    }
}


# FIXME: TASK_DURATIONS_PRTE
# The set of default task durations with the durations generated when using
# PRRTE as a launch method.


# ----------------------------------------------------------------------------
#
def _convert_sdurations(sdurations):
    '''
    Convert a collection of durations from short form into long form.

    Definitions:

    - Short form collection: one dictionary of short form durations
    - Long form: one dictionary of long form durations.

    Args:

        sdurations (dict): a collection of durations in short form

    Return:

        ldurations (dict): the same collection of durations in long form

    Example:

        sdurations = {'name_of_duration': [{'STATE': s.STATE_NAME},
                                           {'EVENT': 'event_name'}]}
        ldurations = {'name_of_duration': [{ru.EVENT: 'state',
                                            ru.STATE: s.STATE_NAME},
                                           {ru.EVENT: 'event_name',
                                            ru.STATE: None}]}

        A list in place of a single timestamp denotes a set of alternative
        timestamps and is expanded element-wise:

        sdurations = {'name_of_duration': [{'STATE': s.STATE_NAME},
                                           [{'EVENT': 'event_name'},
                                            {'STATE': s.STATE_NAME}]]}
        ldurations = {'name_of_duration': [{ru.EVENT: 'state',
                                            ru.STATE: s.STATE_NAME},
                                           [{ru.EVENT: 'event_name',
                                             ru.STATE: None},
                                            {ru.EVENT: 'state',
                                             ru.STATE: s.STATE_NAME}]]}

        sdurations = {'name_of_duration': [{'STATE': s.STATE_NAME},
                                           {'MSG': 'message_name'}]}
        ldurations = {'name_of_duration': [{ru.EVENT: 'state',
                                            ru.STATE: s.STATE_NAME},
                                           {ru.EVENT: 'cmd',
                                            ru.MSG: 'message_name'}]}
    '''

    ldurations = dict()

    for name, spec in sdurations.items():

        expanded = list()
        for ts in spec:
            if isinstance(ts, dict):
                expanded.append(_expand_sduration(ts))
            elif isinstance(ts, list):
                # list of alternative timestamps: expand each one
                expanded.append([_expand_sduration(alt) for alt in ts])
            # anything else is silently ignored (as in the original code)

        ldurations[name] = expanded

    return ldurations


# ----------------------------------------------------------------------------
#
def _expand_sduration(sduration):
    '''
    Expands a duration expressed in short form to its long form for the
    timestamp types `ru.STATE`, `ru.EVENT` and `ru.MSG`.

    Definitions:

    - Short form duration: one dictionary containing a state or event name.
    - Long form duration: one dictionary containing two keys, one of type
      `ru.EVENT` and one of type `ru.STATE`. The `ru.EVENT` key has a string
      value while the `ru.STATE` key has a `s.STATE_NAME` object as its value.

    Args:

        sduration (dict): a duration in short form

    Return:

        lduration (dict): sduration in long form

    Raises:

        Exception: if the duration is empty, has more than two timestamps,
                   or uses an unknown timestamp type

    Example:

        sduration = {'STATE': s.STATE_NAME}
        lduration = {ru.EVENT: 'state', ru.STATE: s.STATE_NAME}
        sduration = {'EVENT': 'event_name'}
        lduration = {ru.EVENT: 'event_name', ru.STATE: None}
        sduration = {'MSG': 'message_name'}
        lduration = {ru.EVENT: 'cmd', ru.MSG: 'message_name'}
    '''

    tt = list(sduration.keys())

    # allow durations with both ru.EVENT and ru.STATE: pass through unchanged
    if len(tt) == 2:
        return sduration

    if len(tt) > 2:
        raise Exception('invalid duration: too many timestamps (%s)' % tt)

    # FIX: an empty duration used to silently expand to `None`
    if not tt:
        raise Exception('invalid duration: no timestamp given')

    if tt[0] not in ['STATE', 'EVENT', 'MSG']:
        raise Exception('unknown timestamp type: %s' % tt)

    # expand the single known timestamp
    k = tt[0]
    v = sduration[k]

    if k == 'STATE':
        return {ru.EVENT: 'state', ru.STATE: v}
    elif k == 'EVENT':
        return {ru.EVENT: v, ru.STATE: None}
    else:
        return {ru.EVENT: 'cmd', ru.MSG: v}


# Set of default pilot durations for RADICAL-Analytics. All the durations
# are contiguous.
# NOTE: _init durations are most often 0.
PILOT_DURATIONS_DEBUG_SHORT = {
    'p_pmgr_create'           : [{'STATE': s.NEW                   },
                                 {'STATE': s.PMGR_LAUNCHING_PENDING}],
    'p_pmgr_launching_init'   : [{'STATE': s.PMGR_LAUNCHING_PENDING},
                                 {'STATE': s.PMGR_LAUNCHING        }],
    'p_pmgr_launching'        : [{'STATE': s.PMGR_LAUNCHING        },
                                 {'EVENT': 'staging_in_start'      }],
    'p_pmgr_stage_in'         : [{'EVENT': 'staging_in_start'      },
                                 {'EVENT': 'staging_in_stop'       }],
    'p_pmgr_submission_init'  : [{'EVENT': 'staging_in_stop'       },
                                 {'EVENT': 'submission_start'      }],
    'p_pmgr_submission'       : [{'EVENT': 'submission_start'      },
                                 {'EVENT': 'submission_stop'       }],
    'p_pmgr_scheduling_init'  : [{'EVENT': 'submission_stop'       },
                                 {'STATE': s.PMGR_ACTIVE_PENDING   }],
    # batch system queue time
    'p_pmgr_scheduling'       : [{'STATE': s.PMGR_ACTIVE_PENDING   },
                                 {'EVENT': 'bootstrap_0_start'     }],
    'p_agent_ve_setup_init'   : [{'EVENT': 'bootstrap_0_start'     },
                                 {'EVENT': 've_setup_start'        }],
    'p_agent_ve_setup'        : [{'EVENT': 've_setup_start'        },
                                 {'EVENT': 've_setup_stop'         }],
    'p_agent_ve_activate_init': [{'EVENT': 've_setup_stop'         },
                                 {'EVENT': 've_activate_start'     }],
    'p_agent_ve_activate'     : [{'EVENT': 've_activate_start'     },
                                 {'EVENT': 've_activate_stop'      }],
    'p_agent_install_init'    : [{'EVENT': 've_activate_stop'      },
                                 {'EVENT': 'rp_install_start'      }],
    'p_agent_install'         : [{'EVENT': 'rp_install_start'      },
                                 {'EVENT': 'rp_install_stop'       }],
    'p_agent_launching'       : [{'EVENT': 'rp_install_stop'       },
                                 {'STATE': s.PMGR_ACTIVE           }],
    'p_agent_terminate_init'  : [{'STATE': s.PMGR_ACTIVE           },
                                 {'MSG'  : 'cancel_pilot'          }],
    'p_agent_terminate'       : [{'MSG'  : 'cancel_pilot'          },
                                 {'EVENT': 'bootstrap_0_stop'      }],
    # the end timestamp is whichever final state the pilot reached
    'p_agent_finalize'        : [{'EVENT': 'bootstrap_0_stop'      },
                                 [{'STATE': s.DONE                 },
                                  {'STATE': s.CANCELED             },
                                  {'STATE': s.FAILED               }]],
    # total pilot runtime
    'p_agent_runtime'         : [{'EVENT': 'bootstrap_0_start'     },
                                 {'EVENT': 'bootstrap_0_stop'      }]
}

PILOT_DURATIONS_DEBUG = _convert_sdurations(PILOT_DURATIONS_DEBUG_SHORT)


# Debug pilot durations tagged with keys that can be used when calculating
# resource utilization.
# TODO: add the 'client' tag to relevant resource utilization methods.
_pdd = PILOT_DURATIONS_DEBUG
PILOT_DURATIONS_DEBUG_RU = {
    'provide' : {
        'p_agent_runtime'         : _pdd['p_agent_runtime']
    },
    'client'  : {
        'p_pmgr_create'           : _pdd['p_pmgr_create'],
        'p_pmgr_launching_init'   : _pdd['p_pmgr_launching_init'],
        'p_pmgr_launching'        : _pdd['p_pmgr_launching'],
        'p_pmgr_stage_in'         : _pdd['p_pmgr_stage_in'],
        'p_pmgr_submission_init'  : _pdd['p_pmgr_submission_init'],
        'p_pmgr_submission'       : _pdd['p_pmgr_submission'],
        'p_pmgr_scheduling_init'  : _pdd['p_pmgr_scheduling_init'],
        'p_pmgr_scheduling'       : _pdd['p_pmgr_scheduling'],
        'p_agent_finalize'        : _pdd['p_agent_finalize']
    },
    'consume' : {
        'p_agent_ve_setup_init'   : _pdd['p_agent_ve_setup_init'],
        'p_agent_ve_setup'        : _pdd['p_agent_ve_setup'],
        'p_agent_ve_activate_init': _pdd['p_agent_ve_activate_init'],
        'p_agent_ve_activate'     : _pdd['p_agent_ve_activate'],
        'p_agent_install_init'    : _pdd['p_agent_install_init'],
        'p_agent_install'         : _pdd['p_agent_install'],
        'p_agent_launching'       : _pdd['p_agent_launching'],
        'p_agent_terminate_init'  : _pdd['p_agent_terminate_init'],
        'p_agent_terminate'       : _pdd['p_agent_terminate']
    },
    'agent'   : {
        'p_agent_runtime'         : _pdd['p_agent_runtime']
    }
}


# Set of default task durations for RADICAL-Analytics. All the durations
# are contiguous.
TASK_DURATIONS_DEBUG_SHORT = {
    't_tmgr_create'              : [{'STATE': s.NEW                         },
                                    {'STATE': s.TMGR_SCHEDULING_PENDING     }],
    't_tmgr_schedule_queue'      : [{'STATE': s.TMGR_SCHEDULING_PENDING     },
                                    {'STATE': s.TMGR_SCHEDULING             }],
    't_tmgr_schedule'            : [{'STATE': s.TMGR_SCHEDULING             },
                                    {'STATE': s.TMGR_STAGING_INPUT_PENDING  }],
    # push to mongodb
    't_tmgr_stage_in_queue'      : [{'STATE': s.TMGR_STAGING_INPUT_PENDING  },
                                    {'STATE': s.TMGR_STAGING_INPUT          }],
    # wait in mongodb
    't_tmgr_stage_in'            : [{'STATE': s.TMGR_STAGING_INPUT          },
                                    {'STATE': s.AGENT_STAGING_INPUT_PENDING }],
    # pull from mongodb
    't_agent_stage_in_queue'     : [{'STATE': s.AGENT_STAGING_INPUT_PENDING },
                                    {'STATE': s.AGENT_STAGING_INPUT         }],
    't_agent_stage_in'           : [{'STATE': s.AGENT_STAGING_INPUT         },
                                    {'STATE': s.AGENT_SCHEDULING_PENDING    }],
    't_agent_schedule_queue'     : [{'STATE': s.AGENT_SCHEDULING_PENDING    },
                                    {'STATE': s.AGENT_SCHEDULING            }],
    't_agent_schedule'           : [{'STATE': s.AGENT_SCHEDULING            },
                                    {'STATE': s.AGENT_EXECUTING_PENDING     }],
    't_agent_execute_queue'      : [{'STATE': s.AGENT_EXECUTING_PENDING     },
                                    {'STATE': s.AGENT_EXECUTING             }],
    't_agent_execute_prepare'    : [{'STATE': s.AGENT_EXECUTING             },
                                    {'EVENT': 'task_mkdir'                  }],
    't_agent_execute_mkdir'      : [{'EVENT': 'task_mkdir'                  },
                                    {'EVENT': 'task_mkdir_done'             }],
    't_agent_execute_layer_start': [{'EVENT': 'task_mkdir_done'             },
                                    {'EVENT': 'task_run_start'              }],
    # ssh, mpi, ...
    't_agent_execute_layer'      : [{'EVENT': 'task_run_start'              },
                                    {'EVENT': 'task_run_ok'                 }],
    # PROBLEM: discontinuity
    # NOTE(review): `t_agent_lm_start` shares its start event with
    #               `t_agent_execute_layer` above, so the two overlap
    't_agent_lm_start'           : [{'EVENT': 'task_run_start'              },
                                    {'EVENT': 'launch_start'                }],
    't_agent_lm_submit'          : [{'EVENT': 'launch_start'                },
                                    {'EVENT': 'exec_start'                  }],
    't_agent_lm_execute'         : [{'EVENT': 'exec_start'                  },
                                    {'EVENT': 'exec_stop'                   }],
    't_agent_lm_stop'            : [{'EVENT': 'exec_stop'                   },
                                    {'EVENT': 'task_run_stop'               }],
    't_agent_stage_out_start'    : [{'EVENT': 'task_run_stop'               },
                                    {'STATE': s.AGENT_STAGING_OUTPUT_PENDING}],
    't_agent_stage_out_queue'    : [{'STATE': s.AGENT_STAGING_OUTPUT_PENDING},
                                    {'STATE': s.AGENT_STAGING_OUTPUT        }],
    't_agent_stage_out'          : [{'STATE': s.AGENT_STAGING_OUTPUT        },
                                    {'STATE': s.TMGR_STAGING_OUTPUT_PENDING }],
    # push/pull mongodb
    't_agent_push_to_tmgr'       : [{'STATE': s.TMGR_STAGING_OUTPUT_PENDING },
                                    {'STATE': s.TMGR_STAGING_OUTPUT         }],
    # the end timestamp is whichever final state the task reached
    't_tmgr_destroy'             : [{'STATE': s.TMGR_STAGING_OUTPUT         },
                                    [{'STATE': s.DONE                       },
                                     {'STATE': s.CANCELED                   },
                                     {'STATE': s.FAILED                     }]],
    't_agent_unschedule'         : [{'EVENT': 'unschedule_start'            },
                                    {'EVENT': 'unschedule_stop'             }]
}

TASK_DURATIONS_DEBUG = _convert_sdurations(TASK_DURATIONS_DEBUG_SHORT)


# Debug task durations tagged with keys that can be used when calculating
# resource utilization.
# TODO: add the 'client' tag to relevant resource utilization methods.
_udd = TASK_DURATIONS_DEBUG
TASK_DURATIONS_DEBUG_RU = {
    'client' : {
        't_tmgr_create'              : _udd['t_tmgr_create'],
        't_tmgr_schedule_queue'      : _udd['t_tmgr_schedule_queue'],
        't_tmgr_schedule'            : _udd['t_tmgr_schedule'],
        't_tmgr_stage_in_queue'      : _udd['t_tmgr_stage_in_queue'],
        't_tmgr_stage_in'            : _udd['t_tmgr_stage_in'],
        't_tmgr_destroy'             : _udd['t_tmgr_destroy'],
        't_agent_unschedule'         : _udd['t_agent_unschedule']
    },
    'consume'  : {
        't_agent_stage_in_queue'     : _udd['t_agent_stage_in_queue'],
        't_agent_stage_in'           : _udd['t_agent_stage_in'],
        't_agent_schedule_queue'     : _udd['t_agent_schedule_queue'],
        't_agent_schedule'           : _udd['t_agent_schedule'],
        't_agent_execute_queue'      : _udd['t_agent_execute_queue'],
        't_agent_execute_prepare'    : _udd['t_agent_execute_prepare'],
        't_agent_execute_mkdir'      : _udd['t_agent_execute_mkdir'],
        't_agent_execute_layer_start': _udd['t_agent_execute_layer_start'],
        't_agent_execute_layer'      : _udd['t_agent_execute_layer'],
        't_agent_lm_start'           : _udd['t_agent_lm_start'],
        't_agent_lm_submit'          : _udd['t_agent_lm_submit'],
        't_agent_lm_execute'         : _udd['t_agent_lm_execute'],
        't_agent_lm_stop'            : _udd['t_agent_lm_stop'],
        't_agent_stage_out_start'    : _udd['t_agent_stage_out_start'],
        't_agent_stage_out_queue'    : _udd['t_agent_stage_out_queue'],
        't_agent_stage_out'          : _udd['t_agent_stage_out'],
        't_agent_push_to_tmgr'       : _udd['t_agent_push_to_tmgr'],
    }
}


# ------------------------------------------------------------------------------
#
def get_hostmap(profile):
    '''
    We abuse the profile combination to also derive a pilot-host map, which
    will tell us on what exact host each pilot has been running.  To do so,
    we check for the PMGR_ACTIVE advance event in agent_0.prof, and use the
    NTP sync info to associate a hostname.
    '''
    # FIXME: This should be replaced by proper hostname logging
    #        in `pilot.resource_details`.

    hostmap = dict()  # map pilot IDs to host names
    for entry in profile:
        if entry[ru.EVENT] == 'hostname':
            hostmap[entry[ru.UID]] = entry[ru.MSG]

    return hostmap
# ------------------------------------------------------------------------------
#
def get_hostmap_deprecated(profiles):
    '''
    This method mangles combine_profiles and get_hostmap, and is deprecated.
    At this point it only returns the hostmap.
    '''

    hostmap = dict()  # map pilot IDs to host names

    for pname, prof in profiles.items():

        if not len(prof):
            continue

        # the first entry's MSG carries the legacy `host:ip:...` notation
        if not prof[0][ru.MSG]:
            continue

        host, ip, _, _, _ = prof[0][ru.MSG].split(':')
        host_id = '%s:%s' % (host, ip)

        for row in prof:
            if 'agent_0.prof' in pname      and \
                row[ru.EVENT] == 'advance'  and \
                row[ru.STATE] == s.PMGR_ACTIVE:
                hostmap[row[ru.UID]] = host_id
                break

    return hostmap
# ------------------------------------------------------------------------------
#
def get_session_profile(sid, src=None):
    '''
    Read, combine and clean all profiles of a session.

    Args:
        sid (str) : session ID
        src (str) : path to search for profiles; defaults to `$PWD/$sid`.
                    If the path does not exist, profiles are fetched via
                    `fetch_profiles`.

    Returns:
        tuple: `(profile, accuracy, hostmap)` where `profile` is the list of
        combined profile events, `accuracy` the clock sync accuracy reported
        by `ru.combine_profiles`, and `hostmap` maps pilot IDs to host names.
    '''

    if not src:
        src = '%s/%s' % (os.getcwd(), sid)

    if os.path.exists(src):
        # we have profiles locally
        profiles  = glob.glob('%s/*.prof'    % src)
        # NOTE(review): without `recursive=True`, `**` matches like `*`,
        #               i.e., only one directory level deep -- this is the
        #               original behavior, kept as-is
        profiles += glob.glob('%s/**/*.prof' % src)
    else:
        # need to fetch profiles
        from .session import fetch_profiles
        profiles = fetch_profiles(sid=sid, skip_existing=True)

    # filter out some frequent, but uninteresting events
    efilter = {ru.EVENT: [
                        # 'get',
                          'publish',
                          'schedule_skip',
                          'schedule_fail',
                          'staging_stderr_start', 'staging_stderr_stop',
                          'staging_stdout_start', 'staging_stdout_stop',
                          'staging_uprof_start',  'staging_uprof_stop',
                          'update_pushed',
                         ]}

    profiles          = ru.read_profiles(profiles, sid, efilter=efilter)
    profile, accuracy = ru.combine_profiles(profiles)
    profile           = ru.clean_profile(profile, sid, s.FINAL, s.CANCELED)
    hostmap           = get_hostmap(profile)

    # we sometimes miss the `bootstrap_0_stop` event when the bootstrapper is
    # killed before being able to terminate nicely.  In that case we use the
    # last timestamp for that pilot for that event.
    last_ts = dict()  # latest timestamp seen per pilot
    seen_bs = dict()  # pilots for which `bootstrap_0_stop` was found
    for e in profile:
        if 'pilot.' in e[4]:
            pid = e[4]
            last_ts[pid] = max(last_ts.get(pid, 0), e[0])
            if e[1] == 'bootstrap_0_stop':
                seen_bs[pid] = True

    for pid in last_ts:
        if seen_bs.get(pid) is None:
            # inject a synthetic `bootstrap_0_stop` at the last known time
            profile.append([last_ts[pid], 'bootstrap_0_stop', 'bootstrap_0',
                            'MainThread', pid, 'pilot_state', '', 'pilot'])

    if not hostmap:
        # FIXME: legacy host notation - deprecated
        hostmap = get_hostmap_deprecated(profiles)

    return profile, accuracy, hostmap


# ------------------------------------------------------------------------------
#
def get_session_description(sid, src=None):
    '''
    This will return a description which is usable for radical.analytics
    evaluation.  It informs about:

      - set of stateful entities
      - state models of those entities
      - event models of those entities (maybe)
      - configuration of the application / module

    If `src` is given, it is interpreted as path to search for session
    information (json dump).  `src` defaults to `$PWD/$sid`.
    '''

    if not src:
        src = '%s/%s' % (os.getcwd(), sid)

    # construct session json from registry dump, tmgr and pmgr json files,
    # and pilot and task json files
    # NOTE: named `jsdoc` to avoid shadowing the stdlib `json` module
    jsdoc = dict()

    reg = ru.read_json('%s/%s.reg.json' % (src, sid))
    del reg['rcfgs']

    jsdoc['session'] = [reg]
    jsdoc['tmgr']    = list()
    jsdoc['pmgr']    = list()
    jsdoc['pilot']   = list()
    jsdoc['task']    = list()

    for fname in glob.glob('%s/tmgr.*.json' % src):
        jsdoc['tmgr'].append(ru.read_json(fname))

    for fname in glob.glob('%s/pmgr.*.json' % src):
        jsdoc['pmgr'].append(ru.read_json(fname))

    # tasks live in their tmgr dump, pilots in their pmgr dump
    for tmgr in jsdoc['tmgr']:
        jsdoc['task'].extend(tmgr['tasks'].values())
        del tmgr['tasks']

    for pmgr in jsdoc['pmgr']:
        jsdoc['pilot'].extend(pmgr['pilots'])
        del pmgr['pilots']

    jsdoc['session'][0]['uid'] = sid

    ret = dict()
    ret['entities'] = dict()

    tree      = dict()
    tree[sid] = {'uid'      : sid,
                 'etype'    : 'session',
                 'cfg'      : jsdoc['session'][0]['cfg'],
                 'has'      : ['tmgr', 'pmgr'],
                 'children' : list()
                }

    for pmgr in sorted(jsdoc['pmgr'], key=lambda k: k['uid']):
        uid = pmgr['uid']
        tree[sid]['children'].append(uid)
        tree[uid] = {'uid'      : uid,
                     'etype'    : 'pmgr',
                     'cfg'      : pmgr.get('cfg', {}),
                     'has'      : ['pilot'],
                     'children' : list()
                    }

    for tmgr in sorted(jsdoc['tmgr'], key=lambda k: k['uid']):
        uid = tmgr['uid']
        tree[sid]['children'].append(uid)
        tree[uid] = {'uid'      : uid,
                     'etype'    : 'tmgr',
                     'cfg'      : tmgr.get('cfg', {}),
                     'has'      : ['task'],
                     'children' : list()
                    }
        # also inject the pilot description, and resource specifically
        tree[uid]['description'] = dict()

    for pilot in sorted(jsdoc['pilot'], key=lambda k: k['uid']):
        pid     = pilot['uid']
        pmgr    = pilot['pmgr']
        details = pilot['description']
        details = ru.dict_merge(details, pilot['resource_details'])
        pilot['cfg'] = details
        # NOTE(review): cfg, resource_details and rm_info all alias the very
        #               same dict here -- presumably intentional; confirm
        pilot['cfg']['resource_details']            = details
        pilot['cfg']['resource_details']['rm_info'] = details
        tree[pmgr]['children'].append(pid)
        tree[pid] = {'uid'         : pid,
                     'etype'       : 'pilot',
                     'cfg'         : pilot['cfg'],
                     'resources'   : pilot['resources'],
                     'description' : pilot['description'],
                     'has'         : ['task'],
                     'children'    : list()
                    }

    for task in sorted(jsdoc['task'], key=lambda k: k['uid']):

        uid  = task['uid']
        tmgr = task.get('tmgr')
        if tmgr:
            tree[tmgr]['children'].append(uid)

        if 'pilot' not in task:
            # task did not finish before the session got terminated
            continue

        pid = task['pilot']
        tree[pid]['children'].append(uid)

        if 'resources' not in task:
            # derive resource footprint from the task description
            td = task['description']
            task['resources'] = {'cpu': td['ranks'] * td['cores_per_rank'],
                                 'gpu': td['ranks'] * td['gpus_per_rank']}

        # we determine the entity type by the task mode
        etype = task.get('etype')
        mode  = task['description'].get('mode')

        if not etype:
            mode = task['description']['mode']
            if mode in [RAPTOR_MASTER, RAPTOR_WORKER]:
                etype = mode
            elif mode in [TASK_EXECUTABLE]:
                etype = 'task'
            else:
                etype = 'raptor.task'

        if _debug:
            print('%-30s [%-30s] -> %-30s' % (uid, mode, etype))

        tree[uid] = {'uid'         : uid,
                     'etype'       : etype,
                     'cfg'         : task,
                     'resources'   : task['resources'],
                     'description' : task['description'],
                     'has'         : list(),
                     'children'    : list()
                    }

        # remove duplicate
        del tree[uid]['cfg']['description']

    ret['tree'] = tree

    ret['entities']['pilot']   = {'state_model'  : s._pilot_state_values,
                                  'state_values' : s._pilot_state_inv_full,
                                  'event_model'  : dict()}
    ret['entities']['task']    = {'state_model'  : s._task_state_values,
                                  'state_values' : s._task_state_inv_full,
                                  'event_model'  : dict()}
    ret['entities']['session'] = {'state_model'  : None,  # has no states
                                  'state_values' : None,
                                  'event_model'  : dict()}

    ret['config'] = dict()  # session config goes here

    return ret
# ------------------------------------------------------------------------------
#
def get_node_index(node_list, node_uid, pn):
    '''
    Return the first and last resource index `[r0, r1]` covered by the node
    with uid `node_uid`, assuming each node contributes `pn` resources.

    Uses the module level `_node_index` cache, filled on first call.
    '''

    if not _node_index:
        for idx, n in enumerate(node_list):
            _node_index[n['node_id']] = idx

    r0 = _node_index[node_uid] * pn
    r1 = r0 + pn - 1

    return [r0, r1]


# ------------------------------------------------------------------------------
#
def get_duration(thing, dur):
    '''
    Return `(t0, t1)` for duration `dur` of entity `thing`: the first
    timestamp of the duration's start event and the last timestamp of its
    end event.  Return `[None, None]` if either event is missing.
    '''

    # label pure state timestamps as `state` events
    for e in dur:
        if ru.STATE in e and ru.EVENT not in e:
            e[ru.EVENT] = 'state'

    t0 = thing.timestamps(event=dur[0])
    t1 = thing.timestamps(event=dur[1])

    if not len(t0) or not len(t1):
        return [None, None]

    return (t0[0], t1[-1])


# ------------------------------------------------------------------------------
#
def cluster_resources(resources):
    '''
    `resources` is a list of

      - single indexes (a single core or gpu), or
      - `[r0, r1]` pairs (ranges of core / gpu indexes).

    Cluster continuous stretches of resources and return them as a list of
    `[r0, r1]` ranges.
    '''

    ret = list()
    idx = set()

    # flatten input into a set of individual indexes
    for r in resources:
        if isinstance(r, int):
            idx.add(r)
        else:
            for i in range(r[0], r[1] + 1):
                idx.add(i)

    r0 = None
    r1 = None
    for i in sorted(list(idx)):

        if r0 is None:
            r0 = i
            continue

        if r1 is None:
            if i == r0 + 1:
                r1 = i
                continue
            # singleton range ends here
            ret.append([r0, r0])
            # FIX: was `r0 = None`, which dropped the current index `i`
            #      (e.g. input [1, 3, 5] lost the `3`)
            r0 = i
            continue

        if i == r1 + 1:
            r1 = i
            continue

        ret.append([r0, r1])
        r0 = i
        r1 = None

    # flush the last open range
    if r0 is not None:
        if r1 is not None:
            ret.append([r0, r1])
        else:
            ret.append([r0, r0])

    return ret


# ------------------------------------------------------------------------------
#
def _get_pilot_provision(pilot, rtype):
    '''
    Return the resource boxes provided by `pilot` for resource type `rtype`
    ('cpu' or 'gpu'), as `{'total': {pid: [[t0, t1, r0, r1], ...]}}`.
    '''

    pid = pilot.uid
    ret = dict()

    rnd = 'cores_per_node'
    if rtype == 'gpu':
        rnd = 'gpus_per_node'

    pn = pilot.cfg['resource_details']['rm_info'][rnd]
    nodes, _, _ = _get_nodes(pilot)

    for metric in PILOT_DURATIONS['provide']:

        boxes  = list()
        t0, t1 = get_duration(pilot, PILOT_DURATIONS['provide'][metric])

        if t0 is None:
            # no duration events found - fall back to the full event range
            t0 = pilot.events[ 0][ru.TIME]
            t1 = pilot.events[-1][ru.TIME]

        for node in nodes:
            r0, r1 = get_node_index(nodes, node['node_id'], pn)
            boxes.append([t0, t1, r0, r1])

        # NOTE(review): keyed as literal 'total' (the only metric in
        #               'provide') -- would clobber boxes if further metrics
        #               were added; confirm upstream
        ret['total'] = {pid: boxes}

    return ret


# ------------------------------------------------------------------------------
#
def get_provided_resources(session, rtype='cpu'):
    '''
    For all ra.pilots, return the amount and time of the type of resources
    provided.  This computes sets of 4-tuples of the form:
    `[t0, t1, r0, r1]` where:

        t0: time, begin of resource provision
        t1: time, end   of resource provision
        r0: int,  index of resources provided (min)
        r1: int,  index of resources provided (max)

    The tuples are formed so that t0 to t1 and r0 to r1 are continuous.

    Raises:
        Exception: if `rtype` is neither 'cpu' nor 'gpu'
    '''

    if rtype not in ['cpu', 'gpu']:
        raise Exception('unknown resource type: %s' % rtype)

    provided = dict()

    for p in session.get(etype='pilot'):

        data = _get_pilot_provision(p, rtype)

        # merge per-pilot data into the overall structure
        for metric in data:
            if metric not in provided:
                provided[metric] = dict()
            for uid in data[metric]:
                provided[metric][uid] = data[metric][uid]

    return provided
# ------------------------------------------------------------------------------
#
def get_consumed_resources(session, rtype='cpu', tdurations=None):
    '''
    For all ra.pilot or ra.task entities, return the amount and time of
    resources consumed.  A consumed resource is characterized by:

      - a resource type (we know about cores and gpus)
      - a metric name   (what the resource was used for)
      - a list of 4-tuples of the form: [t0, t1, r0, r1]

    The tuples are formed so that t0 to t1 and r0 to r1 are continuous:

      - t0: time, begin of resource consumption
      - t1: time, end   of resource consumption
      - r0: int,  index of resources consumed (min)
      - r1: int,  index of resources consumed (max)

    An entity can consume different resources under different metrics, but the
    returned consumption specs will never overlap.  Thus, any resource is
    accounted for exactly one metric at any point in time.  The returned
    structure has the following overall form::

        {
          'metric_1': {
              uid_1: [[t0, t1, r0, r1],
                      [t2, t3, r2, r3],
                      ...],
              uid_2: ...
          },
          'metric_2': ...
        }

    Arguments:
        session   : ra.Session instance to inspect
        rtype     : resource type to account for ('cpu' or 'gpu')
        tdurations: optional task duration spec (defaults to
                    `TASK_DURATIONS_DEFAULT` in the task consumption helper)
    '''

    log      = ru.Logger('radical.pilot.utils')
    consumed = dict()

    for e in session.get(etype=['pilot', 'task']):

        if   e.etype == 'pilot':
            data = _get_pilot_consumption(e, rtype)
        elif e.etype == 'task':
            data = _get_task_consumption(session, e, rtype, tdurations)
        else:
            # defensive: the etype filter above should make this unreachable
            continue

        for metric in data:
            if metric not in consumed:
                consumed[metric] = dict()
            for uid in data[metric]:
                consumed[metric][uid] = data[metric][uid]

    # We define two additional metrics, 'warmup' and 'drain', which are defined
    # for all resources of the pilot.  'warmup' is defined as the time from
    # when the pilot becomes active to the time the resource is first consumed
    # by a task.  'drain' is the inverse: the time from when any task last
    # consumed the resource to the time when the pilot begins termination.
    for pilot in session.get(etype='pilot'):

        pt = pilot.timestamps
        log.debug('timestamps:')
        for ts in pt():
            log.debug(' %10.2f %-20s %-15s %-15s %-15s %-15s %s',
                      ts[0], ts[1], ts[2], ts[3], ts[4], ts[5], ts[6])

        p_min = None
        p_max = None
        try:
            p_min = pilot.timestamps(event={1: 'bootstrap_0_start'})[0]
        except Exception:
            pass
        try:
            p_max = pilot.timestamps(event={1: 'bootstrap_0_stop'})[0]
        except Exception:
            pass

        # fallback for missing bootstrap events
        # NOTE(review): the `state=...` fallback returns whatever
        # `pilot.timestamps` yields (presumably a list) while the primary path
        # indexes `[0]` — confirm the expected type against ra's Entity API.
        if p_min is None:
            p_min = pilot.timestamps(state='PMGR_ACTIVE')
        if p_max is None:
            p_max = pilot.events[-1][ru.TIME]

        assert p_min is not None
        assert p_max is not None
        log.debug('pmin, pmax: %10.2f / %10.2f', p_min, p_max)

        pid  = pilot.uid
        rnd  = 'cores_per_node'
        rmap = 'core_map'
        if rtype == 'gpu':
            rnd  = 'gpus_per_node'
            rmap = 'gpu_map'
        pn = pilot.cfg['resource_details']['rm_info'][rnd]

        nodes, _, pnodes = _get_nodes(pilot)

        # find resource utilization scope for all resources.  We begin filling
        # the resource dict with
        #
        #   resource_id: [t_min=None, t_max=None]
        #
        # and then iterate over all tasks.  When we find a task using some
        # resource id, we set or adjust t_min / t_max.
        resources = dict()
        for pnode in pnodes:
            idx = get_node_index(nodes, pnode['node_id'], pn)
            for c in range(idx[0], idx[1] + 1):
                resources[c] = [None, None]

        for task in session.get(etype='task'):

            if task.cfg.get('pilot') != pid:
                continue

            try:
                ranks    = task.cfg['slots']['ranks']
                tts      = task.timestamps
                # NOTE(review): `task['consume']` indexes the task entity with
                # a string key — verify this is the intended lookup (vs. a
                # duration spec dict); failures are silently skipped below.
                task_min = tts(event=task['consume']['exec_queue'][0])[0]
                task_max = tts(event=task['consume']['unschedule'][1])[-1]
            except Exception:
                # task has no slots or usable timestamps - ignore it
                continue

            for rank in ranks:
                r0, _ = get_node_index(nodes, rank['node_id'], pn)
                for resource_map in rank[rmap]:
                    for resource in resource_map:
                        idx   = r0 + resource
                        t_min = resources[idx][0]
                        t_max = resources[idx][1]
                        if t_min is None or t_min > task_min:
                            t_min = task_min
                        if t_max is None or t_max < task_max:
                            t_max = task_max
                        resources[idx] = [t_min, t_max]

        # now sift through resources and find buckets of pairs with same t_min
        # or same t_max
        bucket_min  = dict()
        bucket_max  = dict()
        bucket_none = list()
        for idx in resources:
            t_min = resources[idx][0]
            t_max = resources[idx][1]
            if t_min is None:
                # resource was never used by any task
                assert t_max is None
                bucket_none.append(idx)
            else:
                if t_min not in bucket_min:
                    bucket_min[t_min] = list()
                bucket_min[t_min].append(idx)
                if t_max not in bucket_max:
                    bucket_max[t_max] = list()
                bucket_max[t_max].append(idx)

        # now cluster all lists and add the respective boxes
        boxes_warm  = list()
        boxes_drain = list()
        boxes_idle  = list()
        for t_min in bucket_min:
            for r in cluster_resources(bucket_min[t_min]):
                boxes_warm.append([p_min, t_min, r[0], r[1]])
        for t_max in bucket_max:
            for r in cluster_resources(bucket_max[t_max]):
                boxes_drain.append([t_max, p_max, r[0], r[1]])
        for r in cluster_resources(bucket_none):
            boxes_idle.append([p_min, p_max, r[0], r[1]])

        if 'warm'  not in consumed: consumed['warm']  = dict()
        if 'drain' not in consumed: consumed['drain'] = dict()
        if 'idle'  not in consumed: consumed['idle']  = dict()

        consumed['warm'][pid]  = boxes_warm
        consumed['drain'][pid] = boxes_drain
        consumed['idle'][pid]  = boxes_idle

    return consumed
# ------------------------------------------------------------------------------
#
def _get_nodes(pilot):
    '''
    Return `(nodes, anodes, pnodes)` for the given pilot: the combined node
    list (compute nodes plus agent nodes), the agent nodes, and the compute
    nodes, as read from the pilot's `rm_info`.
    '''

    rm_info = pilot.cfg['resource_details']['rm_info']
    pnodes  = rm_info['node_list']
    agents  = rm_info.get('agent_nodes', [])

    anodes = [agents[agent] for agent in agents]
    nodes  = pnodes + anodes

    return nodes, anodes, pnodes


# ------------------------------------------------------------------------------
#
def _get_pilot_consumption(pilot, rtype):
    '''
    Return the pilot's own resource consumption as
    `{metric: {pilot_uid: [[t0, t1, r0, r1], ...]}}`.

    Pilots consume resources in different ways:

      - the pilot needs to bootstrap and initialize before becoming active,
        i.e., before it can begin to manage the workload, and needs to
        terminate and clean up during shutdown;
      - the pilot may block one or more nodes or cores for its own components
        (sub-agents), and those components are not available for workload
        execution;
      - the pilot may perform operations while managing the workload.

    This method computes the first two contributions and part of the third.
    It does *not* account for those parts of the third which are performed
    while specific resources are blocked for the affected workload element
    (task) - that resource consumption is considered a consumption *of that
    task*, which allows us to compute task specific resource utilization
    overheads.
    '''

    pid = pilot.uid
    ret = dict()

    key = 'gpus_per_node' if rtype == 'gpu' else 'cores_per_node'
    pn  = pilot.cfg['resource_details']['rm_info'][key]

    # Account for agent resources.  Agents use full nodes, i.e., cores and
    # GPUs.  The duration covers all of the pilot runtime: not precise really,
    # since several bootstrapping phases happen before the agents exist, but we
    # consider the nodes blocked for the sub-agents from the get-go.
    t0, t1 = get_duration(pilot, PILOT_DURATIONS['agent']['total'])

    # Subtract agent nodes from the node list, so that we correctly attribute
    # other global pilot metrics to the remaining nodes.
    nodes, anodes, pnodes = _get_nodes(pilot)

    boxes = list()
    if anodes and t0 is not None:
        for anode in anodes:
            r0, r1 = get_node_index(nodes, anode['node_id'], pn)
            boxes.append([t0, t1, r0, r1])
    ret['agent'] = {pid: boxes}

    # account for all other pilot metrics on the compute nodes
    for metric in PILOT_DURATIONS['consume']:

        if metric == 'idle':
            continue

        boxes  = list()
        t0, t1 = get_duration(pilot, PILOT_DURATIONS['consume'][metric])
        if t0 is not None:
            for node in pnodes:
                r0, r1 = get_node_index(nodes, node['node_id'], pn)
                boxes.append([t0, t1, r0, r1])

        ret[metric] = {pid: boxes}

    return ret


# ------------------------------------------------------------------------------
#
def _get_task_consumption(session, task, rtype, tdurations=None):
    '''
    Return the task's resource consumption as
    `{metric: {task_uid: [[t0, t1, r0, r1], ...]}}`, one box list per metric
    in the applicable task duration spec.
    '''

    uid = task.uid
    pid = task.cfg.get('pilot')

    # without a designated pilot, no resources were consumed
    if not pid:
        return dict()

    # get the pilot for inspection
    pilot = session.get(uid=pid)
    if isinstance(pilot, list):
        assert len(pilot) == 1
        pilot = pilot[0]

    # FIXME: it is inefficient to query those values again and again
    nodes, _, _ = _get_nodes(pilot)

    if rtype == 'gpu':
        rnd, rmap = 'gpus_per_node', 'gpu_map'
    else:
        rnd, rmap = 'cores_per_node', 'core_map'
    pn = pilot.cfg['resource_details']['rm_info'][rnd]

    # tasks consume only those resources they are scheduled on
    if 'slots' not in task.cfg:
        return dict()

    indexes = list()
    for rank in task.cfg['slots']['ranks']:
        base, _ = get_node_index(nodes, rank['node_id'], pn)
        for entry in rank[rmap]:
            for resource in entry:
                indexes.append(base + resource)

    # find continuous stretches of resources to minimize number of boxes
    stretches = cluster_resources(indexes)

    task_durations = tdurations if tdurations else TASK_DURATIONS_DEFAULT

    if _debug:
        print()

    ret = dict()
    for metric in task_durations['consume']:

        boxes  = list()
        t0, t1 = get_duration(task, task_durations['consume'][metric])

        if t0 is not None:
            if _debug:
                print('%s: %-15s : %10.3f - %10.3f = %10.3f'
                     % (task.uid, metric, t1, t0, t1 - t0))
            for stretch in stretches:
                boxes.append([t0, t1, stretch[0], stretch[1]])

        else:
            if _debug:
                # dump duration spec and events to help debug missing metrics
                print('%s: %-15s : -------------- ' % (task.uid, metric))
                dur = task_durations['consume'][metric]
                print(dur)
                for ev in dur:
                    if ru.STATE in ev and ru.EVENT not in ev:
                        ev[ru.EVENT] = 'state'
                print(task.timestamps(event=dur[0]))
                print(task.timestamps(event=dur[1]))
                for ev in task.events:
                    print('\t'.join([str(x) for x in ev]))

        ret[metric] = {uid: boxes}

    return ret


# ------------------------------------------------------------------------------
#
def get_resource_transitions(pilot, task_metrics=None, pilot_metrics=None):
    '''
    Find points in time where resource usage moved from purpose A to purpose B.

    For example, consider this metric::

        'exec_queue' : [{ru.EVENT: 'schedule_ok'     },
                        {ru.STATE: s.AGENT_EXECUTING}],
        'exec_prep'  : [{ru.STATE: s.AGENT_EXECUTING},
                        {ru.EVENT: 'exec_start'     }],

    then we convert this into a list of transitions::

        #  event                           from          to
        [[{ru.EVENT: 'schedule_ok'     },  None,         'exec_queue'],
         [{ru.STATE: s.AGENT_EXECUTING},  'exec_queue', 'exec_prep' ],
         [{ru.EVENT: 'exec_start'     },  'exec_prep',  None        ]]

    which is used like this:

      - go through all events for a task
      - if event is in the list above
        - reduce resources in `from` metric
        - increase resources in `to` metric

    If `from` or `to` remain `None` after matching start/stop events, the
    resources are taken from / given to the pilot's idle resources, so that
    `None` is renamed to 'idle' (for tasks).  For pilot metrics, `None` maps
    to 'system' instead - with the exception of the 'agent' metric, whose
    resources come from the pilot ('setup_1') and go back to it ('term').
    '''

    task_metrics  = task_metrics  or TASK_DURATIONS_DEFAULT
    pilot_metrics = pilot_metrics or PILOT_DURATIONS

    # --------------------------------------------------------------------------
    def _find(transitions, event):
        # return the transition already registered for `event`, or None
        for t in transitions:
            if t[0] == event:
                return t
        return None

    # --------------------------------------------------------------------------
    def _register(transitions, spec, metric):
        # `metric` begins at spec[0] (resources flow *to* it) and ends at
        # spec[1] (resources flow *from* it).  Merge with transitions already
        # registered for the same events, in case they are not causally
        # ordered in the metric dict.
        t = _find(transitions, spec[0])
        if t is None:
            transitions.append([spec[0], None, metric])
        else:
            assert t[2] is None
            t[2] = metric

        t = _find(transitions, spec[1])
        if t is None:
            transitions.append([spec[1], metric, None])
        else:
            assert t[1] is None
            t[1] = metric
    # --------------------------------------------------------------------------

    # dig through the task metrics, find all pairs of matching start/stop
    # events
    task_transitions = list()
    for metric, spec in task_metrics['consume'].items():
        _register(task_transitions, spec, metric)

    # task transitions which, after the above search, miss start or stop events
    # will add / remove resources from the pilot's idle pool
    for t in task_transitions:
        if t[1] is None: t[1] = 'idle'
        if t[2] is None: t[2] = 'idle'

    # same for the pilot metrics / transitions.  Note that we also handle the
    # 'agent' metric to cover resources used by the agent (and thus not making
    # it into `usable`).
    pilot_transitions = list()
    for metric, spec in pilot_metrics['consume'].items():
        _register(pilot_transitions, spec, metric)

    # agent resources are taken from the pilot;
    # pilot resources are taken from the system
    for t in pilot_transitions:
        if t[1] is None:
            t[1] = 'setup_1' if t[2] == 'agent' else 'system'
        if t[2] is None:
            t[2] = 'term'    if t[1] == 'agent' else 'system'

    return pilot_transitions, task_transitions


# ------------------------------------------------------------------------------
#
def get_resource_timelines(task, transitions):
    '''
    For each specific task, return a list of tuples of the form:

        [start, stop, metric, uid]

    which reports what metric has been used during what time span.

    Returns an empty list if the task was never assigned to a pilot or was
    never scheduled - no resources were consumed in either case.
    '''

    tid = task.uid

    # we need to know what pilot the task ran on.  If we don't find a
    # designated pilot, no resources were consumed.  Use `.get` so that a task
    # config missing the key altogether is handled the same way (consistent
    # with `_get_task_consumption`).
    pid = task.cfg.get('pilot')
    if not pid:
        # task was never assigned to a pilot
        return list()

    if 'slots' not in task.cfg:
        # the task was never scheduled
        return list()

    ret = list()
    for metric, spec in transitions['consume'].items():
        t0, t1 = get_duration(task, spec)
        ret.append([t0, t1, metric, tid])

    return ret
# ------------------------------------------------------------------------------
#
def get_session_json(sid, cachedir=None):
    '''
    Load and return the session description json for session `sid`.

    The session may have been cached in `/tmp/rp_cache_<uid>/<sid>.json`; the
    optional `cachedir` parameter changes that default location.  On a cache
    miss the json is read from the session directory, enriched with a
    `task_ids` list per pilot doc, and written back to the cache (best
    effort - cache failures are ignored).

    Raises:
        AssertionError: if no session json file is found for `sid`
    '''

    if not cachedir:
        cachedir = _CACHE_BASEDIR

    cache = os.path.join(cachedir, '%s.json' % sid)

    try:
        if os.path.isfile(cache):
            return ru.read_json(cache)
    except Exception as e:
        # continue w/o cache
        sys.stderr.write('cannot read session cache from %s: %s\n'
                        % (cache, e))

    jsons = glob.glob('%s/r[ep].session.*.json' % os.path.abspath(sid))
    assert jsons, 'session missed, check <sid>/<sid>.json'

    session_json = jsons[0]
    with ru.ru_open(session_json, 'r') as f:
        json_data = json.load(f)

    # we want to add a list of handled tasks to each pilot doc
    for pilot in json_data['pilot']:
        pilot['task_ids'] = list()
        for task in json_data['task']:
            if task['pilot'] == pilot['uid']:
                pilot['task_ids'].append(task['uid'])

    try:
        # `os.makedirs` instead of shelling out to `mkdir -p`
        os.makedirs(cachedir, exist_ok=True)
        ru.write_json(json_data, cache)
    except Exception:
        # we can live without cache, no problem...
        pass

    return json_data


# ------------------------------------------------------------------------------