diff --git a/cpp/benchmark/benchmark_packed.cpp b/cpp/benchmark/benchmark_packed.cpp
index 634d3ab..80e56c2 100644
--- a/cpp/benchmark/benchmark_packed.cpp
+++ b/cpp/benchmark/benchmark_packed.cpp
@@ -107,7 +107,10 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st
   }
 }
 
-static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
+static void PackedWrite(benchmark::State& st,
+                        std::shared_ptr<arrow::fs::FileSystem> fs,
+                        std::string& path,
+                        size_t buffer_size) {
   auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()),
                                arrow::field("str", arrow::utf8())});
   arrow::Int32Builder int_builder;
@@ -134,7 +137,7 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
   auto conf = StorageConfig();
   conf.use_custom_part_upload_size = true;
   conf.part_size = 30 * 1024 * 1024;
-  PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, conf, *parquet::default_writer_properties());
+  PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf, *parquet::default_writer_properties());
   for (int i = 0; i < 8 * 1024; ++i) {
     auto r = writer.Write(record_batch);
     if (!r.ok()) {
@@ -153,7 +156,7 @@ std::string PATH = "/tmp/bench/foo";
 
 BENCHMARK_DEFINE_F(S3Fixture, Write32MB)(benchmark::State& st) {
   SKIP_IF_NOT_OK(fs_->CreateDir(PATH), st);
-  PackedWrite(st, fs_.get(), PATH, 22 * 1024 * 1024);
+  PackedWrite(st, fs_, PATH, 22 * 1024 * 1024);
 }
 BENCHMARK_REGISTER_F(S3Fixture, Write32MB)->UseRealTime();
diff --git a/cpp/include/milvus-storage/packed/writer.h b/cpp/include/milvus-storage/packed/writer.h
index 2a64d03..0236b02 100644
--- a/cpp/include/milvus-storage/packed/writer.h
+++ b/cpp/include/milvus-storage/packed/writer.h
@@ -34,7 +34,7 @@ class PackedRecordBatchWriter {
  public:
   PackedRecordBatchWriter(size_t memory_limit,
                           std::shared_ptr<arrow::Schema> schema,
-                          arrow::fs::FileSystem& fs,
+                          std::shared_ptr<arrow::fs::FileSystem> fs,
                           const std::string& file_path,
                           StorageConfig& storage_config,
                           parquet::WriterProperties& props = *parquet::default_writer_properties());
@@ -56,8 +56,8 @@ class PackedRecordBatchWriter {
   bool size_split_done_;
   size_t memory_limit_;
   std::shared_ptr<arrow::Schema> schema_;
-  arrow::fs::FileSystem& fs_;
-  const std::string& file_path_;
+  std::shared_ptr<arrow::fs::FileSystem> fs_;
+  const std::string file_path_;
   const StorageConfig& storage_config_;
   parquet::WriterProperties& props_;
   size_t current_memory_usage_;
diff --git a/cpp/include/milvus-storage/packed/writer_c.h b/cpp/include/milvus-storage/packed/writer_c.h
index 4ed977e..a0ade71 100644
--- a/cpp/include/milvus-storage/packed/writer_c.h
+++ b/cpp/include/milvus-storage/packed/writer_c.h
@@ -29,6 +29,8 @@ int NewPackedWriter(const char* path,
 
 int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, struct ArrowSchema* schema);
 
+int Close(CPackedWriter c_packed_writer);
+
 void DeletePackedWriter(CPackedWriter c_packed_writer);
 
 #ifdef __cplusplus
diff --git a/cpp/src/packed/writer.cpp b/cpp/src/packed/writer.cpp
index 1d5e449..f1b8ff9 100644
--- a/cpp/src/packed/writer.cpp
+++ b/cpp/src/packed/writer.cpp
@@ -30,13 +30,13 @@ namespace milvus_storage {
 
 PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit,
                                                  std::shared_ptr<arrow::Schema> schema,
-                                                 arrow::fs::FileSystem& fs,
+                                                 std::shared_ptr<arrow::fs::FileSystem> fs,
                                                  const std::string& file_path,
                                                  StorageConfig& storage_config,
                                                  parquet::WriterProperties& props)
     : memory_limit_(memory_limit),
       schema_(std::move(schema)),
-      fs_(fs),
+      fs_(std::move(fs)),
       file_path_(file_path),
       storage_config_(storage_config),
       props_(props),
@@ -69,7 +69,7 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() {
   for (GroupId i = 0; i < groups.size(); ++i) {
     auto& group = groups[i];
     std::string group_path = file_path_ + "/" + std::to_string(i);
-    auto writer = std::make_unique<ColumnGroupWriter>(i, group.Schema(), fs_, group_path, storage_config_, props_,
+    auto writer = std::make_unique<ColumnGroupWriter>(i, group.Schema(), *fs_, group_path, storage_config_, props_,
                                                       group.GetOriginColumnIndices());
     RETURN_NOT_OK(writer->Init());
     for (auto& batch : group.GetRecordBatches()) {
@@ -124,7 +124,7 @@ Status PackedRecordBatchWriter::Close() {
   std::string group_path = file_path_ + "/" + std::to_string(0);
   std::vector<int> indices(buffered_batches_[0]->num_columns());
   std::iota(std::begin(indices), std::end(indices), 0);
-  auto writer = std::make_unique<ColumnGroupWriter>(0, buffered_batches_[0]->schema(), fs_, group_path,
+  auto writer = std::make_unique<ColumnGroupWriter>(0, buffered_batches_[0]->schema(), *fs_, group_path,
                                                     storage_config_, props_, indices);
   RETURN_NOT_OK(writer->Init());
   for (auto& batch : buffered_batches_) {
diff --git a/cpp/src/packed/writer_c.cpp b/cpp/src/packed/writer_c.cpp
index af54a63..250fbf6 100644
--- a/cpp/src/packed/writer_c.cpp
+++ b/cpp/src/packed/writer_c.cpp
@@ -34,7 +34,7 @@ int NewPackedWriter(const char* path,
     auto trueFs = factory->BuildFileSystem(conf, &truePath).value();
     auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
     auto writer =
-        std::make_unique<milvus_storage::PackedRecordBatchWriter>(buffer_size, trueSchema, *trueFs, truePath, conf);
+        std::make_unique<milvus_storage::PackedRecordBatchWriter>(buffer_size, trueSchema, trueFs, truePath, conf);
 
     *packed_writer = writer.release();
     return 0;
@@ -51,6 +51,20 @@ int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, st
     if (!status.ok()) {
       return -1;
     }
+    return 0;
+  } catch (std::exception& e) {
+    return -1;
+  }
+}
+
+int Close(CPackedWriter c_packed_writer) {
+  try {
+    auto packed_writer = static_cast<milvus_storage::PackedRecordBatchWriter*>(c_packed_writer);
+    auto status = packed_writer->Close();
+    if (!status.ok()) {
+      return -1;
+    }
+
     return 0;
   } catch (std::exception& e) {
     return -1;
   }
 }
diff --git a/cpp/test/format/parquet/file_reader_test.cpp b/cpp/test/format/parquet/file_reader_test.cpp
index ce95c41..4968324 100644
--- a/cpp/test/format/parquet/file_reader_test.cpp
+++ b/cpp/test/format/parquet/file_reader_test.cpp
@@ -21,7 +21,7 @@ class FileReaderTest : public PackedTestBase {};
 
 TEST_F(FileReaderTest, FileRecordBatchReader) {
   int batch_size = 100;
-  PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, storage_config_, props_);
+  PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_, props_);
   for (int i = 0; i < batch_size; ++i) {
     EXPECT_TRUE(writer.Write(record_batch_).ok());
   }
diff --git a/cpp/test/packed/packed_test_base.h b/cpp/test/packed/packed_test_base.h
index 506e1e4..9585e19 100644
--- a/cpp/test/packed/packed_test_base.h
+++ b/cpp/test/packed/packed_test_base.h
@@ -89,7 +89,7 @@ class PackedTestBase : public ::testing::Test {
                    const std::vector<std::string>& paths,
                    const std::vector<std::shared_ptr<arrow::Field>>& fields,
                    const std::vector<ColumnOffset>& column_offsets) {
-    PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, storage_config_, props_);
+    PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_, props_);
     for (int i = 0; i < batch_size; ++i) {
       EXPECT_TRUE(writer.Write(record_batch_).ok());
     }
diff --git a/go/packed/packed.go b/go/packed/packed.go
index 98cd01e..e8faad5 100644
--- a/go/packed/packed.go
+++ b/go/packed/packed.go
@@ -33,13 +33,16 @@ import (
 	"github.com/apache/arrow/go/v12/arrow/cdata"
 )
 
+type PackedWriter struct {
+	cPackedWriter C.CPackedWriter
+}
+
 type (
 	CArrowSchema = C.struct_ArrowSchema
 	CArrowArray  = C.struct_ArrowArray
-	PackedWriter = C.CPackedWriter
 )
 
-func NewPackedWriter(path string, schema *arrow.Schema, bufferSize int) (PackedWriter, error) {
+func newPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) {
 	var cas cdata.CArrowSchema
 	cdata.ExportArrowSchema(schema, &cas)
 	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
@@ -49,31 +52,24 @@ func NewPackedWriter(path string, schema *arrow.Schema, bufferSize int) (PackedW
 
 	cBufferSize := C.int64_t(bufferSize)
 
-	var packedWriter PackedWriter
-	status := C.NewPackedWriter(cPath, cSchema, cBufferSize, &packedWriter)
+	var cPackedWriter C.CPackedWriter
+	status := C.NewPackedWriter(cPath, cSchema, cBufferSize, &cPackedWriter)
 	if status != 0 {
 		return nil, errors.New(fmt.Sprintf("failed to open file: %s, status: %d", path, status))
 	}
-	return packedWriter, nil
+	return &PackedWriter{cPackedWriter: cPackedWriter}, nil
 }
 
-func WriteRecordBatch(packedWriter PackedWriter, recordBatch arrow.Record) error {
+func (pw *PackedWriter) writeRecordBatch(recordBatch arrow.Record) error {
 	var caa cdata.CArrowArray
-	cdata.ExportArrowArray(schema, &cas)
-	casPtr := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
-
 	var cas cdata.CArrowSchema
-	cdata.ExportArrowSchema(schema, &cas)
-	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
-	cdata.ExportArrowRecordBatch(recordBatch, cArr, cSchema)
-	defer func() {
-		cdata.ReleaseCArrowArray(cArr)
-		cdata.ReleaseCArrowSchema(cSchema)
-		C.free(unsafe.Pointer(arr))
-	}()
+	cdata.ExportArrowRecordBatch(recordBatch, &caa, &cas)
 
-	status := C.WriteRecordBatch(packedWriter, &cArr, &cSchema)
+	cArr := (*C.struct_ArrowArray)(unsafe.Pointer(&caa))
+	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
+
+	status := C.WriteRecordBatch(pw.cPackedWriter, cArr, cSchema)
 	if status != 0 {
 		return errors.New("PackedWriter: failed to write record batch")
 	}
@@ -81,8 +77,16 @@ func WriteRecordBatch(packedWriter PackedWriter, recordBatch arrow.Record) error
 	return nil
 }
 
-func DeletePackedWriter(packedWriter PackedWriter) {
-	C.DeletePackedWriter(packedWriter)
+func (pw *PackedWriter) close() error {
+	status := C.Close(pw.cPackedWriter)
+	if status != 0 {
+		return errors.New("PackedWriter: failed to close file")
+	}
+	return nil
+}
+
+func deletePackedWriter(packedWriter *PackedWriter) {
+	C.DeletePackedWriter(packedWriter.cPackedWriter)
 }
 
 func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) {
diff --git a/go/packed/packed_test.go b/go/packed/packed_test.go
index d236c4b..d913b67 100644
--- a/go/packed/packed_test.go
+++ b/go/packed/packed_test.go
@@ -48,22 +48,23 @@ func TestRead(t *testing.T) {
 			)
 		}
 	}
-	//rec := b.NewRecord()
+	rec := b.NewRecord()
+	defer rec.Release()
+	path := "/tmp/0"
+	bufferSize := 10 * 1024 * 1024 // 10MB
+	pw, err := newPackedWriter(path, schema, bufferSize)
+	assert.NoError(t, err)
+	defer deletePackedWriter(pw)
 
-	path := "testdata/0"
-	// file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0666)
-	// assert.NoError(t, err)
-	// writer, err := pqarrow.NewFileWriter(schema, file, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
-	// assert.NoError(t, err)
-	// err = writer.Write(rec)
-	// assert.NoError(t, err)
-	// err = writer.Close()
-	// assert.NoError(t, err)
+	err = pw.writeRecordBatch(rec)
+	assert.NoError(t, err)
+	err = pw.close()
+	assert.NoError(t, err)
 
 	reader, err := Open(path, schema, 10*1024*1024 /* 10MB */)
 	assert.NoError(t, err)
 	rr, err := reader.Read()
 	assert.NoError(t, err)
 	defer rr.Release()
-	assert.Equal(t, int64(300), rr.NumRows())
+	assert.Equal(t, int64(3), rr.NumRows())
 }
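
Usage note (illustrative, not part of the patch): with these changes the Go binding follows a create -> write -> close -> delete lifecycle, where close() flushes buffered column groups through the new C-level Close() and deletePackedWriter() only frees the C-side handle. The sketch below is a minimal in-package test of that flow; the test name, column name, and /tmp path are made up for illustration, while the writer API is the one introduced above.

package packed

import (
	"testing"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
	"github.com/stretchr/testify/assert"
)

// TestWriterLifecycle is a hypothetical companion to TestRead: it exercises
// newPackedWriter -> writeRecordBatch -> close, deferring deletePackedWriter
// so the C handle is freed even if an assertion fails first.
func TestWriterLifecycle(t *testing.T) {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "a", Type: arrow.PrimitiveTypes.Int32},
	}, nil)

	// Build a small record batch to write.
	b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer b.Release()
	b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3}, nil)
	rec := b.NewRecord()
	defer rec.Release()

	pw, err := newPackedWriter("/tmp/lifecycle", schema, 10*1024*1024)
	assert.NoError(t, err)
	defer deletePackedWriter(pw)

	assert.NoError(t, pw.writeRecordBatch(rec))
	assert.NoError(t, pw.close()) // flushes and finalizes files via the new C Close()
}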