From 95b172e6e93007509e31bf4ad8b595ad8fc9561a Mon Sep 17 00:00:00 2001 From: shaoting-huang Date: Tue, 7 Jan 2025 18:25:58 +0800 Subject: [PATCH] packed reader Signed-off-by: shaoting-huang --- cpp/include/milvus-storage/common/path_util.h | 4 + cpp/include/milvus-storage/common/status.h | 5 + cpp/include/milvus-storage/packed/reader.h | 19 ++-- cpp/include/milvus-storage/packed/reader_c.h | 7 +- cpp/src/packed/reader.cpp | 92 ++++++++++++------- cpp/src/packed/reader_c.cpp | 10 +- cpp/src/packed/writer.cpp | 10 +- cpp/test/packed/packed_integration_test.cpp | 2 + 8 files changed, 101 insertions(+), 48 deletions(-) diff --git a/cpp/include/milvus-storage/common/path_util.h b/cpp/include/milvus-storage/common/path_util.h index 1bbec94..11e8e9d 100644 --- a/cpp/include/milvus-storage/common/path_util.h +++ b/cpp/include/milvus-storage/common/path_util.h @@ -48,4 +48,8 @@ std::pair GetAbstractPathParent(const std::string& s) return {s.substr(0, pos), s.substr(pos + 1)}; } +static inline std::string ConcatenateFilePath(const std::string& parent, const std::string& child) { + return parent + kSep + child; +} + } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/include/milvus-storage/common/status.h b/cpp/include/milvus-storage/common/status.h index fa6d42c..3fa3828 100644 --- a/cpp/include/milvus-storage/common/status.h +++ b/cpp/include/milvus-storage/common/status.h @@ -37,6 +37,8 @@ class Status { static Status WriterError(const std::string& msg) { return Status(kWriterError, msg); } + static Status ReaderError(const std::string& msg) { return Status(kReaderError, msg); } + static Status IOError(const std::string& msg) { return Status(kIOError, msg); } bool ok() const { return code_ == kOk; } @@ -51,6 +53,8 @@ class Status { bool IsWriterError() const { return code_ == kWriterError; } + bool IsReaderError() const { return code_ == kReaderError; } + bool IsIOError() const { return code_ == kIOError; } std::string ToString() const; @@ -64,6 +68,7 @@ class Status { kFileNotFound = 4, kWriterError = 5, kIOError = 6, + kReaderError = 7 }; explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {} diff --git a/cpp/include/milvus-storage/packed/reader.h b/cpp/include/milvus-storage/packed/reader.h index c808af0..233b0c8 100644 --- a/cpp/include/milvus-storage/packed/reader.h +++ b/cpp/include/milvus-storage/packed/reader.h @@ -40,16 +40,11 @@ using RowOffsetMinHeap = class PackedRecordBatchReader : public arrow::RecordBatchReader { public: - // Test only PackedRecordBatchReader(arrow::fs::FileSystem& fs, - const std::string& path, + const std::string& file_path, const std::shared_ptr schema, - const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE); - - PackedRecordBatchReader(arrow::fs::FileSystem& fs, - const std::vector& paths, - const std::shared_ptr schema, - const std::vector& column_offsets, + const int pk_index, + const int ts_index, const std::set& needed_columns, const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE); @@ -60,6 +55,10 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader { arrow::Status Close() override; private: + Status initializeColumnOffsets(arrow::fs::FileSystem& fs, + const std::set& needed_columns, + const int pk_index, + const int ts_index); // Advance buffer to fill the expected buffer size arrow::Status advanceBuffer(); std::vector collectChunks(int64_t chunksize) const; @@ -77,8 +76,12 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader { std::unique_ptr chunk_manager_; int64_t absolute_row_position_; std::vector needed_column_offsets_; + std::set needed_paths_; std::vector> row_group_sizes_; + const std::string file_path_; int read_count_; + int pk_index_; + int ts_index_; }; } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/packed/reader_c.h b/cpp/include/milvus-storage/packed/reader_c.h index 65ede1c..185cfe7 100644 --- a/cpp/include/milvus-storage/packed/reader_c.h +++ b/cpp/include/milvus-storage/packed/reader_c.h @@ -24,7 +24,12 @@ typedef void* CReader; typedef void* CRecordBatch; typedef void* CFileSystem; -int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out); +int Open(const char* path, + struct ArrowSchema* schema, + const int pk_index, + const int ts_index, + const int64_t buffer_size, + struct ArrowArrayStream* out); #ifdef __cplusplus } diff --git a/cpp/src/packed/reader.cpp b/cpp/src/packed/reader.cpp index 0ffdd84..c240f8e 100644 --- a/cpp/src/packed/reader.cpp +++ b/cpp/src/packed/reader.cpp @@ -24,47 +24,33 @@ #include "packed/chunk_manager.h" #include "common/config.h" #include "common/serde.h" +#include "common/path_util.h" namespace milvus_storage { PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, - const std::string& path, + const std::string& file_path, const std::shared_ptr schema, - const int64_t buffer_size) - : PackedRecordBatchReader( - fs, std::vector{path}, schema, std::vector(), std::set(), buffer_size) {} - -PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, - const std::vector& paths, - const std::shared_ptr schema, - const std::vector& column_offsets, + const int pk_index, + const int ts_index, const std::set& needed_columns, const int64_t buffer_size) - : schema_(schema), + : file_path_(file_path), + schema_(schema), buffer_available_(buffer_size), memory_limit_(buffer_size), row_limit_(0), absolute_row_position_(0), read_count_(0) { - auto cols = std::set(needed_columns); - if (cols.empty()) { - for (int i = 0; i < schema->num_fields(); i++) { - cols.emplace(i); - } + if (needed_columns.find(pk_index) == needed_columns.end() || needed_columns.find(ts_index) == needed_columns.end()) { + throw std::runtime_error("pk_index or ts_index should be in needed_columns"); } - auto offsets = std::vector(column_offsets); - if (column_offsets.empty()) { - for (int i = 0; i < schema->num_fields(); i++) { - offsets.emplace_back(0, i); - } - } - std::set needed_paths; - for (int i : cols) { - needed_column_offsets_.push_back(offsets[i]); - needed_paths.emplace(offsets[i].path_index); + auto status = initializeColumnOffsets(fs, needed_columns, pk_index, ts_index); + if (!status.ok()) { + throw std::runtime_error(status.ToString()); } - for (auto i : needed_paths) { - auto result = MakeArrowFileReader(fs, paths[i]); + for (auto i : needed_paths_) { + auto result = MakeArrowFileReader(fs, ConcatenateFilePath(file_path_, std::to_string(i))); if (!result.ok()) { LOG_STORAGE_ERROR_ << "Error making file reader " << i << ":" << result.status().ToString(); throw std::runtime_error(result.status().ToString()); @@ -73,14 +59,17 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, } for (int i = 0; i < file_readers_.size(); ++i) { - auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY); - if (!metadata.ok()) { - LOG_STORAGE_ERROR_ << "metadata not found in file " << i; + auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata(); + + auto row_group_size_meta = metadata->Get(ROW_GROUP_SIZE_META_KEY); + if (!row_group_size_meta.ok()) { + LOG_STORAGE_ERROR_ << "row group size meta not found in file " << i; throw std::runtime_error(metadata.status().ToString()); } - row_group_sizes_.push_back(PackedMetaSerde::DeserializeRowGroupSizes(metadata.ValueOrDie())); + row_group_sizes_.push_back(PackedMetaSerde::DeserializeRowGroupSizes(row_group_size_meta.ValueOrDie())); LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size(); } + // Initialize table states and chunk manager column_group_states_.resize(file_readers_.size(), ColumnGroupState(0, -1, 0)); chunk_manager_ = std::make_unique(needed_column_offsets_, 0); @@ -88,6 +77,47 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, tables_.resize(paths.size(), std::queue>()); } +Status PackedRecordBatchReader::initializeColumnOffsets(arrow::fs::FileSystem& fs, + const std::set& needed_columns, + const int pk_index, + const int ts_index) { + std::string path = ConcatenateFilePath(file_path_, std::to_string(0)); + auto result = MakeArrowFileReader(fs, path); + if (!result.ok()) { + return Status::ReaderError("can not open file reader"); + } + + file_readers_.emplace_back(std::move(result.value())); + auto column_offset_meta = metadata->Get(COLUMN_OFFSETS_META_KEY); + if (!column_offset_meta.ok()) { + return Status::ReaderError("can not find column offset meta"); + } + auto group_indices = PackedMetaSerde::DeserializeColumnOffsets(column_offset_meta.ValueOrDie()); + std::vector offsets; + for (int path_index = 0; path_index < group_indices.size(); path_index++) { + for (int col_index : group_indices[path_index]) { + offsets.emplace_back(ColumnOffset(path_index, col_index)); + } + } + for (int col : needed_columns) { + swtich(col) { + case pk_index: + needed_paths_.emplace(offsets[0].path_index); + needed_column_offsets_.push_back(offsets[0]); + break; + case ts_index: + needed_paths_.emplace(offsets[1].path_index); + needed_column_offsets_.push_back(offsets[1]); + break; + default: + needed_paths_.emplace(offsets[col].path_index); + needed_column_offsets_.push_back(offsets[col]); + break; + } + } + return Status::OK(); +} + std::shared_ptr PackedRecordBatchReader::schema() const { return schema_; } arrow::Status PackedRecordBatchReader::advanceBuffer() { diff --git a/cpp/src/packed/reader_c.cpp b/cpp/src/packed/reader_c.cpp index a94279e..9cf16c6 100644 --- a/cpp/src/packed/reader_c.cpp +++ b/cpp/src/packed/reader_c.cpp @@ -24,7 +24,12 @@ #include #include -int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) { +int Open(const char* path, + struct ArrowSchema* schema, + const int pk_index, + const int ts_index, + const int64_t buffer_size, + struct ArrowArrayStream* out) { auto truePath = std::string(path); auto factory = std::make_shared(); auto conf = milvus_storage::StorageConfig(); @@ -36,7 +41,8 @@ int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size } auto trueFs = r.value(); auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); - auto reader = std::make_shared(*trueFs, path, trueSchema, buffer_size); + auto reader = std::make_shared(*trueFs, path, trueSchema, pk_index, ts_index, + buffer_size); auto status = ExportRecordBatchReader(reader, out); LOG_STORAGE_ERROR_ << "read export done"; if (!status.ok()) { diff --git a/cpp/src/packed/writer.cpp b/cpp/src/packed/writer.cpp index b9b0c6d..d6e6a8e 100644 --- a/cpp/src/packed/writer.cpp +++ b/cpp/src/packed/writer.cpp @@ -25,6 +25,7 @@ #include "common/config.h" #include "filesystem/fs.h" #include "common/arrow_util.h" +#include "common/path_util.h" namespace milvus_storage { @@ -77,7 +78,7 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() { SizeBasedSplitter(max_group_size, pk_index_, ts_index_).SplitRecordBatches(buffered_batches_); for (GroupId i = 0; i < groups.size(); ++i) { auto& group = groups[i]; - std::string group_path = file_path_ + "/" + std::to_string(i); + std::string group_path = ConcatenateFilePath(file_path_, std::to_string(i)); auto writer = std::make_unique(i, group.Schema(), *fs_, group_path, storage_config_, group.GetOriginColumnIndices()); RETURN_NOT_OK(writer->Init()); @@ -147,11 +148,8 @@ Status PackedRecordBatchWriter::flushRemainingBuffer() { RETURN_NOT_OK(writer->Flush()); current_memory_usage_ -= max_group.second; } - // the first column group writer needs to write column offsets meta - if (!group_writers_.empty()) { - group_writers_[0]->WriteColumnOffsetsMeta(group_indices_); - } for (auto& writer : group_writers_) { + RETURN_NOT_OK(writer->WriteColumnOffsetsMeta(group_indices_)); RETURN_NOT_OK(writer->Close()); } return Status::OK(); @@ -161,7 +159,7 @@ Status PackedRecordBatchWriter::flushUnsplittedBuffer() { if (buffered_batches_.empty()) { return Status::OK(); } - std::string group_path = file_path_ + "/" + std::to_string(0); + std::string group_path = ConcatenateFilePath(file_path_, std::to_string(0)); std::vector indices; indices.push_back(pk_index_); indices.push_back(ts_index_); diff --git a/cpp/test/packed/packed_integration_test.cpp b/cpp/test/packed/packed_integration_test.cpp index 4e54b08..3c8620f 100644 --- a/cpp/test/packed/packed_integration_test.cpp +++ b/cpp/test/packed/packed_integration_test.cpp @@ -21,6 +21,7 @@ class PackedIntegrationTest : public PackedTestBase {}; TEST_F(PackedIntegrationTest, TestOneFile) { int batch_size = 100; + // 0,1,2 std::vector column_offsets = { ColumnOffset(0, 0), ColumnOffset(0, 1), @@ -52,6 +53,7 @@ TEST_F(PackedIntegrationTest, TestSplitColumnGroup) { ColumnOffset(0, 1), ColumnOffset(1, 0), }; + // 0,1;2 PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, pk_index_, ts_index_, storage_config_); for (int i = 0; i < batch_size; ++i) {