__copyright__ = 'Copyright 2013-2021, The RADICAL-Cybertools Team'
__license__ = 'MIT'
import pprint
import math as m
import radical.utils as ru
from ... import constants as rpc
from .base import AgentSchedulingComponent
# ------------------------------------------------------------------------------
#
# This is a continuous scheduler with awareness of a node's file-storage and
# memory capabilities. The scheduler respects colocate tagging (it places a
# task on the same node as other tasks with the same tag).
#
#
# Continuous:
#
# The scheduler attempts to find continuous stretches of nodes on which to
# place a multi-node task.
#
#
# Local Storage:
#
# The storage availability is obtained from the rm_node_list and assigned to
# the node list of the class. The lfs requirement is obtained from the task
# description (td) in `schedule_task`. Using the availability and requirement,
# the `_find_resources` method returns the core and gpu ids.
#
# Expected data structure of the node list:
# self.nodes = [{
# 'node_name': 'aa',
# 'node_id' : 'node.0000',
# 'cores' : [0, 1, 2, 3, 4, 5, 6, 7],
# 'gpus' : [0, 1, 2],
# 'lfs' : 128,
# 'mem' : 256
# },
# {
# 'node_name': 'bb',
# 'node_id' : 'node.0001',
# 'cores' : [0, 1, 2, 3, 4, 5, 6, 7],
# 'gpus' : [0, 1, 2],
# 'lfs' : 128,
# 'mem' : 256,
# },
# ...
# ]
#
# lfs storage (size) and memory are specified in MBytes. The scheduler assumes
# that both are freed when the task finishes.
#
#
# Task Tagging:
#
# The scheduler attempts to schedule tasks with the same colocate tag onto the
# same node, so that a task can reuse the previous task's data. This assumes
# that storage is not freed when the task finishes.
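#
# For illustration only, a hypothetical tag setting (not taken from this
# module): two tasks which should land on the same node could both carry
#
#     'tags': {'colocate': 'model_a', 'exclusive': True}
#
# in their task description, where the optional `exclusive` flag additionally
# requests that the chosen node is not shared with tasks carrying a different
# tag.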
#
# FIXME: the alert reader will realize a discrepancy in the above set of
# assumptions.
#
# ------------------------------------------------------------------------------
#
class Continuous(AgentSchedulingComponent):
'''
The Continuous scheduler attempts to place threads and processes of
a task onto nodes in the cluster.
'''
# --------------------------------------------------------------------------
#
def __init__(self, cfg, session):
AgentSchedulingComponent.__init__(self, cfg, session)
self._colo_history = dict()
self._tagged_nodes = set()
self._scattered = None
self._node_offset = 0
# --------------------------------------------------------------------------
#
# --------------------------------------------------------------------------
#
def _iterate_nodes(self):
'''
The scheduler iterates through the node list for each task placement.
However, we want to avoid starting from node zero every time, as tasks
have likely been placed on that node already - instead, we generally want
to pick up where the last task placement succeeded. This iterator
preserves that state.
Note that a full iteration yields every node exactly once, starting at
the current node offset and wrapping around the end of the node list.
'''
iterator_count = 0
while iterator_count < len(self.nodes):
yield self.nodes[self._node_offset]
iterator_count += 1
self._node_offset += 1
self._node_offset = self._node_offset % len(self.nodes)
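# Illustration (assumed behavior, not part of the implementation): with four
# nodes and `self._node_offset == 2`, a full iteration yields nodes 2, 3, 0, 1
# and leaves the offset at 2 again. If the caller stops consuming early, the
# offset still points at the last node yielded, so the next search resumes at
# the node where the previous placement succeeded.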
# --------------------------------------------------------------------------
#
def unschedule_task(self, tasks):
'''
This method is called when previously acquired resources are not needed
anymore. Each task's `slots` entry holds the resource slots as previously
returned by `schedule_task()`; those slots are marked as free again.
'''
# reflect the request in the nodelist state (set to `FREE`)
for task in ru.as_list(tasks):
self._change_slot_states(task['slots'], rpc.FREE)
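# For illustration: after a task finishes, calling `self.unschedule_task(task)`
# marks every slot recorded in `task['slots']` as `rpc.FREE` again, so later
# placements can reuse those resources.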
# --------------------------------------------------------------------------
#
def _find_resources(self, node, find_slots, ranks_per_slot, cores_per_slot,
gpus_per_slot, lfs_per_slot, mem_per_slot, partial):
'''
Find up to the requested number of slots, where each slot features the
respective `x_per_slot` resources. This call will return a list of
slots of the following structure:
{
'node_name': 'node_name',
'node_id' : '1',
'core_map' : [[1, 2, 4, 5], [6, 7, 8, 9]],
'gpu_map' : [[1, 3], [1, 3]],
'lfs' : 1234,
'mem' : 4321
}
The call will *not* change the allocation status of the node, atomicity
must be guaranteed by the caller.
We don't care about continuity within a single node - cores `[1, 5]` are
assumed to be as close together as cores `[1, 2]`.
When `partial` is set, the method CAN return fewer than `find_slots`
slots - otherwise, the method returns the requested number of slots or
`None`.
FIXME: SMT handling: we should assume that hardware threads of the same
physical core cannot host different executables, so HW threads
can only account for thread placement, not process placement.
This might best be realized by internally handling SMT as minimal
thread count and using physical core IDs for process placement?
'''
# self._log.debug('find on %s: %s * [%s, %s]', node['uid'], )
# check if the node can host the request
free_cores = node['cores'].count(rpc.FREE)
free_gpus = node['gpus'].count(rpc.FREE)
free_lfs = node['lfs']
free_mem = node['mem']
# check how many slots we can serve, at most
alc_slots = 1
if cores_per_slot:
alc_slots = int(m.floor(free_cores / cores_per_slot))
if gpus_per_slot:
alc_slots = min(alc_slots, int(m.floor(free_gpus / gpus_per_slot)))
if lfs_per_slot:
alc_slots = min(alc_slots, int(m.floor(free_lfs / lfs_per_slot)))
if mem_per_slot:
alc_slots = min(alc_slots, int(m.floor(free_mem / mem_per_slot)))
# is this enough?
if not alc_slots:
return None
if not partial:
if alc_slots < find_slots:
return None
# find at most `find_slots`
alc_slots = min(alc_slots, find_slots)
# we should be able to host the slots - dig out the precise resources
slots = list()
node_id = node['node_id']
node_name = node['node_name']
core_idx = 0
gpu_idx = 0
for _ in range(alc_slots):
cores = list()
gpus = list()
while len(cores) < cores_per_slot:
if node['cores'][core_idx] == rpc.FREE:
cores.append(core_idx)
core_idx += 1
while len(gpus) < gpus_per_slot:
if node['gpus'][gpu_idx] == rpc.FREE:
gpus.append(gpu_idx)
gpu_idx += 1
cores_per_rank = cores_per_slot // ranks_per_slot
# create `ranks_per_slot` lists of core indices (i.e., the cores
# assigned to each rank)
core_map = [cores[i:i + cores_per_rank]
for i in range(0, len(cores), cores_per_rank)]
# gpus per rank are the same within the slot
gpu_map = [gpus] * len(core_map)
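# Illustration (hypothetical numbers): with `cores_per_slot=4`,
# `ranks_per_slot=2`, free core indices [0, 1, 2, 3] and one free GPU with
# index 0, the loops above yield `core_map = [[0, 1], [2, 3]]` and
# `gpu_map = [[0], [0]]` - two ranks in this slot, each using two cores and
# both sharing GPU 0.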
slots.append({'node_name': node_name,
'node_id' : node_id,
'core_map' : core_map,
'gpu_map' : gpu_map,
'lfs' : lfs_per_slot,
'mem' : mem_per_slot})
# consistency check
assert (len(slots) == find_slots) or (len(slots) and partial)
return slots
# --------------------------------------------------------------------------
#
#
def schedule_task(self, task):
'''
Find an available set of slots, potentially across node boundaries (in
the MPI case).
A `slot` is here considered to be the set of resources required by a
single MPI rank. Those resources need to be available on a single node -
but different slots can be distributed across multiple nodes. Resources
for non-MPI tasks will always be placed on a single node.
By default, we only allow for partial allocations on the first and last
node - but all intermediate nodes MUST be completely used (this is the
'CONTINUOUS' scheduler after all).
If the scheduler is configured with `scattered=True`, then that
constraint is relaxed, and any set of slots (be it continuous across
nodes or not) is accepted as valid allocation.
No matter the mode, we always make sure that we allocate slots in chunks
of cores, gpus, lfs and mem required per process - otherwise the
application processes would not be able to acquire the requested
resources on the respective node.
'''
self._log.debug_3('find_resources %s', task['uid'])
td = task['description']
mpi = bool(td['ranks'] > 1)
cores_per_node = self._rm.info.cores_per_node
gpus_per_node = self._rm.info.gpus_per_node
lfs_per_node = self._rm.info.lfs_per_node
mem_per_node = self._rm.info.mem_per_node
cores_per_slot = td['cores_per_rank']
gpus_per_slot = td['gpus_per_rank']
lfs_per_slot = td['lfs_per_rank']
mem_per_slot = td['mem_per_rank']
# make sure that processes are at least single-threaded
if not cores_per_slot:
cores_per_slot = 1
# check if GPUs are shared among ranks (fractional `gpus_per_rank`)
if gpus_per_slot and not gpus_per_slot.is_integer():
gpus = m.ceil(td['ranks'] * gpus_per_slot)
# find the greatest common divisor
req_slots = m.gcd(td['ranks'], gpus)
ranks_per_slot = td['ranks'] // req_slots
gpus_per_slot = gpus // req_slots
cores_per_slot *= ranks_per_slot
lfs_per_slot *= ranks_per_slot
mem_per_slot *= ranks_per_slot
else:
req_slots = td['ranks']
ranks_per_slot = 1
gpus_per_slot = int(gpus_per_slot)
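# Illustration of the GPU sharing case (hypothetical numbers): for `ranks=6`
# and `gpus_per_rank=0.5` we need ceil(6 * 0.5) = 3 GPUs; gcd(6, 3) gives
# `req_slots=3`, hence `ranks_per_slot=2` and `gpus_per_slot=1` - each slot
# bundles two ranks sharing one GPU, and the per-slot core, lfs and mem
# requirements double accordingly.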
self._log.debug_3('req : %s %s %s %s %s %s',
req_slots, ranks_per_slot, cores_per_slot,
gpus_per_slot, lfs_per_slot, mem_per_slot)
# First and last nodes can be a partial allocation - all other nodes
# can only be partial when `scattered` is set.
#
# Iterate over all nodes until we find something. Check if it fits the
# allocation mode and sequence. If not, start over with the next node.
# If it matches, add the slots found and continue to the next node.
#
# FIXME: persistent node index
# we always fail when too many threads are requested
assert cores_per_slot <= cores_per_node, \
'too many threads per proc %s' % cores_per_slot
assert gpus_per_slot <= gpus_per_node, \
'too many gpus per proc %s' % gpus_per_slot
assert lfs_per_slot <= lfs_per_node, \
'too much lfs per proc %s' % lfs_per_slot
assert mem_per_slot <= mem_per_node, \
'too much mem per proc %s' % mem_per_slot
# check what resource type limits the number of slots per node
tmp = list()
slots_per_node = int(m.floor(cores_per_node / cores_per_slot))
tmp.append([cores_per_node, cores_per_slot, slots_per_node])
if gpus_per_slot:
slots_per_node = min(slots_per_node,
int(m.floor(gpus_per_node / gpus_per_slot)))
tmp.append([gpus_per_node, gpus_per_slot, slots_per_node])
if lfs_per_slot:
slots_per_node = min(slots_per_node,
int(m.floor(lfs_per_node / lfs_per_slot)))
tmp.append([lfs_per_node, lfs_per_slot, slots_per_node])
if mem_per_slot:
slots_per_node = min(slots_per_node,
int(m.floor(mem_per_node / mem_per_slot)))
tmp.append([mem_per_node, mem_per_slot, slots_per_node])
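# Illustration (hypothetical numbers): on a node with 32 cores and 4 GPUs,
# a slot requiring 4 cores and 1 GPU is limited to floor(32 / 4) = 8 slots by
# cores but to floor(4 / 1) = 4 slots by GPUs, so `slots_per_node` becomes 4 -
# the scarcest resource wins.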
if not mpi and req_slots > slots_per_node:
raise ValueError('non-mpi task does not fit on a single node:'
'%s * %s:%s > %s:%s -- %s > %s [%s %s] %s' % (req_slots,
cores_per_slot, gpus_per_slot,
cores_per_node, gpus_per_node, req_slots,
slots_per_node, cores_per_slot, gpus_per_slot, tmp))
# set conditions to find the first matching node
is_first = True
is_last = False
colo_tag = td['tags'].get('colocate')
if colo_tag is not None:
colo_tag = str(colo_tag)
# in case of the PRTE launch method, the key `partition` from the task
# description attribute `tags` represents a DVM ID
partition = td.get('tags', {}).get('partition')
if self._partitions and partition is not None:
partition = str(partition)
if partition not in self._partitions:
raise ValueError('partition id (%s) out of range' % partition)
# partition id becomes a part of a co-locate tag
colo_tag = partition + ('' if not colo_tag else '_%s' % colo_tag)
if colo_tag not in self._colo_history:
self._colo_history[colo_tag] = self._partitions[partition]
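# For illustration (hypothetical values): with `partition='1'` and a task
# tag `colocate='model_a'`, the effective co-locate tag becomes
# '1_model_a'; without a colocate tag it is just '1'.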
task_partition_id = None
# what remains to be allocated? all of it right now.
alc_slots = list()
rem_slots = req_slots
# start the search
for node in self._iterate_nodes():
node_id = node['node_id']
node_name = node['node_name']
self._log.debug_3('next %s : %s', node_id, node_name)
self._log.debug_3('req1: %s = %s + %s', req_slots, rem_slots,
len(alc_slots))
# Check if a task is tagged to use this node. This means we check
# - if a "colocate" tag exists
# - if a "colocate" tag has been used before
# - if the previous use included this node
# If a tag exists, continue to consider this node if the tag was
# used for this node - else continue to the next node.
if colo_tag is not None:
if colo_tag in self._colo_history:
if node_id not in self._colo_history[colo_tag]:
continue
# for a new tag check that nodes were not used for previous tags
else:
# `exclusive` -> not to share nodes between different tags
is_exclusive = td['tags'].get('exclusive', False)
if is_exclusive and node_id in self._tagged_nodes:
if len(self.nodes) > len(self._tagged_nodes):
continue
self._log.warn('not enough nodes for exclusive tags, ' +
'switched "exclusive" flag to "False"')
node_partition_id = None
if self._partitions:
# nodes assigned to the task should be from the same partition
# FIXME: handle the case when unit (MPI task) would require
# more nodes than the amount available per partition
_skip_node = True
for plabel, p_node_ids in self._partitions.items():
if node_id in p_node_ids:
if task_partition_id in [None, plabel]:
node_partition_id = plabel
_skip_node = False
break
if _skip_node:
continue
# if only a small set of cores/gpus remains unallocated (i.e., less
# than node size), we are in fact looking for the last node. Note
# that this can also be the first node, for small tasks.
if rem_slots < slots_per_node:
is_last = True
if not mpi:
# non-mpi tasks are never partially allocated
partial = False
elif is_first or self._scattered or is_last:
# we allow partial nodes on the first and last node,
# and on any node if a 'scattered' allocation is requested.
partial = True
else:
partial = False
# now we know how many slots we still need at this point - but
# we only search up to node-size on this node. Duh!
find_slots = min(rem_slots, slots_per_node)
self._log.debug_3('find: %s', find_slots)
# under the constraints so derived, check what we find on this node
new_slots = self._find_resources(node = node,
find_slots = find_slots,
ranks_per_slot = ranks_per_slot,
cores_per_slot = cores_per_slot,
gpus_per_slot = gpus_per_slot,
lfs_per_slot = lfs_per_slot,
mem_per_slot = mem_per_slot,
partial = partial)
if not new_slots:
# this was not a match. If we are in 'scattered' mode, we just
# ignore this node. Otherwise we have to restart the search
# (continuity is broken)
if not self._scattered:
alc_slots = list()
rem_slots = req_slots
is_first = True
is_last = False
# try next node
continue
if node_partition_id is not None and task_partition_id is None:
task_partition_id = node_partition_id
# this node got a match, store away the found slots and continue
# search for remaining ones
rem_slots -= len(new_slots)
alc_slots.extend(new_slots)
self._log.debug_3('new slots: %s', pprint.pformat(new_slots))
self._log.debug_3('req2: %s = %s + %s <> %s', req_slots, rem_slots,
len(new_slots), len(alc_slots))
# we are young only once. kinda...
is_first = False
# or maybe don't continue the search if we have in fact enough!
if rem_slots == 0:
break
# if we did not find enough, there is not much we can do at this point
if rem_slots > 0:
return None # signal failure
slots = {'ranks' : alc_slots,
'partition_id': task_partition_id}
# if the tag `colocate` was provided, then the corresponding nodes should
# be stored in the tag history (if partition nodes were kept under this
# key before, they will be overwritten)
if colo_tag is not None and colo_tag != partition:
self._colo_history[colo_tag] = [node['node_id']
for node in slots['ranks']]
self._tagged_nodes.update(self._colo_history[colo_tag])
# this should be nicely filled out now - return
return slots
# ------------------------------------------------------------------------------