\n",
"```\n",
"\n",
"where `default_remote_workdir` is the attribute of the platform config (see the tutorial [RADICAL-Pilot Configuration System](configuration.ipynb#User-defined-configuration)) and, if it is not provided then the current directory is used (`$PWD`). Sandboxes for session, pilot and task are named with their unique IDs (`uid`).\n",
"\n",
"Examples of the expanded locations:\n",
"\n",
"```shell\n",
"# assumptions for the examples below\n",
"# - client's working directory\n",
"# /home/user\n",
"# - agent's sandboxes hierarchy\n",
"# /tmp/radical.pilot.sandbox/rp.session.0000/pilot.0000/task.0000\n",
"\n",
"in : 'client:///tmp/input_data'\n",
"out: '/home/user/tmp/input_data'\n",
"\n",
"in : 'task:///test.txt'\n",
"out: '/tmp/radical.pilot.sandbox/rp.session.0000/pilot.0000/task.0000/test.txt'\n",
"```\n",
"\n",
"### Actions\n",
"\n",
"- `rp.TRANSFER` (__*default*__) - remote file transfer from `source` URL to `target` URL;\n",
"- `rp.COPY` - local file copy (i.e., not crossing host boundaries);\n",
"- `rp.MOVE` - local file move;\n",
"- `rp.LINK` - local file symlink.\n",
"\n",
"Using appropriate data actions helps to improve the application runtime. It is known that I/O operations are expensive and can negatively impact the total execution time of an application. Thus, RP applications should be built considering that:\n",
"\n",
"- the most expensive I/O operations (`TRANSFER`, `MOVE`, `COPY`) should be applied for staging between the `client://` location and corresponding paths on the target platform, since they will be performed outside the allocated resources and will be no resources idling (pilot job is not launched at this moment);\n",
"- task staging between sandboxes should minimize the usage of such actions as `MOVE` and `COPY`, and use the `LINK` action if possible, since these operations will be executed within the allocated resources.\n",
"\n",
"In the example from the section [Examples](#Examples), we demonstrate that if all tasks have the same input data, then this data can be located in a shared space (e.g., staged to the `pilot://` location) and be linked into each task's sandbox (e.g., a link per input file within the `task://` location).\n",
"\n",
"### Flags\n",
"\n",
"Flags are set automatically, but a user also can set them explicitly.\n",
"\n",
"- `rp.CREATE_PARENTS` - create the directory hierarchy for targets on the fly;\n",
"- `rp.RECURSIVE` - if `source` is a directory, handles it recursively.\n",
"\n",
"### Simplified directive format\n",
"\n",
"RP gives some flexibility in the description of staging between the client side and the sandboxes for pilot and task. Thus, if a user provides just names (absolute or relative paths, e.g., names of files or directories), then RP expands them into corresponding directives.\n",
"\n",
"- If a string directive is a single path, then after expanding it, the\n",
"_source_ will be a provided path within the `client://` location, while the\n",
"_target_ will be a base name from a provided path within the `pilot://` or\n",
"the `task://` location for [rp.PilotDescription](../apidoc.rst#radical.pilot.PilotDescription) or [rp.TaskDescription](../apidoc.rst#radical.pilot.TaskDescription) respectively.\n",
"- Having directional characters `>`, `<` within a string directive defines the direction of the staging between corresponding paths:\n",
"\n",
"    - Input staging: `source > target`, the _source_ defines a path within the `client://` location, and the _target_ defines a path within the `pilot://` or the `task://` location for [rp.PilotDescription](../apidoc.rst#radical.pilot.PilotDescription) or [rp.TaskDescription](../apidoc.rst#radical.pilot.TaskDescription) respectively.\n",
"    - Output staging: `target < source` (applied for [rp.TaskDescription](../apidoc.rst#radical.pilot.TaskDescription) only), the _source_ defines a path within the `task://` location, and the _target_ defines a path within the `client://` location.\n",
"\n",
"Examples of the staging directives being expanded:\n",
"\n",
"[rp.PilotDescription.input_staging](../apidoc.rst#radical.pilot.PilotDescription.input_staging)\n",
"```shell\n",
"in : [ '/tmp/input_data/' ]\n",
"out: [{'source' : 'client:///tmp/input_data',\n",
" 'target' : 'pilot:///input_data',\n",
" 'action' : rp.TRANSFER,\n",
" 'flags' : rp.CREATE_PARENTS|rp.RECURSIVE}]\n",
"in : [ 'input.dat > staged.dat' ]\n",
"out: [{'source' : 'client:///input.dat',\n",
" 'target' : 'pilot:///staged.dat',\n",
" 'action' : rp.TRANSFER,\n",
" 'flags' : rp.CREATE_PARENTS}]\n",
"```\n",
"\n",
"[rp.TaskDescription.input_staging](../apidoc.rst#radical.pilot.TaskDescription.input_staging)\n",
"```shell\n",
"in : [ '/tmp/task_input.txt' ]\n",
"out: [{'source' : 'client:///tmp/task_input.txt',\n",
" 'target' : 'task:///task_input.txt',\n",
" 'action' : rp.TRANSFER,\n",
" 'flags' : rp.CREATE_PARENTS}]\n",
"```\n",
"\n",
"[rp.TaskDescription.output_staging](../apidoc.rst#radical.pilot.TaskDescription.output_staging)\n",
"```shell\n",
"in : [ 'collected.dat < output.txt' ]\n",
"out: [{'source' : 'task:///output.txt',\n",
" 'target' : 'client:///collected.dat',\n",
" 'action' : rp.TRANSFER,\n",
" 'flags' : rp.CREATE_PARENTS}]\n",
"```\n",
"\n",
"## Examples\n",
"\n",
"\n",
"\n",
"__Note:__ In these examples, we will not show a progression bar while waiting for some operation to complete, e.g., while waiting for a pilot to stop. That is because the progression bar offered by RP's reporter does not work well within a notebook. You could use the reporter's progression bar when executing your RP application as a standalone Python script.\n",
"\n",
"
"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%env RADICAL_REPORT=TRUE\n",
"%env RADICAL_REPORT_ANIME=FALSE"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import radical.pilot as rp\n",
"import radical.utils as ru"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"session = rp.Session()\n",
"pmgr = rp.PilotManager(session=session)\n",
"tmgr = rp.TaskManager(session=session)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For this example, create a new directory `input_dir` within the current working directory, and place a file into this directory. That file will be the input data for every task (this input file is referred in the [rp.TaskDescription.arguments](../apidoc.rst#radical.pilot.TaskDescription.arguments) attribute).\n",
"\n",
"\n",
"\n",
"__Warning:__ You need to ensure that the directory, where your script will create the data for staging, is writable. Also, you are responsible to clean up that data after it is staged.\n",
"\n",
"
"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"input_dir = os.path.join(os.getcwd(), 'input_dir')\n",
"os.makedirs(input_dir, exist_ok=True)\n",
"\n",
"with open(input_dir + '/input.txt', 'w') as f:\n",
" f.write('Staged data (task_id=$RP_TASK_ID | '\n",
" 'pilot_id=$RP_PILOT_ID | session_id=$RP_SESSION_ID)')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You will stage the newly created directory `input_dir` with all its files into the `pilot://` location.\n",
"\n",
"\n",
"\n",
"__Note:__ If provided path for `input_staging` is not an absolute path, then RP will look for it within the current working directory. Using absolute paths will guarantee that the staging data will be located correctly.\n",
"\n",
"
"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Staging directives for the pilot.\n",
"\n",
"pd = rp.PilotDescription({\n",
"    'resource'     : 'local.localhost',\n",
"    'cores'        : 2,\n",
"    'runtime'      : 15,\n",
"    'input_staging': [input_dir],\n",
"    'exit_on_error': False\n",
"})\n",
"\n",
"# The staging directive above lists a single directory name.\n",
"# This will automatically be expanded to:\n",
"#\n",
"# {'source' : 'client:///input_dir',\n",
"# 'target' : 'pilot:///input_dir',\n",
"# 'action' : rp.TRANSFER,\n",
"# 'flags' : rp.CREATE_PARENTS|rp.RECURSIVE}\n",
"\n",
"pilot = pmgr.submit_pilots(pd)\n",
"tmgr.add_pilots(pilot)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
" \n",
"__Note:__ You can define input data staging for a pilot within the [rp.PilotDescription](../apidoc.rst#radical.pilot.PilotDescription) object or as an input parameter in the [rp.Pilot.stage_in()](../apidoc.rst#radical.pilot.Pilot.stage_in) method. Importantly, you can only use the [rp.Pilot.stage_out()](../apidoc.rst#radical.pilot.Pilot.stage_out) method to define output data staging.\n",
"\n",
"
\n",
"\n",
"For each task we define directives for input and output staging. We link the file `input.txt` to the task's sandbox before executing that task. After its execution, the task produces an output file, which we copy to the pilot sandbox."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Staging directives for tasks.\n",
"\n",
"N_TASKS = 2\n",
"tds = [] # list of task descriptions\n",
"outputs = [] # list of file names, which are tasks' outputs\n",
"\n",
"for idx in range(N_TASKS):\n",
"    output = 'output.%d.txt' % idx\n",
"\n",
"    td = rp.TaskDescription({\n",
"        'executable'    : '/bin/echo',\n",
"        'arguments'     : ['$(cat input.txt)'],\n",
"        'stdout'        : output,\n",
"        # link file from the pilot sandbox to the task sandbox\n",
"        'input_staging' : [{'source': 'pilot:///input_dir/input.txt',\n",
"                            'target': 'task:///input.txt',\n",
"                            'action': rp.LINK}],\n",
"        # copy task's output file to the pilot sandbox\n",
"        'output_staging': [{'source': 'task:///%s' % output,\n",
"                            'target': 'pilot:///%s' % output,\n",
"                            'action': rp.COPY}]\n",
"    })\n",
"\n",
"    tds.append(td)\n",
"    outputs.append(output)\n",
"\n",
"tmgr.submit_tasks(tds)\n",
"tmgr.wait_tasks()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can perform output data staging even after the pilot `runtime` has finished (i.e., `pilot.state=DONE`), but always before closing your `session` object."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Staging data from the pilot sandbox to the client working directory\n",
"\n",
"pilot.stage_out([{'source': 'pilot:///%s' % output,\n",
"                  'target': 'client:///output_dir/%s' % output,\n",
"                  'action': rp.TRANSFER} for output in outputs])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!cat output_dir/*"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"session.close()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.13"
}
},
"nbformat": 4,
"nbformat_minor": 4
}