Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet sub-rowgroup reading. #14360

Merged
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
8907a90
Cleanup of namespaces in parquet. The ::detail::parquet namespace ha…
nvdbaranec Oct 6, 2023
cb74b7e
Remove reader_impl_chunking.cu, which was accidentally included.
nvdbaranec Oct 6, 2023
227e1f0
Centralize all pass/chunk related code into reader_impl_chunking.cu
nvdbaranec Oct 6, 2023
b7f51a5
Merge branch 'branch-23.12' into parquet_chunked_reader_cleanup
nvdbaranec Oct 9, 2023
f1378e5
Formatting.
nvdbaranec Oct 9, 2023
79ae066
Remove unnecessary comment block.
nvdbaranec Oct 9, 2023
85b1e83
Change include file ordering
nvdbaranec Oct 10, 2023
3d4fa81
Merge branch 'branch-23.12' into parquet_chunked_reader_cleanup
nvdbaranec Oct 10, 2023
08ce770
First pass at sub rwogroup reading. Basic tests show proof of concept…
nvdbaranec Nov 3, 2023
ef373d6
Merge branch 'branch-23.12' into parquet_sub_rowgroup_chunks
nvdbaranec Nov 6, 2023
4d2326d
Formatting.
nvdbaranec Nov 6, 2023
8d4e3f9
Setup dictionary pages properly at the pass level. Fixed issues with …
nvdbaranec Nov 7, 2023
df67c5c
Fixed an issue with double-decompression of dictionaries. Greatly sim…
nvdbaranec Nov 8, 2023
c13eb58
Fixed an issue with mis-allocation of nesting info structs for list c…
nvdbaranec Nov 8, 2023
0a3a918
Fix an issue with setting up nesting information for lists. Fixed inc…
nvdbaranec Nov 9, 2023
1000978
Fix another dict_page setup issue. Many more tests passing.
nvdbaranec Nov 9, 2023
cea9bc5
Fixed edge cases for skip_rows/num_rows. Added a missing __device__ …
nvdbaranec Nov 12, 2023
259ca1c
Sort pass pages by input schema index instead of input schema value t…
nvdbaranec Nov 12, 2023
8758574
Fixed several edge cases in input and output chunking. Fixed an odd t…
nvdbaranec Nov 13, 2023
7cd3b83
Fixed an issue with subpass computation.
nvdbaranec Nov 13, 2023
32b0a23
Merge branch 'branch-23.12' into parquet_sub_rowgroup_chunks
nvdbaranec Nov 14, 2023
2dcc10b
Fixed an issue with string dict index computation stemming from how p…
nvdbaranec Nov 16, 2023
a055bbb
Fixed an old bug that caused an issue with the subrowgroup reader. Di…
nvdbaranec Nov 17, 2023
1227e72
Handling for list columns with rows that span page boundaries. Better…
nvdbaranec Dec 3, 2023
953e539
Fix a small bug in output chunk computation.
nvdbaranec Dec 3, 2023
30083b7
Merge branch 'branch-24.02' into parquet_sub_rowgroup_chunks.
nvdbaranec Dec 3, 2023
8ef72a6
Fixed a missing stream in a cudaMemcpyAsync call. Switch some tests …
nvdbaranec Dec 4, 2023
a2603be
Merge branch 'branch-24.02' into parquet_sub_rowgroup_chunks
nvdbaranec Dec 7, 2023
54a8c02
Changed the mechanism with which we collect pages after determining s…
nvdbaranec Dec 7, 2023
1bff089
Fixed an indexing issue (not using column-relative indices) in the pa…
nvdbaranec Dec 7, 2023
b3539f9
Wave of PR review comment fixes.
nvdbaranec Dec 12, 2023
ae452f9
Second wave of PR review feedback.
nvdbaranec Dec 12, 2023
a9d1e0c
Merge branch 'branch-24.02' into parquet_sub_rowgroup_chunks
nvdbaranec Dec 12, 2023
491f991
More PR review feedback.
nvdbaranec Dec 12, 2023
f4f0438
Remove the code that checks for uninitialized PageNestingInfo structs…
nvdbaranec Dec 14, 2023
b131e7a
Include nvcomp scratch space needed in chunking computation. Fixed an…
nvdbaranec Dec 19, 2023
c7b2a55
Merge branch 'branch-24.02' into parquet_sub_rowgroup_chunks
nvdbaranec Dec 19, 2023
713199f
Merge branch 'branch-24.02' into parquet_sub_rowgroup_chunks
nvdbaranec Dec 22, 2023
92769c1
Merge branch 'branch-24.02' into parquet_sub_rowgroup_chunks
nvdbaranec Jan 3, 2024
bacb763
Merge branch 'branch-24.02' into parquet_sub_rowgroup_chunks
nvdbaranec Jan 16, 2024
266ad87
Added tweakable parameter for controlling ratio of compressed/decompr…
nvdbaranec Jan 18, 2024
21c1636
Fixed exception message.
nvdbaranec Jan 18, 2024
aa99bb8
Formatting.
nvdbaranec Jan 18, 2024
479251f
More formatting. For some reason pre-commit didn't catch everything l…
nvdbaranec Jan 18, 2024
19538a3
Merge branch 'branch-24.02' into parquet_sub_rowgroup_chunks
nvdbaranec Jan 19, 2024
ec81928
Review feedback changes.
nvdbaranec Jan 19, 2024
b01b06e
Formatting.
nvdbaranec Jan 19, 2024
ce1f94e
Add a missing proclaim_return_type.
nvdbaranec Jan 19, 2024
484d63c
PR review feedback. Remove unnecessary INVALID nvcomp wrapper enum v…
nvdbaranec Jan 22, 2024
781526b
Formatting.
nvdbaranec Jan 22, 2024
eab3b6a
Merge branch 'branch-24.02' into parquet_sub_rowgroup_chunks
nvdbaranec Jan 22, 2024
fe995bb
PR review feedback. Remove use of deprecated make_strings_column() in…
nvdbaranec Jan 22, 2024
97df57a
Formatting.
nvdbaranec Jan 22, 2024
9097dfd
PR review feedback.
nvdbaranec Jan 23, 2024
44f365a
Formatting
nvdbaranec Jan 23, 2024
7ff6459
More PR review feedback.
nvdbaranec Jan 23, 2024
8ab4f80
Formatting.
nvdbaranec Jan 23, 2024
3ed0351
Wave of PR review feedback.
nvdbaranec Jan 24, 2024
6a5b17f
More PR feedback. Whole lot of consting going on.
nvdbaranec Jan 24, 2024
0083639
Formatting.
nvdbaranec Jan 24, 2024
ef25b72
Merge branch 'branch-24.02' into parquet_sub_rowgroup_chunks
nvdbaranec Jan 24, 2024
b0002d7
Added a missing CUDF_KERNEL tag to gpuDecodePageHeaders. Misc review …
nvdbaranec Jan 24, 2024
a48c8e8
Formatting.
nvdbaranec Jan 24, 2024
d280d18
Merge branch 'branch-24.02' into parquet_sub_rowgroup_chunks
ttnghia Jan 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions cpp/src/io/comp/nvcomp_adapter.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -99,8 +99,8 @@ inline bool operator==(feature_status_parameters const& lhs, feature_status_para
* @param[in] inputs List of input buffers
* @param[out] outputs List of output buffers
* @param[out] results List of output status structures
* @param[in] max_uncomp_chunk_size maximum size of uncompressed chunk
* @param[in] max_total_uncomp_size maximum total size of uncompressed data
* @param[in] max_uncomp_chunk_size Maximum size of any single uncompressed chunk
* @param[in] max_total_uncomp_size Maximum total size of uncompressed data
* @param[in] stream CUDA stream to use
*/
void batched_decompress(compression_type compression,
Expand All @@ -111,6 +111,24 @@ void batched_decompress(compression_type compression,
size_t max_total_uncomp_size,
rmm::cuda_stream_view stream);

/**
* @brief Return the amount of temporary space required in bytes for a given decompression
* operation.
*
* The size returned reflects the size of the scratch buffer to be passed to
* `batched_decompress_async`
*
* @param[in] compression Compression type
* @param[in] num_chunks The number of decompression chunks to be processed
* @param[in] max_uncomp_chunk_size Maximum size of any single uncompressed chunk
* @param[in] max_total_uncomp_size Maximum total size of uncompressed data
* @returns The total required size in bytes
*/
size_t batched_decompress_temp_size(compression_type compression,
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
size_t num_chunks,
size_t max_uncomp_chunk_size,
size_t max_total_uncomp_size);

/**
* @brief Gets the maximum size any chunk could compress to in the batch.
*
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -1301,16 +1301,15 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
if (((s->col.data_type & 7) == BYTE_ARRAY) && (s->col.str_dict_index)) {
// String dictionary: use index
s->dict_base = reinterpret_cast<uint8_t const*>(s->col.str_dict_index);
s->dict_size = s->col.page_info[0].num_input_values * sizeof(string_index_pair);
s->dict_size = s->col.dict_page->num_input_values * sizeof(string_index_pair);
} else {
s->dict_base =
s->col.page_info[0].page_data; // dictionary is always stored in the first page
s->dict_size = s->col.page_info[0].uncompressed_page_size;
s->dict_base = s->col.dict_page->page_data;
s->dict_size = s->col.dict_page->uncompressed_page_size;
}
s->dict_run = 0;
s->dict_val = 0;
s->dict_bits = (cur < end) ? *cur++ : 0;
if (s->dict_bits > 32 || !s->dict_base) {
if (s->dict_bits > 32 || (!s->dict_base && s->col.dict_page->num_input_values > 0)) {
s->set_error_code(decode_error::INVALID_DICT_WIDTH);
}
break;
Expand Down
26 changes: 14 additions & 12 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,10 @@ struct gpuParsePageHeader {
* @param[in] num_chunks Number of column chunks
*/
// blockDim {128,1,1}
CUDF_KERNEL void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
kernel_error::pointer error_code)
__global__ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
chunk_page_info* chunk_pages,
int32_t num_chunks,
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
gpuParsePageHeader parse_page_header;
Expand Down Expand Up @@ -392,11 +393,10 @@ CUDF_KERNEL void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* ch
bs->page.temp_string_buf = nullptr;
bs->page.kernel_mask = decode_kernel_mask::NONE;
}
num_values = bs->ck.num_values;
page_info = bs->ck.page_info;
num_dict_pages = bs->ck.num_dict_pages;
max_num_pages = (page_info) ? bs->ck.max_num_pages : 0;
values_found = 0;
num_values = bs->ck.num_values;
page_info = chunk_pages ? chunk_pages[chunk].pages : nullptr;
max_num_pages = page_info ? bs->ck.max_num_pages : 0;
values_found = 0;
__syncwarp();
while (values_found < num_values && bs->cur < bs->end) {
int index_out = -1;
Expand Down Expand Up @@ -495,9 +495,9 @@ CUDF_KERNEL void __launch_bounds__(128)
if (!lane_id && ck->num_dict_pages > 0 && ck->str_dict_index) {
// Data type to describe a string
string_index_pair* dict_index = ck->str_dict_index;
uint8_t const* dict = ck->page_info[0].page_data;
int dict_size = ck->page_info[0].uncompressed_page_size;
int num_entries = ck->page_info[0].num_input_values;
uint8_t const* dict = ck->dict_page->page_data;
int dict_size = ck->dict_page->uncompressed_page_size;
int num_entries = ck->dict_page->num_input_values;
int pos = 0, cur = 0;
for (int i = 0; i < num_entries; i++) {
int len = 0;
Expand All @@ -518,13 +518,15 @@ CUDF_KERNEL void __launch_bounds__(128)
}

void __host__ DecodePageHeaders(ColumnChunkDesc* chunks,
chunk_page_info* chunk_pages,
int32_t num_chunks,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
dim3 dim_block(128, 1);
dim3 dim_grid((num_chunks + 3) >> 2, 1); // 1 chunk per warp, 4 warps per block
gpuDecodePageHeaders<<<dim_grid, dim_block, 0, stream.value()>>>(chunks, num_chunks, error_code);
gpuDecodePageHeaders<<<dim_grid, dim_block, 0, stream.value()>>>(
chunks, chunk_pages, num_chunks, error_code);
}

void __host__ BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
Expand Down
10 changes: 6 additions & 4 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -868,14 +868,16 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputePageStringSi
if (col.str_dict_index) {
// String dictionary: use index
dict_base = reinterpret_cast<const uint8_t*>(col.str_dict_index);
dict_size = col.page_info[0].num_input_values * sizeof(string_index_pair);
dict_size = col.dict_page->num_input_values * sizeof(string_index_pair);
} else {
dict_base = col.page_info[0].page_data; // dictionary is always stored in the first page
dict_size = col.page_info[0].uncompressed_page_size;
dict_base = col.dict_page->page_data;
dict_size = col.dict_page->uncompressed_page_size;
}

// FIXME: need to return an error condition...this won't actually do anything
if (s->dict_bits > 32 || !dict_base) { CUDF_UNREACHABLE("invalid dictionary bit size"); }
if (s->dict_bits > 32 || (!dict_base && col.dict_page->num_input_values > 0)) {
CUDF_UNREACHABLE("invalid dictionary bit size");
}

str_bytes = totalDictEntriesSize(
data, dict_base, s->dict_bits, dict_size, (end - data), start_value, end_value);
Expand Down
57 changes: 42 additions & 15 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION.
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -339,6 +339,21 @@ struct PageInfo {
decode_kernel_mask kernel_mask;
};

/**
* @brief Return the column schema id as the key for a PageInfo struct.
*/
struct get_page_key {
__device__ int32_t operator()(PageInfo const& page) const { return page.src_col_schema; }
};

/**
* @brief Return an iterator that returns they keys for a vector of pages.
*/
inline auto make_page_key_iterator(device_span<PageInfo const> pages)
{
return thrust::make_transform_iterator(pages.begin(), get_page_key{});
}

/**
* @brief Struct describing a particular chunk of column data
*/
Expand All @@ -362,7 +377,8 @@ struct ColumnChunkDesc {
int8_t decimal_precision_,
int32_t ts_clock_rate_,
int32_t src_col_index_,
int32_t src_col_schema_)
int32_t src_col_schema_,
float list_bytes_per_row_est_)
: compressed_data(compressed_data_),
compressed_size(compressed_size_),
num_values(num_values_),
Expand All @@ -375,7 +391,7 @@ struct ColumnChunkDesc {
num_data_pages(0),
num_dict_pages(0),
max_num_pages(0),
page_info(nullptr),
dict_page(nullptr),
str_dict_index(nullptr),
valid_map_base{nullptr},
column_data_base{nullptr},
Expand All @@ -386,26 +402,26 @@ struct ColumnChunkDesc {
decimal_precision(decimal_precision_),
ts_clock_rate(ts_clock_rate_),
src_col_index(src_col_index_),
src_col_schema(src_col_schema_)
src_col_schema(src_col_schema_),
list_bytes_per_row_est(list_bytes_per_row_est_)
{
}

uint8_t const* compressed_data{}; // pointer to compressed column chunk data
size_t compressed_size{}; // total compressed data size for this chunk
size_t num_values{}; // total number of values in this column
size_t start_row{}; // starting row of this chunk
uint32_t num_rows{}; // number of rows in this chunk
uint8_t const* compressed_data{}; // pointer to compressed column chunk data
size_t compressed_size{}; // total compressed data size for this chunk
size_t num_values{}; // total number of values in this column
size_t start_row{}; // file-wide, absolute starting row of this chunk
uint32_t num_rows{}; // number of rows in this chunk
int16_t max_level[level_type::NUM_LEVEL_TYPES]{}; // max definition/repetition level
int16_t max_nesting_depth{}; // max nesting depth of the output
uint16_t data_type{}; // basic column data type, ((type_length << 3) |
// parquet::Type)
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
uint8_t
level_bits[level_type::NUM_LEVEL_TYPES]{}; // bits to encode max definition/repetition levels
int32_t num_data_pages{}; // number of data pages
int32_t num_dict_pages{}; // number of dictionary pages
int32_t max_num_pages{}; // size of page_info array
PageInfo* page_info{}; // output page info for up to num_dict_pages +
// num_data_pages (dictionary pages first)
level_bits[level_type::NUM_LEVEL_TYPES]{}; // bits to encode max definition/repetition levels
int32_t num_data_pages{}; // number of data pages
int32_t num_dict_pages{}; // number of dictionary pages
int32_t max_num_pages{}; // size of page_info array
PageInfo const* dict_page{};
string_index_pair* str_dict_index{}; // index for string dictionary
bitmask_type** valid_map_base{}; // base pointers of valid bit map for this column
void** column_data_base{}; // base pointers of column data
Expand All @@ -418,6 +434,15 @@ struct ColumnChunkDesc {

int32_t src_col_index{}; // my input column index
int32_t src_col_schema{}; // my schema index in the file

float list_bytes_per_row_est{}; // for LIST columns, an estimate on number of bytes per row
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
};

/**
* @brief A utility structure for use in decoding page headers.
*/
struct chunk_page_info {
PageInfo* pages;
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
};

/**
Expand Down Expand Up @@ -578,11 +603,13 @@ constexpr bool is_string_col(ColumnChunkDesc const& chunk)
* @brief Launches kernel for parsing the page headers in the column chunks
*
* @param[in] chunks List of column chunks
* @param[in] chunk_pages List of pages associated with the chunks, in chunk-sorted order
* @param[in] num_chunks Number of column chunks
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
void DecodePageHeaders(ColumnChunkDesc* chunks,
chunk_page_info* chunk_pages,
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
int32_t num_chunks,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);
Expand Down
Loading