Skip to content

Commit

Permalink
packed reader
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Jan 7, 2025
1 parent ffc3bca commit 95b172e
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 48 deletions.
4 changes: 4 additions & 0 deletions cpp/include/milvus-storage/common/path_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,8 @@ std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s)
return {s.substr(0, pos), s.substr(pos + 1)};
}

/// Joins `parent` and `child` into one path, placing the separator `kSep`
/// between them. The inputs are used verbatim: no separator is stripped or
/// de-duplicated.
static inline std::string ConcatenateFilePath(const std::string& parent, const std::string& child) {
  std::string joined(parent);
  joined += kSep;
  joined += child;
  return joined;
}

} // namespace milvus_storage
5 changes: 5 additions & 0 deletions cpp/include/milvus-storage/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Status {

/// Creates a Status that carries the kWriterError code together with `msg`.
static Status WriterError(const std::string& msg) {
  return Status{kWriterError, msg};
}

/// Creates a Status that carries the kReaderError code together with `msg`.
static Status ReaderError(const std::string& msg) {
  return Status{kReaderError, msg};
}

/// Creates a Status that carries the kIOError code together with `msg`.
static Status IOError(const std::string& msg) {
  return Status{kIOError, msg};
}

/// Returns true when the stored code is kOk.
bool ok() const {
  const bool succeeded = (code_ == kOk);
  return succeeded;
}
Expand All @@ -51,6 +53,8 @@ class Status {

/// Returns true when the stored code is kWriterError.
bool IsWriterError() const {
  const bool matches = (code_ == kWriterError);
  return matches;
}

/// Returns true when the stored code is kReaderError.
bool IsReaderError() const {
  const bool matches = (code_ == kReaderError);
  return matches;
}

/// Returns true when the stored code is kIOError.
bool IsIOError() const {
  const bool matches = (code_ == kIOError);
  return matches;
}

std::string ToString() const;
Expand All @@ -64,6 +68,7 @@ class Status {
kFileNotFound = 4,
kWriterError = 5,
kIOError = 6,
kReaderError = 7
};

explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {}
Expand Down
19 changes: 11 additions & 8 deletions cpp/include/milvus-storage/packed/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,11 @@ using RowOffsetMinHeap =

class PackedRecordBatchReader : public arrow::RecordBatchReader {
public:
// Test only
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const int pk_index,
const int ts_index,
const std::set<int>& needed_columns,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

Expand All @@ -60,6 +55,10 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
arrow::Status Close() override;

private:
// Reads the column-offset metadata from the first column-group file
// (file_path_ joined with "0") and, for every entry of needed_columns,
// records the file it lives in (needed_paths_) and its ColumnOffset
// (needed_column_offsets_). Columns equal to pk_index / ts_index are resolved
// specially by the implementation.
// Returns a ReaderError status when that file cannot be opened or when the
// column-offset metadata key is missing.
Status initializeColumnOffsets(arrow::fs::FileSystem& fs,
const std::set<int>& needed_columns,
const int pk_index,
const int ts_index);
// Advance buffer to fill the expected buffer size
arrow::Status advanceBuffer();
std::vector<const arrow::Array*> collectChunks(int64_t chunksize) const;
Expand All @@ -77,8 +76,12 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
std::unique_ptr<ChunkManager> chunk_manager_;
int64_t absolute_row_position_;
std::vector<ColumnOffset> needed_column_offsets_;
std::set<int> needed_paths_;
std::vector<std::vector<size_t>> row_group_sizes_;
const std::string file_path_;
int read_count_;
int pk_index_;
int ts_index_;
};

} // namespace milvus_storage
7 changes: 6 additions & 1 deletion cpp/include/milvus-storage/packed/reader_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ typedef void* CReader;
typedef void* CRecordBatch;
typedef void* CFileSystem;

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out);
/**
 * Opens a packed record-batch reader and exports it as an Arrow C stream.
 *
 * path        - location of the packed data; forwarded to the reader as its file path.
 * schema      - Arrow C schema; the implementation imports it with arrow::ImportSchema.
 * pk_index    - primary-key column index, forwarded to the reader.
 * ts_index    - timestamp column index, forwarded to the reader.
 * buffer_size - read buffer size, forwarded to the reader.
 * out         - receives the exported reader (via ExportRecordBatchReader).
 *
 * Returns an int status code; the concrete values are not visible from this
 * declaration -- NOTE(review): confirm the convention against reader_c.cpp.
 */
int Open(const char* path,
struct ArrowSchema* schema,
const int pk_index,
const int ts_index,
const int64_t buffer_size,
struct ArrowArrayStream* out);

#ifdef __cplusplus
}
Expand Down
92 changes: 61 additions & 31 deletions cpp/src/packed/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 +24,33 @@
#include "packed/chunk_manager.h"
#include "common/config.h"
#include "common/serde.h"
#include "common/path_util.h"

namespace milvus_storage {

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
const int64_t buffer_size)
: PackedRecordBatchReader(
fs, std::vector<std::string>{path}, schema, std::vector<ColumnOffset>(), std::set<int>(), buffer_size) {}

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const int pk_index,
const int ts_index,
const std::set<int>& needed_columns,
const int64_t buffer_size)
: schema_(schema),
: file_path_(file_path),
schema_(schema),
buffer_available_(buffer_size),
memory_limit_(buffer_size),
row_limit_(0),
absolute_row_position_(0),
read_count_(0) {
auto cols = std::set(needed_columns);
if (cols.empty()) {
for (int i = 0; i < schema->num_fields(); i++) {
cols.emplace(i);
}
if (needed_columns.find(pk_index) == needed_columns.end() || needed_columns.find(ts_index) == needed_columns.end()) {
throw std::runtime_error("pk_index or ts_index should be in needed_columns");
}
auto offsets = std::vector<ColumnOffset>(column_offsets);
if (column_offsets.empty()) {
for (int i = 0; i < schema->num_fields(); i++) {
offsets.emplace_back(0, i);
}
}
std::set<int> needed_paths;
for (int i : cols) {
needed_column_offsets_.push_back(offsets[i]);
needed_paths.emplace(offsets[i].path_index);
auto status = initializeColumnOffsets(fs, needed_columns, pk_index, ts_index);
if (!status.ok()) {
throw std::runtime_error(status.ToString());
}
for (auto i : needed_paths) {
auto result = MakeArrowFileReader(fs, paths[i]);
for (auto i : needed_paths_) {
auto result = MakeArrowFileReader(fs, ConcatenateFilePath(file_path_, std::to_string(i)));
if (!result.ok()) {
LOG_STORAGE_ERROR_ << "Error making file reader " << i << ":" << result.status().ToString();
throw std::runtime_error(result.status().ToString());
Expand All @@ -73,21 +59,65 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
}

for (int i = 0; i < file_readers_.size(); ++i) {
auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY);
if (!metadata.ok()) {
LOG_STORAGE_ERROR_ << "metadata not found in file " << i;
auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata();

auto row_group_size_meta = metadata->Get(ROW_GROUP_SIZE_META_KEY);
if (!row_group_size_meta.ok()) {
LOG_STORAGE_ERROR_ << "row group size meta not found in file " << i;
throw std::runtime_error(metadata.status().ToString());
}
row_group_sizes_.push_back(PackedMetaSerde::DeserializeRowGroupSizes(metadata.ValueOrDie()));
row_group_sizes_.push_back(PackedMetaSerde::DeserializeRowGroupSizes(row_group_size_meta.ValueOrDie()));
LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size();
}

// Initialize table states and chunk manager
column_group_states_.resize(file_readers_.size(), ColumnGroupState(0, -1, 0));
chunk_manager_ = std::make_unique<ChunkManager>(needed_column_offsets_, 0);
// tables are referenced by column_offsets, so its size is the number of paths.
tables_.resize(paths.size(), std::queue<std::shared_ptr<arrow::Table>>());
}

// Reads the column-offset metadata stored in the first column-group file
// (file_path_ joined with "0") and fills, for every requested column:
//   - needed_paths_:          the index of the file that holds the column;
//   - needed_column_offsets_: the ColumnOffset of the column.
//
// Parameters:
//   fs             - filesystem used to open the first column-group file.
//   needed_columns - column indices the caller wants to read.
//   pk_index       - index of the primary-key column; resolved to the first
//                    recorded offset.
//   ts_index       - index of the timestamp column; resolved to the second
//                    recorded offset.
//
// Returns Status::OK() on success, or a ReaderError when the file cannot be
// opened, the column-offset metadata is missing, or a requested column has no
// recorded offset.
Status PackedRecordBatchReader::initializeColumnOffsets(arrow::fs::FileSystem& fs,
                                                        const std::set<int>& needed_columns,
                                                        const int pk_index,
                                                        const int ts_index) {
  const std::string path = ConcatenateFilePath(file_path_, std::to_string(0));
  auto result = MakeArrowFileReader(fs, path);
  if (!result.ok()) {
    return Status::ReaderError("can not open file reader");
  }

  // This reader is only used to read the footer metadata. It is intentionally
  // NOT stored in file_readers_: the constructor opens one reader per entry of
  // needed_paths_ (which includes path 0), so storing it here would duplicate
  // the reader of file 0 and shift every reader index by one.
  auto first_reader = std::move(result.value());
  auto metadata = first_reader->parquet_reader()->metadata()->key_value_metadata();
  if (metadata == nullptr) {
    // The file carries no key/value metadata at all.
    return Status::ReaderError("can not find column offset meta");
  }
  auto column_offset_meta = metadata->Get(COLUMN_OFFSETS_META_KEY);
  if (!column_offset_meta.ok()) {
    return Status::ReaderError("can not find column offset meta");
  }

  // group_indices[p] lists the column indices recorded for file p; flatten it
  // into one offset per recorded column, in file order.
  auto group_indices = PackedMetaSerde::DeserializeColumnOffsets(column_offset_meta.ValueOrDie());
  std::vector<ColumnOffset> offsets;
  const int num_groups = static_cast<int>(group_indices.size());
  for (int path_index = 0; path_index < num_groups; ++path_index) {
    for (int col_index : group_indices[path_index]) {
      offsets.emplace_back(ColumnOffset(path_index, col_index));
    }
  }

  for (const int col : needed_columns) {
    // pk_index / ts_index are run-time values, so they cannot be `case` labels;
    // compare explicitly instead. pk takes precedence if both are equal.
    long long slot = col;
    if (col == pk_index) {
      slot = 0;
    } else if (col == ts_index) {
      slot = 1;
    }
    if (slot < 0 || static_cast<size_t>(slot) >= offsets.size()) {
      return Status::ReaderError("needed column " + std::to_string(col) +
                                 " is out of range of column offset meta");
    }
    const ColumnOffset& offset = offsets[static_cast<size_t>(slot)];
    needed_paths_.emplace(offset.path_index);
    needed_column_offsets_.push_back(offset);
  }
  return Status::OK();
}

std::shared_ptr<arrow::Schema> PackedRecordBatchReader::schema() const { return schema_; }

arrow::Status PackedRecordBatchReader::advanceBuffer() {
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/packed/reader_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
#include <iostream>
#include <memory>

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
int Open(const char* path,
struct ArrowSchema* schema,
const int pk_index,
const int ts_index,
const int64_t buffer_size,
struct ArrowArrayStream* out) {
auto truePath = std::string(path);
auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
Expand All @@ -36,7 +41,8 @@ int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size
}
auto trueFs = r.value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, buffer_size);
auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, pk_index, ts_index,
buffer_size);
auto status = ExportRecordBatchReader(reader, out);
LOG_STORAGE_ERROR_ << "read export done";
if (!status.ok()) {
Expand Down
10 changes: 4 additions & 6 deletions cpp/src/packed/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "common/config.h"
#include "filesystem/fs.h"
#include "common/arrow_util.h"
#include "common/path_util.h"

namespace milvus_storage {

Expand Down Expand Up @@ -77,7 +78,7 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() {
SizeBasedSplitter(max_group_size, pk_index_, ts_index_).SplitRecordBatches(buffered_batches_);
for (GroupId i = 0; i < groups.size(); ++i) {
auto& group = groups[i];
std::string group_path = file_path_ + "/" + std::to_string(i);
std::string group_path = ConcatenateFilePath(file_path_, std::to_string(i));
auto writer = std::make_unique<ColumnGroupWriter>(i, group.Schema(), *fs_, group_path, storage_config_,
group.GetOriginColumnIndices());
RETURN_NOT_OK(writer->Init());
Expand Down Expand Up @@ -147,11 +148,8 @@ Status PackedRecordBatchWriter::flushRemainingBuffer() {
RETURN_NOT_OK(writer->Flush());
current_memory_usage_ -= max_group.second;
}
// the first column group writer needs to write column offsets meta
if (!group_writers_.empty()) {
group_writers_[0]->WriteColumnOffsetsMeta(group_indices_);
}
for (auto& writer : group_writers_) {
RETURN_NOT_OK(writer->WriteColumnOffsetsMeta(group_indices_));
RETURN_NOT_OK(writer->Close());
}
return Status::OK();
Expand All @@ -161,7 +159,7 @@ Status PackedRecordBatchWriter::flushUnsplittedBuffer() {
if (buffered_batches_.empty()) {
return Status::OK();
}
std::string group_path = file_path_ + "/" + std::to_string(0);
std::string group_path = ConcatenateFilePath(file_path_, std::to_string(0));
std::vector<int> indices;
indices.push_back(pk_index_);
indices.push_back(ts_index_);
Expand Down
2 changes: 2 additions & 0 deletions cpp/test/packed/packed_integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class PackedIntegrationTest : public PackedTestBase {};
TEST_F(PackedIntegrationTest, TestOneFile) {
int batch_size = 100;

// column layout: columns 0, 1 and 2 are all stored in file 0
std::vector<ColumnOffset> column_offsets = {
ColumnOffset(0, 0),
ColumnOffset(0, 1),
Expand Down Expand Up @@ -52,6 +53,7 @@ TEST_F(PackedIntegrationTest, TestSplitColumnGroup) {
ColumnOffset(0, 1),
ColumnOffset(1, 0),
};
// column layout: columns 0 and 1 are stored in file 0; column 2 is stored in file 1

PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, pk_index_, ts_index_, storage_config_);
for (int i = 0; i < batch_size; ++i) {
Expand Down

0 comments on commit 95b172e

Please sign in to comment.