Skip to content

Commit

Permalink
cgo column offset mapping
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Jan 6, 2025
1 parent e7ab8b4 commit b06e1c5
Show file tree
Hide file tree
Showing 13 changed files with 255 additions and 51 deletions.
8 changes: 4 additions & 4 deletions cpp/benchmark/benchmark_packed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ class S3Fixture : public benchmark::Fixture {

static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
std::set<int> needed_columns = {0, 1, 2};
std::vector<ColumnOffset> column_offsets = {
ColumnOffset(0, 0),
ColumnOffset(1, 0),
ColumnOffset(1, 1),
std::vector<ColumnOffsetPtr> column_offsets = {
std::make_shared<ColumnOffset>(0, 0),
std::make_shared<ColumnOffset>(1, 0),
std::make_shared<ColumnOffset>(1, 1),
};

auto paths = std::vector<std::string>{path + "/0", path + "/1"};
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/packed/chunk_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct ChunkState {

class ChunkManager {
public:
ChunkManager(const std::vector<ColumnOffset>& column_offsets, int64_t chunksize);
ChunkManager(const std::vector<ColumnOffsetPtr>& column_offsets, int64_t chunksize);

std::vector<std::shared_ptr<arrow::ArrayData>> SliceChunksByMaxContiguousSlice(
int64_t chunksize, std::vector<std::queue<std::shared_ptr<arrow::Table>>>& tables);
Expand All @@ -62,7 +62,7 @@ class ChunkManager {
void SetChunkSize(int64_t chunksize) { chunksize_ = chunksize; }

private:
std::vector<ColumnOffset> column_offsets_;
std::vector<ColumnOffsetPtr> column_offsets_;
std::vector<ChunkState> chunk_states_;
int64_t chunksize_;
};
Expand Down
37 changes: 22 additions & 15 deletions cpp/include/milvus-storage/packed/column_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <arrow/record_batch.h>
#include <queue>
#include "common/status.h"
#include <map>
#include <string>

namespace milvus_storage {

Expand Down Expand Up @@ -97,41 +99,46 @@ struct ColumnOffset {
ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}
};

using ColumnOffsetPtr = std::shared_ptr<ColumnOffset>;

// ColumnOffsetMapping maps each original field name to its ColumnOffset.
// A ColumnOffset is a pair of a path index and a column index within that path.
struct ColumnOffsetMapping {
ColumnOffsetMapping() = default;
class ColumnOffsetMapping {
public:
ColumnOffsetMapping() { mapping_ = std::map<std::string, ColumnOffsetPtr>(); };

ColumnOffsetMapping(const std::vector<std::vector<int>>& group_indices, const std::shared_ptr<arrow::Schema> schema) {
for (int path_index = 0; path_index < group_indices.size(); path_index++) {
for (int col_index = 0; col_index < group_indices[path_index].size(); col_index++) {
int original_col_index = group_indices[path_index][col_index];
std::string field_name = schema->field(original_col_index)->name();
mapping_[field_name] = ColumnOffset(path_index, col_index);
mapping_[field_name] = std::make_shared<ColumnOffset>(path_index, col_index);
}
}
}

ColumnOffset GetColumnOffset(std::string field_name) {
if (mapping_.find(field_name) == mapping_.end()) {
return ColumnOffset(-1, -1);
}
return mapping_[field_name];
void AddColumnOffset(const std::string& field_name, int64_t path_index, int64_t col_index) {
auto column_offset = std::make_shared<ColumnOffset>(path_index, col_index);
mapping_[field_name] = column_offset;
}

std::string ToString() {
std::string str;
for (auto& pair : mapping_) {
str += pair.first + "->" + std::to_string(pair.second.path_index) + ":" + std::to_string(pair.second.col_index) +
";";
ColumnOffsetPtr GetByFieldName(const std::string& field_name) {
if (Contains(field_name)) {
return mapping_.at(field_name);
}
return str;
return nullptr;
}

std::map<std::string, ColumnOffsetPtr> GetMapping() const { return mapping_; }

bool Contains(const std::string& key) const { return mapping_.find(key) != mapping_.end(); }

size_t Size() { return mapping_.size(); }

void Clear() { mapping_.clear(); }

private:
std::unordered_map<std::string, ColumnOffset> mapping_;
std::map<std::string, ColumnOffsetPtr> mapping_;
};

} // namespace milvus_storage
45 changes: 45 additions & 0 deletions cpp/include/milvus-storage/packed/column_offset_mapping_c.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2024 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#ifdef __cplusplus
extern "C" {
#endif

#include <stdint.h>

// Opaque handle handed across the C boundary; the implementation casts it to
// milvus_storage::ColumnOffsetMapping*.
typedef void* CColumnOffsetMapping;

// Allocates an empty mapping and stores its handle in *c_column_offset_mapping.
// Returns 0 on success, -1 if a std::exception was raised while creating it.
int NewColumnOffsetMapping(CColumnOffsetMapping* c_column_offset_mapping);

// Destroys a mapping previously obtained from NewColumnOffsetMapping.
void DeleteColumnOffsetMapping(CColumnOffsetMapping c_column_offset_mapping);

// Records (path_index, col_index) as the offset of field_name, replacing any
// entry already stored under that name.
// Returns 0 on success, -1 if a std::exception was raised.
int AddColumnOffset(CColumnOffsetMapping c_column_offset_mapping,
const char* field_name,
int64_t path_index,
int64_t col_index);

// Writes one field-name C string per mapping entry into keys, which is
// interpreted as an array of const char*. The array needs one slot per entry
// (see GetColumnOffsetMappingSize).
// NOTE(review): confirm the lifetime/ownership of the returned strings against
// the implementation before holding on to them.
void GetColumnOffsetMappingKeys(CColumnOffsetMapping c_column_offset_mapping, void* keys);

// Returns the number of entries currently stored in the mapping.
int GetColumnOffsetMappingSize(CColumnOffsetMapping c_column_offset_mapping);

// Looks up field_name. When it is present, its path and column indices are
// written through path_index and col_index; when it is absent the outputs are
// left untouched. Returns 0 on success.
// NOTE(review): confirm how a missing field is reported through the return value.
int GetColumnOffset(CColumnOffsetMapping c_column_offset_mapping,
const char* field_name,
int* path_index,
int* col_index);

#ifdef __cplusplus
}
#endif
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/packed/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::vector<ColumnOffsetPtr>& column_offsets,
const std::set<int>& needed_columns,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

Expand All @@ -76,7 +76,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
int64_t row_limit_;
std::unique_ptr<ChunkManager> chunk_manager_;
int64_t absolute_row_position_;
std::vector<ColumnOffset> needed_column_offsets_;
std::vector<ColumnOffsetPtr> needed_column_offsets_;
std::vector<std::vector<size_t>> row_group_sizes_;
int read_count_;
};
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/packed/chunk_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

namespace milvus_storage {

ChunkManager::ChunkManager(const std::vector<ColumnOffset>& column_offsets, int64_t chunksize)
ChunkManager::ChunkManager(const std::vector<ColumnOffsetPtr>& column_offsets, int64_t chunksize)
: column_offsets_(column_offsets), chunksize_(chunksize) {
chunk_states_ = std::vector<ChunkState>(column_offsets_.size());
}
Expand All @@ -40,8 +40,8 @@ std::vector<std::shared_ptr<arrow::ArrayData>> ChunkManager::SliceChunksByMaxCon
// Identify the chunk for each column and adjust chunk size
for (int i = 0; i < column_offsets_.size(); ++i) {
auto offset = column_offsets_[i];
auto table_queue = tables[offset.path_index];
auto column = table_queue.front()->column(offset.col_index);
auto table_queue = tables[offset->path_index];
auto column = table_queue.front()->column(offset->col_index);
auto chunk = column->chunk(chunk_states_[i].chunk).get();

// Adjust chunksize if a smaller contiguous chunk is found
Expand All @@ -67,7 +67,7 @@ std::vector<std::shared_ptr<arrow::ArrayData>> ChunkManager::SliceChunksByMaxCon

// Mark the table to pop if all chunks are consumed
if (chunk_state.chunk == chunk_sizes[i]) {
table_to_pop.insert(column_offsets_[i].path_index);
table_to_pop.insert(column_offsets_[i]->path_index);
chunk_state.reset();
}
} else {
Expand All @@ -87,7 +87,7 @@ std::vector<std::shared_ptr<arrow::ArrayData>> ChunkManager::SliceChunksByMaxCon
// Resets the chunk states of every column whose offset points at the given path index.
void ChunkManager::ResetChunkState(int path_index) {
for (int j = 0; j < column_offsets_.size(); ++j) {
if (column_offsets_[j].path_index == path_index) {
if (column_offsets_[j]->path_index == path_index) {
chunk_states_[j].reset();
}
}
Expand Down
75 changes: 75 additions & 0 deletions cpp/src/packed/column_offset_mapping_c.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2024 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "packed/column_offset_mapping_c.h"

#include <cstdlib>
#include <cstring>
#include <exception>
#include <string>

#include "packed/column_group.h"

// Creates an empty ColumnOffsetMapping and publishes it through the out-parameter
// as an opaque handle. The handle must later be released with
// DeleteColumnOffsetMapping.
//
// Returns 0 on success, or -1 if construction raised a std::exception (in which
// case the out-parameter is not written).
int NewColumnOffsetMapping(CColumnOffsetMapping* c_column_offset_mapping) {
  int status = 0;
  try {
    auto owned = std::make_unique<milvus_storage::ColumnOffsetMapping>();
    // Ownership leaves the smart pointer here and travels with the raw handle.
    *c_column_offset_mapping = static_cast<void*>(owned.release());
  } catch (const std::exception&) {
    status = -1;
  }
  return status;
}

// Releases a mapping created by NewColumnOffsetMapping. The handle is
// reinterpreted as the underlying ColumnOffsetMapping and deleted.
void DeleteColumnOffsetMapping(CColumnOffsetMapping c_column_offset_mapping) {
  delete static_cast<milvus_storage::ColumnOffsetMapping*>(c_column_offset_mapping);
}

// Stores the (path_index, col_index) offset for field_name in the mapping behind
// the handle.
//
// Returns 0 on success, or -1 if a std::exception was raised while building the
// key string or inserting the entry.
int AddColumnOffset(CColumnOffsetMapping c_column_offset_mapping,
                    const char* field_name,
                    int64_t path_index,
                    int64_t col_index) {
  auto* mapping = static_cast<milvus_storage::ColumnOffsetMapping*>(c_column_offset_mapping);
  try {
    mapping->AddColumnOffset(std::string(field_name), path_index, col_index);
  } catch (const std::exception&) {
    return -1;
  }
  return 0;
}

// Writes one NUL-terminated field name per mapping entry into the caller-provided
// array.
//
// `keys` is interpreted as `const char**` and must have room for at least
// GetColumnOffsetMappingSize() pointers. Names are emitted in the iteration order
// of ColumnOffsetMapping::GetMapping().
//
// Ownership: GetMapping() returns the map by value, so pointers taken from it via
// c_str() become dangling as soon as this function returns. Each slot therefore
// receives its own malloc()-allocated copy, which the caller must release with
// free(). A slot is set to NULL if its copy could not be allocated.
//
// The call is a no-op when either argument is null.
void GetColumnOffsetMappingKeys(CColumnOffsetMapping c_column_offset_mapping, void* keys) {
  if (c_column_offset_mapping == nullptr || keys == nullptr) {
    return;
  }
  auto* column_offset_mapping = static_cast<milvus_storage::ColumnOffsetMapping*>(c_column_offset_mapping);
  const char** out_keys = static_cast<const char**>(keys);

  // Local snapshot of the mapping; nothing handed to the caller may point into it.
  const auto snapshot = column_offset_mapping->GetMapping();
  std::size_t i = 0;
  for (const auto& entry : snapshot) {
    const std::string& name = entry.first;
    const std::size_t bytes = name.size() + 1;  // include the terminating NUL
    char* copy = static_cast<char*>(std::malloc(bytes));
    if (copy != nullptr) {
      std::memcpy(copy, name.c_str(), bytes);
    }
    out_keys[i++] = copy;
  }
}

// Reports how many field-name entries the mapping behind the handle holds.
// The size_t count is converted to int for the C interface.
int GetColumnOffsetMappingSize(CColumnOffsetMapping c_column_offset_mapping) {
  const auto entry_count = static_cast<milvus_storage::ColumnOffsetMapping*>(c_column_offset_mapping)->Size();
  return static_cast<int>(entry_count);
}

// Looks up the offset recorded for field_name.
//
// On a hit, writes the entry's path index and column index through path_index
// and col_index and returns 0.
//
// Returns -1, leaving both outputs untouched, when:
//   - any argument is null,
//   - field_name has no entry in the mapping (previously this case returned 0,
//     which was indistinguishable from success while the outputs stayed unset),
//   - a std::exception is raised during the lookup; like the other entry points
//     in this file, exceptions are not allowed to cross the C boundary.
int GetColumnOffset(CColumnOffsetMapping c_column_offset_mapping,
                    const char* field_name,
                    int* path_index,
                    int* col_index) {
  if (c_column_offset_mapping == nullptr || field_name == nullptr || path_index == nullptr ||
      col_index == nullptr) {
    return -1;
  }
  try {
    auto* column_offset_mapping = static_cast<milvus_storage::ColumnOffsetMapping*>(c_column_offset_mapping);
    const std::string field_name_str(field_name);
    const auto column_offset = column_offset_mapping->GetByFieldName(field_name_str);
    if (column_offset == nullptr) {
      return -1;  // unknown field: report failure instead of a silent "success"
    }
    *path_index = column_offset->path_index;
    *col_index = column_offset->col_index;
    return 0;
  } catch (const std::exception&) {
    return -1;
  }
}
10 changes: 5 additions & 5 deletions cpp/src/packed/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::shared_ptr<arrow::Schema> schema,
const int64_t buffer_size)
: PackedRecordBatchReader(
fs, std::vector<std::string>{path}, schema, std::vector<ColumnOffset>(), std::set<int>(), buffer_size) {}
fs, std::vector<std::string>{path}, schema, std::vector<ColumnOffsetPtr>(), std::set<int>(), buffer_size) {}

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::vector<ColumnOffsetPtr>& column_offsets,
const std::set<int>& needed_columns,
const int64_t buffer_size)
: schema_(schema),
Expand All @@ -52,16 +52,16 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
cols.emplace(i);
}
}
auto offsets = std::vector<ColumnOffset>(column_offsets);
auto offsets = std::vector<ColumnOffsetPtr>(column_offsets);
if (column_offsets.empty()) {
for (int i = 0; i < schema->num_fields(); i++) {
offsets.emplace_back(0, i);
offsets.emplace_back(std::make_shared<ColumnOffset>(0, i));
}
}
std::set<int> needed_paths;
for (int i : cols) {
needed_column_offsets_.push_back(offsets[i]);
needed_paths.emplace(offsets[i].path_index);
needed_paths.emplace(offsets[i]->path_index);
}
for (auto i : needed_paths) {
auto result = MakeArrowFileReader(fs, paths[i]);
Expand Down
8 changes: 4 additions & 4 deletions cpp/test/format/parquet/file_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ TEST_F(FileReaderTest, FileRecordBatchReader) {
ASSERT_STATUS_OK(fr.Close());

std::set<int> needed_columns = {0, 1, 2};
std::vector<ColumnOffset> column_offsets = {
ColumnOffset(0, 0),
ColumnOffset(0, 1),
ColumnOffset(0, 2),
std::vector<ColumnOffsetPtr> column_offsets = {
std::make_shared<ColumnOffset>(0, 0),
std::make_shared<ColumnOffset>(0, 1),
std::make_shared<ColumnOffset>(0, 2),
};
PackedRecordBatchReader pr(*fs_, {path}, schema, column_offsets, needed_columns, reader_memory_);
ASSERT_AND_ARROW_ASSIGN(auto pr_table, pr.ToTable());
Expand Down
5 changes: 3 additions & 2 deletions cpp/test/packed/chunk_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ class ChunkManagerTest : public PackedTestBase {
// large string column
tables_[1].push(table_->SelectColumns({2}).ValueOrDie());

column_offsets_ = {ColumnOffset(0, 0), ColumnOffset(0, 1), ColumnOffset(1, 0)};
column_offsets_ = {std::make_shared<ColumnOffset>(0, 0), std::make_shared<ColumnOffset>(0, 1),
std::make_shared<ColumnOffset>(1, 0)};

chunk_manager_ = std::make_unique<ChunkManager>(column_offsets_, chunksize_);
}

std::vector<ColumnOffset> column_offsets_;
std::vector<ColumnOffsetPtr> column_offsets_;
std::unique_ptr<ChunkManager> chunk_manager_;
std::vector<std::queue<std::shared_ptr<arrow::Table>>> tables_;
int chunksize_ = 2;
Expand Down
8 changes: 4 additions & 4 deletions cpp/test/packed/packed_integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ TEST_F(PackedIntegrationTest, TestOneFile) {
auto column_offset_mapping = writer.Close();
EXPECT_EQ(column_offset_mapping->Size(), schema_->num_fields());

std::vector<ColumnOffset> column_offsets;
std::vector<ColumnOffsetPtr> column_offsets;
for (auto& field : schema_->fields()) {
column_offsets.push_back(column_offset_mapping->GetColumnOffset(field->name()));
column_offsets.push_back(column_offset_mapping->GetByFieldName(field->name()));
}

std::vector<std::string> paths = {file_path_ + "/0"};
Expand All @@ -54,9 +54,9 @@ TEST_F(PackedIntegrationTest, TestSplitColumnGroup) {
auto column_offset_mapping = writer.Close();
EXPECT_EQ(column_offset_mapping->Size(), schema_->num_fields());

std::vector<ColumnOffset> column_offsets;
std::vector<ColumnOffsetPtr> column_offsets;
for (auto& field : schema_->fields()) {
column_offsets.push_back(column_offset_mapping->GetColumnOffset(field->name()));
column_offsets.push_back(column_offset_mapping->GetByFieldName(field->name()));
}

std::vector<std::string> paths = {file_path_ + "/0", file_path_ + "/1"};
Expand Down
Loading

0 comments on commit b06e1c5

Please sign in to comment.