Commit: clean up

Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Oct 8, 2024
1 parent 3d22dd0 commit 1c69040
Showing 13 changed files with 63 additions and 21 deletions.
5 changes: 4 additions & 1 deletion cpp/benchmark/benchmark_packed.cpp
@@ -59,6 +59,7 @@ class S3Fixture : public benchmark::Fixture {
conf.access_key_value = std::string(secret_key);
conf.file_path = std::string(file_path);
}
storage_config_ = std::move(conf);

auto base = std::string();
auto factory = std::make_shared<FileSystemFactory>();
@@ -70,6 +71,7 @@
}

std::shared_ptr<arrow::fs::FileSystem> fs_;
StorageConfig storage_config_;
};

static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
@@ -129,7 +131,8 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
auto record_batch = arrow::RecordBatch::Make(schema, 3, arrays);

for (auto _ : st) {
PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, *parquet::default_writer_properties());
PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, storage_config_,
*parquet::default_writer_properties());
for (int i = 0; i < 8 * 1024; ++i) {
auto r = writer.Write(record_batch);
if (!r.ok()) {
3 changes: 2 additions & 1 deletion cpp/include/milvus-storage/common/config.h
@@ -1,4 +1,4 @@
// Copyright 2023 Zilliz
// Copyright 2024 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ struct StorageConfig {
std::string cloud_provider = "";
std::string region = "";
bool use_custom_part_upload_size = false;
int part_size = 0;

std::string ToString() const {
std::stringstream ss;
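The new part_size field works together with the existing use_custom_part_upload_size flag: the writer takes the multipart path only when both are set (see ParquetFileWriter::Init() below). A minimal sketch of a caller opting in; the field names come from this struct, but the 10 MiB value and the bytes unit are assumptions, not from this commit:

StorageConfig conf;
conf.use_custom_part_upload_size = true;  // opt in to a caller-chosen multipart part size
conf.part_size = 10 * 1024 * 1024;        // per-part upload size, assumed to be in bytes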
cpp/include/milvus-storage/filesystem/s3/multi_part_upload_s3_fs.h
@@ -31,6 +31,9 @@ namespace milvus_storage {

class MultiPartUploadS3FS : public arrow::fs::S3FileSystem {
public:
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStreamWithUploadSize(const std::string& s,
int64_t part_size);

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStreamWithUploadSize(
const std::string& s, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata, int64_t part_size);

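A usage sketch for the new overload; per the .cpp change further down, the metadata-less variant simply forwards a null KeyValueMetadata to the metadata-taking one. The filesystem instance, object path, and part size here are hypothetical:

// fs is assumed to be a MultiPartUploadS3FS; the macro is the repo's
// ASSIGN_OR_RETURN_ARROW_NOT_OK used elsewhere in this commit.
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto sink,
                              fs.OpenOutputStreamWithUploadSize("bucket/key.parquet", 10 * 1024 * 1024));
// sink is a std::shared_ptr<arrow::io::OutputStream>, usable by any Arrow or Parquet writer.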
8 changes: 7 additions & 1 deletion cpp/include/milvus-storage/format/parquet/file_writer.h
@@ -21,18 +21,23 @@
#include "parquet/arrow/writer.h"
#include "arrow/table.h"
#include <arrow/util/key_value_metadata.h>
#include "common/config.h"

namespace milvus_storage {

class ParquetFileWriter : public FileWriter {
public:
// with default WriterProperties
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema, arrow::fs::FileSystem& fs, const std::string& file_path);
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
const StorageConfig& storage_config);

// with custom WriterProperties
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
const StorageConfig& storage_config,
const parquet::WriterProperties& props);

Status Init() override;
@@ -52,6 +57,7 @@ class ParquetFileWriter : public FileWriter {
arrow::fs::FileSystem& fs_;
std::shared_ptr<arrow::Schema> schema_;
const std::string file_path_;
const StorageConfig& storage_config_;

std::unique_ptr<parquet::arrow::FileWriter> writer_;
std::shared_ptr<arrow::KeyValueMetadata> kv_metadata_;
3 changes: 3 additions & 0 deletions cpp/include/milvus-storage/packed/column_group_writer.h
@@ -21,6 +21,7 @@
#include "arrow/filesystem/filesystem.h"
#include "common/status.h"
#include "packed/column_group.h"
#include "common/config.h"

namespace milvus_storage {

@@ -30,12 +31,14 @@ class ColumnGroupWriter {
std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
const StorageConfig& storage_config,
const std::vector<int>& origin_column_indices);

ColumnGroupWriter(GroupId group_id,
std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
const StorageConfig& storage_config,
const parquet::WriterProperties& props,
const std::vector<int>& origin_column_indices);

2 changes: 2 additions & 0 deletions cpp/include/milvus-storage/packed/writer.h
@@ -36,6 +36,7 @@ class PackedRecordBatchWriter {
std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
StorageConfig& storage_config,
parquet::WriterProperties& props);

// Put the record batch into the corresponding column group,
@@ -57,6 +58,7 @@
std::shared_ptr<arrow::Schema> schema_;
arrow::fs::FileSystem& fs_;
const std::string& file_path_;
const StorageConfig& storage_config_;
parquet::WriterProperties& props_;
size_t current_memory_usage_;
std::vector<std::unique_ptr<ColumnGroupWriter>> group_writers_;
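A sketch of the updated call site, mirroring the benchmark change above; buffer_size, schema, fs, path, storage_config, and record_batch are assumed to be in scope with the types this header expects:

PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, storage_config,
                               *parquet::default_writer_properties());
auto status = writer.Write(record_batch);  // batches are buffered and split into column groups
// ... write more batches ...
auto closed = writer.Close();              // flushes buffered data, one file per column group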
5 changes: 5 additions & 0 deletions cpp/src/filesystem/s3/multi_part_upload_s3_fs.cpp
@@ -2253,6 +2253,11 @@ class MultiPartUploadS3FS::Impl : public std::enable_shared_from_this<MultiPartU
}
};

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> MultiPartUploadS3FS::OpenOutputStreamWithUploadSize(
const std::string& s, int64_t upload_size) {
return OpenOutputStreamWithUploadSize(s, std::shared_ptr<const arrow::KeyValueMetadata>{}, upload_size);
};

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> MultiPartUploadS3FS::OpenOutputStreamWithUploadSize(
const std::string& s, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata, int64_t upload_size) {
RETURN_NOT_OK(arrow::fs::internal::AssertNoTrailingSlash(s));
26 changes: 19 additions & 7 deletions cpp/src/format/parquet/file_writer.cpp
@@ -21,31 +21,43 @@
#include <boost/variant.hpp>
#include "common/config.h"
#include "packed/utils/serde.h"
#include "filesystem/s3/multi_part_upload_s3_fs.h"

namespace milvus_storage {

ParquetFileWriter::ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path)
const std::string& file_path,
const StorageConfig& storage_config)
: schema_(std::move(schema)),
fs_(fs),
file_path_(file_path),
storage_config_(storage_config),
props_(*parquet::default_writer_properties()),
count_(0) {}

ParquetFileWriter::ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
const StorageConfig& storage_config,
const parquet::WriterProperties& props)
: schema_(std::move(schema)), fs_(fs), file_path_(file_path), props_(props) {}
: schema_(std::move(schema)), fs_(fs), file_path_(file_path), storage_config_(storage_config), props_(props) {}

Status ParquetFileWriter::Init() {
auto coln = schema_->num_fields();
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto sink, fs_.OpenOutputStream(file_path_));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto writer,
parquet::arrow::FileWriter::Open(*schema_, arrow::default_memory_pool(), sink));

writer_ = std::move(writer);
if (storage_config_.use_custom_part_upload_size && storage_config_.part_size > 0) {
auto& s3fs = dynamic_cast<MultiPartUploadS3FS&>(fs_);
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto sink,
s3fs.OpenOutputStreamWithUploadSize(file_path_, storage_config_.part_size));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto writer,
parquet::arrow::FileWriter::Open(*schema_, arrow::default_memory_pool(), sink));
writer_ = std::move(writer);
} else {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto sink, fs_.OpenOutputStream(file_path_));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto writer,
parquet::arrow::FileWriter::Open(*schema_, arrow::default_memory_pool(), sink));
writer_ = std::move(writer);
}
kv_metadata_ = std::make_shared<arrow::KeyValueMetadata>();
return Status::OK();
}
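One caveat worth noting: the new branch in Init() obtains the S3 filesystem with dynamic_cast<MultiPartUploadS3FS&>(fs_), and a reference dynamic_cast throws std::bad_cast when fs_ is not that type, so use_custom_part_upload_size should only be enabled for S3-backed filesystems. A hypothetical pointer-based variant that falls back to the default stream instead of throwing (not part of this commit):

auto* s3fs = dynamic_cast<MultiPartUploadS3FS*>(&fs_);
if (s3fs != nullptr && storage_config_.use_custom_part_upload_size && storage_config_.part_size > 0) {
  ASSIGN_OR_RETURN_ARROW_NOT_OK(auto sink,
                                s3fs->OpenOutputStreamWithUploadSize(file_path_, storage_config_.part_size));
  // ... open the parquet::arrow::FileWriter on sink as above ...
} else {
  ASSIGN_OR_RETURN_ARROW_NOT_OK(auto sink, fs_.OpenOutputStream(file_path_));
  // ... default path unchanged ...
}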
6 changes: 4 additions & 2 deletions cpp/src/packed/column_group_writer.cpp
@@ -29,20 +29,22 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id,
std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
const StorageConfig& storage_config,
const std::vector<int>& origin_column_indices)
: group_id_(group_id),
writer_(std::move(schema), fs, file_path),
writer_(std::move(schema), fs, file_path, storage_config),
column_group_(group_id, origin_column_indices),
finished_(false) {}

ColumnGroupWriter::ColumnGroupWriter(GroupId group_id,
std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
const StorageConfig& storage_config,
const parquet::WriterProperties& props,
const std::vector<int>& origin_column_indices)
: group_id_(group_id),
writer_(std::move(schema), fs, file_path, props),
writer_(std::move(schema), fs, file_path, storage_config, props),
column_group_(group_id, origin_column_indices),
flushed_batches_(0),
flushed_rows_(0),
10 changes: 6 additions & 4 deletions cpp/src/packed/writer.cpp
@@ -32,11 +32,13 @@ PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit,
std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
const std::string& file_path,
StorageConfig& storage_config,
parquet::WriterProperties& props)
: memory_limit_(memory_limit),
schema_(std::move(schema)),
fs_(fs),
file_path_(file_path),
storage_config_(storage_config),
props_(props),
splitter_({}),
current_memory_usage_(0),
@@ -67,8 +69,8 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() {
for (GroupId i = 0; i < groups.size(); ++i) {
auto& group = groups[i];
std::string group_path = file_path_ + "/" + std::to_string(i);
auto writer =
std::make_unique<ColumnGroupWriter>(i, group.Schema(), fs_, group_path, props_, group.GetOriginColumnIndices());
auto writer = std::make_unique<ColumnGroupWriter>(i, group.Schema(), fs_, group_path, storage_config_, props_,
group.GetOriginColumnIndices());
RETURN_NOT_OK(writer->Init());
for (auto& batch : group.GetRecordBatches()) {
RETURN_NOT_OK(writer->Write(group.GetRecordBatch(0)));
@@ -122,8 +124,8 @@ Status PackedRecordBatchWriter::Close() {
std::string group_path = file_path_ + "/" + std::to_string(0);
std::vector<int> indices(buffered_batches_[0]->num_columns());
std::iota(std::begin(indices), std::end(indices), 0);
auto writer =
std::make_unique<ColumnGroupWriter>(0, buffered_batches_[0]->schema(), fs_, group_path, props_, indices);
auto writer = std::make_unique<ColumnGroupWriter>(0, buffered_batches_[0]->schema(), fs_, group_path,
storage_config_, props_, indices);
RETURN_NOT_OK(writer->Init());
for (auto& batch : buffered_batches_) {
RETURN_NOT_OK(writer->Write(batch));
6 changes: 3 additions & 3 deletions cpp/src/storage/space.cpp
@@ -96,14 +96,14 @@ Status Space::Write(arrow::RecordBatchReader& reader, const WriteOption& option)

if (scalar_writer == nullptr) {
auto scalar_file_path = GetNewParquetFilePath(GetScalarDataDir(path_));
scalar_writer.reset(new ParquetFileWriter(scalar_schema, *fs_, scalar_file_path));
scalar_writer.reset(new ParquetFileWriter(scalar_schema, *fs_, scalar_file_path, StorageConfig()));
RETURN_NOT_OK(scalar_writer->Init());
scalar_fragment.add_file(scalar_file_path);
}

if (vector_writer == nullptr) {
auto vector_file_path = GetNewParquetFilePath(GetVectorDataDir(path_));
vector_writer.reset(new ParquetFileWriter(vector_schema, *fs_, vector_file_path));
vector_writer.reset(new ParquetFileWriter(vector_schema, *fs_, vector_file_path, StorageConfig()));
RETURN_NOT_OK(vector_writer->Init());
vector_fragment.add_file(vector_file_path);
}
@@ -153,7 +153,7 @@ Status Space::Delete(arrow::RecordBatchReader& reader) {

if (!writer) {
delete_file = GetNewParquetFilePath(GetDeleteDataDir(path_));
writer = new ParquetFileWriter(manifest_->schema()->delete_schema(), *fs_, delete_file);
writer = new ParquetFileWriter(manifest_->schema()->delete_schema(), *fs_, delete_file, StorageConfig());
RETURN_NOT_OK(writer->Init());
}

4 changes: 3 additions & 1 deletion cpp/test/packed/packed_test_base.h
@@ -60,6 +60,7 @@ class PackedTestBase : public ::testing::Test {
conf.access_key_value = std::string(secret_key);
}
file_path_ = GenerateUniqueFilePath(env_file_path);
storage_config_ = std::move(conf);

auto factory = std::make_shared<FileSystemFactory>();
ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem(conf, &file_path_));
@@ -86,7 +87,7 @@
const std::vector<std::string>& paths,
const std::vector<std::shared_ptr<arrow::Field>>& fields,
const std::vector<ColumnOffset>& column_offsets) {
PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, props_);
PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, storage_config_, props_);
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
}
@@ -189,6 +190,7 @@ class PackedTestBase : public ::testing::Test {
std::shared_ptr<arrow::fs::FileSystem> fs_;
std::string file_path_;
parquet::WriterProperties props_;
StorageConfig storage_config_;

std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::RecordBatch> record_batch_;
3 changes: 2 additions & 1 deletion cpp/test/test_util.cpp
@@ -16,6 +16,7 @@
#include <arrow/type_fwd.h>
#include "format/parquet/file_writer.h"
#include "arrow/array/builder_primitive.h"
#include "common/config.h"
namespace milvus_storage {
std::shared_ptr<arrow::Schema> CreateArrowSchema(std::vector<std::string> field_names,
std::vector<std::shared_ptr<arrow::DataType>> field_types) {
@@ -31,7 +32,7 @@ Status PrepareSimpleParquetFile(std::shared_ptr<arrow::Schema> schema,
const std::string& file_path,
int num_rows) {
// TODO: parse schema and generate data
ParquetFileWriter w(schema, fs, file_path);
ParquetFileWriter w(schema, fs, file_path, StorageConfig());
w.Init();
arrow::Int64Builder builder;
for (int i = 0; i < num_rows; i++) {
