Commit

cgo packed writer
Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Jan 6, 2025
1 parent 51b7244 commit e7ab8b4
Showing 21 changed files with 336 additions and 182 deletions.
16 changes: 10 additions & 6 deletions cpp/benchmark/benchmark_packed.cpp
@@ -107,7 +107,10 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st
   }
 }
 
-static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
+static void PackedWrite(benchmark::State& st,
+                        std::shared_ptr<arrow::fs::FileSystem> fs,
+                        std::string& path,
+                        size_t buffer_size) {
   auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()),
                                arrow::field("str", arrow::utf8())});
   arrow::Int32Builder int_builder;
@@ -134,17 +137,18 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
     auto conf = StorageConfig();
     conf.use_custom_part_upload_size = true;
     conf.part_size = 30 * 1024 * 1024;
-    PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, conf, *parquet::default_writer_properties());
+    PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf);
     for (int i = 0; i < 8 * 1024; ++i) {
       auto r = writer.Write(record_batch);
       if (!r.ok()) {
         st.SkipWithError(r.ToString());
         break;
       }
     }
-    auto r = writer.Close();
-    if (!r.ok()) {
-      st.SkipWithError(r.ToString());
+    auto column_index_groups = writer.Close();
+    if (column_index_groups->Size() == 0) {
+      st.SkipWithError("Failed to close writer");
+      break;
     }
   }
 }
@@ -153,7 +157,7 @@ std::string PATH = "/tmp/bench/foo";
 
 BENCHMARK_DEFINE_F(S3Fixture, Write32MB)(benchmark::State& st) {
   SKIP_IF_NOT_OK(fs_->CreateDir(PATH), st);
-  PackedWrite(st, fs_.get(), PATH, 22 * 1024 * 1024);
+  PackedWrite(st, fs_, PATH, 22 * 1024 * 1024);
 }
 BENCHMARK_REGISTER_F(S3Fixture, Write32MB)->UseRealTime();

2 changes: 2 additions & 0 deletions cpp/include/milvus-storage/common/config.h
@@ -15,6 +15,7 @@
 #pragma once
 
 #include <sstream>
+#include <parquet/properties.h>
 
 using namespace std;
 
@@ -31,6 +32,7 @@ struct StorageConfig {
   std::string region = "";
   bool use_custom_part_upload_size = false;
   int64_t part_size = 0;
+  parquet::WriterProperties writer_props = *parquet::default_writer_properties();
 
   std::string ToString() const {
     std::stringstream ss;

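With writer properties folded into StorageConfig, callers tune Parquet encoding through the config rather than a separate constructor argument. A minimal sketch of the assumed usage (the SNAPPY choice is illustrative, not part of this commit):

    StorageConfig conf;
    conf.use_custom_part_upload_size = true;
    conf.part_size = 30 * 1024 * 1024;
    parquet::WriterProperties::Builder builder;
    builder.compression(parquet::Compression::SNAPPY);  // any builder setting works here
    conf.writer_props = *builder.build();               // replaces the defaults above
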
10 changes: 0 additions & 10 deletions cpp/include/milvus-storage/format/parquet/file_writer.h
@@ -14,7 +14,6 @@
 
 #pragma once
 
-#include <parquet/properties.h>
 #include <memory>
 #include "arrow/filesystem/filesystem.h"
 #include "format/writer.h"
@@ -27,19 +26,11 @@ namespace milvus_storage {
 
 class ParquetFileWriter : public FileWriter {
  public:
-  // with default WriterProperties
   ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
                     arrow::fs::FileSystem& fs,
                     const std::string& file_path,
                     const StorageConfig& storage_config);
 
-  // with custom WriterProperties
-  ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
-                    arrow::fs::FileSystem& fs,
-                    const std::string& file_path,
-                    const StorageConfig& storage_config,
-                    const parquet::WriterProperties& props);
-
   Status Init() override;
 
   Status Write(const arrow::RecordBatch& record) override;
@@ -61,7 +52,6 @@ class ParquetFileWriter : public FileWriter {
 
   std::unique_ptr<parquet::arrow::FileWriter> writer_;
   std::shared_ptr<arrow::KeyValueMetadata> kv_metadata_;
-  parquet::WriterProperties props_;
   int64_t count_ = 0;
   int row_group_num_ = 0;
   std::vector<size_t> row_group_sizes_;

8 changes: 1 addition & 7 deletions cpp/include/milvus-storage/packed/chunk_manager.h
@@ -14,6 +14,7 @@
 
 #pragma once
 
+#include "packed/column_group.h"
 #include <parquet/arrow/reader.h>
 #include <arrow/filesystem/filesystem.h>
 #include <arrow/record_batch.h>
@@ -24,13 +25,6 @@
 
 namespace milvus_storage {
 
-struct ColumnOffset {
-  int path_index;
-  int col_index;
-
-  ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}
-};
-
 // record which chunk is in use and its offset in the file
 struct ChunkState {
   int chunk;

47 changes: 47 additions & 0 deletions cpp/include/milvus-storage/packed/column_group.h
@@ -14,6 +14,7 @@
 
 #pragma once
 
+#include <arrow/type.h>
 #include <arrow/record_batch.h>
 #include <queue>
 #include "common/status.h"
@@ -87,4 +88,50 @@ struct ColumnGroupState {
   void resetMemorySize() { this->memory_size = 0; }
 };
 
+struct ColumnOffset {
+  int path_index;
+  int col_index;
+
+  ColumnOffset() = default;
+
+  ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}
+};
+
+// ColumnOffsetMapping maps an original field name to its ColumnOffset:
+// a path (column-group file) index and a column index within that path.
+struct ColumnOffsetMapping {
+  ColumnOffsetMapping() = default;
+
+  ColumnOffsetMapping(const std::vector<std::vector<int>>& group_indices, const std::shared_ptr<arrow::Schema> schema) {
+    for (int path_index = 0; path_index < group_indices.size(); path_index++) {
+      for (int col_index = 0; col_index < group_indices[path_index].size(); col_index++) {
+        int original_col_index = group_indices[path_index][col_index];
+        std::string field_name = schema->field(original_col_index)->name();
+        mapping_[field_name] = ColumnOffset(path_index, col_index);
+      }
+    }
+  }
+
+  ColumnOffset GetColumnOffset(std::string field_name) {
+    if (mapping_.find(field_name) == mapping_.end()) {
+      return ColumnOffset(-1, -1);
+    }
+    return mapping_[field_name];
+  }
+
+  std::string ToString() {
+    std::string str;
+    for (auto& pair : mapping_) {
+      str += pair.first + "->" + std::to_string(pair.second.path_index) + ":" +
+             std::to_string(pair.second.col_index) + ";";
+    }
+    return str;
+  }
+
+  size_t Size() { return mapping_.size(); }
+
+ private:
+  std::unordered_map<std::string, ColumnOffset> mapping_;
+};
+
 } // namespace milvus_storage

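To make the new mapping concrete, here is a sketch of how a caller might consume it; the schema and grouping below are invented for illustration:

    // Three fields split into two column-group files.
    // group_indices[path][col] holds the original column index in the schema.
    auto schema = arrow::schema({arrow::field("int32", arrow::int32()),
                                 arrow::field("int64", arrow::int64()),
                                 arrow::field("str", arrow::utf8())});
    std::vector<std::vector<int>> group_indices = {{2}, {0, 1}};  // "str" alone, ints together
    ColumnOffsetMapping mapping(group_indices, schema);
    ColumnOffset off = mapping.GetColumnOffset("int64");     // path_index == 1, col_index == 1
    ColumnOffset missing = mapping.GetColumnOffset("bool");  // unknown fields yield (-1, -1)
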
8 changes: 0 additions & 8 deletions cpp/include/milvus-storage/packed/column_group_writer.h
@@ -34,14 +34,6 @@ class ColumnGroupWriter {
                     const StorageConfig& storage_config,
                     const std::vector<int>& origin_column_indices);
 
-  ColumnGroupWriter(GroupId group_id,
-                    std::shared_ptr<arrow::Schema> schema,
-                    arrow::fs::FileSystem& fs,
-                    const std::string& file_path,
-                    const StorageConfig& storage_config,
-                    const parquet::WriterProperties& props,
-                    const std::vector<int>& origin_column_indices);
-
   Status Init();
   Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
   Status Flush();

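With the custom-properties overload gone, the remaining constructor is the only way to build a ColumnGroupWriter, and properties travel inside StorageConfig. A sketch with invented group id, path, and indices:

    StorageConfig conf;                     // carries writer_props as of this commit
    std::vector<int> origin_cols = {0, 1};  // schema columns owned by this group
    ColumnGroupWriter group_writer(0, schema, *fs, "/tmp/packed/0", conf, origin_cols);
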
1 change: 0 additions & 1 deletion cpp/include/milvus-storage/packed/reader_c.h
@@ -21,7 +21,6 @@ extern "C" {
 #include <arrow/c/abi.h>
 
 typedef void* CReader;
-typedef void* CStatus;
 typedef void* CRecordBatch;
 typedef void* CFileSystem;
 
18 changes: 10 additions & 8 deletions cpp/include/milvus-storage/packed/writer.h
@@ -34,15 +34,15 @@ class PackedRecordBatchWriter {
  public:
   PackedRecordBatchWriter(size_t memory_limit,
                           std::shared_ptr<arrow::Schema> schema,
-                          arrow::fs::FileSystem& fs,
+                          std::shared_ptr<arrow::fs::FileSystem> fs,
                           const std::string& file_path,
-                          StorageConfig& storage_config,
-                          parquet::WriterProperties& props);
+                          const StorageConfig& storage_config);
 
   // Put the record batch into the corresponding column group,
   // and write the maximum buffer of column group to the file.
   Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
-  Status Close();
+  // Close the writer and return the mapping of field name written to column offset.
+  std::unique_ptr<ColumnOffsetMapping> Close();
 
  private:
   // split first buffer into column groups based on column size
@@ -51,17 +51,19 @@
 
   Status writeWithSplitIndex(const std::shared_ptr<arrow::RecordBatch>& record, size_t batch_size);
   Status balanceMaxHeap();
+  Status flushRemainingBuffer();
+  Status flushUnsplittedBuffer();
 
   std::vector<std::shared_ptr<arrow::RecordBatch>> buffered_batches_;
   bool size_split_done_;
   size_t memory_limit_;
   std::shared_ptr<arrow::Schema> schema_;
-  arrow::fs::FileSystem& fs_;
-  const std::string& file_path_;
-  const StorageConfig& storage_config_;
-  parquet::WriterProperties& props_;
+  std::shared_ptr<arrow::fs::FileSystem> fs_;
+  const std::string file_path_;
+  const StorageConfig storage_config_;
   size_t current_memory_usage_;
   std::vector<std::unique_ptr<ColumnGroupWriter>> group_writers_;
+  std::vector<std::vector<int>> group_indices_;
   IndicesBasedSplitter splitter_;
   MemoryMaxHeap max_heap_;
 };

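End to end, the new write path looks roughly like this sketch (the local filesystem and batch source are stand-ins; the constructor and Close() contract follow the header above):

    auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
    StorageConfig conf;
    PackedRecordBatchWriter writer(22 * 1024 * 1024, schema, fs, "/tmp/packed", conf);
    for (const auto& batch : batches) {  // batches: any range of arrow::RecordBatch ptrs
      if (!writer.Write(batch).ok()) break;
    }
    // Close() now reports where every field landed instead of a bare Status;
    // an empty mapping signals failure, which is what the benchmark above checks.
    std::unique_ptr<ColumnOffsetMapping> mapping = writer.Close();
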
39 changes: 39 additions & 0 deletions cpp/include/milvus-storage/packed/writer_c.h
@@ -0,0 +1,39 @@
+// Copyright 2023 Zilliz
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <arrow/c/abi.h>
+
+typedef void* CPackedWriter;
+typedef void* CColumnOffsetMapping;
+
+int NewPackedWriter(const char* path,
+                    struct ArrowSchema* schema,
+                    const int64_t buffer_size,
+                    CPackedWriter* c_packed_writer);
+
+int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, struct ArrowSchema* schema);
+
+int Close(CPackedWriter c_packed_writer, CColumnOffsetMapping* c_column_offset_mapping);
+
+void DeletePackedWriter(CPackedWriter c_packed_writer);
+
+#ifdef __cplusplus
+}
+#endif

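A sketch of how a cgo (or plain C) caller might drive this interface. The zero-equals-success convention is assumed from the int return types, and the ArrowSchema/ArrowArray values would come from the Arrow C data interface:

    struct ArrowSchema schema;  // exported through the Arrow C data interface
    struct ArrowArray array;    // one record batch, exported the same way
    CPackedWriter writer = NULL;
    CColumnOffsetMapping mapping = NULL;

    int rc = NewPackedWriter("/tmp/packed", &schema, 4 * 1024 * 1024, &writer);
    if (rc == 0) {
      rc = WriteRecordBatch(writer, &array, &schema);
    }
    if (rc == 0) {
      rc = Close(writer, &mapping);  // assumed to finalize and release the writer
    } else {
      DeletePackedWriter(writer);    // explicit cleanup on the failure path (assumption)
    }
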
14 changes: 1 addition & 13 deletions cpp/src/format/parquet/file_writer.cpp
@@ -29,19 +29,7 @@ ParquetFileWriter::ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
                                      arrow::fs::FileSystem& fs,
                                      const std::string& file_path,
                                      const StorageConfig& storage_config)
-    : schema_(std::move(schema)),
-      fs_(fs),
-      file_path_(file_path),
-      storage_config_(storage_config),
-      props_(*parquet::default_writer_properties()),
-      count_(0) {}
-
-ParquetFileWriter::ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
-                                     arrow::fs::FileSystem& fs,
-                                     const std::string& file_path,
-                                     const StorageConfig& storage_config,
-                                     const parquet::WriterProperties& props)
-    : schema_(std::move(schema)), fs_(fs), file_path_(file_path), storage_config_(storage_config), props_(props) {}
+    : schema_(std::move(schema)), fs_(fs), file_path_(file_path), storage_config_(storage_config), count_(0) {}
 
 Status ParquetFileWriter::Init() {
   auto coln = schema_->num_fields();

14 changes: 0 additions & 14 deletions cpp/src/packed/column_group_writer.cpp
@@ -36,20 +36,6 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id,
       column_group_(group_id, origin_column_indices),
       finished_(false) {}
 
-ColumnGroupWriter::ColumnGroupWriter(GroupId group_id,
-                                     std::shared_ptr<arrow::Schema> schema,
-                                     arrow::fs::FileSystem& fs,
-                                     const std::string& file_path,
-                                     const StorageConfig& storage_config,
-                                     const parquet::WriterProperties& props,
-                                     const std::vector<int>& origin_column_indices)
-    : group_id_(group_id),
-      writer_(std::move(schema), fs, file_path, storage_config, props),
-      column_group_(group_id, origin_column_indices),
-      flushed_batches_(0),
-      flushed_rows_(0),
-      finished_(false) {}
-
 Status ColumnGroupWriter::Init() { return writer_.Init(); }
 
 Status ColumnGroupWriter::Write(const std::shared_ptr<arrow::RecordBatch>& record) {
