Skip to content

Commit

Permalink
support reading row group
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Dec 19, 2024
1 parent e0473b6 commit f325184
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 0 deletions.
3 changes: 3 additions & 0 deletions cpp/include/milvus-storage/packed/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <packed/chunk_manager.h>
#include <packed/column_group.h>
#include "common/config.h"
#include "common/result.h"
#include <parquet/arrow/reader.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/record_batch.h>
Expand Down Expand Up @@ -57,6 +58,8 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {

arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

Result<std::shared_ptr<arrow::Table>> ReadRowGroup(int file_index, int row_group_index);

arrow::Status Close() override;

private:
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/packed/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,21 @@ arrow::Status PackedRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBat
return arrow::Status::OK();
}

Result<std::shared_ptr<arrow::Table>> PackedRecordBatchReader::ReadRowGroup(int file_index, int row_group_index) {
if (file_index < 0 || file_index >= file_readers_.size()) {
throw std::out_of_range("Invalid file index");
}

auto reader = file_readers_[file_index]->parquet_reader();
if (row_group_index < 0 || row_group_index >= reader->metadata()->num_row_groups()) {
throw std::out_of_range("Invalid row group index");
}

std::shared_ptr<arrow::Table> table;
RETURN_ARROW_NOT_OK(file_readers_[file_index]->ReadRowGroup(row_group_index, &table));
return std::move(table);
}

arrow::Status PackedRecordBatchReader::Close() {
LOG_STORAGE_DEBUG_ << "PackedRecordBatchReader::Close(), total read " << read_count_ << " times";
for (int i = 0; i < column_group_states_.size(); ++i) {
Expand Down
7 changes: 7 additions & 0 deletions cpp/test/packed/packed_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ class PackedTestBase : public ::testing::Test {

PackedRecordBatchReader pr(*fs_, paths, new_schema, column_offsets, needed_columns, reader_memory_);
ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable());

auto res = pr.ReadRowGroup(0, 0);
if (!res.ok()) {
ASSERT_FALSE(res.ok());
}
auto row_group = res.value();
ASSERT_TRUE(row_group->num_rows() > 0);
ASSERT_STATUS_OK(pr.Close());

ValidateTableData(table);
Expand Down

0 comments on commit f325184

Please sign in to comment.