diff --git a/cpp/benchmark/benchmark_packed.cpp b/cpp/benchmark/benchmark_packed.cpp index 634d3ab..296075e 100644 --- a/cpp/benchmark/benchmark_packed.cpp +++ b/cpp/benchmark/benchmark_packed.cpp @@ -84,7 +84,7 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st auto paths = std::vector{path + "/0", path + "/1"}; - // after writing, the column of large_str is in 0th file, and the last int64 columns are in 1st file + // after writing, the pk and the ts are in the first file, and the large str is in the second file std::vector> fields = { arrow::field("int", arrow::utf8()), arrow::field("int64", arrow::int32()), @@ -93,7 +93,7 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st auto schema = arrow::schema(fields); for (auto _ : st) { - PackedRecordBatchReader pr(*fs, paths, schema, column_offsets, needed_columns, buffer_size); + PackedRecordBatchReader pr(*fs, path, schema, needed_columns, buffer_size); auto r = arrow::RecordBatch::MakeEmpty(schema); SKIP_IF_NOT_OK(r.status(), st) auto rb = r.ValueOrDie(); @@ -107,7 +107,10 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st } } -static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) { +static void PackedWrite(benchmark::State& st, + std::shared_ptr fs, + std::string& path, + size_t buffer_size) { auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()), arrow::field("str", arrow::utf8())}); arrow::Int32Builder int_builder; @@ -134,7 +137,7 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s auto conf = StorageConfig(); conf.use_custom_part_upload_size = true; conf.part_size = 30 * 1024 * 1024; - PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, conf, *parquet::default_writer_properties()); + PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf); for (int i = 0; i < 8 * 1024; ++i) { auto r = writer.Write(record_batch); if (!r.ok()) { @@ -153,7 +156,7 @@ std::string PATH = "/tmp/bench/foo"; BENCHMARK_DEFINE_F(S3Fixture, Write32MB)(benchmark::State& st) { SKIP_IF_NOT_OK(fs_->CreateDir(PATH), st); - PackedWrite(st, fs_.get(), PATH, 22 * 1024 * 1024); + PackedWrite(st, fs_, PATH, 22 * 1024 * 1024); } BENCHMARK_REGISTER_F(S3Fixture, Write32MB)->UseRealTime(); diff --git a/cpp/include/milvus-storage/common/config.h b/cpp/include/milvus-storage/common/config.h index 27b5ebc..e460854 100644 --- a/cpp/include/milvus-storage/common/config.h +++ b/cpp/include/milvus-storage/common/config.h @@ -15,11 +15,23 @@ #pragma once #include +#include using namespace std; namespace milvus_storage { +static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB + +// https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596 +static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB + +static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE; + +// Default number of rows to read when using ::arrow::RecordBatchReader +static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024; +static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024; + struct StorageConfig { std::string uri = ""; std::string bucket_name = ""; @@ -31,6 +43,7 @@ struct StorageConfig { std::string region = ""; bool use_custom_part_upload_size = false; int64_t part_size = 0; + 
parquet::WriterProperties writer_props = *parquet::default_writer_properties();
 
   std::string ToString() const {
     std::stringstream ss;
@@ -42,17 +55,4 @@ struct StorageConfig {
   }
 };
 
-static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024;  // 1 MB
-
-// https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
-static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024;  // 10 MB
-
-static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;
-
-static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";
-
-// Default number of rows to read when using ::arrow::RecordBatchReader
-static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
-static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;
-
 }  // namespace milvus_storage
\ No newline at end of file
diff --git a/cpp/include/milvus-storage/common/path_util.h b/cpp/include/milvus-storage/common/path_util.h
index 1bbec94..22f7ff3 100644
--- a/cpp/include/milvus-storage/common/path_util.h
+++ b/cpp/include/milvus-storage/common/path_util.h
@@ -21,13 +21,13 @@ namespace milvus_storage {
 
 constexpr char kSep = '/';
 
-arrow::Status NotAFile(std::string_view path) {
+static inline arrow::Status NotAFile(std::string_view path) {
   return arrow::Status::IOError("Not a regular file: " + std::string(path));
 }
 
-bool HasTrailingSlash(std::string_view s) { return !s.empty() && s.back() == kSep; }
+static inline bool HasTrailingSlash(std::string_view s) { return !s.empty() && s.back() == kSep; }
 
-std::string EnsureTrailingSlash(std::string_view v) {
+static inline std::string EnsureTrailingSlash(std::string_view v) {
   if (!v.empty() && !HasTrailingSlash(v)) {
     // XXX How about "C:" on Windows? We probably don't want to turn it into "C:/"...
     // Unless the local filesystem always uses absolute paths
@@ -37,7 +37,7 @@ std::string EnsureTrailingSlash(std::string_view v) {
   }
 }
 
-std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s) {
+static inline std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s) {
   // XXX should strip trailing slash?
auto pos = s.find_last_of(kSep); @@ -48,4 +48,8 @@ std::pair GetAbstractPathParent(const std::string& s) return {s.substr(0, pos), s.substr(pos + 1)}; } +static inline std::string ConcatenateFilePath(const std::string& parent, const std::string& child) { + return parent + kSep + child; +} + } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/include/milvus-storage/common/serde.h b/cpp/include/milvus-storage/common/serde.h index 46438df..632fdd1 100644 --- a/cpp/include/milvus-storage/common/serde.h +++ b/cpp/include/milvus-storage/common/serde.h @@ -18,22 +18,78 @@ namespace milvus_storage { +static const std::string GROUP_DELIMITER = ";"; +static const std::string COLUMN_DELIMITER = ","; +static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size"; +static const std::string COLUMN_OFFSETS_META_KEY = "column_offsets"; + class PackedMetaSerde { public: // Serialize a vector of size_t to a byte array and convert it to a string - static std::string serialize(const std::vector& sizes) { + static std::string SerializeRowGroupSizes(const std::vector& sizes) { std::vector byteArray(sizes.size() * sizeof(size_t)); std::memcpy(byteArray.data(), sizes.data(), byteArray.size()); return std::string(byteArray.begin(), byteArray.end()); } // Deserialize a string back to a vector of size_t - static std::vector deserialize(const std::string& input) { + static std::vector DeserializeRowGroupSizes(const std::string& input) { std::vector byteArray(input.begin(), input.end()); std::vector sizes(byteArray.size() / sizeof(size_t)); std::memcpy(sizes.data(), byteArray.data(), byteArray.size()); return sizes; } + + static std::string SerializeColumnOffsets(const std::vector>& column_offsets) { + std::stringstream ss; + for (size_t i = 0; i < column_offsets.size(); ++i) { + if (i > 0) { + ss << GROUP_DELIMITER; + } + + for (size_t j = 0; j < column_offsets[i].size(); ++j) { + if (j > 0) { + ss << COLUMN_DELIMITER; + } + ss << column_offsets[i][j]; + } + } + + auto s = ss.str(); + return s; + } + + static std::vector> DeserializeColumnOffsets(const std::string& input) { + std::vector> column_offsets; + + size_t group_start = 0; + size_t group_end = input.find(GROUP_DELIMITER); + + while (group_start != std::string::npos) { + std::string group = input.substr(group_start, group_end - group_start); + std::vector group_indices; + + size_t column_start = 0; + size_t column_end = group.find(COLUMN_DELIMITER); + while (column_start != std::string::npos) { + std::string column = group.substr(column_start, column_end - column_start); + if (!column.empty()) { + group_indices.push_back(std::stoi(column)); + } + column_start = (column_end == std::string::npos) ? std::string::npos : column_end + COLUMN_DELIMITER.size(); + column_end = group.find(COLUMN_DELIMITER, column_start); + } + + if (!group_indices.empty()) { + column_offsets.push_back(group_indices); + } + + group_start = (group_end == std::string::npos) ? 
std::string::npos : group_end + GROUP_DELIMITER.size(); + group_end = input.find(GROUP_DELIMITER, group_start); + } + + return column_offsets; + } }; } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/common/status.h b/cpp/include/milvus-storage/common/status.h index fa6d42c..3fa3828 100644 --- a/cpp/include/milvus-storage/common/status.h +++ b/cpp/include/milvus-storage/common/status.h @@ -37,6 +37,8 @@ class Status { static Status WriterError(const std::string& msg) { return Status(kWriterError, msg); } + static Status ReaderError(const std::string& msg) { return Status(kReaderError, msg); } + static Status IOError(const std::string& msg) { return Status(kIOError, msg); } bool ok() const { return code_ == kOk; } @@ -51,6 +53,8 @@ class Status { bool IsWriterError() const { return code_ == kWriterError; } + bool IsReaderError() const { return code_ == kReaderError; } + bool IsIOError() const { return code_ == kIOError; } std::string ToString() const; @@ -64,6 +68,7 @@ class Status { kFileNotFound = 4, kWriterError = 5, kIOError = 6, + kReaderError = 7 }; explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {} diff --git a/cpp/include/milvus-storage/format/parquet/file_writer.h b/cpp/include/milvus-storage/format/parquet/file_writer.h index aa86b8f..d85be8e 100644 --- a/cpp/include/milvus-storage/format/parquet/file_writer.h +++ b/cpp/include/milvus-storage/format/parquet/file_writer.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include "arrow/filesystem/filesystem.h" #include "format/writer.h" @@ -27,19 +26,11 @@ namespace milvus_storage { class ParquetFileWriter : public FileWriter { public: - // with default WriterProperties ParquetFileWriter(std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, const StorageConfig& storage_config); - // with custom WriterProperties - ParquetFileWriter(std::shared_ptr schema, - arrow::fs::FileSystem& fs, - const std::string& file_path, - const StorageConfig& storage_config, - const parquet::WriterProperties& props); - Status Init() override; Status Write(const arrow::RecordBatch& record) override; @@ -49,6 +40,8 @@ class ParquetFileWriter : public FileWriter { Status WriteRecordBatches(const std::vector>& batches, const std::vector& batch_memory_sizes); + void AppendKVMetadata(const std::string& key, const std::string& value); + int64_t count() override; Status Close() override; @@ -61,7 +54,6 @@ class ParquetFileWriter : public FileWriter { std::unique_ptr writer_; std::shared_ptr kv_metadata_; - parquet::WriterProperties props_; int64_t count_ = 0; int row_group_num_ = 0; std::vector row_group_sizes_; diff --git a/cpp/include/milvus-storage/packed/chunk_manager.h b/cpp/include/milvus-storage/packed/chunk_manager.h index 318ea58..d0d0a7f 100644 --- a/cpp/include/milvus-storage/packed/chunk_manager.h +++ b/cpp/include/milvus-storage/packed/chunk_manager.h @@ -14,6 +14,7 @@ #pragma once +#include "packed/column_group.h" #include #include #include @@ -28,7 +29,13 @@ struct ColumnOffset { int path_index; int col_index; + ColumnOffset() = default; + ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {} + + std::string ToString() { + return "path_index: " + std::to_string(path_index) + ", col_index: " + std::to_string(col_index); + } }; // record which chunk is in use and its offset in the file diff --git a/cpp/include/milvus-storage/packed/column_group.h b/cpp/include/milvus-storage/packed/column_group.h index ff53e99..d408a5f 100644 --- 
a/cpp/include/milvus-storage/packed/column_group.h +++ b/cpp/include/milvus-storage/packed/column_group.h @@ -14,9 +14,12 @@ #pragma once +#include #include #include #include "common/status.h" +#include +#include namespace milvus_storage { diff --git a/cpp/include/milvus-storage/packed/column_group_writer.h b/cpp/include/milvus-storage/packed/column_group_writer.h index df8a62c..17217f7 100644 --- a/cpp/include/milvus-storage/packed/column_group_writer.h +++ b/cpp/include/milvus-storage/packed/column_group_writer.h @@ -22,6 +22,7 @@ #include "common/status.h" #include "packed/column_group.h" #include "common/config.h" +#include "common/serde.h" namespace milvus_storage { @@ -34,16 +35,9 @@ class ColumnGroupWriter { const StorageConfig& storage_config, const std::vector& origin_column_indices); - ColumnGroupWriter(GroupId group_id, - std::shared_ptr schema, - arrow::fs::FileSystem& fs, - const std::string& file_path, - const StorageConfig& storage_config, - const parquet::WriterProperties& props, - const std::vector& origin_column_indices); - Status Init(); Status Write(const std::shared_ptr& record); + Status WriteColumnOffsetsMeta(const std::vector>& column_offsets); Status Flush(); Status Close(); GroupId Group_id() const; diff --git a/cpp/include/milvus-storage/packed/reader.h b/cpp/include/milvus-storage/packed/reader.h index c808af0..087dafe 100644 --- a/cpp/include/milvus-storage/packed/reader.h +++ b/cpp/include/milvus-storage/packed/reader.h @@ -40,16 +40,9 @@ using RowOffsetMinHeap = class PackedRecordBatchReader : public arrow::RecordBatchReader { public: - // Test only PackedRecordBatchReader(arrow::fs::FileSystem& fs, - const std::string& path, + const std::string& file_path, const std::shared_ptr schema, - const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE); - - PackedRecordBatchReader(arrow::fs::FileSystem& fs, - const std::vector& paths, - const std::shared_ptr schema, - const std::vector& column_offsets, const std::set& needed_columns, const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE); @@ -60,6 +53,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader { arrow::Status Close() override; private: + Status initializeColumnOffsets(arrow::fs::FileSystem& fs, const std::set& needed_columns, size_t num_fields); // Advance buffer to fill the expected buffer size arrow::Status advanceBuffer(); std::vector collectChunks(int64_t chunksize) const; @@ -77,7 +71,9 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader { std::unique_ptr chunk_manager_; int64_t absolute_row_position_; std::vector needed_column_offsets_; + std::set needed_paths_; std::vector> row_group_sizes_; + const std::string file_path_; int read_count_; }; diff --git a/cpp/include/milvus-storage/packed/reader_c.h b/cpp/include/milvus-storage/packed/reader_c.h index c7576fa..b1f29f1 100644 --- a/cpp/include/milvus-storage/packed/reader_c.h +++ b/cpp/include/milvus-storage/packed/reader_c.h @@ -20,13 +20,21 @@ extern "C" { #include -typedef void* CReader; -typedef void* CStatus; -typedef void* CRecordBatch; -typedef void* CFileSystem; +typedef void* CPackedReader; +typedef void* CArrowArray; +typedef void* CArrowSchema; int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out); +int NewPackedReader(const char* path, + struct ArrowSchema* schema, + const int64_t buffer_size, + CPackedReader* c_packed_reader); + +int ReadNext(CPackedReader c_packed_reader, CArrowArray* out_array, CArrowSchema* out_schema); + +int CloseReader(CPackedReader 
c_packed_reader); + #ifdef __cplusplus } #endif \ No newline at end of file diff --git a/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h b/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h index da8bd30..3051f53 100644 --- a/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h +++ b/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h @@ -20,6 +20,10 @@ namespace milvus_storage { class SizeBasedSplitter : public SplitterPlugin { public: + /* + * @brief SizeBasedSplitter is a splitter plugin that splits record batches into column groups based on the size of + * each column. + */ explicit SizeBasedSplitter(size_t max_group_size); void Init() override; diff --git a/cpp/include/milvus-storage/packed/writer.h b/cpp/include/milvus-storage/packed/writer.h index 5faa6b1..c781e32 100644 --- a/cpp/include/milvus-storage/packed/writer.h +++ b/cpp/include/milvus-storage/packed/writer.h @@ -34,14 +34,14 @@ class PackedRecordBatchWriter { public: PackedRecordBatchWriter(size_t memory_limit, std::shared_ptr schema, - arrow::fs::FileSystem& fs, + std::shared_ptr fs, const std::string& file_path, - StorageConfig& storage_config, - parquet::WriterProperties& props); + const StorageConfig& storage_config); // Put the record batch into the corresponding column group, // , and write the maximum buffer of column group to the file. Status Write(const std::shared_ptr& record); + // Close the writer and write the mapping of column offset to the metadata of parquet file. Status Close(); private: @@ -51,17 +51,19 @@ class PackedRecordBatchWriter { Status writeWithSplitIndex(const std::shared_ptr& record, size_t batch_size); Status balanceMaxHeap(); + Status flushRemainingBuffer(); + Status flushUnsplittedBuffer(); std::vector> buffered_batches_; bool size_split_done_; size_t memory_limit_; std::shared_ptr schema_; - arrow::fs::FileSystem& fs_; - const std::string& file_path_; - const StorageConfig& storage_config_; - parquet::WriterProperties& props_; + std::shared_ptr fs_; + const std::string file_path_; + const StorageConfig storage_config_; size_t current_memory_usage_; std::vector> group_writers_; + std::vector> group_indices_; IndicesBasedSplitter splitter_; MemoryMaxHeap max_heap_; }; diff --git a/cpp/include/milvus-storage/packed/writer_c.h b/cpp/include/milvus-storage/packed/writer_c.h new file mode 100644 index 0000000..8a876c4 --- /dev/null +++ b/cpp/include/milvus-storage/packed/writer_c.h @@ -0,0 +1,36 @@ +// Copyright 2023 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
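For orientation, the reworked C++ writer interface above can be driven as follows. This is a minimal sketch, assuming an already-built Arrow filesystem, schema, and record batch; the 16 MB budget and the "/tmp/packed" root are illustrative values, not taken from this patch. Note that the separate parquet::WriterProperties parameter is gone; writer properties now travel inside StorageConfig as writer_props.

#include "milvus-storage/packed/writer.h"
#include "milvus-storage/common/config.h"

using namespace milvus_storage;

// Sketch only: fs, schema and batch are assumed to exist already.
Status WriteSketch(std::shared_ptr<arrow::fs::FileSystem> fs,
                   std::shared_ptr<arrow::Schema> schema,
                   std::shared_ptr<arrow::RecordBatch> batch) {
  StorageConfig conf;  // defaults; carries writer_props since this patch
  PackedRecordBatchWriter writer(16 * 1024 * 1024, schema, fs, "/tmp/packed", conf);
  auto s = writer.Write(batch);  // batches are split into column groups internally
  if (!s.ok()) {
    return s;
  }
  return writer.Close();  // persists the column-offset mapping into file metadata
}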
+ +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef void* CPackedWriter; + +int NewPackedWriter(const char* path, + struct ArrowSchema* schema, + const int64_t buffer_size, + CPackedWriter* c_packed_writer); + +int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, struct ArrowSchema* schema); + +int CloseWriter(CPackedWriter c_packed_writer); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/cpp/src/filesystem/s3/multi_part_upload_s3_fs.cpp b/cpp/src/filesystem/s3/multi_part_upload_s3_fs.cpp index 1c91496..98287ff 100644 --- a/cpp/src/filesystem/s3/multi_part_upload_s3_fs.cpp +++ b/cpp/src/filesystem/s3/multi_part_upload_s3_fs.cpp @@ -74,8 +74,6 @@ #include #include #include -#include -using namespace std; static constexpr const char kSep = '/'; diff --git a/cpp/src/format/parquet/file_reader.cpp b/cpp/src/format/parquet/file_reader.cpp index 51478e1..c120da6 100644 --- a/cpp/src/format/parquet/file_reader.cpp +++ b/cpp/src/format/parquet/file_reader.cpp @@ -50,7 +50,7 @@ FileRecordBatchReader::FileRecordBatchReader(arrow::fs::FileSystem& fs, LOG_STORAGE_ERROR_ << "Metadata not found in file: " << path; throw std::runtime_error(metadata.status().ToString()); } - auto all_row_group_sizes = PackedMetaSerde::deserialize(metadata.ValueOrDie()); + auto all_row_group_sizes = PackedMetaSerde::DeserializeRowGroupSizes(metadata.ValueOrDie()); if (row_group_offset >= all_row_group_sizes.size()) { std::string error_msg = "Row group offset exceeds total number of row groups. " diff --git a/cpp/src/format/parquet/file_writer.cpp b/cpp/src/format/parquet/file_writer.cpp index 96d96f3..b1884dc 100644 --- a/cpp/src/format/parquet/file_writer.cpp +++ b/cpp/src/format/parquet/file_writer.cpp @@ -29,19 +29,7 @@ ParquetFileWriter::ParquetFileWriter(std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, const StorageConfig& storage_config) - : schema_(std::move(schema)), - fs_(fs), - file_path_(file_path), - storage_config_(storage_config), - props_(*parquet::default_writer_properties()), - count_(0) {} - -ParquetFileWriter::ParquetFileWriter(std::shared_ptr schema, - arrow::fs::FileSystem& fs, - const std::string& file_path, - const StorageConfig& storage_config, - const parquet::WriterProperties& props) - : schema_(std::move(schema)), fs_(fs), file_path_(file_path), storage_config_(storage_config), props_(props) {} + : schema_(std::move(schema)), fs_(fs), file_path_(file_path), storage_config_(storage_config), count_(0) {} Status ParquetFileWriter::Init() { auto coln = schema_->num_fields(); @@ -102,9 +90,13 @@ Status ParquetFileWriter::WriteRecordBatches(const std::vectorAppend(key, value); +} + Status ParquetFileWriter::Close() { - std::string meta = PackedMetaSerde::serialize(row_group_sizes_); - kv_metadata_->Append(ROW_GROUP_SIZE_META_KEY, meta); + std::string meta = PackedMetaSerde::SerializeRowGroupSizes(row_group_sizes_); + AppendKVMetadata(ROW_GROUP_SIZE_META_KEY, meta); RETURN_ARROW_NOT_OK(writer_->AddKeyValueMetadata(kv_metadata_)); RETURN_ARROW_NOT_OK(writer_->Close()); return Status::OK(); diff --git a/cpp/src/packed/column_group_writer.cpp b/cpp/src/packed/column_group_writer.cpp index fec7b47..809a258 100644 --- a/cpp/src/packed/column_group_writer.cpp +++ b/cpp/src/packed/column_group_writer.cpp @@ -36,20 +36,6 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id, column_group_(group_id, origin_column_indices), finished_(false) {} -ColumnGroupWriter::ColumnGroupWriter(GroupId group_id, 
- std::shared_ptr schema, - arrow::fs::FileSystem& fs, - const std::string& file_path, - const StorageConfig& storage_config, - const parquet::WriterProperties& props, - const std::vector& origin_column_indices) - : group_id_(group_id), - writer_(std::move(schema), fs, file_path, storage_config, props), - column_group_(group_id, origin_column_indices), - flushed_batches_(0), - flushed_rows_(0), - finished_(false) {} - Status ColumnGroupWriter::Init() { return writer_.Init(); } Status ColumnGroupWriter::Write(const std::shared_ptr& record) { @@ -75,6 +61,12 @@ Status ColumnGroupWriter::Flush() { return Status::OK(); } +Status ColumnGroupWriter::WriteColumnOffsetsMeta(const std::vector>& column_offsets) { + std::string meta = PackedMetaSerde::SerializeColumnOffsets(column_offsets); + writer_.AppendKVMetadata(COLUMN_OFFSETS_META_KEY, meta); + return Status::OK(); +} + Status ColumnGroupWriter::Close() { finished_ = true; LOG_STORAGE_DEBUG_ << "Group " << group_id_ << " flushed " << flushed_batches_ << " batches and " << flushed_rows_ diff --git a/cpp/src/packed/reader.cpp b/cpp/src/packed/reader.cpp index fd2226a..83c4334 100644 --- a/cpp/src/packed/reader.cpp +++ b/cpp/src/packed/reader.cpp @@ -24,47 +24,28 @@ #include "packed/chunk_manager.h" #include "common/config.h" #include "common/serde.h" +#include "common/path_util.h" namespace milvus_storage { PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, - const std::string& path, + const std::string& file_path, const std::shared_ptr schema, - const int64_t buffer_size) - : PackedRecordBatchReader( - fs, std::vector{path}, schema, std::vector(), std::set(), buffer_size) {} - -PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, - const std::vector& paths, - const std::shared_ptr schema, - const std::vector& column_offsets, const std::set& needed_columns, const int64_t buffer_size) - : schema_(schema), + : file_path_(file_path), + schema_(schema), buffer_available_(buffer_size), memory_limit_(buffer_size), row_limit_(0), absolute_row_position_(0), read_count_(0) { - auto cols = std::set(needed_columns); - if (cols.empty()) { - for (int i = 0; i < schema->num_fields(); i++) { - cols.emplace(i); - } + auto status = initializeColumnOffsets(fs, needed_columns, schema->num_fields()); + if (!status.ok()) { + throw std::runtime_error(status.ToString()); } - auto offsets = std::vector(column_offsets); - if (column_offsets.empty()) { - for (int i = 0; i < schema->num_fields(); i++) { - offsets.emplace_back(0, i); - } - } - std::set needed_paths; - for (int i : cols) { - needed_column_offsets_.push_back(offsets[i]); - needed_paths.emplace(offsets[i].path_index); - } - for (auto i : needed_paths) { - auto result = MakeArrowFileReader(fs, paths[i]); + for (auto i : needed_paths_) { + auto result = MakeArrowFileReader(fs, ConcatenateFilePath(file_path_, std::to_string(i))); if (!result.ok()) { LOG_STORAGE_ERROR_ << "Error making file reader " << i << ":" << result.status().ToString(); throw std::runtime_error(result.status().ToString()); @@ -73,19 +54,50 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, } for (int i = 0; i < file_readers_.size(); ++i) { - auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY); - if (!metadata.ok()) { - LOG_STORAGE_ERROR_ << "metadata not found in file " << i; - throw std::runtime_error(metadata.status().ToString()); + auto metadata = 
file_readers_[i]->parquet_reader()->metadata()->key_value_metadata();
+
+    auto row_group_size_meta = metadata->Get(ROW_GROUP_SIZE_META_KEY);
+    if (!row_group_size_meta.ok()) {
+      LOG_STORAGE_ERROR_ << "row group size meta not found in file " << i;
+      throw std::runtime_error(row_group_size_meta.status().ToString());
     }
-    row_group_sizes_.push_back(PackedMetaSerde::deserialize(metadata.ValueOrDie()));
+    row_group_sizes_.push_back(PackedMetaSerde::DeserializeRowGroupSizes(row_group_size_meta.ValueOrDie()));
     LOG_STORAGE_DEBUG_ << " file " << i
                        << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size();
   }
+  // Initialize table states and chunk manager
   column_group_states_.resize(file_readers_.size(), ColumnGroupState(0, -1, 0));
   chunk_manager_ = std::make_unique<ChunkManager>(needed_column_offsets_, 0);
   // tables are referenced by column_offsets, so its size matches the number of paths in use.
-  tables_.resize(paths.size(), std::queue<std::shared_ptr<arrow::Table>>());
+  tables_.resize(needed_paths_.size(), std::queue<std::shared_ptr<arrow::Table>>());
+}
+
+Status PackedRecordBatchReader::initializeColumnOffsets(arrow::fs::FileSystem& fs,
+                                                        const std::set<int>& needed_columns,
+                                                        size_t num_fields) {
+  std::string path = ConcatenateFilePath(file_path_, std::to_string(0));
+  auto reader = MakeArrowFileReader(fs, path);
+  if (!reader.ok()) {
+    return Status::ReaderError("can not open file reader");
+  }
+  auto metadata = reader.value()->parquet_reader()->metadata()->key_value_metadata();
+  auto column_offset_meta = metadata->Get(COLUMN_OFFSETS_META_KEY);
+  if (!column_offset_meta.ok()) {
+    return Status::ReaderError("can not find column offset meta");
+  }
+  auto group_indices = PackedMetaSerde::DeserializeColumnOffsets(column_offset_meta.ValueOrDie());
+  std::vector<ColumnOffset> offsets(num_fields);
+  for (int path_index = 0; path_index < group_indices.size(); path_index++) {
+    for (int col_index = 0; col_index < group_indices[path_index].size(); col_index++) {
+      int origin_col = group_indices[path_index][col_index];
+      offsets[origin_col] = ColumnOffset(path_index, col_index);
+    }
+  }
+  for (int col : needed_columns) {
+    needed_paths_.emplace(offsets[col].path_index);
+    needed_column_offsets_.push_back(offsets[col]);
+  }
+  return Status::OK();
+}
 
 std::shared_ptr<arrow::Schema> PackedRecordBatchReader::schema() const { return schema_; }
 
diff --git a/cpp/src/packed/reader_c.cpp b/cpp/src/packed/reader_c.cpp
index a94279e..9892e63 100644
--- a/cpp/src/packed/reader_c.cpp
+++ b/cpp/src/packed/reader_c.cpp
@@ -21,7 +21,6 @@
 #include
 #include
 #include
-#include
 #include
 
 int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
@@ -36,12 +35,78 @@ int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size
   }
   auto trueFs = r.value();
   auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
-  auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, buffer_size);
+  std::set<int> needed_columns;
+  for (int i = 0; i < trueSchema->num_fields(); i++) {
+    needed_columns.emplace(i);
+  }
+  auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, needed_columns,
+                                                                          buffer_size);
   auto status = ExportRecordBatchReader(reader, out);
-  LOG_STORAGE_ERROR_ << "read export done";
   if (!status.ok()) {
     LOG_STORAGE_ERROR_ << "Error exporting record batch reader" << status.ToString();
     return static_cast<int>(status.code());
   }
   return 0;
+}
+
+int NewPackedReader(const char* path,
+                    struct ArrowSchema* schema,
+                    const int64_t buffer_size,
+                    CPackedReader* c_packed_reader) {
+  try {
+    auto truePath = std::string(path);
+    auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
+
auto conf = milvus_storage::StorageConfig(); + conf.uri = "file:///tmp/"; + auto trueFs = factory->BuildFileSystem(conf, &truePath).value(); + auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); + std::set needed_columns; + for (int i = 0; i < trueSchema->num_fields(); i++) { + needed_columns.emplace(i); + } + auto reader = std::make_unique(*trueFs, path, trueSchema, needed_columns, + buffer_size); + *c_packed_reader = reader.release(); + return 0; + } catch (std::exception& e) { + return -1; + } +} + +int ReadNext(CPackedReader c_packed_reader, CArrowArray* out_array, CArrowSchema* out_schema) { + try { + auto packed_reader = static_cast(c_packed_reader); + std::shared_ptr record_batch; + auto status = packed_reader->ReadNext(&record_batch); + if (!status.ok()) { + return -1; + } + if (record_batch == nullptr) { + // end of file + return 0; + } else { + std::unique_ptr arr = std::make_unique(); + std::unique_ptr schema = std::make_unique(); + auto status = arrow::ExportRecordBatch(*record_batch, arr.get(), schema.get()); + if (!status.ok()) { + return -1; + } + *out_array = arr.release(); + *out_schema = schema.release(); + return 0; + } + return 0; + } catch (std::exception& e) { + return -1; + } +} + +int CloseReader(CPackedReader c_packed_reader) { + try { + auto packed_reader = static_cast(c_packed_reader); + delete packed_reader; + return 0; + } catch (std::exception& e) { + return -1; + } } \ No newline at end of file diff --git a/cpp/src/packed/writer.cpp b/cpp/src/packed/writer.cpp index 1d5e449..c6d0067 100644 --- a/cpp/src/packed/writer.cpp +++ b/cpp/src/packed/writer.cpp @@ -25,21 +25,20 @@ #include "common/config.h" #include "filesystem/fs.h" #include "common/arrow_util.h" +#include "common/path_util.h" namespace milvus_storage { PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit, std::shared_ptr schema, - arrow::fs::FileSystem& fs, + std::shared_ptr fs, const std::string& file_path, - StorageConfig& storage_config, - parquet::WriterProperties& props) + const StorageConfig& storage_config) : memory_limit_(memory_limit), schema_(std::move(schema)), - fs_(fs), + fs_(std::move(fs)), file_path_(file_path), storage_config_(storage_config), - props_(props), splitter_({}), current_memory_usage_(0), size_split_done_(false) {} @@ -63,24 +62,23 @@ Status PackedRecordBatchWriter::Write(const std::shared_ptr& } Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() { - std::vector groups = - SizeBasedSplitter(buffered_batches_[0]->num_columns()).SplitRecordBatches(buffered_batches_); - std::vector> group_indices; + auto max_group_size = buffered_batches_[0]->num_columns(); + std::vector groups = SizeBasedSplitter(max_group_size).SplitRecordBatches(buffered_batches_); for (GroupId i = 0; i < groups.size(); ++i) { auto& group = groups[i]; - std::string group_path = file_path_ + "/" + std::to_string(i); - auto writer = std::make_unique(i, group.Schema(), fs_, group_path, storage_config_, props_, + std::string group_path = ConcatenateFilePath(file_path_, std::to_string(i)); + auto writer = std::make_unique(i, group.Schema(), *fs_, group_path, storage_config_, group.GetOriginColumnIndices()); RETURN_NOT_OK(writer->Init()); for (auto& batch : group.GetRecordBatches()) { - RETURN_NOT_OK(writer->Write(group.GetRecordBatch(0))); + RETURN_NOT_OK(writer->Write(batch)); } max_heap_.emplace(i, group.GetMemoryUsage()); - group_indices.emplace_back(group.GetOriginColumnIndices()); + group_indices_.emplace_back(group.GetOriginColumnIndices()); 
group_writers_.emplace_back(std::move(writer)); } - splitter_ = IndicesBasedSplitter(group_indices); + splitter_ = IndicesBasedSplitter(group_indices_); // check memory usage limit size_t min_memory_limit = groups.size() * ARROW_PART_UPLOAD_SIZE; @@ -120,21 +118,15 @@ Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr indices(buffered_batches_[0]->num_columns()); - std::iota(std::begin(indices), std::end(indices), 0); - auto writer = std::make_unique(0, buffered_batches_[0]->schema(), fs_, group_path, - storage_config_, props_, indices); - RETURN_NOT_OK(writer->Init()); - for (auto& batch : buffered_batches_) { - RETURN_NOT_OK(writer->Write(batch)); - } - RETURN_NOT_OK(writer->Flush()); - RETURN_NOT_OK(writer->Close()); - return Status::OK(); + return flushUnsplittedBuffer(); } - // flush all remaining column groups before closing' + // flush all remaining column groups before closing + return flushRemainingBuffer(); +} + +Status PackedRecordBatchWriter::flushRemainingBuffer() { while (!max_heap_.empty()) { auto max_group = max_heap_.top(); max_heap_.pop(); @@ -145,11 +137,35 @@ Status PackedRecordBatchWriter::Close() { current_memory_usage_ -= max_group.second; } for (auto& writer : group_writers_) { + RETURN_NOT_OK(writer->WriteColumnOffsetsMeta(group_indices_)); RETURN_NOT_OK(writer->Close()); } return Status::OK(); } +Status PackedRecordBatchWriter::flushUnsplittedBuffer() { + if (buffered_batches_.empty()) { + return Status::OK(); + } + std::string group_path = ConcatenateFilePath(file_path_, std::to_string(0)); + std::vector indices(buffered_batches_[0]->num_columns()); + std::iota(std::begin(indices), std::end(indices), 0); + group_indices_.emplace_back(indices); + splitter_ = IndicesBasedSplitter(group_indices_); + std::vector column_groups = splitter_.Split(buffered_batches_[0]); + assert(column_groups.size() == 1); + auto writer = std::make_unique(0, column_groups[0].Schema(), *fs_, group_path, storage_config_, + column_groups[0].GetOriginColumnIndices()); + RETURN_NOT_OK(writer->Init()); + for (int i = 0; i < buffered_batches_.size(); ++i) { + RETURN_NOT_OK(writer->Write(buffered_batches_[i])); + } + RETURN_NOT_OK(writer->Flush()); + RETURN_NOT_OK(writer->WriteColumnOffsetsMeta(group_indices_)); + RETURN_NOT_OK(writer->Close()); + return Status::OK(); +} + Status PackedRecordBatchWriter::balanceMaxHeap() { std::map group_map; while (!max_heap_.empty()) { diff --git a/cpp/src/packed/writer_c.cpp b/cpp/src/packed/writer_c.cpp new file mode 100644 index 0000000..540418e --- /dev/null +++ b/cpp/src/packed/writer_c.cpp @@ -0,0 +1,71 @@ +// Copyright 2023 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
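To make the new metadata flow concrete: at Close() every column-group writer persists group_indices_ under COLUMN_OFFSETS_META_KEY, and the reader inverts that mapping in initializeColumnOffsets. A small sketch of the round trip, with made-up values (two files, three columns):

// Writer side: file "0" holds original columns 0 and 2, file "1" holds column 1.
std::vector<std::vector<int>> group_indices = {{0, 2}, {1}};
std::string meta = PackedMetaSerde::SerializeColumnOffsets(group_indices);
// meta == "0,2;1" -- groups joined by GROUP_DELIMITER ';', columns by COLUMN_DELIMITER ','

// Reader side: invert into per-column (path_index, col_index) offsets.
auto groups = PackedMetaSerde::DeserializeColumnOffsets(meta);
std::vector<ColumnOffset> offsets(3);
for (int path = 0; path < static_cast<int>(groups.size()); ++path) {
  for (int col = 0; col < static_cast<int>(groups[path].size()); ++col) {
    offsets[groups[path][col]] = ColumnOffset(path, col);
  }
}
// offsets[2] == (path_index 0, col_index 1): column 2 is the second column of file "0".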
+ +#include "packed/writer_c.h" +#include "packed/writer.h" +#include "common/log.h" +#include "common/config.h" +#include "filesystem/fs.h" + +#include +#include + +int NewPackedWriter(const char* path, + struct ArrowSchema* schema, + const int64_t buffer_size, + CPackedWriter* c_packed_writer) { + try { + auto truePath = std::string(path); + auto factory = std::make_shared(); + auto conf = milvus_storage::StorageConfig(); + conf.uri = "file:///tmp/"; + auto trueFs = factory->BuildFileSystem(conf, &truePath).value(); + auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); + auto writer = + std::make_unique(buffer_size, trueSchema, trueFs, truePath, conf); + + *c_packed_writer = writer.release(); + return 0; + } catch (std::exception& e) { + return -1; + } +} + +int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, struct ArrowSchema* schema) { + try { + auto packed_writer = static_cast(c_packed_writer); + auto record_batch = arrow::ImportRecordBatch(array, schema).ValueOrDie(); + auto status = packed_writer->Write(record_batch); + if (!status.ok()) { + return -1; + } + return 0; + } catch (std::exception& e) { + return -1; + } +} + +int CloseWriter(CPackedWriter c_packed_writer) { + try { + auto packed_writer = static_cast(c_packed_writer); + auto status = packed_writer->Close(); + delete packed_writer; + if (!status.ok()) { + return -1; + } + return 0; + } catch (std::exception& e) { + return -1; + } +} \ No newline at end of file diff --git a/cpp/test/common/serde_test.cpp b/cpp/test/common/serde_test.cpp new file mode 100644 index 0000000..110add7 --- /dev/null +++ b/cpp/test/common/serde_test.cpp @@ -0,0 +1,54 @@ +// Copyright 2023 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
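The C surface now supports a full write/read round trip. Below is a sketch of how a caller might chain the new entry points; the "/tmp/packed" path is illustrative, and note that NewPackedWriter/NewPackedReader currently hard-code a file:///tmp/ StorageConfig, as the implementations above show. Each ArrowSchema*/ArrowArray* handle is imported (and thus consumed) by the call it is passed to, so the caller must export a fresh copy per call; hence the separate schema parameters in this sketch.

#include "milvus-storage/packed/writer_c.h"
#include "milvus-storage/packed/reader_c.h"
#include <stdint.h>
#include <stddef.h>

int RoundTripSketch(struct ArrowSchema* writer_schema,
                    struct ArrowArray* batch, struct ArrowSchema* batch_schema,
                    struct ArrowSchema* reader_schema) {
  const int64_t buffer_size = 10 * 1024 * 1024;
  CPackedWriter writer = NULL;
  if (NewPackedWriter("/tmp/packed", writer_schema, buffer_size, &writer) != 0) return -1;
  if (WriteRecordBatch(writer, batch, batch_schema) != 0) return -1;
  if (CloseWriter(writer) != 0) return -1;

  CPackedReader reader = NULL;
  if (NewPackedReader("/tmp/packed", reader_schema, buffer_size, &reader) != 0) return -1;
  CArrowArray out_array = NULL;
  CArrowSchema out_schema = NULL;
  if (ReadNext(reader, &out_array, &out_schema) != 0) return -1;
  // ReadNext returns 0 and leaves out_array NULL at end of stream.
  return CloseReader(reader);
}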
+#include "gtest/gtest.h" +#include "common/serde.h" + +namespace milvus_storage { + +class SerdeTest : public testing::Test {}; + +TEST_F(SerdeTest, TestColumnOffsetSerde) { + std::vector> column_offsets = {{0, 1, 2}, {3, 4}, {5, 6, 7, 8}}; + + std::string serialized = PackedMetaSerde::SerializeColumnOffsets(column_offsets); + EXPECT_EQ(serialized, "0,1,2;3,4;5,6,7,8"); + + std::vector> deserialized = PackedMetaSerde::DeserializeColumnOffsets(serialized); + EXPECT_EQ(deserialized, column_offsets); + + // Test case: Empty input + std::vector> empty_offsets = {}; + serialized = PackedMetaSerde::SerializeColumnOffsets(empty_offsets); + EXPECT_EQ(serialized, ""); + + deserialized = PackedMetaSerde::DeserializeColumnOffsets(serialized); + EXPECT_TRUE(deserialized.empty()); + + // Test case: Single group + std::vector> single_group = {{1, 2, 3}}; + serialized = PackedMetaSerde::SerializeColumnOffsets(single_group); + EXPECT_EQ(serialized, "1,2,3"); + + deserialized = PackedMetaSerde::DeserializeColumnOffsets(serialized); + EXPECT_EQ(deserialized, single_group); + + // Test case: Single column in each group + std::vector> single_column_groups = {{0}, {1}, {2}}; + serialized = PackedMetaSerde::SerializeColumnOffsets(single_column_groups); + EXPECT_EQ(serialized, "0;1;2"); + + deserialized = PackedMetaSerde::DeserializeColumnOffsets(serialized); + EXPECT_EQ(deserialized, single_column_groups); +} +} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/test/format/parquet/file_reader_test.cpp b/cpp/test/format/parquet/file_reader_test.cpp index ce95c41..ae7df0d 100644 --- a/cpp/test/format/parquet/file_reader_test.cpp +++ b/cpp/test/format/parquet/file_reader_test.cpp @@ -21,11 +21,11 @@ class FileReaderTest : public PackedTestBase {}; TEST_F(FileReaderTest, FileRecordBatchReader) { int batch_size = 100; - PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, storage_config_, props_); + PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_); for (int i = 0; i < batch_size; ++i) { EXPECT_TRUE(writer.Write(record_batch_).ok()); } - EXPECT_TRUE(writer.Close().ok()); + auto column_index_groups = writer.Close(); std::vector> fields = { arrow::field("int32", arrow::int32()), @@ -54,7 +54,7 @@ TEST_F(FileReaderTest, FileRecordBatchReader) { ColumnOffset(0, 1), ColumnOffset(0, 2), }; - PackedRecordBatchReader pr(*fs_, {path}, schema, column_offsets, needed_columns, reader_memory_); + PackedRecordBatchReader pr(*fs_, file_path_, schema, needed_columns, reader_memory_); ASSERT_AND_ARROW_ASSIGN(auto pr_table, pr.ToTable()); ASSERT_STATUS_OK(pr.Close()); ASSERT_EQ(fr_table->num_rows(), pr_table->num_rows()); diff --git a/cpp/test/packed/one_file_test.cpp b/cpp/test/packed/one_file_test.cpp deleted file mode 100644 index c0d6cc7..0000000 --- a/cpp/test/packed/one_file_test.cpp +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2024 Zilliz -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#include "packed_test_base.h" - -namespace milvus_storage { - -class OneFileTest : public PackedTestBase {}; - -TEST_F(OneFileTest, WriteAndRead) { - int batch_size = 100; - - std::vector column_offsets = { - ColumnOffset(0, 0), - ColumnOffset(0, 1), - ColumnOffset(0, 2), - }; - - std::vector paths = {file_path_ + "/0"}; - - std::vector> fields = { - arrow::field("int32", arrow::int32()), - arrow::field("int64", arrow::int64()), - arrow::field("str", arrow::utf8()), - }; - - TestWriteAndRead(batch_size, paths, fields, column_offsets); -} - -} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/test/packed/packed_integration_test.cpp b/cpp/test/packed/packed_integration_test.cpp index e6daf9c..2188fa3 100644 --- a/cpp/test/packed/packed_integration_test.cpp +++ b/cpp/test/packed/packed_integration_test.cpp @@ -18,24 +18,44 @@ namespace milvus_storage { class PackedIntegrationTest : public PackedTestBase {}; -TEST_F(PackedIntegrationTest, WriteAndRead) { - int batch_size = 100000; +TEST_F(PackedIntegrationTest, TestOneFile) { + int batch_size = 100; - std::vector column_offsets = { - ColumnOffset(0, 0), - ColumnOffset(1, 0), - ColumnOffset(1, 1), - }; + PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_); + for (int i = 0; i < batch_size; ++i) { + EXPECT_TRUE(writer.Write(record_batch_).ok()); + } + EXPECT_TRUE(writer.Close().ok()); + + std::vector paths = {file_path_ + "/0"}; + + std::set needed_columns = {0, 1, 2}; + + PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_); + ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable()); + ASSERT_STATUS_OK(pr.Close()); + + ValidateTableData(table); +} + +TEST_F(PackedIntegrationTest, TestSplitColumnGroup) { + int batch_size = 1000; + + PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_); + for (int i = 0; i < batch_size; ++i) { + EXPECT_TRUE(writer.Write(record_batch_).ok()); + } + EXPECT_TRUE(writer.Close().ok()); std::vector paths = {file_path_ + "/0", file_path_ + "/1"}; - std::vector> fields = { - arrow::field("str", arrow::utf8()), - arrow::field("int32", arrow::int32()), - arrow::field("int64", arrow::int64()), - }; + std::set needed_columns = {0, 1, 2}; + + PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_); + ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable()); + ASSERT_STATUS_OK(pr.Close()); - TestWriteAndRead(batch_size, paths, fields, column_offsets); + ValidateTableData(table); } } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/test/packed/packed_test_base.h b/cpp/test/packed/packed_test_base.h index 506e1e4..bea630f 100644 --- a/cpp/test/packed/packed_test_base.h +++ b/cpp/test/packed/packed_test_base.h @@ -32,19 +32,17 @@ #include #include +#include #include #include #include #include #include -#include namespace milvus_storage { class PackedTestBase : public ::testing::Test { protected: - PackedTestBase() : props_(*parquet::default_writer_properties()) {} - void SetUp() override { const char* access_key = std::getenv("ACCESS_KEY"); const char* secret_key = std::getenv("SECRET_KEY"); @@ -67,7 +65,6 @@ class PackedTestBase : public ::testing::Test { ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem(storage_config_, &file_path_)); SetUpCommonData(); - props_ = *parquet::default_writer_properties(); writer_memory_ = (22 + 16) * 1024 * 1024; // 22 MB for S3FS part upload reader_memory_ = 16 * 1024 * 1024; // 16 MB for reading } @@ -85,26 +82,6 @@ class 
PackedTestBase : public ::testing::Test { return path; } - void TestWriteAndRead(int batch_size, - const std::vector& paths, - const std::vector>& fields, - const std::vector& column_offsets) { - PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, storage_config_, props_); - for (int i = 0; i < batch_size; ++i) { - EXPECT_TRUE(writer.Write(record_batch_).ok()); - } - EXPECT_TRUE(writer.Close().ok()); - - std::set needed_columns = {0, 1, 2}; - auto new_schema = arrow::schema(fields); - - PackedRecordBatchReader pr(*fs_, paths, new_schema, column_offsets, needed_columns, reader_memory_); - ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable()); - ASSERT_STATUS_OK(pr.Close()); - - ValidateTableData(table); - } - void ValidateTableData(const std::shared_ptr& table) { int64_t total_rows = table->num_rows(); @@ -191,7 +168,6 @@ class PackedTestBase : public ::testing::Test { size_t reader_memory_; std::shared_ptr fs_; std::string file_path_; - parquet::WriterProperties props_; StorageConfig storage_config_; std::shared_ptr schema_; diff --git a/go/Makefile b/go/Makefile index 0ad0727..059abb4 100644 --- a/go/Makefile +++ b/go/Makefile @@ -22,7 +22,7 @@ test: LD_LIBRARY_PATH=$(MILVUS_STORAGE_LD_DIR):$$LD_LIBRARY_PATH \ CGO_CFLAGS="$(CPPFLAGS)" \ CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" \ - go test -timeout 30s ./... + go test -count=1 -timeout 30s ./... proto: mkdir -p proto/manifest_proto diff --git a/go/packed/packed_reader.go b/go/packed/packed_reader.go new file mode 100644 index 0000000..b386e73 --- /dev/null +++ b/go/packed/packed_reader.go @@ -0,0 +1,82 @@ +// Copyright 2023 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package packed + +/* +#include +#include "milvus-storage/packed/reader_c.h" +#include "arrow/c/abi.h" +#include "arrow/c/helpers.h" +*/ +import "C" + +import ( + "errors" + "fmt" + "unsafe" + + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/cdata" +) + +func newPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) { + var cas cdata.CArrowSchema + cdata.ExportArrowSchema(schema, &cas) + cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas)) + + cPath := C.CString(path) + defer C.free(unsafe.Pointer(cPath)) + + cBufferSize := C.int64_t(bufferSize) + + var cPackedReader C.CPackedReader + status := C.NewPackedReader(cPath, cSchema, cBufferSize, &cPackedReader) + if status != 0 { + return nil, errors.New(fmt.Sprintf("failed to new packed reader: %s, status: %d", path, status)) + } + return &PackedReader{cPackedReader: cPackedReader, schema: schema}, nil +} + +func (pr *PackedReader) readNext() (arrow.Record, error) { + var cArr C.CArrowArray + var cSchema C.CArrowSchema + status := C.ReadNext(pr.cPackedReader, &cArr, &cSchema) + if status != 0 { + return nil, fmt.Errorf("ReadNext failed with error code %d", status) + } + + if cArr == nil { + return nil, nil // end of stream, no more records to read + } + + // Convert ArrowArray to Go RecordBatch using cdata + goCArr := (*cdata.CArrowArray)(unsafe.Pointer(cArr)) + goCSchema := (*cdata.CArrowSchema)(unsafe.Pointer(cSchema)) + recordBatch, err := cdata.ImportCRecordBatch(goCArr, goCSchema) + if err != nil { + return nil, fmt.Errorf("failed to convert ArrowArray to Record: %w", err) + } + + // Return the RecordBatch as an arrow.Record + return recordBatch, nil +} + +func (pr *PackedReader) close() error { + status := C.CloseReader(pr.cPackedReader) + if status != 0 { + return errors.New("PackedReader: failed to close file") + } + return nil +} diff --git a/go/packed/packed_test.go b/go/packed/packed_test.go index d236c4b..e05fb21 100644 --- a/go/packed/packed_test.go +++ b/go/packed/packed_test.go @@ -23,7 +23,8 @@ import ( "github.com/stretchr/testify/assert" ) -func TestRead(t *testing.T) { +func TestPacked(t *testing.T) { + batches := 100 schema := arrow.NewSchema([]arrow.Field{ {Name: "a", Type: arrow.PrimitiveTypes.Int32}, {Name: "b", Type: arrow.PrimitiveTypes.Int64}, @@ -48,22 +49,23 @@ func TestRead(t *testing.T) { ) } } - //rec := b.NewRecord() - - path := "testdata/0" - // file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0666) - // assert.NoError(t, err) - // writer, err := pqarrow.NewFileWriter(schema, file, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps()) - // assert.NoError(t, err) - // err = writer.Write(rec) - // assert.NoError(t, err) - // err = writer.Close() - // assert.NoError(t, err) + rec := b.NewRecord() + defer rec.Release() + path := "/tmp" + bufferSize := 10 * 1024 * 1024 // 10MB + pw, err := newPackedWriter(path, schema, bufferSize) + assert.NoError(t, err) + for i := 0; i < batches; i++ { + err = pw.writeRecordBatch(rec) + assert.NoError(t, err) + } + err = pw.close() + assert.NoError(t, err) - reader, err := Open(path, schema, 10*1024*1024 /* 10MB */) + reader, err := newPackedReader(path, schema, bufferSize) assert.NoError(t, err) - rr, err := reader.Read() + rr, err := reader.readNext() assert.NoError(t, err) defer rr.Release() - assert.Equal(t, int64(300), rr.NumRows()) + assert.Equal(t, int64(3*batches), rr.NumRows()) } diff --git a/go/packed/packed_writer.go b/go/packed/packed_writer.go new file mode 100644 index 
0000000..c7ab8ad --- /dev/null +++ b/go/packed/packed_writer.go @@ -0,0 +1,75 @@ +// Copyright 2023 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package packed + +/* +#include +#include "milvus-storage/packed/writer_c.h" +#include "arrow/c/abi.h" +#include "arrow/c/helpers.h" +*/ +import "C" + +import ( + "errors" + "fmt" + "unsafe" + + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/cdata" +) + +func newPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) { + var cas cdata.CArrowSchema + cdata.ExportArrowSchema(schema, &cas) + cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas)) + + cPath := C.CString(path) + defer C.free(unsafe.Pointer(cPath)) + + cBufferSize := C.int64_t(bufferSize) + + var cPackedWriter C.CPackedWriter + status := C.NewPackedWriter(cPath, cSchema, cBufferSize, &cPackedWriter) + if status != 0 { + return nil, errors.New(fmt.Sprintf("failed to new packed writer: %s, status: %d", path, status)) + } + return &PackedWriter{cPackedWriter: cPackedWriter}, nil +} + +func (pw *PackedWriter) writeRecordBatch(recordBatch arrow.Record) error { + var caa cdata.CArrowArray + var cas cdata.CArrowSchema + + cdata.ExportArrowRecordBatch(recordBatch, &caa, &cas) + + cArr := (*C.struct_ArrowArray)(unsafe.Pointer(&caa)) + cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas)) + + status := C.WriteRecordBatch(pw.cPackedWriter, cArr, cSchema) + if status != 0 { + return errors.New("PackedWriter: failed to write record batch") + } + + return nil +} + +func (pw *PackedWriter) close() error { + status := C.CloseWriter(pw.cPackedWriter) + if status != 0 { + return errors.New("PackedWriter: failed to close file") + } + return nil +} diff --git a/go/packed/testdata/0 b/go/packed/testdata/0 deleted file mode 100644 index f5658b3..0000000 Binary files a/go/packed/testdata/0 and /dev/null differ diff --git a/go/packed/packed.go b/go/packed/type.go similarity index 51% rename from go/packed/packed.go rename to go/packed/type.go index 4c71805..45e38e9 100644 --- a/go/packed/packed.go +++ b/go/packed/type.go @@ -16,35 +16,31 @@ package packed /* #include -#include "milvus-storage/packed/reader_c.h" #include "arrow/c/abi.h" #include "arrow/c/helpers.h" +#include "milvus-storage/packed/reader_c.h" +#include "milvus-storage/packed/writer_c.h" */ import "C" -import ( - "errors" - "fmt" - "unsafe" +import ( "github.com/apache/arrow/go/v12/arrow" - "github.com/apache/arrow/go/v12/arrow/arrio" "github.com/apache/arrow/go/v12/arrow/cdata" ) -func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) { - // var cSchemaPtr uintptr - // cSchema := cdata.SchemaFromPtr(cSchemaPtr) - var cas cdata.CArrowSchema - cdata.ExportArrowSchema(schema, &cas) - casPtr := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas)) - var cass cdata.CArrowArrayStream +type PackedWriter struct { + cPackedWriter C.CPackedWriter +} - cPath := C.CString(path) - defer C.free(unsafe.Pointer(cPath)) - status := C.Open(cPath, casPtr, 
C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cass))) - if status != 0 { - return nil, errors.New(fmt.Sprintf("failed to open file: %s, status: %d", path, status)) - } - reader := cdata.ImportCArrayStream((*cdata.CArrowArrayStream)(unsafe.Pointer(&cass)), schema) - return reader, nil +type PackedReader struct { + cPackedReader C.CPackedReader + arr *cdata.CArrowArray + schema *arrow.Schema } + +type ( + // CArrowSchema is the C Data Interface for ArrowSchemas + CArrowSchema = C.struct_ArrowSchema + // CArrowArray is the C Data Interface object for Arrow Arrays as defined in abi.h + CArrowArray = C.struct_ArrowArray +)
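Taken together, the reader no longer needs caller-supplied file paths or ColumnOffset lists: it takes the root path plus the wanted column indices and recovers the per-file layout from the metadata written at Close(). A closing sketch of a projected read; it assumes an fs and a schema describing exactly the projected columns, and reading a strict subset like this is my reading of the new constructor rather than something the tests above exercise:

std::set<int> needed_columns = {0, 2};  // skip original column 1 entirely
PackedRecordBatchReader reader(*fs, "/tmp/packed", schema, needed_columns, 16 * 1024 * 1024);
std::shared_ptr<arrow::RecordBatch> batch;
while (reader.ReadNext(&batch).ok() && batch != nullptr) {
  // consume batch; only the files containing columns 0 and 2 are ever opened
}
auto st = reader.Close();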