Source code for radical.pilot.utils.session


import os
import glob
import tarfile

import radical.utils as ru

from .. import constants as rpc

from .staging_helper import StagingHelper


# ------------------------------------------------------------------------------
#
[docs]def fetch_filetype(ext, name, sid, src=None, tgt=None, access=None, skip_existing=False, fetch_client=False, log=None, rep=None): ''' Args: - ext (str): file extension to fetch - name (str): full name of filetype for log messages etc - sid (str): session for which all files are fetched - src (str): dir to look for client session files (`$src/$sid/*.ext`) - tgt (str): dir to store the files in (`$tgt/$sid/*.ext`, `$tgt/$sid/$pid/*.ext`) Returns: list[str]: list of file names (fetched and/or cached) ''' if not log: log = ru.Logger('radical.pilot.utils') files = list() if not src: src = os.getcwd() if not tgt: tgt = os.getcwd() # no point in fetching client files into client sandbox if src == tgt: fetch_client = False if not tgt.startswith('/') and '://' not in tgt: tgt = "%s/%s" % (os.getcwd(), tgt) # we always create a session dir as real target tgt_url = ru.Url("%s/%s" % (tgt, sid)) # turn URLs without `schema://host` into `file://localhost`, # so that they dont become interpreted as relative paths. if not tgt_url.schema: tgt_url.schema = 'file' if not tgt_url.host : tgt_url.host = 'localhost' # create target dir for session ru.rec_makedir(tgt_url.path) # first fetch client files if fetch_client: client_files = glob.glob("%s/%s/**.%s" % (src, sid, ext)) if not client_files: raise RuntimeError('no client %s in %s/%s' % (name, src, sid)) for client_file in client_files: ftgt = ru.Url('%s/%s' % (tgt_url, os.path.basename(client_file))) files.append("%s" % ftgt.path) if skip_existing and os.path.isfile(ftgt.path) \ and os.path.getsize(ftgt.path): pass else: log.debug('fetch client file %s', client_file) stager = StagingHelper(log) stager.copy(client_file, ftgt, flags=rpc.CREATE_PARENTS) # we need the session json for pilot details pilots = list() for fname in glob.glob('%s/pmgr.*.json' % sid): json_doc = ru.read_json(fname) pilots.extend(json_doc['pilots']) num_pilots = len(pilots) log.debug("Session: %s", sid) log.debug("Number of pilots in session: %d", num_pilots) for pilot in pilots: pid = pilot['uid'] tar_name = '%s.%s.tgz' % (pid, ext) # create target dir for this pilot ru.rec_makedir('%s/%s' % (tgt_url.path, pid)) try: log.debug("processing pilot '%s'", pid) if access: # Allow to use a different access schema than used for the the # run. Useful if you ran from the headnode, but would like to # retrieve the files to your desktop (Hello Titan). access_url = ru.Url(access) sandbox_url.schema = access_url.schema sandbox_url.host = access_url.host else: sandbox_url = ru.Url(pilot['pilot_sandbox']) sandbox_url.path.rstrip('/') src_url = ru.Url('%s/%s' % (sandbox_url, tar_name)) src_dir = os.path.dirname(src_url.path) is_local = False log.debug("sandbox: %s", sandbox_url) log.debug("src_url: %s", src_url) # check if we have a local tarball already if skip_existing and \ os.path.isfile(tgt_url.path) and \ os.path.getsize(tgt_url.path): is_local = True stager = StagingHelper(log) if not is_local: # need to fetch tarball # - if no remote tarball exists, create it # - fetch remote tarball _, _, ret = stager.sh_callout(src_url, 'test -f %s' % src_url.path) if ret: # no tarball on remote side, create one # find_cmd = "find . -name '*.%s'" % ext tar_cmd = "cd %s && tar czf %s $(%s)" \ % (src_dir, tar_name, find_cmd) out, err, ret = stager.sh_callout(src_url, tar_cmd) log.debug("create with '%s': %s/%s", tar_cmd, out, err) if ret: raise RuntimeError("failed to create tarball: %s" % err) log.info("fetch '%s' to '%s'.", src_url, tgt_url) stager.copy(src_url, tgt_url, flags=rpc.CREATE_PARENTS) # we now have a local tarball - unpack it # note that we do not check if it was unpacked before - it's simpler # (and possibly cheaper) to just do that again log.info('Extract tarball %s', tgt_url.path) tarball = tarfile.open('%s/%s' % (tgt_url.path, tar_name), mode='r:gz') tarball.extractall("%s/%s" % (tgt_url.path, pid)) found = glob.glob("%s/%s/**.%s" % (tgt_url.path, pid, ext)) files.extend(found) if rep: rep.ok("+ %s (%s)\n" % (pid, name)) except Exception: # do not raise, we still try the other pilots log.exception('failed to fetch %s for %s', pid, name) if rep: rep.error("- %s (%s)\n" % (pid, name)) return files
# ------------------------------------------------------------------------------ # def fetch_profiles (sid, src=None, tgt=None, access=None, skip_existing=False, fetch_client=False, log=None, rep=None): return fetch_filetype('prof', 'profiles', sid, src, tgt, access, skip_existing, fetch_client, log, rep) # ------------------------------------------------------------------------------ # def fetch_logfiles (sid, src=None, tgt=None, access=None, skip_existing=False, fetch_client=False, log=None, rep=None): return fetch_filetype('log', 'logfiles', sid, src, tgt, access, skip_existing, fetch_client, log, rep) # ------------------------------------------------------------------------------