From 7cd3c32d29b9205cdebafd1a687ec7609d13da01 Mon Sep 17 00:00:00 2001 From: shaoting-huang Date: Mon, 30 Dec 2024 20:48:23 +0800 Subject: [PATCH] file record batch reader Signed-off-by: shaoting-huang --- .github/workflows/ci.yaml | 3 + .../{packed/utils => common}/serde.h | 0 .../format/parquet/file_reader.h | 53 +++++++++++++++ cpp/src/format/parquet/file_reader.cpp | 68 +++++++++++++++++++ cpp/src/format/parquet/file_writer.cpp | 2 +- cpp/src/packed/reader.cpp | 2 +- cpp/test/format/parquet/file_reader_test.cpp | 61 +++++++++++++++++ go/Makefile | 14 ++-- 8 files changed, 197 insertions(+), 6 deletions(-) rename cpp/include/milvus-storage/{packed/utils => common}/serde.h (100%) create mode 100644 cpp/test/format/parquet/file_reader_test.cpp diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 42192bdd..fdd8d388 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -56,6 +56,9 @@ jobs: - name: Build c++ run: cd cpp && make + + - name: Run cpp tests + run: cd cpp/build/Release/test && ./milvus_test - name: Run tests run: cd go && make && make test diff --git a/cpp/include/milvus-storage/packed/utils/serde.h b/cpp/include/milvus-storage/common/serde.h similarity index 100% rename from cpp/include/milvus-storage/packed/utils/serde.h rename to cpp/include/milvus-storage/common/serde.h diff --git a/cpp/include/milvus-storage/format/parquet/file_reader.h b/cpp/include/milvus-storage/format/parquet/file_reader.h index 9fdf8fc0..ad504942 100644 --- a/cpp/include/milvus-storage/format/parquet/file_reader.h +++ b/cpp/include/milvus-storage/format/parquet/file_reader.h @@ -17,8 +17,61 @@ #include "format/reader.h" #include "parquet/arrow/reader.h" #include "storage/options.h" +#include "common/config.h" namespace milvus_storage { +class FileRecordBatchReader : public arrow::RecordBatchReader { + public: + /** + * @brief FileRecordBatchReader reads num of row groups starting from row_group_offset with memory constraints. 
+ * + * @param fs The Arrow filesystem interface. + * @param path Path to the Parquet file. + * @param schema Expected schema of the Parquet file. + * @param buffer_size Memory limit for reading row groups. + * @param row_group_offset The starting row group index to read. + * @param row_group_num The number of row groups to read. + */ + FileRecordBatchReader(arrow::fs::FileSystem& fs, + const std::string& path, + const std::shared_ptr& schema, + const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE, + const size_t row_group_offset = 0, + const size_t row_group_num = std::numeric_limits::max()); + + /** + * @brief Returns the schema of the Parquet file. + * + * @return A shared pointer to the Arrow schema. + */ + std::shared_ptr schema() const; + + /** + * @brief Reads the next record batch from the file. + * + * @param out A shared pointer to the output record batch. + * @return Arrow Status indicating success or failure. + */ + arrow::Status ReadNext(std::shared_ptr* out); + + /** + * @brief Closes the reader and releases resources. + * + * @return Arrow Status indicating success or failure. 
+ */ + arrow::Status Close(); + + private: + std::shared_ptr schema_; + std::unique_ptr file_reader_; + size_t current_row_group_ = 0; + size_t read_count_ = 0; + + int64_t buffer_size_; + std::vector row_group_sizes_; + size_t row_group_offset_; +}; + class ParquetFileReader : public Reader { public: ParquetFileReader(std::unique_ptr reader); diff --git a/cpp/src/format/parquet/file_reader.cpp b/cpp/src/format/parquet/file_reader.cpp index 8b425ef0..51478e19 100644 --- a/cpp/src/format/parquet/file_reader.cpp +++ b/cpp/src/format/parquet/file_reader.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -24,9 +25,76 @@ #include #include "arrow/table.h" #include "common/macro.h" +#include "common/serde.h" +#include "common/log.h" +#include "common/arrow_util.h" namespace milvus_storage { +FileRecordBatchReader::FileRecordBatchReader(arrow::fs::FileSystem& fs, + const std::string& path, + const std::shared_ptr& schema, + const int64_t buffer_size, + const size_t row_group_offset, + const size_t row_group_num) + : schema_(schema), buffer_size_(buffer_size), row_group_offset_(row_group_offset) { + auto result = MakeArrowFileReader(fs, path); + if (!result.ok()) { + LOG_STORAGE_ERROR_ << "Error making file reader:" << result.status().ToString(); + throw std::runtime_error(result.status().ToString()); + } + file_reader_ = std::move(result.value()); + // Keep only the requested slice of the stored row-group sizes; the count is clamped so offset + count cannot wrap. + auto metadata = file_reader_->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY); + if (!metadata.ok()) { + LOG_STORAGE_ERROR_ << "Metadata not found in file: " << path; + throw std::runtime_error(metadata.status().ToString()); + } + auto all_row_group_sizes = PackedMetaSerde::deserialize(metadata.ValueOrDie()); + if (row_group_offset >= all_row_group_sizes.size()) { + std::string error_msg = + "Row group offset exceeds total number of row groups. 
" + "Row group offset: " + + std::to_string(row_group_offset) + ", Total row groups: " + std::to_string(all_row_group_sizes.size()); + LOG_STORAGE_ERROR_ << error_msg; + throw std::out_of_range(error_msg); + } + size_t end_offset = row_group_offset + std::min(row_group_num, all_row_group_sizes.size() - row_group_offset); + row_group_sizes_.assign(all_row_group_sizes.begin() + row_group_offset, all_row_group_sizes.begin() + end_offset); +} + +std::shared_ptr FileRecordBatchReader::schema() const { return schema_; } +// Reads consecutive row groups that fit in buffer_size_ as one batch; sets *out to nullptr at end of stream. +arrow::Status FileRecordBatchReader::ReadNext(std::shared_ptr* out) { + std::vector rgs_to_read; + size_t buffer_size = 0; + // Always take at least one row group so an oversized group is not mistaken for end of stream. + while (current_row_group_ < row_group_sizes_.size() && + (rgs_to_read.empty() || buffer_size + row_group_sizes_[current_row_group_] <= buffer_size_)) { + rgs_to_read.push_back(current_row_group_ + row_group_offset_); + buffer_size += row_group_sizes_[current_row_group_]; + current_row_group_++; + } + // No row group left in the requested range: a null batch signals end of stream. + if (rgs_to_read.empty()) { + *out = nullptr; + return arrow::Status::OK(); + } + // Propagate Arrow errors to the caller instead of aborting via ValueOrDie(). 
+ std::shared_ptr table = nullptr; + RETURN_NOT_OK(file_reader_->ReadRowGroups(rgs_to_read, &table)); + ARROW_ASSIGN_OR_RAISE(*out, table->CombineChunksToBatch()); + return arrow::Status::OK(); +} + +arrow::Status FileRecordBatchReader::Close() { + LOG_STORAGE_DEBUG_ << "FileRecordBatchReader closed after reading " << read_count_ << " times."; + file_reader_ = nullptr; + schema_ = nullptr; + row_group_sizes_.clear(); + return arrow::Status::OK(); +} + ParquetFileReader::ParquetFileReader(std::unique_ptr reader) : reader_(std::move(reader)) {} Result> GetRecordAtOffset(arrow::RecordBatchReader* reader, int64_t offset) { diff --git a/cpp/src/format/parquet/file_writer.cpp b/cpp/src/format/parquet/file_writer.cpp index ae2d6160..96d96f3b 100644 --- a/cpp/src/format/parquet/file_writer.cpp +++ b/cpp/src/format/parquet/file_writer.cpp @@ -20,7 +20,7 @@ #include "filesystem/fs.h" #include #include "common/config.h" -#include "packed/utils/serde.h" +#include "common/serde.h" #include 
"filesystem/s3/multi_part_upload_s3_fs.h" namespace milvus_storage { diff --git a/cpp/src/packed/reader.cpp b/cpp/src/packed/reader.cpp index c6aa969e..fd2226af 100644 --- a/cpp/src/packed/reader.cpp +++ b/cpp/src/packed/reader.cpp @@ -23,7 +23,7 @@ #include "common/log.h" #include "packed/chunk_manager.h" #include "common/config.h" -#include "packed/utils/serde.h" +#include "common/serde.h" namespace milvus_storage { diff --git a/cpp/test/format/parquet/file_reader_test.cpp b/cpp/test/format/parquet/file_reader_test.cpp new file mode 100644 index 00000000..1630c6bc --- /dev/null +++ b/cpp/test/format/parquet/file_reader_test.cpp @@ -0,0 +1,61 @@ +// Copyright 2024 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "packed_test_base.h" + +namespace milvus_storage { + +TEST_F(FileReaderTest, FileRecordBatchReader) { + std::vector> fields = { + arrow::field("int32", arrow::int32()), + arrow::field("int64", arrow::int64()), + arrow::field("str", arrow::utf8()), + }; + auto schema = arrow::schema(fields); + + // row group offset exceeds the row group range, should throw out_of_range + std::string path = file_path_ + "/0"; + EXPECT_THROW(FileRecordBatchReader fr(*fs_, path, schema, reader_memory_, 100), std::out_of_range); + + // file does not exist, should throw runtime_error + path = file_path_ + "/file_not_exist"; + EXPECT_THROW(FileRecordBatchReader fr(*fs_, path, schema, reader_memory_), std::runtime_error); + + // read all row groups + path = file_path_ + "/0"; + FileRecordBatchReader fr(*fs_, path, schema, reader_memory_); + ASSERT_AND_ARROW_ASSIGN(auto m_table, fr.ToTable()); + ASSERT_STATUS_OK(fr.Close()); + // cross-check the row count against PackedRecordBatchReader reading the same file + std::set needed_columns = {0, 1, 2}; + std::vector column_offsets = { + ColumnOffset(0, 0), + ColumnOffset(0, 1), + ColumnOffset(0, 2), + }; + std::vector paths = {file_path_ + "/0"}; + PackedRecordBatchReader pr(*fs_, paths, schema, column_offsets, needed_columns, reader_memory_); + ASSERT_AND_ARROW_ASSIGN(auto p_table, pr.ToTable()); + ASSERT_STATUS_OK(pr.Close()); + ASSERT_EQ(m_table->num_rows(), p_table->num_rows()); + + // read only row group 1 (offset 1, count 1); it must hold fewer rows than the whole file + path = file_path_ + "/0"; + FileRecordBatchReader fr2(*fs_, path, schema, reader_memory_, 1, 1); + ASSERT_AND_ARROW_ASSIGN(auto rg_table, fr2.ToTable()); + ASSERT_STATUS_OK(fr2.Close()); + ASSERT_GT(m_table->num_rows(), rg_table->num_rows()); +} + +} // namespace milvus_storage \ No newline at end of file diff --git a/go/Makefile b/go/Makefile index c00e3461..0ad0727a 100644 --- a/go/Makefile +++ b/go/Makefile @@ -8,19 +8,25 @@ CFLAGS += $(CONAN_CFLAGS) CXXFLAGS += $(CONAN_CXXFLAGS) INCLUDE_DIRS = $(CONAN_INCLUDE_DIRS_ARROW) $(MILVUS_STORAGE_INCLUDE_DIR) CPPFLAGS = $(addprefix -I, $(INCLUDE_DIRS)) -LDFLAGS += $(addprefix -L, 
$(MILVUS_STORAGE_LD_DIR)) +LDFLAGS += $(addprefix -L, $(MILVUS_STORAGE_LD_DIR)) -Wl,-rpath,$(MILVUS_STORAGE_LD_DIR) .EXPORT_ALL_VARIABLES: -.PHONY: build +.PHONY: build test proto build: + @echo "CPPFLAGS: $(CPPFLAGS)" + @echo "LDFLAGS: $(LDFLAGS)" CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" go build ./... test: - CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS) -Wl,-rpath,$(MILVUS_STORAGE_LD_DIR) -lmilvus-storage" go test -timeout 30s ./... + LD_LIBRARY_PATH=$(MILVUS_STORAGE_LD_DIR):$$LD_LIBRARY_PATH \ + CGO_CFLAGS="$(CPPFLAGS)" \ + CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" \ + go test -timeout 30s ./... + proto: mkdir -p proto/manifest_proto mkdir -p proto/schema_proto protoc -I="proto" --go_out=paths=source_relative:./proto/manifest_proto proto/manifest.proto - protoc -I="proto" --go_out=paths=source_relative:./proto/schema_proto proto/storage_schema.proto + protoc -I="proto" --go_out=paths=source_relative:./proto/schema_proto proto/storage_schema.proto