Skip to content

Commit

Permalink
writer Close() returns the column offset mapping
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Jan 4, 2025
1 parent 388d539 commit 3bf1728
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 95 deletions.
45 changes: 31 additions & 14 deletions cpp/include/milvus-storage/packed/column_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <arrow/type.h>
#include <arrow/record_batch.h>
#include <queue>
#include "common/status.h"
Expand Down Expand Up @@ -87,34 +88,50 @@ struct ColumnGroupState {
void resetMemorySize() { this->memory_size = 0; }
};

// ColumnOffset locates a column in the packed layout: the index of the file
// path it was written to, and its column position within that file.
struct ColumnOffset {
  // -1 means "unset"; matches the sentinel returned by
  // ColumnOffsetMapping::GetColumnOffset for unknown field names.
  // (Member initializers fix the previously uninitialized default state.)
  int path_index = -1;
  int col_index = -1;

  ColumnOffset() = default;

  ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}
};

// ColumnOffsets is a sequece of ColumnOffset.
struct ColumnOffsets {
ColumnOffsets() = default;

ColumnOffsets(std::vector<ColumnOffset>& pairs) : pairs_(std::move(pairs)) {}

ColumnOffsets(const std::vector<std::vector<int>>& groups) {
for (size_t i = 0; i < groups.size(); i++) {
for (auto col : groups[i]) {
pairs_.emplace_back(ColumnOffset(i, col));
// ColumnOffsetMapping is a map of original field name to ColumnOffset.
// The column offset is a pair of path index and a column index in the path.
struct ColumnOffsetMapping {
ColumnOffsetMapping() = default;

ColumnOffsetMapping(const std::vector<std::vector<int>>& group_indices, const std::shared_ptr<arrow::Schema> schema) {
for (int path_index = 0; path_index < group_indices.size(); path_index++) {
for (int col_index = 0; col_index < group_indices[path_index].size(); col_index++) {
int original_col_index = group_indices[path_index][col_index];
std::string field_name = schema->field(original_col_index)->name();
mapping_[field_name] = ColumnOffset(path_index, col_index);
}
}
}

std::vector<ColumnOffset> GetColumnOffsets() const { return pairs_; }
ColumnOffset GetColumnOffset(std::string field_name) {
if (mapping_.find(field_name) == mapping_.end()) {
return ColumnOffset(-1, -1);
}
return mapping_[field_name];
}

std::string ToString() {
std::string str;
for (auto& pair : mapping_) {
str += pair.first + "->" + std::to_string(pair.second.path_index) + ":" + std::to_string(pair.second.col_index) +
";";
}
return str;
}

size_t Size() const { return pairs_.size(); }
size_t Size() { return mapping_.size(); }

private:
std::vector<ColumnOffset> pairs_;
std::unordered_map<std::string, ColumnOffset> mapping_;
};

} // namespace milvus_storage
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/packed/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class PackedRecordBatchWriter {
// Put the record batch into the corresponding column group,
// and write the column group with the maximum buffer to the file.
Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
// Close the writer and return the mapping of each file path to its original column indices.
std::unique_ptr<ColumnOffsets> Close();
// Close the writer and return the mapping of field name written to column offset.
std::unique_ptr<ColumnOffsetMapping> Close();

private:
// split first buffer into column groups based on column size
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/packed/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,23 @@ Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr<arrow:
return balanceMaxHeap();
}

std::unique_ptr<ColumnOffsets> PackedRecordBatchWriter::Close() {
std::unique_ptr<ColumnOffsetMapping> PackedRecordBatchWriter::Close() {
// write unsplitted record batch to one file if the buffer record batches are not splitted
if (!size_split_done_ && !buffered_batches_.empty()) {
auto status = flushUnsplittedBuffer();
if (!status.ok()) {
LOG_STORAGE_ERROR_ << "Failed to write unsplitted record batch while closing writer. Error message: ";
return std::make_unique<ColumnOffsets>();
return std::make_unique<ColumnOffsetMapping>();
}
return std::make_unique<ColumnOffsets>(std::move(group_indices_));
return std::make_unique<ColumnOffsetMapping>(std::move(group_indices_), schema_);
}
// flush all remaining column groups before closing
auto status = flushRemainingBuffer();
if (!status.ok()) {
LOG_STORAGE_ERROR_ << "Failed to flush remaining column groups while closing writer.";
return std::make_unique<ColumnOffsets>();
return std::make_unique<ColumnOffsetMapping>();
}
return std::make_unique<ColumnOffsets>(std::move(group_indices_));
return std::make_unique<ColumnOffsetMapping>(std::move(group_indices_), schema_);
}

Status PackedRecordBatchWriter::flushRemainingBuffer() {
Expand Down
41 changes: 0 additions & 41 deletions cpp/test/packed/one_file_test.cpp

This file was deleted.

58 changes: 45 additions & 13 deletions cpp/test/packed/packed_integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,56 @@ namespace milvus_storage {

class PackedIntegrationTest : public PackedTestBase {};

// Writes enough small batches that all columns stay in a single column
// group file, then reads them back using the offsets reported by Close().
TEST_F(PackedIntegrationTest, TestOneFile) {
  int batch_size = 100;

  PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_, props_);
  for (int i = 0; i < batch_size; ++i) {
    EXPECT_TRUE(writer.Write(record_batch_).ok());
  }
  auto column_offset_mapping = writer.Close();
  // Every schema field must have an offset in the returned mapping.
  EXPECT_EQ(column_offset_mapping->Size(), schema_->num_fields());

  // Resolve each field's offset from the mapping instead of hard-coding it.
  std::vector<ColumnOffset> column_offsets;
  for (auto& field : schema_->fields()) {
    column_offsets.push_back(column_offset_mapping->GetColumnOffset(field->name()));
  }

  // One column group => a single file "/0".
  std::vector<std::string> paths = {file_path_ + "/0"};

  std::set<int> needed_columns = {0, 1, 2};

  PackedRecordBatchReader pr(*fs_, paths, schema_, column_offsets, needed_columns, reader_memory_);
  ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable());
  ASSERT_STATUS_OK(pr.Close());

  ValidateTableData(table);
}

// Writes batches large enough to split into two column group files, then
// reads them back using the offsets reported by Close().
TEST_F(PackedIntegrationTest, TestSplitColumnGroup) {
  int batch_size = 100;

  PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_, props_);
  for (int i = 0; i < batch_size; ++i) {
    EXPECT_TRUE(writer.Write(record_batch_).ok());
  }
  auto column_offset_mapping = writer.Close();
  // Every schema field must have an offset in the returned mapping.
  EXPECT_EQ(column_offset_mapping->Size(), schema_->num_fields());

  // Resolve each field's offset from the mapping instead of hard-coding it.
  std::vector<ColumnOffset> column_offsets;
  for (auto& field : schema_->fields()) {
    column_offsets.push_back(column_offset_mapping->GetColumnOffset(field->name()));
  }

  // Two column groups => files "/0" and "/1".
  // (Removed the leftover `fields` vector: the reader uses schema_ directly.)
  std::vector<std::string> paths = {file_path_ + "/0", file_path_ + "/1"};

  std::set<int> needed_columns = {0, 1, 2};

  PackedRecordBatchReader pr(*fs_, paths, schema_, column_offsets, needed_columns, reader_memory_);
  ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable());
  ASSERT_STATUS_OK(pr.Close());

  ValidateTableData(table);
}

} // namespace milvus_storage
21 changes: 1 addition & 20 deletions cpp/test/packed/packed_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include <packed/writer.h>
#include <packed/reader.h>
#include <packed/column_group.h>
#include <memory>
#include <gtest/gtest.h>
#include <parquet/properties.h>
Expand Down Expand Up @@ -85,26 +86,6 @@ class PackedTestBase : public ::testing::Test {
return path;
}

// Round-trip helper: writes `batch_size` copies of record_batch_ with
// PackedRecordBatchWriter, reads them back with PackedRecordBatchReader
// from the caller-supplied paths/fields/column_offsets, and validates
// the resulting table.
void TestWriteAndRead(int batch_size,
const std::vector<std::string>& paths,
const std::vector<std::shared_ptr<arrow::Field>>& fields,
const std::vector<ColumnOffset>& column_offsets) {
PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_, props_);
// Write the same batch repeatedly to accumulate batch_size batches.
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
}
// NOTE(review): the mapping returned by Close() is ignored; reading relies
// on the caller-provided column_offsets instead — confirm they agree.
auto path_column_pairs = writer.Close();

// Read back columns 0..2 using a schema rebuilt from the caller's fields.
std::set<int> needed_columns = {0, 1, 2};
auto new_schema = arrow::schema(fields);

PackedRecordBatchReader pr(*fs_, paths, new_schema, column_offsets, needed_columns, reader_memory_);
ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable());
ASSERT_STATUS_OK(pr.Close());

ValidateTableData(table);
}

void ValidateTableData(const std::shared_ptr<arrow::Table>& table) {
int64_t total_rows = table->num_rows();

Expand Down

0 comments on commit 3bf1728

Please sign in to comment.