packed writer writes column offsets to meta
Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Jan 7, 2025
1 parent 894edfa commit 8150e22
Showing 26 changed files with 319 additions and 242 deletions.
19 changes: 10 additions & 9 deletions cpp/benchmark/benchmark_packed.cpp
@@ -76,10 +76,10 @@ class S3Fixture : public benchmark::Fixture {

static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
std::set<int> needed_columns = {0, 1, 2};
- std::vector<ColumnOffsetPtr> column_offsets = {
- std::make_shared<ColumnOffset>(0, 0),
- std::make_shared<ColumnOffset>(1, 0),
- std::make_shared<ColumnOffset>(1, 1),
+ std::vector<ColumnOffset> column_offsets = {
+ ColumnOffset(0, 0),
+ ColumnOffset(1, 0),
+ ColumnOffset(1, 1),
};

auto paths = std::vector<std::string>{path + "/0", path + "/1"};
@@ -113,6 +113,8 @@ static void PackedWrite(benchmark::State& st,
size_t buffer_size) {
auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()),
arrow::field("str", arrow::utf8())});
+ int pk_index = 0;
+ int ts_index = 1;
arrow::Int32Builder int_builder;
arrow::Int64Builder int64_builder;
arrow::StringBuilder str_builder;
@@ -137,18 +139,17 @@
auto conf = StorageConfig();
conf.use_custom_part_upload_size = true;
conf.part_size = 30 * 1024 * 1024;
- PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf);
+ PackedRecordBatchWriter writer(buffer_size, schema, fs, path, pk_index, ts_index, conf);
for (int i = 0; i < 8 * 1024; ++i) {
auto r = writer.Write(record_batch);
if (!r.ok()) {
st.SkipWithError(r.ToString());
break;
}
}
- auto column_index_groups = writer.Close();
- if (column_index_groups->Size() == 0) {
- st.SkipWithError("Failed to close writer");
- break;
+ auto r = writer.Close();
+ if (!r.ok()) {
+ st.SkipWithError(r.ToString());
}
}
}
24 changes: 11 additions & 13 deletions cpp/include/milvus-storage/common/config.h
@@ -21,6 +21,17 @@ using namespace std;

namespace milvus_storage {

+ static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB

+ // https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
+ static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB

+ static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;

+ // Default number of rows to read when using ::arrow::RecordBatchReader
+ static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
+ static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;

struct StorageConfig {
std::string uri = "";
std::string bucket_name = "";
@@ -44,17 +55,4 @@ struct StorageConfig {
}
};

- static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB

- // https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
- static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB

- static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;

- static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";

- // Default number of rows to read when using ::arrow::RecordBatchReader
- static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
- static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;

} // namespace milvus_storage
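The buffer constants compose per file: each concurrently open packed file needs room for one full row group plus Arrow's S3 part-upload buffer, which is exactly MIN_BUFFER_SIZE_PER_FILE. A minimal sketch of the arithmetic (the MinWriteBuffer helper is illustrative only, not part of this header):

#include "common/config.h"
#include <cstdint>

// Illustrative helper: the smallest write buffer that can keep n_files packed
// files open at once, at MIN_BUFFER_SIZE_PER_FILE (1 MB + 10 MB) per file.
constexpr int64_t MinWriteBuffer(int64_t n_files) {
  return n_files * milvus_storage::MIN_BUFFER_SIZE_PER_FILE;
}
static_assert(MinWriteBuffer(2) == 22 * 1024 * 1024, "two packed files need at least 22 MB");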
62 changes: 60 additions & 2 deletions cpp/include/milvus-storage/common/serde.h
@@ -15,25 +15,83 @@
#pragma once
#include <arrow/filesystem/filesystem.h>
#include <string>
+ #include <iostream>

namespace milvus_storage {

+ static const std::string GROUP_DELIMITER = ";";
+ static const std::string COLUMN_DELIMITER = ",";
+ static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";
+ static const std::string COLUMN_OFFSETS_META_KEY = "column_offsets";

class PackedMetaSerde {
public:
// Serialize a vector of size_t to a byte array and convert it to a string
- static std::string serialize(const std::vector<size_t>& sizes) {
+ static std::string SerializeRowGroupSizes(const std::vector<size_t>& sizes) {
std::vector<uint8_t> byteArray(sizes.size() * sizeof(size_t));
std::memcpy(byteArray.data(), sizes.data(), byteArray.size());
return std::string(byteArray.begin(), byteArray.end());
}

// Deserialize a string back to a vector of size_t
- static std::vector<size_t> deserialize(const std::string& input) {
+ static std::vector<size_t> DeserializeRowGroupSizes(const std::string& input) {
std::vector<uint8_t> byteArray(input.begin(), input.end());
std::vector<size_t> sizes(byteArray.size() / sizeof(size_t));
std::memcpy(sizes.data(), byteArray.data(), byteArray.size());
return sizes;
}

+ static std::string SerializeColumnOffsets(const std::vector<std::vector<int>>& column_offsets) {
+ std::stringstream ss;
+ for (size_t i = 0; i < column_offsets.size(); ++i) {
+ if (i > 0) {
+ ss << GROUP_DELIMITER;
+ }

+ for (size_t j = 0; j < column_offsets[i].size(); ++j) {
+ if (j > 0) {
+ ss << COLUMN_DELIMITER;
+ }
+ ss << column_offsets[i][j];
+ }
+ }

+ auto s = ss.str();
+ std::cout << "SerializeColumnOffsets: " << s << std::endl;
+ return s;
+ }

+ static std::vector<std::vector<int>> DeserializeColumnOffsets(const std::string& input) {
+ std::vector<std::vector<int>> column_offsets;

+ size_t group_start = 0;
+ size_t group_end = input.find(GROUP_DELIMITER);

+ while (group_start != std::string::npos) {
+ std::string group = input.substr(group_start, group_end - group_start);
+ std::vector<int> group_indices;

+ size_t column_start = 0;
+ size_t column_end = group.find(COLUMN_DELIMITER);
+ while (column_start != std::string::npos) {
+ std::string column = group.substr(column_start, column_end - column_start);
+ if (!column.empty()) {
+ group_indices.push_back(std::stoi(column));
+ }
+ column_start = (column_end == std::string::npos) ? std::string::npos : column_end + COLUMN_DELIMITER.size();
+ column_end = group.find(COLUMN_DELIMITER, column_start);
+ }

+ if (!group_indices.empty()) {
+ column_offsets.push_back(group_indices);
+ }

+ group_start = (group_end == std::string::npos) ? std::string::npos : group_end + GROUP_DELIMITER.size();
+ group_end = input.find(GROUP_DELIMITER, group_start);
+ }

+ return column_offsets;
+ }
};

} // namespace milvus_storage
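The resulting metadata string is human-readable: column indices within a group are comma-separated and groups are semicolon-separated. A short round-trip sketch of the two new methods:

#include <cassert>
#include <string>
#include <vector>
#include "common/serde.h"

int main() {
  // Columns 0 and 1 live in the first packed file, column 2 in the second.
  std::vector<std::vector<int>> groups = {{0, 1}, {2}};
  std::string meta = milvus_storage::PackedMetaSerde::SerializeColumnOffsets(groups);
  assert(meta == "0,1;2");  // stored under the "column_offsets" metadata key
  assert(milvus_storage::PackedMetaSerde::DeserializeColumnOffsets(meta) == groups);
  return 0;
}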
2 changes: 2 additions & 0 deletions cpp/include/milvus-storage/format/parquet/file_writer.h
@@ -40,6 +40,8 @@ class ParquetFileWriter : public FileWriter {
Status WriteRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
const std::vector<size_t>& batch_memory_sizes);

+ void AppendKVMetadata(const std::string& key, const std::string& value);

int64_t count() override;

Status Close() override;
13 changes: 11 additions & 2 deletions cpp/include/milvus-storage/packed/chunk_manager.h
@@ -25,6 +25,15 @@

namespace milvus_storage {

+ struct ColumnOffset {
+ int path_index;
+ int col_index;

+ ColumnOffset() = default;

+ ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}
+ };

// record which chunk is in use and its offset in the file
struct ChunkState {
int chunk;
@@ -50,7 +59,7 @@

class ChunkManager {
public:
- ChunkManager(const std::vector<ColumnOffsetPtr>& column_offsets, int64_t chunksize);
+ ChunkManager(const std::vector<ColumnOffset>& column_offsets, int64_t chunksize);

std::vector<std::shared_ptr<arrow::ArrayData>> SliceChunksByMaxContiguousSlice(
int64_t chunksize, std::vector<std::queue<std::shared_ptr<arrow::Table>>>& tables);
@@ -62,7 +71,7 @@
void SetChunkSize(int64_t chunksize) { chunksize_ = chunksize; }

private:
- std::vector<ColumnOffsetPtr> column_offsets_;
+ std::vector<ColumnOffset> column_offsets_;
std::vector<ChunkState> chunk_states_;
int64_t chunksize_;
};
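With ColumnOffset now held by value, an offset is simply (file index within the packed group, column index within that file). The PackedRead benchmark above maps three logical columns onto two files; spelled out:

#include "packed/chunk_manager.h"
#include <vector>

// Files "<path>/0" and "<path>/1": the first holds one column, the second two.
std::vector<milvus_storage::ColumnOffset> column_offsets = {
    milvus_storage::ColumnOffset(0, 0),  // logical column 0 -> file 0, column 0
    milvus_storage::ColumnOffset(1, 0),  // logical column 1 -> file 1, column 0
    milvus_storage::ColumnOffset(1, 1),  // logical column 2 -> file 1, column 1
};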
51 changes: 0 additions & 51 deletions cpp/include/milvus-storage/packed/column_group.h
@@ -90,55 +90,4 @@ struct ColumnGroupState {
void resetMemorySize() { this->memory_size = 0; }
};

- struct ColumnOffset {
- int path_index;
- int col_index;

- ColumnOffset() = default;

- ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}
- };

- using ColumnOffsetPtr = std::shared_ptr<ColumnOffset>;

- // ColumnOffsetMapping is a map of original field name to ColumnOffset.
- // The column offset is a pair of path index and a column index in the path.
- class ColumnOffsetMapping {
- public:
- ColumnOffsetMapping() { mapping_ = std::map<std::string, ColumnOffsetPtr>(); };

- ColumnOffsetMapping(const std::vector<std::vector<int>>& group_indices, const std::shared_ptr<arrow::Schema> schema) {
- for (int path_index = 0; path_index < group_indices.size(); path_index++) {
- for (int col_index = 0; col_index < group_indices[path_index].size(); col_index++) {
- int original_col_index = group_indices[path_index][col_index];
- std::string field_name = schema->field(original_col_index)->name();
- mapping_[field_name] = std::make_shared<ColumnOffset>(path_index, col_index);
- }
- }
- }

- void AddColumnOffset(const std::string& field_name, int64_t path_index, int64_t col_index) {
- auto column_offset = std::make_shared<ColumnOffset>(path_index, col_index);
- mapping_[field_name] = column_offset;
- }

- ColumnOffsetPtr GetByFieldName(const std::string& field_name) {
- if (Contains(field_name)) {
- return mapping_.at(field_name);
- }
- return nullptr;
- }

- std::map<std::string, ColumnOffsetPtr> GetMapping() const { return mapping_; }

- bool Contains(const std::string& key) const { return mapping_.find(key) != mapping_.end(); }

- size_t Size() { return mapping_.size(); }

- void Clear() { mapping_.clear(); }

- private:
- std::map<std::string, ColumnOffsetPtr> mapping_;
- };

} // namespace milvus_storage
2 changes: 2 additions & 0 deletions cpp/include/milvus-storage/packed/column_group_writer.h
@@ -22,6 +22,7 @@
#include "common/status.h"
#include "packed/column_group.h"
#include "common/config.h"
#include "common/serde.h"

namespace milvus_storage {

@@ -36,6 +37,7 @@ class ColumnGroupWriter {

Status Init();
Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
+ Status WriteColumnOffsetsMeta(const std::vector<std::vector<int>>& column_offsets);
Status Flush();
Status Close();
GroupId Group_id() const;
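WriteColumnOffsetsMeta is the hook the packed writer uses to persist the group layout into each file. A hedged sketch of the expected call order on close, inferred from this header only (the authoritative sequencing lives in PackedRecordBatchWriter::Close; group_indices is the per-file list of original column indices, e.g. {{0, 1}, {2}}):

// Inferred usage for one ColumnGroupWriter; all three calls return Status.
auto status = writer->WriteColumnOffsetsMeta(group_indices);
if (status.ok()) {
  status = writer->Flush();
}
if (status.ok()) {
  status = writer->Close();
}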
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/packed/reader.h
@@ -49,7 +49,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
- const std::vector<ColumnOffsetPtr>& column_offsets,
+ const std::vector<ColumnOffset>& column_offsets,
const std::set<int>& needed_columns,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

@@ -76,7 +76,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
int64_t row_limit_;
std::unique_ptr<ChunkManager> chunk_manager_;
int64_t absolute_row_position_;
- std::vector<ColumnOffsetPtr> needed_column_offsets_;
+ std::vector<ColumnOffset> needed_column_offsets_;
std::vector<std::vector<size_t>> row_group_sizes_;
int read_count_;
};
11 changes: 10 additions & 1 deletion cpp/include/milvus-storage/packed/splitter/size_based_splitter.h
@@ -20,14 +20,23 @@ namespace milvus_storage {

class SizeBasedSplitter : public SplitterPlugin {
public:
- explicit SizeBasedSplitter(size_t max_group_size);
+ /*
+ * @brief SizeBasedSplitter is a splitter plugin that splits record batches into column groups based on the size of
+ * each column. The columns at pk_index and ts_index are always placed into the first group.
+ */
+ explicit SizeBasedSplitter(size_t max_group_size, int pk_index, int ts_index);

void Init() override;

std::vector<ColumnGroup> SplitRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);

std::vector<ColumnGroup> Split(const std::shared_ptr<arrow::RecordBatch>& record) override;

+ private:
+ std::vector<int> initializeSmallGroupIndices();

+ int pk_index_;
+ int ts_index_;
size_t max_group_size_;
static constexpr size_t SPLIT_THRESHOLD = 1024; // 1K
};
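A usage sketch under the benchmark's schema (int32 pk at index 0, int64 ts at index 1, utf8 payload); record_batch is assumed in scope, and max_group_size is taken here to cap the columns per group:

#include "packed/splitter/size_based_splitter.h"

// pk and ts are pinned to column group 0 per the comment above; the remaining
// columns are distributed by size.
milvus_storage::SizeBasedSplitter splitter(/*max_group_size=*/2, /*pk_index=*/0, /*ts_index=*/1);
auto groups = splitter.Split(record_batch);  // groups[0] holds pk and ts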
8 changes: 6 additions & 2 deletions cpp/include/milvus-storage/packed/writer.h
@@ -36,13 +36,15 @@ class PackedRecordBatchWriter {
std::shared_ptr<arrow::Schema> schema,
std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& file_path,
+ const int pk_index,
+ const int ts_index,
const StorageConfig& storage_config);

// Put the record batch into the corresponding column group,
// and write the largest column group buffer to the file.
Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
- // Close the writer and return the mapping of field name written to column offset.
- std::unique_ptr<ColumnOffsetMapping> Close();
+ // Close the writer and write the column offset mapping into the Parquet file metadata.
+ Status Close();

private:
// split first buffer into column groups based on column size
@@ -60,6 +62,8 @@
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::fs::FileSystem> fs_;
const std::string file_path_;
+ const int pk_index_;
+ const int ts_index_;
const StorageConfig storage_config_;
size_t current_memory_usage_;
std::vector<std::unique_ptr<ColumnGroupWriter>> group_writers_;
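End to end, the new write path mirrors the benchmark: construct with the key-column indices, write batches, and let Close() persist the offsets. A sketch with fs, schema, path, buffer_size, and batch assumed in scope:

#include "packed/writer.h"

auto conf = milvus_storage::StorageConfig();
milvus_storage::PackedRecordBatchWriter writer(buffer_size, schema, fs, path,
                                               /*pk_index=*/0, /*ts_index=*/1, conf);
auto status = writer.Write(batch);  // buffered and routed to a column group
if (status.ok()) {
  status = writer.Close();  // flushes groups and writes "column_offsets" metadata
}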
6 changes: 3 additions & 3 deletions cpp/include/milvus-storage/packed/writer_c.h
@@ -26,13 +26,13 @@ typedef void* CColumnOffsetMapping;
int NewPackedWriter(const char* path,
struct ArrowSchema* schema,
const int64_t buffer_size,
+ const int pk_index,
+ const int ts_index,
CPackedWriter* c_packed_writer);

int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, struct ArrowSchema* schema);

- int Close(CPackedWriter c_packed_writer, CColumnOffsetMapping* c_column_offset_mapping);

- void DeletePackedWriter(CPackedWriter c_packed_writer);
+ int Close(CPackedWriter c_packed_writer);

#ifdef __cplusplus
}
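A sketch of the updated C API lifecycle: with DeletePackedWriter removed, Close is assumed to both finalize the files and release the writer. Here path, schema, and array are assumed to be prepared via the Arrow C data interface, and non-zero return codes are taken to signal failure:

CPackedWriter writer = NULL;
int rc = NewPackedWriter(path, schema, /*buffer_size=*/22 * 1024 * 1024,
                         /*pk_index=*/0, /*ts_index=*/1, &writer);
if (rc == 0) {
  rc = WriteRecordBatch(writer, array, schema);
}
if (rc == 0) {
  rc = Close(writer);  // assumed to flush metadata and free the writer
}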
2 changes: 1 addition & 1 deletion cpp/src/format/parquet/file_reader.cpp
@@ -50,7 +50,7 @@ FileRecordBatchReader::FileRecordBatchReader(arrow::fs::FileSystem& fs,
LOG_STORAGE_ERROR_ << "Metadata not found in file: " << path;
throw std::runtime_error(metadata.status().ToString());
}
- auto all_row_group_sizes = PackedMetaSerde::deserialize(metadata.ValueOrDie());
+ auto all_row_group_sizes = PackedMetaSerde::DeserializeRowGroupSizes(metadata.ValueOrDie());
if (row_group_offset >= all_row_group_sizes.size()) {
std::string error_msg =
"Row group offset exceeds total number of row groups. "
8 changes: 6 additions & 2 deletions cpp/src/format/parquet/file_writer.cpp
@@ -90,9 +90,13 @@ Status ParquetFileWriter::WriteRecordBatches(const std::vector<std::shared_ptr<a

int64_t ParquetFileWriter::count() { return count_; }

+ void ParquetFileWriter::AppendKVMetadata(const std::string& key, const std::string& value) {
+ kv_metadata_->Append(key, value);
+ }

Status ParquetFileWriter::Close() {
- std::string meta = PackedMetaSerde::serialize(row_group_sizes_);
- kv_metadata_->Append(ROW_GROUP_SIZE_META_KEY, meta);
+ std::string meta = PackedMetaSerde::SerializeRowGroupSizes(row_group_sizes_);
+ AppendKVMetadata(ROW_GROUP_SIZE_META_KEY, meta);
RETURN_ARROW_NOT_OK(writer_->AddKeyValueMetadata(kv_metadata_));
RETURN_ARROW_NOT_OK(writer_->Close());
return Status::OK();
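Reading the keys back shows what Close() and WriteColumnOffsetsMeta leave behind. A hedged sketch using Arrow's Parquet reader, where infile is an already opened arrow::io::RandomAccessFile:

#include <parquet/file_reader.h>
#include "common/serde.h"

auto reader = parquet::ParquetFileReader::Open(infile);
auto kv = reader->metadata()->key_value_metadata();
// "row_group_size" is appended in Close(); "column_offsets" is assumed to be
// appended via ColumnGroupWriter::WriteColumnOffsetsMeta -> AppendKVMetadata.
auto sizes = milvus_storage::PackedMetaSerde::DeserializeRowGroupSizes(
    kv->Get(milvus_storage::ROW_GROUP_SIZE_META_KEY).ValueOrDie());
auto offsets = milvus_storage::PackedMetaSerde::DeserializeColumnOffsets(
    kv->Get(milvus_storage::COLUMN_OFFSETS_META_KEY).ValueOrDie());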