Fix reading Parquet string cols when nrows and input_pass_limit > 0 #17321

Merged
Changes from 4 commits
7 changes: 5 additions & 2 deletions cpp/src/io/parquet/page_decode.cuh
@@ -149,10 +149,13 @@ inline __device__ bool is_bounds_page(page_state_s* const s,
size_t const begin = start_row;
size_t const end = start_row + num_rows;

// for non-nested schemas, rows cannot span pages, so use a more restrictive test
// For non-nested schemas, rows cannot span pages, so use a more restrictive test except for
// the page_end. This is because we may have adjusted the `num_rows` for the last page in
// `generate_list_column_row_count_estimates()` to compensate for the list row size estimates
// in case of chunked read mode.
return has_repetition
? ((page_begin <= begin && page_end >= begin) || (page_begin <= end && page_end >= end))
: ((page_begin < begin && page_end > begin) || (page_begin < end && page_end > end));
: ((page_begin < begin && page_end > begin) || (page_begin < end && page_end >= end));
mhaseeb123 (Member, Author) commented on Nov 14, 2024:

thrust::for_each(rmm::exec_policy_nosync(_stream),
iter,
iter + pass.pages.size(),
set_final_row_count{pass.pages, pass.chunks});

^Here we adjust num_rows for the last page when input_pass_limit > 0 to compensate for lists. This leads to a false negative from is_bounds_page() for the last page of string columns because of the page_end > end predicate. The false negative causes page_bounds() to include the total size of the last page instead of clipping it at nrows, which ultimately manifests in out_buf.create_string_data() being called with a num_bytes larger than the actual data size, producing the extra \x00 characters in the column.
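
To make the bug concrete, here is a small self-contained sketch with made-up row numbers (the values are purely illustrative, not taken from the failing case): once the last page's row count has been adjusted, page_end can land exactly on end, which the old strict test rejects while the relaxed test accepts.

// Illustrative values only: a last page whose adjusted row count makes
// page_end coincide exactly with the end of the requested read window.
#include <cstddef>
#include <cstdio>

int main()
{
  std::size_t const begin = 0, end = 40000;  // requested rows [begin, end)
  std::size_t const page_begin = 30000;
  std::size_t const page_end   = 40000;      // == end after the num_rows adjustment

  // Old non-nested test: needs page_end > end, so the adjusted last page is
  // NOT treated as a bounds page (the false negative described above).
  bool const old_test = (page_begin < begin && page_end > begin) ||
                        (page_begin < end && page_end > end);

  // Relaxed test from this PR: page_end >= end, so the page IS a bounds page
  // and page_bounds() can clip the string data at nrows.
  bool const new_test = (page_begin < begin && page_end > begin) ||
                        (page_begin < end && page_end >= end);

  std::printf("old: %d, new: %d\n", old_test, new_test);  // prints "old: 0, new: 1"
  return 0;
}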

mhaseeb123 (Member, Author) commented:

@nvdbaranec I didn't see the effect of adjusting num_rows for the last page showing up for other column types, and is_bounds_page() is only used in the string decoders, so I am hoping other decoders don't rely on a similar predicate (at least it doesn't seem like they do in the tests).

Contributor commented:

You know it's a convoluted bug when the fix is 99% comment.

Should we try to keep the condition and change the num_rows adjustment instead? This fix feels like a workaround for incorrect num_rows (I expect to be wrong on this one, but want to make sure we at least consider it).

mhaseeb123 (Member, Author) commented on Nov 14, 2024:

Ikr, I have been debating this one myself as well. I am not fully sure whether we would break things by adjusting num_rows for the last pages of columns (to what?) in generate_list_column_row_count_estimates() and keeping the original condition here. Maybe @nvdbaranec or @etseidl can comment on this better.

For now, I chose to relax this condition, as this function (is_bounds_page()) is only used in the string decoders (narrower scope) and the described problem only shows up in string columns.

Contributor commented:

This makes me nervous. In the normal case, won't this mean every final page is a bounds page, regardless of whether we're using num_rows? That means unnecessary processing when all we want to do is read the entire page anyway. Could we add a boolean argument to is_bounds_page indicating whether num_rows has been modified, or add a field to the PageInfo struct? Or maybe do as @vuule suggests (change the num_rows adjustment)... perhaps add an adjusted_num_rows 🤷

mhaseeb123 (Member, Author) replied:

Thanks for the feedback, Ed! I have added a new boolean field to PageInfo that records whether num_rows for the page has been adjusted; if so, we relax the condition.
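
A minimal sketch of the approach described in this reply; the struct, field, and function names below (PageInfoSketch, num_rows_adjusted, is_bounds_page_flat) are assumptions for illustration, not the exact names used in the later commits:

// Hypothetical sketch only: the real PageInfo lives in cudf's parquet reader
// internals, and the actual field and condition names may differ.
#include <cstddef>

struct PageInfoSketch {
  std::size_t chunk_row;   // first row covered by this page
  std::size_t num_rows;    // row count, possibly adjusted for chunked reads
  bool num_rows_adjusted;  // set where set_final_row_count changes num_rows
};

// Relax the end-of-range comparison only for pages whose row count was
// adjusted; ordinary pages keep the strict page_end > end test.
inline bool is_bounds_page_flat(PageInfoSketch const& p, std::size_t begin, std::size_t end)
{
  std::size_t const page_begin = p.chunk_row;
  std::size_t const page_end   = p.chunk_row + p.num_rows;
  bool const hits_end = p.num_rows_adjusted ? (page_end >= end) : (page_end > end);
  return (page_begin < begin && page_end > begin) || (page_begin < end && hits_end);
}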

}

/**
99 changes: 97 additions & 2 deletions python/cudf/cudf/tests/test_parquet.py
@@ -3776,10 +3776,10 @@ def test_parquet_chunked_reader(
chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups
):
df = pd.DataFrame(
{"a": [1, 2, 3, 4] * 1000000, "b": ["av", "qw", "hi", "xyz"] * 1000000}
{"a": [1, 2, 3, None] * 10000, "b": ["av", "qw", None, "xyz"] * 10000}
)
buffer = BytesIO()
df.to_parquet(buffer)
df.to_parquet(buffer, row_group_size=10000)
actual = read_parquet_chunked(
[buffer],
chunk_read_limit=chunk_read_limit,
@@ -3793,6 +3793,101 @@
assert_eq(expected, actual)


@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("num_rows", [997, 2997, None])
def test_parquet_chunked_reader_structs(
mhaseeb123 (Member, Author) commented on Nov 14, 2024:

A similar test to test_parquet_chunked_reader above, but with a struct table.

chunk_read_limit,
pass_read_limit,
num_rows,
):
data = [
{
"a": "g",
"b": {
"b_a": 10,
"b_b": {"b_b_b": None, "b_b_a": 2},
},
"c": None,
},
{"a": None, "b": {"b_a": None, "b_b": None}, "c": [15, 16]},
{"a": "j", "b": None, "c": [8, 10]},
{"a": None, "b": {"b_a": None, "b_b": None}, "c": None},
None,
{
"a": None,
"b": {"b_a": None, "b_b": {"b_b_b": 1}},
"c": [18, 19],
},
{"a": None, "b": None, "c": None},
] * 1000

pa_struct = pa.Table.from_pydict({"struct": data})
df = cudf.DataFrame.from_arrow(pa_struct)
buffer = BytesIO()
df.to_parquet(buffer)

actual = read_parquet_chunked(
[buffer],
chunk_read_limit=chunk_read_limit,
pass_read_limit=pass_read_limit,
nrows=num_rows if num_rows is not None else len(df),
)
expected = cudf.read_parquet(
buffer,
nrows=num_rows if num_rows is not None else len(df),
)
assert_eq(expected, actual)


@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("num_rows", [4997, 9997, None])
@pytest.mark.parametrize(
"str_encoding",
[
"PLAIN",
"DELTA_BYTE_ARRAY",
"DELTA_LENGTH_BYTE_ARRAY",
],
)
def test_parquet_chunked_reader_string_decoders(
chunk_read_limit,
pass_read_limit,
num_rows,
str_encoding,
):
df = pd.DataFrame(
{
"i64": [1, 2, 3, None] * 10000,
"str": ["av", "qw", "asd", "xyz"] * 10000,
"list": list(
[["ad", "cd"], ["asd", "fd"], None, ["asd", None]] * 10000
),
}
)
buffer = BytesIO()
# Write 4 Parquet row groups with string column encoded
df.to_parquet(
buffer,
row_group_size=10000,
use_dictionary=False,
column_encoding={"str": str_encoding},
)
# Check with num_rows specified
actual = read_parquet_chunked(
[buffer],
chunk_read_limit=chunk_read_limit,
pass_read_limit=pass_read_limit,
nrows=num_rows if num_rows is not None else len(df),
)
expected = cudf.read_parquet(
buffer,
nrows=num_rows if num_rows is not None else len(df),
)
assert_eq(expected, actual)


@pytest.mark.parametrize(
"nrows,skip_rows",
[