Source code for radical.pilot.utils.misc


# pylint: disable=global-statement

import os
import time

from typing import List, Union

import radical.utils as ru

# max number of t out/err chars to push to tail
MAX_IO_LOGLENGTH = 1024

# cache resource configs
_rcfgs = None


# ------------------------------------------------------------------------------
#
def tail(txt: str, maxlen: int = MAX_IO_LOGLENGTH) -> str:

    # shorten the given string to the last <n> characters, and prepend
    # a notification.  This is used to keep logging information in mongodb
    # manageable(the size of mongodb documents is limited).

    if not txt:
        return ''

    if len(txt) > maxlen:
        return "[... CONTENT SHORTENED ...]\n%s" % txt[-maxlen:]
    else:
        return txt


# ------------------------------------------------------------------------------
#
def get_rusage() -> str:

    import resource

    self_usage  = resource.getrusage(resource.RUSAGE_SELF)
    child_usage = resource.getrusage(resource.RUSAGE_CHILDREN)

    rtime = time.time()
    utime = self_usage.ru_utime  + child_usage.ru_utime
    stime = self_usage.ru_stime  + child_usage.ru_stime
    rss   = self_usage.ru_maxrss + child_usage.ru_maxrss

    return "real %3f sec | user %.3f sec | system %.3f sec | mem %.2f kB" \
         % (rtime, utime, stime, rss)


# ------------------------------------------------------------------------------
#
[docs]def create_tar(tgt: str, dnames: List[str]) -> None: ''' Create a tarball on the file system which contains all given directories ''' uid = os.getuid() gid = os.getgid() mode = 16893 mtime = int(time.time()) fout = open(tgt, 'wb') def rpad(s, size): return s + (size - len(s)) * '\0' def write_dir(path): data = rpad(path, 100) \ + rpad('%o' % mode, 8) \ + rpad('%o' % uid, 8) \ + rpad('%o' % gid, 8) \ + rpad('%o' % 0, 12) \ + rpad('%o' % mtime, 12) \ + 8 * '\0' + '5' cksum = 256 + sum(ord(h) for h in data) data = rpad(data , 512) data = data [:-364] + '%06o\0' % cksum + data[-357:] fout.write(ru.as_bytes(data)) for dname in dnames: write_dir(dname) fout.close()
# ------------------------------------------------------------------------------ #
[docs]def get_resource_configs() -> ru.Config: ''' Return all resource configurations used by `radical.pilot`. Configurations for the individual resources are organized as sites and resources: - cfgs = get_resource_configs() - sites = cfgs.keys() - for site in sites: resource_names = cfgs[site].keys() Returns: :obj:`radical.utils.Config`: the resource configurations ''' global _rcfgs if not _rcfgs: _rcfgs = ru.Config('radical.pilot.resource', name='*', expand=False) # return a deep copy return ru.Config(from_dict=_rcfgs)
# ------------------------------------------------------------------------------ #
[docs]def get_resource_config(resource: str) -> Union[None, ru.Config]: ''' For the given resource label, return the resource configuration used by `radical.pilot`. Args: resource (:obj:`str`): resource label for which to return the cfg Returns: :obj:`radical.utils.Config`: the resource configuration The method returns `None` if no resource config is found for the specified resource label. ''' # populate cache if not _rcfgs: get_resource_configs() site, host = resource.split('.', 1) if site not in _rcfgs: return None if host not in _rcfgs[site]: return None # return a deep copy return ru.Config(cfg=_rcfgs[site][host])
# ------------------------------------------------------------------------------ #
[docs]def get_resource_fs_url(resource: str, schema : str = None) -> Union[None, ru.Url]: ''' For the given resource label, return the contact URL of the resource's file system. This corresponds to the `filesystem_endpoint` setting in the resource config. For example, `rs.filesystem.directory(get_resource_fs_url(...)).change_dir('/')` is equivalent to the base ``endpoint:///`` URL available for use in a `staging_directive`. Args: resource (:obj:`str`): resource label for which to return the url schema (:obj:`str`, optional): access schema to use for resource access. Defaults to the default access schema as defined in the resources config files. Returns: :obj:`radical.utils.Url`: the file system URL The method returns `None` if no resource config is found for the specified resource label and access schema. ''' rcfg = get_resource_config(resource) if not schema: schema = rcfg['default_schema'] # return a deep copy return ru.Url(rcfg['schemas'][schema]['filesystem_endpoint'])
# ------------------------------------------------------------------------------ #
[docs]def get_resource_job_url(resource: str, schema : str = None) -> Union[None, ru.Url]: ''' For the given resource label, return the contact URL of the resource's job manager. Args: resource (:obj:`str`): resource label for which to return the url schema (:obj:`str`, optional): access schema to use for resource access. Defaults to the default access schema as defined in the resources config files. Returns: :obj:`radical.utils.Url`: the job manager URL The method returns `None` if no resource config is found for the specified resource label and access schema. ''' rcfg = get_resource_config(resource) if not schema: schema = rcfg['default_schema'] # return a deep copy return ru.Url(rcfg.schemas[schema]['job_manager_endpoint'])
# ------------------------------------------------------------------------------