Application Level Scheduling

RADICAL-Pilot (RP) by default uses its internal scheduler to efficiently place tasks on the available cluster resources. Some use cases however require more fine-grained and/or explicit control over task placement. RP supports that functionality with application level scheduling. In that case the pilot will report about the available nodes and resources, but will leave it to the application to assign resources to the tasks. A number of API functions are provided to simplify the development of such application level schedulers, and this tutorial will demonstrate their use.

Note: At the moment it is not possible to mix application level scheduling and RP’s internal scheduling in the same session. Note: Application level scheduling is only supported for executable tasks, RAPTOR tasks will ignore any related settings. Note: The outputs of the various cells above may differ, depending on your localhost’s hardware configuration.

Notation

The following terms will be used throughout this tutorial:

  • task: an executable piece of work comprised of one or more processes, all running the same executable on a dedicated set of resources.

  • rank: one of the processes which comprise a running task. The term rank is frequently used for MPI applications, but we use it generically for any task which uses multiprocessing.

  • slot: the set of resources which are assigned to a single rank, i.e., to a single task process. Note that each rank can utilize multiple cores and/or GPUs, usually by support of libraries and frameworks such as OpenMP, CUDA, OpenCL etc.

  • occupation: the portion of a resource assigned to a rank. For example, two ranks could share a GPU, and then each of the ranks would get occupation=0.5 assigned for that GPU.

Setup

We will first start a normal RP session and submit a pilot, and wait until that pilot becomes active (~15 seconds)

[1]:
import time
import copy
import pprint

import radical.pilot as rp
import radical.utils as ru
[2]:
session = rp.Session()

pmgr = rp.PilotManager(session)
tmgr = rp.TaskManager(session)

pilot = pmgr.submit_pilots(rp.PilotDescription(
    {'resource': 'local.localhost',
     'runtime' : 60,
     'nodes'   : 4}))

tmgr.add_pilots(pilot)
pilot.wait([rp.PMGR_ACTIVE, rp.FAILED])

assert pilot.state == rp.PMGR_ACTIVE

print('pilot is active (%s)' % pilot.state)
pilot is active (PMGR_ACTIVE)

Pilot Resources

We then inspect the pilot’s resources by retrieving its nodelist. The nodelist (type: rp.NodeList) has the following attributes:

  • uniform: boolean, indicates if the nodes have identical resources

  • cores_per_node, gpus_per_node, mem_per_node, lfs_per_node: amount of resources per node. Those attributes will be None for non-uniform nodelists.

  • nodes: the actual list of nodes.

Let’s inspect one of the nodes (nodeslist.nodes[0]). A node in the nodelist has the type rp.NodeResource with the following attributes:

  • index: unique node identifier used within RP

  • name: hostname (does not need to be unique!)

  • mem: available memory (in MB)

  • lfs: available disk storage (in MB)

  • cores: available CPU cores and their occupation

  • gpus: available GPUs and their occupation.

The core and gpu information are constructed of an integer (the resource index) and a float (the resource occupation where 0.0 is not used and 1.0 is fully used).

[3]:
# inspect the nodelist
nodelist = pilot.nodelist
print('#nodes : ', len(nodelist.nodes))
print('uniform: ', nodelist.uniform)
print('cpn    : ', nodelist.cores_per_node)
print('gpn    : ', nodelist.gpus_per_node)

# inspect one node
node = nodelist.nodes[0]
pprint.pprint(node.as_dict())
#nodes :  4
uniform:  True
cpn    :  8
gpn    :  1
{'cores': [{'index': 0, 'occupation': 0.0},
           {'index': 1, 'occupation': 0.0},
           {'index': 2, 'occupation': 0.0},
           {'index': 3, 'occupation': 0.0},
           {'index': 4, 'occupation': 0.0},
           {'index': 5, 'occupation': 0.0},
           {'index': 6, 'occupation': 0.0},
           {'index': 7, 'occupation': 0.0}],
 'gpus': [{'index': 0, 'occupation': 0.0}],
 'index': 0,
 'lfs': 1024,
 'mem': 4096,
 'name': 'localhost'}

Defining Slots for Task Ranks

Based on that information we can now define slots, i.e., sets of resources for task ranks to run on. The slot for the simplest possible task (in RP) would just allocate one core. Let’s create two slots (core ID 3 and 4) for a two-ranked task which runs radical-pilot-hello.sh - that is a script which will report the resources used by the task:

[4]:
slot_1 = rp.Slot(node_index=0, node_name='localhost', cores=[3])
slot_2 = rp.Slot(node_index=0, node_name='localhost', cores=[4])

td_1 = rp.TaskDescription({'executable': 'radical-pilot-hello.sh',
                           'arguments' : [1],
                           'ranks'     : 2,
                           'slots'     : [slot_1, slot_2]})  # <--- this is new

task_1 = tmgr.submit_tasks(td_1)
tmgr.wait_tasks(uids=task_1.uid)

pprint.pprint(task_1.slots)
print(task_1.stdout)
[{'cores': [{'index': 3, 'occupation': 1.0}],
  'gpus': [],
  'lfs': 0,
  'mem': 0,
  'node_index': 0,
  'node_name': 'localhost',
  'version': 1},
 {'cores': [{'index': 4, 'occupation': 1.0}],
  'gpus': [],
  'lfs': 0,
  'mem': 0,
  'node_index': 0,
  'node_name': 'localhost',
  'version': 1}]
1 : PID     : 6999
1 : NODE    : build-28471584-project-13481-radicalpilot
1 : CPUS    : 00
1 : GPUS    :
1 : RANK    : 1
1 : THREADS : 1
1 : SLEEP   : 1
0 : PID     : 7000
0 : NODE    : build-28471584-project-13481-radicalpilot
0 : CPUS    : 00
0 : GPUS    :
0 : RANK    : 0
0 : THREADS : 1
0 : SLEEP   : 1

We can check the task’s rankfile to see if the slot settings were respected:

Note: a rankfile will only be available when mpiexec is configured as launch method and -rf is a supported option on your system.

[5]:
sbox = ru.Url(task_1.sandbox).path
tid  = task_1.uid
!cat {sbox}/{tid}.rf
cat: /home/docs/radical.pilot.sandbox/rp.session.4d663960-4693-11f0-9e01-764078d63b85/pilot.0000/task.000000//task.000000.rf: No such file or directory

Scheduling Helpers

A simpler way to obtain task slots is to let the node’s NodeResource.find_slot(rp.RankRequirements) method find it for you. That will return a viable slot, or None if the requested resources are not available at this moment.

[6]:
rr = rp.RankRequirements(n_cores=1)
slot_3 = node.find_slot(rr)

assert slot_3
pprint.pprint(slot_3.as_dict())
{'cores': [{'index': 0, 'occupation': 1.0}],
 'gpus': [],
 'lfs': 0,
 'mem': 0,
 'node_index': 0,
 'node_name': 'localhost',
 'version': 1}

If we now check the node, we will see that the resource occupation of the first core changed.

[7]:
pprint.pprint(node.as_dict())
{'cores': [{'index': 0, 'occupation': 1.0},
           {'index': 1, 'occupation': 0.0},
           {'index': 2, 'occupation': 0.0},
           {'index': 3, 'occupation': 0.0},
           {'index': 4, 'occupation': 0.0},
           {'index': 5, 'occupation': 0.0},
           {'index': 6, 'occupation': 0.0},
           {'index': 7, 'occupation': 0.0}],
 'gpus': [{'index': 0, 'occupation': 0.0}],
 'index': 0,
 'lfs': 1024,
 'mem': 4096,
 'name': 'localhost'}

We can also allocate the slots we manually created before so that later calls to find_slot will take that information into account (after use, one should deallocate the slots again!).

[8]:
node.allocate_slot(slot_1)
node.allocate_slot(slot_2)

pprint.pprint(node.as_dict())
{'cores': [{'index': 0, 'occupation': 1.0},
           {'index': 1, 'occupation': 0.0},
           {'index': 2, 'occupation': 0.0},
           {'index': 3, 'occupation': 1.0},
           {'index': 4, 'occupation': 1.0},
           {'index': 5, 'occupation': 0.0},
           {'index': 6, 'occupation': 0.0},
           {'index': 7, 'occupation': 0.0}],
 'gpus': [{'index': 0, 'occupation': 0.0}],
 'index': 0,
 'lfs': 1024,
 'mem': 4096,
 'name': 'localhost'}
[9]:
# deallocate all used slots
node.deallocate_slot(slot_1)
node.deallocate_slot(slot_2)
node.deallocate_slot(slot_3)

pprint.pprint(node.as_dict())
{'cores': [{'index': 0, 'occupation': 0.0},
           {'index': 1, 'occupation': 0.0},
           {'index': 2, 'occupation': 0.0},
           {'index': 3, 'occupation': 0.0},
           {'index': 4, 'occupation': 0.0},
           {'index': 5, 'occupation': 0.0},
           {'index': 6, 'occupation': 0.0},
           {'index': 7, 'occupation': 0.0}],
 'gpus': [{'index': 0, 'occupation': 0.0}],
 'index': 0,
 'lfs': 1024,
 'mem': 4096,
 'name': 'localhost'}

To allocate a larger set of slots, for example for multi-rank tasks, RP can search the node list itself for available resources. That search might return slots which are distributed across all nodes. For example, the call below will allocate the resources for 4 ranks where each rank uses 4 cores and half a GPU (2 ranks can share one GPU). As the GPU is the limiting resource in this scenario, we will be able to place at most 2 ranks per node:

[10]:
rr = rp.RankRequirements(n_cores=4, n_gpus=1, gpu_occupation=0.5)
slots = nodelist.find_slots(rr, n_slots=4)

for slot in slots:
    print('index:', slot.node_index, 'cores:', slot.cores, 'gpus:', slot.gpus)
index: 0 cores: [RO: 0:1.00, RO: 1:1.00, RO: 2:1.00, RO: 3:1.00] gpus: [RO: 0:0.50]
index: 0 cores: [RO: 4:1.00, RO: 5:1.00, RO: 6:1.00, RO: 7:1.00] gpus: [RO: 0:0.50]
index: 1 cores: [RO: 0:1.00, RO: 1:1.00, RO: 2:1.00, RO: 3:1.00] gpus: [RO: 0:0.50]
index: 1 cores: [RO: 4:1.00, RO: 5:1.00, RO: 6:1.00, RO: 7:1.00] gpus: [RO: 0:0.50]
[11]:
nodelist.release_slots(slots)

Application Level Scheduling

With the above tools, a simple implementation of an application level scheduler would be:

[12]:
# ------------------------------------------------------------------------------
#
class AppScheduler(object):

    def __init__(self, tmgr, nodelist):
        '''
        tmgr    : task manager which handles execution of tasks
        nodelist: resources to be used for task execution
        '''
        self._tmgr            = tmgr
        self._nodelist        = nodelist
        self._running_tasks   = dict()
        self._completed_tasks = list()
        self._slots           = dict()

        self._tmgr.register_callback(self._state_cb)

    def _state_cb(self, task, state):
        '''handle task state transitions, specifically for final states'''
        if state in rp.FINAL and task.uid in self._running_tasks:
            print('---> %s: %s' % (task.uid, state))
            assert state == rp.DONE
            self._nodelist.release_slots(self._slots[task.uid])
            del self._running_tasks[task.uid]
            self._completed_tasks.append(task)

    def wait_tasks(self, uids=None, timeout=None):
        '''wait for a set of (or all) tasks for a certain time (or forever)'''
        if not uids:
            uids = list(self._running_tasks.keys())
        print('waiting  : %s' % uids)
        states = self._tmgr.wait_tasks(uids=uids, timeout=timeout)
        return states

    def submit(self, tds):
        '''tds: list of rp.TaskDescriptions - list of tasks to run'''
        while tds:

            print('===========================================================')

            # find slots for all task descriptions
            allocated = list()
            not_allocated = list()

            for td in tds:
                rr = rp.RankRequirements(n_cores=td.cores_per_rank,
                                         n_gpus=td.gpus_per_rank,
                                         mem=td.mem_per_rank,
                                         lfs=td.lfs_per_rank)
                slots = nodelist.find_slots(rr, n_slots=td.ranks)
                if slots:
                    # this task can be submitted
                    td.slots = slots
                    allocated.append(td)
                    self._slots[td.uid] = slots
                else:
                    # this task has to be retries later on
                    not_allocated.append(td)

            # submit all tasks for which resources were found
            submitted = tmgr.submit_tasks(allocated)
            for task in submitted:
                self._running_tasks[task.uid] = task
                self._running_tasks[task.uid] = task

            print('submitted: %s' % [task.uid for task in allocated])
            print('pending  : %s' % [td.uid   for td   in not_allocated])

            if not_allocated:

                if not self._running_tasks:
                    # no tasks are running - the remaining tasks will never be
                    # able to run, so we have to give up
                    td = not_allocated[0]
                    pprint.pprint(td.as_dict())
                    for node in self._nodelist.nodes:
                        pprint.pprint(node.as_dict())
                    raise ValueError('can never allocate %s' % td.uid)

                # could not submit all tasks - wait for resources to become available
                # on and attempt to schedule the remaining tasks in the next iteration
                while True:
                    states = self.wait_tasks(timeout=5.0)

                    # if we got any free resources, try to schedule more tasks
                    if rp.DONE in states or rp.FAILED in states:
                        break

            time.sleep(3)
            tds = not_allocated


# ------------------------------------------------------------------------------
#
tds = list()
for i in range(16):
    td = {'executable'     : 'sleep',
          'arguments'      : [2],
          'ranks'          : i + 1,   # submit larger and larger tasks
          'cores_per_rank' : 1,
          'uid'            : 't.%02d' % i}
    tds.append(rp.TaskDescription(td))

scheduler = AppScheduler(tmgr, nodelist)
scheduler.submit(tds)
scheduler.wait_tasks()

print('=== all tasks completed ===')

===========================================================
submitted: ['t.00', 't.01', 't.02', 't.03', 't.04', 't.05', 't.06']
pending  : ['t.07', 't.08', 't.09', 't.10', 't.11', 't.12', 't.13', 't.14', 't.15']
waiting  : ['t.00', 't.01', 't.02', 't.03', 't.04', 't.05', 't.06']
===========================================================
submitted: ['t.07', 't.08', 't.09']
pending  : ['t.10', 't.11', 't.12', 't.13', 't.14', 't.15']
waiting  : ['t.07', 't.08', 't.09']
===========================================================
submitted: ['t.10', 't.11']
pending  : ['t.12', 't.13', 't.14', 't.15']
waiting  : ['t.10', 't.11']
===========================================================
submitted: ['t.12', 't.13']
pending  : ['t.14', 't.15']
waiting  : ['t.12', 't.13']
===========================================================
submitted: ['t.14', 't.15']
pending  : []
waiting  : []
=== all tasks completed ===

Use Case 1: Pilot Partitions

Use the application level scheduler to localize certain tasks on one set of nodes (pilot partition), and other tasks on a different set of nodes:

[13]:
nodelist_1 = copy.deepcopy(nodelist)
nodelist_2 = copy.deepcopy(nodelist)

nodelist_1.nodes = nodelist_1.nodes[:2]  # first two nodes
nodelist_2.nodes = nodelist_1.nodes[2:]  # remaining nodes

scheduler_1 = AppScheduler(tmgr, nodelist_1)
scheduler_2 = AppScheduler(tmgr, nodelist_2)

Use Case 2: Virtual Nodes

Use the API to splice nodes into virtual nodes, creating virtual, heterogeneous node list (in the future, we might add APIs to simplify node splicing). Consider the node type we have here:

{'cores': [{'index': 0, 'occupation': 0.0},
           {'index': 1, 'occupation': 0.0},
           {'index': 2, 'occupation': 0.0},
           {'index': 3, 'occupation': 0.0},
           {'index': 4, 'occupation': 0.0},
           {'index': 5, 'occupation': 0.0},
           {'index': 6, 'occupation': 0.0},
           {'index': 7, 'occupation': 0.0},
           {'index': 8, 'occupation': 0.0},
           {'index': 9, 'occupation': 0.0},
           {'index': 10, 'occupation': 0.0},
           {'index': 11, 'occupation': 0.0}],
 'gpus': [{'index': 0, 'occupation': 0.0}],
 'index': 0,
 'lfs': 1000000,
 'mem': 65536,
 'name': 'localhost'}

We could splice that into a GPU node with 2 cores and 1 GPU, and a CPU-only node with the remaining 10 cores:

[{'cores': [{'index': 0, 'occupation': 0.0},
            {'index': 1, 'occupation': 0.0}],
  'gpus': [{'index': 0, 'occupation': 0.0}],
  'index': 0,
  'lfs': 1000000,
  'mem': 65536,
  'name': 'localhost'},

 {'cores': [{'index': 2, 'occupation': 0.0},
            {'index': 3, 'occupation': 0.0},
            {'index': 4, 'occupation': 0.0},
            {'index': 5, 'occupation': 0.0},
            {'index': 6, 'occupation': 0.0},
            {'index': 7, 'occupation': 0.0},
            {'index': 8, 'occupation': 0.0},
            {'index': 9, 'occupation': 0.0},
            {'index': 10, 'occupation': 0.0},
            {'index': 11, 'occupation': 0.0}],
  'index': 1,
  'lfs': 1000000,
  'mem': 65536,
  'name': 'localhost'}]

When splicing all nodes of a cluster allocation, we end up with two virtual allocations, one GPU focused and one CPU focused, and certain task scheduling policies with GPU awareness become almost trivial. One could also, for example, splice nodes at their NUMA domains to trivially get a NUMA-aware scheduler (some care needs to be applied to lfs and mem resource slicing).

Use Case 3: NUMA Domains

Similarly, we can splice nodes into individual NUMA-Domains, thus implementing a simple form of NUMA-aware scheduling. As that is a common use case, RP provides a NumaNode class to simplify that use case:

Assume we have the following node layout with 8 cores and 2 GPUs:

[14]:
node = rp.Node({'name'    : 'localhost',
                'cores'   : [rp.RO(index=x, occupation=rp.FREE) for x in range(8)],
                'gpus'    : [rp.RO(index=x, occupation=rp.FREE) for x in range(2)]})

pprint.pprint(node.as_dict())
{'cores': [{'index': 0, 'occupation': 0.0},
           {'index': 1, 'occupation': 0.0},
           {'index': 2, 'occupation': 0.0},
           {'index': 3, 'occupation': 0.0},
           {'index': 4, 'occupation': 0.0},
           {'index': 5, 'occupation': 0.0},
           {'index': 6, 'occupation': 0.0},
           {'index': 7, 'occupation': 0.0}],
 'gpus': [{'index': 0, 'occupation': 0.0}, {'index': 1, 'occupation': 0.0}],
 'index': 0,
 'lfs': None,
 'mem': None,
 'name': 'localhost'}

Assume further, that the node is divided into two NUMA domains. That could be expressed as follows:

[15]:
ndm = rp.NumaDomainMap({0: rp.NumaDomain(cores=range( 0, 4), gpus=[0]),
                        1: rp.NumaDomain(cores=range( 4, 8), gpus=[1])})

numa_node = rp.NumaNode(node, ndm)
pprint.pprint(numa_node.as_dict())
{'index': 0,
 'lfs': None,
 'mem': None,
 'name': 'localhost',
 'numa_domains': {0: {'cores': [{'index': 0, 'occupation': 0.0},
                                {'index': 1, 'occupation': 0.0},
                                {'index': 2, 'occupation': 0.0},
                                {'index': 3, 'occupation': 0.0}],
                      'gpus': [{'index': 0, 'occupation': 0.0}],
                      'index': 0,
                      'lfs': None,
                      'mem': None,
                      'name': 'localhost.0'},
                  1: {'cores': [{'index': 4, 'occupation': 0.0},
                                {'index': 5, 'occupation': 0.0},
                                {'index': 6, 'occupation': 0.0},
                                {'index': 7, 'occupation': 0.0}],
                      'gpus': [{'index': 1, 'occupation': 0.0}],
                      'index': 0,
                      'lfs': None,
                      'mem': None,
                      'name': 'localhost.1'}}}

We can now schedule NUMA-agnostic and NUMA-aware ranks on that node. Note that the second slot will use the GPU on the second NUMA domain as the first NUMA domain does not have sufficient cores left.

[16]:
slot_1 = node.find_slot(rp.RankRequirements(n_cores=3))
slot_2 = numa_node.find_slot(rp.RankRequirements(n_cores=2, n_gpus=1, numa=True))
print(slot_1)
print(slot_2)

pprint.pprint(numa_node.as_dict())
{'cores': [RO: 0:1.00, RO: 1:1.00, RO: 2:1.00], 'gpus': [], 'lfs': 0, 'mem': 0, 'node_index': 0, 'node_name': 'localhost', 'version': 1}
{'cores': [RO: 4:1.00, RO: 5:1.00], 'gpus': [RO: 1:1.00], 'lfs': 0, 'mem': 0, 'node_index': 0, 'node_name': 'localhost.1', 'version': 1}
{'index': 0,
 'lfs': None,
 'mem': None,
 'name': 'localhost',
 'numa_domains': {0: {'cores': [{'index': 0, 'occupation': 1.0},
                                {'index': 1, 'occupation': 1.0},
                                {'index': 2, 'occupation': 1.0},
                                {'index': 3, 'occupation': 0.0}],
                      'gpus': [{'index': 0, 'occupation': 0.0}],
                      'index': 0,
                      'lfs': None,
                      'mem': None,
                      'name': 'localhost.0'},
                  1: {'cores': [{'index': 4, 'occupation': 1.0},
                                {'index': 5, 'occupation': 1.0},
                                {'index': 6, 'occupation': 0.0},
                                {'index': 7, 'occupation': 0.0}],
                      'gpus': [{'index': 1, 'occupation': 1.0}],
                      'index': 0,
                      'lfs': None,
                      'mem': None,
                      'name': 'localhost.1'}}}