diff --git a/cpp/benchmark/benchmark_packed.cpp b/cpp/benchmark/benchmark_packed.cpp index d60b9bd..c62be12 100644 --- a/cpp/benchmark/benchmark_packed.cpp +++ b/cpp/benchmark/benchmark_packed.cpp @@ -76,10 +76,10 @@ class S3Fixture : public benchmark::Fixture { static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) { std::set needed_columns = {0, 1, 2}; - std::vector column_offsets = { - std::make_shared(0, 0), - std::make_shared(1, 0), - std::make_shared(1, 1), + std::vector column_offsets = { + ColumnOffset(0, 0), + ColumnOffset(1, 0), + ColumnOffset(1, 1), }; auto paths = std::vector{path + "/0", path + "/1"}; @@ -113,6 +113,8 @@ static void PackedWrite(benchmark::State& st, size_t buffer_size) { auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()), arrow::field("str", arrow::utf8())}); + int pk_index = 0; + int ts_index = 1; arrow::Int32Builder int_builder; arrow::Int64Builder int64_builder; arrow::StringBuilder str_builder; @@ -137,7 +139,7 @@ static void PackedWrite(benchmark::State& st, auto conf = StorageConfig(); conf.use_custom_part_upload_size = true; conf.part_size = 30 * 1024 * 1024; - PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf); + PackedRecordBatchWriter writer(buffer_size, schema, fs, path, pk_index, ts_index, conf); for (int i = 0; i < 8 * 1024; ++i) { auto r = writer.Write(record_batch); if (!r.ok()) { @@ -145,10 +147,9 @@ static void PackedWrite(benchmark::State& st, break; } } - auto column_index_groups = writer.Close(); - if (column_index_groups->Size() == 0) { - st.SkipWithError("Failed to close writer"); - break; + auto r = writer.Close(); + if (!r.ok()) { + st.SkipWithError(r.ToString()); } } } diff --git a/cpp/include/milvus-storage/common/config.h b/cpp/include/milvus-storage/common/config.h index 22900b0..e460854 100644 --- a/cpp/include/milvus-storage/common/config.h +++ b/cpp/include/milvus-storage/common/config.h @@ -21,6 +21,17 @@ using namespace std; namespace milvus_storage { +static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB + +// https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596 +static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB + +static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE; + +// Default number of rows to read when using ::arrow::RecordBatchReader +static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024; +static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024; + struct StorageConfig { std::string uri = ""; std::string bucket_name = ""; @@ -44,17 +55,4 @@ struct StorageConfig { } }; -static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB - -// https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596 -static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB - -static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE; - -static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size"; - -// Default number of rows to read when using ::arrow::RecordBatchReader -static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024; -static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024; - } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/include/milvus-storage/common/serde.h b/cpp/include/milvus-storage/common/serde.h index 46438df..efb403b 100644 --- a/cpp/include/milvus-storage/common/serde.h +++ b/cpp/include/milvus-storage/common/serde.h @@ -15,25 +15,83 @@ #pragma once #include #include +#include namespace milvus_storage { +static const std::string GROUP_DELIMITER = ";"; +static const std::string COLUMN_DELIMITER = ","; +static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size"; +static const std::string COLUMN_OFFSETS_META_KEY = "column_offsets"; + class PackedMetaSerde { public: // Serialize a vector of size_t to a byte array and convert it to a string - static std::string serialize(const std::vector& sizes) { + static std::string SerializeRowGroupSizes(const std::vector& sizes) { std::vector byteArray(sizes.size() * sizeof(size_t)); std::memcpy(byteArray.data(), sizes.data(), byteArray.size()); return std::string(byteArray.begin(), byteArray.end()); } // Deserialize a string back to a vector of size_t - static std::vector deserialize(const std::string& input) { + static std::vector DeserializeRowGroupSizes(const std::string& input) { std::vector byteArray(input.begin(), input.end()); std::vector sizes(byteArray.size() / sizeof(size_t)); std::memcpy(sizes.data(), byteArray.data(), byteArray.size()); return sizes; } + + static std::string SerializeColumnOffsets(const std::vector>& column_offsets) { + std::stringstream ss; + for (size_t i = 0; i < column_offsets.size(); ++i) { + if (i > 0) { + ss << GROUP_DELIMITER; + } + + for (size_t j = 0; j < column_offsets[i].size(); ++j) { + if (j > 0) { + ss << COLUMN_DELIMITER; + } + ss << column_offsets[i][j]; + } + } + + auto s = ss.str(); + std::cout << "SerializeColumnOffsets: " << s << std::endl; + return s; + } + + static std::vector> DeserializeColumnOffsets(const std::string& input) { + std::vector> column_offsets; + + size_t group_start = 0; + size_t group_end = input.find(GROUP_DELIMITER); + + while (group_start != std::string::npos) { + std::string group = input.substr(group_start, group_end - group_start); + std::vector group_indices; + + size_t column_start = 0; + size_t column_end = group.find(COLUMN_DELIMITER); + while (column_start != std::string::npos) { + std::string column = group.substr(column_start, column_end - column_start); + if (!column.empty()) { + group_indices.push_back(std::stoi(column)); + } + column_start = (column_end == std::string::npos) ? std::string::npos : column_end + COLUMN_DELIMITER.size(); + column_end = group.find(COLUMN_DELIMITER, column_start); + } + + if (!group_indices.empty()) { + column_offsets.push_back(group_indices); + } + + group_start = (group_end == std::string::npos) ? std::string::npos : group_end + GROUP_DELIMITER.size(); + group_end = input.find(GROUP_DELIMITER, group_start); + } + + return column_offsets; + } }; } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/format/parquet/file_writer.h b/cpp/include/milvus-storage/format/parquet/file_writer.h index e4f3acc..d85be8e 100644 --- a/cpp/include/milvus-storage/format/parquet/file_writer.h +++ b/cpp/include/milvus-storage/format/parquet/file_writer.h @@ -40,6 +40,8 @@ class ParquetFileWriter : public FileWriter { Status WriteRecordBatches(const std::vector>& batches, const std::vector& batch_memory_sizes); + void AppendKVMetadata(const std::string& key, const std::string& value); + int64_t count() override; Status Close() override; diff --git a/cpp/include/milvus-storage/packed/chunk_manager.h b/cpp/include/milvus-storage/packed/chunk_manager.h index e27c94d..eec903e 100644 --- a/cpp/include/milvus-storage/packed/chunk_manager.h +++ b/cpp/include/milvus-storage/packed/chunk_manager.h @@ -25,6 +25,15 @@ namespace milvus_storage { +struct ColumnOffset { + int path_index; + int col_index; + + ColumnOffset() = default; + + ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {} +}; + // record which chunk is in use and its offset in the file struct ChunkState { int chunk; @@ -50,7 +59,7 @@ struct ChunkState { class ChunkManager { public: - ChunkManager(const std::vector& column_offsets, int64_t chunksize); + ChunkManager(const std::vector& column_offsets, int64_t chunksize); std::vector> SliceChunksByMaxContiguousSlice( int64_t chunksize, std::vector>>& tables); @@ -62,7 +71,7 @@ class ChunkManager { void SetChunkSize(int64_t chunksize) { chunksize_ = chunksize; } private: - std::vector column_offsets_; + std::vector column_offsets_; std::vector chunk_states_; int64_t chunksize_; }; diff --git a/cpp/include/milvus-storage/packed/column_group.h b/cpp/include/milvus-storage/packed/column_group.h index 1394db9..d408a5f 100644 --- a/cpp/include/milvus-storage/packed/column_group.h +++ b/cpp/include/milvus-storage/packed/column_group.h @@ -90,55 +90,4 @@ struct ColumnGroupState { void resetMemorySize() { this->memory_size = 0; } }; -struct ColumnOffset { - int path_index; - int col_index; - - ColumnOffset() = default; - - ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {} -}; - -using ColumnOffsetPtr = std::shared_ptr; - -// ColumnOffsetMapping is a map of original field name to ColumnOffset. -// The column offset is a pair of path index and a column index in the path. -class ColumnOffsetMapping { - public: - ColumnOffsetMapping() { mapping_ = std::map(); }; - - ColumnOffsetMapping(const std::vector>& group_indices, const std::shared_ptr schema) { - for (int path_index = 0; path_index < group_indices.size(); path_index++) { - for (int col_index = 0; col_index < group_indices[path_index].size(); col_index++) { - int original_col_index = group_indices[path_index][col_index]; - std::string field_name = schema->field(original_col_index)->name(); - mapping_[field_name] = std::make_shared(path_index, col_index); - } - } - } - - void AddColumnOffset(const std::string& field_name, int64_t path_index, int64_t col_index) { - auto column_offset = std::make_shared(path_index, col_index); - mapping_[field_name] = column_offset; - } - - ColumnOffsetPtr GetByFieldName(const std::string& field_name) { - if (Contains(field_name)) { - return mapping_.at(field_name); - } - return nullptr; - } - - std::map GetMapping() const { return mapping_; } - - bool Contains(const std::string& key) const { return mapping_.find(key) != mapping_.end(); } - - size_t Size() { return mapping_.size(); } - - void Clear() { mapping_.clear(); } - - private: - std::map mapping_; -}; - } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/include/milvus-storage/packed/column_group_writer.h b/cpp/include/milvus-storage/packed/column_group_writer.h index 5dc89ac..17217f7 100644 --- a/cpp/include/milvus-storage/packed/column_group_writer.h +++ b/cpp/include/milvus-storage/packed/column_group_writer.h @@ -22,6 +22,7 @@ #include "common/status.h" #include "packed/column_group.h" #include "common/config.h" +#include "common/serde.h" namespace milvus_storage { @@ -36,6 +37,7 @@ class ColumnGroupWriter { Status Init(); Status Write(const std::shared_ptr& record); + Status WriteColumnOffsetsMeta(const std::vector>& column_offsets); Status Flush(); Status Close(); GroupId Group_id() const; diff --git a/cpp/include/milvus-storage/packed/reader.h b/cpp/include/milvus-storage/packed/reader.h index a8e4e4b..c808af0 100644 --- a/cpp/include/milvus-storage/packed/reader.h +++ b/cpp/include/milvus-storage/packed/reader.h @@ -49,7 +49,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader { PackedRecordBatchReader(arrow::fs::FileSystem& fs, const std::vector& paths, const std::shared_ptr schema, - const std::vector& column_offsets, + const std::vector& column_offsets, const std::set& needed_columns, const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE); @@ -76,7 +76,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader { int64_t row_limit_; std::unique_ptr chunk_manager_; int64_t absolute_row_position_; - std::vector needed_column_offsets_; + std::vector needed_column_offsets_; std::vector> row_group_sizes_; int read_count_; }; diff --git a/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h b/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h index da8bd30..b12b43b 100644 --- a/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h +++ b/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h @@ -20,7 +20,11 @@ namespace milvus_storage { class SizeBasedSplitter : public SplitterPlugin { public: - explicit SizeBasedSplitter(size_t max_group_size); + /* + * @brief SizeBasedSplitter is a splitter plugin that splits record batches into column groups based on the size of + * each column. The pk_index and ts_index are determined into the first group. + */ + explicit SizeBasedSplitter(size_t max_group_size, int pk_index, int ts_index); void Init() override; @@ -28,6 +32,11 @@ class SizeBasedSplitter : public SplitterPlugin { std::vector Split(const std::shared_ptr& record) override; + private: + std::vector initializeSmallGroupIndices(); + + int pk_index_; + int ts_index_; size_t max_group_size_; static constexpr size_t SPLIT_THRESHOLD = 1024; // 1K }; diff --git a/cpp/include/milvus-storage/packed/writer.h b/cpp/include/milvus-storage/packed/writer.h index 141d4d3..ec3df68 100644 --- a/cpp/include/milvus-storage/packed/writer.h +++ b/cpp/include/milvus-storage/packed/writer.h @@ -36,13 +36,15 @@ class PackedRecordBatchWriter { std::shared_ptr schema, std::shared_ptr fs, const std::string& file_path, + const int pk_index, + const int ts_index, const StorageConfig& storage_config); // Put the record batch into the corresponding column group, // , and write the maximum buffer of column group to the file. Status Write(const std::shared_ptr& record); - // Close the writer and return the mapping of field name written to column offset. - std::unique_ptr Close(); + // Close the writer and write the mapping of column offset to the metadata of parquet file. + Status Close(); private: // split first buffer into column groups based on column size @@ -60,6 +62,8 @@ class PackedRecordBatchWriter { std::shared_ptr schema_; std::shared_ptr fs_; const std::string file_path_; + const int pk_index_; + const int ts_index_; const StorageConfig storage_config_; size_t current_memory_usage_; std::vector> group_writers_; diff --git a/cpp/include/milvus-storage/packed/writer_c.h b/cpp/include/milvus-storage/packed/writer_c.h index 979f353..62e351b 100644 --- a/cpp/include/milvus-storage/packed/writer_c.h +++ b/cpp/include/milvus-storage/packed/writer_c.h @@ -26,13 +26,13 @@ typedef void* CColumnOffsetMapping; int NewPackedWriter(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, + const int pk_index, + const int ts_index, CPackedWriter* c_packed_writer); int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, struct ArrowSchema* schema); -int Close(CPackedWriter c_packed_writer, CColumnOffsetMapping* c_column_offset_mapping); - -void DeletePackedWriter(CPackedWriter c_packed_writer); +int Close(CPackedWriter c_packed_writer); #ifdef __cplusplus } diff --git a/cpp/src/format/parquet/file_reader.cpp b/cpp/src/format/parquet/file_reader.cpp index 51478e1..c120da6 100644 --- a/cpp/src/format/parquet/file_reader.cpp +++ b/cpp/src/format/parquet/file_reader.cpp @@ -50,7 +50,7 @@ FileRecordBatchReader::FileRecordBatchReader(arrow::fs::FileSystem& fs, LOG_STORAGE_ERROR_ << "Metadata not found in file: " << path; throw std::runtime_error(metadata.status().ToString()); } - auto all_row_group_sizes = PackedMetaSerde::deserialize(metadata.ValueOrDie()); + auto all_row_group_sizes = PackedMetaSerde::DeserializeRowGroupSizes(metadata.ValueOrDie()); if (row_group_offset >= all_row_group_sizes.size()) { std::string error_msg = "Row group offset exceeds total number of row groups. " diff --git a/cpp/src/format/parquet/file_writer.cpp b/cpp/src/format/parquet/file_writer.cpp index ab09628..b1884dc 100644 --- a/cpp/src/format/parquet/file_writer.cpp +++ b/cpp/src/format/parquet/file_writer.cpp @@ -90,9 +90,13 @@ Status ParquetFileWriter::WriteRecordBatches(const std::vectorAppend(key, value); +} + Status ParquetFileWriter::Close() { - std::string meta = PackedMetaSerde::serialize(row_group_sizes_); - kv_metadata_->Append(ROW_GROUP_SIZE_META_KEY, meta); + std::string meta = PackedMetaSerde::SerializeRowGroupSizes(row_group_sizes_); + AppendKVMetadata(ROW_GROUP_SIZE_META_KEY, meta); RETURN_ARROW_NOT_OK(writer_->AddKeyValueMetadata(kv_metadata_)); RETURN_ARROW_NOT_OK(writer_->Close()); return Status::OK(); diff --git a/cpp/src/packed/chunk_manager.cpp b/cpp/src/packed/chunk_manager.cpp index bf89cf1..bca5136 100644 --- a/cpp/src/packed/chunk_manager.cpp +++ b/cpp/src/packed/chunk_manager.cpp @@ -24,7 +24,7 @@ namespace milvus_storage { -ChunkManager::ChunkManager(const std::vector& column_offsets, int64_t chunksize) +ChunkManager::ChunkManager(const std::vector& column_offsets, int64_t chunksize) : column_offsets_(column_offsets), chunksize_(chunksize) { chunk_states_ = std::vector(column_offsets_.size()); } @@ -40,8 +40,8 @@ std::vector> ChunkManager::SliceChunksByMaxCon // Identify the chunk for each column and adjust chunk size for (int i = 0; i < column_offsets_.size(); ++i) { auto offset = column_offsets_[i]; - auto table_queue = tables[offset->path_index]; - auto column = table_queue.front()->column(offset->col_index); + auto table_queue = tables[offset.path_index]; + auto column = table_queue.front()->column(offset.col_index); auto chunk = column->chunk(chunk_states_[i].chunk).get(); // Adjust chunksize if a smaller contiguous chunk is found @@ -67,7 +67,7 @@ std::vector> ChunkManager::SliceChunksByMaxCon // Mark the table to pop if all chunks are consumed if (chunk_state.chunk == chunk_sizes[i]) { - table_to_pop.insert(column_offsets_[i]->path_index); + table_to_pop.insert(column_offsets_[i].path_index); chunk_state.reset(); } } else { @@ -87,7 +87,7 @@ std::vector> ChunkManager::SliceChunksByMaxCon // resets the chunk states for columns in a specific file. void ChunkManager::ResetChunkState(int path_index) { for (int j = 0; j < column_offsets_.size(); ++j) { - if (column_offsets_[j]->path_index == path_index) { + if (column_offsets_[j].path_index == path_index) { chunk_states_[j].reset(); } } diff --git a/cpp/src/packed/column_group_writer.cpp b/cpp/src/packed/column_group_writer.cpp index e9212a6..809a258 100644 --- a/cpp/src/packed/column_group_writer.cpp +++ b/cpp/src/packed/column_group_writer.cpp @@ -61,6 +61,12 @@ Status ColumnGroupWriter::Flush() { return Status::OK(); } +Status ColumnGroupWriter::WriteColumnOffsetsMeta(const std::vector>& column_offsets) { + std::string meta = PackedMetaSerde::SerializeColumnOffsets(column_offsets); + writer_.AppendKVMetadata(COLUMN_OFFSETS_META_KEY, meta); + return Status::OK(); +} + Status ColumnGroupWriter::Close() { finished_ = true; LOG_STORAGE_DEBUG_ << "Group " << group_id_ << " flushed " << flushed_batches_ << " batches and " << flushed_rows_ diff --git a/cpp/src/packed/column_offset_mapping_c.cpp b/cpp/src/packed/column_offset_mapping_c.cpp deleted file mode 100644 index fab3a13..0000000 --- a/cpp/src/packed/column_offset_mapping_c.cpp +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2024 Zilliz -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "packed/column_offset_mapping_c.h" -#include "packed/column_group.h" - -int NewColumnOffsetMapping(CColumnOffsetMapping* c_column_offset_mapping) { - try { - auto column_offset_mapping = std::make_unique(); - *c_column_offset_mapping = column_offset_mapping.release(); - return 0; - } catch (const std::exception& ex) { - return -1; - } -} - -void DeleteColumnOffsetMapping(CColumnOffsetMapping c_column_offset_mapping) { - auto column_offset_mapping = (milvus_storage::ColumnOffsetMapping*)c_column_offset_mapping; - delete column_offset_mapping; -} - -int AddColumnOffset(CColumnOffsetMapping c_column_offset_mapping, - const char* field_name, - int64_t path_index, - int64_t col_index) { - try { - auto column_offset_mapping = (milvus_storage::ColumnOffsetMapping*)c_column_offset_mapping; - std::string field_name_str(field_name); - column_offset_mapping->AddColumnOffset(field_name_str, path_index, col_index); - return 0; - } catch (const std::exception& ex) { - return -1; - } -} - -void GetColumnOffsetMappingKeys(CColumnOffsetMapping c_column_offset_mapping, void* keys) { - auto column_offset_mapping = (milvus_storage::ColumnOffsetMapping*)c_column_offset_mapping; - const char** keys_ = (const char**)keys; - auto map_ = column_offset_mapping->GetMapping(); - std::size_t i = 0; - for (auto it = map_.begin(); it != map_.end(); ++it, i++) { - keys_[i] = it->first.c_str(); - } -} - -int GetColumnOffsetMappingSize(CColumnOffsetMapping c_column_offset_mapping) { - auto column_offset_mapping = (milvus_storage::ColumnOffsetMapping*)c_column_offset_mapping; - return column_offset_mapping->Size(); -} - -int GetColumnOffset(CColumnOffsetMapping c_column_offset_mapping, - const char* field_name, - int* path_index, - int* col_index) { - auto column_offset_mapping = (milvus_storage::ColumnOffsetMapping*)c_column_offset_mapping; - std::string field_name_str(field_name); - auto column_offset = column_offset_mapping->GetByFieldName(field_name_str); - if (column_offset == nullptr) { - return 0; - } - *path_index = column_offset->path_index; - *col_index = column_offset->col_index; - return 0; -} diff --git a/cpp/src/packed/reader.cpp b/cpp/src/packed/reader.cpp index 8c653e1..0ffdd84 100644 --- a/cpp/src/packed/reader.cpp +++ b/cpp/src/packed/reader.cpp @@ -32,12 +32,12 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, const std::shared_ptr schema, const int64_t buffer_size) : PackedRecordBatchReader( - fs, std::vector{path}, schema, std::vector(), std::set(), buffer_size) {} + fs, std::vector{path}, schema, std::vector(), std::set(), buffer_size) {} PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, const std::vector& paths, const std::shared_ptr schema, - const std::vector& column_offsets, + const std::vector& column_offsets, const std::set& needed_columns, const int64_t buffer_size) : schema_(schema), @@ -52,16 +52,16 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, cols.emplace(i); } } - auto offsets = std::vector(column_offsets); + auto offsets = std::vector(column_offsets); if (column_offsets.empty()) { for (int i = 0; i < schema->num_fields(); i++) { - offsets.emplace_back(std::make_shared(0, i)); + offsets.emplace_back(0, i); } } std::set needed_paths; for (int i : cols) { needed_column_offsets_.push_back(offsets[i]); - needed_paths.emplace(offsets[i]->path_index); + needed_paths.emplace(offsets[i].path_index); } for (auto i : needed_paths) { auto result = MakeArrowFileReader(fs, paths[i]); @@ -78,7 +78,7 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, LOG_STORAGE_ERROR_ << "metadata not found in file " << i; throw std::runtime_error(metadata.status().ToString()); } - row_group_sizes_.push_back(PackedMetaSerde::deserialize(metadata.ValueOrDie())); + row_group_sizes_.push_back(PackedMetaSerde::DeserializeRowGroupSizes(metadata.ValueOrDie())); LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size(); } // Initialize table states and chunk manager diff --git a/cpp/src/packed/splitter/size_based_splitter.cpp b/cpp/src/packed/splitter/size_based_splitter.cpp index 4ebd875..3fc281a 100644 --- a/cpp/src/packed/splitter/size_based_splitter.cpp +++ b/cpp/src/packed/splitter/size_based_splitter.cpp @@ -20,10 +20,12 @@ #include #include #include +#include namespace milvus_storage { -SizeBasedSplitter::SizeBasedSplitter(size_t max_group_size) : max_group_size_(max_group_size) {} +SizeBasedSplitter::SizeBasedSplitter(size_t max_group_size, int pk_index, int ts_index) + : max_group_size_(max_group_size), pk_index_(pk_index), ts_index_(ts_index) {} void SizeBasedSplitter::Init() {} @@ -44,22 +46,39 @@ std::vector SizeBasedSplitter::SplitRecordBatches( } // split column indices into small and large groups - std::vector> group_indices; - std::vector small_group_indices; + std::vector small_group = initializeSmallGroupIndices(); + std::vector large_group; + std::vector> small_groups; + std::vector> large_groups; for (int i = 0; i < column_sizes.size(); ++i) { + if (i == pk_index_ || i == ts_index_) { + continue; + } size_t avg_size = column_sizes[i] / column_rows[i]; - if (small_group_indices.size() >= max_group_size_) { - group_indices.push_back(small_group_indices); - small_group_indices.clear(); + if (small_group.size() >= max_group_size_) { + small_groups.push_back(small_group); + small_group.clear(); + } + if (large_group.size() >= max_group_size_) { + large_groups.push_back(large_group); + large_group.clear(); } if (avg_size >= SPLIT_THRESHOLD) { - group_indices.push_back({i}); + large_group.push_back({i}); } else { - small_group_indices.push_back(i); + small_group.push_back(i); } } - group_indices.push_back(small_group_indices); - small_group_indices.clear(); + if (!small_group.empty()) { + small_groups.push_back(small_group); + } + if (!large_group.empty()) { + large_groups.push_back(large_group); + } + + // merge small and large groups + std::vector> group_indices = small_groups; + group_indices.insert(group_indices.end(), large_groups.begin(), large_groups.end()); // create column groups std::vector column_groups; @@ -80,4 +99,12 @@ std::vector SizeBasedSplitter::Split(const std::shared_ptr SizeBasedSplitter::initializeSmallGroupIndices() { + std::vector small_group_indices; + small_group_indices.push_back(pk_index_); + small_group_indices.push_back(ts_index_); + return small_group_indices; +} + } // namespace milvus_storage diff --git a/cpp/src/packed/writer.cpp b/cpp/src/packed/writer.cpp index df0092b..02cc011 100644 --- a/cpp/src/packed/writer.cpp +++ b/cpp/src/packed/writer.cpp @@ -32,15 +32,26 @@ PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit, std::shared_ptr schema, std::shared_ptr fs, const std::string& file_path, + const int pk_index, + const int ts_index, const StorageConfig& storage_config) : memory_limit_(memory_limit), schema_(std::move(schema)), fs_(std::move(fs)), file_path_(file_path), + pk_index_(pk_index), + ts_index_(ts_index), storage_config_(storage_config), splitter_({}), current_memory_usage_(0), - size_split_done_(false) {} + size_split_done_(false) { + if (pk_index_ < 0 || pk_index_ >= schema_->num_fields()) { + throw Status::InvalidArgument("Invalid primary key index: " + std::to_string(pk_index_)); + } + if (ts_index_ < 0 || ts_index_ >= schema_->num_fields()) { + throw Status::InvalidArgument("Invalid timestamp index: " + std::to_string(ts_index_)); + } +} Status PackedRecordBatchWriter::Write(const std::shared_ptr& record) { size_t next_batch_size = GetRecordBatchMemorySize(record); @@ -61,8 +72,10 @@ Status PackedRecordBatchWriter::Write(const std::shared_ptr& } Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() { + cout << "start split and write first buffer" << endl; + auto max_group_size = buffered_batches_[0]->num_columns(); std::vector groups = - SizeBasedSplitter(buffered_batches_[0]->num_columns()).SplitRecordBatches(buffered_batches_); + SizeBasedSplitter(max_group_size, pk_index_, ts_index_).SplitRecordBatches(buffered_batches_); for (GroupId i = 0; i < groups.size(); ++i) { auto& group = groups[i]; std::string group_path = file_path_ + "/" + std::to_string(i); @@ -70,7 +83,7 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() { group.GetOriginColumnIndices()); RETURN_NOT_OK(writer->Init()); for (auto& batch : group.GetRecordBatches()) { - RETURN_NOT_OK(writer->Write(group.GetRecordBatch(0))); + RETURN_NOT_OK(writer->Write(batch)); } max_heap_.emplace(i, group.GetMemoryUsage()); @@ -116,23 +129,13 @@ Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr PackedRecordBatchWriter::Close() { +Status PackedRecordBatchWriter::Close() { // write unsplitted record batch to one file if the buffer record batches are not splitted if (!size_split_done_ && !buffered_batches_.empty()) { - auto status = flushUnsplittedBuffer(); - if (!status.ok()) { - LOG_STORAGE_ERROR_ << "Failed to write unsplitted record batch while closing writer."; - return std::make_unique(); - } - return std::make_unique(std::move(group_indices_), schema_); + return flushUnsplittedBuffer(); } // flush all remaining column groups before closing - auto status = flushRemainingBuffer(); - if (!status.ok()) { - LOG_STORAGE_ERROR_ << "Failed to flush remaining column groups while closing writer."; - return std::make_unique(); - } - return std::make_unique(std::move(group_indices_), schema_); + return flushRemainingBuffer(); } Status PackedRecordBatchWriter::flushRemainingBuffer() { @@ -145,6 +148,10 @@ Status PackedRecordBatchWriter::flushRemainingBuffer() { RETURN_NOT_OK(writer->Flush()); current_memory_usage_ -= max_group.second; } + // the first column group writer needs to write column offsets meta + if (!group_writers_.empty()) { + group_writers_[0]->WriteColumnOffsetsMeta(group_indices_); + } for (auto& writer : group_writers_) { RETURN_NOT_OK(writer->Close()); } @@ -152,18 +159,31 @@ Status PackedRecordBatchWriter::flushRemainingBuffer() { } Status PackedRecordBatchWriter::flushUnsplittedBuffer() { + if (buffered_batches_.empty()) { + return Status::OK(); + } std::string group_path = file_path_ + "/" + std::to_string(0); - std::vector indices(buffered_batches_[0]->num_columns()); - std::iota(std::begin(indices), std::end(indices), 0); - auto writer = std::make_unique(0, buffered_batches_[0]->schema(), *fs_, group_path, - storage_config_, indices); + std::vector indices; + indices.push_back(pk_index_); + indices.push_back(ts_index_); + for (int i = 0; i < buffered_batches_[0]->num_columns(); ++i) { + if (i != pk_index_ && i != ts_index_) { + indices.emplace_back(i); + } + } + group_indices_.emplace_back(indices); + splitter_ = IndicesBasedSplitter(group_indices_); + std::vector column_groups = splitter_.Split(buffered_batches_[0]); + assert(column_groups.size() == 1); + auto writer = std::make_unique(0, column_groups[0].Schema(), *fs_, group_path, storage_config_, + column_groups[0].GetOriginColumnIndices()); RETURN_NOT_OK(writer->Init()); - for (auto& batch : buffered_batches_) { - RETURN_NOT_OK(writer->Write(batch)); + for (int i = 0; i < buffered_batches_.size(); ++i) { + RETURN_NOT_OK(writer->Write(buffered_batches_[i])); } RETURN_NOT_OK(writer->Flush()); + RETURN_NOT_OK(writer->WriteColumnOffsetsMeta(group_indices_)); RETURN_NOT_OK(writer->Close()); - group_indices_.emplace_back(indices); return Status::OK(); } diff --git a/cpp/src/packed/writer_c.cpp b/cpp/src/packed/writer_c.cpp index 55d4f69..6c9d8ec 100644 --- a/cpp/src/packed/writer_c.cpp +++ b/cpp/src/packed/writer_c.cpp @@ -25,6 +25,8 @@ int NewPackedWriter(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, + const int pk_index, + const int ts_index, CPackedWriter* c_packed_writer) { try { auto truePath = std::string(path); @@ -33,8 +35,8 @@ int NewPackedWriter(const char* path, conf.uri = "file:///tmp/"; auto trueFs = factory->BuildFileSystem(conf, &truePath).value(); auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); - auto writer = - std::make_unique(buffer_size, trueSchema, trueFs, truePath, conf); + auto writer = std::make_unique(buffer_size, trueSchema, trueFs, truePath, + pk_index, ts_index, conf); *c_packed_writer = writer.release(); return 0; @@ -57,11 +59,14 @@ int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, st } } -int Close(CPackedWriter c_packed_writer, CColumnOffsetMapping* c_column_offset_mapping) { +int Close(CPackedWriter c_packed_writer) { try { auto packed_writer = static_cast(c_packed_writer); - *c_column_offset_mapping = packed_writer->Close().release(); + auto status = packed_writer->Close(); delete packed_writer; + if (!status.ok()) { + return -1; + } return 0; } catch (std::exception& e) { return -1; diff --git a/cpp/test/common/serde_test.cpp b/cpp/test/common/serde_test.cpp new file mode 100644 index 0000000..110add7 --- /dev/null +++ b/cpp/test/common/serde_test.cpp @@ -0,0 +1,54 @@ +// Copyright 2023 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include "gtest/gtest.h" +#include "common/serde.h" + +namespace milvus_storage { + +class SerdeTest : public testing::Test {}; + +TEST_F(SerdeTest, TestColumnOffsetSerde) { + std::vector> column_offsets = {{0, 1, 2}, {3, 4}, {5, 6, 7, 8}}; + + std::string serialized = PackedMetaSerde::SerializeColumnOffsets(column_offsets); + EXPECT_EQ(serialized, "0,1,2;3,4;5,6,7,8"); + + std::vector> deserialized = PackedMetaSerde::DeserializeColumnOffsets(serialized); + EXPECT_EQ(deserialized, column_offsets); + + // Test case: Empty input + std::vector> empty_offsets = {}; + serialized = PackedMetaSerde::SerializeColumnOffsets(empty_offsets); + EXPECT_EQ(serialized, ""); + + deserialized = PackedMetaSerde::DeserializeColumnOffsets(serialized); + EXPECT_TRUE(deserialized.empty()); + + // Test case: Single group + std::vector> single_group = {{1, 2, 3}}; + serialized = PackedMetaSerde::SerializeColumnOffsets(single_group); + EXPECT_EQ(serialized, "1,2,3"); + + deserialized = PackedMetaSerde::DeserializeColumnOffsets(serialized); + EXPECT_EQ(deserialized, single_group); + + // Test case: Single column in each group + std::vector> single_column_groups = {{0}, {1}, {2}}; + serialized = PackedMetaSerde::SerializeColumnOffsets(single_column_groups); + EXPECT_EQ(serialized, "0;1;2"); + + deserialized = PackedMetaSerde::DeserializeColumnOffsets(serialized); + EXPECT_EQ(deserialized, single_column_groups); +} +} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/test/format/parquet/file_reader_test.cpp b/cpp/test/format/parquet/file_reader_test.cpp index b64809f..5532add 100644 --- a/cpp/test/format/parquet/file_reader_test.cpp +++ b/cpp/test/format/parquet/file_reader_test.cpp @@ -21,7 +21,7 @@ class FileReaderTest : public PackedTestBase {}; TEST_F(FileReaderTest, FileRecordBatchReader) { int batch_size = 100; - PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_); + PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, pk_index_, ts_index_, storage_config_); for (int i = 0; i < batch_size; ++i) { EXPECT_TRUE(writer.Write(record_batch_).ok()); } @@ -49,10 +49,10 @@ TEST_F(FileReaderTest, FileRecordBatchReader) { ASSERT_STATUS_OK(fr.Close()); std::set needed_columns = {0, 1, 2}; - std::vector column_offsets = { - std::make_shared(0, 0), - std::make_shared(0, 1), - std::make_shared(0, 2), + std::vector column_offsets = { + ColumnOffset(0, 0), + ColumnOffset(0, 1), + ColumnOffset(0, 2), }; PackedRecordBatchReader pr(*fs_, {path}, schema, column_offsets, needed_columns, reader_memory_); ASSERT_AND_ARROW_ASSIGN(auto pr_table, pr.ToTable()); diff --git a/cpp/test/packed/chunk_manager_test.cpp b/cpp/test/packed/chunk_manager_test.cpp index 31c6b9b..c4513c7 100644 --- a/cpp/test/packed/chunk_manager_test.cpp +++ b/cpp/test/packed/chunk_manager_test.cpp @@ -36,13 +36,12 @@ class ChunkManagerTest : public PackedTestBase { // large string column tables_[1].push(table_->SelectColumns({2}).ValueOrDie()); - column_offsets_ = {std::make_shared(0, 0), std::make_shared(0, 1), - std::make_shared(1, 0)}; + column_offsets_ = {ColumnOffset(0, 0), ColumnOffset(0, 1), ColumnOffset(1, 0)}; chunk_manager_ = std::make_unique(column_offsets_, chunksize_); } - std::vector column_offsets_; + std::vector column_offsets_; std::unique_ptr chunk_manager_; std::vector>> tables_; int chunksize_ = 2; diff --git a/cpp/test/packed/packed_integration_test.cpp b/cpp/test/packed/packed_integration_test.cpp index 602eac0..fb7f889 100644 --- a/cpp/test/packed/packed_integration_test.cpp +++ b/cpp/test/packed/packed_integration_test.cpp @@ -21,17 +21,17 @@ class PackedIntegrationTest : public PackedTestBase {}; TEST_F(PackedIntegrationTest, TestOneFile) { int batch_size = 100; - PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_); + std::vector column_offsets = { + ColumnOffset(0, 0), + ColumnOffset(0, 1), + ColumnOffset(0, 2), + }; + + PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, pk_index_, ts_index_, storage_config_); for (int i = 0; i < batch_size; ++i) { EXPECT_TRUE(writer.Write(record_batch_).ok()); } - auto column_offset_mapping = writer.Close(); - EXPECT_EQ(column_offset_mapping->Size(), schema_->num_fields()); - - std::vector column_offsets; - for (auto& field : schema_->fields()) { - column_offsets.push_back(column_offset_mapping->GetByFieldName(field->name())); - } + EXPECT_TRUE(writer.Close().ok()); std::vector paths = {file_path_ + "/0"}; @@ -45,19 +45,20 @@ TEST_F(PackedIntegrationTest, TestOneFile) { } TEST_F(PackedIntegrationTest, TestSplitColumnGroup) { - int batch_size = 100; + int batch_size = 1000; - PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_); + std::vector column_offsets = { + ColumnOffset(0, 0), + ColumnOffset(1, 0), + ColumnOffset(1, 1), + }; + + PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, pk_index_, ts_index_, storage_config_); for (int i = 0; i < batch_size; ++i) { EXPECT_TRUE(writer.Write(record_batch_).ok()); } - auto column_offset_mapping = writer.Close(); - EXPECT_EQ(column_offset_mapping->Size(), schema_->num_fields()); - - std::vector column_offsets; - for (auto& field : schema_->fields()) { - column_offsets.push_back(column_offset_mapping->GetByFieldName(field->name())); - } + EXPECT_TRUE(writer.Close().ok()); + std::cout << "writer closed" << std::endl; std::vector paths = {file_path_ + "/0", file_path_ + "/1"}; diff --git a/cpp/test/packed/packed_test_base.h b/cpp/test/packed/packed_test_base.h index 0c889d0..6b2cb19 100644 --- a/cpp/test/packed/packed_test_base.h +++ b/cpp/test/packed/packed_test_base.h @@ -121,6 +121,8 @@ class PackedTestBase : public ::testing::Test { record_batch_ = randomRecordBatch(); table_ = arrow::Table::FromRecordBatches({record_batch_}).ValueOrDie(); schema_ = table_->schema(); + pk_index_ = 0; + ts_index_ = 1; } protected: @@ -169,6 +171,8 @@ class PackedTestBase : public ::testing::Test { size_t reader_memory_; std::shared_ptr fs_; std::string file_path_; + int pk_index_; + int ts_index_; StorageConfig storage_config_; std::shared_ptr schema_; diff --git a/cpp/test/packed/splitter_test.cpp b/cpp/test/packed/splitter_test.cpp index 12eaf9a..db5f78c 100644 --- a/cpp/test/packed/splitter_test.cpp +++ b/cpp/test/packed/splitter_test.cpp @@ -39,17 +39,17 @@ TEST_F(SplitterTest, IndicesBasedSplitterTest) { } TEST_F(SplitterTest, SizeBasedSplitterTest) { - SizeBasedSplitter splitter(64); + SizeBasedSplitter splitter(64, 0, 1); std::vector column_groups = splitter.Split(record_batch_); ASSERT_EQ(column_groups.size(), 2); - ASSERT_EQ(column_groups[0].GetRecordBatch(0)->num_columns(), 1); - ASSERT_EQ(column_groups[0].GetRecordBatch(0)->column(0)->type()->id(), arrow::Type::STRING); + ASSERT_EQ(column_groups[0].GetRecordBatch(0)->num_columns(), 2); + ASSERT_EQ(column_groups[0].GetRecordBatch(0)->column(0)->type()->id(), arrow::Type::INT32); + ASSERT_EQ(column_groups[0].GetRecordBatch(0)->column(1)->type()->id(), arrow::Type::INT64); - ASSERT_EQ(column_groups[1].GetRecordBatch(0)->num_columns(), 2); - ASSERT_EQ(column_groups[1].GetRecordBatch(0)->column(0)->type()->id(), arrow::Type::INT32); - ASSERT_EQ(column_groups[1].GetRecordBatch(0)->column(1)->type()->id(), arrow::Type::INT64); + ASSERT_EQ(column_groups[1].GetRecordBatch(0)->num_columns(), 1); + ASSERT_EQ(column_groups[1].GetRecordBatch(0)->column(0)->type()->id(), arrow::Type::STRING); } } // namespace milvus_storage