Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reading Parquet string cols when nrows and input_pass_limit > 0 #17321

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,21 @@ inline __device__ bool is_bounds_page(page_state_s* const s,
size_t const begin = start_row;
size_t const end = start_row + num_rows;

// for non-nested schemas, rows cannot span pages, so use a more restrictive test
return has_repetition
? ((page_begin <= begin && page_end >= begin) || (page_begin <= end && page_end >= end))
: ((page_begin < begin && page_end > begin) || (page_begin < end && page_end > end));
// Test for list schemas.
auto const is_bounds_page_lists =
((page_begin <= begin and page_end >= begin) or (page_begin <= end and page_end >= end));

// For non-list schemas, rows cannot span pages, so use a more restrictive test. Make sure to
// relax the test for `page_end` if we adjusted the `num_rows` for the last page to compensate
// for list row size estimates in `generate_list_column_row_count_estimates()` when in chunked
// read mode.
auto const test_page_end_nonlists =
s->page.is_num_rows_adjusted ? page_end >= end : page_end > end;

auto const is_bounds_page_nonlists =
(page_begin < begin and page_end > begin) or (page_begin < end and test_page_end_nonlists);

return has_repetition ? is_bounds_page_lists : is_bounds_page_nonlists;
}

/**
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
// definition levels
bs->page.chunk_row = 0;
bs->page.num_rows = 0;
bs->page.is_num_rows_adjusted = false;
bs->page.skipped_values = -1;
bs->page.skipped_leaf_values = 0;
bs->page.str_bytes = 0;
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,10 @@ struct PageInfo {
// - In the case of a nested schema, you have to decode the repetition and definition
// levels to extract actual column values
int32_t num_input_values;
int32_t chunk_row; // starting row of this page relative to the start of the chunk
int32_t num_rows; // number of rows in this page
int32_t chunk_row; // starting row of this page relative to the start of the chunk
int32_t num_rows; // number of rows in this page
bool is_num_rows_adjusted; // Flag to indicate if the number of rows of this page have been
// adjusted to compensate for the list row size estimates.
// the next four are calculated in gpuComputePageStringSizes
int32_t num_nulls; // number of null values (V2 header), but recalculated for string cols
int32_t num_valids; // number of non-null values, taking into account skip_rows/num_rows
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,10 @@ struct set_final_row_count {
if (i < pages.size() - 1 && (pages[i + 1].chunk_idx == page.chunk_idx)) { return; }
size_t const page_start_row = chunk.start_row + page.chunk_row;
size_t const chunk_last_row = chunk.start_row + chunk.num_rows;
page.num_rows = chunk_last_row - page_start_row;
// Mark `is_num_rows_adjusted` to signal string decoders that the `num_rows` of this page has
// been adjusted.
page.is_num_rows_adjusted = page.num_rows != (chunk_last_row - page_start_row);
vuule marked this conversation as resolved.
Show resolved Hide resolved
page.num_rows = chunk_last_row - page_start_row;
}
};

Expand Down
106 changes: 104 additions & 2 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3771,10 +3771,10 @@ def test_parquet_chunked_reader(
chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups
):
df = pd.DataFrame(
{"a": [1, 2, 3, 4] * 1000000, "b": ["av", "qw", "hi", "xyz"] * 1000000}
{"a": [1, 2, 3, None] * 10000, "b": ["av", "qw", None, "xyz"] * 10000}
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
)
buffer = BytesIO()
df.to_parquet(buffer)
df.to_parquet(buffer, row_group_size=10000)
actual = read_parquet_chunked(
[buffer],
chunk_read_limit=chunk_read_limit,
Expand All @@ -3788,6 +3788,108 @@ def test_parquet_chunked_reader(
assert_eq(expected, actual)


@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("num_rows", [997, 2997, None])
def test_parquet_chunked_reader_structs(
    chunk_read_limit,
    pass_read_limit,
    num_rows,
):
    """Chunked reads of a struct table must match a regular ``read_parquet``.

    Similar to ``test_parquet_chunked_reader`` but with a struct table. Each
    combination of chunk/pass read limits is checked both for a partial read
    (``num_rows`` given) and for a full read (``num_rows`` is ``None``).
    """
    # Seven-row pattern mixing nulls at every nesting level: null leaves,
    # null inner structs, null list members and an entirely null row.
    row_pattern = [
        {
            "a": "g",
            "b": {
                "b_a": 10,
                "b_b": {"b_b_b": None, "b_b_a": 2},
            },
            "c": None,
        },
        {"a": None, "b": {"b_a": None, "b_b": None}, "c": [15, 16]},
        {"a": "j", "b": None, "c": [8, 10]},
        {"a": None, "b": {"b_a": None, "b_b": None}, "c": None},
        None,
        {
            "a": None,
            "b": {"b_a": None, "b_b": {"b_b_b": 1}},
            "c": [18, 19],
        },
        {"a": None, "b": None, "c": None},
    ]
    struct_rows = row_pattern * 1000

    arrow_table = pa.Table.from_pydict({"struct": struct_rows})
    gdf = cudf.DataFrame.from_arrow(arrow_table)
    parquet_buffer = BytesIO()
    gdf.to_parquet(parquet_buffer)

    # Fall back to the full table length when no explicit row count is given.
    if num_rows is None:
        rows_to_read = len(gdf)
    else:
        rows_to_read = num_rows

    actual = read_parquet_chunked(
        [parquet_buffer],
        chunk_read_limit=chunk_read_limit,
        pass_read_limit=pass_read_limit,
        nrows=rows_to_read,
    )
    expected = cudf.read_parquet(parquet_buffer, nrows=rows_to_read)
    assert_eq(expected, actual)


@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("num_rows", [4997, 9997, None])
@pytest.mark.parametrize(
    "str_encoding",
    ["PLAIN", "DELTA_BYTE_ARRAY", "DELTA_LENGTH_BYTE_ARRAY"],
)
def test_parquet_chunked_reader_string_decoders(
    chunk_read_limit,
    pass_read_limit,
    num_rows,
    str_encoding,
):
    """Chunked reads must match a regular read for each string encoding.

    The table holds an integer column, a string column written with
    ``str_encoding`` and a list-of-strings column. Reads are checked both with
    an explicit row count and, when ``num_rows`` is ``None``, for all rows.
    """
    repeats = 10000
    columns = {
        "i64": [1, 2, 3, None] * repeats,
        "str": ["av", "qw", "asd", "xyz"] * repeats,
        "list": [["ad", "cd"], ["asd", "fd"], None, ["asd", None]] * repeats,
    }
    pdf = pd.DataFrame(columns)

    # 40000 rows at 10000 rows per group gives 4 row groups. Dictionary
    # encoding is disabled and the requested encoding is applied to "str".
    parquet_buffer = BytesIO()
    pdf.to_parquet(
        parquet_buffer,
        row_group_size=10000,
        use_dictionary=False,
        column_encoding={"str": str_encoding},
    )

    # Fall back to the full table length when no explicit row count is given.
    if num_rows is None:
        rows_to_read = len(pdf)
    else:
        rows_to_read = num_rows

    actual = read_parquet_chunked(
        [parquet_buffer],
        chunk_read_limit=chunk_read_limit,
        pass_read_limit=pass_read_limit,
        nrows=rows_to_read,
    )
    expected = cudf.read_parquet(parquet_buffer, nrows=rows_to_read)
    assert_eq(expected, actual)


@pytest.mark.parametrize(
"nrows,skip_rows",
[
Expand Down
1 change: 0 additions & 1 deletion python/cudf_polars/cudf_polars/testing/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def pytest_configure(config: pytest.Config) -> None:
"tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read[False]": "Incomplete handling of projected reads with mismatching schemas, cudf#16394",
"tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_dtype_mismatch[False]": "Different exception raised, but correctly raises an exception",
"tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_missing_cols_from_first[False]": "Different exception raised, but correctly raises an exception",
"tests/unit/io/test_lazy_parquet.py::test_glob_n_rows": "https://github.com/rapidsai/cudf/issues/17311",
"tests/unit/io/test_parquet.py::test_read_parquet_only_loads_selected_columns_15098": "Memory usage won't be correct due to GPU",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-False-none]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-False-none]": "Mismatching column read cudf#16394",
Expand Down
24 changes: 1 addition & 23 deletions python/cudf_polars/tests/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,7 @@ def test_scan(
n_rows=n_rows,
)
engine = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": is_chunked})
if (
is_chunked
and (columns is None or columns[0] != "a")
and (
# When we mask with the slice, it happens to remove the
# bad row
(mask is None and slice is not None)
# When we both slice and read a subset of rows it also
# removes the bad row
or (slice is None and n_rows is not None)
)
):
# slice read produces wrong result for string column
request.applymarker(
pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311")
)

mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
if slice is not None:
q = q.slice(*slice)
if mask is not None:
Expand Down Expand Up @@ -377,13 +362,6 @@ def large_df(df, tmpdir_factory, chunked_slice):
def test_scan_parquet_chunked(
request, chunked_slice, large_df, chunk_read_limit, pass_read_limit
):
if chunked_slice in {"skip_partial", "partial"} and (
chunk_read_limit == 0 and pass_read_limit != 0
):
request.applymarker(
pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311")
)

assert_gpu_result_equal(
large_df,
engine=pl.GPUEngine(
Expand Down
Loading