Skip to content

Commit

Permalink
writer: return column offsets from Close()
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Jan 4, 2025
1 parent b86d764 commit 388d539
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 54 deletions.
7 changes: 4 additions & 3 deletions cpp/benchmark/benchmark_packed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,10 @@ static void PackedWrite(benchmark::State& st,
break;
}
}
auto r = writer.Close();
if (!r.ok()) {
st.SkipWithError(r.ToString());
auto column_index_groups = writer.Close();
if (column_index_groups->Size() == 0) {
st.SkipWithError("Failed to close writer");
break;
}
}
}
Expand Down
8 changes: 1 addition & 7 deletions cpp/include/milvus-storage/packed/chunk_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include "packed/column_group.h"
#include <parquet/arrow/reader.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/record_batch.h>
Expand All @@ -24,13 +25,6 @@

namespace milvus_storage {

// Locates a column within the packed layout: path_index selects the file,
// col_index is the column's position in the original schema.
struct ColumnOffset {
  int path_index;
  int col_index;

  ColumnOffset(int path, int col) : path_index(path), col_index(col) {}
};

// record which chunk is in use and its offset in the file
struct ChunkState {
int chunk;
Expand Down
30 changes: 24 additions & 6 deletions cpp/include/milvus-storage/packed/column_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,34 @@ struct ColumnGroupState {
void resetMemorySize() { this->memory_size = 0; }
};

// ColumnOffset is a pair of path index and original column index.
struct ColumnOffset {
  int path_index;
  int col_index;

  ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}
};

// ColumnOffsets is a sequence of ColumnOffset, mapping each file path to the
// original column indices written into it.
struct ColumnOffsets {
  ColumnOffsets() = default;

  // Takes ownership of the given offsets. Pass-by-value lets callers copy or
  // move explicitly; the previous lvalue-reference-plus-move signature
  // silently emptied the caller's vector.
  ColumnOffsets(std::vector<ColumnOffset> pairs) : pairs_(std::move(pairs)) {}

  // Flattens per-group column index lists: groups[i] contributes one
  // (path_index = i, col_index) pair per column in group i.
  ColumnOffsets(const std::vector<std::vector<int>>& groups) {
    for (size_t i = 0; i < groups.size(); i++) {
      for (auto col : groups[i]) {
        // explicit cast: group count fits in int (one group per file)
        pairs_.emplace_back(static_cast<int>(i), col);
      }
    }
  }

  // Returned by const reference to avoid copying the whole vector per call.
  const std::vector<ColumnOffset>& GetColumnOffsets() const { return pairs_; }

  // Number of flattened (path, column) pairs.
  size_t Size() const { return pairs_.size(); }

  private:
  std::vector<ColumnOffset> pairs_;
};

} // namespace milvus_storage
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ namespace milvus_storage {

// Splits record batches into column groups according to a precomputed
// per-group list of original column indices.
class IndicesBasedSplitter : public SplitterPlugin {
  public:
  // column_indices[i] holds the original column indices belonging to group i.
  explicit IndicesBasedSplitter(const std::vector<std::vector<int>>& column_indices);

  void Init() override;

  // Selects each group's columns out of `record`, producing one ColumnGroup
  // per entry of column_indices_.
  std::vector<ColumnGroup> Split(const std::shared_ptr<arrow::RecordBatch>& record) override;

  private:
  std::vector<std::vector<int>> column_indices_;
};

} // namespace milvus_storage
7 changes: 5 additions & 2 deletions cpp/include/milvus-storage/packed/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class PackedRecordBatchWriter {
// Put the record batch into the corresponding column group,
// , and write the maximum buffer of column group to the file.
Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
Status Close(ColumnIndexGroups& column_index_groups_);
// Close the writer and return the mapping of each file path to its original column indices.
std::unique_ptr<ColumnOffsets> Close();

private:
// split first buffer into column groups based on column size
Expand All @@ -51,6 +52,8 @@ class PackedRecordBatchWriter {

Status writeWithSplitIndex(const std::shared_ptr<arrow::RecordBatch>& record, size_t batch_size);
Status balanceMaxHeap();
Status flushRemainingBuffer();
Status flushUnsplittedBuffer();

std::vector<std::shared_ptr<arrow::RecordBatch>> buffered_batches_;
bool size_split_done_;
Expand All @@ -62,7 +65,7 @@ class PackedRecordBatchWriter {
parquet::WriterProperties& props_;
size_t current_memory_usage_;
std::vector<std::unique_ptr<ColumnGroupWriter>> group_writers_;
ColumnIndexGroups column_index_groups_;
std::vector<std::vector<int>> group_indices_;
IndicesBasedSplitter splitter_;
MemoryMaxHeap max_heap_;
};
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/packed/splitter/indices_based_splitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

namespace milvus_storage {

// Copies the per-group column index lists; the splitter is immutable after
// construction.
IndicesBasedSplitter::IndicesBasedSplitter(const std::vector<std::vector<int>>& column_indices)
    : column_indices_(column_indices) {}

// No-op: this splitter is fully configured by its constructor.
void IndicesBasedSplitter::Init() {}

std::vector<ColumnGroup> IndicesBasedSplitter::Split(const std::shared_ptr<arrow::RecordBatch>& record) {
std::vector<ColumnGroup> column_groups;

for (GroupId group_id = 0; group_id < column_indices_.Size(); group_id++) {
auto batch = record->SelectColumns(column_indices_.GetColumnIndexGroup(group_id)).ValueOrDie();
column_groups.push_back(ColumnGroup(group_id, column_indices_.GetColumnIndexGroup(group_id), batch));
for (GroupId group_id = 0; group_id < column_indices_.size(); group_id++) {
auto batch = record->SelectColumns(column_indices_[group_id]).ValueOrDie();
column_groups.push_back(ColumnGroup(group_id, column_indices_[group_id], batch));
}

return column_groups;
Expand Down
52 changes: 35 additions & 17 deletions cpp/src/packed/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() {
}

max_heap_.emplace(i, group.GetMemoryUsage());
column_index_groups_.AddColumnIndexGroup(group.GetOriginColumnIndices());
group_indices_.emplace_back(group.GetOriginColumnIndices());
group_writers_.emplace_back(std::move(writer));
}
splitter_ = IndicesBasedSplitter(column_index_groups_);
splitter_ = IndicesBasedSplitter(group_indices_);

// check memory usage limit
size_t min_memory_limit = groups.size() * ARROW_PART_UPLOAD_SIZE;
Expand Down Expand Up @@ -118,23 +118,26 @@ Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr<arrow:
return balanceMaxHeap();
}

Status PackedRecordBatchWriter::Close(ColumnIndexGroups& column_index_groups) {
std::unique_ptr<ColumnOffsets> PackedRecordBatchWriter::Close() {
// write unsplitted record batch to one file if the buffer record batches are not splitted
if (!size_split_done_ && !buffered_batches_.empty()) {
std::string group_path = file_path_ + "/" + std::to_string(0);
std::vector<int> indices(buffered_batches_[0]->num_columns());
std::iota(std::begin(indices), std::end(indices), 0);
auto writer = std::make_unique<ColumnGroupWriter>(0, buffered_batches_[0]->schema(), *fs_, group_path,
storage_config_, props_, indices);
RETURN_NOT_OK(writer->Init());
for (auto& batch : buffered_batches_) {
RETURN_NOT_OK(writer->Write(batch));
auto status = flushUnsplittedBuffer();
if (!status.ok()) {
LOG_STORAGE_ERROR_ << "Failed to write unsplitted record batch while closing writer. Error message: ";
return std::make_unique<ColumnOffsets>();
}
RETURN_NOT_OK(writer->Flush());
RETURN_NOT_OK(writer->Close());
column_index_groups_.AddColumnIndexGroup(indices);
return Status::OK();
return std::make_unique<ColumnOffsets>(std::move(group_indices_));
}
// flush all remaining column groups before closing'
// flush all remaining column groups before closing
auto status = flushRemainingBuffer();
if (!status.ok()) {
LOG_STORAGE_ERROR_ << "Failed to flush remaining column groups while closing writer.";
return std::make_unique<ColumnOffsets>();
}
return std::make_unique<ColumnOffsets>(std::move(group_indices_));
}

Status PackedRecordBatchWriter::flushRemainingBuffer() {
while (!max_heap_.empty()) {
auto max_group = max_heap_.top();
max_heap_.pop();
Expand All @@ -147,7 +150,22 @@ Status PackedRecordBatchWriter::Close(ColumnIndexGroups& column_index_groups) {
for (auto& writer : group_writers_) {
RETURN_NOT_OK(writer->Close());
}
column_index_groups = std::move(column_index_groups_);
return Status::OK();
}

// Writes all buffered (never size-split) record batches into a single column
// group file at "<file_path_>/0", covering every column of the first batch's
// schema, and records the identity column mapping in group_indices_.
// Precondition: buffered_batches_ is non-empty — Close() checks this before
// calling; indexing buffered_batches_[0] is unguarded here.
Status PackedRecordBatchWriter::flushUnsplittedBuffer() {
Status PackedRecordBatchWriter::flushUnsplittedBuffer() and the path below use group id 0 since there is exactly one group. // NOTE(review): delete this stray line if present — see body below.

Expand Down
7 changes: 2 additions & 5 deletions cpp/src/packed/writer_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,9 @@ int Close(CPackedWriter c_packed_writer, CColumnIndexGroups c_column_index_group
return 0;
}
auto packed_writer = static_cast<milvus_storage::PackedRecordBatchWriter*>(c_packed_writer);
auto column_index_groups = static_cast<milvus_storage::ColumnIndexGroups*>(c_column_index_groups);
auto status = packed_writer->Close(c_column_index_groups);
auto column_index_groups = packed_writer->Close();
c_column_index_groups = column_index_groups.release();
delete packed_writer;
if (!status.ok()) {
return -1;
}
return 0;
} catch (std::exception& e) {
return -1;
Expand Down
2 changes: 1 addition & 1 deletion cpp/test/format/parquet/file_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ TEST_F(FileReaderTest, FileRecordBatchReader) {
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
}
EXPECT_TRUE(writer.Close().ok());
auto column_index_groups = writer.Close();

std::vector<std::shared_ptr<arrow::Field>> fields = {
arrow::field("int32", arrow::int32()),
Expand Down
2 changes: 1 addition & 1 deletion cpp/test/packed/packed_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class PackedTestBase : public ::testing::Test {
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
}
EXPECT_TRUE(writer.Close().ok());
auto path_column_pairs = writer.Close();

std::set<int> needed_columns = {0, 1, 2};
auto new_schema = arrow::schema(fields);
Expand Down
4 changes: 0 additions & 4 deletions go/packed/packed.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ func (pw *PackedWriter) close() error {
return nil
}

func deletePackedWriter(packedWriter *PackedWriter) {
C.DeletePackedWriter(packedWriter.cPackedWriter)
}

func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) {
// var cSchemaPtr uintptr
// cSchema := cdata.SchemaFromPtr(cSchemaPtr)
Expand Down
2 changes: 0 additions & 2 deletions go/packed/packed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ func TestRead(t *testing.T) {
bufferSize := 10 * 1024 * 1024 // 10MB
pw, err := newPackedWriter(path, schema, bufferSize)
assert.NoError(t, err)
defer deletePackedWriter(pw)

err = pw.writeRecordBatch(rec)
assert.NoError(t, err)
err = pw.close()
Expand Down

0 comments on commit 388d539

Please sign in to comment.