Skip to content

Commit

Permalink
Remove cudf._lib.partitioning in favor of inlining pylibcudf (#17369)
Browse files Browse the repository at this point in the history
Contributes to #17317

Authors:
  - Matthew Roeschke (https://github.com/mroeschke)

Approvers:
  - Bradley Dice (https://github.com/bdice)

URL: #17369
  • Loading branch information
mroeschke authored Nov 26, 2024
1 parent ccc8833 commit df17740
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 59 deletions.
1 change: 0 additions & 1 deletion python/cudf/cudf/_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ set(cython_sources
null_mask.pyx
orc.pyx
parquet.pyx
partitioning.pyx
reduce.pyx
replace.pyx
reshape.pyx
Expand Down
1 change: 0 additions & 1 deletion python/cudf/cudf/_lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
nvtext,
orc,
parquet,
partitioning,
reduce,
replace,
reshape,
Expand Down
53 changes: 0 additions & 53 deletions python/cudf/cudf/_lib/partitioning.pyx

This file was deleted.

43 changes: 39 additions & 4 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2487,11 +2487,46 @@ def scatter_by_map(
f"ERROR: map_size must be >= {count} (got {map_size})."
)

partitioned_columns, output_offsets = libcudf.partitioning.partition(
[*(self.index._columns if keep_index else ()), *self._columns],
map_index,
map_size,
source_columns = (
itertools.chain(self.index._columns, self._columns)
if keep_index
else self._columns
)

with acquire_spill_lock():
if map_size is None:
map_size = plc.stream_compaction.distinct_count(
map_index.to_pylibcudf(mode="read"),
plc.types.NullPolicy.EXCLUDE,
plc.types.NanPolicy.NAN_IS_VALID,
)

if map_index.size > 0:
plc_lo, plc_hi = plc.reduce.minmax(
map_index.to_pylibcudf(mode="read")
)
# TODO: Use pylibcudf Scalar once APIs are more developed
lo = libcudf.column.Column.from_pylibcudf(
plc.Column.from_scalar(plc_lo, 1)
).element_indexing(0)
hi = libcudf.column.Column.from_pylibcudf(
plc.Column.from_scalar(plc_hi, 1)
).element_indexing(0)
if lo < 0 or hi >= map_size:
raise ValueError("Partition map has invalid values")

plc_table, output_offsets = plc.partitioning.partition(
plc.Table(
[col.to_pylibcudf(mode="read") for col in source_columns]
),
map_index.to_pylibcudf(mode="read"),
map_size,
)
partitioned_columns = [
libcudf.column.Column.from_pylibcudf(col)
for col in plc_table.columns()
]

partitioned = self._from_columns_like_self(
partitioned_columns,
column_names=self._column_names,
Expand Down

0 comments on commit df17740

Please sign in to comment.