{ "cells": [ { "cell_type": "markdown", "id": "67521807", "metadata": {}, "source": [ "# Describing Tasks\n", "\n", "The notion of tasks is fundamental in RADICAL-Pilot as tasks define the work to be executed on a [supported HPC platform](../supported.rst). This notebook will guide the user through the various task types available in RADICAL-Pilot, and how to specify their respective payload. It will also show some means to inspect tasks after (successful or failed) execution.\n", "\n", "
\n", " \n", "__Warning:__ We assume that you are familiar with deploying, configuring and using RADICAL-Pilot, for example by taking the [Getting Started](../getting_started.ipynb) introduction tutorial.\n", "\n", "
\n", "\n", "
\n", "\n", "__Warning:__ All examples in this notebook are executed locally on a GNU/Linux host. The host needs to have MPI installed - OpenMPI, MPICH, MVAPICH or any other MPI flavor is supported as long as it provides a standard compliant `mpiexec` command. See the documentation of your GNU/Linux distribution on how to install MPI.\n", "\n", "
\n", "\n", "Let's quickly check that an MPI launch method is installed. " ] }, { "cell_type": "code", "execution_count": null, "id": "567c0f26-3e35-44d3-a81a-ab89a79a3dcd", "metadata": {}, "outputs": [], "source": [ "import radical.utils as ru\n", "\n", "mpi_lm_exists = bool(ru.which(['mpirun', 'mpiexec']))" ] }, { "cell_type": "markdown", "id": "ec11a436-b9a8-4706-a126-d5f1ba19bd41", "metadata": {}, "source": [ "First, some preparatory work for the tutorial. We import some modules and set some variables. Note that we `import radical.pilot as rp` to abbreviate future API calls. " ] }, { "cell_type": "code", "execution_count": null, "id": "c8b8387d", "metadata": {}, "outputs": [], "source": [ "import os\n", "import sys\n", "import pprint\n", "\n", "# do not use animated output in notebooks\n", "os.environ['RADICAL_REPORT_ANIME'] = 'FALSE'\n", "\n", "import radical.pilot as rp\n", "\n", "# determine the path of the currently active virtualenv to simplify some examples below\n", "ve_path = os.path.dirname(os.path.dirname(ru.which('python3')))" ] }, { "cell_type": "markdown", "id": "97ab1560", "metadata": {}, "source": [ "## Initial setup and Pilot Submission\n", "\n", "As shown in the introductory tutorials, we will configure the reporter output, set up a RADICAL-Pilot session, create pilot and task manager instances, and run a small local pilot with 32 cores assigned to it." 
] }, { "cell_type": "code", "execution_count": null, "id": "7e4566d0", "metadata": {}, "outputs": [], "source": [ "# create session and managers\n", "session = rp.Session()\n", "pmgr = rp.PilotManager(session)\n", "tmgr = rp.TaskManager(session)\n", "\n", "# submit a pilot\n", "pilot = pmgr.submit_pilots(\n", " rp.PilotDescription({'resource' : 'local.localhost',\n", " 'cores' : 32,\n", " 'runtime' : 60,\n", " 'exit_on_error': False}))\n", "\n", "# add the pilot to the task manager and wait for the pilot to become active\n", "tmgr.add_pilots(pilot)\n", "pilot.wait(rp.PMGR_ACTIVE)\n", "\n", "# configure reporter output \n", "report = ru.Reporter(name='radical.pilot')\n", "report.title('Tutorial: Describing Tasks (RP version %s)' % rp.version)\n", "report.info('Session ID : %s\\n' % session.uid)\n", "report.info('Pilot state: %s\\n' % pilot.state)" ] }, { "cell_type": "markdown", "id": "1ce411cb", "metadata": {}, "source": [ "## Task execution\n", "\n", "At this point we have the system set up and ready to execute our workload. To do so we describe the tasks of which the workload is comprised and submit them for execution. The goal of this tutorial is to introduce the various attributes available for describing tasks, to explain the execution process in some detail, and to describe how completed or failed tasks can be inspected.\n", "\n", "### RP Executable Tasks vs. RAPTOR Tasks\n", "\n", "RADICAL-Pilot is, in the most general sense, a pilot-based task execution backend. Its implementation focuses on *executable* tasks, i.e., on tasks which are described by an executable, its command line arguments, its input and output files, and its execution environment. \n", "\n", "A more general task execution engine called 'RAPTOR' is also provided as part of RADICAL-Pilot. RAPTOR can additionally execute *function* tasks, i.e., tasks which are defined by a function code entry point, function parameters and return values. 
This tutorial focuses on *executable* tasks. RAPTOR's additionally supported task types are the topic of the tutorial [Executing Tasks with RAPTOR](raptor.ipynb).\n", "\n", "### Task Descriptions\n", "\n", "The [rp.TaskDescription](../apidoc.rst#radical.pilot.TaskDescription) class is, as the name suggests, the basis for all task descriptions in RADICAL-Pilot. Its most important attribute is [rp.TaskDescription.mode](../apidoc.rst#radical.pilot.TaskDescription.mode): for *executable* tasks the mode must be set to `rp.TASK_EXECUTABLE`, which is the default setting.\n", "\n", "Executable tasks have exactly one additional required attribute: [rp.TaskDescription.executable](../apidoc.rst#radical.pilot.TaskDescription.executable), i.e., the name of the executable. That can be either an absolute path to the executable on the file system of the target HPC platform, or a plain executable name which is known at runtime in the task's execution environment (the execution environment setup is covered further below)." ] }, { "cell_type": "code", "execution_count": null, "id": "1ba782cd", "metadata": {}, "outputs": [], "source": [ "# create a minimal executable task\n", "td = rp.TaskDescription({'executable': '/bin/date'})\n", "task = tmgr.submit_tasks(td)" ] }, { "cell_type": "markdown", "id": "03112275", "metadata": {}, "source": [ "The task will be scheduled for execution on the pilot we created above. 
We now wait for the task to complete, i.e., to reach one of the final states `DONE`, `CANCELED` or `FAILED`:" ] }, { "cell_type": "code", "execution_count": null, "id": "5f2ea29b", "metadata": {}, "outputs": [], "source": [ "tmgr.wait_tasks()" ] }, { "cell_type": "markdown", "id": "9efbbe7a", "metadata": {}, "source": [ "Congratulations, you successfully executed a RADICAL-Pilot task!\n", "\n", "## Task Inspection\n", "\n", "Once completed, we can inspect the tasks for details of their execution: we print a summary for all tasks and then inspect one of them in more detail. The output shows a number of task attributes which can be set by the task description. Those are specifically:\n", "\n", " - `uid`: a unique string identifying the task. If not defined in the task description, RP will generate an ID which is unique within the scope of the current session.\n", " - `name`: a common name for the task which has no meaning to RP itself but can be used by the application to identify or classify certain tasks. The task name is not required to be unique.\n", " - `metadata`: any user defined data. The only requirement is that the data are serializable via [msgpack](https://msgpack.org/), which RP internally uses as serialization format. Note that metadata are communicated along with the task itself and, as such, they should usually be very small bits of data to not deteriorate performance.\n", " \n", "Which task attributes are useful depends heavily on the application: you may not need most of them in your specific applications. For example, `task.stdout` and `task.stderr` provide a quick and easy way to scan the task results without the need for explicit data staging, and `task.task_sandbox` is useful if your application employs out-of-band data management and needs access to the task output files." 
] }, { "cell_type": "code", "execution_count": null, "id": "e7a7d0ac", "metadata": {}, "outputs": [], "source": [ "report.plain('uid : %s\\n' % task.uid)\n", "report.plain('tmgr : %s\\n' % task.tmgr.uid)\n", "report.plain('pilot : %s\\n' % task.pilot)\n", "report.plain('name : %s\\n' % task.name)\n", "report.plain('executable : %s\\n' % task.description['executable'])\n", "report.plain('state : %s\\n' % task.state)\n", "report.plain('exit_code : %s\\n' % task.exit_code)\n", "report.plain('stdout : %s\\n' % task.stdout.strip())\n", "report.plain('stderr : %s\\n' % task.stderr)\n", "report.plain('return_value : %s\\n' % task.return_value)\n", "report.plain('exception : %s\\n' % task.exception)\n", "report.plain('\\n')\n", "report.plain('endpoint_fs : %s\\n' % task.endpoint_fs)\n", "report.plain('resource_sandbox: %s\\n' % task.resource_sandbox)\n", "report.plain('session_sandbox : %s\\n' % task.session_sandbox)\n", "report.plain('pilot_sandbox : %s\\n' % task.pilot_sandbox)\n", "report.plain('task_sandbox : %s\\n' % task.task_sandbox)\n", "report.plain('client_sandbox : %s\\n' % task.client_sandbox)\n", "report.plain('metadata : %s\\n' % task.metadata)" ] }, { "cell_type": "markdown", "id": "7273c6ea", "metadata": {}, "source": [ "All applications can fail, often for reasons outside the user's control. A task is no different: it can fail as well. Many non-trivial applications will need a way to handle failing tasks. Detecting the failure is the first and necessary step to do so, and RP makes that part easy: RP’s task state model defines that a failing task will immediately go into the `FAILED` state, and that state information is available as the `task.state` property.\n", "\n", "
\n", "\n", "__Note:__ Depending on when the failure happens, the task may also have a value for the `task.stderr` property, which makes it possible to further inspect the causes of the failure. `task.stderr` will only be available if the task reached the EXECUTING state before failing. See the task [State Model](../internals.rst#state-model) for more information.\n", "\n", "
\n", "\n", "Let us submit a new set of tasks and inspect the failure modes. We will scan `/bin/date` for acceptable single-letter arguments:" ] }, { "cell_type": "code", "execution_count": null, "id": "32c95d9a", "metadata": {}, "outputs": [], "source": [ "import string\n", "letters = string.ascii_lowercase + string.ascii_uppercase\n", "\n", "report.progress_tgt(len(letters), label='create')\n", "\n", "tds = list()\n", "for letter in letters:\n", " tds.append(rp.TaskDescription({'executable': '/bin/date',\n", " 'arguments': ['-' + letter]}))\n", " report.progress()\n", "\n", "report.progress_done()\n", "\n", "tasks = tmgr.submit_tasks(tds)" ] }, { "cell_type": "markdown", "id": "187dbca6", "metadata": {}, "source": [ "This time, we wait only for the newly submitted tasks. We then find which ones succeeded and check their resulting output. Spoiler alert: We will find 3 valid single-letter options." ] }, { "cell_type": "code", "execution_count": null, "id": "fa13837b", "metadata": {}, "outputs": [], "source": [ "tmgr.wait_tasks([task.uid for task in tasks])\n", "\n", "for task in tasks:\n", " if task.state == rp.DONE:\n", " print('%s: %s: %s' % (task.uid, task.description['arguments'], task.stdout.strip()))\n" ] }, { "cell_type": "markdown", "id": "0cc12709", "metadata": {}, "source": [ "By changing the state we check for from `rp.DONE` to `rp.FAILED`, we can inspect the error messages for the various tested flags (in `task.stderr`):" ] }, { "cell_type": "code", "execution_count": null, "id": "a3708cb3", "metadata": {}, "outputs": [], "source": [ "tmgr.wait_tasks([task.uid for task in tasks])\n", "\n", "for task in tasks:\n", " if task.state == rp.FAILED:\n", " print('%s: %s: %s' % (task.uid, task.description['arguments'], task.stderr.strip()))" ] }, { "cell_type": "markdown", "id": "82299910", "metadata": {}, "source": [ "## MPI Tasks and Task Resources\n", "\n", "So far, we have run single-core tasks. 
The most common way for applications to utilize multiple cores and nodes on HPC machines is to use MPI as a communication layer, which coordinates multiple application processes, i.e., MPI ranks. In fact, the notion of `ranks` is central to RP's [rp.TaskDescription](../apidoc.rst#radical.pilot.TaskDescription) class. All MPI ranks will be near-exact copies of each other: they run in the same work directory and the same `environment`, are defined by the same `executable` and `arguments`, get the same amount of resources allocated, etc. Notable exceptions are:\n", "\n", " - rank processes may run on different nodes;\n", " - rank processes can communicate via MPI;\n", " - each rank process obtains a unique rank ID.\n", "\n", "It is up to the underlying MPI implementation to determine the exact value of each process's rank ID. The MPI implementation may also set a number of additional environment variables for each process.\n", "\n", "It is important to understand that only applications which make use of MPI should have more than one rank. Otherwise, identical copies of the *same* application instance are launched which will compute the same results, thus wasting resources for all ranks but one. 
Worse: I/O-routines of these non-MPI ranks can interfere with each other and invalidate those results.\n", "\n", "Also: applications with a single rank cannot make effective use of MPI - depending on the specific resource configuration, RP may launch those tasks without providing an MPI communicator.\n", " \n", "The following rank-related attributes are supported by RADICAL-Pilot:\n", "\n", " - `td.ranks`: the number of MPI ranks (application processes) to start;\n", " - `td.cores_per_rank`: the number of cores each rank can use for spawning additional threads or processes;\n", " - `td.gpus_per_rank`: the number of GPUs each rank can utilize;\n", " - `td.mem_per_rank`: the size of memory (in Megabytes) which is available to each rank;\n", " - `td.lfs_per_rank`: the amount of node-local file storage which is available to each rank;\n", " - `td.threading_type`: how to inform the application about available resources to run threads on\n", " - `rp.OpenMP`: defines `OMP_NUM_THREADS` in the task environment;\n", " - `td.gpu_type`: how to inform the application about available GPU resources\n", " - `rp.CUDA`: defines `CUDA_VISIBLE_DEVICES` in the task environment.\n", "\n", "The next example uses the `radical-pilot-hello.sh` command as a test to report on rank creation. \n", "\n", "
\n", "\n", "__Note:__ No core pinning is performed on localhost. Thus, tasks see all CPU cores as available to them. However, the `THREADS` information still reports the correct number of assigned CPU cores.\n", "\n", "
\n", "\n", "
\n", "\n", "__Note:__ If there is no MPI launch method installed, then we will proceed with a single rank.\n", "\n", "
" ] }, { "cell_type": "code", "execution_count": null, "id": "9047b209", "metadata": {}, "outputs": [], "source": [ "tds = list()\n", "for n in range(4):\n", " ranks = (n + 1) if mpi_lm_exists else 1\n", " tds.append(\n", " rp.TaskDescription({'executable' : ve_path + '/bin/radical-pilot-hello.sh',\n", " 'arguments' : [n + 1], \n", " 'ranks' : ranks, \n", " 'cores_per_rank': (n + 1),\n", " 'threading_type': rp.OpenMP}))\n", " report.progress()\n", "report.progress_done()\n", "\n", "tasks = tmgr.submit_tasks(tds)\n", "tmgr.wait_tasks([task.uid for task in tasks])\n", "\n", "for task in tasks:\n", " print('--- %s:\\n%s\\n' % (task.uid, task.stdout.strip()))" ] }, { "cell_type": "markdown", "id": "420ed233", "metadata": {}, "source": [ "## Task Data Management\n", "\n", "The [rp.TaskDescription](../apidoc.rst#radical.pilot.TaskDescription) supports diverse means to specify the task's input/output data and data-related properties:\n", "\n", " - `td.stdout`: path of the file to store the task's standard output in;\n", " - `td.stderr`: path of the file to store the task's standard error in;\n", " - `td.input_staging`: list of file staging directives to stage task input data;\n", " - `td.output_staging`: list of file staging directives to stage task output data.\n", " \n", "Let us run an example task which uses those 4 attributes: we run a word count on `/etc/passwd` (which we stage as input file) and store the result in an output file (which we fetch back). We will also stage back the files in which standard output and standard error are stored (although in this simple example both are expected to be empty)." 
] }, { "cell_type": "code", "execution_count": null, "id": "0fd464ed", "metadata": {}, "outputs": [], "source": [ "\n", "td = rp.TaskDescription(\n", " {'executable' : '/bin/sh',\n", " 'arguments' : ['-c', 'cat input.dat | wc > output.dat'],\n", " 'stdout' : 'task_io.out',\n", " 'stderr' : 'task_io.err',\n", " 'input_staging' : [{'source': '/etc/passwd', 'target': 'input.dat'}],\n", " 'output_staging': [{'source': 'output.dat', 'target': '/tmp/output.test.dat'},\n", " {'source': 'task_io.out', 'target': '/tmp/output.test.out'},\n", " {'source': 'task_io.err', 'target': '/tmp/output.test.err'}]}\n", ")\n", "\n", "task = tmgr.submit_tasks(td)\n", "tmgr.wait_tasks([task.uid])\n", "\n", "# let's check the resulting output files\n", "print(ru.sh_callout('ls -la /tmp/output.test.*', shell=True)[0])\n", "print(ru.sh_callout('cat /tmp/output.test.dat')[0])" ] }, { "cell_type": "markdown", "id": "a4bb97c2", "metadata": {}, "source": [ "RADICAL-Pilot data staging capabilities go beyond what is captured in the example above:\n", "\n", " - Data can be transferred, copied, moved and linked;\n", " - Data locations can be given as absolute paths, or specified relative to the system's root file system, to RP's resource sandbox, session sandbox, pilot sandbox or task sandbox;\n", " - Data staging can be performed not only for tasks, but also for the overall workflow (for example, when many tasks share the same input data).\n", " \n", "Find a detailed explanation of RADICAL-Pilot data staging capabilities in our [Data Staging with RADICAL-Pilot](staging_data.ipynb) tutorial. " ] }, { "cell_type": "markdown", "id": "200d8813", "metadata": {}, "source": [ "## Task Execution Environment\n", "\n", "On HPC platforms, it is common to provide application executables via environment modules. But task execution environments are also frequently used for scripting languages such as Python (e.g., `virtualenv`, `venv` or `conda`). 
RADICAL-Pilot supports the setup of the task execution environment in the following ways:\n", "\n", " 1. `td.environment`: dictionary of key-value pairs for environment variables;\n", " 2. `td.pre_exec`: directives to customize task specific environments;\n", " 3. `td.named_env`: prepared named environments for tasks.\n", " \n", "We will cover these options in the next three examples.\n", "\n", "### Environment Dictionary\n", "\n", "Environment variables can be set explicitly in the task description via the `td.environment` attribute. When that attribute is not specified, tasks will be executed in the default environment that the pilot found on the compute nodes. If the attribute `td.environment` is defined, then the default environment will be augmented with the specified settings. Useful variables to export might be `PATH`, `LD_LIBRARY_PATH`, etc., or any application specific environment variables used by your tasks.\n", " \n", "
\n", "\n", "__Note:__ As demonstrated below, a number of custom environment variables are always provided, such as the various sandbox locations known to RADICAL-Pilot. \n", " \n", "
" ] }, { "cell_type": "code", "execution_count": null, "id": "059fa07e", "metadata": {}, "outputs": [], "source": [ "td = rp.TaskDescription(\n", " {'executable' : '/bin/sh',\n", " 'arguments' : ['-c', \n", " 'env | grep -e RP_SESSION -e RP_TASK | sort; '\n", " 'printf \"FOO=$FOO\\nBAR=$BAR\\nSHELL=$SHELL\\n\"'],\n", " 'environment': {'FOO': 'foo', \n", " 'BAR': 'bar'}}\n", ")\n", "\n", "task = tmgr.submit_tasks(td)\n", "tmgr.wait_tasks([task.uid])\n", "print(task.stdout)" ] }, { "cell_type": "markdown", "id": "70d849d8", "metadata": {}, "source": [ "### Environment Setup with `pre_exec`\n", "\n", "The `td.pre_exec` attribute of the task description can be used to specify a set of shell commands which will be executed before the task's executable is launched. `td.pre_exec` can be used to prepare the task's runtime environment, for example to:\n", "\n", " - Load a system module;\n", " - Export some environment variable;\n", " - Run a shell script or shell commands;\n", " - Activate some virtual environment.\n", " \n", "The example shown below activates the virtual environment this notebook is running in (in `ve_path`) so that it is usable for the task itself. We run another `td.pre_exec` command to install the `pyyaml` module in it. The actual task will then run `pip show pyyaml` to check if that module is indeed available.\n", "\n", "
\n", " \n", "__Warning:__ The first `td.pre_exec` command assumes that this is a virtual environment, not a Conda environment. You may need to change that command if your notebook runs in a Conda environment.\n", " \n", "
\n" ] }, { "cell_type": "code", "execution_count": null, "id": "15728941", "metadata": {}, "outputs": [], "source": [ "td = rp.TaskDescription({'pre_exec' : ['. %s/bin/activate' % ve_path, \n", " 'pip install pyyaml'],\n", " 'executable' : '/bin/sh',\n", " 'arguments' : ['-c', 'which python3; pip show pyyaml']})\n", "\n", "task = tmgr.submit_tasks(td)\n", "tmgr.wait_tasks([task.uid])\n", "print(task.stdout)" ] }, { "cell_type": "markdown", "id": "78f3f8a7", "metadata": {}, "source": [ "### Environment Setup with `named_env`\n", "\n", "When the same environment is used for many tasks, the combined cost of the `td.pre_exec` activities can create a significant runtime overhead, both on the shared filesystem and on the system load. `td.named_env` addresses that problem: applications can prepare a task environment (see [rp.Pilot.prepare_env()](../apidoc.rst#radical.pilot.Pilot.prepare_env)) and then use the `td.named_env` attribute to activate it for the task. This process is very lightweight on system load and runtime overhead and is thus the recommended way to set up task environments which are shared among many tasks. However, any setup step which needs to be run *individually* for each task, such as the creation of task specific input files, should still be added to the task's `td.pre_exec` directives.\n", "\n", "
\n", "\n", "__Note:__ If you don't need to create a new environment, but want to ensure that tasks use the same environment in which the RP Agent runs (`rp`), then you can specify it for each task: `td.named_env = 'rp'`.\n", " \n", "
" ] }, { "cell_type": "code", "execution_count": null, "id": "41467fc2", "metadata": {}, "outputs": [], "source": [ "\n", "pilot.prepare_env(env_name='test_env', \n", " env_spec={'type' : 'venv', \n", " 'setup': ['psutil']})\n", "\n", "td = rp.TaskDescription({'executable' : '/bin/sh',\n", " 'arguments' : ['-c', 'which python3; pip list | grep psutil'],\n", " 'named_env' : 'test_env'})\n", "\n", "task = tmgr.submit_tasks(td)\n", "tmgr.wait_tasks([task.uid])\n", "print(task.stdout)" ] }, { "cell_type": "code", "execution_count": null, "id": "9c914fc2", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "report.header('finalize')\n", "session.close()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.7" }, "varInspector": { "cols": { "lenName": 16, "lenType": 16, "lenVar": 40 }, "kernels_config": { "python": { "delete_cmd_postfix": "", "delete_cmd_prefix": "del ", "library": "var_list.py", "varRefreshCmd": "print(var_dic_list())" }, "r": { "delete_cmd_postfix": ") ", "delete_cmd_prefix": "rm(", "library": "var_list.r", "varRefreshCmd": "cat(var_dic_list()) " } }, "types_to_exclude": [ "module", "function", "builtin_function_or_method", "instance", "_Feature" ], "window_display": false } }, "nbformat": 4, "nbformat_minor": 5 }