diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 3a7d8858c639..12f503bac348 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -5,7 +5,7 @@ Key Concepts ============ To work with Ray Datasets, you need to understand how Datasets and Dataset Pipelines work. -You might also be interested to learn about the execution model of Ray Datasets operations +You might also be interested to learn about the execution model of Ray Datasets operations. .. _dataset_concept: @@ -72,7 +72,7 @@ In the common case, each read task produces a single output block. Read tasks ma .. note:: - Block splitting is currently off by default. See the block size tuning section below on how to enable block splitting (beta). + Block splitting is off by default. See the :ref:`performance section ` on how to enable block splitting (beta). Deferred Read Task Execution ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -116,7 +116,11 @@ Execution Memory During execution, certain types of intermediate data must fit in memory. This includes the input block of a task, as well as at least one of the output blocks of the task (when a task has multiple output blocks, only one needs to fit in memory at any given time). The input block consumes object stored shared memory (Python heap memory for non-Arrow data). The output blocks consume Python heap memory (prior to putting in the object store) as well as object store memory (after being put in the object store). -This means that large block sizes can lead to potential out-of-memory situations. To avoid OOM errors, Datasets tries to split blocks during map and read tasks into pieces smaller than the target max block size. In some cases, this splitting is not possible (e.g., if a single item in a block is extremely large, or the function given to ``.map_batches`` returns a very large batch). To avoid these issues, make sure no single item in your Datasets is too large, and always call ``.map_batches`` with batch size small enough such that the output batch can comfortably fit into memory. +This means that large block sizes can lead to potential out-of-memory situations. To avoid OOM errors, Datasets can split blocks during map and read tasks into pieces smaller than the target max block size. In some cases, this splitting is not possible (e.g., if a single item in a block is extremely large, or the function given to ``.map_batches`` returns a very large batch). To avoid these issues, make sure no single item in your Datasets is too large, and always call ``.map_batches`` with batch size small enough such that the output batch can comfortably fit into memory. + +.. note:: + + Block splitting is off by default. See the :ref:`performance section ` on how to enable block splitting (beta). Object Store Memory ~~~~~~~~~~~~~~~~~~~ @@ -137,3 +141,17 @@ Datasets uses the Ray object store to store data blocks, which means it inherits **Reference Counting**: Dataset blocks are kept alive by object store reference counting as long as there is any Dataset that references them. To free memory, delete any Python references to the Dataset object. **Load Balancing**: Datasets uses Ray scheduling hints to spread read tasks out across the cluster to balance memory usage. + +Stage Fusion Optimization +========================= + +To avoid unnecessary data movement in the distributed setting, Dataset pipelines will *fuse* compatible stages (i.e., stages with the same compute strategy and resource specifications). Read and map-like stages are always fused if possible. All-to-all dataset transformations such as ``random_shuffle`` can be fused with earlier map-like stages, but not later stages. For Datasets, only read stages are fused. This is since non-pipelined Datasets are eagerly executed except for their initial read stage. + +You can tell if stage fusion is enabled by checking the :ref:`Dataset stats ` and looking for fused stages (e.g., ``read->map_batches``). + +.. code-block:: + + Stage N read->map_batches->shuffle_map: N/N blocks executed in T + * Remote wall time: T min, T max, T mean, T total + * Remote cpu time: T min, T max, T mean, T total + * Output num rows: N min, N max, N mean, N total