# Dask

Dask is an open source set of tools for parallelizing Python analytics tasks. It is developed in coordination with popular open source Python libraries including NumPy, pandas, and scikit-learn.

This initialization action sets up Dask on a Google Cloud Dataproc cluster to run in either YARN or standalone mode, both of which take advantage of Dask Distributed. This initialization action is supported on Dataproc image versions 2.0 and newer.

In YARN mode, the cluster is configured with Dask-Yarn. You can then take advantage of Dataproc features such as scaling out workloads with autoscaling.

In standalone mode, Dask runs its own scheduler and workers directly on the Dataproc VMs, separate from YARN.

You can also add RAPIDS and GPUs to your environment by following the instructions in the RAPIDS initialization action. Note: RAPIDS with Dask is only supported on Dataproc image version 2.0+.

## Using this initialization action

**⚠️ NOTICE:** See the best practices for using initialization actions in production.

### Creating a Dataproc cluster with Dask and Dask-Yarn

The following command will create a Google Cloud Dataproc cluster with Dask and Dask-Yarn installed.

```bash
CLUSTER_NAME=<cluster-name>
REGION=<region>
gcloud dataproc clusters create ${CLUSTER_NAME} \
  --region ${REGION} \
  --master-machine-type e2-standard-16 \
  --worker-machine-type e2-highmem-32 \
  --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/dask/dask.sh \
  --initialization-action-timeout 20m
```

You can create a cluster with the Jupyter optional component and Component Gateway enabled to use Dask and Dask-Yarn from a notebook environment:

```bash
CLUSTER_NAME=<cluster-name>
REGION=<region>
gcloud dataproc clusters create ${CLUSTER_NAME} \
  --region ${REGION} \
  --master-machine-type e2-standard-16 \
  --worker-machine-type e2-highmem-32 \
  --optional-components JUPYTER \
  --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/dask/dask.sh \
  --initialization-action-timeout 45m \
  --enable-component-gateway
```

### Creating a Dataproc cluster with Dask in standalone mode

The following command will create a Google Cloud Dataproc cluster with Dask in standalone mode and Cloud Logging enabled.

```bash
CLUSTER_NAME=<cluster-name>
REGION=<region>
gcloud dataproc clusters create ${CLUSTER_NAME} \
  --region ${REGION} \
  --master-machine-type e2-standard-16 \
  --worker-machine-type e2-highmem-32 \
  --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/dask/dask.sh \
  --initialization-action-timeout 20m \
  --metadata dask-runtime=standalone,dask-cloud-logging=true
```

### Dask examples

#### Dask standalone

```python
from dask.distributed import Client
import dask.array as da

import numpy as np

client = Client("localhost:8786")  # Connect to the standalone scheduler running on the master node

x = da.sum(np.ones(5))
x.compute()
```

#### Dask-Yarn

```python
from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.array as da

import numpy as np

cluster = YarnCluster()  # Launch a Dask cluster on YARN
client = Client(cluster)

cluster.adapt() # Dynamically scale Dask resources

x = da.sum(np.ones(5))
x.compute()
```
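Rather than scaling adaptively, you can also request a fixed number of workers with `cluster.scale(n)`.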

### Interacting with data services

Several libraries exist within the Dask ecosystem for interacting with various data services. More information can be found in the Dask documentation.

By default, Dataproc image versions 2.0 and newer come with `pyarrow`, `gcsfs`, `fastparquet`, and `fastavro` installed.
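For example, with `gcsfs` installed, Dask can read Parquet data directly from Cloud Storage. A minimal sketch (the bucket and path below are placeholders):

```python
import dask.dataframe as dd

# Hypothetical dataset location; gcsfs resolves gs:// URLs transparently.
df = dd.read_parquet("gs://your-bucket/path/to/dataset/")

# Operations are lazy; head() triggers a small computation on the cluster.
print(df.head())
```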

### Interacting with the cluster

With the Jupyter optional component, you can select the `dask` environment as your kernel.

You can also SSH into the cluster's master node and execute Dask jobs from Python files. To get a file onto the cluster, either `scp` it or use `gsutil` on the cluster to download the Python file from Cloud Storage.

```bash
gcloud compute ssh <cluster-name>-m --command="gsutil cp gs://path/to/file.py .; python file.py"
```
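A minimal `file.py` might look like the following (a sketch assuming standalone mode, where the scheduler listens on the master node's default port 8786):

```python
from dask.distributed import Client
import dask.array as da

# Connect to the standalone scheduler (default port 8786 on the master node).
client = Client("localhost:8786")

# Example computation: mean of a large random array, computed in parallel.
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
print(x.mean().compute())

client.close()
```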

### Accessing Web UIs

You can monitor your Dask applications using Web UIs, depending on which runtime you are using.

For standalone mode, you can access the native Dask UI in one of the following ways:

1. If Component Gateway is enabled, the Dask UI is accessible at `https://<id>-dot-<region>.dataproc.googleusercontent.com/gateway/default/dask`.

2. Set up SSH port forwarding by running `gcloud compute ssh ${vm} --zone="${zone}" -- -L ":8787:${vm}:8787" -N` on your local machine, then open the Dask UI at http://localhost:8787 in your browser.

3. Create an SSH tunnel to access the Dask UI on port 8787.

For YARN mode, you can access the Skein Web UI via the YARN ResourceManager. To access the YARN ResourceManager, create your cluster with Component Gateway enabled or create an SSH tunnel. You can then open the Skein Web UI from the application's Tracking UI link in the ResourceManager.
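From a connected client in either runtime, you can also retrieve the dashboard address directly (a small sketch, shown here for standalone mode):

```python
from dask.distributed import Client

client = Client("localhost:8786")  # Standalone scheduler on the master node
print(client.dashboard_link)       # URL of the Dask dashboard for this cluster
```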

### Supported metadata parameters

This initialization action supports the following metadata fields:

- `dask-runtime=yarn|standalone`: which Dask runtime to configure. Default is `yarn`.
- `dask-worker-on-master`: whether to also run a Dask worker on the master node. Default is `true`.
- `dask-cloud-logging`: whether to enable Cloud Logging for Dask logs; applies only to standalone mode. Possible values are `true` and `false`. Default is `false`.