From 66c1907ab87efa9cf1637b4791515609a3eb971f Mon Sep 17 00:00:00 2001 From: Ray Douglass Date: Fri, 22 Sep 2023 09:47:17 -0400 Subject: [PATCH 01/22] v23.12 Updates [skip ci] --- .github/workflows/build.yaml | 6 +++--- .github/workflows/pr.yaml | 10 +++++----- .github/workflows/test.yaml | 2 +- ci/build_docs.sh | 2 +- dask_cuda/__init__.py | 2 +- dependencies.yaml | 8 ++++---- pyproject.toml | 2 +- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index f5c7a0af4..6f78f70be 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -28,7 +28,7 @@ concurrency: jobs: conda-python-build: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.10 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} @@ -38,7 +38,7 @@ jobs: if: github.ref_type == 'branch' needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.10 + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12 with: arch: "amd64" branch: ${{ inputs.branch }} @@ -51,7 +51,7 @@ jobs: upload-conda: needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.10 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.12 with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index f7846c226..0dac577db 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -18,26 +18,26 @@ jobs: - docs-build - wheel-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.10 + uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.12 checks: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.10 + uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.12 conda-python-build: needs: checks secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.10 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 with: build_type: pull-request conda-python-tests: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.10 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 with: build_type: pull-request docs-build: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.10 + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12 with: build_type: pull-request node_type: "gpu-v100-latest-1" diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 188ceefde..ea6c93db2 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -16,7 +16,7 @@ on: jobs: conda-python-tests: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.10 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 with: build_type: nightly branch: ${{ inputs.branch }} diff --git a/ci/build_docs.sh b/ci/build_docs.sh index 662bb4884..a283ecc09 100755 --- a/ci/build_docs.sh +++ b/ci/build_docs.sh @@ -23,7 +23,7 @@ rapids-mamba-retry install \ --channel "${PYTHON_CHANNEL}" \ dask-cuda -export RAPIDS_VERSION_NUMBER="23.10" +export RAPIDS_VERSION_NUMBER="23.12" export RAPIDS_DOCS_DIR="$(mktemp -d)" rapids-logger "Build Python docs" diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index c33fae213..982184f0a 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -19,7 +19,7 @@ from .local_cuda_cluster import LocalCUDACluster from .proxify_device_objects import proxify_decorator, unproxify_decorator -__version__ = "23.10.00" +__version__ = "23.12.00" # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` diff --git a/dependencies.yaml b/dependencies.yaml index 46500c172..6584b52dc 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -115,13 +115,13 @@ dependencies: common: - output_types: [conda] packages: - - cudf=23.10 - - dask-cudf=23.10 - - kvikio=23.10 + - cudf=23.12 + - dask-cudf=23.12 + - kvikio=23.12 - pytest - pytest-cov - ucx-proc=*=gpu - - ucx-py=0.34 + - ucx-py=0.35 specific: - output_types: conda matrices: diff --git a/pyproject.toml b/pyproject.toml index 73777b316..ea4534e13 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ requires = [ [project] name = "dask-cuda" -version = "23.10.00" +version = "23.12.00" description = "Utilities for Dask and CUDA interactions" readme = { file = "README.md", content-type = "text/markdown" } authors = [ From 5e1c500a4ddceaafb61ce8c9d7327428093802f2 Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Fri, 29 Sep 2023 14:38:52 -0500 Subject: [PATCH 02/22] dask-cuda: Build CUDA 12.0 ARM conda packages. (#1238) This PR builds conda packages using CUDA 12 on ARM. Closes #1232. Depends on https://github.com/rapidsai/cudf/pull/14112 for tests to pass. Authors: - Bradley Dice (https://github.com/bdice) Approvers: - Ray Douglass (https://github.com/raydouglass) URL: https://github.com/rapidsai/dask-cuda/pull/1238 --- .github/workflows/build.yaml | 6 +++--- .github/workflows/pr.yaml | 10 +++++----- .github/workflows/test.yaml | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 6f78f70be..bb65c1e9d 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -28,7 +28,7 @@ concurrency: jobs: conda-python-build: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@cuda-120-arm with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} @@ -38,7 +38,7 @@ jobs: if: github.ref_type == 'branch' needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12 + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@cuda-120-arm with: arch: "amd64" branch: ${{ inputs.branch }} @@ -51,7 +51,7 @@ jobs: upload-conda: needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.12 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@cuda-120-arm with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 0dac577db..0cbf82387 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -18,26 +18,26 @@ jobs: - docs-build - wheel-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.12 + uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@cuda-120-arm checks: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.12 + uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@cuda-120-arm conda-python-build: needs: checks secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@cuda-120-arm with: build_type: pull-request conda-python-tests: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@cuda-120-arm with: build_type: pull-request docs-build: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12 + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@cuda-120-arm with: build_type: pull-request node_type: "gpu-v100-latest-1" diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index ea6c93db2..9a5d1c626 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -16,7 +16,7 @@ on: jobs: conda-python-tests: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@cuda-120-arm with: build_type: nightly branch: ${{ inputs.branch }} From b6212ea04b414012b292b35bee368e8f2b345acd Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 4 Oct 2023 19:05:43 +0200 Subject: [PATCH 03/22] Small reorganization and fixes for `test_spill` (#1255) Do a minor reorganization on how `client.run()` is invoked and the centralize functions to check host and disk chunks. The failures seem related to `del` not cleaning up objects in time, thus invoke garbage collection after `del` until it memory is actually released or the test times out. Local tests seem that invoking garbage collection once or twice is enough to prevent the test from failing. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) URL: https://github.com/rapidsai/dask-cuda/pull/1255 --- dask_cuda/tests/test_spill.py | 94 ++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 30 deletions(-) diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py index 6a542cfb9..6172b0bc6 100644 --- a/dask_cuda/tests/test_spill.py +++ b/dask_cuda/tests/test_spill.py @@ -1,3 +1,4 @@ +import gc import os from time import sleep @@ -58,7 +59,10 @@ def assert_device_host_file_size( def worker_assert( - dask_worker, total_size, device_chunk_overhead, serialized_chunk_overhead + total_size, + device_chunk_overhead, + serialized_chunk_overhead, + dask_worker=None, ): assert_device_host_file_size( dask_worker.data, total_size, device_chunk_overhead, serialized_chunk_overhead @@ -66,7 +70,10 @@ def worker_assert( def delayed_worker_assert( - dask_worker, total_size, device_chunk_overhead, serialized_chunk_overhead + total_size, + device_chunk_overhead, + serialized_chunk_overhead, + dask_worker=None, ): start = time() while not device_host_file_size_matches( @@ -82,6 +89,18 @@ def delayed_worker_assert( ) +def assert_host_chunks(spills_to_disk, dask_worker=None): + if spills_to_disk is False: + assert len(dask_worker.data.host) + + +def assert_disk_chunks(spills_to_disk, dask_worker=None): + if spills_to_disk is True: + assert len(dask_worker.data.disk or list()) > 0 + else: + assert len(dask_worker.data.disk or list()) == 0 + + @pytest.mark.parametrize( "params", [ @@ -122,7 +141,7 @@ def delayed_worker_assert( }, ], ) -@gen_test(timeout=120) +@gen_test(timeout=30) async def test_cupy_cluster_device_spill(params): cupy = pytest.importorskip("cupy") with dask.config.set( @@ -144,6 +163,8 @@ async def test_cupy_cluster_device_spill(params): ) as cluster: async with Client(cluster, asynchronous=True) as client: + await client.wait_for_workers(1) + rs = da.random.RandomState(RandomState=cupy.random.RandomState) x = rs.random(int(50e6), chunks=2e6) await wait(x) @@ -153,7 +174,10 @@ async def test_cupy_cluster_device_spill(params): # Allow up to 1024 bytes overhead per chunk serialized await client.run( - lambda dask_worker: worker_assert(dask_worker, x.nbytes, 1024, 1024) + worker_assert, + x.nbytes, + 1024, + 1024, ) y = client.compute(x.sum()) @@ -162,20 +186,19 @@ async def test_cupy_cluster_device_spill(params): assert (abs(res / x.size) - 0.5) < 1e-3 await client.run( - lambda dask_worker: worker_assert(dask_worker, x.nbytes, 1024, 1024) + worker_assert, + x.nbytes, + 1024, + 1024, ) - host_chunks = await client.run( - lambda dask_worker: len(dask_worker.data.host) + await client.run( + assert_host_chunks, + params["spills_to_disk"], ) - disk_chunks = await client.run( - lambda dask_worker: len(dask_worker.data.disk or list()) + await client.run( + assert_disk_chunks, + params["spills_to_disk"], ) - for hc, dc in zip(host_chunks.values(), disk_chunks.values()): - if params["spills_to_disk"]: - assert dc > 0 - else: - assert hc > 0 - assert dc == 0 @pytest.mark.parametrize( @@ -218,7 +241,7 @@ async def test_cupy_cluster_device_spill(params): }, ], ) -@gen_test(timeout=120) +@gen_test(timeout=30) async def test_cudf_cluster_device_spill(params): cudf = pytest.importorskip("cudf") @@ -243,6 +266,8 @@ async def test_cudf_cluster_device_spill(params): ) as cluster: async with Client(cluster, asynchronous=True) as client: + await client.wait_for_workers(1) + # There's a known issue with datetime64: # https://github.com/numpy/numpy/issues/4983#issuecomment-441332940 # The same error above happens when spilling datetime64 to disk @@ -264,26 +289,35 @@ async def test_cudf_cluster_device_spill(params): await wait(cdf2) del cdf + gc.collect() - host_chunks = await client.run( - lambda dask_worker: len(dask_worker.data.host) + await client.run( + assert_host_chunks, + params["spills_to_disk"], ) - disk_chunks = await client.run( - lambda dask_worker: len(dask_worker.data.disk or list()) + await client.run( + assert_disk_chunks, + params["spills_to_disk"], ) - for hc, dc in zip(host_chunks.values(), disk_chunks.values()): - if params["spills_to_disk"]: - assert dc > 0 - else: - assert hc > 0 - assert dc == 0 await client.run( - lambda dask_worker: worker_assert(dask_worker, nbytes, 32, 2048) + worker_assert, + nbytes, + 32, + 2048, ) del cdf2 - await client.run( - lambda dask_worker: delayed_worker_assert(dask_worker, 0, 0, 0) - ) + while True: + try: + await client.run( + delayed_worker_assert, + 0, + 0, + 0, + ) + except AssertionError: + gc.collect() + else: + break From 38eec6c2a0bbb6132c550bdf2fed9dbbe16f2f18 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 4 Oct 2023 15:20:16 -0500 Subject: [PATCH 04/22] Update plugins to inherit from ``WorkerPlugin`` (#1230) Upstream in `distributed` we're considering enforcing plugins to inherit from their respective base class (e.g. `WorkerPlugin`, `SchedulerPlugin`, `NannyPlugin`) https://github.com/dask/distributed/pull/8149. This PR updates plugins here to inherhit from `WorkerPlugin`. This makes things a little more future-proof and is probably a good thing to do anyways. Authors: - James Bourbeau (https://github.com/jrbourbeau) - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Peter Andreas Entschev (https://github.com/pentschev) URL: https://github.com/rapidsai/dask-cuda/pull/1230 --- dask_cuda/utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index a155dc593..1e244bb31 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -18,7 +18,7 @@ import distributed # noqa: required for dask.config.get("distributed.comm.ucx") from dask.config import canonical_name from dask.utils import format_bytes, parse_bytes -from distributed import Worker, wait +from distributed import Worker, WorkerPlugin, wait from distributed.comm import parse_address try: @@ -32,7 +32,7 @@ def nvtx_annotate(message=None, color="blue", domain=None): yield -class CPUAffinity: +class CPUAffinity(WorkerPlugin): def __init__(self, cores): self.cores = cores @@ -40,7 +40,7 @@ def setup(self, worker=None): os.sched_setaffinity(0, self.cores) -class RMMSetup: +class RMMSetup(WorkerPlugin): def __init__( self, initial_pool_size, @@ -135,7 +135,7 @@ def setup(self, worker=None): rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr)) -class PreImport: +class PreImport(WorkerPlugin): def __init__(self, libraries): if libraries is None: libraries = [] From 2ffd1d64c856e4d91ae6ee0098b47db0df8023da Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Wed, 11 Oct 2023 16:28:06 -0500 Subject: [PATCH 05/22] Use branch-23.12 workflows. (#1259) This PR switches back to using `branch-23.12` for CI workflows because the CUDA 12 ARM conda migration is complete. Authors: - Bradley Dice (https://github.com/bdice) Approvers: - Peter Andreas Entschev (https://github.com/pentschev) - Jake Awe (https://github.com/AyodeAwe) URL: https://github.com/rapidsai/dask-cuda/pull/1259 --- .github/workflows/build.yaml | 6 +++--- .github/workflows/pr.yaml | 10 +++++----- .github/workflows/test.yaml | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index bb65c1e9d..6f78f70be 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -28,7 +28,7 @@ concurrency: jobs: conda-python-build: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} @@ -38,7 +38,7 @@ jobs: if: github.ref_type == 'branch' needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12 with: arch: "amd64" branch: ${{ inputs.branch }} @@ -51,7 +51,7 @@ jobs: upload-conda: needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.12 with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 0cbf82387..0dac577db 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -18,26 +18,26 @@ jobs: - docs-build - wheel-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.12 checks: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.12 conda-python-build: needs: checks secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 with: build_type: pull-request conda-python-tests: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 with: build_type: pull-request docs-build: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12 with: build_type: pull-request node_type: "gpu-v100-latest-1" diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 9a5d1c626..ea6c93db2 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -16,7 +16,7 @@ on: jobs: conda-python-tests: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@cuda-120-arm + uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 with: build_type: nightly branch: ${{ inputs.branch }} From 48de0c5cb28d4a691ebebcdd0539226e74f4f69c Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 12 Oct 2023 21:01:22 +0200 Subject: [PATCH 06/22] Increase close timeout of `Nanny` in `LocalCUDACluster` (#1260) Tests in CI have been failing more often, but those errors can't be reproduced locally. This is possibly related to `Nanny`'s internal mechanism to establish timeouts to kill processes, perhaps due to higher load on the servers, tasks take longer and killing processes takes into account the overall time taken to establish a timeout, which is then drastically reduced leaving little time to actually shutdown processes. It is also not possible to programatically set a different timeout given existing Distributed's API, which currently calls `close()` without arguments in `SpecCluster._correct_state_internal()`. Given the limitations described above, a new class is added by this change with the sole purpose of rewriting the timeout for `Nanny.close()` method with an increased value, and then use the new class when launching `LocalCUDACluster` via the `worker_class` argument. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Benjamin Zaitlen (https://github.com/quasiben) URL: https://github.com/rapidsai/dask-cuda/pull/1260 --- dask_cuda/local_cuda_cluster.py | 22 ++++++++++++++-------- dask_cuda/tests/test_explicit_comms.py | 7 +++++++ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 324484331..ef15dcce3 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -2,6 +2,8 @@ import logging import os import warnings +from functools import partial +from typing import Literal import dask from distributed import LocalCluster, Nanny, Worker @@ -23,6 +25,13 @@ ) +class IncreasedCloseTimeoutNanny(Nanny): + async def close( # type:ignore[override] + self, timeout: float = 10.0, reason: str = "nanny-close" + ) -> Literal["OK"]: + return await super().close(timeout=timeout, reason=reason) + + class LoggedWorker(Worker): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -32,7 +41,7 @@ async def start(self): self.data.set_address(self.address) -class LoggedNanny(Nanny): +class LoggedNanny(IncreasedCloseTimeoutNanny): def __init__(self, *args, **kwargs): super().__init__(*args, worker_class=LoggedWorker, **kwargs) @@ -333,13 +342,10 @@ def __init__( enable_rdmacm=enable_rdmacm, ) - if worker_class is not None: - from functools import partial - - worker_class = partial( - LoggedNanny if log_spilling is True else Nanny, - worker_class=worker_class, - ) + worker_class = partial( + LoggedNanny if log_spilling is True else IncreasedCloseTimeoutNanny, + worker_class=worker_class, + ) self.pre_import = pre_import diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index ae4e3332c..d9cd6dfb2 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -17,6 +17,7 @@ import dask_cuda from dask_cuda.explicit_comms import comms from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle +from dask_cuda.local_cuda_cluster import IncreasedCloseTimeoutNanny mp = mp.get_context("spawn") # type: ignore ucp = pytest.importorskip("ucp") @@ -35,6 +36,7 @@ def _test_local_cluster(protocol): dashboard_address=None, n_workers=4, threads_per_worker=1, + worker_class=IncreasedCloseTimeoutNanny, processes=True, ) as cluster: with Client(cluster) as client: @@ -56,6 +58,7 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions): dashboard_address=None, n_workers=npartitions, threads_per_worker=1, + worker_class=IncreasedCloseTimeoutNanny, processes=True, ) as cluster: with Client(cluster): @@ -102,6 +105,7 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions): dashboard_address=None, n_workers=n_workers, threads_per_worker=1, + worker_class=IncreasedCloseTimeoutNanny, processes=True, ) as cluster: with Client(cluster) as client: @@ -204,6 +208,7 @@ def check_shuffle(): dashboard_address=None, n_workers=2, threads_per_worker=1, + worker_class=IncreasedCloseTimeoutNanny, processes=True, ) as cluster: with Client(cluster): @@ -221,6 +226,7 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers): dashboard_address=None, n_workers=n_workers, threads_per_worker=1, + worker_class=IncreasedCloseTimeoutNanny, processes=True, ) as cluster: with Client(cluster): @@ -327,6 +333,7 @@ def test_lock_workers(): dashboard_address=None, n_workers=4, threads_per_worker=5, + worker_class=IncreasedCloseTimeoutNanny, processes=True, ) as cluster: ps = [] From d94a0281785e0b2511c6ded84417660a593c6aa8 Mon Sep 17 00:00:00 2001 From: Jake Awe <50372925+AyodeAwe@users.noreply.github.com> Date: Wed, 18 Oct 2023 08:37:30 -0500 Subject: [PATCH 07/22] update workflow links (#1261) --- .github/workflows/build.yaml | 6 +++--- .github/workflows/pr.yaml | 10 +++++----- .github/workflows/test.yaml | 2 +- ci/release/update-version.sh | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 6f78f70be..6e5f77d9b 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -28,7 +28,7 @@ concurrency: jobs: conda-python-build: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 + uses: rapidsai/shared-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} @@ -38,7 +38,7 @@ jobs: if: github.ref_type == 'branch' needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12 + uses: rapidsai/shared-workflows/.github/workflows/custom-job.yaml@branch-23.12 with: arch: "amd64" branch: ${{ inputs.branch }} @@ -51,7 +51,7 @@ jobs: upload-conda: needs: [conda-python-build] secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.12 + uses: rapidsai/shared-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.12 with: build_type: ${{ inputs.build_type || 'branch' }} branch: ${{ inputs.branch }} diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 0dac577db..26a5e8e9c 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -18,26 +18,26 @@ jobs: - docs-build - wheel-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.12 + uses: rapidsai/shared-workflows/.github/workflows/pr-builder.yaml@branch-23.12 checks: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.12 + uses: rapidsai/shared-workflows/.github/workflows/checks.yaml@branch-23.12 conda-python-build: needs: checks secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 + uses: rapidsai/shared-workflows/.github/workflows/conda-python-build.yaml@branch-23.12 with: build_type: pull-request conda-python-tests: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 + uses: rapidsai/shared-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 with: build_type: pull-request docs-build: needs: conda-python-build secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/custom-job.yaml@branch-23.12 + uses: rapidsai/shared-workflows/.github/workflows/custom-job.yaml@branch-23.12 with: build_type: pull-request node_type: "gpu-v100-latest-1" diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index ea6c93db2..9a5e0428a 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -16,7 +16,7 @@ on: jobs: conda-python-tests: secrets: inherit - uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 + uses: rapidsai/shared-workflows/.github/workflows/conda-python-tests.yaml@branch-23.12 with: build_type: nightly branch: ${{ inputs.branch }} diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index 59360a689..c0e8c11d2 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -45,6 +45,6 @@ sed_runner "s/ucx-py=.*/ucx-py=${NEXT_UCXPY_VERSION}/g" dependencies.yaml # CI files for FILE in .github/workflows/*.yaml; do - sed_runner "/shared-action-workflows/ s/@.*/@branch-${NEXT_SHORT_TAG}/g" "${FILE}" + sed_runner "/shared-workflows/ s/@.*/@branch-${NEXT_SHORT_TAG}/g" "${FILE}" done sed_runner "s/RAPIDS_VERSION_NUMBER=\".*/RAPIDS_VERSION_NUMBER=\"${NEXT_SHORT_TAG}\"/g" ci/build_docs.sh From 84bfc149a222194e9fedec542a3eb68eca88672a Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 26 Oct 2023 10:31:18 +0200 Subject: [PATCH 08/22] Move some `dask_cuda.utils` pieces to their own modules (#1263) Move some functions and classes into their own modules: - Move plugins to new `dask_cuda.plugins` module; - Move test utils to `dask_cuda.utils_test` module; - Move `IncreasedCloseTimeoutNanny` to `dask_cuda.utils_test` module, not anymore as a default to `LocalCUDACluster`. Additionally, pass `worker_class=IncreasedCloseTimeoutNanny` to tests that have failed in the past due to `Nanny`'s close timeout. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) URL: https://github.com/rapidsai/dask-cuda/pull/1263 --- dask_cuda/cuda_worker.py | 4 +- dask_cuda/local_cuda_cluster.py | 29 ++--- dask_cuda/plugins.py | 122 ++++++++++++++++++ dask_cuda/tests/test_dask_cuda_worker.py | 6 +- dask_cuda/tests/test_explicit_comms.py | 2 +- dask_cuda/tests/test_initialize.py | 5 + dask_cuda/tests/test_local_cuda_cluster.py | 2 +- dask_cuda/tests/test_proxify_host_file.py | 6 +- dask_cuda/tests/test_proxy.py | 2 + dask_cuda/tests/test_spill.py | 3 + dask_cuda/utils.py | 140 +-------------------- dask_cuda/utils_test.py | 45 +++++++ dask_cuda/worker_spec.py | 3 +- 13 files changed, 204 insertions(+), 165 deletions(-) create mode 100644 dask_cuda/plugins.py create mode 100644 dask_cuda/utils_test.py diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 9dc2d56ce..e25a7c142 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -20,11 +20,9 @@ from .device_host_file import DeviceHostFile from .initialize import initialize +from .plugins import CPUAffinity, PreImport, RMMSetup from .proxify_host_file import ProxifyHostFile from .utils import ( - CPUAffinity, - PreImport, - RMMSetup, cuda_visible_devices, get_cpu_affinity, get_n_gpus, diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index ef15dcce3..d0ea92748 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -3,7 +3,6 @@ import os import warnings from functools import partial -from typing import Literal import dask from distributed import LocalCluster, Nanny, Worker @@ -11,11 +10,9 @@ from .device_host_file import DeviceHostFile from .initialize import initialize +from .plugins import CPUAffinity, PreImport, RMMSetup from .proxify_host_file import ProxifyHostFile from .utils import ( - CPUAffinity, - PreImport, - RMMSetup, cuda_visible_devices, get_cpu_affinity, get_ucx_config, @@ -25,13 +22,6 @@ ) -class IncreasedCloseTimeoutNanny(Nanny): - async def close( # type:ignore[override] - self, timeout: float = 10.0, reason: str = "nanny-close" - ) -> Literal["OK"]: - return await super().close(timeout=timeout, reason=reason) - - class LoggedWorker(Worker): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -41,7 +31,7 @@ async def start(self): self.data.set_address(self.address) -class LoggedNanny(IncreasedCloseTimeoutNanny): +class LoggedNanny(Nanny): def __init__(self, *args, **kwargs): super().__init__(*args, worker_class=LoggedWorker, **kwargs) @@ -342,10 +332,17 @@ def __init__( enable_rdmacm=enable_rdmacm, ) - worker_class = partial( - LoggedNanny if log_spilling is True else IncreasedCloseTimeoutNanny, - worker_class=worker_class, - ) + if worker_class is not None: + if log_spilling is True: + raise ValueError( + "Cannot enable `log_spilling` when `worker_class` is specified. If " + "logging is needed, ensure `worker_class` is a subclass of " + "`distributed.local_cuda_cluster.LoggedNanny` or a subclass of " + "`distributed.local_cuda_cluster.LoggedWorker`, and specify " + "`log_spilling=False`." + ) + if not issubclass(worker_class, Nanny): + worker_class = partial(Nanny, worker_class=worker_class) self.pre_import = pre_import diff --git a/dask_cuda/plugins.py b/dask_cuda/plugins.py new file mode 100644 index 000000000..4eba97f2b --- /dev/null +++ b/dask_cuda/plugins.py @@ -0,0 +1,122 @@ +import importlib +import os + +from distributed import WorkerPlugin + +from .utils import get_rmm_log_file_name, parse_device_memory_limit + + +class CPUAffinity(WorkerPlugin): + def __init__(self, cores): + self.cores = cores + + def setup(self, worker=None): + os.sched_setaffinity(0, self.cores) + + +class RMMSetup(WorkerPlugin): + def __init__( + self, + initial_pool_size, + maximum_pool_size, + managed_memory, + async_alloc, + release_threshold, + log_directory, + track_allocations, + ): + if initial_pool_size is None and maximum_pool_size is not None: + raise ValueError( + "`rmm_maximum_pool_size` was specified without specifying " + "`rmm_pool_size`.`rmm_pool_size` must be specified to use RMM pool." + ) + if async_alloc is True: + if managed_memory is True: + raise ValueError( + "`rmm_managed_memory` is incompatible with the `rmm_async`." + ) + if async_alloc is False and release_threshold is not None: + raise ValueError("`rmm_release_threshold` requires `rmm_async`.") + + self.initial_pool_size = initial_pool_size + self.maximum_pool_size = maximum_pool_size + self.managed_memory = managed_memory + self.async_alloc = async_alloc + self.release_threshold = release_threshold + self.logging = log_directory is not None + self.log_directory = log_directory + self.rmm_track_allocations = track_allocations + + def setup(self, worker=None): + if self.initial_pool_size is not None: + self.initial_pool_size = parse_device_memory_limit( + self.initial_pool_size, alignment_size=256 + ) + + if self.async_alloc: + import rmm + + if self.release_threshold is not None: + self.release_threshold = parse_device_memory_limit( + self.release_threshold, alignment_size=256 + ) + + mr = rmm.mr.CudaAsyncMemoryResource( + initial_pool_size=self.initial_pool_size, + release_threshold=self.release_threshold, + ) + + if self.maximum_pool_size is not None: + self.maximum_pool_size = parse_device_memory_limit( + self.maximum_pool_size, alignment_size=256 + ) + mr = rmm.mr.LimitingResourceAdaptor( + mr, allocation_limit=self.maximum_pool_size + ) + + rmm.mr.set_current_device_resource(mr) + if self.logging: + rmm.enable_logging( + log_file_name=get_rmm_log_file_name( + worker, self.logging, self.log_directory + ) + ) + elif self.initial_pool_size is not None or self.managed_memory: + import rmm + + pool_allocator = False if self.initial_pool_size is None else True + + if self.initial_pool_size is not None: + if self.maximum_pool_size is not None: + self.maximum_pool_size = parse_device_memory_limit( + self.maximum_pool_size, alignment_size=256 + ) + + rmm.reinitialize( + pool_allocator=pool_allocator, + managed_memory=self.managed_memory, + initial_pool_size=self.initial_pool_size, + maximum_pool_size=self.maximum_pool_size, + logging=self.logging, + log_file_name=get_rmm_log_file_name( + worker, self.logging, self.log_directory + ), + ) + if self.rmm_track_allocations: + import rmm + + mr = rmm.mr.get_current_device_resource() + rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr)) + + +class PreImport(WorkerPlugin): + def __init__(self, libraries): + if libraries is None: + libraries = [] + elif isinstance(libraries, str): + libraries = libraries.split(",") + self.libraries = libraries + + def setup(self, worker=None): + for l in self.libraries: + importlib.import_module(l) diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 449fdba7e..974ad1319 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -40,7 +40,7 @@ def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811 str(nthreads), "--no-dashboard", "--worker-class", - "dask_cuda.utils.MockWorker", + "dask_cuda.utils_test.MockWorker", ] ): with Client("127.0.0.1:9359", loop=loop) as client: @@ -329,7 +329,7 @@ def test_cuda_mig_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: str(nthreads), "--no-dashboard", "--worker-class", - "dask_cuda.utils.MockWorker", + "dask_cuda.utils_test.MockWorker", ] ): with Client("127.0.0.1:9359", loop=loop) as client: @@ -364,7 +364,7 @@ def test_cuda_visible_devices_uuid(loop): # noqa: F811 "127.0.0.1", "--no-dashboard", "--worker-class", - "dask_cuda.utils.MockWorker", + "dask_cuda.utils_test.MockWorker", ] ): with Client("127.0.0.1:9359", loop=loop) as client: diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index d9cd6dfb2..bd6770225 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -17,7 +17,7 @@ import dask_cuda from dask_cuda.explicit_comms import comms from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle -from dask_cuda.local_cuda_cluster import IncreasedCloseTimeoutNanny +from dask_cuda.utils_test import IncreasedCloseTimeoutNanny mp = mp.get_context("spawn") # type: ignore ucp = pytest.importorskip("ucp") diff --git a/dask_cuda/tests/test_initialize.py b/dask_cuda/tests/test_initialize.py index 60c7a798f..05b72f996 100644 --- a/dask_cuda/tests/test_initialize.py +++ b/dask_cuda/tests/test_initialize.py @@ -10,6 +10,7 @@ from dask_cuda.initialize import initialize from dask_cuda.utils import get_ucx_config +from dask_cuda.utils_test import IncreasedCloseTimeoutNanny mp = mp.get_context("spawn") # type: ignore ucp = pytest.importorskip("ucp") @@ -29,6 +30,7 @@ def _test_initialize_ucx_tcp(): n_workers=1, threads_per_worker=1, processes=True, + worker_class=IncreasedCloseTimeoutNanny, config={"distributed.comm.ucx": get_ucx_config(**kwargs)}, ) as cluster: with Client(cluster) as client: @@ -64,6 +66,7 @@ def _test_initialize_ucx_nvlink(): n_workers=1, threads_per_worker=1, processes=True, + worker_class=IncreasedCloseTimeoutNanny, config={"distributed.comm.ucx": get_ucx_config(**kwargs)}, ) as cluster: with Client(cluster) as client: @@ -100,6 +103,7 @@ def _test_initialize_ucx_infiniband(): n_workers=1, threads_per_worker=1, processes=True, + worker_class=IncreasedCloseTimeoutNanny, config={"distributed.comm.ucx": get_ucx_config(**kwargs)}, ) as cluster: with Client(cluster) as client: @@ -138,6 +142,7 @@ def _test_initialize_ucx_all(): n_workers=1, threads_per_worker=1, processes=True, + worker_class=IncreasedCloseTimeoutNanny, config={"distributed.comm.ucx": get_ucx_config()}, ) as cluster: with Client(cluster) as client: diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index 845759dfd..5d7762579 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -13,13 +13,13 @@ from dask_cuda import CUDAWorker, LocalCUDACluster, utils from dask_cuda.initialize import initialize from dask_cuda.utils import ( - MockWorker, get_cluster_configuration, get_device_total_memory, get_gpu_count_mig, get_gpu_uuid_from_index, print_cluster_config, ) +from dask_cuda.utils_test import MockWorker @gen_test(timeout=20) diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 2e3f8269d..191f62fe4 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -19,6 +19,7 @@ from dask_cuda.proxify_host_file import ProxifyHostFile from dask_cuda.proxy_object import ProxyObject, asproxy, unproxy from dask_cuda.utils import get_device_total_memory +from dask_cuda.utils_test import IncreasedCloseTimeoutNanny cupy = pytest.importorskip("cupy") cupy.cuda.set_allocator(None) @@ -393,7 +394,10 @@ def is_proxy_object(x): with dask.config.set(jit_unspill_compatibility_mode=compatibility_mode): async with dask_cuda.LocalCUDACluster( - n_workers=1, jit_unspill=True, asynchronous=True + n_workers=1, + jit_unspill=True, + worker_class=IncreasedCloseTimeoutNanny, + asynchronous=True, ) as cluster: async with Client(cluster, asynchronous=True) as client: ddf = dask.dataframe.from_pandas( diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index c779a39ef..8de56a5c5 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -23,6 +23,7 @@ from dask_cuda.disk_io import SpillToDiskFile from dask_cuda.proxify_device_objects import proxify_device_objects from dask_cuda.proxify_host_file import ProxifyHostFile +from dask_cuda.utils_test import IncreasedCloseTimeoutNanny # Make the "disk" serializer available and use a directory that are # remove on exit. @@ -422,6 +423,7 @@ def task(x): async with dask_cuda.LocalCUDACluster( n_workers=1, protocol=protocol, + worker_class=IncreasedCloseTimeoutNanny, asynchronous=True, ) as cluster: async with Client(cluster, asynchronous=True) as client: diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py index 6172b0bc6..f8df7e04f 100644 --- a/dask_cuda/tests/test_spill.py +++ b/dask_cuda/tests/test_spill.py @@ -12,6 +12,7 @@ from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401 from dask_cuda import LocalCUDACluster, utils +from dask_cuda.utils_test import IncreasedCloseTimeoutNanny if utils.get_device_total_memory() < 1e10: pytest.skip("Not enough GPU memory", allow_module_level=True) @@ -160,6 +161,7 @@ async def test_cupy_cluster_device_spill(params): asynchronous=True, device_memory_limit=params["device_memory_limit"], memory_limit=params["memory_limit"], + worker_class=IncreasedCloseTimeoutNanny, ) as cluster: async with Client(cluster, asynchronous=True) as client: @@ -263,6 +265,7 @@ async def test_cudf_cluster_device_spill(params): asynchronous=True, device_memory_limit=params["device_memory_limit"], memory_limit=params["memory_limit"], + worker_class=IncreasedCloseTimeoutNanny, ) as cluster: async with Client(cluster, asynchronous=True) as client: diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 1e244bb31..f16ad18a2 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -1,4 +1,3 @@ -import importlib import math import operator import os @@ -18,7 +17,7 @@ import distributed # noqa: required for dask.config.get("distributed.comm.ucx") from dask.config import canonical_name from dask.utils import format_bytes, parse_bytes -from distributed import Worker, WorkerPlugin, wait +from distributed import wait from distributed.comm import parse_address try: @@ -32,122 +31,6 @@ def nvtx_annotate(message=None, color="blue", domain=None): yield -class CPUAffinity(WorkerPlugin): - def __init__(self, cores): - self.cores = cores - - def setup(self, worker=None): - os.sched_setaffinity(0, self.cores) - - -class RMMSetup(WorkerPlugin): - def __init__( - self, - initial_pool_size, - maximum_pool_size, - managed_memory, - async_alloc, - release_threshold, - log_directory, - track_allocations, - ): - if initial_pool_size is None and maximum_pool_size is not None: - raise ValueError( - "`rmm_maximum_pool_size` was specified without specifying " - "`rmm_pool_size`.`rmm_pool_size` must be specified to use RMM pool." - ) - if async_alloc is True: - if managed_memory is True: - raise ValueError( - "`rmm_managed_memory` is incompatible with the `rmm_async`." - ) - if async_alloc is False and release_threshold is not None: - raise ValueError("`rmm_release_threshold` requires `rmm_async`.") - - self.initial_pool_size = initial_pool_size - self.maximum_pool_size = maximum_pool_size - self.managed_memory = managed_memory - self.async_alloc = async_alloc - self.release_threshold = release_threshold - self.logging = log_directory is not None - self.log_directory = log_directory - self.rmm_track_allocations = track_allocations - - def setup(self, worker=None): - if self.initial_pool_size is not None: - self.initial_pool_size = parse_device_memory_limit( - self.initial_pool_size, alignment_size=256 - ) - - if self.async_alloc: - import rmm - - if self.release_threshold is not None: - self.release_threshold = parse_device_memory_limit( - self.release_threshold, alignment_size=256 - ) - - mr = rmm.mr.CudaAsyncMemoryResource( - initial_pool_size=self.initial_pool_size, - release_threshold=self.release_threshold, - ) - - if self.maximum_pool_size is not None: - self.maximum_pool_size = parse_device_memory_limit( - self.maximum_pool_size, alignment_size=256 - ) - mr = rmm.mr.LimitingResourceAdaptor( - mr, allocation_limit=self.maximum_pool_size - ) - - rmm.mr.set_current_device_resource(mr) - if self.logging: - rmm.enable_logging( - log_file_name=get_rmm_log_file_name( - worker, self.logging, self.log_directory - ) - ) - elif self.initial_pool_size is not None or self.managed_memory: - import rmm - - pool_allocator = False if self.initial_pool_size is None else True - - if self.initial_pool_size is not None: - if self.maximum_pool_size is not None: - self.maximum_pool_size = parse_device_memory_limit( - self.maximum_pool_size, alignment_size=256 - ) - - rmm.reinitialize( - pool_allocator=pool_allocator, - managed_memory=self.managed_memory, - initial_pool_size=self.initial_pool_size, - maximum_pool_size=self.maximum_pool_size, - logging=self.logging, - log_file_name=get_rmm_log_file_name( - worker, self.logging, self.log_directory - ), - ) - if self.rmm_track_allocations: - import rmm - - mr = rmm.mr.get_current_device_resource() - rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr)) - - -class PreImport(WorkerPlugin): - def __init__(self, libraries): - if libraries is None: - libraries = [] - elif isinstance(libraries, str): - libraries = libraries.split(",") - self.libraries = libraries - - def setup(self, worker=None): - for l in self.libraries: - importlib.import_module(l) - - def unpack_bitmask(x, mask_bits=64): """Unpack a list of integers containing bitmasks. @@ -669,27 +552,6 @@ def _align(size, alignment_size): return _align(int(device_memory_limit), alignment_size) -class MockWorker(Worker): - """Mock Worker class preventing NVML from getting used by SystemMonitor. - - By preventing the Worker from initializing NVML in the SystemMonitor, we can - mock test multiple devices in `CUDA_VISIBLE_DEVICES` behavior with single-GPU - machines. - """ - - def __init__(self, *args, **kwargs): - distributed.diagnostics.nvml.device_get_count = MockWorker.device_get_count - self._device_get_count = distributed.diagnostics.nvml.device_get_count - super().__init__(*args, **kwargs) - - def __del__(self): - distributed.diagnostics.nvml.device_get_count = self._device_get_count - - @staticmethod - def device_get_count(): - return 0 - - def get_gpu_uuid_from_index(device_index=0): """Get GPU UUID from CUDA device index. diff --git a/dask_cuda/utils_test.py b/dask_cuda/utils_test.py new file mode 100644 index 000000000..aba77ee79 --- /dev/null +++ b/dask_cuda/utils_test.py @@ -0,0 +1,45 @@ +from typing import Literal + +import distributed +from distributed import Nanny, Worker + + +class MockWorker(Worker): + """Mock Worker class preventing NVML from getting used by SystemMonitor. + + By preventing the Worker from initializing NVML in the SystemMonitor, we can + mock test multiple devices in `CUDA_VISIBLE_DEVICES` behavior with single-GPU + machines. + """ + + def __init__(self, *args, **kwargs): + distributed.diagnostics.nvml.device_get_count = MockWorker.device_get_count + self._device_get_count = distributed.diagnostics.nvml.device_get_count + super().__init__(*args, **kwargs) + + def __del__(self): + distributed.diagnostics.nvml.device_get_count = self._device_get_count + + @staticmethod + def device_get_count(): + return 0 + + +class IncreasedCloseTimeoutNanny(Nanny): + """Increase `Nanny`'s close timeout. + + The internal close timeout mechanism of `Nanny` recomputes the time left to kill + the `Worker` process based on elapsed time of the close task, which may leave + very little time for the subprocess to shutdown cleanly, which may cause tests + to fail when the system is under higher load. This class increases the default + close timeout of 5.0 seconds that `Nanny` sets by default, which can be overriden + via Distributed's public API. + + This class can be used with the `worker_class` argument of `LocalCluster` or + `LocalCUDACluster` to provide a much higher default of 30.0 seconds. + """ + + async def close( # type:ignore[override] + self, timeout: float = 30.0, reason: str = "nanny-close" + ) -> Literal["OK"]: + return await super().close(timeout=timeout, reason=reason) diff --git a/dask_cuda/worker_spec.py b/dask_cuda/worker_spec.py index 6a61fa8f8..84ce51725 100644 --- a/dask_cuda/worker_spec.py +++ b/dask_cuda/worker_spec.py @@ -5,7 +5,8 @@ from .initialize import initialize from .local_cuda_cluster import cuda_visible_devices -from .utils import CPUAffinity, get_cpu_affinity, get_gpu_count +from .plugins import CPUAffinity +from .utils import get_cpu_affinity, get_gpu_count def worker_spec( From fbeee9ce67b8c99573457f047ddb41f6519ba926 Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Fri, 27 Oct 2023 04:58:05 -0500 Subject: [PATCH 09/22] Unpin `dask` and `distributed` for `23.12` development (#1264) This PR relaxes `dask` and `distributed` versions pinning for `23.12` development. xref: https://github.com/rapidsai/cudf/pull/14320 Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - https://github.com/jakirkham - Peter Andreas Entschev (https://github.com/pentschev) - Ray Douglass (https://github.com/raydouglass) URL: https://github.com/rapidsai/dask-cuda/pull/1264 --- conda/recipes/dask-cuda/meta.yaml | 2 +- dask_cuda/__init__.py | 2 - dask_cuda/compat.py | 118 --------------------- dask_cuda/device_host_file.py | 2 +- dask_cuda/tests/test_explicit_comms.py | 4 + dask_cuda/tests/test_local_cuda_cluster.py | 2 + dependencies.yaml | 6 +- pyproject.toml | 4 +- 8 files changed, 13 insertions(+), 127 deletions(-) delete mode 100644 dask_cuda/compat.py diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index 08df9e563..3b0c15626 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -32,7 +32,7 @@ requirements: - tomli run: - python - - dask-core ==2023.9.2 + - dask-core >=2023.9.2 {% for r in data.get("project", {}).get("dependencies", []) %} - {{ r }} {% endfor %} diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index 2218e47e5..9d6917ef6 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -21,8 +21,6 @@ __version__ = "23.12.00" -from . import compat - # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper( dask.dataframe.shuffle.rearrange_by_column diff --git a/dask_cuda/compat.py b/dask_cuda/compat.py deleted file mode 100644 index 1c09337b2..000000000 --- a/dask_cuda/compat.py +++ /dev/null @@ -1,118 +0,0 @@ -import pickle - -import msgpack -from packaging.version import Version - -import dask -import distributed -import distributed.comm.utils -import distributed.protocol -from distributed.comm.utils import OFFLOAD_THRESHOLD, nbytes, offload -from distributed.protocol.core import ( - Serialized, - decompress, - logger, - merge_and_deserialize, - msgpack_decode_default, - msgpack_opts, -) - -if Version(distributed.__version__) >= Version("2023.8.1"): - # Monkey-patch protocol.core.loads (and its users) - async def from_frames( - frames, deserialize=True, deserializers=None, allow_offload=True - ): - """ - Unserialize a list of Distributed protocol frames. - """ - size = False - - def _from_frames(): - try: - # Patched code - return loads( - frames, deserialize=deserialize, deserializers=deserializers - ) - # end patched code - except EOFError: - if size > 1000: - datastr = "[too large to display]" - else: - datastr = frames - # Aid diagnosing - logger.error("truncated data stream (%d bytes): %s", size, datastr) - raise - - if allow_offload and deserialize and OFFLOAD_THRESHOLD: - size = sum(map(nbytes, frames)) - if ( - allow_offload - and deserialize - and OFFLOAD_THRESHOLD - and size > OFFLOAD_THRESHOLD - ): - res = await offload(_from_frames) - else: - res = _from_frames() - - return res - - def loads(frames, deserialize=True, deserializers=None): - """Transform bytestream back into Python value""" - - allow_pickle = dask.config.get("distributed.scheduler.pickle") - - try: - - def _decode_default(obj): - offset = obj.get("__Serialized__", 0) - if offset > 0: - sub_header = msgpack.loads( - frames[offset], - object_hook=msgpack_decode_default, - use_list=False, - **msgpack_opts, - ) - offset += 1 - sub_frames = frames[offset : offset + sub_header["num-sub-frames"]] - if deserialize: - if "compression" in sub_header: - sub_frames = decompress(sub_header, sub_frames) - return merge_and_deserialize( - sub_header, sub_frames, deserializers=deserializers - ) - else: - return Serialized(sub_header, sub_frames) - - offset = obj.get("__Pickled__", 0) - if offset > 0: - sub_header = msgpack.loads(frames[offset]) - offset += 1 - sub_frames = frames[offset : offset + sub_header["num-sub-frames"]] - # Patched code - if "compression" in sub_header: - sub_frames = decompress(sub_header, sub_frames) - # end patched code - if allow_pickle: - return pickle.loads( - sub_header["pickled-obj"], buffers=sub_frames - ) - else: - raise ValueError( - "Unpickle on the Scheduler isn't allowed, " - "set `distributed.scheduler.pickle=true`" - ) - - return msgpack_decode_default(obj) - - return msgpack.loads( - frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts - ) - - except Exception: - logger.critical("Failed to deserialize", exc_info=True) - raise - - distributed.protocol.loads = loads - distributed.protocol.core.loads = loads - distributed.comm.utils.from_frames = from_frames diff --git a/dask_cuda/device_host_file.py b/dask_cuda/device_host_file.py index 7942f6547..b646a9294 100644 --- a/dask_cuda/device_host_file.py +++ b/dask_cuda/device_host_file.py @@ -17,7 +17,7 @@ serialize_bytelist, ) from distributed.sizeof import safe_sizeof -from distributed.spill import CustomFile as KeyAsStringFile +from distributed.spill import AnyKeyFile as KeyAsStringFile from distributed.utils import nbytes from .is_device_object import is_device_object diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index bd6770225..7d8e1b194 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -164,6 +164,8 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions): @pytest.mark.parametrize("_partitions", [True, False]) def test_dataframe_shuffle(backend, protocol, nworkers, _partitions): if backend == "cudf": + pytest.skip("Temporarily disable due to segfaults in libaws-cpp-sdk-core.so") + pytest.importorskip("cudf") p = mp.Process( @@ -259,6 +261,8 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers): @pytest.mark.parametrize("protocol", ["tcp", "ucx"]) def test_dataframe_shuffle_merge(backend, protocol, nworkers): if backend == "cudf": + pytest.skip("Temporarily disable due to segfaults in libaws-cpp-sdk-core.so") + pytest.importorskip("cudf") p = mp.Process( target=_test_dataframe_shuffle_merge, args=(backend, protocol, nworkers) diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index 5d7762579..3298cf219 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -337,6 +337,7 @@ async def test_pre_import(): # Intentionally not using @gen_test to skip cleanup checks +@pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/1265") def test_pre_import_not_found(): async def _test_pre_import_not_found(): with raises_with_cause(RuntimeError, None, ImportError, None): @@ -491,6 +492,7 @@ def test_print_cluster_config(capsys): assert "[plugin]" in captured.out +@pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/1265") def test_death_timeout_raises(): with pytest.raises(asyncio.exceptions.TimeoutError): with LocalCUDACluster( diff --git a/dependencies.yaml b/dependencies.yaml index 703c52074..1022b3a38 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -101,8 +101,8 @@ dependencies: common: - output_types: [conda, requirements] packages: - - dask==2023.9.2 - - distributed==2023.9.2 + - dask>=2023.9.2 + - distributed>=2023.9.2 - numba>=0.57 - numpy>=1.21 - pandas>=1.3,<1.6.0dev0 @@ -110,7 +110,7 @@ dependencies: - zict>=2.0.0 - output_types: [conda] packages: - - dask-core==2023.9.2 + - dask-core>=2023.9.2 test_python: common: - output_types: [conda] diff --git a/pyproject.toml b/pyproject.toml index 0ceae5db4..2ebe09bc7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,8 +16,8 @@ authors = [ license = { text = "Apache-2.0" } requires-python = ">=3.9" dependencies = [ - "dask ==2023.9.2", - "distributed ==2023.9.2", + "dask >=2023.9.2", + "distributed >=2023.9.2", "pynvml >=11.0.0,<11.5", "numpy >=1.21", "numba >=0.57", From a0c6da3b7742d27ef4b10afa18882a09e657dda9 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 27 Oct 2023 15:34:34 +0200 Subject: [PATCH 10/22] Reenable tests that were segfaulting (#1266) Some tests were previously disabled in https://github.com/rapidsai/dask-cuda/pull/1264 to prevent segfaults that should now be resolved after upgrading to Arrow 13. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/dask-cuda/pull/1266 --- dask_cuda/tests/test_explicit_comms.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index 7d8e1b194..bd6770225 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -164,8 +164,6 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions): @pytest.mark.parametrize("_partitions", [True, False]) def test_dataframe_shuffle(backend, protocol, nworkers, _partitions): if backend == "cudf": - pytest.skip("Temporarily disable due to segfaults in libaws-cpp-sdk-core.so") - pytest.importorskip("cudf") p = mp.Process( @@ -261,8 +259,6 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers): @pytest.mark.parametrize("protocol", ["tcp", "ucx"]) def test_dataframe_shuffle_merge(backend, protocol, nworkers): if backend == "cudf": - pytest.skip("Temporarily disable due to segfaults in libaws-cpp-sdk-core.so") - pytest.importorskip("cudf") p = mp.Process( target=_test_dataframe_shuffle_merge, args=(backend, protocol, nworkers) From d9e10013afb5501f1438d2a7f8d0394368e30108 Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Fri, 27 Oct 2023 17:03:32 -0500 Subject: [PATCH 11/22] Generate proper, consistent nightly versions for pip and conda packages (#1267) This PR changes conda Python packages and wheels to all generate a consistent version for nightlies. The nightly version is of the form YY.MM.DDaN, where N is the number of commits from the last tag. The version is embedded in both the package metadata and in the `dask_cuda.__version__` attribute. In addition the commit hash itself is embedded into the package as `dask_cuda.__git_commit__`. These changes ensure that 1. wheels are properly considered nightlies and are treated accordingly by pip (e.g. requiring --pre for installation, not conflicting with normal releases, etc) 2. wheels and conda packages are aligned on versions so that they can be easily compared if necessary. Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) - https://github.com/jakirkham - Ray Douglass (https://github.com/raydouglass) URL: https://github.com/rapidsai/dask-cuda/pull/1267 --- MANIFEST.in | 1 + VERSION | 1 + ci/build_python.sh | 10 +++++++++- ci/build_python_pypi.sh | 19 ++++++++----------- ci/release/update-version.sh | 7 ++----- conda/recipes/dask-cuda/meta.yaml | 4 ++-- dask_cuda/VERSION | 1 + dask_cuda/__init__.py | 2 +- dask_cuda/_version.py | 20 ++++++++++++++++++++ pyproject.toml | 5 ++++- 10 files changed, 49 insertions(+), 21 deletions(-) create mode 100644 VERSION create mode 120000 dask_cuda/VERSION create mode 100644 dask_cuda/_version.py diff --git a/MANIFEST.in b/MANIFEST.in index 344d51cc8..d97770d06 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1,2 @@ include dask_cuda/_version.py +include dask_cuda/VERSION diff --git a/VERSION b/VERSION new file mode 100644 index 000000000..a193fff41 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +23.12.00 diff --git a/ci/build_python.sh b/ci/build_python.sh index d4a28497d..23c806704 100755 --- a/ci/build_python.sh +++ b/ci/build_python.sh @@ -9,9 +9,17 @@ export CMAKE_GENERATOR=Ninja rapids-print-env +package_name="dask_cuda" + +version=$(rapids-generate-version) +commit=$(git rev-parse HEAD) + +echo "${version}" | tr -d '"' > VERSION +sed -i "/^__git_commit__/ s/= .*/= \"${commit}\"/g" "${package_name}/_version.py" + rapids-logger "Begin py build" -rapids-conda-retry mambabuild \ +RAPIDS_PACKAGE_VERSION=${version} rapids-conda-retry mambabuild \ conda/recipes/dask-cuda rapids-upload-conda-to-s3 python diff --git a/ci/build_python_pypi.sh b/ci/build_python_pypi.sh index 6b72b96d7..b13783d16 100755 --- a/ci/build_python_pypi.sh +++ b/ci/build_python_pypi.sh @@ -3,6 +3,9 @@ python -m pip install build --user + +version=$(rapids-generate-version) +commit=$(git rev-parse HEAD) # While conda provides these during conda-build, they are also necessary during # the setup.py build for PyPI export GIT_DESCRIBE_TAG=$(git describe --abbrev=0 --tags) @@ -11,25 +14,19 @@ export GIT_DESCRIBE_NUMBER=$(git rev-list ${GIT_DESCRIBE_TAG}..HEAD --count) # Build date for PyPI pre-releases using version from `pyproject.toml` as source. TOML_VERSION=$(grep "version = .*" pyproject.toml | grep -o '".*"' | sed 's/"//g') if ! rapids-is-release-build; then - export BUILD_DATE=$(date +%y%m%d) - export PACKAGE_VERSION_NUMBER="${TOML_VERSION}a${BUILD_DATE}" + export PACKAGE_VERSION_NUMBER="${version}" fi + +echo "${version}" | tr -d '"' > VERSION +sed -i "/^__git_commit__/ s/= .*/= \"${commit}\"/g" "${package_name}/_version.py" + # Compute/export RAPIDS_DATE_STRING source rapids-env-update -# Update pyproject.toml with pre-release build date -if ! rapids-is-release-build; then - sed -i "s/^version = \""${TOML_VERSION}".*\"/version = \""${PACKAGE_VERSION_NUMBER}"\"/g" pyproject.toml -fi python -m build \ --sdist \ --wheel \ --outdir dist/ \ . - -# Revert pyproject.toml pre-release build date -if ! rapids-is-release-build; then - sed -i "s/^version = \""${PACKAGE_VERSION_NUMBER}"\"/version = \""${TOML_VERSION}"\"/g" pyproject.toml -fi diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index c0e8c11d2..94cd5d12b 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -31,11 +31,8 @@ function sed_runner() { sed -i.bak ''"$1"'' $2 && rm -f ${2}.bak } -# Python __init__.py updates -sed_runner "s/__version__ = .*/__version__ = \"${NEXT_FULL_TAG}\"/g" dask_cuda/__init__.py - -# Python pyproject.toml updates -sed_runner "s/^version = .*/version = \"${NEXT_FULL_TAG}\"/g" pyproject.toml +# Centralized version file update +echo "${NEXT_FULL_TAG}" | tr -d '"' > VERSION # Bump cudf and dask-cudf testing dependencies sed_runner "s/cudf=.*/cudf=${NEXT_SHORT_TAG}/g" dependencies.yaml diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index 3b0c15626..6804b1ce4 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -4,7 +4,7 @@ # conda build -c conda-forge . {% set data = load_file_data("pyproject.toml") %} -{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') %} +{% set version = environ['RAPIDS_PACKAGE_VERSION'].strip('""').lstrip('v') %} {% set py_version = environ['CONDA_PY'] %} {% set date_string = environ['RAPIDS_DATE_STRING'] %} @@ -13,7 +13,7 @@ package: version: {{ version }} source: - git_url: ../../.. + path: ../../.. build: number: {{ GIT_DESCRIBE_NUMBER }} diff --git a/dask_cuda/VERSION b/dask_cuda/VERSION new file mode 120000 index 000000000..6ff19de4b --- /dev/null +++ b/dask_cuda/VERSION @@ -0,0 +1 @@ +../VERSION \ No newline at end of file diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index 9d6917ef6..dbbb1f7fb 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -11,6 +11,7 @@ import dask.dataframe.multi import dask.bag.core +from ._version import __git_commit__, __version__ from .cuda_worker import CUDAWorker from .explicit_comms.dataframe.shuffle import ( get_rearrange_by_column_wrapper, @@ -19,7 +20,6 @@ from .local_cuda_cluster import LocalCUDACluster from .proxify_device_objects import proxify_decorator, unproxify_decorator -__version__ = "23.12.00" # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper( diff --git a/dask_cuda/_version.py b/dask_cuda/_version.py new file mode 100644 index 000000000..c54072ba5 --- /dev/null +++ b/dask_cuda/_version.py @@ -0,0 +1,20 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import importlib.resources + +__version__ = ( + importlib.resources.files("dask_cuda").joinpath("VERSION").read_text().strip() +) +__git_commit__ = "" diff --git a/pyproject.toml b/pyproject.toml index 2ebe09bc7..c240e61b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ requires = [ [project] name = "dask-cuda" -version = "23.12.00" +dynamic = ["version"] description = "Utilities for Dask and CUDA interactions" readme = { file = "README.md", content-type = "text/markdown" } authors = [ @@ -123,6 +123,9 @@ filterwarnings = [ [tool.setuptools] license-files = ["LICENSE"] +[tool.setuptools.dynamic] +version = {file = "dask_cuda/VERSION"} + [tool.setuptools.packages.find] exclude = [ "docs", From 004185e01e3c4a26f1bb4213f3bc815f324e0c2f Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 31 Oct 2023 19:07:48 +0100 Subject: [PATCH 12/22] Add support for UCXX (#1268) Add support for UCXX via support for `protocol="ucxx"`. Extend existing UCX-Py tests to test both UCX-Py and UCXX now. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) - Ray Douglass (https://github.com/raydouglass) URL: https://github.com/rapidsai/dask-cuda/pull/1268 --- ci/test_python.sh | 2 +- dask_cuda/benchmarks/local_cudf_groupby.py | 2 +- dask_cuda/benchmarks/local_cudf_merge.py | 2 +- dask_cuda/benchmarks/local_cudf_shuffle.py | 2 +- dask_cuda/benchmarks/local_cupy.py | 2 +- .../benchmarks/local_cupy_map_overlap.py | 2 +- dask_cuda/benchmarks/utils.py | 2 +- dask_cuda/initialize.py | 63 ++++++++++---- dask_cuda/local_cuda_cluster.py | 9 +- dask_cuda/tests/test_dgx.py | 42 ++++++--- dask_cuda/tests/test_explicit_comms.py | 8 +- dask_cuda/tests/test_from_array.py | 8 +- dask_cuda/tests/test_initialize.py | 85 ++++++++++++++----- dask_cuda/tests/test_local_cuda_cluster.py | 57 ++++++++++--- dask_cuda/tests/test_proxy.py | 14 ++- dask_cuda/tests/test_utils.py | 26 ++++-- dask_cuda/utils.py | 6 +- dependencies.yaml | 2 + 18 files changed, 248 insertions(+), 86 deletions(-) diff --git a/ci/test_python.sh b/ci/test_python.sh index 827eb84c9..ca4140bae 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -45,7 +45,7 @@ DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \ UCXPY_IFNAME=eth0 \ UCX_WARN_UNUSED_ENV_VARS=n \ UCX_MEMTYPE_CACHE=n \ -timeout 40m pytest \ +timeout 60m pytest \ -vv \ --durations=0 \ --capture=no \ diff --git a/dask_cuda/benchmarks/local_cudf_groupby.py b/dask_cuda/benchmarks/local_cudf_groupby.py index 4e9dea94e..2f07e3df7 100644 --- a/dask_cuda/benchmarks/local_cudf_groupby.py +++ b/dask_cuda/benchmarks/local_cudf_groupby.py @@ -139,7 +139,7 @@ def pretty_print_results(args, address_to_index, p2p_bw, results): key="Device memory limit", value=f"{format_bytes(args.device_memory_limit)}" ) print_key_value(key="RMM Pool", value=f"{not args.disable_rmm_pool}") - if args.protocol == "ucx": + if args.protocol in ["ucx", "ucxx"]: print_key_value(key="TCP", value=f"{args.enable_tcp_over_ucx}") print_key_value(key="InfiniBand", value=f"{args.enable_infiniband}") print_key_value(key="NVLink", value=f"{args.enable_nvlink}") diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py index f26a26ae9..ba3a9d56d 100644 --- a/dask_cuda/benchmarks/local_cudf_merge.py +++ b/dask_cuda/benchmarks/local_cudf_merge.py @@ -217,7 +217,7 @@ def pretty_print_results(args, address_to_index, p2p_bw, results): ) print_key_value(key="RMM Pool", value=f"{not args.disable_rmm_pool}") print_key_value(key="Frac-match", value=f"{args.frac_match}") - if args.protocol == "ucx": + if args.protocol in ["ucx", "ucxx"]: print_key_value(key="TCP", value=f"{args.enable_tcp_over_ucx}") print_key_value(key="InfiniBand", value=f"{args.enable_infiniband}") print_key_value(key="NVLink", value=f"{args.enable_nvlink}") diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py index 51ba48f93..a3492b664 100644 --- a/dask_cuda/benchmarks/local_cudf_shuffle.py +++ b/dask_cuda/benchmarks/local_cudf_shuffle.py @@ -146,7 +146,7 @@ def pretty_print_results(args, address_to_index, p2p_bw, results): key="Device memory limit", value=f"{format_bytes(args.device_memory_limit)}" ) print_key_value(key="RMM Pool", value=f"{not args.disable_rmm_pool}") - if args.protocol == "ucx": + if args.protocol in ["ucx", "ucxx"]: print_key_value(key="TCP", value=f"{args.enable_tcp_over_ucx}") print_key_value(key="InfiniBand", value=f"{args.enable_infiniband}") print_key_value(key="NVLink", value=f"{args.enable_nvlink}") diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py index 1c1d12d30..22c51556f 100644 --- a/dask_cuda/benchmarks/local_cupy.py +++ b/dask_cuda/benchmarks/local_cupy.py @@ -193,7 +193,7 @@ def pretty_print_results(args, address_to_index, p2p_bw, results): ) print_key_value(key="RMM Pool", value=f"{not args.disable_rmm_pool}") print_key_value(key="Protocol", value=f"{args.protocol}") - if args.protocol == "ucx": + if args.protocol in ["ucx", "ucxx"]: print_key_value(key="TCP", value=f"{args.enable_tcp_over_ucx}") print_key_value(key="InfiniBand", value=f"{args.enable_infiniband}") print_key_value(key="NVLink", value=f"{args.enable_nvlink}") diff --git a/dask_cuda/benchmarks/local_cupy_map_overlap.py b/dask_cuda/benchmarks/local_cupy_map_overlap.py index f40318559..8250c9f9f 100644 --- a/dask_cuda/benchmarks/local_cupy_map_overlap.py +++ b/dask_cuda/benchmarks/local_cupy_map_overlap.py @@ -78,7 +78,7 @@ def pretty_print_results(args, address_to_index, p2p_bw, results): ) print_key_value(key="RMM Pool", value=f"{not args.disable_rmm_pool}") print_key_value(key="Protocol", value=f"{args.protocol}") - if args.protocol == "ucx": + if args.protocol in ["ucx", "ucxx"]: print_key_value(key="TCP", value=f"{args.enable_tcp_over_ucx}") print_key_value(key="InfiniBand", value=f"{args.enable_infiniband}") print_key_value(key="NVLink", value=f"{args.enable_nvlink}") diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index d3ce666b2..51fae7201 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -73,7 +73,7 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] cluster_args.add_argument( "-p", "--protocol", - choices=["tcp", "ucx"], + choices=["tcp", "ucx", "ucxx"], default="tcp", type=str, help="The communication protocol to use.", diff --git a/dask_cuda/initialize.py b/dask_cuda/initialize.py index 0b9c92a59..571a46a55 100644 --- a/dask_cuda/initialize.py +++ b/dask_cuda/initialize.py @@ -5,7 +5,6 @@ import numba.cuda import dask -import distributed.comm.ucx from distributed.diagnostics.nvml import get_device_index_and_uuid, has_cuda_context from .utils import get_ucx_config @@ -23,12 +22,21 @@ def _create_cuda_context_handler(): numba.cuda.current_context() -def _create_cuda_context(): +def _create_cuda_context(protocol="ucx"): + if protocol not in ["ucx", "ucxx"]: + return try: # Added here to ensure the parent `LocalCUDACluster` process creates the CUDA # context directly from the UCX module, thus avoiding a similar warning there. try: - distributed.comm.ucx.init_once() + if protocol == "ucx": + import distributed.comm.ucx + + distributed.comm.ucx.init_once() + elif protocol == "ucxx": + import distributed_ucxx.ucxx + + distributed_ucxx.ucxx.init_once() except ModuleNotFoundError: # UCX initialization has to be delegated to Distributed, it will take care # of setting correct environment variables and importing `ucp` after that. @@ -39,20 +47,35 @@ def _create_cuda_context(): os.environ.get("CUDA_VISIBLE_DEVICES", "0").split(",")[0] ) ctx = has_cuda_context() - if ( - ctx.has_context - and not distributed.comm.ucx.cuda_context_created.has_context - ): - distributed.comm.ucx._warn_existing_cuda_context(ctx, os.getpid()) + if protocol == "ucx": + if ( + ctx.has_context + and not distributed.comm.ucx.cuda_context_created.has_context + ): + distributed.comm.ucx._warn_existing_cuda_context(ctx, os.getpid()) + elif protocol == "ucxx": + if ( + ctx.has_context + and not distributed_ucxx.ucxx.cuda_context_created.has_context + ): + distributed_ucxx.ucxx._warn_existing_cuda_context(ctx, os.getpid()) _create_cuda_context_handler() - if not distributed.comm.ucx.cuda_context_created.has_context: - ctx = has_cuda_context() - if ctx.has_context and ctx.device_info != cuda_visible_device: - distributed.comm.ucx._warn_cuda_context_wrong_device( - cuda_visible_device, ctx.device_info, os.getpid() - ) + if protocol == "ucx": + if not distributed.comm.ucx.cuda_context_created.has_context: + ctx = has_cuda_context() + if ctx.has_context and ctx.device_info != cuda_visible_device: + distributed.comm.ucx._warn_cuda_context_wrong_device( + cuda_visible_device, ctx.device_info, os.getpid() + ) + elif protocol == "ucxx": + if not distributed_ucxx.ucxx.cuda_context_created.has_context: + ctx = has_cuda_context() + if ctx.has_context and ctx.device_info != cuda_visible_device: + distributed_ucxx.ucxx._warn_cuda_context_wrong_device( + cuda_visible_device, ctx.device_info, os.getpid() + ) except Exception: logger.error("Unable to start CUDA Context", exc_info=True) @@ -64,6 +87,7 @@ def initialize( enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, + protocol="ucx", ): """Create CUDA context and initialize UCX-Py, depending on user parameters. @@ -118,7 +142,7 @@ def initialize( dask.config.set({"distributed.comm.ucx": ucx_config}) if create_cuda_context: - _create_cuda_context() + _create_cuda_context(protocol=protocol) @click.command() @@ -127,6 +151,12 @@ def initialize( default=False, help="Create CUDA context", ) +@click.option( + "--protocol", + default=None, + type=str, + help="Communication protocol, such as: 'tcp', 'tls', 'ucx' or 'ucxx'.", +) @click.option( "--enable-tcp-over-ucx/--disable-tcp-over-ucx", default=False, @@ -150,10 +180,11 @@ def initialize( def dask_setup( service, create_cuda_context, + protocol, enable_tcp_over_ucx, enable_infiniband, enable_nvlink, enable_rdmacm, ): if create_cuda_context: - _create_cuda_context() + _create_cuda_context(protocol=protocol) diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index d0ea92748..7a5c8c13d 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -319,8 +319,11 @@ def __init__( if enable_tcp_over_ucx or enable_infiniband or enable_nvlink: if protocol is None: protocol = "ucx" - elif protocol != "ucx": - raise TypeError("Enabling InfiniBand or NVLink requires protocol='ucx'") + elif protocol not in ["ucx", "ucxx"]: + raise TypeError( + "Enabling InfiniBand or NVLink requires protocol='ucx' or " + "protocol='ucxx'" + ) self.host = kwargs.get("host", None) @@ -371,7 +374,7 @@ def __init__( ) + ["dask_cuda.initialize"] self.new_spec["options"]["preload_argv"] = self.new_spec["options"].get( "preload_argv", [] - ) + ["--create-cuda-context"] + ) + ["--create-cuda-context", "--protocol", protocol] self.cuda_visible_devices = CUDA_VISIBLE_DEVICES self.scale(n_workers) diff --git a/dask_cuda/tests/test_dgx.py b/dask_cuda/tests/test_dgx.py index ece399d45..1fd6d0ebb 100644 --- a/dask_cuda/tests/test_dgx.py +++ b/dask_cuda/tests/test_dgx.py @@ -73,10 +73,13 @@ def test_default(): assert not p.exitcode -def _test_tcp_over_ucx(): - ucp = pytest.importorskip("ucp") +def _test_tcp_over_ucx(protocol): + if protocol == "ucx": + ucp = pytest.importorskip("ucp") + elif protocol == "ucxx": + ucp = pytest.importorskip("ucxx") - with LocalCUDACluster(enable_tcp_over_ucx=True) as cluster: + with LocalCUDACluster(protocol=protocol, enable_tcp_over_ucx=True) as cluster: with Client(cluster) as client: res = da.from_array(numpy.arange(10000), chunks=(1000,)) res = res.sum().compute() @@ -93,10 +96,17 @@ def check_ucx_options(): assert all(client.run(check_ucx_options).values()) -def test_tcp_over_ucx(): - ucp = pytest.importorskip("ucp") # NOQA: F841 +@pytest.mark.parametrize( + "protocol", + ["ucx", "ucxx"], +) +def test_tcp_over_ucx(protocol): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") - p = mp.Process(target=_test_tcp_over_ucx) + p = mp.Process(target=_test_tcp_over_ucx, args=(protocol,)) p.start() p.join() assert not p.exitcode @@ -117,9 +127,14 @@ def test_tcp_only(): assert not p.exitcode -def _test_ucx_infiniband_nvlink(enable_infiniband, enable_nvlink, enable_rdmacm): +def _test_ucx_infiniband_nvlink( + protocol, enable_infiniband, enable_nvlink, enable_rdmacm +): cupy = pytest.importorskip("cupy") - ucp = pytest.importorskip("ucp") + if protocol == "ucx": + ucp = pytest.importorskip("ucp") + elif protocol == "ucxx": + ucp = pytest.importorskip("ucxx") if enable_infiniband is None and enable_nvlink is None and enable_rdmacm is None: enable_tcp_over_ucx = None @@ -135,6 +150,7 @@ def _test_ucx_infiniband_nvlink(enable_infiniband, enable_nvlink, enable_rdmacm) cm_tls_priority = ["tcp"] initialize( + protocol=protocol, enable_tcp_over_ucx=enable_tcp_over_ucx, enable_infiniband=enable_infiniband, enable_nvlink=enable_nvlink, @@ -142,6 +158,7 @@ def _test_ucx_infiniband_nvlink(enable_infiniband, enable_nvlink, enable_rdmacm) ) with LocalCUDACluster( + protocol=protocol, interface="ib0", enable_tcp_over_ucx=enable_tcp_over_ucx, enable_infiniband=enable_infiniband, @@ -171,6 +188,7 @@ def check_ucx_options(): assert all(client.run(check_ucx_options).values()) +@pytest.mark.parametrize("protocol", ["ucx", "ucxx"]) @pytest.mark.parametrize( "params", [ @@ -185,8 +203,11 @@ def check_ucx_options(): _get_dgx_version() == DGXVersion.DGX_A100, reason="Automatic InfiniBand device detection Unsupported for %s" % _get_dgx_name(), ) -def test_ucx_infiniband_nvlink(params): - ucp = pytest.importorskip("ucp") # NOQA: F841 +def test_ucx_infiniband_nvlink(protocol, params): + if protocol == "ucx": + ucp = pytest.importorskip("ucp") + elif protocol == "ucxx": + ucp = pytest.importorskip("ucxx") if params["enable_infiniband"]: if not any([at.startswith("rc") for at in ucp.get_active_transports()]): @@ -195,6 +216,7 @@ def test_ucx_infiniband_nvlink(params): p = mp.Process( target=_test_ucx_infiniband_nvlink, args=( + protocol, params["enable_infiniband"], params["enable_nvlink"], params["enable_rdmacm"], diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index bd6770225..21b35e481 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -44,7 +44,7 @@ def _test_local_cluster(protocol): assert sum(c.run(my_rank, 0)) == sum(range(4)) -@pytest.mark.parametrize("protocol", ["tcp", "ucx"]) +@pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"]) def test_local_cluster(protocol): p = mp.Process(target=_test_local_cluster, args=(protocol,)) p.start() @@ -160,7 +160,7 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions): @pytest.mark.parametrize("nworkers", [1, 2, 3]) @pytest.mark.parametrize("backend", ["pandas", "cudf"]) -@pytest.mark.parametrize("protocol", ["tcp", "ucx"]) +@pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"]) @pytest.mark.parametrize("_partitions", [True, False]) def test_dataframe_shuffle(backend, protocol, nworkers, _partitions): if backend == "cudf": @@ -256,7 +256,7 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers): @pytest.mark.parametrize("nworkers", [1, 2, 4]) @pytest.mark.parametrize("backend", ["pandas", "cudf"]) -@pytest.mark.parametrize("protocol", ["tcp", "ucx"]) +@pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"]) def test_dataframe_shuffle_merge(backend, protocol, nworkers): if backend == "cudf": pytest.importorskip("cudf") @@ -293,7 +293,7 @@ def _test_jit_unspill(protocol): assert_eq(got, expected) -@pytest.mark.parametrize("protocol", ["tcp", "ucx"]) +@pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"]) def test_jit_unspill(protocol): pytest.importorskip("cudf") diff --git a/dask_cuda/tests/test_from_array.py b/dask_cuda/tests/test_from_array.py index 33f27d6fe..e20afcf3e 100644 --- a/dask_cuda/tests/test_from_array.py +++ b/dask_cuda/tests/test_from_array.py @@ -5,12 +5,16 @@ from dask_cuda import LocalCUDACluster -pytest.importorskip("ucp") cupy = pytest.importorskip("cupy") -@pytest.mark.parametrize("protocol", ["ucx", "tcp"]) +@pytest.mark.parametrize("protocol", ["ucx", "ucxx", "tcp"]) def test_ucx_from_array(protocol): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") + N = 10_000 with LocalCUDACluster(protocol=protocol) as cluster: with Client(cluster): diff --git a/dask_cuda/tests/test_initialize.py b/dask_cuda/tests/test_initialize.py index 05b72f996..a953a10c1 100644 --- a/dask_cuda/tests/test_initialize.py +++ b/dask_cuda/tests/test_initialize.py @@ -13,7 +13,6 @@ from dask_cuda.utils_test import IncreasedCloseTimeoutNanny mp = mp.get_context("spawn") # type: ignore -ucp = pytest.importorskip("ucp") # Notice, all of the following tests is executed in a new process such # that UCX options of the different tests doesn't conflict. @@ -21,11 +20,16 @@ # of UCX before retrieving the current config. -def _test_initialize_ucx_tcp(): +def _test_initialize_ucx_tcp(protocol): + if protocol == "ucx": + ucp = pytest.importorskip("ucp") + elif protocol == "ucxx": + ucp = pytest.importorskip("ucxx") + kwargs = {"enable_tcp_over_ucx": True} - initialize(**kwargs) + initialize(protocol=protocol, **kwargs) with LocalCluster( - protocol="ucx", + protocol=protocol, dashboard_address=None, n_workers=1, threads_per_worker=1, @@ -50,18 +54,29 @@ def check_ucx_options(): assert all(client.run(check_ucx_options).values()) -def test_initialize_ucx_tcp(): - p = mp.Process(target=_test_initialize_ucx_tcp) +@pytest.mark.parametrize("protocol", ["ucx", "ucxx"]) +def test_initialize_ucx_tcp(protocol): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") + + p = mp.Process(target=_test_initialize_ucx_tcp, args=(protocol,)) p.start() p.join() assert not p.exitcode -def _test_initialize_ucx_nvlink(): +def _test_initialize_ucx_nvlink(protocol): + if protocol == "ucx": + ucp = pytest.importorskip("ucp") + elif protocol == "ucxx": + ucp = pytest.importorskip("ucxx") + kwargs = {"enable_nvlink": True} - initialize(**kwargs) + initialize(protocol=protocol, **kwargs) with LocalCluster( - protocol="ucx", + protocol=protocol, dashboard_address=None, n_workers=1, threads_per_worker=1, @@ -87,18 +102,29 @@ def check_ucx_options(): assert all(client.run(check_ucx_options).values()) -def test_initialize_ucx_nvlink(): - p = mp.Process(target=_test_initialize_ucx_nvlink) +@pytest.mark.parametrize("protocol", ["ucx", "ucxx"]) +def test_initialize_ucx_nvlink(protocol): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") + + p = mp.Process(target=_test_initialize_ucx_nvlink, args=(protocol,)) p.start() p.join() assert not p.exitcode -def _test_initialize_ucx_infiniband(): +def _test_initialize_ucx_infiniband(protocol): + if protocol == "ucx": + ucp = pytest.importorskip("ucp") + elif protocol == "ucxx": + ucp = pytest.importorskip("ucxx") + kwargs = {"enable_infiniband": True} - initialize(**kwargs) + initialize(protocol=protocol, **kwargs) with LocalCluster( - protocol="ucx", + protocol=protocol, dashboard_address=None, n_workers=1, threads_per_worker=1, @@ -127,17 +153,28 @@ def check_ucx_options(): @pytest.mark.skipif( "ib0" not in psutil.net_if_addrs(), reason="Infiniband interface ib0 not found" ) -def test_initialize_ucx_infiniband(): - p = mp.Process(target=_test_initialize_ucx_infiniband) +@pytest.mark.parametrize("protocol", ["ucx", "ucxx"]) +def test_initialize_ucx_infiniband(protocol): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") + + p = mp.Process(target=_test_initialize_ucx_infiniband, args=(protocol,)) p.start() p.join() assert not p.exitcode -def _test_initialize_ucx_all(): - initialize() +def _test_initialize_ucx_all(protocol): + if protocol == "ucx": + ucp = pytest.importorskip("ucp") + elif protocol == "ucxx": + ucp = pytest.importorskip("ucxx") + + initialize(protocol=protocol) with LocalCluster( - protocol="ucx", + protocol=protocol, dashboard_address=None, n_workers=1, threads_per_worker=1, @@ -166,8 +203,14 @@ def check_ucx_options(): assert all(client.run(check_ucx_options).values()) -def test_initialize_ucx_all(): - p = mp.Process(target=_test_initialize_ucx_all) +@pytest.mark.parametrize("protocol", ["ucx", "ucxx"]) +def test_initialize_ucx_all(protocol): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") + + p = mp.Process(target=_test_initialize_ucx_all, args=(protocol,)) p.start() p.join() assert not p.exitcode diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index 3298cf219..b05389e4c 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -87,23 +87,38 @@ def get_visible_devices(): } +@pytest.mark.parametrize( + "protocol", + ["ucx", "ucxx"], +) @gen_test(timeout=20) -async def test_ucx_protocol(): - pytest.importorskip("ucp") +async def test_ucx_protocol(protocol): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") async with LocalCUDACluster( - protocol="ucx", asynchronous=True, data=dict + protocol=protocol, asynchronous=True, data=dict ) as cluster: assert all( - ws.address.startswith("ucx://") for ws in cluster.scheduler.workers.values() + ws.address.startswith(f"{protocol}://") + for ws in cluster.scheduler.workers.values() ) +@pytest.mark.parametrize( + "protocol", + ["ucx", "ucxx"], +) @gen_test(timeout=20) -async def test_explicit_ucx_with_protocol_none(): - pytest.importorskip("ucp") +async def test_explicit_ucx_with_protocol_none(protocol): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") - initialize(enable_tcp_over_ucx=True) + initialize(protocol=protocol, enable_tcp_over_ucx=True) async with LocalCUDACluster( protocol=None, enable_tcp_over_ucx=True, asynchronous=True, data=dict ) as cluster: @@ -113,11 +128,18 @@ async def test_explicit_ucx_with_protocol_none(): @pytest.mark.filterwarnings("ignore:Exception ignored in") +@pytest.mark.parametrize( + "protocol", + ["ucx", "ucxx"], +) @gen_test(timeout=20) -async def test_ucx_protocol_type_error(): - pytest.importorskip("ucp") +async def test_ucx_protocol_type_error(protocol): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") - initialize(enable_tcp_over_ucx=True) + initialize(protocol=protocol, enable_tcp_over_ucx=True) with pytest.raises(TypeError): async with LocalCUDACluster( protocol="tcp", enable_tcp_over_ucx=True, asynchronous=True, data=dict @@ -478,16 +500,25 @@ async def test_worker_fraction_limits(): ) -def test_print_cluster_config(capsys): +@pytest.mark.parametrize( + "protocol", + ["ucx", "ucxx"], +) +def test_print_cluster_config(capsys, protocol): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") + pytest.importorskip("rich") with LocalCUDACluster( - n_workers=1, device_memory_limit="1B", jit_unspill=True, protocol="ucx" + n_workers=1, device_memory_limit="1B", jit_unspill=True, protocol=protocol ) as cluster: with Client(cluster) as client: print_cluster_config(client) captured = capsys.readouterr() assert "Dask Cluster Configuration" in captured.out - assert "ucx" in captured.out + assert protocol in captured.out assert "1 B" in captured.out assert "[plugin]" in captured.out diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 8de56a5c5..7614219bf 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -400,10 +400,14 @@ def _pxy_deserialize(self): @pytest.mark.parametrize("send_serializers", [None, ("dask", "pickle"), ("cuda",)]) -@pytest.mark.parametrize("protocol", ["tcp", "ucx"]) +@pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"]) @gen_test(timeout=120) async def test_communicating_proxy_objects(protocol, send_serializers): """Testing serialization of cuDF dataframe when communicating""" + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") cudf = pytest.importorskip("cudf") def task(x): @@ -412,7 +416,7 @@ def task(x): serializers_used = x._pxy_get().serializer # Check that `x` is serialized with the expected serializers - if protocol == "ucx": + if protocol in ["ucx", "ucxx"]: if send_serializers is None: assert serializers_used == "cuda" else: @@ -443,11 +447,15 @@ def task(x): await client.submit(task, df) -@pytest.mark.parametrize("protocol", ["tcp", "ucx"]) +@pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"]) @pytest.mark.parametrize("shared_fs", [True, False]) @gen_test(timeout=20) async def test_communicating_disk_objects(protocol, shared_fs): """Testing disk serialization of cuDF dataframe when communicating""" + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") cudf = pytest.importorskip("cudf") ProxifyHostFile._spill_to_disk.shared_filesystem = shared_fs diff --git a/dask_cuda/tests/test_utils.py b/dask_cuda/tests/test_utils.py index 34e63f1b4..a0a77677d 100644 --- a/dask_cuda/tests/test_utils.py +++ b/dask_cuda/tests/test_utils.py @@ -79,11 +79,18 @@ def test_get_device_total_memory(): assert total_mem > 0 -def test_get_preload_options_default(): - pytest.importorskip("ucp") +@pytest.mark.parametrize( + "protocol", + ["ucx", "ucxx"], +) +def test_get_preload_options_default(protocol): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") opts = get_preload_options( - protocol="ucx", + protocol=protocol, create_cuda_context=True, ) @@ -93,14 +100,21 @@ def test_get_preload_options_default(): assert opts["preload_argv"] == ["--create-cuda-context"] +@pytest.mark.parametrize( + "protocol", + ["ucx", "ucxx"], +) @pytest.mark.parametrize("enable_tcp", [True, False]) @pytest.mark.parametrize("enable_infiniband", [True, False]) @pytest.mark.parametrize("enable_nvlink", [True, False]) -def test_get_preload_options(enable_tcp, enable_infiniband, enable_nvlink): - pytest.importorskip("ucp") +def test_get_preload_options(protocol, enable_tcp, enable_infiniband, enable_nvlink): + if protocol == "ucx": + pytest.importorskip("ucp") + elif protocol == "ucxx": + pytest.importorskip("ucxx") opts = get_preload_options( - protocol="ucx", + protocol=protocol, create_cuda_context=True, enable_tcp_over_ucx=enable_tcp, enable_infiniband=enable_infiniband, diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index f16ad18a2..ff4dbbae3 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -287,7 +287,7 @@ def get_preload_options( if create_cuda_context: preload_options["preload_argv"].append("--create-cuda-context") - if protocol == "ucx": + if protocol in ["ucx", "ucxx"]: initialize_ucx_argv = [] if enable_tcp_over_ucx: initialize_ucx_argv.append("--enable-tcp-over-ucx") @@ -625,6 +625,10 @@ def get_worker_config(dask_worker): import ucp ret["ucx-transports"] = ucp.get_active_transports() + elif scheme == "ucxx": + import ucxx + + ret["ucx-transports"] = ucxx.get_active_transports() # comm timeouts ret["distributed.comm.timeouts"] = dask.config.get("distributed.comm.timeouts") diff --git a/dependencies.yaml b/dependencies.yaml index 1022b3a38..02783dbff 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -122,6 +122,8 @@ dependencies: - pytest-cov - ucx-proc=*=gpu - ucx-py=0.35 + - ucxx=0.35 + - distributed-ucxx=0.35 specific: - output_types: conda matrices: From e5b240c5f30e414388b5ca0e5a5a5c8e594d0ade Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 6 Nov 2023 16:34:41 +0100 Subject: [PATCH 13/22] Remove `ucp.reset()` requirement from `test_dgx` (#1269) By moving the `ucp.get_transports()` call to the subprocess we remove the requirement to reset UCX from the `pytest` process, preventing potential interferences with tests that run after. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/dask-cuda/pull/1269 --- dask_cuda/tests/test_dgx.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/dask_cuda/tests/test_dgx.py b/dask_cuda/tests/test_dgx.py index 1fd6d0ebb..a7b79f327 100644 --- a/dask_cuda/tests/test_dgx.py +++ b/dask_cuda/tests/test_dgx.py @@ -128,7 +128,7 @@ def test_tcp_only(): def _test_ucx_infiniband_nvlink( - protocol, enable_infiniband, enable_nvlink, enable_rdmacm + skip_queue, protocol, enable_infiniband, enable_nvlink, enable_rdmacm ): cupy = pytest.importorskip("cupy") if protocol == "ucx": @@ -136,6 +136,14 @@ def _test_ucx_infiniband_nvlink( elif protocol == "ucxx": ucp = pytest.importorskip("ucxx") + if enable_infiniband and not any( + [at.startswith("rc") for at in ucp.get_active_transports()] + ): + skip_queue.put("No support available for 'rc' transport in UCX") + return + else: + skip_queue.put("ok") + if enable_infiniband is None and enable_nvlink is None and enable_rdmacm is None: enable_tcp_over_ucx = None cm_tls = ["all"] @@ -205,17 +213,16 @@ def check_ucx_options(): ) def test_ucx_infiniband_nvlink(protocol, params): if protocol == "ucx": - ucp = pytest.importorskip("ucp") + pytest.importorskip("ucp") elif protocol == "ucxx": - ucp = pytest.importorskip("ucxx") + pytest.importorskip("ucxx") - if params["enable_infiniband"]: - if not any([at.startswith("rc") for at in ucp.get_active_transports()]): - pytest.skip("No support available for 'rc' transport in UCX") + skip_queue = mp.Queue() p = mp.Process( target=_test_ucx_infiniband_nvlink, args=( + skip_queue, protocol, params["enable_infiniband"], params["enable_nvlink"], @@ -225,9 +232,8 @@ def test_ucx_infiniband_nvlink(protocol, params): p.start() p.join() - # Starting a new cluster on the same pytest process after an rdmacm cluster - # has been used may cause UCX-Py to complain about being already initialized. - if params["enable_rdmacm"] is True: - ucp.reset() + skip_msg = skip_queue.get() + if skip_msg != "ok": + pytest.skip(skip_msg) assert not p.exitcode From 9a5d06d8ec2318a0f23931188775802c227fd9b2 Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Mon, 13 Nov 2023 10:24:53 -0600 Subject: [PATCH 14/22] Use new `rapids-dask-dependency` metapackage for managing dask versions (#1270) Currently dask versions are pinned as part of every release cycle and then unpinned for the next development cycle across all of RAPIDS. This introduces a great deal of churn. To centralize the dependency, we have created a metapackage to manage the required dask version and this PR introduces that metapackage as a dependency of dask-cuda. xref: https://github.com/rapidsai/cudf/pull/14364 Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Peter Andreas Entschev (https://github.com/pentschev) - Jake Awe (https://github.com/AyodeAwe) URL: https://github.com/rapidsai/dask-cuda/pull/1270 --- ci/release/update-version.sh | 1 + conda/recipes/dask-cuda/meta.yaml | 1 - dependencies.yaml | 6 +----- pyproject.toml | 3 +-- 4 files changed, 3 insertions(+), 8 deletions(-) diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index 94cd5d12b..e57e8b1e4 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -39,6 +39,7 @@ sed_runner "s/cudf=.*/cudf=${NEXT_SHORT_TAG}/g" dependencies.yaml sed_runner "s/dask-cudf=.*/dask-cudf=${NEXT_SHORT_TAG}/g" dependencies.yaml sed_runner "s/kvikio=.*/kvikio=${NEXT_SHORT_TAG}/g" dependencies.yaml sed_runner "s/ucx-py=.*/ucx-py=${NEXT_UCXPY_VERSION}/g" dependencies.yaml +sed_runner "s/rapids-dask-dependency=.*/rapids-dask-dependency=${NEXT_SHORT_TAG}.*/g" dependencies.yaml # CI files for FILE in .github/workflows/*.yaml; do diff --git a/conda/recipes/dask-cuda/meta.yaml b/conda/recipes/dask-cuda/meta.yaml index 6804b1ce4..c194d117b 100644 --- a/conda/recipes/dask-cuda/meta.yaml +++ b/conda/recipes/dask-cuda/meta.yaml @@ -32,7 +32,6 @@ requirements: - tomli run: - python - - dask-core >=2023.9.2 {% for r in data.get("project", {}).get("dependencies", []) %} - {{ r }} {% endfor %} diff --git a/dependencies.yaml b/dependencies.yaml index 02783dbff..b1c9cd3fc 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -101,16 +101,12 @@ dependencies: common: - output_types: [conda, requirements] packages: - - dask>=2023.9.2 - - distributed>=2023.9.2 - numba>=0.57 - numpy>=1.21 - pandas>=1.3,<1.6.0dev0 - pynvml>=11.0.0,<11.5 + - rapids-dask-dependency==23.12.* - zict>=2.0.0 - - output_types: [conda] - packages: - - dask-core>=2023.9.2 test_python: common: - output_types: [conda] diff --git a/pyproject.toml b/pyproject.toml index c240e61b7..3b02debbe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,12 +16,11 @@ authors = [ license = { text = "Apache-2.0" } requires-python = ">=3.9" dependencies = [ - "dask >=2023.9.2", - "distributed >=2023.9.2", "pynvml >=11.0.0,<11.5", "numpy >=1.21", "numba >=0.57", "pandas >=1.3,<1.6.0dev0", + "rapids-dask-dependency==23.12.*", "zict >=2.0.0", ] classifiers = [ From d026d6eb497f9748f943f34118483a2dd4f2d32a Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 13 Nov 2023 19:20:28 -0800 Subject: [PATCH 15/22] Add missing alpha spec (#1273) Without this extra spec, consumers of dask-cuda nightlies won't know that dask-cuda nightlies want to use nightlies of rapids-dask-dependency. Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Jake Awe (https://github.com/AyodeAwe) URL: https://github.com/rapidsai/dask-cuda/pull/1273 --- ci/build_python_pypi.sh | 10 ++++++++++ ci/test_python.sh | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/ci/build_python_pypi.sh b/ci/build_python_pypi.sh index b13783d16..66bc7cf2e 100755 --- a/ci/build_python_pypi.sh +++ b/ci/build_python_pypi.sh @@ -17,6 +17,16 @@ if ! rapids-is-release-build; then export PACKAGE_VERSION_NUMBER="${version}" fi +# For nightlies we want to ensure that we're pulling in alphas as well. The +# easiest way to do so is to augment the spec with a constraint containing a +# min alpha version that doesn't affect the version bounds but does allow usage +# of alpha versions for that dependency without --pre +alpha_spec='' +if ! rapids-is-release-build; then + alpha_spec=',>=0.0.0a0' +fi + +sed -r -i "s/rapids-dask-dependency==(.*)\"/rapids-dask-dependency==\1${alpha_spec}\"/g" pyproject.toml echo "${version}" | tr -d '"' > VERSION sed -i "/^__git_commit__/ s/= .*/= \"${commit}\"/g" "${package_name}/_version.py" diff --git a/ci/test_python.sh b/ci/test_python.sh index ca4140bae..f700c935b 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -55,7 +55,7 @@ timeout 60m pytest \ --cov=dask_cuda \ --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage.xml" \ --cov-report=term \ - tests + tests -k "not ucxx" popd rapids-logger "Run local benchmark" From 21e11bfa6f1c1231856c8a24ed9733a82ec55168 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 15 Nov 2023 18:45:50 -0800 Subject: [PATCH 16/22] Fix path (#1277) The `package_name` variable was not set here, so the git commit was never actually overwritten. Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Ray Douglass (https://github.com/raydouglass) URL: https://github.com/rapidsai/dask-cuda/pull/1277 --- ci/build_python_pypi.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/build_python_pypi.sh b/ci/build_python_pypi.sh index 66bc7cf2e..e177acf87 100755 --- a/ci/build_python_pypi.sh +++ b/ci/build_python_pypi.sh @@ -29,7 +29,7 @@ fi sed -r -i "s/rapids-dask-dependency==(.*)\"/rapids-dask-dependency==\1${alpha_spec}\"/g" pyproject.toml echo "${version}" | tr -d '"' > VERSION -sed -i "/^__git_commit__/ s/= .*/= \"${commit}\"/g" "${package_name}/_version.py" +sed -i "/^__git_commit__/ s/= .*/= \"${commit}\"/g" "dask_cuda/_version.py" # Compute/export RAPIDS_DATE_STRING source rapids-env-update From dc759b0a93caca35f44751b5713db88f36c66f66 Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Fri, 17 Nov 2023 10:37:00 -0600 Subject: [PATCH 17/22] Enable build concurrency for nightly and merge triggers. (#1282) --- .github/workflows/build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 6e5f77d9b..f365c52c7 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -22,7 +22,7 @@ on: default: nightly concurrency: - group: ${{ github.workflow }}-${{ github.ref }} + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} cancel-in-progress: true jobs: From b1f13fc11a9c47e19ae950bfc0051e3df694a3ad Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 20 Nov 2023 17:53:09 +0000 Subject: [PATCH 18/22] Set minimum click to 8.1 (#1272) Closes #1271 Authors: - Jacob Tomlinson (https://github.com/jacobtomlinson) - Lawrence Mitchell (https://github.com/wence-) Approvers: - Benjamin Zaitlen (https://github.com/quasiben) URL: https://github.com/rapidsai/dask-cuda/pull/1272 --- pyproject.toml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3b02debbe..c5286410a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,13 +13,14 @@ readme = { file = "README.md", content-type = "text/markdown" } authors = [ { name = "NVIDIA Corporation" }, ] -license = { text = "Apache-2.0" } +license = { text = "Apache-2.0." } requires-python = ">=3.9" dependencies = [ + "click >=8.1", "pynvml >=11.0.0,<11.5", "numpy >=1.21", "numba >=0.57", - "pandas >=1.3,<1.6.0dev0", + "pandas >=1.3,<1.6.0.dev0", "rapids-dask-dependency==23.12.*", "zict >=2.0.0", ] From be3170d17d86a057b322205434260b8451f88d56 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 20 Nov 2023 14:38:51 -0800 Subject: [PATCH 19/22] Fix license [skip ci] (#1285) --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c5286410a..4f0da0689 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ readme = { file = "README.md", content-type = "text/markdown" } authors = [ { name = "NVIDIA Corporation" }, ] -license = { text = "Apache-2.0." } +license = { text = "Apache-2.0" } requires-python = ">=3.9" dependencies = [ "click >=8.1", From 1962e2d4f480754842fd81fd0b3639bc8b7cfc7f Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 1 Dec 2023 19:04:42 +0100 Subject: [PATCH 20/22] Increase Nanny close timeout for `test_spilling_local_cuda_cluster` (#1289) Increase Nanny close timeout for `test_spilling_local_cuda_cluster` which didn't fail in the past but failed for the first time in latest nightly run. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Richard (Rick) Zamora (https://github.com/rjzamora) --- dask_cuda/tests/test_proxy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index 7614219bf..5458c5bab 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -306,6 +306,7 @@ def task(x): n_workers=1, device_memory_limit="1B", jit_unspill=jit_unspill, + worker_class=IncreasedCloseTimeoutNanny, asynchronous=True, ) as cluster: async with Client(cluster, asynchronous=True) as client: From 8be265920105da50bcd3058e922182b69501ba62 Mon Sep 17 00:00:00 2001 From: Ray Douglass <3107146+raydouglass@users.noreply.github.com> Date: Mon, 4 Dec 2023 14:11:37 -0500 Subject: [PATCH 21/22] Pin actions/labeler to v4 [skip ci] (#1292) RAPIDS repos are using the `main` branch of https://github.com/actions/labeler which recently introduced [breaking changes](https://github.com/actions/labeler/releases/tag/v5.0.0). This PR pins to the latest v4 release of the labeler action until we can evaluate the changes required for v5. Authors: - Ray Douglass (https://github.com/raydouglass) Approvers: - AJ Schmidt (https://github.com/ajschmidt8) --- .github/workflows/labeler.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml index 55117f774..1ddd5b5cc 100644 --- a/.github/workflows/labeler.yml +++ b/.github/workflows/labeler.yml @@ -6,6 +6,6 @@ jobs: triage: runs-on: ubuntu-latest steps: - - uses: actions/labeler@main + - uses: actions/labeler@v4 with: repo-token: "${{ secrets.GITHUB_TOKEN }}" From f33744213835db822ad8d27a611347c896842409 Mon Sep 17 00:00:00 2001 From: Ray Douglass Date: Wed, 6 Dec 2023 09:58:51 -0500 Subject: [PATCH 22/22] Update Changelog [skip ci] --- CHANGELOG.md | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 55b9650e3..fa8bd51af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,34 @@ +# dask-cuda 23.12.00 (6 Dec 2023) + +## 🐛 Bug Fixes + +- Update actions/labeler to v4 ([#1292](https://github.com/rapidsai/dask-cuda/pull/1292)) [@raydouglass](https://github.com/raydouglass) +- Increase Nanny close timeout for `test_spilling_local_cuda_cluster` ([#1289](https://github.com/rapidsai/dask-cuda/pull/1289)) [@pentschev](https://github.com/pentschev) +- Fix path ([#1277](https://github.com/rapidsai/dask-cuda/pull/1277)) [@vyasr](https://github.com/vyasr) +- Add missing alpha spec ([#1273](https://github.com/rapidsai/dask-cuda/pull/1273)) [@vyasr](https://github.com/vyasr) +- Set minimum click to 8.1 ([#1272](https://github.com/rapidsai/dask-cuda/pull/1272)) [@jacobtomlinson](https://github.com/jacobtomlinson) +- Reenable tests that were segfaulting ([#1266](https://github.com/rapidsai/dask-cuda/pull/1266)) [@pentschev](https://github.com/pentschev) +- Increase close timeout of `Nanny` in `LocalCUDACluster` ([#1260](https://github.com/rapidsai/dask-cuda/pull/1260)) [@pentschev](https://github.com/pentschev) +- Small reorganization and fixes for `test_spill` ([#1255](https://github.com/rapidsai/dask-cuda/pull/1255)) [@pentschev](https://github.com/pentschev) +- Update plugins to inherit from ``WorkerPlugin`` ([#1230](https://github.com/rapidsai/dask-cuda/pull/1230)) [@jrbourbeau](https://github.com/jrbourbeau) + +## 🚀 New Features + +- Add support for UCXX ([#1268](https://github.com/rapidsai/dask-cuda/pull/1268)) [@pentschev](https://github.com/pentschev) + +## 🛠️ Improvements + +- Fix license ([#1285](https://github.com/rapidsai/dask-cuda/pull/1285)) [@vyasr](https://github.com/vyasr) +- Build concurrency for nightly and merge triggers ([#1282](https://github.com/rapidsai/dask-cuda/pull/1282)) [@bdice](https://github.com/bdice) +- Use new `rapids-dask-dependency` metapackage for managing dask versions ([#1270](https://github.com/rapidsai/dask-cuda/pull/1270)) [@galipremsagar](https://github.com/galipremsagar) +- Remove `ucp.reset()` requirement from `test_dgx` ([#1269](https://github.com/rapidsai/dask-cuda/pull/1269)) [@pentschev](https://github.com/pentschev) +- Generate proper, consistent nightly versions for pip and conda packages ([#1267](https://github.com/rapidsai/dask-cuda/pull/1267)) [@galipremsagar](https://github.com/galipremsagar) +- Unpin `dask` and `distributed` for `23.12` development ([#1264](https://github.com/rapidsai/dask-cuda/pull/1264)) [@galipremsagar](https://github.com/galipremsagar) +- Move some `dask_cuda.utils` pieces to their own modules ([#1263](https://github.com/rapidsai/dask-cuda/pull/1263)) [@pentschev](https://github.com/pentschev) +- Update `shared-action-workflows` references ([#1261](https://github.com/rapidsai/dask-cuda/pull/1261)) [@AyodeAwe](https://github.com/AyodeAwe) +- Use branch-23.12 workflows. ([#1259](https://github.com/rapidsai/dask-cuda/pull/1259)) [@bdice](https://github.com/bdice) +- dask-cuda: Build CUDA 12.0 ARM conda packages. ([#1238](https://github.com/rapidsai/dask-cuda/pull/1238)) [@bdice](https://github.com/bdice) + # dask-cuda 23.10.00 (11 Oct 2023) ## 🐛 Bug Fixes