Skip to content

Commit

Permalink
init buffer for packed writer
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Sep 1, 2024
1 parent c330061 commit 40ed2d1
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 58 deletions.
2 changes: 2 additions & 0 deletions cpp/include/milvus-storage/common/fs_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB
// https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB

// Smallest memory budget for one output file: one maximum-size row group plus one
// Arrow part-upload buffer (DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE).
static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;

} // namespace milvus_storage
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#pragma once

#include "splitter_plugin.h"
#include "common/result.h"

namespace milvus_storage {

Expand All @@ -23,6 +24,8 @@ class SizeBasedSplitter : public SplitterPlugin {

void Init() override;

// Merges `batches` into a single record batch (one concatenated array per column)
// and splits that batch into column groups via Split().
Result<std::vector<ColumnGroup>> SplitRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);

std::vector<ColumnGroup> Split(const std::shared_ptr<arrow::RecordBatch>& record) override;

private:
Expand Down
8 changes: 5 additions & 3 deletions cpp/include/milvus-storage/packed/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,19 @@ class PackedRecordBatchWriter {
arrow::fs::FileSystem& fs,
std::string& file_path,
parquet::WriterProperties& props);
// Init with the first batch of record.
// Split the first batch into column groups and initialize ColumnGroupWriters.
Status Init(const std::shared_ptr<arrow::RecordBatch>& record);

// Put the record batch into the corresponding column group,
// and write the maximum buffer of column group to the file.
Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
Status Close();

private:
// Buffers `record`; once the buffered size reaches MIN_BUFFER_SIZE_PER_FILE, splits the
// buffer into column groups by size and creates one column group writer per group.
Status bufferUntilLimit(const std::shared_ptr<arrow::RecordBatch>& record);
// Write path taken by Write() once the size-based split is done: splits `record`
// with splitter_ into column groups.
Status writeWithSplitIndex(const std::shared_ptr<arrow::RecordBatch>& record);
Status balanceMaxHeap();

std::vector<std::shared_ptr<arrow::RecordBatch>> buffered_batches_;
bool size_split_done_;
size_t memory_limit_;
std::shared_ptr<arrow::Schema> schema_;
arrow::fs::FileSystem& fs_;
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/common/fs_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
uri_parser.password();
options.ConfigureAccessKey(std::getenv("ACCESS_KEY"), std::getenv("SECRET_KEY"));
*out_path = std::getenv("FILE_PATH");
if (std::getenv("REGION") != nullptr) {
options.region = std::getenv("REGION");
}
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(options));

return std::shared_ptr<arrow::fs::FileSystem>(fs);
Expand Down
32 changes: 18 additions & 14 deletions cpp/src/format/parquet/file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "common/log.h"
#include "common/macro.h"
#include "format/parquet/file_writer.h"
#include <parquet/properties.h>
Expand Down Expand Up @@ -61,23 +62,26 @@ Status ParquetFileWriter::WriteTable(const arrow::Table& table) {

Status ParquetFileWriter::WriteRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
const std::vector<size_t>& batch_memory_sizes) {
size_t current_group_size = 0;
std::vector<std::shared_ptr<arrow::RecordBatch>> current_group_batches;
auto WriteRowGroup = [&](const std::vector<std::shared_ptr<arrow::RecordBatch>>& batch, size_t group_size) -> Status {
kv_metadata_->Append(std::to_string(row_group_num_++), std::to_string(group_size));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto table, arrow::Table::FromRecordBatches(batch));
RETURN_ARROW_NOT_OK(writer_->WriteTable(*table));
return Status::OK();
};

size_t current_size = 0;
std::vector<std::shared_ptr<arrow::RecordBatch>> current_batches;
for (int i = 0; i < batches.size(); i++) {
if (current_group_size + batch_memory_sizes[i] >= DEFAULT_MAX_ROW_GROUP_SIZE) {
kv_metadata_->Append(std::to_string(row_group_num_++), std::to_string(current_group_size));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto table, arrow::Table::FromRecordBatches(current_group_batches));
RETURN_ARROW_NOT_OK(writer_->WriteTable(*table));
current_group_batches.clear();
current_group_size = 0;
if (current_size + batch_memory_sizes[i] >= DEFAULT_MAX_ROW_GROUP_SIZE && !current_batches.empty()) {
RETURN_ARROW_NOT_OK(WriteRowGroup(current_batches, current_size));
current_batches.clear();
current_size = 0;
}
current_group_batches.push_back(batches[i]);
current_group_size += batch_memory_sizes[i];
current_batches.push_back(batches[i]);
current_size += batch_memory_sizes[i];
}
if (!current_group_batches.empty()) {
kv_metadata_->Append(std::to_string(row_group_num_++), std::to_string(current_group_size));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto table, arrow::Table::FromRecordBatches(current_group_batches));
RETURN_ARROW_NOT_OK(writer_->WriteTable(*table));
if (!current_batches.empty()) {
RETURN_ARROW_NOT_OK(WriteRowGroup(current_batches, current_size));
}
RETURN_ARROW_NOT_OK(writer_->AddKeyValueMetadata(kv_metadata_));
return Status::OK();
Expand Down
23 changes: 23 additions & 0 deletions cpp/src/packed/splitter/size_based_splitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,38 @@

#include "packed/splitter/size_based_splitter.h"
#include "common/arrow_util.h"
#include "common/log.h"
#include "common/macro.h"
#include "packed/column_group.h"
#include <stdexcept>
#include <arrow/table.h>
#include <arrow/array/concatenate.h>

namespace milvus_storage {

SizeBasedSplitter::SizeBasedSplitter(size_t max_group_size) : max_group_size_(max_group_size) {}

void SizeBasedSplitter::Init() {}

// Merges `batches` into one contiguous record batch and splits it into column groups.
//
// The batches are combined into a table using the schema of the first batch, the
// chunks of every column are concatenated into a single array, and the resulting
// record batch is handed to Split().
//
// Returns an InvalidArgument status when `batches` is empty or its first entry is
// null. Failures while building the table or concatenating a column are returned
// through ASSIGN_OR_RETURN_ARROW_NOT_OK.
Result<std::vector<ColumnGroup>> SizeBasedSplitter::SplitRecordBatches(
    const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
  // The schema is read from the first batch, so there has to be one.
  if (batches.empty() || batches.front() == nullptr) {
    return Status::InvalidArgument("No record batches to split");
  }
  auto schema = batches.front()->schema();

  ASSIGN_OR_RETURN_ARROW_NOT_OK(auto merged_table, arrow::Table::FromRecordBatches(schema, batches));

  std::vector<std::shared_ptr<arrow::Array>> arrays;
  arrays.reserve(merged_table->num_columns());
  for (const auto& column : merged_table->columns()) {
    // Concatenate all chunks of the current column so the merged batch has
    // exactly one array per column.
    ASSIGN_OR_RETURN_ARROW_NOT_OK(auto concatenated_array,
                                  arrow::Concatenate(column->chunks(), arrow::default_memory_pool()));
    arrays.push_back(concatenated_array);
  }
  std::shared_ptr<arrow::RecordBatch> batch =
      arrow::RecordBatch::Make(merged_table->schema(), merged_table->num_rows(), std::move(arrays));
  LOG_STORAGE_INFO_ << "split record batch: " << merged_table->num_rows();
  return Split(batch);
}

std::vector<ColumnGroup> SizeBasedSplitter::Split(const std::shared_ptr<arrow::RecordBatch>& record) {
if (!record) {
throw std::invalid_argument("RecordBatch is null");
Expand Down
98 changes: 60 additions & 38 deletions cpp/src/packed/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "packed/splitter/indices_based_splitter.h"
#include "packed/splitter/size_based_splitter.h"
#include "common/fs_util.h"
#include "common/arrow_util.h"

namespace milvus_storage {

Expand All @@ -35,49 +36,70 @@ PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit,
file_path_(file_path),
props_(props),
splitter_({}),
current_memory_usage_(0) {}

Status PackedRecordBatchWriter::Init(const std::shared_ptr<arrow::RecordBatch>& record) {
// split first batch into column groups
std::vector<ColumnGroup> groups = SizeBasedSplitter(record->num_columns()).Split(record);

// init column group writer and
// put column groups into max heap
std::vector<std::vector<int>> group_indices;
GroupId group_id = 0;
for (const ColumnGroup& group : groups) {
std::string group_path = file_path_ + "/" + std::to_string(group_id);
auto writer = std::make_unique<ColumnGroupWriter>(group_id, group.Schema(), fs_, group_path, props_,
group.GetOriginColumnIndices());
auto status = writer->Init();
if (!status.ok()) {
LOG_STORAGE_ERROR_ << "Failed to init column group writer: " << status.ToString();
return status;
}
current_memory_usage_ += group.GetMemoryUsage();
max_heap_.emplace(group_id, group.GetMemoryUsage());
status = writer->Write(group.GetRecordBatch(0));
if (!status.ok()) {
LOG_STORAGE_ERROR_ << "Failed to write column group: " << group_id << ", " << status.ToString();
return status;
}
group_indices.emplace_back(group.GetOriginColumnIndices());
group_writers_.emplace_back(std::move(writer));
group_id++;
current_memory_usage_(0),
size_split_done_(false) {
if (memory_limit < MIN_BUFFER_SIZE_PER_FILE) {
LOG_STORAGE_ERROR_ << "Please provide at least " + std::to_string(MIN_BUFFER_SIZE_PER_FILE / 1024 / 1024) +
" MB of memory for packed writer.";
}
splitter_ = IndicesBasedSplitter(group_indices);
}

// check memory usage limit
size_t min_memory_limit = group_id * (DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE);
if (memory_limit_ < min_memory_limit) {
return Status::InvalidArgument("Please provide at least " + std::to_string(min_memory_limit / 1024 / 1024) +
" MB of memory for packed writer.");
Status PackedRecordBatchWriter::Write(const std::shared_ptr<arrow::RecordBatch>& record) {
if (!size_split_done_) {
return bufferUntilLimit(record);
} else {
return writeWithSplitIndex(record);
}
memory_limit_ -= min_memory_limit;
return balanceMaxHeap();
}

Status PackedRecordBatchWriter::Write(const std::shared_ptr<arrow::RecordBatch>& record) {
// Buffers incoming record batches until enough data has accumulated to pick a
// column-group layout, then performs the one-time size-based split.
//
// Every call appends `record` to buffered_batches_ and adds its size to
// current_memory_usage_. Once the total reaches MIN_BUFFER_SIZE_PER_FILE:
//   * the buffered batches are split into column groups by SizeBasedSplitter,
//   * one ColumnGroupWriter is created and initialized per group at
//     file_path_ + "/" + group id, and the group's first batch is written to it,
//   * splitter_ is replaced by an IndicesBasedSplitter over the chosen column
//     indices,
//   * memory_limit_ is reduced by the per-group minimum and balanceMaxHeap() runs.
//
// Returns OK while still buffering; otherwise the result of balanceMaxHeap() or the
// first error encountered (null record, failed split, writer init/write failure,
// or a memory_limit_ below the per-group minimum).
Status PackedRecordBatchWriter::bufferUntilLimit(const std::shared_ptr<arrow::RecordBatch>& record) {
  // `record` is dereferenced below; reject null up front instead of crashing.
  if (!record) {
    return Status::InvalidArgument("RecordBatch is null");
  }
  buffered_batches_.push_back(record);
  current_memory_usage_ += GetRecordBatchMemorySize(record);

  // Keep buffering until there is enough data to size the column groups.
  if (current_memory_usage_ < MIN_BUFFER_SIZE_PER_FILE) {
    return Status::OK();
  }

  // split first buffer into column groups based on column size
  auto split_result = SizeBasedSplitter(record->num_columns()).SplitRecordBatches(buffered_batches_);
  if (!split_result.ok()) {
    // Do not touch value() of a failed result; the buffer is kept so nothing is lost.
    LOG_STORAGE_ERROR_ << "Failed to split buffered record batches: " << split_result.status().ToString();
    return split_result.status();
  }
  size_split_done_ = true;
  std::vector<ColumnGroup> groups = std::move(split_result.value());
  // The split works on a concatenated copy, so the original batches are no longer
  // needed; drop the references instead of pinning them for the writer's lifetime.
  buffered_batches_.clear();

  // init column group writer and put column groups into max heap
  std::vector<std::vector<int>> group_indices;
  GroupId group_id = 0;
  for (const ColumnGroup& group : groups) {
    std::string group_path = file_path_ + "/" + std::to_string(group_id);
    auto writer = std::make_unique<ColumnGroupWriter>(group_id, group.Schema(), fs_, group_path, props_,
                                                      group.GetOriginColumnIndices());
    auto status = writer->Init();
    if (!status.ok()) {
      LOG_STORAGE_ERROR_ << "Failed to init column group writer: " << status.ToString();
      return status;
    }
    max_heap_.emplace(group_id, group.GetMemoryUsage());
    LOG_STORAGE_INFO_ << " group rows: " << group.GetRecordBatch(0)->num_rows();
    status = writer->Write(group.GetRecordBatch(0));
    if (!status.ok()) {
      LOG_STORAGE_ERROR_ << "Failed to write column group: " << group_id << ", " << status.ToString();
      return status;
    }
    group_indices.emplace_back(group.GetOriginColumnIndices());
    group_writers_.emplace_back(std::move(writer));
    group_id++;
  }
  // Later batches are split with the column layout chosen here.
  splitter_ = IndicesBasedSplitter(group_indices);

  // check memory usage limit: every group needs room for one row group plus one
  // part-upload buffer
  size_t min_memory_limit = group_id * (DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE);
  if (memory_limit_ < min_memory_limit) {
    return Status::InvalidArgument("Please provide at least " + std::to_string(min_memory_limit / 1024 / 1024) +
                                   " MB of memory for packed writer.");
  }
  LOG_STORAGE_INFO_ << "groups " << group_id;
  memory_limit_ -= min_memory_limit;
  return balanceMaxHeap();
}

Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr<arrow::RecordBatch>& record) {
std::vector<ColumnGroup> column_groups = splitter_.Split(record);

// Calculate the total memory usage of the new column groups
Expand Down
5 changes: 2 additions & 3 deletions cpp/test/packed/packed_integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class PackedIntegrationTest : public PackedTestBase {
SetUpCommonData();
props_ = *parquet::default_writer_properties();
writer_memory_ = (22 + 16) * 1024 * 1024; // 22 MB memory is for s3fs part upload
reader_memory_ = 16 * 1024 * 1024; // 16 MB memory for reading
reader_memory_ = 16 * 1024 * 1024; // 16 MB memory for reading
}

void TearDown() override { fs_->DeleteDir(file_path_); }
Expand All @@ -65,8 +65,7 @@ class PackedIntegrationTest : public PackedTestBase {

TEST_F(PackedIntegrationTest, WriteAndRead) {
PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, props_);
EXPECT_TRUE(writer.Init(record_batch_).ok());
for (int i = 1; i < bath_size; ++i) {
for (int i = 0; i < bath_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
}
EXPECT_TRUE(writer.Close().ok());
Expand Down

0 comments on commit 40ed2d1

Please sign in to comment.