Dask is a Python library for parallel and distributed computing. A Dask cluster is composed of one scheduler that coordinates the work of many workers, which can have access to CPU or GPU resources. Here we show how to install Dask in a conda environment on Aurora, start a cluster with GPU workers, and run a simple example script.
From one of Aurora's login nodes, use the following commands to create a conda environment and install Dask. They also install the other libraries needed to run the example script and create a Jupyter kernel that lets you work interactively from a notebook.
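A minimal sketch of the setup, assuming Aurora's `frameworks` module provides a base conda installation and using `dask-env` as a placeholder environment name; package versions and the exact set of libraries may need adjusting:

```bash
# Load Aurora's Python/AI frameworks module to get a base conda installation
module load frameworks

# Create and activate a fresh environment ("dask-env" is an assumed name)
conda create -n dask-env python=3.10 -y
conda activate dask-env

# Dask with the distributed scheduler/worker components
pip install "dask[distributed]"

# Libraries used by the example script and for notebook work
pip install dpnp          # GPU array library used in the example; may already be provided by the frameworks module
pip install ipykernel

# Register a Jupyter kernel for this environment
python -m ipykernel install --user --name dask-env --display-name "dask-env (Aurora)"
```

The launch script below starts the cluster: rank 0 runs the Dask scheduler and every rank, including rank 0, runs one worker. Save it on Aurora as `start_dask_aurora.sh` and make it executable (`chmod +x start_dask_aurora.sh`).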
```bash
#!/bin/bash
# start_dask_aurora.sh
# Usage:
#   mpiexec -n NNODES*NUM_WORKERS_PER_NODE --ppn NUM_WORKERS_PER_NODE ./start_dask_aurora.sh WORKER_TYPE NUM_WORKERS_PER_NODE
# Examples on two nodes:
#   mpiexec -n 12 --ppn 6 ./start_dask_aurora.sh gpu 6
#   mpiexec -n 208 --ppn 104 ./start_dask_aurora.sh cpu 104

WORKER_TYPE=$1
NUM_WORKERS_PER_NODE=$2

# If using 12 GPU workers, assign one worker per tile, otherwise use one worker per GPU (2 tiles)
if [ $NUM_WORKERS_PER_NODE = 12 ] && [ $WORKER_TYPE = 'gpu' ]; then
    export ZE_FLAT_DEVICE_HIERARCHY=FLAT
    export ZE_ENABLE_PCI_ID_DEVICE_ORDER=1
else
    export ZE_FLAT_DEVICE_HIERARCHY=COMPOSITE
fi

# Number of threads per worker (208 CPU threads per node divided by num workers)
NTHREADS=$((208/NUM_WORKERS_PER_NODE))                  # 208 / 12 ≈ 17

# Memory limit per worker (1100GB RAM per node divided by num workers)
MEMORY_PER_WORKER=$((1100/NUM_WORKERS_PER_NODE))GB      # 1100GB / 12 ≈ 91GB

LOCAL_DIRECTORY=~/dask-local-directory
DASK_DASHBOARD_PORT=${DASK_DASHBOARD_PORT:-8787}
DASK_SCHEDULER_PORT=${DASK_SCHEDULER_PORT:-8786}

# Start Dask scheduler on rank 0
if [ $PALS_RANKID = 0 ]; then
    # Purge Dask worker, log directories and config directories
    rm -rf ${LOCAL_DIRECTORY}/* /tmp/dask-workers/* ~/.config/dask
    mkdir -p ${LOCAL_DIRECTORY}/logs /tmp/dask-workers

    # Setup scheduler
    nohup dask scheduler --port ${DASK_SCHEDULER_PORT} --dashboard-address $DASK_DASHBOARD_PORT \
        --scheduler-file ${LOCAL_DIRECTORY}/scheduler.json > ${LOCAL_DIRECTORY}/logs/$HOSTNAME-scheduler.log 2>&1 &
fi

sleep 10

# Setup workers
if [ $WORKER_TYPE = 'gpu' ]; then
    ZE_AFFINITY_MASK=$PALS_LOCAL_RANKID dask worker \
        --resources "GPU=1" --memory-limit ${MEMORY_PER_WORKER} \
        --nthreads ${NTHREADS} --local-directory /tmp/dask-workers \
        --scheduler-file ${LOCAL_DIRECTORY}/scheduler.json >> ${LOCAL_DIRECTORY}/logs/$HOSTNAME-worker.log 2>&1
else
    dask worker \
        --nthreads ${NTHREADS} --local-directory /tmp/dask-workers \
        --scheduler-file ${LOCAL_DIRECTORY}/scheduler.json >> ${LOCAL_DIRECTORY}/logs/$HOSTNAME-worker.log 2>&1
fi
```
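To bring the cluster up and run the example, request an interactive job and launch the script with `mpiexec`, backgrounding it so the shell stays free. A minimal sketch, assuming a two-node allocation, the `dask-env` environment from the install step, and that the Python example below is saved as `dask_example.py` (all assumed names); replace the queue and project placeholders with your own:

```bash
# Request two Aurora nodes interactively (replace <QUEUE> and <PROJECT>):
#   qsub -I -l select=2 -l walltime=01:00:00 -q <QUEUE> -A <PROJECT>

# On the job's head node:
module load frameworks
conda activate dask-env                         # assumed environment name from the install step

# Start the cluster in the background: 6 GPU workers per node, one per GPU
mpiexec -n 12 --ppn 6 ./start_dask_aurora.sh gpu 6 &

# Give the scheduler and workers time to come up and write scheduler.json
sleep 30

# Run the example against the running cluster
python dask_example.py
```

The example connects a `Client` to the scheduler through the shared `~/dask-local-directory/scheduler.json` file, splits a Monte Carlo estimate of π across the workers with `client.map`, and uses `dpnp` so the random sampling runs on the GPUs.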
```python
import json
import pathlib
import time

import dpnp as np
from dask.distributed import Client

# Connect to the scheduler started by start_dask_aurora.sh via the shared scheduler file
fname = f'{pathlib.Path.home().as_posix()}/dask-local-directory/scheduler.json'
with open(fname, 'r') as f:
    scheduler = json.load(f)

client = Client(scheduler['address'])
print(client)


def count_points_inside_circle(N):
    # Sample N points uniformly in the square [-1, 1]^2 and count those inside the unit circle
    x = np.random.uniform(low=-1.0, high=1.0, size=(N, 2))
    inside_circle = ((x * x).sum(axis=1) < 1.).sum()
    return int(inside_circle)


def compute_pi(inside_circle, N):
    return 4 * inside_circle / N


def run():
    start = time.time()
    num_workers = len(client.scheduler_info()['workers'])
    N = 10_400_000_004  # total number of points, split across the workers
    Neach_section, extras = divmod(N, num_workers)
    points_per_worker = [Neach_section for _ in range(num_workers)]
    points_per_worker[-1] += extras
    # pure=False: the function is random, so identical arguments must not be deduplicated
    futures = client.map(count_points_inside_circle, points_per_worker, pure=False)
    inside_circle = client.submit(sum, futures).result()
    pi = compute_pi(inside_circle, N)
    end = time.time()
    return f"Num samples: {N:.2E}\t\tEstimate: {pi:.9f}\t\tTime taken: {end - start:.3f} s"


def main(runs=5):
    for i in range(runs):
        print(f"Run {i}\t\t{run()}")


main()
client.shutdown()
```
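While a run is in progress (or after it finishes), the scheduler file and the log files written by the launch script are the quickest way to check on the cluster; the paths below are the defaults from `start_dask_aurora.sh`:

```bash
cat ~/dask-local-directory/scheduler.json            # scheduler address and connection details
tail ~/dask-local-directory/logs/*-scheduler.log     # scheduler log (written on the rank 0 node)
tail ~/dask-local-directory/logs/*-worker.log        # worker logs, one file per node
```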
On your local machine, open an SSH tunnel to the compute node, for example to reach the Dask dashboard that the scheduler serves on port 8787 by default (COMPUTE_NODE is the compute node's hostname and YOUR_ALCF_USERNAME is your ALCF username).
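A minimal sketch of the tunnel, assuming you reach the compute node through the `aurora.alcf.anl.gov` login host and left the dashboard on its default port:

```bash
ssh -L 8787:COMPUTE_NODE:8787 YOUR_ALCF_USERNAME@aurora.alcf.anl.gov
```

With the tunnel open, the dashboard should be available at `http://localhost:8787` in your local browser.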