Merge pull request #418 from rapidsai/branch-0.16
[RELEASE] dask-cuda v0.16
raydouglass authored Oct 21, 2020
2 parents 0275957 + 0577fba commit 4ec39de
Showing 13 changed files with 552 additions and 201 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.rst
@@ -1,3 +1,13 @@
+0.16
+----
+- Parse pool size only when set (#396) `Benjamin Zaitlen`_
+- Improve CUDAWorker scheduler-address parsing and __init__ (#397) `Rami Chowdhury`_
+- Add benchmark for `da.map_overlap` (#399) `John Kirkham`_
+- Explicit-comms: dataframe shuffle (#401) `Mads R. B. Kristensen`_
+- Use new NVTX module (#406) `Peter Andreas Entschev`_
+- Run Dask's NVML tests (#408) `Benjamin Zaitlen`_
+- Skip tests that require cuDF/UCX-Py, when not installed (#411) `Peter Andreas Entschev`_
+
 0.15
 ----
 - Fix-up versioneer (#305) `John Kirkham`_
@@ -182,3 +192,4 @@
 .. _`Randy Gelhausen`: https://github.com/randerzander
 .. _`Jacob Tomlinson`: https://github.com/jacobtomlinson
 .. _`Sean Frye`: https://github.com/sean-frye
+.. _`Rami Chowdhury`: https://github.com/necaris
2 changes: 1 addition & 1 deletion ci/gpu/build.sh
@@ -111,6 +111,6 @@ else
     py.test --cache-clear -vs `python -c "import distributed.protocol.tests.test_rmm as m;print(m.__file__)"`
     py.test --cache-clear -vs `python -c "import distributed.protocol.tests.test_collection_cuda as m;print(m.__file__)"`
     py.test --cache-clear -vs `python -c "import distributed.tests.test_nanny as m;print(m.__file__)"`
-    py.test --cache-clear -vs `python -c "import distributed.tests.test_gpu_metrics as m;print(m.__file__)"`
+    py.test --cache-clear -vs `python -c "import distributed.diagnostics.tests.test_nvml as m;print(m.__file__)"`
     fi
 fi
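
The swapped line points py.test at Dask's new NVML test module (see "Run Dask's NVML tests (#408)" in the changelog). The backtick substitution only resolves a module to its on-disk path; a minimal sketch of what the new line evaluates, assuming `distributed` is importable:

# Locate the NVML test module inside the installed distributed package;
# py.test is then handed this file path directly, so the tests can run
# even though they ship inside the package rather than the repo checkout.
import distributed.diagnostics.tests.test_nvml as m

print(m.__file__)  # e.g. .../site-packages/distributed/diagnostics/tests/test_nvml.py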
194 changes: 194 additions & 0 deletions dask_cuda/benchmarks/local_cupy_map_overlap.py
@@ -0,0 +1,194 @@
@@ -0,0 +1,194 @@
import asyncio
from collections import defaultdict
from time import perf_counter as clock

import cupy as cp
import numpy as np
from cupyx.scipy.ndimage.filters import convolve as cp_convolve
from scipy.ndimage import convolve as sp_convolve

from dask import array as da
from dask.distributed import Client, performance_report, wait
from dask.utils import format_bytes, format_time, parse_bytes

from dask_cuda.benchmarks.utils import (
    get_cluster_options,
    get_scheduler_workers,
    parse_benchmark_args,
    setup_memory_pool,
)


def mean_filter(a, shape):
    a_k = np.full_like(a, 1.0 / np.prod(shape), shape=shape)
    if isinstance(a, cp.ndarray):
        return cp_convolve(a, a_k)
    else:
        return sp_convolve(a, a_k)


async def _run(client, args):
    # Create a simple random array
    if args.type == "gpu":
        rs = da.random.RandomState(RandomState=cp.random.RandomState)
    else:
        rs = da.random.RandomState(RandomState=np.random.RandomState)
    x = rs.random((args.size, args.size), chunks=args.chunk_size).persist()
    ks = 2 * (2 * args.kernel_size + 1,)
    await wait(x)

    # Execute the operations to benchmark
    if args.profile is not None:
        async with performance_report(filename=args.profile):
            t1 = clock()
            await client.compute(x.map_overlap(mean_filter, args.kernel_size, shape=ks))
            took = clock() - t1
    else:
        t1 = clock()
        await client.compute(x.map_overlap(mean_filter, args.kernel_size, shape=ks))
        took = clock() - t1

    return (took, x.npartitions)


async def run(args):
    cluster_options = get_cluster_options(args)
    Cluster = cluster_options["class"]
    cluster_args = cluster_options["args"]
    cluster_kwargs = cluster_options["kwargs"]
    scheduler_addr = cluster_options["scheduler_addr"]

    async with Cluster(*cluster_args, **cluster_kwargs, asynchronous=True) as cluster:
        if args.multi_node:
            import time

            # Allow some time for workers to start and connect to scheduler
            # TODO: make this a command-line argument?
            time.sleep(15)

        # Use the scheduler address with an SSHCluster rather than the cluster
        # object, otherwise we can't shut it down.
        async with Client(
            scheduler_addr if args.multi_node else cluster, asynchronous=True
        ) as client:
            scheduler_workers = await client.run_on_scheduler(get_scheduler_workers)

            await client.run(setup_memory_pool, disable_pool=args.no_rmm_pool)
            # Create an RMM pool on the scheduler due to occasional deserialization
            # of CUDA objects. May cause issues with InfiniBand otherwise.
            await client.run_on_scheduler(
                setup_memory_pool, 1e9, disable_pool=args.no_rmm_pool
            )

            took_list = []
            for i in range(args.runs):
                took_list.append(await _run(client, args))

            # Collect, aggregate, and print peer-to-peer bandwidths
            incoming_logs = await client.run(
                lambda dask_worker: dask_worker.incoming_transfer_log
            )
            bandwidths = defaultdict(list)
            total_nbytes = defaultdict(list)
            for k, L in incoming_logs.items():
                for d in L:
                    if d["total"] >= args.ignore_size:
                        bandwidths[k, d["who"]].append(d["bandwidth"])
                        total_nbytes[k, d["who"]].append(d["total"])

            bandwidths = {
                (scheduler_workers[w1].name, scheduler_workers[w2].name,): [
                    "%s/s" % format_bytes(x) for x in np.quantile(v, [0.25, 0.50, 0.75])
                ]
                for (w1, w2), v in bandwidths.items()
            }
            total_nbytes = {
                (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes(
                    sum(nb)
                )
                for (w1, w2), nb in total_nbytes.items()
            }

            print("Roundtrip benchmark")
            print("--------------------------")
            print(f"Size        | {args.size}*{args.size}")
            print(f"Chunk-size  | {args.chunk_size}")
            print(f"Ignore-size | {format_bytes(args.ignore_size)}")
            print(f"Protocol    | {args.protocol}")
            print(f"Device(s)   | {args.devs}")
            print("==========================")
            print("Wall-clock  | npartitions")
            print("--------------------------")
            for (took, npartitions) in took_list:
                t = format_time(took)
                t += " " * (11 - len(t))
                print(f"{t} | {npartitions}")
            print("==========================")
            print("(w1,w2)     | 25% 50% 75% (total nbytes)")
            print("--------------------------")
            for (d1, d2), bw in sorted(bandwidths.items()):
                fmt = (
                    "(%s,%s) | %s %s %s (%s)"
                    if args.multi_node
                    else "(%02d,%02d) | %s %s %s (%s)"
                )
                print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))

            # An SSHCluster will not automatically shut down, we have to
            # ensure it does.
            if args.multi_node:
                await client.shutdown()


def parse_args():
    special_args = [
        {
            "name": ["-s", "--size",],
            "default": "10000",
            "metavar": "n",
            "type": int,
            "help": "The size n in n^2 (default 10000)",
        },
        {
            "name": ["-t", "--type",],
            "choices": ["cpu", "gpu"],
            "default": "gpu",
            "type": str,
            "help": "Use GPU or CPU arrays",
        },
        {
            "name": ["-c", "--chunk-size",],
            "default": "128 MiB",
            "metavar": "nbytes",
            "type": str,
            "help": "Chunk size (default '128 MiB')",
        },
        {
            "name": ["-k", "--kernel-size",],
            "default": "1",
            "metavar": "k",
            "type": int,
            "help": "Kernel size, 2*k+1, in each dimension (default 1)",
        },
        {
            "name": "--ignore-size",
            "default": "1 MiB",
            "metavar": "nbytes",
            "type": parse_bytes,
            "help": "Ignore messages smaller than this (default '1 MiB')",
        },
        {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",},
    ]

    return parse_benchmark_args(
        description="map_overlap on LocalCUDACluster benchmark", args_list=special_args
    )


def main():
    args = parse_args()
    asyncio.get_event_loop().run_until_complete(run(args))


if __name__ == "__main__":
    main()
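
The core operation this benchmark times is a mean filter applied through `da.map_overlap`, which adds a halo of `depth` elements around each chunk before mapping the kernel. A minimal synchronous sketch of the same computation, assuming CuPy and Dask are installed (sizes here are illustrative, not the benchmark defaults):

import cupy as cp
import dask.array as da

from dask_cuda.benchmarks.local_cupy_map_overlap import mean_filter

# GPU-backed random Dask array, mirroring the benchmark's setup
rs = da.random.RandomState(RandomState=cp.random.RandomState)
x = rs.random((1000, 1000), chunks="128 MiB")

k = 1                  # kernel half-width, i.e. the benchmark's --kernel-size
ks = 2 * (2 * k + 1,)  # window shape per dimension -> (3, 3)

# depth=k gives each chunk exactly the halo the convolution needs
y = x.map_overlap(mean_filter, k, shape=ks)
print(y.compute().shape)  # (1000, 1000)

The full benchmark wraps this kernel with cluster setup, RMM pool configuration, and the peer-to-peer bandwidth reporting above.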
15 changes: 9 additions & 6 deletions dask_cuda/cuda_worker.py
@@ -9,8 +9,8 @@
 from toolz import valmap
 from tornado.ioloop import IOLoop
 
+import dask
 from distributed import Nanny
-from distributed.config import config
 from distributed.proctitle import (
     enable_proctitle_on_children,
     enable_proctitle_on_current,
@@ -47,7 +47,7 @@ def _get_interface(interface, host, cuda_device_index, ucx_net_devices):
 class CUDAWorker:
     def __init__(
         self,
-        scheduler,
+        scheduler=None,
         host=None,
         nthreads=0,
         name=None,
@@ -127,7 +127,11 @@ def del_pid_file():
         kwargs = {"worker_port": None, "listen_address": None}
         t = Nanny
 
-        if not scheduler and not scheduler_file and "scheduler-address" not in config:
+        if (
+            not scheduler
+            and not scheduler_file
+            and dask.config.get("scheduler-address", None) is None
+        ):
             raise ValueError(
                 "Need to provide scheduler address like\n"
                 "dask-worker SCHEDULER_ADDRESS:8786"
@@ -145,7 +149,8 @@ def del_pid_file():
                 "For installation instructions, please see "
                 "https://github.com/rapidsai/rmm"
             )  # pragma: no cover
-            rmm_pool_size = parse_bytes(rmm_pool_size)
+            if rmm_pool_size is not None:
+                rmm_pool_size = parse_bytes(rmm_pool_size)
         else:
             if enable_nvlink:
                 warnings.warn(
@@ -154,8 +159,6 @@ def del_pid_file():
                     "https://dask-cuda.readthedocs.io/en/latest/ucx.html"
                     "#important-notes for more details"
                 )
-        if rmm_pool_size is not None:
-            rmm_pool_size = parse_bytes(rmm_pool_size)
 
         if enable_nvlink and rmm_managed_memory:
             raise ValueError(
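
The net effect of this change is that `CUDAWorker` no longer requires the scheduler address as a positional argument: it falls back to a scheduler file or to Dask's `scheduler-address` config key, and raises `ValueError` only when all three are absent. A hedged sketch of the config fallback, assuming a GPU is present and a scheduler is actually reachable at the configured address (in practice the `dask-cuda-worker` CLI drives this class):

import dask

from dask_cuda.cuda_worker import CUDAWorker

# With dask-cuda 0.16 the address may come from Dask's config instead of
# being passed positionally; passing none of the three raises ValueError.
dask.config.set({"scheduler-address": "tcp://127.0.0.1:8786"})
worker = CUDAWorker()  # resolves the scheduler from the config key above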
1 change: 1 addition & 0 deletions dask_cuda/explicit_comms/__init__.py
@@ -1,2 +1,3 @@
 from .comms import *
 from .dataframe_merge import dataframe_merge
+from .dataframe_shuffle import dataframe_shuffle
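
With this re-export, the explicit-comms dataframe helpers share one import surface; the shuffle implementation itself lives in `dataframe_shuffle.py`, which is not shown in this view. A sketch of the import surface only (the call signature is not part of this diff):

# Both explicit-comms dataframe operations are now exposed at package level.
from dask_cuda.explicit_comms import dataframe_merge, dataframe_shuffle

print(dataframe_shuffle.__module__)  # dask_cuda.explicit_comms.dataframe_shuffle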
(Diffs for the remaining 8 changed files are not shown here.)
