diff --git a/cpp/benchmark/benchmark_packed.cpp b/cpp/benchmark/benchmark_packed.cpp index d2b7eee..d60b9bd 100644 --- a/cpp/benchmark/benchmark_packed.cpp +++ b/cpp/benchmark/benchmark_packed.cpp @@ -76,10 +76,10 @@ class S3Fixture : public benchmark::Fixture { static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) { std::set needed_columns = {0, 1, 2}; - std::vector column_offsets = { - ColumnOffset(0, 0), - ColumnOffset(1, 0), - ColumnOffset(1, 1), + std::vector column_offsets = { + std::make_shared(0, 0), + std::make_shared(1, 0), + std::make_shared(1, 1), }; auto paths = std::vector{path + "/0", path + "/1"}; diff --git a/cpp/include/milvus-storage/packed/chunk_manager.h b/cpp/include/milvus-storage/packed/chunk_manager.h index 2092839..e27c94d 100644 --- a/cpp/include/milvus-storage/packed/chunk_manager.h +++ b/cpp/include/milvus-storage/packed/chunk_manager.h @@ -50,7 +50,7 @@ struct ChunkState { class ChunkManager { public: - ChunkManager(const std::vector& column_offsets, int64_t chunksize); + ChunkManager(const std::vector& column_offsets, int64_t chunksize); std::vector> SliceChunksByMaxContiguousSlice( int64_t chunksize, std::vector>>& tables); @@ -62,7 +62,7 @@ class ChunkManager { void SetChunkSize(int64_t chunksize) { chunksize_ = chunksize; } private: - std::vector column_offsets_; + std::vector column_offsets_; std::vector chunk_states_; int64_t chunksize_; }; diff --git a/cpp/include/milvus-storage/packed/column_group.h b/cpp/include/milvus-storage/packed/column_group.h index d498812..1394db9 100644 --- a/cpp/include/milvus-storage/packed/column_group.h +++ b/cpp/include/milvus-storage/packed/column_group.h @@ -18,6 +18,8 @@ #include #include #include "common/status.h" +#include +#include namespace milvus_storage { @@ -97,41 +99,46 @@ struct ColumnOffset { ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {} }; +using ColumnOffsetPtr = std::shared_ptr; + // ColumnOffsetMapping is a map of original field name to ColumnOffset. // The column offset is a pair of path index and a column index in the path. -struct ColumnOffsetMapping { - ColumnOffsetMapping() = default; +class ColumnOffsetMapping { + public: + ColumnOffsetMapping() { mapping_ = std::map(); }; ColumnOffsetMapping(const std::vector>& group_indices, const std::shared_ptr schema) { for (int path_index = 0; path_index < group_indices.size(); path_index++) { for (int col_index = 0; col_index < group_indices[path_index].size(); col_index++) { int original_col_index = group_indices[path_index][col_index]; std::string field_name = schema->field(original_col_index)->name(); - mapping_[field_name] = ColumnOffset(path_index, col_index); + mapping_[field_name] = std::make_shared(path_index, col_index); } } } - ColumnOffset GetColumnOffset(std::string field_name) { - if (mapping_.find(field_name) == mapping_.end()) { - return ColumnOffset(-1, -1); - } - return mapping_[field_name]; + void AddColumnOffset(const std::string& field_name, int64_t path_index, int64_t col_index) { + auto column_offset = std::make_shared(path_index, col_index); + mapping_[field_name] = column_offset; } - std::string ToString() { - std::string str; - for (auto& pair : mapping_) { - str += pair.first + "->" + std::to_string(pair.second.path_index) + ":" + std::to_string(pair.second.col_index) + - ";"; + ColumnOffsetPtr GetByFieldName(const std::string& field_name) { + if (Contains(field_name)) { + return mapping_.at(field_name); } - return str; + return nullptr; } + std::map GetMapping() const { return mapping_; } + + bool Contains(const std::string& key) const { return mapping_.find(key) != mapping_.end(); } + size_t Size() { return mapping_.size(); } + void Clear() { mapping_.clear(); } + private: - std::unordered_map mapping_; + std::map mapping_; }; } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/include/milvus-storage/packed/column_offset_mapping_c.h b/cpp/include/milvus-storage/packed/column_offset_mapping_c.h new file mode 100644 index 0000000..9c9b97e --- /dev/null +++ b/cpp/include/milvus-storage/packed/column_offset_mapping_c.h @@ -0,0 +1,45 @@ +// Copyright 2024 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef void* CColumnOffsetMapping; + +int NewColumnOffsetMapping(CColumnOffsetMapping* c_column_offset_mapping); + +void DeleteColumnOffsetMapping(CColumnOffsetMapping c_column_offset_mapping); + +int AddColumnOffset(CColumnOffsetMapping c_column_offset_mapping, + const char* field_name, + int64_t path_index, + int64_t col_index); + +void GetColumnOffsetMappingKeys(CColumnOffsetMapping c_column_offset_mapping, void* keys); + +int GetColumnOffsetMappingSize(CColumnOffsetMapping c_column_offset_mapping); + +int GetColumnOffset(CColumnOffsetMapping c_column_offset_mapping, + const char* field_name, + int* path_index, + int* col_index); + +#ifdef __cplusplus +} +#endif diff --git a/cpp/include/milvus-storage/packed/reader.h b/cpp/include/milvus-storage/packed/reader.h index c808af0..a8e4e4b 100644 --- a/cpp/include/milvus-storage/packed/reader.h +++ b/cpp/include/milvus-storage/packed/reader.h @@ -49,7 +49,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader { PackedRecordBatchReader(arrow::fs::FileSystem& fs, const std::vector& paths, const std::shared_ptr schema, - const std::vector& column_offsets, + const std::vector& column_offsets, const std::set& needed_columns, const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE); @@ -76,7 +76,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader { int64_t row_limit_; std::unique_ptr chunk_manager_; int64_t absolute_row_position_; - std::vector needed_column_offsets_; + std::vector needed_column_offsets_; std::vector> row_group_sizes_; int read_count_; }; diff --git a/cpp/src/packed/chunk_manager.cpp b/cpp/src/packed/chunk_manager.cpp index bca5136..bf89cf1 100644 --- a/cpp/src/packed/chunk_manager.cpp +++ b/cpp/src/packed/chunk_manager.cpp @@ -24,7 +24,7 @@ namespace milvus_storage { -ChunkManager::ChunkManager(const std::vector& column_offsets, int64_t chunksize) +ChunkManager::ChunkManager(const std::vector& column_offsets, int64_t chunksize) : column_offsets_(column_offsets), chunksize_(chunksize) { chunk_states_ = std::vector(column_offsets_.size()); } @@ -40,8 +40,8 @@ std::vector> ChunkManager::SliceChunksByMaxCon // Identify the chunk for each column and adjust chunk size for (int i = 0; i < column_offsets_.size(); ++i) { auto offset = column_offsets_[i]; - auto table_queue = tables[offset.path_index]; - auto column = table_queue.front()->column(offset.col_index); + auto table_queue = tables[offset->path_index]; + auto column = table_queue.front()->column(offset->col_index); auto chunk = column->chunk(chunk_states_[i].chunk).get(); // Adjust chunksize if a smaller contiguous chunk is found @@ -67,7 +67,7 @@ std::vector> ChunkManager::SliceChunksByMaxCon // Mark the table to pop if all chunks are consumed if (chunk_state.chunk == chunk_sizes[i]) { - table_to_pop.insert(column_offsets_[i].path_index); + table_to_pop.insert(column_offsets_[i]->path_index); chunk_state.reset(); } } else { @@ -87,7 +87,7 @@ std::vector> ChunkManager::SliceChunksByMaxCon // resets the chunk states for columns in a specific file. void ChunkManager::ResetChunkState(int path_index) { for (int j = 0; j < column_offsets_.size(); ++j) { - if (column_offsets_[j].path_index == path_index) { + if (column_offsets_[j]->path_index == path_index) { chunk_states_[j].reset(); } } diff --git a/cpp/src/packed/column_offset_mapping_c.cpp b/cpp/src/packed/column_offset_mapping_c.cpp new file mode 100644 index 0000000..fab3a13 --- /dev/null +++ b/cpp/src/packed/column_offset_mapping_c.cpp @@ -0,0 +1,75 @@ +// Copyright 2024 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "packed/column_offset_mapping_c.h" +#include "packed/column_group.h" + +int NewColumnOffsetMapping(CColumnOffsetMapping* c_column_offset_mapping) { + try { + auto column_offset_mapping = std::make_unique(); + *c_column_offset_mapping = column_offset_mapping.release(); + return 0; + } catch (const std::exception& ex) { + return -1; + } +} + +void DeleteColumnOffsetMapping(CColumnOffsetMapping c_column_offset_mapping) { + auto column_offset_mapping = (milvus_storage::ColumnOffsetMapping*)c_column_offset_mapping; + delete column_offset_mapping; +} + +int AddColumnOffset(CColumnOffsetMapping c_column_offset_mapping, + const char* field_name, + int64_t path_index, + int64_t col_index) { + try { + auto column_offset_mapping = (milvus_storage::ColumnOffsetMapping*)c_column_offset_mapping; + std::string field_name_str(field_name); + column_offset_mapping->AddColumnOffset(field_name_str, path_index, col_index); + return 0; + } catch (const std::exception& ex) { + return -1; + } +} + +void GetColumnOffsetMappingKeys(CColumnOffsetMapping c_column_offset_mapping, void* keys) { + auto column_offset_mapping = (milvus_storage::ColumnOffsetMapping*)c_column_offset_mapping; + const char** keys_ = (const char**)keys; + auto map_ = column_offset_mapping->GetMapping(); + std::size_t i = 0; + for (auto it = map_.begin(); it != map_.end(); ++it, i++) { + keys_[i] = it->first.c_str(); + } +} + +int GetColumnOffsetMappingSize(CColumnOffsetMapping c_column_offset_mapping) { + auto column_offset_mapping = (milvus_storage::ColumnOffsetMapping*)c_column_offset_mapping; + return column_offset_mapping->Size(); +} + +int GetColumnOffset(CColumnOffsetMapping c_column_offset_mapping, + const char* field_name, + int* path_index, + int* col_index) { + auto column_offset_mapping = (milvus_storage::ColumnOffsetMapping*)c_column_offset_mapping; + std::string field_name_str(field_name); + auto column_offset = column_offset_mapping->GetByFieldName(field_name_str); + if (column_offset == nullptr) { + return 0; + } + *path_index = column_offset->path_index; + *col_index = column_offset->col_index; + return 0; +} diff --git a/cpp/src/packed/reader.cpp b/cpp/src/packed/reader.cpp index fd2226a..8c653e1 100644 --- a/cpp/src/packed/reader.cpp +++ b/cpp/src/packed/reader.cpp @@ -32,12 +32,12 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, const std::shared_ptr schema, const int64_t buffer_size) : PackedRecordBatchReader( - fs, std::vector{path}, schema, std::vector(), std::set(), buffer_size) {} + fs, std::vector{path}, schema, std::vector(), std::set(), buffer_size) {} PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, const std::vector& paths, const std::shared_ptr schema, - const std::vector& column_offsets, + const std::vector& column_offsets, const std::set& needed_columns, const int64_t buffer_size) : schema_(schema), @@ -52,16 +52,16 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, cols.emplace(i); } } - auto offsets = std::vector(column_offsets); + auto offsets = std::vector(column_offsets); if (column_offsets.empty()) { for (int i = 0; i < schema->num_fields(); i++) { - offsets.emplace_back(0, i); + offsets.emplace_back(std::make_shared(0, i)); } } std::set needed_paths; for (int i : cols) { needed_column_offsets_.push_back(offsets[i]); - needed_paths.emplace(offsets[i].path_index); + needed_paths.emplace(offsets[i]->path_index); } for (auto i : needed_paths) { auto result = MakeArrowFileReader(fs, paths[i]); diff --git a/cpp/test/format/parquet/file_reader_test.cpp b/cpp/test/format/parquet/file_reader_test.cpp index 0fb9173..b64809f 100644 --- a/cpp/test/format/parquet/file_reader_test.cpp +++ b/cpp/test/format/parquet/file_reader_test.cpp @@ -49,10 +49,10 @@ TEST_F(FileReaderTest, FileRecordBatchReader) { ASSERT_STATUS_OK(fr.Close()); std::set needed_columns = {0, 1, 2}; - std::vector column_offsets = { - ColumnOffset(0, 0), - ColumnOffset(0, 1), - ColumnOffset(0, 2), + std::vector column_offsets = { + std::make_shared(0, 0), + std::make_shared(0, 1), + std::make_shared(0, 2), }; PackedRecordBatchReader pr(*fs_, {path}, schema, column_offsets, needed_columns, reader_memory_); ASSERT_AND_ARROW_ASSIGN(auto pr_table, pr.ToTable()); diff --git a/cpp/test/packed/chunk_manager_test.cpp b/cpp/test/packed/chunk_manager_test.cpp index c4513c7..31c6b9b 100644 --- a/cpp/test/packed/chunk_manager_test.cpp +++ b/cpp/test/packed/chunk_manager_test.cpp @@ -36,12 +36,13 @@ class ChunkManagerTest : public PackedTestBase { // large string column tables_[1].push(table_->SelectColumns({2}).ValueOrDie()); - column_offsets_ = {ColumnOffset(0, 0), ColumnOffset(0, 1), ColumnOffset(1, 0)}; + column_offsets_ = {std::make_shared(0, 0), std::make_shared(0, 1), + std::make_shared(1, 0)}; chunk_manager_ = std::make_unique(column_offsets_, chunksize_); } - std::vector column_offsets_; + std::vector column_offsets_; std::unique_ptr chunk_manager_; std::vector>> tables_; int chunksize_ = 2; diff --git a/cpp/test/packed/packed_integration_test.cpp b/cpp/test/packed/packed_integration_test.cpp index b9ee213..602eac0 100644 --- a/cpp/test/packed/packed_integration_test.cpp +++ b/cpp/test/packed/packed_integration_test.cpp @@ -28,9 +28,9 @@ TEST_F(PackedIntegrationTest, TestOneFile) { auto column_offset_mapping = writer.Close(); EXPECT_EQ(column_offset_mapping->Size(), schema_->num_fields()); - std::vector column_offsets; + std::vector column_offsets; for (auto& field : schema_->fields()) { - column_offsets.push_back(column_offset_mapping->GetColumnOffset(field->name())); + column_offsets.push_back(column_offset_mapping->GetByFieldName(field->name())); } std::vector paths = {file_path_ + "/0"}; @@ -54,9 +54,9 @@ TEST_F(PackedIntegrationTest, TestSplitColumnGroup) { auto column_offset_mapping = writer.Close(); EXPECT_EQ(column_offset_mapping->Size(), schema_->num_fields()); - std::vector column_offsets; + std::vector column_offsets; for (auto& field : schema_->fields()) { - column_offsets.push_back(column_offset_mapping->GetColumnOffset(field->name())); + column_offsets.push_back(column_offset_mapping->GetByFieldName(field->name())); } std::vector paths = {file_path_ + "/0", file_path_ + "/1"}; diff --git a/go/packed/column_offset_mapping.go b/go/packed/column_offset_mapping.go new file mode 100644 index 0000000..e4f032e --- /dev/null +++ b/go/packed/column_offset_mapping.go @@ -0,0 +1,58 @@ +// Copyright 2023 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package packed + +/* +#include +#include "milvus-storage/packed/column_offset_mapping_c.h" +*/ +import "C" + +import ( + "fmt" + "unsafe" +) + +type ColumnOffset struct { + PathIndex int + ColIndex int +} + +func GetColumnOffsetMappingKeys(cColumnOffsetMapping C.CColumnOffsetMapping) ([]string, error) { + size := int(C.GetColumnOffsetMappingSize(cColumnOffsetMapping)) + if size == 0 { + return nil, fmt.Errorf("ColumnOffsetMapping is empty") + } + keys := make([]unsafe.Pointer, size) + + C.GetColumnOffsetMappingKeys(cColumnOffsetMapping, unsafe.Pointer(&keys[0])) + ret := make([]string, size) + for i := 0; i < size; i++ { + ret[i] = C.GoString((*C.char)(keys[i])) + } + return ret, nil +} + +func GetColumnOffset(cColumnOffsetMapping C.CColumnOffsetMapping, key string) (ColumnOffset, error) { + cKey := C.CString(key) + defer C.free(unsafe.Pointer(cKey)) + var cPathIndex, cColIndex C.int + + status := C.GetColumnOffset(cColumnOffsetMapping, cKey, &cPathIndex, &cColIndex) + if status != 0 { + return ColumnOffset{-1, -1}, fmt.Errorf("GetColumnOffset from key %s failed", key) + } + return ColumnOffset{int(cPathIndex), int(cColIndex)}, nil +} diff --git a/go/packed/packed.go b/go/packed/packed.go index ef4b2dc..32e7c6f 100644 --- a/go/packed/packed.go +++ b/go/packed/packed.go @@ -18,6 +18,7 @@ package packed #include #include "milvus-storage/packed/reader_c.h" #include "milvus-storage/packed/writer_c.h" +#include "milvus-storage/packed/column_offset_mapping_c.h" #include "arrow/c/abi.h" #include "arrow/c/helpers.h" */ @@ -38,9 +39,8 @@ type PackedWriter struct { } type ( - ColumnOffsetMapping = C.CColumnOffsetMapping - CArrowSchema = C.struct_ArrowSchema - CArrowArray = C.struct_ArrowArray + CArrowSchema = C.struct_ArrowSchema + CArrowArray = C.struct_ArrowArray ) func newPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) { @@ -78,13 +78,31 @@ func (pw *PackedWriter) writeRecordBatch(recordBatch arrow.Record) error { return nil } -func (pw *PackedWriter) close() (ColumnOffsetMapping, error) { - var columnOffsetMapping ColumnOffsetMapping - status := C.Close(pw.cPackedWriter, &columnOffsetMapping) +func (pw *PackedWriter) close() (map[string]ColumnOffset, error) { + var cColumnOffsetMapping C.CColumnOffsetMapping + status := C.Close(pw.cPackedWriter, &cColumnOffsetMapping) if status != 0 { - return columnOffsetMapping, errors.New("PackedWriter: failed to close file") + return map[string]ColumnOffset{}, errors.New("PackedWriter: failed to close file") } - return columnOffsetMapping, nil + defer func() { + if cColumnOffsetMapping != nil { + C.DeleteColumnOffsetMapping(cColumnOffsetMapping) + } + }() + + res := make(map[string]ColumnOffset) + fileNames, err := GetColumnOffsetMappingKeys(cColumnOffsetMapping) + if err != nil { + return nil, err + } + for _, fileName := range fileNames { + colOffset, err := GetColumnOffset(cColumnOffsetMapping, fileName) + if err != nil { + return nil, err + } + res[fileName] = colOffset + } + return res, nil } func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) {