__copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu"
__license__ = "MIT"
import os
import copy
import time
from typing import Optional
import threading as mt
import radical.utils as ru
from . import constants as rpc
from . import utils as rpu
from .messages import HeartbeatMessage
from .proxy import Proxy
from .resource_config import ResourceConfig, ENDPOINTS_DEFAULT
# ------------------------------------------------------------------------------
#
class _CloseOptions(ru.TypedDict):
    """Typed and validated keyword options accepted by `Session.close()`.

    Keys:
        download (bool): fetch pilot profiles and log files once the session
            is closed (default: False).
        terminate (bool): shut down all pilots associated with the session
            (default: True).
        cleanup (bool): legacy option, scheduled for removal
            (default: True).
    """

    _check = True

    # FIXME: `cleanup` is to be removed
    _schema   = dict(download=bool,  terminate=bool, cleanup=bool)
    _defaults = dict(download=False, terminate=True, cleanup=True)
# ------------------------------------------------------------------------------
#
class Session(object):
    """Root of RP object hierarchy for an application instance.

    A Session is the root object of all RP objects in an application instance:
    it holds :class:`radical.pilot.PilotManager` and
    :class:`radical.pilot.TaskManager` instances which in turn hold
    :class:`radical.pilot.Pilot` and :class:`radical.pilot.Task`
    instances, and several other components which operate on those stateful
    entities.
    """

    # In that role, the session creates a special pubsub channel
    # (`heartbeat_pubsub`, topic `heartbeat`) which is used by all components
    # in its hierarchy to exchange heartbeat messages.  Those messages are used
    # to watch component health - if a (parent or child) component fails to
    # send heartbeats for a certain amount of time, it is considered dead and
    # the process tree will terminate.  That heartbeat management is
    # implemented in the `ru.Heartbeat` class.
    #
    # Only primary and agent_0 sessions instantiate a heartbeat channel (i.e.,
    # the root sessions of RP client or agent modules).
    # NOTE(review): components are said to call the session's `heartbeat()`
    #   method at regular intervals - no such method is visible in this part
    #   of the file; confirm.

    # the reporter is meant to be an application-level singleton.
    # NOTE(review): `_get_reporter` assigns `self._reporter`, which creates an
    #   instance attribute shadowing this class attribute - effectively one
    #   reporter per session instance; confirm which is intended.
    _reporter = None

    # a session has one of four possible roles:
    #   - primary: the session is the first explicit session instance created
    #              in an RP application.
    #   - agent_0: the session is the first session instance created in an RP
    #              agent.
    #   - agent_n: sessions of further agent processes, which fetch their
    #              config from the agent_0 registry.
    #   - default: any other session instance, for example such as created by
    #              components in the client or agent module.
    _PRIMARY = 'primary'
    _AGENT_0 = 'agent_0'
    _AGENT_N = 'agent_n'
    _DEFAULT = 'default'
# --------------------------------------------------------------------------
#
def __init__(self, proxy_url: Optional[str ] = None,
                   uid      : Optional[str ] = None,
                   cfg      : Optional[dict] = None,
                   _role    : Optional[str ] = _PRIMARY,
                   _reg_addr: Optional[str ] = None,
                   **close_options):
    """Create a new session.

    Primary sessions create a registry service and publish their
    configuration into it; all other sessions connect to an existing
    registry and pull their configuration from there.

    Any RP Session will require an RP Proxy to facilitate communication
    between the client machine (i.e., the host where the application created
    this Session instance) and the target resource (i.e., the host where the
    pilot agent/s is/are running and where the workload is being executed).

    A `proxy_url` can be specified which then must point to an RP Proxy
    Service instance which this session can use to establish a communication
    proxy. If `proxy_url` is not specified, the session will check for the
    environment variable `RADICAL_PILOT_PROXY_URL` and will interpret it as
    such above. If that information is not available, the session will
    instantiate a proxy service on the local host. Note that any proxy
    service instantiated by the session itself will be terminated once the
    session instance is closed or goes out of scope and is thus garbage
    collected and as such should not be used by other session instances.

    Note: an RP proxy will have to be accessible by both the client and the
          target hosts to facilitate communication between both parties.
          That implies access to the respective ports.  Proxies started by
          the session itself will use the first port larger than 10.000
          which is found to be free.

    Arguments:
        proxy_url (str, optional): proxy service URL - points to an RP
            proxy service which is used to establish an RP communication
            proxy for this session.
        uid (str, optional): Create a session with this UID.  Session UIDs
            MUST be unique - otherwise they will lead to communication
            conflicts, resulting in undefined behaviours.  Required for all
            non-primary sessions.
        cfg (str | dict, optional): a named or instantiated configuration
            to be used for the session.  Config names are only resolved for
            `PRIMARY` sessions.
        _role (str): only `PRIMARY` sessions created by the original
            application process (via `rp.Session()`), will create proxies
            and Registry Services.  `AGENT` sessions will also create
            a Registry but no proxies.  All other `DEFAULT` session
            instances are instantiated internally in processes spawned
            (directly or indirectly) by the initial session, for example in
            some of its components, or by the RP agent.  Those sessions
            will inherit the original session ID, but will not attempt to
            create new proxies or registries.
        _reg_addr (str, optional): Non-primary sessions will connect to the
            registry at that endpoint and pull session config and resource
            configurations from there.
        **close_options (optional): If additional key word arguments are
            provided, they will be used as the default arguments to
            Session.close().  This can be useful when the Session is used as
            a Python context manager, such that close() is called
            automatically at the end of a ``with`` block.

    Raises:
        ValueError: if a non-primary session is created without `uid`, or
            if no registry address is available.
    """

    self._t_start   = time.time()

    self._role      = _role
    self._uid       = uid

    # A named config (`str`) is resolved by `_init_cfg_from_scratch`, which
    # only runs for primary sessions and checks `isinstance(self._cfg, str)`.
    # Wrapping the name into a `ru.Config` here would make that branch
    # unreachable, so the name would never be resolved.
    if isinstance(cfg, str) and _role == self._PRIMARY:
        self._cfg   = cfg
    else:
        self._cfg   = ru.Config(cfg=cfg)

    self._reg_addr  = _reg_addr
    self._proxy_url = proxy_url
    self._proxy_cfg = None
    self._closed    = False
    self._created   = time.time()

    self._close_options = _CloseOptions(close_options)
    self._close_options.verify()

    self._proxy     = None    # proxy client instance
    self._reg       = None    # registry client instance
    self._pmgrs     = dict()  # map IDs to pmgr instances
    self._tmgrs     = dict()  # map IDs to tmgr instances
    self._cmgr      = None    # only primary sessions have a cmgr
    self._rm        = None    # resource manager (agent_0 sessions)
    self._hb        = None    # heartbeat monitor

    # this session is either living in the client application or lives in
    # the scope of a pilot.  In the latter case we expect `RP_PILOT_ID` to
    # be set - we derive the session module scope from that env variable.
    self._module = os.environ.get('RP_PILOT_ID', 'client')

    # non-primary sessions need a uid!
    if self._role != self._PRIMARY and not self._uid:
        raise ValueError('non-primary session needs UID (%s)' % self._role)

    # initialization is different for each session type
    # NOTE: we could refactor this to session sub-classes
    if   self._role == self._PRIMARY: self._init_primary()
    elif self._role == self._AGENT_0: self._init_agent_0()
    elif self._role == self._AGENT_N: self._init_agent_n()
    else                            : self._init_default()

    # cache sandboxes etc.
    self._cache_lock = ru.RLock()
    self._cache      = {'endpoint_fs'      : dict(),
                        'resource_sandbox' : dict(),
                        'session_sandbox'  : dict(),
                        'pilot_sandbox'    : dict(),
                        'client_sandbox'   : self._cfg.client_sandbox,
                        'js_shells'        : dict(),
                        'fs_dirs'          : dict()}

    # at this point we have a bridge connection, logger, etc, and are done
    self._prof.prof('session_ok', uid=self._uid)

    if self._role == self._PRIMARY:
        self._rep.ok('>>ok\n')

    # every role connects a registry client during initialization
    assert self._reg
# --------------------------------------------------------------------------
#
def _init_primary(self):
    """Bootstrap a primary (client side) session.

    In this order, the primary session:
      - ensures it has a uid (generated if the user did not pass one),
      - reads the session and resource config files,
      - starts the client side registry service and connects to it,
      - starts a ZMQ proxy (or ensures one is up and running),
      - starts the heartbeat channel,
      - pushes session, bridge and component configs into the registry,
      - starts bridges and components,
      - hooks into the control pubsub and crosswires proxy channels.
    """
    assert self._role == self._PRIMARY

    # a user provided uid wins, otherwise we generate a fresh one
    self._uid = self._uid or ru.generate_id('rp.session', mode=ru.ID_PRIVATE)

    # FIXME: config completion only needed by `PRIMARY`
    self._init_cfg_from_scratch()

    # primary sessions own the registry service and the proxy
    self._start_registry()
    self._connect_registry()
    self._start_proxy()

    self._start_heartbeat()
    self._publish_cfg()
    self._start_components()

    # presumably gives the freshly started bridges time to come up
    time.sleep(1)

    # primary session hooks into the control pubsub
    ctrl_cfg = self._reg['bridges.%s' % rpc.CONTROL_PUBSUB]
    self._ctrl_pub = ru.zmq.Publisher(channel=rpc.CONTROL_PUBSUB,
                                      url=ctrl_cfg['addr_pub'],
                                      log=self._log,
                                      prof=self._prof)

    # forward between local channels and proxy channels
    self._crosswire_proxy()
# --------------------------------------------------------------------------
#
def _init_agent_0(self):
    """Bootstrap the session of the first agent process (`agent_0`).

    The `cfg` passed to the constructor must be the complete agent config.
    The bootstrap steps run strictly in this order:

      - derive session config and local resource config from the agent cfg
      - start the agent side registry service and connect to it
      - connect to the ZMQ proxy for client/agent communication
      - start heartbeat monitoring
      - push session, bridge, component and resource configs to the registry
      - create the resource manager
      - start agent bridges and components
      - crosswire local and proxy channels
    """
    assert self._role == self._AGENT_0

    bootstrap = (self._init_cfg_from_dict,
                 self._start_registry,
                 self._connect_registry,
                 self._connect_proxy,
                 self._start_heartbeat,
                 self._publish_cfg,
                 self._init_rm,
                 self._start_components,
                 self._crosswire_proxy)

    for step in bootstrap:
        step()
# --------------------------------------------------------------------------
#
def _init_agent_n(self):
    """Bootstrap the session of a non-first agent process (`agent_n`).

    The session config is fetched from the agent_0 registry, but the
    `bridges` and `components` sections are taken from the agent config
    which was passed to the constructor.  Then agent bridges and components
    are started.
    """
    assert self._role == self._AGENT_N

    # the c'tor's cfg is the *agent* config - it gets replaced below, so
    # hold on to it
    agent_cfg = self._cfg

    self._connect_registry()
    self._init_cfg_from_registry()

    # overlay the agent's own bridge and component sections
    for section in ('bridges', 'components'):
        setattr(self._cfg, section,
                ru.Config(cfg=agent_cfg.get(section, {})))

    self._start_components()
# --------------------------------------------------------------------------
#
def _init_default(self):
    """Bootstrap a `default` session (sub-agents, components).

    Such sessions own no services: they connect to the registry of the
    `primary` session or of `agent_0` and load their config from there.
    """
    assert self._role == self._DEFAULT

    for step in (self._connect_registry, self._init_cfg_from_registry):
        step()
# --------------------------------------------------------------------------
#
def _start_registry(self):
    """Start the registry service owned by this session.

    The service address is recorded as `cfg.reg_addr`.

    Raises:
        ValueError: if the session was given an external registry address.
    """
    # a session handed a `_reg_addr` must use that registry, not its own
    if self._reg_addr:
        raise ValueError('cannot start registry when providing `reg_addr`')

    service = ru.zmq.Registry(uid='%s.reg' % self._uid, path=self._cfg.path)
    self._reg_service = service
    service.start()

    self._cfg.reg_addr = service.addr
# --------------------------------------------------------------------------
#
def _connect_registry(self):
    """Create the registry client for `cfg.reg_addr`.

    Falls back to the constructor's `_reg_addr` when the config has no
    registry address.

    Raises:
        ValueError: if neither source provides an address.
    """
    cfg = self._cfg

    if not cfg.reg_addr:
        cfg.reg_addr = self._reg_addr

    if not cfg.reg_addr:
        raise ValueError('session needs a registry address')

    self._reg = ru.zmq.RegistryClient(url=cfg.reg_addr)
# --------------------------------------------------------------------------
#
def _init_cfg_from_scratch(self):
    """Build the primary session config from the installed config files.

    This runs before the registry service exists (see `_init_primary`).
    At this point `self._cfg` is either:
      - a config name, read from `session_<cfg_name>.json`, or
      - a user provided config, merged over the `default` session config.

    The method also:
      - loads all resource configs, wrapped per site and resource as
        `ResourceConfig`,
      - fills in session specific settings (`sid`, `base`, `path`,
        `client_sandbox`),
      - points RU log, report and profile output to the session path and
        creates profiler, reporter and logger.
    """
    # NOTE: `cfg_name` and `cfg` are overloaded, the user cannot point to
    #       a predefined config and amend it at the same time.  This might
    #       be ok for the session, but introduces an API inconsistency.
    if isinstance(self._cfg, str):
        cfg_name, user_cfg = self._cfg, None
    else:
        cfg_name, user_cfg = 'default', self._cfg

    # load the named config, merge the provided config over it
    self._cfg = ru.Config('radical.pilot.session', name=cfg_name,
                          cfg=user_cfg)

    # all known resource configs, per site and resource
    all_rcfgs   = ru.Config('radical.pilot.resource', name='*', expand=False)
    self._rcfgs = ru.Config()
    for site in all_rcfgs:
        self._rcfgs[site] = ru.Config()
        site_rcfgs = self._rcfgs[site]
        for res, rcfg in all_rcfgs[site].items():
            site_rcfgs[res] = ResourceConfig(rcfg)

    self._rcfg = ru.Config()  # the local resource config, if known

    # essential config values for *this* specific session
    cfg = self._cfg
    cfg['sid'] = self._uid

    cwd = os.getcwd()
    cfg.base = os.path.abspath(cfg.base) if cfg.base else cwd
    cfg.path = (os.path.abspath(cfg.path) if cfg.path
                else '%s/%s' % (cfg.base, cfg.sid))
    cfg.client_sandbox = (os.path.abspath(cfg.client_sandbox)
                          if cfg.client_sandbox else cwd)

    # point RU log, report and profile output to the session sandbox
    ru_defaults = ru.DefaultConfig()
    ru_defaults.log_dir = ru_defaults.report_dir \
                        = ru_defaults.profile_dir = cfg.path

    self._prof = self._get_profiler(name=self._uid)
    self._rep  = self._get_reporter(name=self._uid)
    self._log  = self._get_logger(name=self._uid,
                                  level=cfg.get('log_lvl'),
                                  debug=cfg.get('debug_lvl'))

    from . import version_detail as rp_version_detail
    self._log.info('radical.pilot version: %s', rp_version_detail)
    self._log.info('radical.utils version: %s', ru.version_detail)

    self._prof.prof('session_start', uid=self._uid)

    self._rep.info ('<<new session: ')
    self._rep.plain('[%s]' % self._uid)
# --------------------------------------------------------------------------
#
def _init_cfg_from_dict(self):
    """Derive the agent_0 session config from the agent config dict.

    The `resource_cfg` section of the agent config becomes the session's
    local resource config (`self._rcfg`) and is removed from the session
    config.  No other resource configs are known (`self._rcfgs` is empty).
    `base` and `path` default to the current working directory.  RU output
    is pointed to the session path and profiler, reporter and logger are
    created.
    """
    assert self._role == self._AGENT_0

    cfg = ru.Config(cfg=self._cfg)
    self._cfg = cfg

    # we only have one resource config: the one for the current resource
    self._rcfg  = ru.Config(cfg=cfg.resource_cfg)
    self._rcfgs = ru.Config()
    del cfg['resource_cfg']

    # essential config values for *this* specific session
    cwd = os.getcwd()
    cfg.base = os.path.abspath(cfg.base) if cfg.base else cwd
    cfg.path = os.path.abspath(cfg.path) if cfg.path else cwd

    # point RU log, report and profile output to the session sandbox
    ru_defaults = ru.DefaultConfig()
    ru_defaults.log_dir = ru_defaults.report_dir \
                        = ru_defaults.profile_dir = cfg.path

    self._prof = self._get_profiler(name=self._uid)
    self._rep  = self._get_reporter(name=self._uid)
    self._log  = self._get_logger(name=self._uid,
                                  level=cfg.get('log_lvl'),
                                  debug=cfg.get('debug_lvl'))

    from . import version_detail as rp_version_detail
    self._log.info('radical.pilot version: %s', rp_version_detail)
    self._log.info('radical.utils version: %s', ru.version_detail)

    self._prof.prof('session_start', uid=self._uid)
# --------------------------------------------------------------------------
#
def _init_cfg_from_registry(self):
    """Load session config and resource configs from the connected registry.

    Reads the registry entries `cfg`, `rcfg` and `rcfgs`, points RU output
    to the session path and creates profiler, reporter and logger.
    """
    reg = self._reg

    self._cfg   = ru.Config(cfg=reg['cfg'])
    self._rcfg  = ru.Config(cfg=reg['rcfg'])
    self._rcfgs = ru.Config(cfg=reg['rcfgs'])

    # point RU log, report and profile output to the session sandbox
    # NOTE: this is racey: the first session in this process will win
    ru_defaults = ru.DefaultConfig()
    ru_defaults.log_dir = ru_defaults.report_dir \
                        = ru_defaults.profile_dir = self._cfg.path

    self._prof = self._get_profiler(name=self._uid)
    self._rep  = self._get_reporter(name=self._uid)
    self._log  = self._get_logger(name=self._uid,
                                  level=self._cfg.get('log_lvl'),
                                  debug=self._cfg.get('debug_lvl'))

    from . import version_detail as rp_version_detail
    self._log.info('radical.pilot version: %s', rp_version_detail)
    self._log.info('radical.utils version: %s', ru.version_detail)
    self._log.debug('Session(%s, %s)', self._uid, self._role)

    self._prof.prof('session_start', uid=self._uid)
# --------------------------------------------------------------------------
#
def _start_heartbeat(self):
    """Start heartbeat exchange and monitoring for this session.

    This method:
      - starts an embedded `heartbeat_pubsub` bridge and records its
        endpoints in `cfg.heartbeat.addr_pub` / `cfg.heartbeat.addr_sub`,
      - publishes this session's own heartbeat on every beat (and pings the
        proxy, if connected),
      - starts an `ru.Heartbeat` monitor which is fed with the heartbeats
        of all other entities received on that channel.

    Only valid for primary and agent_0 sessions (asserted below).
    """
    # only primary and agent_0 sessions manage heartbeats
    assert self._role in [self._PRIMARY, self._AGENT_0]

    # start the embedded heartbeat pubsub bridge
    self._hb_pubsub = ru.zmq.PubSub('heartbeat_pubsub',
                                    cfg={'uid'    : 'heartbeat_pubsub',
                                         'type'   : 'pubsub',
                                         'log_lvl': 'debug',
                                         'path'   : self._cfg.path})
    self._hb_pubsub.start()

    # presumably gives the bridge time to come up before clients connect
    time.sleep(1)

    # re-enable the test below if timing issues crop up
    # ru.zmq.test_pubsub(self._hb_pubsub.channel,
    #                    self._hb_pubsub.addr_pub,
    #                    self._hb_pubsub.addr_sub),

    # fill 'cfg.heartbeat' section
    self._cfg.heartbeat.addr_pub = str(self._hb_pubsub.addr_pub)
    self._cfg.heartbeat.addr_sub = str(self._hb_pubsub.addr_sub)

    # create a publisher for that channel to publish own heartbeat
    self._hb_pub = ru.zmq.Publisher(channel='heartbeat_pubsub',
                                    url=self._cfg.heartbeat.addr_pub,
                                    log=self._log,
                                    prof=self._prof)

    # --------------------------------------
    # start the heartbeat monitor, but first
    # define its callbacks
    def _hb_beat_cb():
        # called on every heartbeat: `cfg.heartbeat.interval`

        # publish own heartbeat
        self._hb_pub.put('heartbeat', HeartbeatMessage(uid=self._uid))

        # also update proxy heartbeat
        if self._proxy:
            self._proxy.request('heartbeat', {'sid': self._uid})
    # --------------------------------------

    # --------------------------------------
    # called when some entity misses
    # heartbeats: `cfg.heartbeat.timeout`.
    # Shuts down the component manager (if any).
    # NOTE(review): the meaning of the `False` return value is defined by
    #   `ru.Heartbeat` - presumably "do not attempt recovery"; confirm.
    def _hb_term_cb(hb_uid):
        if self._cmgr:
            self._cmgr.close()
        return False
    # --------------------------------------

    # create heartbeat manager which monitors all components in this session
    # self._log.debug('hb %s from session', self._uid)
    self._hb = ru.Heartbeat(uid=self._uid,
                            timeout=self._cfg.heartbeat.timeout,
                            interval=self._cfg.heartbeat.interval,
                            beat_cb=_hb_beat_cb,
                            term_cb=_hb_term_cb,
                            log=self._log)
    self._hb.start()

    # --------------------------------------
    # subscribe to heartbeat msgs and inform
    # self._hb about every heartbeat - our own
    # heartbeats (same uid) are ignored
    def _hb_msg_cb(topic, msg):
        hb_msg = HeartbeatMessage(from_dict=msg)
        if hb_msg.uid != self._uid:
            self._hb.beat(uid=hb_msg.uid)
    # --------------------------------------

    # NOTE(review): the subscriber instance is not stored - this relies on
    #   `ru.zmq.Subscriber` staying alive on its own; confirm.
    ru.zmq.Subscriber(channel='heartbeat_pubsub',
                      topic='heartbeat',
                      url=self._cfg.heartbeat.addr_sub,
                      cb=_hb_msg_cb,
                      log=self._log,
                      prof=self._prof)
# --------------------------------------------------------------------------
#
def _publish_cfg(self):
    """Push the session config into the registry.

    The `heartbeat`, `bridges` and `components` sections are stored under
    their own registry keys; the rest of the config goes under `cfg`.
    Proxy channels (if any) are added to the `bridges` section.

    The resource configs are published as well:
      - primary sessions publish all known resource configs under `rcfgs`,
      - agent_0 publishes only the *current* resource config under `rcfg`.
    """
    assert self._role in [self._PRIMARY, self._AGENT_0]

    reg = self._reg
    cfg = self._cfg

    # the main config, without the separately published sections
    flat_cfg = copy.deepcopy(cfg)
    for section in ('heartbeat', 'bridges', 'components'):
        del flat_cfg[section]

    reg['cfg']        = flat_cfg
    reg['heartbeat']  = cfg.heartbeat
    reg['bridges']    = cfg.bridges        # proxy bridges
    reg['components'] = {}

    # proxy channels are published as bridge configs, too
    for channel in (self._proxy_cfg or {}):
        reg['bridges.%s' % channel] = self._proxy_cfg[channel]

    if self._role == self._PRIMARY:
        reg['rcfg']  = dict()
        reg['rcfgs'] = self._rcfgs

    elif self._role == self._AGENT_0:
        reg['rcfg']  = self._rcfg
        reg['rcfgs'] = dict()
# --------------------------------------------------------------------------
#
def _start_proxy(self):
# A primary session will start a ZMQ proxy via which agents can connect
# to the client. It is also possible a proxy already exists and a proxy
# address was passed to the ctor - in that case we will, obviously, not
# start a proxy but just make sure the given one is being used.
assert self._role == self._PRIMARY
# check if an external proxy was epcified for the session.
if self._proxy_url:
self._log.debug('use proxy at %s' % self._proxy_url)
return
# check if an external proxy was specified in the environment
elif 'RADICAL_PILOT_PROXY_URL' in os.environ:
self._proxy_url = os.environ['RADICAL_PILOT_PROXY_URL']
self._log.debug('found proxy at %s' % self._proxy_url)
# no luck - start an embedded proxy
else:
self._proxy_event = mt.Event()
self._proxy_thread = mt.Thread(target=self._run_proxy)
self._proxy_thread.daemon = True
self._proxy_thread.start()
self._proxy_event.wait()
assert self._proxy_url
# the proxy url becomes part of the session cfg
self._cfg.proxy_url = self._proxy_url
self._rep.info ('<<zmq proxy : ')
self._rep.plain('[%s]' % self._proxy_url)
# configure proxy channels
try:
self._proxy = ru.zmq.Client(url=self._cfg.proxy_url)
self._proxy_cfg = self._proxy.request('register', {'sid':self._uid})
except:
self._log.exception('%s: failed to start proxy', self._role)
raise
# --------------------------------------------------------------------------
#
def _connect_proxy(self):
    """Connect agent_0 to the ZMQ proxy which the primary session set up.

    The proxy is queried (`lookup`) for the channel configuration created
    for this session id; the answer is kept in `self._proxy_cfg`.
    """
    assert self._role == self._AGENT_0

    # a proxy address must have been handed down in the agent config
    proxy_url = self._cfg.proxy_url
    assert proxy_url

    self._proxy     = ru.zmq.Client(url=proxy_url)
    self._proxy_cfg = self._proxy.request('lookup', {'sid': self._uid})

    self._log.debug('proxy response: %s', self._proxy_cfg)
# ----------------------------------------------------------------------
def crosswire_pubsub(self, src, tgt, from_proxy):
    """Forward messages from pubsub channel `src` to pubsub channel `tgt`.

    Subscribes to topic `src` on channel `src` and republishes selected
    messages on channel `tgt` under topic `tgt`.  Both endpoints are looked
    up in the registry under `bridges.<lowercased name>.addr_sub` and
    `bridges.<lowercased name>.addr_pub`.

    Messages without an `origin` get this session's module scope
    (`self._module`) as origin.

    Arguments:
        src (str): channel (and topic) to subscribe to.
        tgt (str): channel (and topic) to publish to.
        from_proxy (bool): if True, messages come *from* the proxy and are
            forwarded unless they originated in this module.  If False,
            messages are forwarded to the proxy only if they originated in
            this module and have the `fwd` flag set; that flag is cleared
            before forwarding.
    """
    # we only forward messages which have either no origin set (in this case
    # this method sets the origin), or whose origin is the same as
    # configured when crosswiring the channels (either 'client' or the pilot
    # ID).  Also, the messages need to have the `forward` flag set.

    path = self._cfg.path
    reg  = self._reg

    url_sub = reg['bridges.%s.addr_sub' % src.lower()]
    url_pub = reg['bridges.%s.addr_pub' % tgt.lower()]

    # self._log.debug('XXX cfg fwd for topic:%s to %s', src, tgt)
    # self._log.debug('XXX cfg fwd for %s to %s', url_sub, url_pub)

    publisher = ru.zmq.Publisher(channel=tgt, path=path, url=url_pub,
                                 log=self._log, prof=self._prof)

    # subscriber callback: applies the forwarding rules described above
    def pubsub_fwd(topic, msg):

        if 'origin' not in msg:
            msg['origin'] = self._module

        if from_proxy:

            # all messages *from* the proxy are forwarded - but not the ones
            # which originated in *this* module in the first place.

            if msg['origin'] == self._module:
                # self._log.debug('XXX >=! fwd %s to topic:%s: %s', src, tgt, msg)
                return

            # self._log.debug('XXX >=> fwd %s to topic:%s: %s', src, tgt, msg)
            publisher.put(tgt, msg)

        else:

            # only forward messages which have the respective flag set
            if not msg.get('fwd'):
                # self._log.debug('XXX =>! fwd %s to %s: %s [%s - %s]', src,
                #                 tgt, msg, msg['origin'], self._module)
                return

            # only forward messages which originated in *this* module.
            if not msg['origin'] == self._module:
                # self._log.debug('XXX =>| fwd %s to topic:%s: %s', src, tgt, msg)
                return

            # avoid message loops (forward only once)
            msg['fwd'] = False

            # self._log.debug('XXX =>> fwd %s to topic:%s: %s', src, tgt, msg)
            publisher.put(tgt, msg)

    # NOTE(review): the subscriber instance is not stored - this relies on
    #   `ru.zmq.Subscriber` staying alive on its own; confirm.
    ru.zmq.Subscriber(channel=src, topic=src, path=path, cb=pubsub_fwd,
                      url=url_sub, log=self._log, prof=self._prof)
# --------------------------------------------------------------------------
#
def _crosswire_proxy(self):
    """Crosswire the local control/state pubsubs with their proxy twins.

    Four forwarding routes are set up via `crosswire_pubsub`:
      - local control pubsub -> proxy control pubsub
      - proxy control pubsub -> local control pubsub
      - local state pubsub   -> proxy state pubsub
      - proxy state pubsub   -> local state pubsub

    Task queues are not crosswired here.  For the primary session, the task
    queues are owned by the task manager, which triggers their crosswiring
    once the queues are created.  On the primary side the queues involved
    are AGENT_STAGING_INPUT_PENDING_QUEUE and
    TMGR_STAGING_OUTPUT_PENDING_QUEUE.
    """
    assert self._role in [self._PRIMARY, self._AGENT_0]

    #         source                      target                   from_proxy
    routes = [(rpc.CONTROL_PUBSUB,       rpc.PROXY_CONTROL_PUBSUB, False),
              (rpc.PROXY_CONTROL_PUBSUB, rpc.CONTROL_PUBSUB,       True ),
              (rpc.STATE_PUBSUB,         rpc.PROXY_STATE_PUBSUB,   False),
              (rpc.PROXY_STATE_PUBSUB,   rpc.STATE_PUBSUB,         True )]

    for source, target, inbound in routes:
        self.crosswire_pubsub(src=source, tgt=target, from_proxy=inbound)
# --------------------------------------------------------------------------
#
def _init_rm(self):
    """Create the resource manager named in the local resource config.

    The manager type comes from `rcfg.resource_manager`; its `info` is
    logged at debug level.
    """
    # import locally to avoid circular imports
    from .agent.resource_manager import ResourceManager
    import pprint

    self._rm = ResourceManager.create(name=self._rcfg.resource_manager,
                                      cfg=self._cfg,
                                      rcfg=self._rcfg,
                                      log=self._log,
                                      prof=self._prof)

    self._log.debug(pprint.pformat(self._rm.info))
# --------------------------------------------------------------------------
#
def get_rm(self):
    """Return the resource manager (only set for agent_0 sessions, else None)."""
    rm = self._rm
    return rm
# --------------------------------------------------------------------------
#
def _start_components(self):
    """Create the component manager and start configured bridges/components.

    Primary and agent sessions own a component manager, which also manages
    heartbeats.  `self._cmgr.close()` is expected to be called during
    termination (see `close()`).
    """
    assert self._role in [self._PRIMARY, self._AGENT_0, self._AGENT_N]

    cmgr = rpu.ComponentManager(self.uid, self.reg_addr, self._uid)
    self._cmgr = cmgr

    cmgr.start_bridges(self._cfg.bridges)
    cmgr.start_components(self._cfg.components)
# --------------------------------------------------------------------------
# context manager `with` clause
#
def __enter__(self):
    """Enter a ``with`` block; the session itself is the context value."""
    session = self
    return session
def __exit__(self, exc_type, exc_value, traceback):
    """Close the session when leaving a ``with`` block.

    The session's default close options apply.  Returns None, so
    exceptions raised inside the block are not suppressed.
    """
    self.close()
    return None
# --------------------------------------------------------------------------
#
def close(self, **kwargs):
    """Close the session.

    All subsequent attempts to access objects attached to the session will
    result in an error.  Closing an already closed session is a no-op.

    Arguments:
        terminate (bool, optional): Shut down all pilots associated with the
            session.
        download (bool, optional): After closing, fetch pilot profiles and
            log files into the session's base directory.

    Keyword arguments override the close options given to the constructor.
    The legacy `cleanup` option is accepted but has no effect here.
    """

    # close only once
    if self._closed:
        return

    if self._role == self._PRIMARY:
        self._rep.info('closing session %s' % self._uid)

    self._log.debug("session %s closing", self._uid)
    self._prof.prof("session_close", uid=self._uid)

    # Merge kwargs with current defaults stored in self._close_options, then
    # verify to convert attributes to their types if needed (but a None
    # value will stay if it is set)
    self._close_options.update(kwargs)
    self._close_options.verify()
    options = self._close_options

    # ask all components to terminate - only the primary session owns the
    # control publisher
    if options.terminate and self._role == self._PRIMARY:
        self._ctrl_pub.put(rpc.CONTROL_PUBSUB, {'cmd': 'terminate',
                                                'arg': None})

    for tmgr_uid, tmgr in self._tmgrs.items():
        self._log.debug("session %s closes tmgr %s", self._uid, tmgr_uid)
        tmgr.close()
        self._log.debug("session %s closed tmgr %s", self._uid, tmgr_uid)

    for pmgr_uid, pmgr in self._pmgrs.items():
        self._log.debug("session %s closes pmgr %s", self._uid, pmgr_uid)
        pmgr.close(terminate=options.terminate)
        self._log.debug("session %s closed pmgr %s", self._uid, pmgr_uid)

    if self._cmgr:
        self._cmgr.close()

    # stop heartbeats
    if self._hb:
        self._hb.stop()
        self._hb_pubsub.stop()

    if self._proxy:

        if self._role == self._PRIMARY:
            try:
                self._log.debug('session %s closes service', self._uid)
                self._proxy.request('unregister', {'sid': self._uid})
            except Exception:
                # best effort: the proxy service may already be gone
                self._log.exception('session %s failed to unregister',
                                    self._uid)

        if self._role in [self._PRIMARY, self._AGENT_0]:
            self._proxy.close()
            self._proxy = None

    self._log.debug("session %s closed", self._uid)
    self._prof.prof("session_stop", uid=self._uid)

    self._closed = True

    # after all is said and done, we attempt to download the pilot log- and
    # profiles, if so wanted.  The profiler is closed only afterwards, so
    # that the fetch events are still recorded, and it is closed even if
    # the download fails.
    try:
        if options.download:
            self._prof.prof("session_fetch_start", uid=self._uid)
            self._log.debug('start download')
            tgt = self._cfg.base
            self.fetch_profiles(tgt=tgt)
            self.fetch_logfiles(tgt=tgt)
            self._prof.prof("session_fetch_stop", uid=self._uid)
    finally:
        self._prof.close()

    # both primary and agent_0 sessions start their own registry service in
    # `_start_registry` - stop it (this will dump the registry)
    if self._role in [self._PRIMARY, self._AGENT_0]:
        self._reg.close()
        reg_service = getattr(self, '_reg_service', None)
        if reg_service:
            reg_service.stop()

    if self._role == self._PRIMARY:
        self._t_stop = time.time()
        self._rep.info('<<session lifetime: %.1fs'
                       % (self._t_stop - self._t_start))
        self._rep.ok('>>ok\n')
# --------------------------------------------------------------------------
#
def _run_proxy(self):
proxy = Proxy(path=self._cfg.path)
try:
proxy.start()
self._proxy_url = proxy.addr
self._proxy_event.set()
# run forever until process is interrupted or killed
proxy.wait()
finally:
proxy.stop()
proxy.wait()
# --------------------------------------------------------------------------
#
def as_dict(self):
    """Returns a Python dictionary representation of the object.

    Keys are `uid`, `proxy_url` (stringified, so a missing URL becomes
    'None') and `cfg` (a deep copy of the session config).
    """
    return {'uid'      : self._uid,
            'proxy_url': str(self.proxy_url),
            'cfg'      : copy.deepcopy(self._cfg)}
# --------------------------------------------------------------------------
#
@property
def reg_addr(self):
    """Address of the registry service used by this session."""
    cfg = self._cfg
    return cfg.reg_addr
# --------------------------------------------------------------------------
#
@property
def uid(self):
    """Unique identifier of this session."""
    session_id = self._uid
    return session_id
# --------------------------------------------------------------------------
#
@property
def path(self):
    """Session sandbox path (logs, profiles and reports are written here)."""
    cfg = self._cfg
    return cfg.path
# --------------------------------------------------------------------------
#
@property
def base(self):
    """Base directory below which the session sandbox lives."""
    cfg = self._cfg
    return cfg.base
# --------------------------------------------------------------------------
#
@property
def proxy_url(self):
    """URL of the ZMQ proxy recorded in the session config (may be None)."""
    cfg = self._cfg
    return cfg.proxy_url
# --------------------------------------------------------------------------
#
@property
def cfg(self):
    """The session configuration (live object, not a copy)."""
    session_cfg = self._cfg
    return session_cfg
# --------------------------------------------------------------------------
#
@property
def rcfgs(self):
    """All known resource configs, per site and resource.

    Empty for agent_0 sessions.
    """
    resource_cfgs = self._rcfgs
    return resource_cfgs
# --------------------------------------------------------------------------
#
@property
def rcfg(self):
    """Resource config of the local resource.

    Empty for primary sessions.
    """
    local_rcfg = self._rcfg
    return local_rcfg
# --------------------------------------------------------------------------
#
@property
def cmgr(self):
    """Component manager of this session.

    None unless `_start_components` ran (primary and agent sessions).
    """
    manager = self._cmgr
    return manager
# --------------------------------------------------------------------------
#
def _get_logger(self, name, level=None, debug=None):
    """Create a `ru.Logger` writing into the session sandbox.

    Thin wrapper around `ru.Logger()` in the `radical.pilot` namespace,
    with `path` set to the session path and targets set to `['.']`.
    """
    return ru.Logger(name=name, ns='radical.pilot', path=self._cfg.path,
                     targets=['.'], level=level, debug=debug)
# --------------------------------------------------------------------------
#
def _get_reporter(self, name):
    """Return the session reporter, creating it on first use.

    Thin wrapper around `ru.Reporter()` writing into the session sandbox.
    Reporting is enabled only if the `report` setting of the
    `radical.pilot` env namespace is one of '1', 'true', 'on'
    (case-insensitive); it defaults to 'false'.

    NOTE(review): the result is stored on the instance (`self._reporter`),
    not on the class, so each session creates its own reporter - confirm
    whether an application-wide singleton was intended.
    """
    if not self._reporter:
        setting = ru.get_env_ns('report', 'radical.pilot', 'false').lower()
        self._reporter = ru.Reporter(name=name, ns='radical.pilot',
                                     path=self._cfg.path,
                                     enabled=setting in ('1', 'true', 'on'))
    return self._reporter
# --------------------------------------------------------------------------
#
def _get_profiler(self, name):
    """Create a `ru.Profiler` writing into the session sandbox.

    Thin wrapper around `ru.Profiler()` in the `radical.pilot` namespace,
    with `path` set to the session path.
    """
    return ru.Profiler(name=name, ns='radical.pilot', path=self._cfg.path)
# --------------------------------------------------------------------------
#
# FIXME MONGODB: to json
# if self._dbs and self._dbs._c:
# self._dbs._c.update({'type' : 'session',
# "uid" : self.uid},
# {"$push" : {"metadata": metadata}})
# --------------------------------------------------------------------------
#
def _register_pmgr(self, pmgr):
self._pmgrs[pmgr.uid] = pmgr
# # --------------------------------------------------------------------------
# #
# def _reconnect_pmgr(self, pmgr):
#
# if not self._dbs.get_pmgrs(pmgr_ids=pmgr.uid):
# raise ValueError('could not reconnect to pmgr %s' % pmgr.uid)
#
# self._pmgrs[pmgr.uid] = pmgr
#
#
# --------------------------------------------------------------------------
#
def list_pilot_managers(self):
    """Get PilotManager identifiers.

    Lists the unique identifiers of all :class:`radical.pilot.PilotManager`
    instances registered with this session, in registration order.

    Returns:
        list[str]: A list of :class:`radical.pilot.PilotManager` uids.
    """
    return list(self._pmgrs)
# --------------------------------------------------------------------------
#
def get_pilot_managers(self, pmgr_uids=None):
    """Get known PilotManager(s).

    Arguments:
        pmgr_uids (str | Iterable[str], optional): uids of the PilotManagers
            we want. A single string selects exactly one manager; an empty
            or missing value selects all managers.

    Returns:
        radical.pilot.PilotManager | list[radical.pilot.PilotManager]: a
        single manager if `pmgr_uids` is a string, otherwise a list.

    Raises:
        KeyError: if a requested uid is not registered.
    """
    if isinstance(pmgr_uids, str):
        return self._pmgrs[pmgr_uids]

    if not pmgr_uids:
        return list(self._pmgrs.values())

    return [self._pmgrs[uid] for uid in pmgr_uids]
# --------------------------------------------------------------------------
#
def _register_tmgr(self, tmgr):
self._tmgrs[tmgr.uid] = tmgr
# # --------------------------------------------------------------------------
# #
# def _reconnect_tmgr(self, tmgr):
#
# if not self._dbs.get_tmgrs(tmgr_ids=tmgr.uid):
# raise ValueError('could not reconnect to tmgr %s' % tmgr.uid)
#
# self._tmgrs[tmgr.uid] = tmgr
#
#
# --------------------------------------------------------------------------
#
def list_task_managers(self):
    """Get TaskManager identifiers.

    Lists the unique identifiers of all :class:`radical.pilot.TaskManager`
    instances registered with this session, in registration order.

    Returns:
        list[str]: A list of :class:`radical.pilot.TaskManager` uids.
    """
    return list(self._tmgrs)
# --------------------------------------------------------------------------
#
def get_task_managers(self, tmgr_uids=None):
    """Get known TaskManager(s).

    Arguments:
        tmgr_uids (str | Iterable[str], optional): uids of the TaskManagers
            we want. A single string selects exactly one manager; an empty
            or missing value (the default) selects all managers.

    Returns:
        radical.pilot.TaskManager | list[radical.pilot.TaskManager]: a
        single manager if `tmgr_uids` is a string, otherwise a list.

    Raises:
        KeyError: if a requested uid is not registered.
    """
    # NOTE: only a plain string is treated as a scalar request.  Previously
    #       every non-list value (including the default `None`) was wrapped
    #       into a one-element list, so `get_task_managers()` looked up
    #       `self._tmgrs[None]` and raised KeyError.
    if isinstance(tmgr_uids, str):
        return self._tmgrs[tmgr_uids]

    if not tmgr_uids:
        return list(self._tmgrs.values())

    return [self._tmgrs[uid] for uid in tmgr_uids]
# --------------------------------------------------------------------------
#
def list_resources(self):
    """Get list of known resource labels.

    Every resource label has the form `<domain>.<host>` and can be used as
    the `resource` of a pilot description.

    Returns:
        list[str]: all known resource labels, sorted alphabetically.
    """
    return sorted('%s.%s' % (domain, host)
                  for domain, hosts in self._rcfgs.items()
                  for host in hosts)
# --------------------------------------------------------------------------
#
def get_resource_config(self, resource, schema=None):
    """Returns a dictionary of the requested resource config.

    Arguments:
        resource (str): resource label of the form `<site>.<resource>`
            (see `list_resources()`).
        schema (str, optional): access schema to apply.  Defaults to the
            resource's `default_schema`.

    Returns:
        ResourceConfig: a new config object labeled with `resource`.  When
        an access schema is in effect, the schema-specific settings are
        merged over the resource settings and the result is verified.

    Raises:
        ValueError: if `resource` contains no `.` separator.
        RuntimeError: if the site, resource or schema is unknown.
    """
    site, sep, res = resource.partition('.')
    if not sep:
        # same exception type as the former tuple-unpacking failure, but
        # with a message that names the offending label
        raise ValueError("Resource label '%s' is not of the form "
                         "'<site>.<resource>'." % resource)

    if site not in self._rcfgs:
        raise RuntimeError("Resource site '%s' is unknown." % site)

    if res not in self._rcfgs[site]:
        raise RuntimeError("Resource label '%s' unknown." % res)

    base_cfg = self._rcfgs[site][res]

    if not schema:
        schema = base_cfg['default_schema']

    if not schema:
        # no access schema: return a labeled copy of the plain resource
        # config.  The label is set on the copy so that the cached entry in
        # `self._rcfgs` is not modified.
        rcfg = ResourceConfig(from_dict=base_cfg)
        rcfg.label = resource
        return rcfg

    if schema not in base_cfg['schemas']:
        raise RuntimeError("schema %s unknown for resource %s"
                           % (schema, resource))

    rcfg = ResourceConfig(from_dict=base_cfg)
    scfg = rcfg['schemas'][schema]
    ru.dict_merge(rcfg, scfg, ru.OVERWRITE)

    if 'resource_manager' in rcfg:
        # import locally to avoid circular imports
        from .agent.resource_manager import ResourceManager
        rm = ResourceManager.get_manager(rcfg['resource_manager'])
        # when running inside an already started batch job, apply the
        # default endpoints
        if rm and rm.batch_started():
            rcfg.update(ENDPOINTS_DEFAULT)

    rcfg.label = resource
    rcfg.verify()

    return rcfg
# --------------------------------------------------------------------------
#
def fetch_profiles(self, tgt=None):
    """Fetch profiles for this session via `rpu.fetch_profiles`.

    Passes the session uid, the optional target `tgt`, the session reporter
    and `skip_existing=True`.

    Returns:
        whatever `rpu.fetch_profiles` returns.
    """
    sid = self._uid
    return rpu.fetch_profiles(sid, tgt=tgt, skip_existing=True,
                              rep=self._rep)
# --------------------------------------------------------------------------
#
def fetch_logfiles(self, tgt=None):
    """Fetch log files for this session via `rpu.fetch_logfiles`.

    Passes the session uid, the optional target `tgt`, the session reporter
    and `skip_existing=True`.

    Returns:
        whatever `rpu.fetch_logfiles` returns.
    """
    sid = self._uid
    return rpu.fetch_logfiles(sid, tgt=tgt, skip_existing=True,
                              rep=self._rep)
# --------------------------------------------------------------------------
#
def _get_client_sandbox(self):
"""Client sandbox path.
For the session in the client application, this is `os.getcwd()`. For
the session in any other component, specifically in pilot components,
the client sandbox needs to be read from the session config (or pilot
config). The latter is not yet implemented, so the pilot can not yet
interpret client sandboxes. Since pilot-side staging to and from the
client sandbox is not yet supported anyway, this seems acceptable
(FIXME).
"""
return self._cache['client_sandbox']
# --------------------------------------------------------------------------
#
def _get_resource_sandbox(self, pilot):
"""Global RP sandbox.
For a given pilot dict, determine the global RP sandbox, based on the
pilot's 'resource' attribute.
"""
# FIXME: this should get 'resource, schema=None' as parameters
resource = pilot['description'].get('resource')
schema = pilot['description'].get('access_schema')
if not resource:
raise ValueError('Cannot get pilot sandbox w/o resource target')
# the global sandbox will be the same for all pilots on any resource, so
# we cache it
with self._cache_lock:
if resource not in self._cache['resource_sandbox']:
# cache miss -- determine sandbox and fill cache
rcfg = self.get_resource_config(resource, schema)
fs_url = ru.Url(rcfg['filesystem_endpoint'])
# Get the sandbox from either the pilot_desc or resource conf
sandbox_raw = pilot['description'].get('sandbox')
if not sandbox_raw:
sandbox_raw = rcfg.get('default_remote_workdir', "$PWD")
# we may need to replace pat elements with data from the pilot
# description
if '%' in sandbox_raw:
# expand from pilot description
expand = dict()
for k, v in pilot['description'].items():
if v is None:
v = ''
if k == 'project':
if '_' in v and 'ornl' in resource:
v = v.split('_')[0]
elif '-' in v and 'ncsa' in resource:
v = v.split('-')[0]
expand['pd.%s' % k] = v
if isinstance(v, str):
expand['pd.%s' % k.upper()] = v.upper()
expand['pd.%s' % k.lower()] = v.lower()
else:
expand['pd.%s' % k.upper()] = v
expand['pd.%s' % k.lower()] = v
sandbox_raw = sandbox_raw % expand
# If the sandbox contains expandables, we need to resolve those
# remotely.
#
# NOTE: this will only work for (gsi)ssh or similar shell
# based access mechanisms
if '$' not in sandbox_raw:
# no need to expand further
sandbox_base = sandbox_raw
else:
# FIXME "remote sandbox expansion not yet supported"
out, err, ret = ru.sh_callout(' echo "WORKDIR: %s"' %
sandbox_raw, shell=True)
if ret or 'WORKDIR:' not in out:
raise RuntimeError("workdir expansion failed: %s [%s]"
% (out, err))
sandbox_base = out.split(":")[1].strip()
self._log.debug("sandbox base %s", sandbox_base)
# at this point we have determined the remote 'pwd' - the
# global sandbox is relative to it.
fs_url.path = "%s/radical.pilot.sandbox" % sandbox_base
# before returning, keep the URL string in cache
self._cache['resource_sandbox'][resource] = fs_url
return self._cache['resource_sandbox'][resource]
# --------------------------------------------------------------------------
#
def _get_session_sandbox(self, pilot):
# FIXME: this should get 'resource, schema=None' as parameters
resource = pilot['description'].get('resource')
if not resource:
raise ValueError('Cannot get session sandbox w/o resource target')
with self._cache_lock:
if resource not in self._cache['session_sandbox']:
# cache miss
resource_sandbox = self._get_resource_sandbox(pilot)
session_sandbox = ru.Url(resource_sandbox)
session_sandbox.path += '/%s' % self.uid
self._cache['session_sandbox'][resource] = session_sandbox
return self._cache['session_sandbox'][resource]
# --------------------------------------------------------------------------
#
def _get_pilot_sandbox(self, pilot):
# FIXME: this should get 'pid, resource, schema=None' as parameters
pilot_sandbox = pilot.get('pilot_sandbox')
if str(pilot_sandbox):
return ru.Url(pilot_sandbox)
pid = pilot['uid']
with self._cache_lock:
if pid not in self._cache['pilot_sandbox']:
# cache miss
session_sandbox = self._get_session_sandbox(pilot)
pilot_sandbox = ru.Url(session_sandbox)
pilot_sandbox.path += '/%s/' % pilot['uid']
self._cache['pilot_sandbox'][pid] = pilot_sandbox
return self._cache['pilot_sandbox'][pid]
# --------------------------------------------------------------------------
#
def _get_endpoint_fs(self, pilot):
# FIXME: this should get 'resource, schema=None' as parameters
resource = pilot['description'].get('resource')
if not resource:
raise ValueError("Can't get fs-endpoint w/o resource target")
with self._cache_lock:
if resource not in self._cache['endpoint_fs']:
# cache miss
resource_sandbox = self._get_resource_sandbox(pilot)
endpoint_fs = ru.Url(resource_sandbox)
endpoint_fs.path = ''
self._cache['endpoint_fs'][resource] = endpoint_fs
return self._cache['endpoint_fs'][resource]
# --------------------------------------------------------------------------
#
def _get_task_sandbox(self, task, pilot):
# If a sandbox is specified in the task description, then interpret
# relative paths as relativet to the pilot sandbox.
# task sandboxes are cached in the task dict
task_sandbox = task.get('task_sandbox')
if task_sandbox:
return task_sandbox
# specified in description?
if not task_sandbox:
sandbox = task['description'].get('sandbox')
if sandbox:
task_sandbox = ru.Url(self._get_pilot_sandbox(pilot))
if sandbox[0] == '/':
task_sandbox.path = sandbox
else:
task_sandbox.path += '/%s/' % sandbox
# default
if not task_sandbox:
task_sandbox = ru.Url(self._get_pilot_sandbox(pilot))
task_sandbox.path += "/%s/" % task['uid']
# cache
task['task_sandbox'] = str(task_sandbox)
return task_sandbox
# --------------------------------------------------------------------------
#
def _get_jsurl(self, pilot):
    """Get job service endpoint and hop URL for pilot's target resource.

    The job service URL is the resource config's `job_manager_endpoint`.
    The hop URL is `job_manager_hop`, or a copy of the job service URL if
    no hop is configured.  The hop schema is then normalized:

      - a schema containing `+gsissh` or `gsissh+` becomes `gsissh`,
      - else one containing `+ssh` or `ssh+` becomes `ssh`,
      - anything else becomes `fork`.

    Returns:
        tuple(ru.Url, ru.Url): job service URL and hop URL.
    """
    descr  = pilot['description']
    rcfg   = self.get_resource_config(descr['resource'],
                                      descr['access_schema'])
    js_url = ru.Url(rcfg.get('job_manager_endpoint'))
    js_hop = ru.Url(rcfg.get('job_manager_hop') or js_url)

    # make sure the js_hop url points to an interactive access
    # TODO: this is an unreliable heuristics - we should require the js_hop
    #       URL to be specified in the resource configs.
    # NOTE(review): a plain `ssh` / `gsissh` schema (no `+`) ends up as
    #       `fork` here - confirm that this is intended.
    hop_schema = js_hop.schema
    for shell in ('gsissh', 'ssh'):      # order matters: 'gsissh+' ~ 'ssh+'
        if '+' + shell in hop_schema or shell + '+' in hop_schema:
            js_hop.schema = shell
            break
    else:
        js_hop.schema = 'fork'

    return js_url, js_hop
# ------------------------------------------------------------------------------