Document Dataset pipeline stage fusion (ray-project#22737)
ericl authored Mar 1, 2022
1 parent 1a170f7 commit 5a0b7a7
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions doc/source/data/key-concepts.rst
@@ -5,7 +5,7 @@ Key Concepts
============

To work with Ray Datasets, you need to understand how Datasets and Dataset Pipelines work.
-You might also be interested to learn about the execution model of Ray Datasets operations
+You might also be interested to learn about the execution model of Ray Datasets operations.


.. _dataset_concept:
@@ -72,7 +72,7 @@ In the common case, each read task produces a single output block. Read tasks ma

.. note::

-   Block splitting is currently off by default. See the block size tuning section below on how to enable block splitting (beta).
+   Block splitting is off by default. See the :ref:`performance section <data_performance_tips>` on how to enable block splitting (beta).

Deferred Read Task Execution
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -116,7 +116,11 @@ Execution Memory

During execution, certain types of intermediate data must fit in memory. This includes the input block of a task, as well as at least one of the output blocks of the task (when a task has multiple output blocks, only one needs to fit in memory at any given time). The input block consumes object store shared memory (Python heap memory for non-Arrow data). The output blocks consume Python heap memory (prior to putting in the object store) as well as object store memory (after being put in the object store).

-This means that large block sizes can lead to potential out-of-memory situations. To avoid OOM errors, Datasets tries to split blocks during map and read tasks into pieces smaller than the target max block size. In some cases, this splitting is not possible (e.g., if a single item in a block is extremely large, or the function given to ``.map_batches`` returns a very large batch). To avoid these issues, make sure no single item in your Datasets is too large, and always call ``.map_batches`` with batch size small enough such that the output batch can comfortably fit into memory.
+This means that large block sizes can lead to potential out-of-memory situations. To avoid OOM errors, Datasets can split blocks during map and read tasks into pieces smaller than the target max block size. In some cases, this splitting is not possible (e.g., if a single item in a block is extremely large, or the function given to ``.map_batches`` returns a very large batch). To avoid these issues, make sure no single item in your Datasets is too large, and always call ``.map_batches`` with a batch size small enough that the output batch can comfortably fit into memory.

.. note::

   Block splitting is off by default. See the :ref:`performance section <data_performance_tips>` on how to enable block splitting (beta).
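
For example, here is a minimal sketch of keeping ``.map_batches`` batches small; the ``normalize`` function and the batch size of 1000 are illustrative assumptions rather than recommended values:

.. code-block:: python

    import ray

    ray.init()

    # Illustrative per-batch transform; it receives one batch and returns a
    # batch of similar size, so its memory footprint tracks ``batch_size``.
    def normalize(batch):
        return batch

    ds = ray.data.range(100_000)
    # A modest batch_size keeps each output batch comfortably in memory.
    ds = ds.map_batches(normalize, batch_size=1000)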

Object Store Memory
~~~~~~~~~~~~~~~~~~~
@@ -137,3 +141,17 @@ Datasets uses the Ray object store to store data blocks, which means it inherits
**Reference Counting**: Dataset blocks are kept alive by object store reference counting as long as there is any Dataset that references them. To free memory, delete any Python references to the Dataset object.
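
As a minimal sketch (the dataset below is illustrative), freeing blocks simply means dropping the last reference:

.. code-block:: python

    import ray

    ray.init()

    ds = ray.data.range(1000)  # blocks are held in the object store
    # Dropping the last Python reference lets reference counting reclaim the blocks.
    del ds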

**Load Balancing**: Datasets uses Ray scheduling hints to spread read tasks out across the cluster to balance memory usage.

Stage Fusion Optimization
=========================

To avoid unnecessary data movement in the distributed setting, Dataset pipelines will *fuse* compatible stages (i.e., stages with the same compute strategy and resource specifications). Read and map-like stages are always fused if possible. All-to-all dataset transformations such as ``random_shuffle`` can be fused with earlier map-like stages, but not later stages. For Datasets, only read stages are fused. This is because non-pipelined Datasets are eagerly executed except for their initial read stage.
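
For illustration, here is a minimal sketch of a pipeline whose stages are eligible for fusion; the windowing parameters and the ``preprocess`` function are assumptions for the example:

.. code-block:: python

    import ray

    ray.init()

    def preprocess(batch):
        return batch

    # Read and map_batches share the same compute strategy, so they can fuse
    # into a single ``read->map_batches`` stage; the shuffle's map phase can
    # fuse with that stage, but not with any later stages.
    pipe = (
        ray.data.range(100_000)
        .window(blocks_per_window=10)
        .map_batches(preprocess)
        .random_shuffle_each_window()
    )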

You can tell if stage fusion is enabled by checking the :ref:`Dataset stats <data_performance_tips>` and looking for fused stages (e.g., ``read->map_batches``).

.. code-block::

    Stage N read->map_batches->shuffle_map: N/N blocks executed in T
    * Remote wall time: T min, T max, T mean, T total
    * Remote cpu time: T min, T max, T mean, T total
    * Output num rows: N min, N max, N mean, N total
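
A minimal sketch of obtaining such stats for an eagerly executed Dataset (the transform is illustrative); a fused stage shows up with a name like ``read->map_batches``:

.. code-block:: python

    import ray

    ray.init()

    ds = ray.data.range(100_000).map_batches(lambda batch: batch)
    # Non-pipelined Datasets execute eagerly, so the stats are available here.
    print(ds.stats())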
