diff --git a/cpp/benchmark/benchmark_packed.cpp b/cpp/benchmark/benchmark_packed.cpp index 634d3ab..d2b7eee 100644 --- a/cpp/benchmark/benchmark_packed.cpp +++ b/cpp/benchmark/benchmark_packed.cpp @@ -107,7 +107,10 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st } } -static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) { +static void PackedWrite(benchmark::State& st, + std::shared_ptr fs, + std::string& path, + size_t buffer_size) { auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()), arrow::field("str", arrow::utf8())}); arrow::Int32Builder int_builder; @@ -134,7 +137,7 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s auto conf = StorageConfig(); conf.use_custom_part_upload_size = true; conf.part_size = 30 * 1024 * 1024; - PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, conf, *parquet::default_writer_properties()); + PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf); for (int i = 0; i < 8 * 1024; ++i) { auto r = writer.Write(record_batch); if (!r.ok()) { @@ -142,9 +145,10 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s break; } } - auto r = writer.Close(); - if (!r.ok()) { - st.SkipWithError(r.ToString()); + auto column_index_groups = writer.Close(); + if (column_index_groups->Size() == 0) { + st.SkipWithError("Failed to close writer"); + break; } } } @@ -153,7 +157,7 @@ std::string PATH = "/tmp/bench/foo"; BENCHMARK_DEFINE_F(S3Fixture, Write32MB)(benchmark::State& st) { SKIP_IF_NOT_OK(fs_->CreateDir(PATH), st); - PackedWrite(st, fs_.get(), PATH, 22 * 1024 * 1024); + PackedWrite(st, fs_, PATH, 22 * 1024 * 1024); } BENCHMARK_REGISTER_F(S3Fixture, Write32MB)->UseRealTime(); diff --git a/cpp/include/milvus-storage/common/config.h b/cpp/include/milvus-storage/common/config.h index 27b5ebc..22900b0 100644 --- 
a/cpp/include/milvus-storage/common/config.h +++ b/cpp/include/milvus-storage/common/config.h @@ -15,6 +15,7 @@ #pragma once #include +#include using namespace std; @@ -31,6 +32,7 @@ struct StorageConfig { std::string region = ""; bool use_custom_part_upload_size = false; int64_t part_size = 0; + parquet::WriterProperties writer_props = *parquet::default_writer_properties(); std::string ToString() const { std::stringstream ss; diff --git a/cpp/include/milvus-storage/format/parquet/file_writer.h b/cpp/include/milvus-storage/format/parquet/file_writer.h index aa86b8f..e4f3acc 100644 --- a/cpp/include/milvus-storage/format/parquet/file_writer.h +++ b/cpp/include/milvus-storage/format/parquet/file_writer.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include "arrow/filesystem/filesystem.h" #include "format/writer.h" @@ -27,19 +26,11 @@ namespace milvus_storage { class ParquetFileWriter : public FileWriter { public: - // with default WriterProperties ParquetFileWriter(std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, const StorageConfig& storage_config); - // with custom WriterProperties - ParquetFileWriter(std::shared_ptr schema, - arrow::fs::FileSystem& fs, - const std::string& file_path, - const StorageConfig& storage_config, - const parquet::WriterProperties& props); - Status Init() override; Status Write(const arrow::RecordBatch& record) override; @@ -61,7 +52,6 @@ class ParquetFileWriter : public FileWriter { std::unique_ptr writer_; std::shared_ptr kv_metadata_; - parquet::WriterProperties props_; int64_t count_ = 0; int row_group_num_ = 0; std::vector row_group_sizes_; diff --git a/cpp/include/milvus-storage/packed/chunk_manager.h b/cpp/include/milvus-storage/packed/chunk_manager.h index 318ea58..2092839 100644 --- a/cpp/include/milvus-storage/packed/chunk_manager.h +++ b/cpp/include/milvus-storage/packed/chunk_manager.h @@ -14,6 +14,7 @@ #pragma once +#include "packed/column_group.h" #include #include #include @@ -24,13 
// Location of one column inside a packed write: which file (path) holds it and
// at which position inside that file.
struct ColumnOffset {
  // Index into the list of written paths. Defaults to -1, the same "not found"
  // sentinel ColumnOffsetMapping::GetColumnOffset returns, so a default-constructed
  // offset can never be mistaken for the real column (0, 0).
  int path_index = -1;
  // Index of the column inside the file at path_index. -1 means "unset".
  int col_index = -1;

  // Produces the unset offset (-1, -1). Previously `ColumnOffset c;` left both
  // members indeterminate.
  ColumnOffset() = default;

  // Builds an offset that points at column `col_index` of path `path_index`.
  ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}
};
+struct ColumnOffsetMapping { + ColumnOffsetMapping() = default; + + ColumnOffsetMapping(const std::vector>& group_indices, const std::shared_ptr schema) { + for (int path_index = 0; path_index < group_indices.size(); path_index++) { + for (int col_index = 0; col_index < group_indices[path_index].size(); col_index++) { + int original_col_index = group_indices[path_index][col_index]; + std::string field_name = schema->field(original_col_index)->name(); + mapping_[field_name] = ColumnOffset(path_index, col_index); + } + } + } + + ColumnOffset GetColumnOffset(std::string field_name) { + if (mapping_.find(field_name) == mapping_.end()) { + return ColumnOffset(-1, -1); + } + return mapping_[field_name]; + } + + std::string ToString() { + std::string str; + for (auto& pair : mapping_) { + str += pair.first + "->" + std::to_string(pair.second.path_index) + ":" + std::to_string(pair.second.col_index) + + ";"; + } + return str; + } + + size_t Size() { return mapping_.size(); } + + private: + std::unordered_map mapping_; +}; + } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/include/milvus-storage/packed/column_group_writer.h b/cpp/include/milvus-storage/packed/column_group_writer.h index df8a62c..5dc89ac 100644 --- a/cpp/include/milvus-storage/packed/column_group_writer.h +++ b/cpp/include/milvus-storage/packed/column_group_writer.h @@ -34,14 +34,6 @@ class ColumnGroupWriter { const StorageConfig& storage_config, const std::vector& origin_column_indices); - ColumnGroupWriter(GroupId group_id, - std::shared_ptr schema, - arrow::fs::FileSystem& fs, - const std::string& file_path, - const StorageConfig& storage_config, - const parquet::WriterProperties& props, - const std::vector& origin_column_indices); - Status Init(); Status Write(const std::shared_ptr& record); Status Flush(); diff --git a/cpp/include/milvus-storage/packed/reader_c.h b/cpp/include/milvus-storage/packed/reader_c.h index c7576fa..65ede1c 100644 --- 
a/cpp/include/milvus-storage/packed/reader_c.h +++ b/cpp/include/milvus-storage/packed/reader_c.h @@ -21,7 +21,6 @@ extern "C" { #include typedef void* CReader; -typedef void* CStatus; typedef void* CRecordBatch; typedef void* CFileSystem; diff --git a/cpp/include/milvus-storage/packed/writer.h b/cpp/include/milvus-storage/packed/writer.h index 5faa6b1..141d4d3 100644 --- a/cpp/include/milvus-storage/packed/writer.h +++ b/cpp/include/milvus-storage/packed/writer.h @@ -34,15 +34,15 @@ class PackedRecordBatchWriter { public: PackedRecordBatchWriter(size_t memory_limit, std::shared_ptr schema, - arrow::fs::FileSystem& fs, + std::shared_ptr fs, const std::string& file_path, - StorageConfig& storage_config, - parquet::WriterProperties& props); + const StorageConfig& storage_config); // Put the record batch into the corresponding column group, // , and write the maximum buffer of column group to the file. Status Write(const std::shared_ptr& record); - Status Close(); + // Close the writer and return the mapping of field name written to column offset. 
+ std::unique_ptr Close(); private: // split first buffer into column groups based on column size @@ -51,17 +51,19 @@ class PackedRecordBatchWriter { Status writeWithSplitIndex(const std::shared_ptr& record, size_t batch_size); Status balanceMaxHeap(); + Status flushRemainingBuffer(); + Status flushUnsplittedBuffer(); std::vector> buffered_batches_; bool size_split_done_; size_t memory_limit_; std::shared_ptr schema_; - arrow::fs::FileSystem& fs_; - const std::string& file_path_; - const StorageConfig& storage_config_; - parquet::WriterProperties& props_; + std::shared_ptr fs_; + const std::string file_path_; + const StorageConfig storage_config_; size_t current_memory_usage_; std::vector> group_writers_; + std::vector> group_indices_; IndicesBasedSplitter splitter_; MemoryMaxHeap max_heap_; }; diff --git a/cpp/include/milvus-storage/packed/writer_c.h b/cpp/include/milvus-storage/packed/writer_c.h new file mode 100644 index 0000000..979f353 --- /dev/null +++ b/cpp/include/milvus-storage/packed/writer_c.h @@ -0,0 +1,39 @@ +// Copyright 2023 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef void* CPackedWriter; +typedef void* CColumnOffsetMapping; + +int NewPackedWriter(const char* path, + struct ArrowSchema* schema, + const int64_t buffer_size, + CPackedWriter* c_packed_writer); + +int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, struct ArrowSchema* schema); + +int Close(CPackedWriter c_packed_writer, CColumnOffsetMapping* c_column_offset_mapping); + +void DeletePackedWriter(CPackedWriter c_packed_writer); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/cpp/src/format/parquet/file_writer.cpp b/cpp/src/format/parquet/file_writer.cpp index 96d96f3..ab09628 100644 --- a/cpp/src/format/parquet/file_writer.cpp +++ b/cpp/src/format/parquet/file_writer.cpp @@ -29,19 +29,7 @@ ParquetFileWriter::ParquetFileWriter(std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, const StorageConfig& storage_config) - : schema_(std::move(schema)), - fs_(fs), - file_path_(file_path), - storage_config_(storage_config), - props_(*parquet::default_writer_properties()), - count_(0) {} - -ParquetFileWriter::ParquetFileWriter(std::shared_ptr schema, - arrow::fs::FileSystem& fs, - const std::string& file_path, - const StorageConfig& storage_config, - const parquet::WriterProperties& props) - : schema_(std::move(schema)), fs_(fs), file_path_(file_path), storage_config_(storage_config), props_(props) {} + : schema_(std::move(schema)), fs_(fs), file_path_(file_path), storage_config_(storage_config), count_(0) {} Status ParquetFileWriter::Init() { auto coln = schema_->num_fields(); diff --git a/cpp/src/packed/column_group_writer.cpp b/cpp/src/packed/column_group_writer.cpp index fec7b47..e9212a6 100644 --- a/cpp/src/packed/column_group_writer.cpp +++ b/cpp/src/packed/column_group_writer.cpp @@ -36,20 +36,6 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id, column_group_(group_id, origin_column_indices), 
finished_(false) {} -ColumnGroupWriter::ColumnGroupWriter(GroupId group_id, - std::shared_ptr schema, - arrow::fs::FileSystem& fs, - const std::string& file_path, - const StorageConfig& storage_config, - const parquet::WriterProperties& props, - const std::vector& origin_column_indices) - : group_id_(group_id), - writer_(std::move(schema), fs, file_path, storage_config, props), - column_group_(group_id, origin_column_indices), - flushed_batches_(0), - flushed_rows_(0), - finished_(false) {} - Status ColumnGroupWriter::Init() { return writer_.Init(); } Status ColumnGroupWriter::Write(const std::shared_ptr& record) { diff --git a/cpp/src/packed/writer.cpp b/cpp/src/packed/writer.cpp index 1d5e449..df0092b 100644 --- a/cpp/src/packed/writer.cpp +++ b/cpp/src/packed/writer.cpp @@ -30,16 +30,14 @@ namespace milvus_storage { PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit, std::shared_ptr schema, - arrow::fs::FileSystem& fs, + std::shared_ptr fs, const std::string& file_path, - StorageConfig& storage_config, - parquet::WriterProperties& props) + const StorageConfig& storage_config) : memory_limit_(memory_limit), schema_(std::move(schema)), - fs_(fs), + fs_(std::move(fs)), file_path_(file_path), storage_config_(storage_config), - props_(props), splitter_({}), current_memory_usage_(0), size_split_done_(false) {} @@ -65,11 +63,10 @@ Status PackedRecordBatchWriter::Write(const std::shared_ptr& Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() { std::vector groups = SizeBasedSplitter(buffered_batches_[0]->num_columns()).SplitRecordBatches(buffered_batches_); - std::vector> group_indices; for (GroupId i = 0; i < groups.size(); ++i) { auto& group = groups[i]; std::string group_path = file_path_ + "/" + std::to_string(i); - auto writer = std::make_unique(i, group.Schema(), fs_, group_path, storage_config_, props_, + auto writer = std::make_unique(i, group.Schema(), *fs_, group_path, storage_config_, group.GetOriginColumnIndices()); 
RETURN_NOT_OK(writer->Init()); for (auto& batch : group.GetRecordBatches()) { @@ -77,10 +74,10 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() { } max_heap_.emplace(i, group.GetMemoryUsage()); - group_indices.emplace_back(group.GetOriginColumnIndices()); + group_indices_.emplace_back(group.GetOriginColumnIndices()); group_writers_.emplace_back(std::move(writer)); } - splitter_ = IndicesBasedSplitter(group_indices); + splitter_ = IndicesBasedSplitter(group_indices_); // check memory usage limit size_t min_memory_limit = groups.size() * ARROW_PART_UPLOAD_SIZE; @@ -119,22 +116,26 @@ Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr PackedRecordBatchWriter::Close() { + // write unsplitted record batch to one file if the buffer record batches are not splitted if (!size_split_done_ && !buffered_batches_.empty()) { - std::string group_path = file_path_ + "/" + std::to_string(0); - std::vector indices(buffered_batches_[0]->num_columns()); - std::iota(std::begin(indices), std::end(indices), 0); - auto writer = std::make_unique(0, buffered_batches_[0]->schema(), fs_, group_path, - storage_config_, props_, indices); - RETURN_NOT_OK(writer->Init()); - for (auto& batch : buffered_batches_) { - RETURN_NOT_OK(writer->Write(batch)); + auto status = flushUnsplittedBuffer(); + if (!status.ok()) { + LOG_STORAGE_ERROR_ << "Failed to write unsplitted record batch while closing writer."; + return std::make_unique(); } - RETURN_NOT_OK(writer->Flush()); - RETURN_NOT_OK(writer->Close()); - return Status::OK(); + return std::make_unique(std::move(group_indices_), schema_); } - // flush all remaining column groups before closing' + // flush all remaining column groups before closing + auto status = flushRemainingBuffer(); + if (!status.ok()) { + LOG_STORAGE_ERROR_ << "Failed to flush remaining column groups while closing writer."; + return std::make_unique(); + } + return std::make_unique(std::move(group_indices_), schema_); +} + +Status 
PackedRecordBatchWriter::flushRemainingBuffer() { while (!max_heap_.empty()) { auto max_group = max_heap_.top(); max_heap_.pop(); @@ -150,6 +151,22 @@ Status PackedRecordBatchWriter::Close() { return Status::OK(); } +Status PackedRecordBatchWriter::flushUnsplittedBuffer() { + std::string group_path = file_path_ + "/" + std::to_string(0); + std::vector indices(buffered_batches_[0]->num_columns()); + std::iota(std::begin(indices), std::end(indices), 0); + auto writer = std::make_unique(0, buffered_batches_[0]->schema(), *fs_, group_path, + storage_config_, indices); + RETURN_NOT_OK(writer->Init()); + for (auto& batch : buffered_batches_) { + RETURN_NOT_OK(writer->Write(batch)); + } + RETURN_NOT_OK(writer->Flush()); + RETURN_NOT_OK(writer->Close()); + group_indices_.emplace_back(indices); + return Status::OK(); +} + Status PackedRecordBatchWriter::balanceMaxHeap() { std::map group_map; while (!max_heap_.empty()) { diff --git a/cpp/src/packed/writer_c.cpp b/cpp/src/packed/writer_c.cpp new file mode 100644 index 0000000..55d4f69 --- /dev/null +++ b/cpp/src/packed/writer_c.cpp @@ -0,0 +1,69 @@ +// Copyright 2023 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "packed/writer_c.h" +#include "packed/writer.h" +#include "common/log.h" +#include "common/config.h" +#include "filesystem/fs.h" + +#include +#include +#include + +int NewPackedWriter(const char* path, + struct ArrowSchema* schema, + const int64_t buffer_size, + CPackedWriter* c_packed_writer) { + try { + auto truePath = std::string(path); + auto factory = std::make_shared(); + auto conf = milvus_storage::StorageConfig(); + conf.uri = "file:///tmp/"; + auto trueFs = factory->BuildFileSystem(conf, &truePath).value(); + auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); + auto writer = + std::make_unique(buffer_size, trueSchema, trueFs, truePath, conf); + + *c_packed_writer = writer.release(); + return 0; + } catch (std::exception& e) { + return -1; + } +} + +int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, struct ArrowSchema* schema) { + try { + auto packed_writer = static_cast(c_packed_writer); + auto record_batch = arrow::ImportRecordBatch(array, schema).ValueOrDie(); + auto status = packed_writer->Write(record_batch); + if (!status.ok()) { + return -1; + } + return 0; + } catch (std::exception& e) { + return -1; + } +} + +int Close(CPackedWriter c_packed_writer, CColumnOffsetMapping* c_column_offset_mapping) { + try { + auto packed_writer = static_cast(c_packed_writer); + *c_column_offset_mapping = packed_writer->Close().release(); + delete packed_writer; + return 0; + } catch (std::exception& e) { + return -1; + } +} \ No newline at end of file diff --git a/cpp/test/format/parquet/file_reader_test.cpp b/cpp/test/format/parquet/file_reader_test.cpp index ce95c41..0fb9173 100644 --- a/cpp/test/format/parquet/file_reader_test.cpp +++ b/cpp/test/format/parquet/file_reader_test.cpp @@ -21,11 +21,11 @@ class FileReaderTest : public PackedTestBase {}; TEST_F(FileReaderTest, FileRecordBatchReader) { int batch_size = 100; - PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, storage_config_, props_); 
+ PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_); for (int i = 0; i < batch_size; ++i) { EXPECT_TRUE(writer.Write(record_batch_).ok()); } - EXPECT_TRUE(writer.Close().ok()); + auto column_index_groups = writer.Close(); std::vector> fields = { arrow::field("int32", arrow::int32()), diff --git a/cpp/test/packed/one_file_test.cpp b/cpp/test/packed/one_file_test.cpp deleted file mode 100644 index c0d6cc7..0000000 --- a/cpp/test/packed/one_file_test.cpp +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2024 Zilliz -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#include "packed_test_base.h" - -namespace milvus_storage { - -class OneFileTest : public PackedTestBase {}; - -TEST_F(OneFileTest, WriteAndRead) { - int batch_size = 100; - - std::vector column_offsets = { - ColumnOffset(0, 0), - ColumnOffset(0, 1), - ColumnOffset(0, 2), - }; - - std::vector paths = {file_path_ + "/0"}; - - std::vector> fields = { - arrow::field("int32", arrow::int32()), - arrow::field("int64", arrow::int64()), - arrow::field("str", arrow::utf8()), - }; - - TestWriteAndRead(batch_size, paths, fields, column_offsets); -} - -} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/test/packed/packed_integration_test.cpp b/cpp/test/packed/packed_integration_test.cpp index e6daf9c..b9ee213 100644 --- a/cpp/test/packed/packed_integration_test.cpp +++ b/cpp/test/packed/packed_integration_test.cpp @@ -18,24 +18,56 @@ namespace milvus_storage { class PackedIntegrationTest : public PackedTestBase {}; -TEST_F(PackedIntegrationTest, WriteAndRead) { - int batch_size = 100000; +TEST_F(PackedIntegrationTest, TestOneFile) { + int batch_size = 100; - std::vector column_offsets = { - ColumnOffset(0, 0), - ColumnOffset(1, 0), - ColumnOffset(1, 1), - }; + PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_); + for (int i = 0; i < batch_size; ++i) { + EXPECT_TRUE(writer.Write(record_batch_).ok()); + } + auto column_offset_mapping = writer.Close(); + EXPECT_EQ(column_offset_mapping->Size(), schema_->num_fields()); + + std::vector column_offsets; + for (auto& field : schema_->fields()) { + column_offsets.push_back(column_offset_mapping->GetColumnOffset(field->name())); + } + + std::vector paths = {file_path_ + "/0"}; + + std::set needed_columns = {0, 1, 2}; + + PackedRecordBatchReader pr(*fs_, paths, schema_, column_offsets, needed_columns, reader_memory_); + ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable()); + ASSERT_STATUS_OK(pr.Close()); + + ValidateTableData(table); +} + +TEST_F(PackedIntegrationTest, 
TestSplitColumnGroup) { + int batch_size = 100; + + PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_); + for (int i = 0; i < batch_size; ++i) { + EXPECT_TRUE(writer.Write(record_batch_).ok()); + } + auto column_offset_mapping = writer.Close(); + EXPECT_EQ(column_offset_mapping->Size(), schema_->num_fields()); + + std::vector column_offsets; + for (auto& field : schema_->fields()) { + column_offsets.push_back(column_offset_mapping->GetColumnOffset(field->name())); + } std::vector paths = {file_path_ + "/0", file_path_ + "/1"}; - std::vector> fields = { - arrow::field("str", arrow::utf8()), - arrow::field("int32", arrow::int32()), - arrow::field("int64", arrow::int64()), - }; + std::set needed_columns = {0, 1, 2}; + + PackedRecordBatchReader pr(*fs_, paths, schema_, column_offsets, needed_columns, reader_memory_); + ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable()); + ASSERT_STATUS_OK(pr.Close()); - TestWriteAndRead(batch_size, paths, fields, column_offsets); + ValidateTableData(table); } } // namespace milvus_storage \ No newline at end of file diff --git a/cpp/test/packed/packed_test_base.h b/cpp/test/packed/packed_test_base.h index 506e1e4..0c889d0 100644 --- a/cpp/test/packed/packed_test_base.h +++ b/cpp/test/packed/packed_test_base.h @@ -32,6 +32,7 @@ #include #include +#include #include #include #include @@ -43,8 +44,6 @@ namespace milvus_storage { class PackedTestBase : public ::testing::Test { protected: - PackedTestBase() : props_(*parquet::default_writer_properties()) {} - void SetUp() override { const char* access_key = std::getenv("ACCESS_KEY"); const char* secret_key = std::getenv("SECRET_KEY"); @@ -67,7 +66,6 @@ class PackedTestBase : public ::testing::Test { ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem(storage_config_, &file_path_)); SetUpCommonData(); - props_ = *parquet::default_writer_properties(); writer_memory_ = (22 + 16) * 1024 * 1024; // 22 MB for S3FS part upload reader_memory_ = 16 * 1024 * 1024; // 
16 MB for reading } @@ -85,26 +83,6 @@ class PackedTestBase : public ::testing::Test { return path; } - void TestWriteAndRead(int batch_size, - const std::vector& paths, - const std::vector>& fields, - const std::vector& column_offsets) { - PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, storage_config_, props_); - for (int i = 0; i < batch_size; ++i) { - EXPECT_TRUE(writer.Write(record_batch_).ok()); - } - EXPECT_TRUE(writer.Close().ok()); - - std::set needed_columns = {0, 1, 2}; - auto new_schema = arrow::schema(fields); - - PackedRecordBatchReader pr(*fs_, paths, new_schema, column_offsets, needed_columns, reader_memory_); - ASSERT_AND_ARROW_ASSIGN(auto table, pr.ToTable()); - ASSERT_STATUS_OK(pr.Close()); - - ValidateTableData(table); - } - void ValidateTableData(const std::shared_ptr& table) { int64_t total_rows = table->num_rows(); @@ -191,7 +169,6 @@ class PackedTestBase : public ::testing::Test { size_t reader_memory_; std::shared_ptr fs_; std::string file_path_; - parquet::WriterProperties props_; StorageConfig storage_config_; std::shared_ptr schema_; diff --git a/go/Makefile b/go/Makefile index 0ad0727..059abb4 100644 --- a/go/Makefile +++ b/go/Makefile @@ -22,7 +22,7 @@ test: LD_LIBRARY_PATH=$(MILVUS_STORAGE_LD_DIR):$$LD_LIBRARY_PATH \ CGO_CFLAGS="$(CPPFLAGS)" \ CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" \ - go test -timeout 30s ./... + go test -count=1 -timeout 30s ./... 
proto: mkdir -p proto/manifest_proto diff --git a/go/packed/packed.go b/go/packed/packed.go index 4c71805..ef4b2dc 100644 --- a/go/packed/packed.go +++ b/go/packed/packed.go @@ -17,10 +17,12 @@ package packed /* #include #include "milvus-storage/packed/reader_c.h" +#include "milvus-storage/packed/writer_c.h" #include "arrow/c/abi.h" #include "arrow/c/helpers.h" */ import "C" + import ( "errors" "fmt" @@ -31,6 +33,60 @@ import ( "github.com/apache/arrow/go/v12/arrow/cdata" ) +type PackedWriter struct { + cPackedWriter C.CPackedWriter +} + +type ( + ColumnOffsetMapping = C.CColumnOffsetMapping + CArrowSchema = C.struct_ArrowSchema + CArrowArray = C.struct_ArrowArray +) + +func newPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) { + var cas cdata.CArrowSchema + cdata.ExportArrowSchema(schema, &cas) + cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas)) + + cPath := C.CString(path) + defer C.free(unsafe.Pointer(cPath)) + + cBufferSize := C.int64_t(bufferSize) + + var cPackedWriter C.CPackedWriter + status := C.NewPackedWriter(cPath, cSchema, cBufferSize, &cPackedWriter) + if status != 0 { + return nil, errors.New(fmt.Sprintf("failed to open file: %s, status: %d", path, status)) + } + return &PackedWriter{cPackedWriter: cPackedWriter}, nil +} + +func (pw *PackedWriter) writeRecordBatch(recordBatch arrow.Record) error { + var caa cdata.CArrowArray + var cas cdata.CArrowSchema + + cdata.ExportArrowRecordBatch(recordBatch, &caa, &cas) + + cArr := (*C.struct_ArrowArray)(unsafe.Pointer(&caa)) + cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas)) + + status := C.WriteRecordBatch(pw.cPackedWriter, cArr, cSchema) + if status != 0 { + return errors.New("PackedWriter: failed to write record batch") + } + + return nil +} + +func (pw *PackedWriter) close() (ColumnOffsetMapping, error) { + var columnOffsetMapping ColumnOffsetMapping + status := C.Close(pw.cPackedWriter, &columnOffsetMapping) + if status != 0 { + return 
columnOffsetMapping, errors.New("PackedWriter: failed to close file") + } + return columnOffsetMapping, nil +} + func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) { // var cSchemaPtr uintptr // cSchema := cdata.SchemaFromPtr(cSchemaPtr) diff --git a/go/packed/packed_test.go b/go/packed/packed_test.go index d236c4b..79e3d05 100644 --- a/go/packed/packed_test.go +++ b/go/packed/packed_test.go @@ -15,6 +15,7 @@ package packed import ( + "fmt" "testing" "github.com/apache/arrow/go/v12/arrow" @@ -48,22 +49,22 @@ func TestRead(t *testing.T) { ) } } - //rec := b.NewRecord() - - path := "testdata/0" - // file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0666) - // assert.NoError(t, err) - // writer, err := pqarrow.NewFileWriter(schema, file, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps()) - // assert.NoError(t, err) - // err = writer.Write(rec) - // assert.NoError(t, err) - // err = writer.Close() - // assert.NoError(t, err) + rec := b.NewRecord() + defer rec.Release() + path := "/tmp/0" + bufferSize := 10 * 1024 * 1024 // 10MB + pw, err := newPackedWriter(path, schema, bufferSize) + assert.NoError(t, err) + err = pw.writeRecordBatch(rec) + assert.NoError(t, err) + mapping, err := pw.close() + assert.NoError(t, err) + fmt.Println(mapping) reader, err := Open(path, schema, 10*1024*1024 /* 10MB */) assert.NoError(t, err) rr, err := reader.Read() assert.NoError(t, err) defer rr.Release() - assert.Equal(t, int64(300), rr.NumRows()) + assert.Equal(t, int64(3), rr.NumRows()) } diff --git a/go/packed/testdata/0 b/go/packed/testdata/0 deleted file mode 100644 index f5658b3..0000000 Binary files a/go/packed/testdata/0 and /dev/null differ