Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report number of rows per file read by PQ reader when no row selection and fix segfault in chunked PQ reader when skip_rows > 0 #16195

Merged
merged 35 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c249f05
Calculate num rows read from each data source by PQ reader
mhaseeb123 Jul 4, 2024
3891e3a
Adding gtests for num_rows_per_source
mhaseeb123 Jul 9, 2024
4bc569e
Merge branch 'branch-24.08' into report-nrows-per-source
mhaseeb123 Jul 9, 2024
d3863a6
Minor updates
mhaseeb123 Jul 9, 2024
794d59c
Merge branch 'report-nrows-per-source' of https://github.com/mhaseeb1…
mhaseeb123 Jul 9, 2024
ebdfad5
gtests for empty dfs and minor improvements
mhaseeb123 Jul 9, 2024
0fd6890
separate out the empty df gtest
mhaseeb123 Jul 9, 2024
a294c18
Merge branch 'branch-24.08' into report-nrows-per-source
mhaseeb123 Jul 9, 2024
d268873
Add num_rows_per_source vector to the types.pxd for future use in pyt…
mhaseeb123 Jul 10, 2024
702b0ee
Adjust for global_skip_rows while computing num_rows_per_source
mhaseeb123 Jul 11, 2024
d031af9
Fix segfault when skip_rows > 0 and num_passes > 1 in chunked_parquet…
mhaseeb123 Jul 11, 2024
1a207e9
Handle base cases when calculating num_rows_per_source
mhaseeb123 Jul 11, 2024
78ed6d1
Merge branch 'branch-24.08' into report-nrows-per-source
mhaseeb123 Jul 11, 2024
7ad6179
Add a couple more gtests
mhaseeb123 Jul 11, 2024
975b7c3
Revert the chunk_start_row in column_info_for_row_group the page inde…
mhaseeb123 Jul 11, 2024
e826caf
Merge branch 'branch-24.08' into report-nrows-per-source
mhaseeb123 Jul 11, 2024
fa33f7a
Merge branch 'report-nrows-per-source' of https://github.com/mhaseeb1…
mhaseeb123 Jul 12, 2024
0ac70b2
Minor bug fix
mhaseeb123 Jul 12, 2024
bcc3bec
Remove the unreachable branch
mhaseeb123 Jul 16, 2024
363b0da
Applying suggestions
mhaseeb123 Jul 16, 2024
8c5816b
Merge branch 'branch-24.08' into report-nrows-per-source
mhaseeb123 Jul 16, 2024
e239382
Suggestions from commits
mhaseeb123 Jul 16, 2024
6f7d203
Finally fixed the segfault when skip_rows > row group size
mhaseeb123 Jul 17, 2024
c19e972
Revert const to row_group_info as no longer needed
mhaseeb123 Jul 17, 2024
9208a80
Merge branch 'branch-24.08' into report-nrows-per-source
mhaseeb123 Jul 17, 2024
513d3bb
Fix the free() invalid pointer at chunked reader destructor
mhaseeb123 Jul 17, 2024
c11b27d
Minor code cleanup
mhaseeb123 Jul 17, 2024
dd19f52
Merge branch 'branch-24.08' into report-nrows-per-source
mhaseeb123 Jul 17, 2024
a641037
Merge branch 'branch-24.08' into report-nrows-per-source
mhaseeb123 Jul 17, 2024
6ae6f07
Minor code improvements.
mhaseeb123 Jul 18, 2024
91e4735
Add helper function for include_output_num_rows_per_source
mhaseeb123 Jul 18, 2024
ed4352a
Applying suggestions from reviews
mhaseeb123 Jul 19, 2024
02b32ac
Merge branch 'branch-24.08' into report-nrows-per-source
mhaseeb123 Jul 19, 2024
71e2d4d
Merge branch 'branch-24.08' into report-nrows-per-source
mhaseeb123 Jul 19, 2024
1439189
Merge branch 'branch-24.08' into report-nrows-per-source
mhaseeb123 Jul 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ struct column_name_info {
struct table_metadata {
std::vector<column_name_info>
schema_info; //!< Detailed name information for the entire output hierarchy
std::vector<size_t> num_rows_per_source; //!< Number of rows read from each data source.
//!< Currently only computed for Parquet readers if no
//!< AST filters being used. Empty vector otherwise.
std::map<std::string, std::string> user_data; //!< Format-dependent metadata of the first input
//!< file as key-values pairs (deprecated)
std::vector<std::unordered_map<std::string, std::string>>
Expand Down
86 changes: 83 additions & 3 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <rmm/resource_ref.hpp>

#include <thrust/binary_search.h>
#include <thrust/iterator/counting_iterator.h>

#include <bitset>
Expand Down Expand Up @@ -549,7 +550,17 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode)
out_columns.reserve(_output_buffers.size());

// no work to do (this can happen on the first pass if we have no rows to read)
if (!has_more_work()) { return finalize_output(out_metadata, out_columns); }
if (!has_more_work()) {
// Check if number of rows per source should be included in output metadata.
if (include_output_num_rows_per_source()) {
// Empty dataframe case: Simply initialize to a list of zeros
out_metadata.num_rows_per_source =
std::vector<size_t>(_file_itm_data.num_rows_per_source.size(), 0);
}

// Finalize output
return finalize_output(mode, out_metadata, out_columns);
}

auto& pass = *_pass_itm_data;
auto& subpass = *pass.subpass;
Expand Down Expand Up @@ -585,11 +596,80 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode)
}
}

// Check if number of rows per source should be included in output metadata.
if (include_output_num_rows_per_source()) {
// For chunked reading, compute the output number of rows per source
if (mode == read_mode::CHUNKED_READ) {
out_metadata.num_rows_per_source =
calculate_output_num_rows_per_source(read_info.skip_rows, read_info.num_rows);
}
// Simply move the number of rows per file if reading all at once
else {
// Move is okay here as we are reading in one go.
out_metadata.num_rows_per_source = std::move(_file_itm_data.num_rows_per_source);
}
}

// Add empty columns if needed. Filter output columns based on filter.
return finalize_output(out_metadata, out_columns);
return finalize_output(mode, out_metadata, out_columns);
}

std::vector<size_t> reader::impl::calculate_output_num_rows_per_source(size_t const chunk_start_row,
size_t const chunk_num_rows)
{
// Handle base cases.
if (_file_itm_data.num_rows_per_source.size() == 0) {
return {};
} else if (_file_itm_data.num_rows_per_source.size() == 1) {
return {chunk_num_rows};
}

std::vector<size_t> num_rows_per_source(_file_itm_data.num_rows_per_source.size(), 0);

// Subtract global skip rows from the start_row as we took care of that when computing
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Binary search the lower and upper index into the partial_sum_nrows_source and compute the number of rows seen per source in between.

// _file_itm_data.num_rows_per_source
auto const start_row = chunk_start_row - _file_itm_data.global_skip_rows;
auto const end_row = start_row + chunk_num_rows;
CUDF_EXPECTS(start_row <= end_row and end_row <= _file_itm_data.global_num_rows,
"Encountered invalid output chunk row bounds.");

// Copy reference to a const local variable for better readability
auto const& partial_sum_nrows_source = _file_itm_data.exclusive_sum_num_rows_per_source;

// Binary search start_row and end_row in exclusive_sum_num_rows_per_source vector
auto const start_iter =
std::upper_bound(partial_sum_nrows_source.cbegin(), partial_sum_nrows_source.cend(), start_row);
auto const end_iter =
(end_row == _file_itm_data.global_skip_rows + _file_itm_data.global_num_rows)
? partial_sum_nrows_source.cend() - 1
: std::upper_bound(start_iter, partial_sum_nrows_source.cend(), end_row);

// Compute the array offset index for both iterators
auto const start_idx = std::distance(partial_sum_nrows_source.cbegin(), start_iter);
auto const end_idx = std::distance(partial_sum_nrows_source.cbegin(), end_iter);

CUDF_EXPECTS(start_idx <= end_idx,
"Encountered invalid source files indexes for output chunk row bounds");

// If the entire chunk is from the same source file, then the count is simply num_rows
if (start_idx == end_idx) {
num_rows_per_source[start_idx] = chunk_num_rows;
} else {
// Compute the number of rows from the first source file
num_rows_per_source[start_idx] = partial_sum_nrows_source[start_idx] - start_row;
// Compute the number of rows from the last source file
num_rows_per_source[end_idx] = end_row - partial_sum_nrows_source[end_idx - 1];
// Simply copy the number of rows for each source in range: (start_idx, end_idx)
std::copy(_file_itm_data.num_rows_per_source.cbegin() + start_idx + 1,
_file_itm_data.num_rows_per_source.cbegin() + end_idx,
num_rows_per_source.begin() + start_idx + 1);
}

return num_rows_per_source;
}

table_with_metadata reader::impl::finalize_output(table_metadata& out_metadata,
table_with_metadata reader::impl::finalize_output(read_mode mode,
table_metadata& out_metadata,
std::vector<std::unique_ptr<column>>& out_columns)
{
// Create empty columns as needed (this can happen if we've ended up with no actual data to read)
Expand Down
31 changes: 29 additions & 2 deletions cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,13 @@ class reader::impl {
* @brief Finalize the output table by adding empty columns for the non-selected columns in
* schema.
*
* @param read_mode Value indicating if the data sources are read all at once or chunk by chunk
* @param out_metadata The output table metadata
* @param out_columns The columns for building the output table
* @return The output table along with columns' metadata
*/
table_with_metadata finalize_output(table_metadata& out_metadata,
table_with_metadata finalize_output(read_mode mode,
table_metadata& out_metadata,
std::vector<std::unique_ptr<column>>& out_columns);

/**
Expand Down Expand Up @@ -336,11 +338,36 @@ class reader::impl {
: true;
}

/**
* @brief Check if this is the first output chunk
*
* @return True if this is the first output chunk
*/
[[nodiscard]] bool is_first_output_chunk() const
{
return _file_itm_data._output_chunk_count == 0;
}

/**
* @brief Check if number of rows per source should be included in output metadata.
*
* @return True if AST filter is not present
*/
[[nodiscard]] bool include_output_num_rows_per_source() const
{
return not _expr_conv.get_converted_expr().has_value();
}

/**
* @brief Calculate the number of rows read from each source in the output chunk
*
* @param chunk_start_row The offset of the first row in the output chunk
* @param chunk_num_rows The number of rows in the the output chunk
* @return Vector of number of rows from each respective data source in the output chunk
*/
[[nodiscard]] std::vector<size_t> calculate_output_num_rows_per_source(size_t chunk_start_row,
size_t chunk_num_rows);

rmm::cuda_stream_view _stream;
rmm::device_async_resource_ref _mr{rmm::mr::get_current_device_resource()};

Expand Down Expand Up @@ -387,7 +414,7 @@ class reader::impl {

// chunked reading happens in 2 parts:
//
// At the top level, the entire file is divided up into "passes" omn which we try and limit the
// At the top level, the entire file is divided up into "passes" on which we try and limit the
// total amount of temporary memory (compressed data, decompressed data) in use
// via _input_pass_read_limit.
//
Expand Down
53 changes: 35 additions & 18 deletions cpp/src/io/parquet/reader_impl_chunking.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1232,22 +1232,22 @@ void reader::impl::setup_next_pass(read_mode mode)
pass.skip_rows = _file_itm_data.global_skip_rows;
pass.num_rows = _file_itm_data.global_num_rows;
} else {
auto const global_start_row = _file_itm_data.global_skip_rows;
auto const global_end_row = global_start_row + _file_itm_data.global_num_rows;
auto const start_row =
std::max(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass],
global_start_row);
auto const end_row =
std::min(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass + 1],
global_end_row);

// skip_rows is always global in the sense that it is relative to the first row of
// everything we will be reading, regardless of what pass we are on.
// num_rows is how many rows we are reading this pass.
pass.skip_rows =
global_start_row +
// pass_start_row and pass_end_row are computed from the selected row groups relative to the
// global_skip_rows.
auto const pass_start_row =
_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass];
Copy link
Member Author

@mhaseeb123 mhaseeb123 Jul 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All diff in this file only fixes the arithmetic that leads to segfault for skip_rows. Changes needed for gtests to pass successfully

pass.num_rows = end_row - start_row;
auto const pass_end_row =
std::min(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass + 1],
_file_itm_data.global_num_rows);

// pass.skip_rows is always global in the sense that it is relative to the first row of
// the data source (global row number 0), regardless of what pass we are on. Therefore,
// we must re-add global_skip_rows to the pass_start_row which is relative to the
// global_skip_rows.
pass.skip_rows = _file_itm_data.global_skip_rows + pass_start_row;
// num_rows is how many rows we are reading this pass. Since this is a difference, adding
// global_skip_rows to both variables is redundant.
pass.num_rows = pass_end_row - pass_start_row;
}

// load page information for the chunk. this retrieves the compressed bytes for all the
Expand Down Expand Up @@ -1509,6 +1509,7 @@ void reader::impl::create_global_chunk_info()

// Initialize column chunk information
auto remaining_rows = num_rows;
auto skip_rows = _file_itm_data.global_skip_rows;
for (auto const& rg : row_groups_info) {
auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index);
auto const row_group_start = rg.start_row;
Expand Down Expand Up @@ -1561,7 +1562,12 @@ void reader::impl::create_global_chunk_info()
schema.type == BYTE_ARRAY and _strings_to_categorical));
}

remaining_rows -= row_group_rows;
// Adjust for skip_rows when updating the remaining rows after the first group
remaining_rows -=
(skip_rows) ? std::min<int>(rg.start_row + row_group.num_rows - skip_rows, remaining_rows)
: row_group_rows;
// Set skip_rows = 0 as it is no longer needed for subsequent row_groups
skip_rows = 0;
}
}

Expand Down Expand Up @@ -1598,6 +1604,9 @@ void reader::impl::compute_input_passes()
_file_itm_data.input_pass_row_group_offsets.push_back(0);
_file_itm_data.input_pass_start_row_count.push_back(0);

// To handle global_skip_rows when computing input passes
int skip_rows = _file_itm_data.global_skip_rows;

for (size_t cur_rg_index = 0; cur_rg_index < row_groups_info.size(); cur_rg_index++) {
auto const& rgi = row_groups_info[cur_rg_index];
auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index);
Expand All @@ -1606,14 +1615,22 @@ void reader::impl::compute_input_passes()
auto const [compressed_rg_size, _ /*compressed + uncompressed*/] =
get_row_group_size(row_group);

// We must use the effective size of the first row group we are reading to accurately calculate
// the first non-zero input_pass_start_row_count.
auto const row_group_rows =
(skip_rows) ? rgi.start_row + row_group.num_rows - skip_rows : row_group.num_rows;

// Set skip_rows = 0 as it is no longer needed for subsequent row_groups
skip_rows = 0;

// can we add this row group
if (cur_pass_byte_size + compressed_rg_size >= comp_read_limit) {
// A single row group (the current one) is larger than the read limit:
// We always need to include at least one row group, so end the pass at the end of the current
// row group
if (cur_rg_start == cur_rg_index) {
_file_itm_data.input_pass_row_group_offsets.push_back(cur_rg_index + 1);
_file_itm_data.input_pass_start_row_count.push_back(cur_row_count + row_group.num_rows);
_file_itm_data.input_pass_start_row_count.push_back(cur_row_count + row_group_rows);
cur_rg_start = cur_rg_index + 1;
cur_pass_byte_size = 0;
}
Expand All @@ -1627,7 +1644,7 @@ void reader::impl::compute_input_passes()
} else {
cur_pass_byte_size += compressed_rg_size;
}
cur_row_count += row_group.num_rows;
cur_row_count += row_group_rows;
}

// add the last pass if necessary
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/io/parquet/reader_impl_chunking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ struct file_intermediate_data {
// is not capped by global_skip_rows and global_num_rows.
std::vector<std::size_t> input_pass_start_row_count{};

// number of rows to be read from each data source
std::vector<std::size_t> num_rows_per_source{};

// partial sum of the number of rows per data source
std::vector<std::size_t> exclusive_sum_num_rows_per_source{};
vuule marked this conversation as resolved.
Show resolved Hide resolved

size_t _current_input_pass{0}; // current input pass index
size_t _output_chunk_count{0}; // how many output chunks we have produced

Expand Down
32 changes: 26 additions & 6 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ std::vector<std::string> aggregate_reader_metadata::get_pandas_index_names() con
return names;
}

std::tuple<int64_t, size_type, std::vector<row_group_info>>
std::tuple<int64_t, size_type, std::vector<row_group_info>, std::vector<size_t>>
aggregate_reader_metadata::select_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
int64_t skip_rows_opt,
Expand Down Expand Up @@ -976,6 +976,9 @@ aggregate_reader_metadata::select_row_groups(
static_cast<size_type>(from_opts.second)};
}();

// Get number of rows in each data source
std::vector<size_t> num_rows_per_source(per_file_metadata.size(), 0);

if (!row_group_indices.empty()) {
CUDF_EXPECTS(row_group_indices.size() == per_file_metadata.size(),
"Must specify row groups for each source");
Expand All @@ -989,28 +992,45 @@ aggregate_reader_metadata::select_row_groups(
selection.emplace_back(rowgroup_idx, rows_to_read, src_idx);
// if page-level indexes are present, then collect extra chunk and page info.
column_info_for_row_group(selection.back(), 0);
rows_to_read += get_row_group(rowgroup_idx, src_idx).num_rows;
auto const rows_this_rg = get_row_group(rowgroup_idx, src_idx).num_rows;
rows_to_read += rows_this_rg;
num_rows_per_source[src_idx] += rows_this_rg;
}
}
} else {
size_type count = 0;
for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) {
auto const& fmd = per_file_metadata[src_idx];
for (size_t rg_idx = 0; rg_idx < fmd.row_groups.size(); ++rg_idx) {
for (size_t rg_idx = 0;
Copy link
Member Author

@mhaseeb123 mhaseeb123 Jul 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop will now stop as soon as count >= rows_to_read + rows_to_skip

rg_idx < fmd.row_groups.size() and count < rows_to_skip + rows_to_read;
++rg_idx) {
auto const& rg = fmd.row_groups[rg_idx];
auto const chunk_start_row = count;
count += rg.num_rows;
if (count > rows_to_skip || count == 0) {
// start row of this row group adjusted with rows_to_skip
num_rows_per_source[src_idx] += count;
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
num_rows_per_source[src_idx] -=
(chunk_start_row <= rows_to_skip) ? rows_to_skip : chunk_start_row;

// We need the unadjusted start index of this row group to correctly initialize
// ColumnChunkDesc for this row group in create_global_chunk_info() and calculate
// the row offset for the first pass in compute_input_passes().
selection.emplace_back(rg_idx, chunk_start_row, src_idx);
// if page-level indexes are present, then collect extra chunk and page info.

// If page-level indexes are present, then collect extra chunk and page info.
// The page indexes rely on absolute row numbers, not adjusted for skip_rows.
column_info_for_row_group(selection.back(), chunk_start_row);
}
if (count >= rows_to_skip + rows_to_read) { break; }
// Adjust the number of rows for the last source file.
if (count >= rows_to_skip + rows_to_read) {
num_rows_per_source[src_idx] -= count - rows_to_skip - rows_to_read;
}
}
}
}

return {rows_to_skip, rows_to_read, std::move(selection)};
return {rows_to_skip, rows_to_read, std::move(selection), std::move(num_rows_per_source)};
}

std::tuple<std::vector<input_column_info>,
Expand Down
20 changes: 10 additions & 10 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,17 +282,17 @@ class aggregate_reader_metadata {
* @param output_column_schemas schema indices of output columns
* @param filter Optional AST expression to filter row groups based on Column chunk statistics
* @param stream CUDA stream used for device memory operations and kernel launches
* @return A tuple of corrected row_start, row_count and list of row group indexes and its
* starting row
* @return A tuple of corrected row_start, row_count, list of row group indexes and its
* starting row, and list of number of rows per source.
*/
[[nodiscard]] std::tuple<int64_t, size_type, std::vector<row_group_info>> select_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
int64_t row_start,
std::optional<size_type> const& row_count,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const;
[[nodiscard]] std::tuple<int64_t, size_type, std::vector<row_group_info>, std::vector<size_t>>
select_row_groups(host_span<std::vector<size_type> const> row_group_indices,
int64_t row_start,
std::optional<size_type> const& row_count,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const;

/**
* @brief Filters and reduces down to a selection of columns
Expand Down
Loading
Loading