Skip to content

Commit

Permalink
Remove cudf._lib.reduce in favor of inlining pylibcudf (#17574)
Browse files Browse the repository at this point in the history
Contributes to #17317

Authors:
  - Matthew Roeschke (https://github.com/mroeschke)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #17574
  • Loading branch information
mroeschke authored Dec 13, 2024
1 parent 5baaf6d commit 48aa08f
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 230 deletions.
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# the License.
# =============================================================================

set(cython_sources column.pyx copying.pyx groupby.pyx interop.pyx reduce.pyx scalar.pyx sort.pyx
set(cython_sources column.pyx copying.pyx groupby.pyx interop.pyx scalar.pyx sort.pyx
stream_compaction.pyx string_casting.pyx strings_udf.pyx types.pyx utils.pyx
)
set(linked_libraries cudf::cudf)
Expand Down
1 change: 0 additions & 1 deletion python/cudf/cudf/_lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
copying,
groupby,
interop,
reduce,
sort,
stream_compaction,
string_casting,
Expand Down
4 changes: 1 addition & 3 deletions python/cudf/cudf/_lib/copying.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ from cudf._lib.scalar import as_device_scalar

from cudf._lib.scalar cimport DeviceScalar

from cudf._lib.reduce import minmax

from pylibcudf.libcudf.types cimport size_type

from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table
Expand All @@ -34,7 +32,7 @@ def _gather_map_is_valid(
"""
if not check_bounds or nullify or len(gather_map) == 0:
return True
gm_min, gm_max = minmax(gather_map)
gm_min, gm_max = gather_map.minmax()
return gm_min >= -nrows and gm_max < nrows


Expand Down
135 changes: 0 additions & 135 deletions python/cudf/cudf/_lib/reduce.pyx

This file was deleted.

122 changes: 103 additions & 19 deletions python/cudf/cudf/core/column/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import warnings
from collections import abc
from collections.abc import MutableSequence, Sequence
from functools import cached_property
Expand Down Expand Up @@ -31,7 +32,7 @@
drop_duplicates,
drop_nulls,
)
from cudf._lib.types import size_type_dtype
from cudf._lib.types import dtype_to_pylibcudf_type, size_type_dtype
from cudf.api.types import (
_is_non_decimal_numeric_dtype,
_is_pandas_nullable_extension_dtype,
Expand All @@ -41,7 +42,7 @@
is_string_dtype,
)
from cudf.core._compat import PANDAS_GE_210
from cudf.core._internals import unary
from cudf.core._internals import aggregation, unary
from cudf.core._internals.timezones import get_compatible_timezone
from cudf.core.abc import Serializable
from cudf.core.buffer import (
Expand Down Expand Up @@ -259,21 +260,17 @@ def all(self, skipna: bool = True) -> bool:
# The skipna argument is only used for numerical columns.
# If all entries are null the result is True, including when the column
# is empty.

if self.null_count == self.size:
return True

return libcudf.reduce.reduce("all", self)
return self.reduce("all")

def any(self, skipna: bool = True) -> bool:
# Early exit for fast cases.

if not skipna and self.has_nulls():
return True
elif skipna and self.null_count == self.size:
return False

return libcudf.reduce.reduce("any", self)
return self.reduce("any")

def dropna(self) -> Self:
if self.has_nulls():
Expand Down Expand Up @@ -1393,33 +1390,35 @@ def _reduce(
)
if isinstance(preprocessed, ColumnBase):
dtype = kwargs.pop("dtype", None)
return libcudf.reduce.reduce(
op, preprocessed, dtype=dtype, **kwargs
)
return preprocessed.reduce(op, dtype, **kwargs)
return preprocessed

def _can_return_nan(self, skipna: bool | None = None) -> bool:
return not skipna and self.has_nulls(include_nan=False)

def _process_for_reduction(
self, skipna: bool | None = None, min_count: int = 0
) -> ColumnBase | ScalarLike:
if skipna is None:
skipna = True
skipna = True if skipna is None else skipna

if self.has_nulls():
if self._can_return_nan(skipna=skipna):
return cudf.utils.dtypes._get_nan_for_dtype(self.dtype)

col = self.nans_to_nulls() if skipna else self
if col.has_nulls():
if skipna:
result_col = self.dropna()
col = col.dropna()
else:
return cudf.utils.dtypes._get_nan_for_dtype(self.dtype)

result_col = self

# TODO: If and when pandas decides to validate that `min_count` >= 0 we
# should insert comparable behavior.
# https://github.com/pandas-dev/pandas/issues/50022
if min_count > 0:
valid_count = len(result_col) - result_col.null_count
valid_count = len(col) - col.null_count
if valid_count < min_count:
return cudf.utils.dtypes._get_nan_for_dtype(self.dtype)
return result_col
return col

def _reduction_result_dtype(self, reduction_op: str) -> Dtype:
"""
Expand Down Expand Up @@ -1529,6 +1528,91 @@ def one_hot_encode(
for col in plc_table.columns()
)

@acquire_spill_lock()
def scan(self, scan_op: str, inclusive: bool, **kwargs) -> Self:
    """Run a pylibcudf scan over this column.

    Parameters
    ----------
    scan_op : str
        Name of the aggregation; forwarded to
        ``aggregation.make_aggregation``.
    inclusive : bool
        Selects ``ScanType.INCLUSIVE`` when true and
        ``ScanType.EXCLUSIVE`` otherwise.
    **kwargs
        Collected into a dict and handed to
        ``aggregation.make_aggregation`` as its second argument.

    Returns
    -------
    Self
        The scan result wrapped through ``type(self).from_pylibcudf``.
    """
    plc_input = self.to_pylibcudf(mode="read")
    plc_agg = aggregation.make_aggregation(scan_op, kwargs).c_obj
    if inclusive:
        scan_type = plc.reduce.ScanType.INCLUSIVE
    else:
        scan_type = plc.reduce.ScanType.EXCLUSIVE
    plc_result = plc.reduce.scan(plc_input, plc_agg, scan_type)
    return type(self).from_pylibcudf(plc_result)  # type: ignore[return-value]

def reduce(self, reduction_op: str, dtype=None, **kwargs) -> ScalarLike:
    """Reduce this column to a single value with pylibcudf.

    Parameters
    ----------
    reduction_op : str
        Name of the reduction (the code special-cases ``"sum"``,
        ``"sum_of_squares"``, ``"product"``, ``"any"``, ``"min"`` and
        ``"max"``); it is forwarded to ``aggregation.make_aggregation``.
    dtype : optional
        Deprecated. When given, it replaces the output dtype that
        ``self._reduction_result_dtype(reduction_op)`` would select and
        a ``FutureWarning`` is emitted.
    **kwargs
        Collected into a dict and handed to
        ``aggregation.make_aggregation`` as its second argument.

    Returns
    -------
    ScalarLike
        Element 0 of a one-row column built from the reduction result,
        or a fixed fallback value when the column has no valid rows.

    Raises
    ------
    NotImplementedError
        If the result type is decimal and ``reduction_op`` is not one
        of min, max, sum, product or sum_of_squares.
    """
    if dtype is not None:
        warnings.warn(
            "dtype is deprecated and will be removed in a future release. "
            "Cast the result (e.g. .astype) after the operation instead.",
            FutureWarning,
        )
        col_dtype = dtype
    else:
        col_dtype = self._reduction_result_dtype(reduction_op)

    # No valid (non-null) rows: answer without launching a reduction.
    # sum-like ops yield 0, product yields 1, "any" yields False, and
    # everything else gets whatever _get_nan_for_dtype produces.
    if len(self) <= self.null_count:
        if reduction_op in {"sum", "sum_of_squares"}:
            return self.dtype.type(0)
        if reduction_op == "product":
            return self.dtype.type(1)
        if reduction_op == "any":
            return False

        return cudf.utils.dtypes._get_nan_for_dtype(col_dtype)

    with acquire_spill_lock():
        plc_scalar = plc.reduce.reduce(
            self.to_pylibcudf(mode="read"),
            aggregation.make_aggregation(reduction_op, kwargs).c_obj,
            dtype_to_pylibcudf_type(col_dtype),
        )
        # Wrap the scalar in a one-row column so the column-level
        # conversions below (astype / from_struct_column) and
        # element_indexing can be reused.
        result_col = type(self).from_pylibcudf(
            plc.Column.from_scalar(plc_scalar, 1)
        )
        if plc_scalar.type().id() in {
            plc.TypeId.DECIMAL128,
            plc.TypeId.DECIMAL64,
            plc.TypeId.DECIMAL32,
        }:
            # The pylibcudf scale is negated to build the decimal dtype.
            scale = -plc_scalar.type().scale()
            # Result precision is derived from the input precision and
            # the row count; see
            # https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql
            p = col_dtype.precision
            nrows = len(self)
            if reduction_op in {"min", "max"}:
                new_p = p
            elif reduction_op == "sum":
                new_p = p + nrows - 1
            elif reduction_op == "product":
                new_p = p * nrows + nrows - 1
            elif reduction_op == "sum_of_squares":
                new_p = 2 * p + nrows
            else:
                raise NotImplementedError(
                    f"{reduction_op} not implemented for decimal types."
                )
            # Clamp into [0, MAX_PRECISION] for this decimal dtype.
            precision = max(min(new_p, col_dtype.MAX_PRECISION), 0)
            new_dtype = type(col_dtype)(precision, scale)
            result_col = result_col.astype(new_dtype)
        elif isinstance(col_dtype, cudf.IntervalDtype):
            result_col = type(self).from_struct_column(  # type: ignore[attr-defined]
                result_col, closed=col_dtype.closed
            )
    return result_col.element_indexing(0)

@acquire_spill_lock()
def minmax(self) -> tuple[ScalarLike, ScalarLike]:
    """Return the column's minimum and maximum as a ``(min, max)`` pair.

    Both values come from a single ``plc.reduce.minmax`` call on the
    pylibcudf view of this column. Each resulting scalar is expanded to
    a one-row column and read back through ``element_indexing(0)``.
    """
    column_cls = type(self)

    def _first_element(plc_scalar):
        # A one-row column lets the column type's own element_indexing
        # produce the returned value.
        one_row = plc.Column.from_scalar(plc_scalar, 1)
        return column_cls.from_pylibcudf(one_row).element_indexing(0)

    lowest, highest = plc.reduce.minmax(self.to_pylibcudf(mode="read"))
    return _first_element(lowest), _first_element(highest)


def _has_any_nan(arbitrary: pd.Series | np.ndarray) -> bool:
"""Check if an object dtype Series or array contains NaN."""
Expand Down
14 changes: 0 additions & 14 deletions python/cudf/cudf/core/column/interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
if TYPE_CHECKING:
from typing_extensions import Self

from cudf._typing import ScalarLike
from cudf.core.buffer import Buffer
from cudf.core.column import ColumnBase

Expand Down Expand Up @@ -211,16 +210,3 @@ def element_indexing(self, index: int):
if cudf.get_option("mode.pandas_compatible"):
return pd.Interval(**result, closed=self.dtype.closed)
return result

def _reduce(
    self,
    op: str,
    skipna: bool | None = None,
    min_count: int = 0,
    *args,
    **kwargs,
) -> ScalarLike:
    """Reduce through the parent class, adapting the result for pandas mode.

    All arguments are forwarded unchanged to ``super()._reduce``. When
    the ``mode.pandas_compatible`` option is enabled, the parent's result
    is unpacked as keyword arguments into ``pd.Interval`` together with
    ``closed=self.dtype.closed``; otherwise it is returned untouched.
    """
    reduced = super()._reduce(op, skipna, min_count, *args, **kwargs)
    if not cudf.get_option("mode.pandas_compatible"):
        return reduced
    return pd.Interval(**reduced, closed=self.dtype.closed)
Loading

0 comments on commit 48aa08f

Please sign in to comment.