Skip to content

Commit

Permalink
async mem record reader
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Dec 30, 2024
1 parent 82fbaa5 commit 49b084c
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 69 deletions.
13 changes: 7 additions & 6 deletions cpp/include/milvus-storage/c/reader_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ typedef void* CStatus;
typedef void* CRecordBatch;
typedef void* CFileSystem;

int OpenWithRowGroupRange(const char* path,
struct ArrowSchema* schema,
const int64_t row_group_offset,
const int64_t row_group_num,
const int64_t buffer_size,
struct ArrowArrayStream* out);
/**
* Open a memory record batch reader on the file at `path` and export it
* through the Arrow C stream interface.
*
* @param path Path to the file to read.
* @param schema Arrow C schema describing the expected schema of the file.
* @param buffer_size Memory limit for reading row groups.
* @param out Arrow C array stream that receives the exported reader.
* @return Status code. NOTE(review): the exact convention is not visible in
*         this header; the Go binding treats any non-zero value as failure —
*         confirm against the implementation.
*/
int OpenMemRecordBatchReader(const char* path,
struct ArrowSchema* schema,
const int64_t buffer_size,
struct ArrowArrayStream* out);

#ifdef __cplusplus
}
Expand Down
30 changes: 12 additions & 18 deletions cpp/include/milvus-storage/packed/mem_record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,35 +32,21 @@ namespace milvus_storage {
class MemRecordBatchReader : public arrow::RecordBatchReader {
public:
/**
* @brief Constructor for MemRecordBatchReader.
* @brief MemRecordBatchReader reads row_group_num row groups, starting at row_group_offset, within a memory limit.
*
* @param fs The Arrow filesystem interface.
* @param path Path to the Parquet file.
* @param schema Expected schema of the Parquet file.
* @param row_group_offset The starting row group index to read, through end of file.
* @param buffer_size Memory limit for reading row groups.
*/
MemRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
const size_t row_group_offset,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);
/**
* @brief Constructor for MemRecordBatchReader.
*
* @param fs The Arrow filesystem interface.
* @param path Path to the Parquet file.
* @param schema Expected schema of the Parquet file.
* @param row_group_offset The starting row group index to read.
* @param row_group_num The number of row groups to read.
* @param buffer_size Memory limit for reading row groups.
*/
MemRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
const size_t row_group_offset,
const size_t row_group_num,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE,
const size_t row_group_offset = 0,
const size_t row_group_num = std::numeric_limits<size_t>::max());

/**
* @brief Returns the schema of the Parquet file.
Expand All @@ -84,6 +70,7 @@ class MemRecordBatchReader : public arrow::RecordBatchReader {
*/
arrow::Status Close();

private:
std::shared_ptr<arrow::Schema> schema_;
std::unique_ptr<parquet::arrow::FileReader> file_reader_;
size_t current_row_group_ = 0;
Expand All @@ -92,6 +79,13 @@ class MemRecordBatchReader : public arrow::RecordBatchReader {
int64_t buffer_size_;
std::vector<size_t> row_group_sizes_;
size_t row_group_offset_;

void Initialize(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
const int64_t buffer_size,
const size_t row_group_offset,
const size_t row_group_num);
};

} // namespace milvus_storage
5 changes: 1 addition & 4 deletions cpp/src/c/reader_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@

int OpenMemRecordBatchReader(const char* path,
struct ArrowSchema* schema,
const int64_t row_group_offset,
const int64_t row_group_num,
const int64_t buffer_size,
struct ArrowArrayStream* out) {
auto truePath = std::string(path);
Expand All @@ -41,8 +39,7 @@ int OpenMemRecordBatchReader(const char* path,
}
auto trueFs = r.value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto reader = std::make_shared<milvus_storage::MemRecordBatchReader>(*trueFs, path, trueSchema, row_group_offset,
row_group_num, buffer_size);
auto reader = std::make_shared<milvus_storage::MemRecordBatchReader>(*trueFs, path, trueSchema, buffer_size);
auto status = ExportRecordBatchReader(reader, out);
LOG_STORAGE_ERROR_ << "read export done";
if (!status.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/packed/async_mem_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ arrow::Status AsyncMemRecordBatchReader::Execute() {
const auto& [offset, num_row_groups] = batches[reader_index];

auto reader =
std::make_shared<MemRecordBatchReader>(fs_, path_, schema_, offset, num_row_groups, reader_buffer_size);
std::make_shared<MemRecordBatchReader>(fs_, path_, schema_, reader_buffer_size, offset, num_row_groups);
readers_.push_back(reader);

auto submit_result = arrow::internal::GetCpuThreadPool()->Submit([this, reader, reader_index]() -> arrow::Status {
Expand Down
21 changes: 11 additions & 10 deletions cpp/src/packed/mem_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@ namespace milvus_storage {
MemRecordBatchReader::MemRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
const int64_t buffer_size,
const size_t row_group_offset,
const int64_t buffer_size) {
MemRecordBatchReader(fs, path, schema, row_group_offset, std::numeric_limits<size_t>::max(), buffer_size);
};

MemRecordBatchReader::MemRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
const size_t row_group_offset,
const size_t row_group_num,
const int64_t buffer_size)
const size_t row_group_num)
: schema_(schema), row_group_offset_(row_group_offset), buffer_size_(buffer_size) {
Initialize(fs, path, schema, buffer_size, row_group_offset, row_group_num);
}

void MemRecordBatchReader::Initialize(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema>& schema,
const int64_t buffer_size,
const size_t row_group_offset,
const size_t row_group_num) {
auto result = MakeArrowFileReader(fs, path);
if (!result.ok()) {
LOG_STORAGE_ERROR_ << "Error making file reader:" << result.status().ToString();
Expand Down
22 changes: 11 additions & 11 deletions cpp/test/packed/one_file_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,18 @@ TEST_F(OneFileTest, MemRecordBatchReader) {

// exceed row group range, should throw out_of_range
std::string path = file_path_ + "/0";
EXPECT_THROW(MemRecordBatchReader mr(*fs_, path, schema, 100, 1, reader_memory_), std::out_of_range);
EXPECT_THROW(MemRecordBatchReader mr(*fs_, path, schema, reader_memory_, 100), std::out_of_range);

// file does not exist, should throw runtime_error
path = file_path_ + "/file_not_exist";
EXPECT_THROW(MemRecordBatchReader mr(*fs_, path, schema, 0, 1, reader_memory_), std::runtime_error);
EXPECT_THROW(MemRecordBatchReader mr(*fs_, path, schema, reader_memory_), std::runtime_error);

// read all row groups
path = file_path_ + "/0";
MemRecordBatchReader mr(*fs_, path, schema, 0, reader_memory_);
MemRecordBatchReader mr(*fs_, path, schema, reader_memory_);
ASSERT_AND_ARROW_ASSIGN(auto m_table, mr.ToTable());
ASSERT_STATUS_OK(mr.Close());

// read all row groups async
AsyncMemRecordBatchReader amr(*fs_, path, schema, reader_memory_);
ASSERT_STATUS_OK(amr.Execute());
auto amr_table = amr.Table();
ASSERT_EQ(m_table->num_rows(), amr_table->num_rows());

std::set<int> needed_columns = {0, 1, 2};
std::vector<ColumnOffset> column_offsets = {
ColumnOffset(0, 0),
Expand All @@ -82,10 +76,16 @@ TEST_F(OneFileTest, MemRecordBatchReader) {

// read row group 1
path = file_path_ + "/0";
MemRecordBatchReader mr2(*fs_, path, schema, 1, 1, reader_memory_);
MemRecordBatchReader mr2(*fs_, path, schema, reader_memory_, 1, 1);
ASSERT_AND_ARROW_ASSIGN(auto rg_table, mr2.ToTable());
ASSERT_STATUS_OK(mr.Close());
ASSERT_STATUS_OK(mr2.Close());
ASSERT_GT(m_table->num_rows(), rg_table->num_rows());

// read all row groups async
AsyncMemRecordBatchReader amr(*fs_, path, schema, reader_memory_);
ASSERT_STATUS_OK(amr.Execute());
auto amr_table = amr.Table();
ASSERT_EQ(m_table->num_rows(), amr_table->num_rows());
}

} // namespace milvus_storage
1 change: 1 addition & 0 deletions cpp/test/packed/packed_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include <packed/writer.h>
#include <packed/reader.h>
#include <packed/mem_record_reader.h>
#include <memory>
#include <gtest/gtest.h>
#include <parquet/properties.h>
Expand Down
11 changes: 2 additions & 9 deletions go/packed/packed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,14 @@ import "C"
import (
"errors"
"fmt"
"math"
"unsafe"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/arrio"
"github.com/apache/arrow/go/v12/arrow/cdata"
)

func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) {
return OpenWithRowGroupRange(path, schema, bufferSize, 0, math.MaxInt32)
}

func OpenWithRowGroupRange(path string, schema *arrow.Schema, row_group_offset int, row_group_num int, bufferSize int) (arrio.Reader, error) {
func OpenMemRecordBatchReader(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) {
// var cSchemaPtr uintptr
// cSchema := cdata.SchemaFromPtr(cSchemaPtr)
var cas cdata.CArrowSchema
Expand All @@ -47,9 +42,7 @@ func OpenWithRowGroupRange(path string, schema *arrow.Schema, row_group_offset i

cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))
fmt.Println("go row group offset: ", row_group_offset)
fmt.Println("go row group num: ", row_group_num)
status := C.OpenWithRowGroupRange(cPath, casPtr, C.int64_t(row_group_offset), C.int64_t(row_group_num), C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cass)))
status := C.OpenMemRecordBatchReader(cPath, casPtr, C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cass)))
if status != 0 {
return nil, errors.New(fmt.Sprintf("failed to open file: %s, status: %d", path, status))
}
Expand Down
12 changes: 2 additions & 10 deletions go/packed/packed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,10 @@ func TestRead(t *testing.T) {
// err = writer.Close()
// assert.NoError(t, err)

// reader, err := Open(path, schema, 10*1024*1024 /* 10MB */)
// assert.NoError(t, err)
// rr, err := reader.Read()
// assert.NoError(t, err)
// defer rr.Release()
// assert.Equal(t, int64(300), rr.NumRows())

// test row group range read
reader, err := OpenWithRowGroupRange(path, schema, 1, 1, 10*1024*1024 /* 10MB */)
reader, err := OpenMemRecordBatchReader(path, schema, 10*1024*1024 /* 10MB */)
assert.NoError(t, err)
rr, err := reader.Read()
assert.NoError(t, err)
defer rr.Release()
assert.Greater(t, int64(300), rr.NumRows())
assert.Equal(t, int64(300), rr.NumRows())
}

0 comments on commit 49b084c

Please sign in to comment.