From b339c189ddb04abe3cd88c130e240908e9926ee9 Mon Sep 17 00:00:00 2001 From: Rick Ratzel <3039903+rlratzel@users.noreply.github.com> Date: Mon, 6 Jan 2025 20:56:25 -0600 Subject: [PATCH] Adds utility scripts and initial docs for managing multi-GPU Dask clusters for cuGraph (#4838) This PR adds utility scripts and initial docs for managing multi-GPU Dask clusters for cuGraph, aimed at helping the situation described in [this issue](https://github.com/rapidsai/cugraph/issues/4831). These scripts are taken from internal tools used for MNMG testing and have been modified to be more generalized for use by the community. Authors: - Rick Ratzel (https://github.com/rlratzel) - Don Acosta (https://github.com/acostadon) Approvers: - Brad Rees (https://github.com/BradReesWork) - Don Acosta (https://github.com/acostadon) - Joseph Nke (https://github.com/jnke2016) URL: https://github.com/rapidsai/cugraph/pull/4838 --- scripts/dask/README.md | 55 +++++++ scripts/dask/run-dask-process.sh | 274 +++++++++++++++++++++++++++++++ scripts/dask/wait_for_workers.py | 123 ++++++++++++++ 3 files changed, 452 insertions(+) create mode 100644 scripts/dask/README.md create mode 100755 scripts/dask/run-dask-process.sh create mode 100644 scripts/dask/wait_for_workers.py diff --git a/scripts/dask/README.md b/scripts/dask/README.md new file mode 100644 index 0000000000..0c8853351b --- /dev/null +++ b/scripts/dask/README.md @@ -0,0 +1,55 @@ +# Dask scripts for multi-GPU environments + +This directory contains tools for configuring environments for single-node or +multi-node, multi-gpu (SNMG or MNMG) Dask-based cugraph runs, currently +consisting of shell and python scripts. + +Users should also consult the multi-GPU utilities in the +`python/cugraph/cugraph/testing/mg_utils.py` module, specifically the +`start_dask_client()` function, to see how to create `client` and `cluster` +instances in Python code to access the corresponding Dask processes created by +the tools here. + + +### run-dask-process.sh + + This script is used to start the Dask scheduler and workers as needed. + + To start a scheduler and workers on a node, run it like this: + ``` + bash$ run-dask-process.sh scheduler workers + ``` + Once a scheduler is running on a node in the cluster, workers can be started + on other nodes in the cluster by running the script on each worker node like + this: + ``` + bash$ run-dask-process.sh workers + ``` + The env var SCHEDULER_FILE must be set to the location where the scheduler + will generate the scheduler JSON file. The same env var is used by the + workers to locate the generated scheduler JSON file for reading. + + The script will ensure the scheduler is started before the workers when both + are specified. + + Additional options can be specified for using different communication + mechanisms: + ``` + --tcp - initalize a TCP cluster (default) + --ucx - initialize a UCX cluster with NVLink + --ucxib | --ucx-ib - initialize a UCX cluster with InfiniBand+NVLink + ``` + Finally, the script can be run with `-h` or `--help` to see the full set of + options. + +### wait_for_workers.py + + This script can be used to ensure all workers that are expected to be present + in the cluster are up and running. This is useful for automation that sets up + the Dask cluster and cannot proceed until the Dask cluster is available + to accept tasks. + + This example waits for 16 workers to be present: + ``` + bash$ python wait_for_workers.py --scheduler-file-path=$SCHEDULER_FILE --num-expected-workers=16 + ``` diff --git a/scripts/dask/run-dask-process.sh b/scripts/dask/run-dask-process.sh new file mode 100755 index 0000000000..9eef17fc5e --- /dev/null +++ b/scripts/dask/run-dask-process.sh @@ -0,0 +1,274 @@ +#!/bin/bash +# Copyright (c) 2024-2025, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +################################################################################ +NUMARGS=$# +ARGS=$* +VALIDARGS="-h --help scheduler workers --tcp --ucx --ucxib --ucx-ib" +HELP="$0 [ ...] [ ...] + where is: + scheduler - start dask scheduler + workers - start dask workers + and is: + --tcp - initalize a TCP cluster (default) + --ucx - initialize a UCX cluster with NVLink + --ucxib | --ucx-ib - initialize a UCX cluster with InfiniBand+NVLink + -h | --help - print this text + + The cluster config order of precedence is any specification on the + command line (--tcp, --ucx, etc.) if provided, then the value of the + env var DASK_CLUSTER_CONFIG_TYPE if set, then the default value of TCP. + + The env var SCHEDULER_FILE must be set to the location of the dask scheduler + file that the scheduler will generate and the worker(s) will read. This + location must be accessible by the scheduler and workers, meaning a multi-node + configuration will need to set this to a location on a shared file system. +" + +# Default configuration variables. Most are defined using the bash := or :- +# syntax, which means they will be set only if they were previously unset in +# the environment. +WORKER_RMM_POOL_SIZE=${WORKER_RMM_POOL_SIZE:-12G} +DASK_CUDA_INTERFACE=${DASK_CUDA_INTERFACE:-ibp5s0f0} +DASK_SCHEDULER_PORT=${DASK_SCHEDULER_PORT:-8792} +DASK_DEVICE_MEMORY_LIMIT=${DASK_DEVICE_MEMORY_LIMIT:-auto} +DASK_HOST_MEMORY_LIMIT=${DASK_HOST_MEMORY_LIMIT:-auto} + +# Logs can be written to a specific location by setting the DASK_LOGS_DIR +# env var. If unset, all logs are created under a dir named after the +# current PID. +DASK_LOGS_DIR=${DASK_LOGS_DIR:-dask_logs-$$} +DASK_SCHEDULER_LOG=${DASK_LOGS_DIR}/scheduler_log.txt +DASK_WORKERS_LOG=${DASK_LOGS_DIR}/worker-${HOSTNAME}_log.txt + +# DASK_CLUSTER_CONFIG_TYPE defaults to the env var value if set, else TCP. CLI +# options to this script take precedence. Valid values are TCP, UCX, UCXIB +DASK_CLUSTER_CONFIG_TYPE=${DASK_CLUSTER_CONFIG_TYPE:-TCP} + + +################################################################################ +# FUNCTIONS + +numargs=$# +args=$* +hasArg () { + (( ${numargs} != 0 )) && (echo " ${args} " | grep -q " $1 ") +} + +logger_prefix=">>>> " +logger () { + if (( $# > 0 )) && [ "$1" == "-p" ]; then + shift + echo -e "${logger_prefix}$@" + else + echo -e "$(date --utc "+%D-%T.%N")_UTC${logger_prefix}$@" + fi +} + +buildTcpArgs () { + export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="100s" + export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="600s" + export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN="1s" + export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX="60s" + export DASK_DISTRIBUTED__WORKER__MEMORY__Terminate="False" + + SCHEDULER_ARGS="--protocol=tcp + --port=$DASK_SCHEDULER_PORT + --scheduler-file $SCHEDULER_FILE + " + + WORKER_ARGS="--rmm-pool-size=$WORKER_RMM_POOL_SIZE + --local-directory=/tmp/$LOGNAME + --scheduler-file=$SCHEDULER_FILE + --memory-limit=$DASK_HOST_MEMORY_LIMIT + --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT + " + +} + +buildUCXWithInfinibandArgs () { + export DASK_RMM__POOL_SIZE=0.5GB + export DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True + + SCHEDULER_ARGS="--protocol=ucx + --port=$DASK_SCHEDULER_PORT + --interface=$DASK_CUDA_INTERFACE + --scheduler-file $SCHEDULER_FILE + " + + WORKER_ARGS="--interface=$DASK_CUDA_INTERFACE + --rmm-pool-size=$WORKER_RMM_POOL_SIZE + --rmm-async + --local-directory=/tmp/$LOGNAME + --scheduler-file=$SCHEDULER_FILE + --memory-limit=$DASK_HOST_MEMORY_LIMIT + --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT + " +} + +buildUCXwithoutInfinibandArgs () { + export UCX_TCP_CM_REUSEADDR=y + export UCX_MAX_RNDV_RAILS=1 + export UCX_TCP_TX_SEG_SIZE=8M + export UCX_TCP_RX_SEG_SIZE=8M + + export DASK_DISTRIBUTED__COMM__UCX__CUDA_COPY=True + export DASK_DISTRIBUTED__COMM__UCX__TCP=True + export DASK_DISTRIBUTED__COMM__UCX__NVLINK=True + export DASK_DISTRIBUTED__COMM__UCX__INFINIBAND=False + export DASK_DISTRIBUTED__COMM__UCX__RDMACM=False + export DASK_RMM__POOL_SIZE=0.5GB + + + SCHEDULER_ARGS="--protocol=ucx + --port=$DASK_SCHEDULER_PORT + --scheduler-file $SCHEDULER_FILE + " + + WORKER_ARGS="--enable-tcp-over-ucx + --enable-nvlink + --disable-infiniband + --disable-rdmacm + --rmm-pool-size=$WORKER_RMM_POOL_SIZE + --local-directory=/tmp/$LOGNAME + --scheduler-file=$SCHEDULER_FILE + --memory-limit=$DASK_HOST_MEMORY_LIMIT + --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT + " +} + +scheduler_pid="" +worker_pid="" +num_scheduler_tries=0 + +startScheduler () { + mkdir -p $(dirname $SCHEDULER_FILE) + echo "RUNNING: \"dask scheduler $SCHEDULER_ARGS\"" > $DASK_SCHEDULER_LOG + dask scheduler $SCHEDULER_ARGS >> $DASK_SCHEDULER_LOG 2>&1 & + scheduler_pid=$! +} + + +################################################################################ +# READ CLI OPTIONS + +START_SCHEDULER=0 +START_WORKERS=0 + +if (( ${NUMARGS} == 0 )); then + echo "${HELP}" + exit 0 +else + if hasArg -h || hasArg --help; then + echo "${HELP}" + exit 0 + fi + for a in ${ARGS}; do + if ! (echo " ${VALIDARGS} " | grep -q " ${a} "); then + echo "Invalid option: ${a}" + exit 1 + fi + done +fi + +if [ -z ${SCHEDULER_FILE+x} ]; then + echo "Env var SCHEDULER_FILE must be set. See -h for details" + exit 1 +fi + +if hasArg scheduler; then + START_SCHEDULER=1 +fi +if hasArg workers; then + START_WORKERS=1 +fi +# Allow the command line to take precedence +if hasArg --tcp; then + DASK_CLUSTER_CONFIG_TYPE=TCP +elif hasArg --ucx; then + DASK_CLUSTER_CONFIG_TYPE=UCX +elif hasArg --ucxib || hasArg --ucx-ib; then + DASK_CLUSTER_CONFIG_TYPE=UCXIB +fi + + +################################################################################ +# SETUP & RUN + +#export DASK_LOGGING__DISTRIBUTED="DEBUG" +#ulimit -n 100000 + +if [[ "$DASK_CLUSTER_CONFIG_TYPE" == "UCX" ]]; then + logger "Using cluster configurtion for UCX" + buildUCXwithoutInfinibandArgs +elif [[ "$DASK_CLUSTER_CONFIG_TYPE" == "UCXIB" ]]; then + logger "Using cluster configurtion for UCX with Infiniband" + buildUCXWithInfinibandArgs +else + logger "Using cluster configurtion for TCP" + buildTcpArgs +fi + +mkdir -p $DASK_LOGS_DIR +logger "Logs written to: $DASK_LOGS_DIR" + +if [[ $START_SCHEDULER == 1 ]]; then + rm -f $SCHEDULER_FILE $DASK_SCHEDULER_LOG $DASK_WORKERS_LOG + + startScheduler + sleep 6 + num_scheduler_tries=$(( num_scheduler_tries+1 )) + + # Wait for the scheduler to start first before proceeding, since + # it may require several retries (if prior run left ports open + # that need time to close, etc.) + while [ ! -f "$SCHEDULER_FILE" ]; do + scheduler_alive=$(ps -p $scheduler_pid > /dev/null ; echo $?) + if [[ $scheduler_alive != 0 ]]; then + if [[ $num_scheduler_tries != 30 ]]; then + logger "scheduler failed to start, retry #$num_scheduler_tries" + startScheduler + sleep 6 + num_scheduler_tries=$(( num_scheduler_tries+1 )) + else + logger "could not start scheduler, exiting." + exit 1 + fi + fi + done + logger "scheduler started." +fi + +if [[ $START_WORKERS == 1 ]]; then + rm -f $DASK_WORKERS_LOG + while [ ! -f "$SCHEDULER_FILE" ]; do + logger "run-dask-process.sh: $SCHEDULER_FILE not present - waiting to start workers..." + sleep 2 + done + echo "RUNNING: \"dask_cuda_worker $WORKER_ARGS\"" > $DASK_WORKERS_LOG + dask-cuda-worker $WORKER_ARGS >> $DASK_WORKERS_LOG 2>&1 & + worker_pid=$! + logger "worker(s) started." +fi + +# This script will not return until the following background process +# have been completed/killed. +if [[ $worker_pid != "" ]]; then + logger "waiting for worker pid $worker_pid to finish before exiting script..." + wait $worker_pid +fi +if [[ $scheduler_pid != "" ]]; then + logger "waiting for scheduler pid $scheduler_pid to finish before exiting script..." + wait $scheduler_pid +fi diff --git a/scripts/dask/wait_for_workers.py b/scripts/dask/wait_for_workers.py new file mode 100644 index 0000000000..931e991c4c --- /dev/null +++ b/scripts/dask/wait_for_workers.py @@ -0,0 +1,123 @@ +# Copyright (c) 2024-2025, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import time + +from dask.distributed import Client + + +def initialize_dask_cuda(communication_type): + communication_type = communication_type.lower() + if "ucx" in communication_type: + os.environ["UCX_MAX_RNDV_RAILS"] = "1" + + if communication_type == "ucx-ib": + os.environ["UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES"]="cuda" + os.environ["DASK_RMM__POOL_SIZE"]="0.5GB" + os.environ["DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT"]="True" + + +def wait_for_workers( + num_expected_workers, scheduler_file_path, communication_type, timeout_after=0 +): + """ + Waits until num_expected_workers workers are available based on + the workers managed by scheduler_file_path, then returns 0. If + timeout_after is specified, will return 1 if num_expected_workers + workers are not available before the timeout. + """ + # FIXME: use scheduler file path from global environment if none + # supplied in configuration yaml + + print("wait_for_workers.py - initializing client...", end="") + sys.stdout.flush() + initialize_dask_cuda(communication_type) + print("done.") + sys.stdout.flush() + + ready = False + start_time = time.time() + while not ready: + if timeout_after and ((time.time() - start_time) >= timeout_after): + print( + f"wait_for_workers.py timed out after {timeout_after} seconds before finding {num_expected_workers} workers." + ) + sys.stdout.flush() + break + with Client(scheduler_file=scheduler_file_path) as client: + num_workers = len(client.scheduler_info()["workers"]) + if num_workers < num_expected_workers: + print( + f"wait_for_workers.py expected {num_expected_workers} but got {num_workers}, waiting..." + ) + sys.stdout.flush() + time.sleep(5) + else: + print(f"wait_for_workers.py got {num_workers} workers, done.") + sys.stdout.flush() + ready = True + + if ready is False: + return 1 + return 0 + + +if __name__ == "__main__": + import argparse + + ap = argparse.ArgumentParser() + ap.add_argument( + "--num-expected-workers", + type=int, + required=False, + help="Number of workers to wait for. If not specified, " + "uses the NUM_WORKERS env var if set, otherwise defaults " + "to 16.", + ) + ap.add_argument( + "--scheduler-file-path", + type=str, + required=True, + help="Path to shared scheduler file to read.", + ) + ap.add_argument( + "--communication-type", + type=str, + default="tcp", + required=False, + help="Initiliaze dask_cuda based on the cluster communication type." + "Supported values are tcp(default), ucx, ucxib, ucx-ib.", + ) + ap.add_argument( + "--timeout-after", + type=int, + default=0, + required=False, + help="Number of seconds to wait for workers. " + "Default is 0 which means wait forever.", + ) + args = ap.parse_args() + + if args.num_expected_workers is None: + args.num_expected_workers = os.environ.get("NUM_WORKERS", 16) + + exitcode = wait_for_workers( + num_expected_workers=args.num_expected_workers, + scheduler_file_path=args.scheduler_file_path, + communication_type=args.communication_type, + timeout_after=args.timeout_after, + ) + + sys.exit(exitcode)