Using Multiple Pilots

RADICAL-Pilot supports managing multiple pilots during a single run, while workload (bag of computing tasks) will be distributed among all available pilots. radical.pilot.TaskManager dispatches tasks to pilots according to a scheduling algorithm (radical.pilot.SCHEDULER_ROUND_ROBIN, radical.pilot.SCHEDULER_BACKFILLING), which is provided during its initialization, e.g., TaskManager(session=session, scheduler=SCHEDULER_BACKFILLING).

Note: RADICAL-Pilot enacts a second scheduling step within each launched pilot: once the Agent takes ownership of tasks, which the radical.pilot.TaskManager scheduler assigned to it, the Agent Scheduler will place the tasks on the set of resources (CPU cores, GPUs) that the Agent is managing. The Agent Scheduler can be configured via resource configuration files (see the tutorial RADICAL-Pilot Configuration System)

  • Round-Robin Scheduler, RR (SCHEDULER_ROUND_ROBIN, default) fairly distributes arriving tasks over the set of known pilots, independent of task state, expected workload, pilot state or pilot lifetime. As such, RR is a fairly simplistic, but also a very fast scheduler, which does not impose any additional communication round trips between radical.pilot.TaskManager and pilot agents.

  • Backfilling Scheduler, BF (SCHEDULER_BACKFILLING) provides a better load balancing, but at the cost of additional communication round trips. It depends on the actual application workload if that load balancing is beneficial or not. It is most beneficial for the large number of pilots and for relatively long-running tasks (e.g., the task runtime is significantly longer than the communication round trip time between radical.pilot.TaskManager and pilot agents).

    • It is not recommended to use BF for: (i) a single pilot, and/or (ii) many short-running tasks.

    • BF will only dispatch tasks to pilot agents once the pilot agent is in the radical.pilot.PMGR_ACTIVE state. The tasks will thus get executed even if one of the pilots never reaches that state - the load will be distributed between pilots which become active (radical.pilot.PMGR_ACTIVE).

    • BF will only dispatch as many tasks to the Agent as they can be executed concurrently. No tasks will be waiting in the Agent Scheduler queue. BF will react on task termination events, and will then backfill (!) the Agent with any remaining tasks. The Agent will remain under-utilized during that communication.

Examples

Note: In our examples, we will not show a progression bar while waiting for some operation to complete, e.g., while waiting for a pilot to stop. That is because the progression bar offered by RP’s reporter does not work within a notebook. You could use it when executing an RP application as a standalone Python script.

[1]:
%env RADICAL_REPORT_ANIME=False
env: RADICAL_REPORT_ANIME=False
[2]:
import radical.pilot as rp
import radical.utils as ru
[3]:
session = rp.Session()
pmgr    = rp.PilotManager(session=session)
tmgr    = rp.TaskManager(session=session)  # -> rp.TaskManager(session=session, scheduler=rp.SCHEDULER_ROUND_ROBIN)

We run multiple pilots using a description for a “local” platform (local.localhost), which is provided by the built-in configuration file resource_local.json and mimics a configuration for CPU cores and GPUs.

Note: The amount of resources is arbitrary, and it does not reflect the number of physical resources available.

Note: Before running multiple pilots on a single HPC platform, ensure that its scheduling policy conforms the use case, since each pilot represents a batch job and HPC’s scheduling policy might limit the number of jobs running simultaneously per user. For the remote submission see the tutorial Using RADICAL-Pilot on HPC Platforms.

[4]:
pd0 = rp.PilotDescription({
    'resource': 'local.localhost',
    'cores'   : 4,
    'runtime' : 10
})
pd1 = rp.PilotDescription({
    'resource': 'local.localhost',
    'cores'   : 2,
    'runtime' : 5
})

pilots = pmgr.submit_pilots([pd0, pd1])

After pilots are submitted, they should be added to `radical.pilot.TaskManager for the task dispatching.

[5]:
tmgr.add_pilots(pilots)
tmgr.list_pilots()  # lists all pilots available for TaskManager
[5]:
['pilot.0000', 'pilot.0001']
[6]:
N_TASKS = 4
tds     = []  # list of task descriptions

for _ in range(N_TASKS):

    td = rp.TaskDescription({
        'executable': '/bin/echo',
        'arguments' : ['pilot_id=$RP_PILOT_ID']
    })

    tds.append(td)

tasks = tmgr.submit_tasks(tds)
tmgr.wait_tasks()
[6]:
['DONE', 'DONE', 'DONE', 'DONE']

Each task was assigned to a corresponding pilot according to a default scheduling algorithm radical.pilot.SCHEDULER_ROUND_ROBIN, but it is possible to assign a task to a particular pilot explicitly with the radical.pilot.TaskDescription.pilot attribute. Thus, in the example below, we submit another pilot and assign a new task to it.

[7]:
pd3 = rp.PilotDescription({
    'resource': 'local.localhost',
    'cores'   : 2,
    'runtime' : 5
})

pilot = pmgr.submit_pilots(pd3)
tmgr.add_pilots(pilot)

td = rp.TaskDescription()
td.executable = '/bin/echo'
td.arguments  = ['task is assigned to $RP_PILOT_ID']
td.pilot      = pilot.uid

task = tmgr.submit_tasks(td)
tmgr.wait_tasks(uids=task.uid)

tasks.append(task)

If the task is assigned to a pilot, which wasn’t added to radical.pilot.TaskManager, then that pilot is unknown for the task dispatching, and the task is pending for a corresponding pilot to arrive. In the example below, we set a timeout for the radical.pilot.TaskManager.wait_tasks() method, since by default, that method waits for tasks reaching their final state, but this task will have the state radical.pilot.TMGR_SCHEDULING until the session will be closed (since we will not submit an associated pilot).

[8]:
td = rp.TaskDescription()
td.executable = '/bin/echo'
td.arguments  = ['pilot_id=$RP_PILOT_ID']
td.pilot      = 'unknown_pilot_id'

task = tmgr.submit_tasks(td)
tmgr.wait_tasks(uids=task.uid, timeout=15)
[8]:
'NEW'

As a final step, we go through all tasks outputs to see which pilot was assigned to each task.

[9]:
for task in tasks:
    stdout = task.stdout.strip()[:35]
    print('%s - output: %-30s (in task description: pilot="%s")' %
          (task.uid, stdout, task.description['pilot']))
task.000000 - output: pilot_id=pilot.0000            (in task description: pilot="")
task.000001 - output: pilot_id=pilot.0001            (in task description: pilot="")
task.000002 - output: pilot_id=pilot.0000            (in task description: pilot="")
task.000003 - output: pilot_id=pilot.0001            (in task description: pilot="")
task.000004 - output: task is assigned to pilot.0002 (in task description: pilot="pilot.0002")
[10]:
session.close(cleanup=True)