From 49b084c45dc5f6b420e0c598483c94f638958db0 Mon Sep 17 00:00:00 2001 From: shaoting-huang Date: Mon, 30 Dec 2024 18:06:34 +0800 Subject: [PATCH] async mem record reader Signed-off-by: shaoting-huang --- cpp/include/milvus-storage/c/reader_c.h | 13 ++++---- .../milvus-storage/packed/mem_record_reader.h | 30 ++++++++----------- cpp/src/c/reader_c.cpp | 5 +--- cpp/src/packed/async_mem_record_reader.cpp | 2 +- cpp/src/packed/mem_record_reader.cpp | 21 ++++++------- cpp/test/packed/one_file_test.cpp | 22 +++++++------- cpp/test/packed/packed_test_base.h | 1 + go/packed/packed.go | 11 ++----- go/packed/packed_test.go | 12 ++------ 9 files changed, 48 insertions(+), 69 deletions(-) diff --git a/cpp/include/milvus-storage/c/reader_c.h b/cpp/include/milvus-storage/c/reader_c.h index e2e419a..139b456 100644 --- a/cpp/include/milvus-storage/c/reader_c.h +++ b/cpp/include/milvus-storage/c/reader_c.h @@ -25,12 +25,13 @@ typedef void* CStatus; typedef void* CRecordBatch; typedef void* CFileSystem; -int OpenWithRowGroupRange(const char* path, - struct ArrowSchema* schema, - const int64_t row_group_offset, - const int64_t row_group_num, - const int64_t buffer_size, - struct ArrowArrayStream* out); +/** + * Open a memory record batch reader. + */ +int OpenMemRecordBatchReader(const char* path, + struct ArrowSchema* schema, + const int64_t buffer_size, + struct ArrowArrayStream* out); #ifdef __cplusplus } diff --git a/cpp/include/milvus-storage/packed/mem_record_reader.h b/cpp/include/milvus-storage/packed/mem_record_reader.h index 56a5ea3..64ba9c4 100644 --- a/cpp/include/milvus-storage/packed/mem_record_reader.h +++ b/cpp/include/milvus-storage/packed/mem_record_reader.h @@ -32,35 +32,21 @@ namespace milvus_storage { class MemRecordBatchReader : public arrow::RecordBatchReader { public: /** - * @brief Constructor for MemRecordBatchReader. + * @brief MemRecordBatchReader reads num of row groups starting from row_group_offset with memory constraints. * * @param fs The Arrow filesystem interface. * @param path Path to the Parquet file. * @param schema Expected schema of the Parquet file. - * @param row_group_offset The starting row group index to read, through end of file. * @param buffer_size Memory limit for reading row groups. - */ - MemRecordBatchReader(arrow::fs::FileSystem& fs, - const std::string& path, - const std::shared_ptr& schema, - const size_t row_group_offset, - const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE); - /** - * @brief Constructor for MemRecordBatchReader. - * - * @param fs The Arrow filesystem interface. - * @param path Path to the Parquet file. - * @param schema Expected schema of the Parquet file. * @param row_group_offset The starting row group index to read. * @param row_group_num The number of row groups to read. - * @param buffer_size Memory limit for reading row groups. */ MemRecordBatchReader(arrow::fs::FileSystem& fs, const std::string& path, const std::shared_ptr& schema, - const size_t row_group_offset, - const size_t row_group_num, - const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE); + const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE, + const size_t row_group_offset = 0, + const size_t row_group_num = std::numeric_limits::max()); /** * @brief Returns the schema of the Parquet file. @@ -84,6 +70,7 @@ class MemRecordBatchReader : public arrow::RecordBatchReader { */ arrow::Status Close(); + private: std::shared_ptr schema_; std::unique_ptr file_reader_; size_t current_row_group_ = 0; @@ -92,6 +79,13 @@ class MemRecordBatchReader : public arrow::RecordBatchReader { int64_t buffer_size_; std::vector row_group_sizes_; size_t row_group_offset_; + + void Initialize(arrow::fs::FileSystem& fs, + const std::string& path, + const std::shared_ptr& schema, + const int64_t buffer_size, + const size_t row_group_offset, + const size_t row_group_num); }; } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/src/c/reader_c.cpp b/cpp/src/c/reader_c.cpp index 3c6e3f0..ee54093 100644 --- a/cpp/src/c/reader_c.cpp +++ b/cpp/src/c/reader_c.cpp @@ -26,8 +26,6 @@ int OpenMemRecordBatchReader(const char* path, struct ArrowSchema* schema, - const int64_t row_group_offset, - const int64_t row_group_num, const int64_t buffer_size, struct ArrowArrayStream* out) { auto truePath = std::string(path); @@ -41,8 +39,7 @@ int OpenMemRecordBatchReader(const char* path, } auto trueFs = r.value(); auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); - auto reader = std::make_shared(*trueFs, path, trueSchema, row_group_offset, - row_group_num, buffer_size); + auto reader = std::make_shared(*trueFs, path, trueSchema, buffer_size); auto status = ExportRecordBatchReader(reader, out); LOG_STORAGE_ERROR_ << "read export done"; if (!status.ok()) { diff --git a/cpp/src/packed/async_mem_record_reader.cpp b/cpp/src/packed/async_mem_record_reader.cpp index f2aeaff..f04d771 100644 --- a/cpp/src/packed/async_mem_record_reader.cpp +++ b/cpp/src/packed/async_mem_record_reader.cpp @@ -70,7 +70,7 @@ arrow::Status AsyncMemRecordBatchReader::Execute() { const auto& [offset, num_row_groups] = batches[reader_index]; auto reader = - std::make_shared(fs_, path_, schema_, offset, num_row_groups, reader_buffer_size); + std::make_shared(fs_, path_, schema_, reader_buffer_size, offset, num_row_groups); readers_.push_back(reader); auto submit_result = arrow::internal::GetCpuThreadPool()->Submit([this, reader, reader_index]() -> arrow::Status { diff --git a/cpp/src/packed/mem_record_reader.cpp b/cpp/src/packed/mem_record_reader.cpp index e3a264f..6e1dba1 100644 --- a/cpp/src/packed/mem_record_reader.cpp +++ b/cpp/src/packed/mem_record_reader.cpp @@ -28,18 +28,19 @@ namespace milvus_storage { MemRecordBatchReader::MemRecordBatchReader(arrow::fs::FileSystem& fs, const std::string& path, const std::shared_ptr& schema, + const int64_t buffer_size, const size_t row_group_offset, - const int64_t buffer_size) { - MemRecordBatchReader(fs, path, schema, row_group_offset, std::numeric_limits::max(), buffer_size); -}; - -MemRecordBatchReader::MemRecordBatchReader(arrow::fs::FileSystem& fs, - const std::string& path, - const std::shared_ptr& schema, - const size_t row_group_offset, - const size_t row_group_num, - const int64_t buffer_size) + const size_t row_group_num) : schema_(schema), row_group_offset_(row_group_offset), buffer_size_(buffer_size) { + Initialize(fs, path, schema, buffer_size, row_group_offset, row_group_num); +} + +void MemRecordBatchReader::Initialize(arrow::fs::FileSystem& fs, + const std::string& path, + const std::shared_ptr& schema, + const int64_t buffer_size, + const size_t row_group_offset, + const size_t row_group_num) { auto result = MakeArrowFileReader(fs, path); if (!result.ok()) { LOG_STORAGE_ERROR_ << "Error making file reader:" << result.status().ToString(); diff --git a/cpp/test/packed/one_file_test.cpp b/cpp/test/packed/one_file_test.cpp index ced5f50..0b7894c 100644 --- a/cpp/test/packed/one_file_test.cpp +++ b/cpp/test/packed/one_file_test.cpp @@ -50,24 +50,18 @@ TEST_F(OneFileTest, MemRecordBatchReader) { // exeed row group range, should throw out_of_range std::string path = file_path_ + "/0"; - EXPECT_THROW(MemRecordBatchReader mr(*fs_, path, schema, 100, 1, reader_memory_), std::out_of_range); + EXPECT_THROW(MemRecordBatchReader mr(*fs_, path, schema, reader_memory_, 100), std::out_of_range); // file not exist, should throw runtime_error path = file_path_ + "/file_not_exist"; - EXPECT_THROW(MemRecordBatchReader mr(*fs_, path, schema, 0, 1, reader_memory_), std::runtime_error); + EXPECT_THROW(MemRecordBatchReader mr(*fs_, path, schema, reader_memory_), std::runtime_error); // read all row groups path = file_path_ + "/0"; - MemRecordBatchReader mr(*fs_, path, schema, 0, reader_memory_); + MemRecordBatchReader mr(*fs_, path, schema, reader_memory_); ASSERT_AND_ARROW_ASSIGN(auto m_table, mr.ToTable()); ASSERT_STATUS_OK(mr.Close()); - // read all row groups async - AsyncMemRecordBatchReader amr(*fs_, path, schema, reader_memory_); - ASSERT_STATUS_OK(amr.Execute()); - auto amr_table = amr.Table(); - ASSERT_EQ(m_table->num_rows(), amr_table->num_rows()); - std::set needed_columns = {0, 1, 2}; std::vector column_offsets = { ColumnOffset(0, 0), @@ -82,10 +76,16 @@ TEST_F(OneFileTest, MemRecordBatchReader) { // read row group 1 path = file_path_ + "/0"; - MemRecordBatchReader mr2(*fs_, path, schema, 1, 1, reader_memory_); + MemRecordBatchReader mr2(*fs_, path, schema, reader_memory_, 1, 1); ASSERT_AND_ARROW_ASSIGN(auto rg_table, mr2.ToTable()); - ASSERT_STATUS_OK(mr.Close()); + ASSERT_STATUS_OK(mr2.Close()); ASSERT_GT(m_table->num_rows(), rg_table->num_rows()); + + // read all row groups async + AsyncMemRecordBatchReader amr(*fs_, path, schema, reader_memory_); + ASSERT_STATUS_OK(amr.Execute()); + auto amr_table = amr.Table(); + ASSERT_EQ(m_table->num_rows(), amr_table->num_rows()); } } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/test/packed/packed_test_base.h b/cpp/test/packed/packed_test_base.h index 506e1e4..b8977ce 100644 --- a/cpp/test/packed/packed_test_base.h +++ b/cpp/test/packed/packed_test_base.h @@ -32,6 +32,7 @@ #include #include +#include #include #include #include diff --git a/go/packed/packed.go b/go/packed/packed.go index e0cd680..25aa176 100644 --- a/go/packed/packed.go +++ b/go/packed/packed.go @@ -25,7 +25,6 @@ import "C" import ( "errors" "fmt" - "math" "unsafe" "github.com/apache/arrow/go/v12/arrow" @@ -33,11 +32,7 @@ import ( "github.com/apache/arrow/go/v12/arrow/cdata" ) -func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) { - return OpenWithRowGroupRange(path, schema, bufferSize, 0, math.MaxInt32) -} - -func OpenWithRowGroupRange(path string, schema *arrow.Schema, row_group_offset int, row_group_num int, bufferSize int) (arrio.Reader, error) { +func OpenMemRecordBatchReader(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) { // var cSchemaPtr uintptr // cSchema := cdata.SchemaFromPtr(cSchemaPtr) var cas cdata.CArrowSchema @@ -47,9 +42,7 @@ func OpenWithRowGroupRange(path string, schema *arrow.Schema, row_group_offset i cPath := C.CString(path) defer C.free(unsafe.Pointer(cPath)) - fmt.Println("go row group offset: ", row_group_offset) - fmt.Println("go row group num: ", row_group_num) - status := C.OpenWithRowGroupRange(cPath, casPtr, C.int64_t(row_group_offset), C.int64_t(row_group_num), C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cass))) + status := C.OpenMemRecordBatchReader(cPath, casPtr, C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cass))) if status != 0 { return nil, errors.New(fmt.Sprintf("failed to open file: %s, status: %d", path, status)) } diff --git a/go/packed/packed_test.go b/go/packed/packed_test.go index 2cf7d6d..5ad5800 100644 --- a/go/packed/packed_test.go +++ b/go/packed/packed_test.go @@ -60,18 +60,10 @@ func TestRead(t *testing.T) { // err = writer.Close() // assert.NoError(t, err) - // reader, err := Open(path, schema, 10*1024*1024 /* 10MB */) - // assert.NoError(t, err) - // rr, err := reader.Read() - // assert.NoError(t, err) - // defer rr.Release() - // assert.Equal(t, int64(300), rr.NumRows()) - - // test row group range read - reader, err := OpenWithRowGroupRange(path, schema, 1, 1, 10*1024*1024 /* 10MB */) + reader, err := OpenMemRecordBatchReader(path, schema, 10*1024*1024 /* 10MB */) assert.NoError(t, err) rr, err := reader.Read() assert.NoError(t, err) defer rr.Release() - assert.Greater(t, int64(300), rr.NumRows()) + assert.Equal(t, int64(300), rr.NumRows()) }