Staging Data

RADICAL-Pilot (RP) can move data from its client side to its agent side (on the HPC platform) and back, as well as within the agent space, i.e., between the sandboxes of sessions, pilots and tasks. To ensure that a task finds the data it needs on the HPC platform where it runs, RP provides a mechanism to stage the task’s input data before its execution. RP also allows users to stage the output data generated by tasks back to their workstations or to any other file system location on the HPC platform. Users can then collect their data from that location after their RP application has terminated.

The rest of the tutorial shows how the user can describe data staging and what that means for the running application.

Note: HPC platforms often have a shared file system, in which case users do not need to define staging directives for their tasks. That is because: (i) tasks can access data saved on the shared file system from all the compute nodes allocated to a pilot; and (ii) users can give the paths to those data directly in each task description, without staging (i.e., copying or linking) the data into each task’s sandbox.

Staging directives

Format

The staging directives are specified using a dict type with the following form:

staging_directive = {'source': str, 'target': str, 'action': str, 'flags': int}
  • source - the file(s) or directory to be staged (see the section Locations);

  • target - the target path for the staged data (see the section Locations);

  • action - how the data should be staged (see the section Actions);

  • flags - options applied to the corresponding action (see the section Flags).
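
As a minimal sketch of the dict form above, the snippet below describes the four keys with a type hint and builds a directive that sets only the source and the target. The names StagingDirective and make_directive are hypothetical helpers, not part of RP. Leaving out action and flags relies on the defaults described in the sections Actions and Flags, which is an assumption here.

```python
from typing import TypedDict


class StagingDirective(TypedDict, total=False):
    """Hypothetical type hint mirroring the dict form shown above."""
    source: str   # what to stage
    target: str   # where to stage it
    action: str   # how to stage it
    flags: int    # options for the action, combined as a bitmask


def make_directive(source, target, action=None, flags=None):
    """Build a directive, leaving out the fields the caller did not set."""
    sd: StagingDirective = {'source': source, 'target': target}
    if action is not None:
        sd['action'] = action
    if flags is not None:
        sd['flags'] = flags
    return sd


directive = make_directive('client:///input.dat', 'task:///input.dat')
print(directive)
```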

Locations

Source and Target locations can be given as strings or radical.utils.Url instances. Strings containing :// are converted into URLs, while strings without :// are considered absolute or relative paths, and are thus interpreted in the context of the client’s working directory (see the section Simplified directive format for examples).

Special URL schemas are relative to certain locations:

  • client:// - client’s working directory (./);

  • endpoint:// - root of the file system on the target platform;

  • resource:// - agent sandbox (radical.pilot.sandbox/) on the target platform;

  • session:// - session sandbox on the target platform (within the agent sandbox);

  • pilot:// - pilot sandbox on the target platform (within the session sandbox);

  • task:// - task sandbox on the target platform (within the pilot sandbox).

All locations are interpreted as directories, not as files. We treat each schema as a namespace (i.e., schema:// = schema:///) and not as a location qualified by a hostname. The hostname element of the URL is expected to be empty, and the path is always considered relative to the locations specified above (even though URLs usually don’t have a notion of relative paths).

endpoint:// is based on the filesystem_endpoint attribute of the platform config (see the tutorial RADICAL-Pilot Configuration System) and points to the file system accessible via that URL. Note that the notion of root depends on the access protocol and the providing service implementation.

We saw sandboxes in the Getting Started tutorial. Here we expand upon that initial introduction. The hierarchy of the sandboxes is the following:

<default_remote_workdir>/radical.pilot.sandbox/<session_sandbox_ID>/<pilot_sandbox_ID>/<task_sandbox_ID>

where default_remote_workdir is an attribute of the platform config (see the tutorial RADICAL-Pilot Configuration System). If it is not provided, the current directory ($PWD) is used. The sandboxes for session, pilot and task are named after their unique IDs (uid).
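
The nesting of the sandboxes can be sketched in a few lines of plain Python. The function sandbox_paths is a hypothetical helper written for this illustration, and the working directory and IDs are example values, not something RP returns.

```python
import posixpath


def sandbox_paths(workdir, session_id, pilot_id, task_id):
    """Return the nested sandbox paths for the given IDs."""
    resource = posixpath.join(workdir, 'radical.pilot.sandbox')
    session = posixpath.join(resource, session_id)
    pilot = posixpath.join(session, pilot_id)
    task = posixpath.join(pilot, task_id)
    return {'resource': resource, 'session': session,
            'pilot': pilot, 'task': task}


# Example values only; real IDs are assigned by RP at runtime.
paths = sandbox_paths('/tmp', 'rp.session.0000', 'pilot.0000', 'task.0000')
for name, path in paths.items():
    print(f'{name:<8} {path}')
```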

Examples of the expanded locations:

# assumptions for the examples below
#   - client's working directory
#        /home/user
#   - agent's sandboxes hierarchy
#        /tmp/radical.pilot.sandbox/rp.session.0000/pilot.0000/task.0000

in : 'client:///tmp/input_data'
out: '/home/user/tmp/input_data'

in : 'task:///test.txt'
out: '/tmp/radical.pilot.sandbox/rp.session.0000/pilot.0000/task.0000/test.txt'
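
The expansion rule can be mimicked with a small function, shown here as a sketch and not as RP's implementation. The function expand_location and the BASES table are hypothetical, and the base directories are the ones assumed in the examples above. The endpoint:// schema is left out because its root depends on the platform configuration.

```python
import posixpath

# Base directories taken from the assumptions in the examples above.
AGENT = '/tmp/radical.pilot.sandbox'
BASES = {
    'client'  : '/home/user',
    'resource': AGENT,
    'session' : AGENT + '/rp.session.0000',
    'pilot'   : AGENT + '/rp.session.0000/pilot.0000',
    'task'    : AGENT + '/rp.session.0000/pilot.0000/task.0000',
}


def expand_location(url, bases=BASES):
    """Resolve a 'schema://path' string against the base of its schema."""
    schema, sep, path = url.partition('://')
    if not sep or schema not in bases:
        raise ValueError(f'unsupported location: {url!r}')
    # schema:// and schema:/// are equivalent: the path is always relative
    return posixpath.join(bases[schema], path.lstrip('/'))


print(expand_location('client:///tmp/input_data'))
print(expand_location('task:///test.txt'))
```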

Actions

  • radical.pilot.TRANSFER (default) - remote file transfer from source URL to target URL;

  • radical.pilot.COPY - local file copy (i.e., not crossing host boundaries);

  • radical.pilot.MOVE - local file move;

  • radical.pilot.LINK - local file symlink.

Choosing appropriate staging actions helps to reduce the application runtime. I/O operations are expensive and can increase the total execution time of an application. Thus, RP applications should be built considering that:

  • the most expensive I/O operations (TRANSFER, MOVE, COPY) should be used for staging between the client:// location and the corresponding paths on the target platform, since they are performed outside of the allocated resources and therefore leave no resources idling (the pilot job has not been launched at that point);

  • task staging between sandboxes should minimize the use of MOVE and COPY, and use LINK where possible, since these operations are executed within the allocated resources.

In the example from the section Examples, we demonstrate that if all tasks have the same input data, then this data can be located in a shared space (e.g., staged to the pilot:// location) and be linked into each task’s sandbox (e.g., a link per input file within the task:// location).
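
The pattern of sharing data once and linking it per task can be sketched as follows. The helper link_shared_inputs is hypothetical, and the action is passed in as a parameter so that the snippet runs without RP installed; an RP application would pass rp.LINK instead of the placeholder string.

```python
def link_shared_inputs(filenames, shared_dir, link_action):
    """Build one link directive per file, from pilot:// into task://."""
    return [{'source': f'pilot:///{shared_dir}/{name}',
             'target': f'task:///{name}',
             'action': link_action}
            for name in filenames]


# 'LINK' is a placeholder so the sketch runs without RP installed;
# an RP application would pass rp.LINK here.
directives = link_shared_inputs(['input.txt'], 'input_dir', 'LINK')
print(directives)
```

The resulting list could be used as the input_staging value of each task description, so that the data is stored once in the pilot sandbox and no task needs its own copy.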

Flags

Flags are set automatically, but users can also set them explicitly.

  • radical.pilot.CREATE_PARENTS - create the directory hierarchy for targets on the fly;

  • radical.pilot.RECURSIVE - if the source is a directory, handle it recursively.
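
Because the flags field is an integer, several flags are combined with the bitwise OR operator and tested with bitwise AND. The sketch below illustrates this with a stand-in enumeration; the class Flag and its numeric values are assumptions made for the illustration and are not RP's actual constants.

```python
import enum


class Flag(enum.IntFlag):
    """Stand-in flags; the numeric values are NOT RP's actual values."""
    CREATE_PARENTS = 1
    RECURSIVE = 2


# Combine flags with bitwise OR, as in CREATE_PARENTS|RECURSIVE.
flags = Flag.CREATE_PARENTS | Flag.RECURSIVE

# Test individual bits with bitwise AND.
print(bool(flags & Flag.RECURSIVE))                # True
print(bool(Flag.CREATE_PARENTS & Flag.RECURSIVE))  # False
```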

Simplified directive format

RP gives some flexibility in the description of staging between the client side and the sandboxes for pilot and task. Thus, if a user provides just names (absolute or relative paths, e.g., names of files or directories), then RP expands them into corresponding directives.

  • If a string directive is a single path, it is expanded so that the source is the provided path within the client:// location, and the target is the base name of the provided path within the pilot:// or the task:// location, for radical.pilot.PilotDescription or radical.pilot.TaskDescription respectively.

  • Having directional characters >, < within a string directive defines the direction of the staging between corresponding paths:

    • Input staging: source > target, the source defines a path within the client:// location, and the target defines a path within the pilot:// or the task:// location for radical.pilot.PilotDescription or radical.pilot.TaskDescription respectively.

    • Output staging: target < source (applied for radical.pilot.TaskDescription only), the source defines a path within the task:// location, and the target defines a path within the client:// location.

Examples of the staging directives being expanded:

radical.pilot.PilotDescription.input_staging

in : [ '/tmp/input_data/' ]
out: [{'source' : 'client:///tmp/input_data',
       'target' : 'pilot:///input_data',
       'action' : radical.pilot.TRANSFER,
       'flags'  : radical.pilot.CREATE_PARENTS|radical.pilot.RECURSIVE}]

in : [ 'input.dat > staged.dat' ]
out: [{'source' : 'client:///input.dat',
       'target' : 'pilot:///staged.dat',
       'action' : radical.pilot.TRANSFER,
       'flags'  : radical.pilot.CREATE_PARENTS}]

radical.pilot.TaskDescription.input_staging

in : [ '/tmp/task_input.txt' ]
out: [{'source' : 'client:///tmp/task_input.txt',
       'target' : 'task:///task_input.txt',
       'action' : radical.pilot.TRANSFER,
       'flags'  : radical.pilot.CREATE_PARENTS}]

radical.pilot.TaskDescription.output_staging

in : [ 'collected.dat < output.txt' ]
out: [{'source' : 'task:///output.txt',
       'target' : 'client:///collected.dat',
       'action' : radical.pilot.TRANSFER,
       'flags'  : radical.pilot.CREATE_PARENTS}]
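
The expansions above can be reproduced with a short parser. This is a sketch written to match the four examples, not RP's own code. The function expand_simplified is hypothetical, and it returns only the source and target locations, leaving out the action and the flags.

```python
import posixpath


def expand_simplified(directive, sandbox):
    """Expand a string directive into its source and target locations.

    `sandbox` is 'pilot' or 'task'. Action and flags are left out on purpose.
    """
    if '>' in directive:                       # input staging
        src, tgt = (part.strip() for part in directive.split('>', 1))
        src_schema, tgt_schema = 'client', sandbox
    elif '<' in directive:                     # output staging
        tgt, src = (part.strip() for part in directive.split('<', 1))
        src_schema, tgt_schema = sandbox, 'client'
    else:                                      # single path
        src = directive.strip()
        tgt = posixpath.basename(src.rstrip('/'))
        src_schema, tgt_schema = 'client', sandbox
    return {'source': f"{src_schema}:///{src.strip('/')}",
            'target': f"{tgt_schema}:///{tgt.strip('/')}"}


print(expand_simplified('/tmp/input_data/', 'pilot'))
print(expand_simplified('input.dat > staged.dat', 'pilot'))
print(expand_simplified('collected.dat < output.txt', 'task'))
```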

Examples

Note: In these examples, we will not show a progress bar while waiting for some operation to complete, e.g., while waiting for a pilot to stop. That is because the progress bar offered by RP’s reporter does not work well within a notebook. You could use the reporter’s progress bar when executing your RP application as a standalone Python script.

[1]:
%env RADICAL_REPORT_ANIME=FALSE
env: RADICAL_REPORT_ANIME=FALSE
[2]:
import radical.pilot as rp
import radical.utils as ru
[3]:
session = rp.Session()
pmgr    = rp.PilotManager(session=session)
tmgr    = rp.TaskManager(session=session)

For this example, create a new directory input_dir within the current working directory, and place a file into this directory. That file will be the input data for every task (the file is referred to in the radical.pilot.TaskDescription.arguments attribute).

Warning: Ensure that the directory in which your script creates the data for staging is writable. You are also responsible for cleaning up that data after it has been staged.

[4]:
import os

input_dir = os.path.join(os.getcwd(), 'input_dir')
os.makedirs(input_dir, exist_ok=True)

with open(os.path.join(input_dir, 'input.txt'), 'w') as f:
    f.write('Staged data (task_id=$RP_TASK_ID | pilot_id=$RP_PILOT_ID | session_id=$RP_SESSION_ID)')

You will stage the newly created directory input_dir with all its files into the pilot:// location.

Note: If the path provided for input_staging is not absolute, RP will look for it within the current working directory. Using absolute paths guarantees that the data to be staged is located correctly.

[5]:
# Staging directives for the pilot.

pd = rp.PilotDescription({
    'resource'     : 'local.localhost',
    'cores'        : 2,
    'runtime'      : 15,
    'input_staging': [input_dir],
    'exit_on_error': False
})

# The staging directive above lists a single directory name.
# This will automatically be expanded to:
#
#    {'source' : 'client:///input_dir',
#     'target' : 'pilot:///input_dir',
#     'action' : rp.TRANSFER,
#     'flags'  : rp.CREATE_PARENTS|rp.RECURSIVE}

pilot = pmgr.submit_pilots(pd)
tmgr.add_pilots(pilot)

Note: You can define input data staging for a pilot either within the radical.pilot.PilotDescription object or as an input parameter of the radical.pilot.Pilot.stage_in() method. Output data staging for a pilot, however, can only be defined with the radical.pilot.Pilot.stage_out() method.
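
As a sketch of the second option, the helper below builds directives for a list of client-side paths and hands them to the stage_in() method of a pilot. It assumes that stage_in() accepts a list of directives in the same way stage_out() does in this tutorial, and that omitting the action selects the default one. The names stage_in_files and FakePilot are hypothetical; the stand-in class only records the directives so that the snippet runs without a live pilot.

```python
import os


def stage_in_files(pilot, paths):
    """Stage client-side files into the pilot sandbox via pilot.stage_in()."""
    directives = [{'source': f'client:///{path}',
                   'target': f'pilot:///{os.path.basename(path)}'}
                  for path in paths]
    pilot.stage_in(directives)
    return directives


class FakePilot:
    """Hypothetical stand-in that only records what it is asked to stage."""

    def __init__(self):
        self.staged = []

    def stage_in(self, directives):
        self.staged.extend(directives)


fake = FakePilot()
stage_in_files(fake, ['data/extra.txt'])
print(fake.staged)
```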

For each task we define directives for input and output staging. We link the file input.txt to the task’s sandbox before executing that task. After its execution, the task produces an output file, which we copy to the pilot sandbox.

[6]:
# Staging directives for tasks.

N_TASKS = 2
tds     = []  # list of task descriptions
outputs = []  # list of file names, which are tasks' outputs

for idx in range(N_TASKS):
    output = 'output.%d.txt' % idx

    td = rp.TaskDescription({
        'executable'    : '/bin/echo',
        'arguments'     : ['$(cat input.txt)'],
        'stdout'        : output,
        # link file from the pilot sandbox to the task sandbox
        'input_staging' : [{'source': 'pilot:///input_dir/input.txt',
                            'target': 'task:///input.txt',
                            'action': rp.LINK}],
        # copy task's output file to the pilot sandbox
        'output_staging': [{'source': 'task:///%s'  % output,
                            'target': 'pilot:///%s' % output,
                            'action': rp.COPY}]
    })

    tds.append(td)
    outputs.append(output)

tmgr.submit_tasks(tds)
tmgr.wait_tasks()
[6]:
['DONE', 'DONE']

You can perform output data staging even after the pilot runtime has finished (i.e., pilot.state=DONE), but always before closing your session object.

[7]:
# Staging data from the pilot sandbox to the client working directory

pilot.stage_out([{'source': 'pilot:///%s'             % output,
                  'target': 'client:///output_dir/%s' % output,
                  'action': rp.TRANSFER} for output in outputs])
[7]:
['/home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/checkouts/stable/docs/source/tutorials/output_dir/output.0.txt',
 '/home/docs/checkouts/readthedocs.org/user_builds/radicalpilot/checkouts/stable/docs/source/tutorials/output_dir/output.1.txt']
[8]:
!cat output_dir/*
Staged data (task_id=$RP_TASK_ID | pilot_id=$RP_PILOT_ID | session_id=$RP_SESSION_ID)
Staged data (task_id=$RP_TASK_ID | pilot_id=$RP_PILOT_ID | session_id=$RP_SESSION_ID)
[9]:
session.close(cleanup=True)
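
As noted in the warning earlier, the data created on the client side for staging is not removed by RP, so you have to delete it yourself once it is no longer needed. A minimal sketch of such a cleanup step follows. The helper remove_staging_dirs is hypothetical, and the demonstration runs in a throwaway temporary directory so that nothing real is deleted; in this tutorial you would pass the paths of input_dir and output_dir.

```python
import os
import shutil
import tempfile


def remove_staging_dirs(*dirs):
    """Delete client-side staging directories, skipping missing ones."""
    removed = []
    for path in dirs:
        if os.path.isdir(path):
            shutil.rmtree(path)
            removed.append(path)
    return removed


# Demonstration in a throwaway directory, so nothing real is deleted.
base = tempfile.mkdtemp()
demo_input = os.path.join(base, 'input_dir')
os.makedirs(demo_input)
removed = remove_staging_dirs(demo_input, os.path.join(base, 'output_dir'))
print(removed == [demo_input])
os.rmdir(base)
```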