Skip to content

Commit

Permalink
cgo packed writer
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Jan 3, 2025
1 parent a547a3d commit e278017
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 44 deletions.
9 changes: 6 additions & 3 deletions cpp/benchmark/benchmark_packed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st
}
}

static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
static void PackedWrite(benchmark::State& st,
std::shared_ptr<arrow::fs::FileSystem> fs,
std::string& path,
size_t buffer_size) {
auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()),
arrow::field("str", arrow::utf8())});
arrow::Int32Builder int_builder;
Expand All @@ -134,7 +137,7 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
auto conf = StorageConfig();
conf.use_custom_part_upload_size = true;
conf.part_size = 30 * 1024 * 1024;
PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, conf, *parquet::default_writer_properties());
PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf, *parquet::default_writer_properties());
for (int i = 0; i < 8 * 1024; ++i) {
auto r = writer.Write(record_batch);
if (!r.ok()) {
Expand All @@ -153,7 +156,7 @@ std::string PATH = "/tmp/bench/foo";

BENCHMARK_DEFINE_F(S3Fixture, Write32MB)(benchmark::State& st) {
SKIP_IF_NOT_OK(fs_->CreateDir(PATH), st);
PackedWrite(st, fs_.get(), PATH, 22 * 1024 * 1024);
PackedWrite(st, fs_, PATH, 22 * 1024 * 1024);
}
BENCHMARK_REGISTER_F(S3Fixture, Write32MB)->UseRealTime();

Expand Down
6 changes: 3 additions & 3 deletions cpp/include/milvus-storage/packed/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class PackedRecordBatchWriter {
public:
PackedRecordBatchWriter(size_t memory_limit,
std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& file_path,
StorageConfig& storage_config,
parquet::WriterProperties& props = *parquet::default_writer_properties());
Expand All @@ -56,8 +56,8 @@ class PackedRecordBatchWriter {
bool size_split_done_;
size_t memory_limit_;
std::shared_ptr<arrow::Schema> schema_;
arrow::fs::FileSystem& fs_;
const std::string& file_path_;
std::shared_ptr<arrow::fs::FileSystem> fs_;
const std::string file_path_;
const StorageConfig& storage_config_;
parquet::WriterProperties& props_;
size_t current_memory_usage_;
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/milvus-storage/packed/writer_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ int NewPackedWriter(const char* path,

int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, struct ArrowSchema* schema);

int Close(CPackedWriter c_packed_writer);

void DeletePackedWriter(CPackedWriter c_packed_writer);

#ifdef __cplusplus
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/packed/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ namespace milvus_storage {

PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit,
std::shared_ptr<arrow::Schema> schema,
arrow::fs::FileSystem& fs,
std::shared_ptr<arrow::fs::FileSystem> fs,
const std::string& file_path,
StorageConfig& storage_config,
parquet::WriterProperties& props)
: memory_limit_(memory_limit),
schema_(std::move(schema)),
fs_(fs),
fs_(std::move(fs)),
file_path_(file_path),
storage_config_(storage_config),
props_(props),
Expand Down Expand Up @@ -69,7 +69,7 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() {
for (GroupId i = 0; i < groups.size(); ++i) {
auto& group = groups[i];
std::string group_path = file_path_ + "/" + std::to_string(i);
auto writer = std::make_unique<ColumnGroupWriter>(i, group.Schema(), fs_, group_path, storage_config_, props_,
auto writer = std::make_unique<ColumnGroupWriter>(i, group.Schema(), *fs_, group_path, storage_config_, props_,
group.GetOriginColumnIndices());
RETURN_NOT_OK(writer->Init());
for (auto& batch : group.GetRecordBatches()) {
Expand Down Expand Up @@ -124,7 +124,7 @@ Status PackedRecordBatchWriter::Close() {
std::string group_path = file_path_ + "/" + std::to_string(0);
std::vector<int> indices(buffered_batches_[0]->num_columns());
std::iota(std::begin(indices), std::end(indices), 0);
auto writer = std::make_unique<ColumnGroupWriter>(0, buffered_batches_[0]->schema(), fs_, group_path,
auto writer = std::make_unique<ColumnGroupWriter>(0, buffered_batches_[0]->schema(), *fs_, group_path,
storage_config_, props_, indices);
RETURN_NOT_OK(writer->Init());
for (auto& batch : buffered_batches_) {
Expand Down
16 changes: 15 additions & 1 deletion cpp/src/packed/writer_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ int NewPackedWriter(const char* path,
auto trueFs = factory->BuildFileSystem(conf, &truePath).value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto writer =
std::make_unique<milvus_storage::PackedRecordBatchWriter>(buffer_size, trueSchema, *trueFs, truePath, conf);
std::make_unique<milvus_storage::PackedRecordBatchWriter>(buffer_size, trueSchema, trueFs, truePath, conf);

*packed_writer = writer.release();
return 0;
Expand All @@ -51,6 +51,20 @@ int WriteRecordBatch(CPackedWriter c_packed_writer, struct ArrowArray* array, st
if (!status.ok()) {
return -1;
}
return 0;
} catch (std::exception& e) {
return -1;
}
}

int Close(CPackedWriter c_packed_writer) {
try {
auto packed_writer = static_cast<milvus_storage::PackedRecordBatchWriter*>(c_packed_writer);
auto status = packed_writer->Close();
if (!status.ok()) {
return -1;
}
return 0;
} catch (std::exception& e) {
return -1;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/test/format/parquet/file_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class FileReaderTest : public PackedTestBase {};
TEST_F(FileReaderTest, FileRecordBatchReader) {
int batch_size = 100;

PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, storage_config_, props_);
PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_, props_);
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/test/packed/packed_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class PackedTestBase : public ::testing::Test {
const std::vector<std::string>& paths,
const std::vector<std::shared_ptr<arrow::Field>>& fields,
const std::vector<ColumnOffset>& column_offsets) {
PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, storage_config_, props_);
PackedRecordBatchWriter writer(writer_memory_, schema_, fs_, file_path_, storage_config_, props_);
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
}
Expand Down
44 changes: 24 additions & 20 deletions go/packed/packed.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ import (
"github.com/apache/arrow/go/v12/arrow/cdata"
)

// PackedWriter wraps the opaque cgo handle to the native packed record
// batch writer; all operations delegate to the C API declared in writer_c.h.
type PackedWriter struct {
	cPackedWriter C.CPackedWriter // handle obtained from C.NewPackedWriter; released via C.DeletePackedWriter
}

type (
CArrowSchema = C.struct_ArrowSchema
CArrowArray = C.struct_ArrowArray
PackedWriter = C.CPackedWriter
)

func NewPackedWriter(path string, schema *arrow.Schema, bufferSize int) (PackedWriter, error) {
func newPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) {
var cas cdata.CArrowSchema
cdata.ExportArrowSchema(schema, &cas)
cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
Expand All @@ -49,40 +52,41 @@ func NewPackedWriter(path string, schema *arrow.Schema, bufferSize int) (PackedW

cBufferSize := C.int64_t(bufferSize)

var packedWriter PackedWriter
status := C.NewPackedWriter(cPath, cSchema, cBufferSize, &packedWriter)
var cPackedWriter C.CPackedWriter
status := C.NewPackedWriter(cPath, cSchema, cBufferSize, &cPackedWriter)
if status != 0 {
return nil, errors.New(fmt.Sprintf("failed to open file: %s, status: %d", path, status))
}
return packedWriter, nil
return &PackedWriter{cPackedWriter: cPackedWriter}, nil
}

func WriteRecordBatch(packedWriter PackedWriter, recordBatch arrow.Record) error {
func (pw *PackedWriter) writeRecordBatch(recordBatch arrow.Record) error {
var caa cdata.CArrowArray
cdata.ExportArrowArray(schema, &cas)
casPtr := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))

var cas cdata.CArrowSchema
cdata.ExportArrowSchema(schema, &cas)
cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
cdata.ExportArrowRecordBatch(recordBatch, cArr, cSchema)
defer func() {
cdata.ReleaseCArrowArray(cArr)
cdata.ReleaseCArrowSchema(cSchema)

C.free(unsafe.Pointer(arr))
}()
cdata.ExportArrowRecordBatch(recordBatch, &caa, &cas)

status := C.WriteRecordBatch(packedWriter, &cArr, &cSchema)
cArr := (*C.struct_ArrowArray)(unsafe.Pointer(&caa))
cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))

status := C.WriteRecordBatch(pw.cPackedWriter, cArr, cSchema)
if status != 0 {
return errors.New("PackedWriter: failed to write record batch")
}

return nil
}

func DeletePackedWriter(packedWriter PackedWriter) {
C.DeletePackedWriter(packedWriter)
// close finalizes the underlying native writer via the C API.
// It returns an error when the native Close call reports a non-zero status.
func (pw *PackedWriter) close() error {
	if rc := C.Close(pw.cPackedWriter); rc != 0 {
		return errors.New("PackedWriter: failed to close file")
	}
	return nil
}

// deletePackedWriter frees the native writer backing packedWriter.
// NOTE(review): presumably safe to call only once per writer, after close —
// double-free behavior is defined on the C++ side; confirm against writer_c.cpp.
func deletePackedWriter(packedWriter *PackedWriter) {
	C.DeletePackedWriter(packedWriter.cPackedWriter)
}

func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) {
Expand Down
23 changes: 12 additions & 11 deletions go/packed/packed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,23 @@ func TestRead(t *testing.T) {
)
}
}
//rec := b.NewRecord()
rec := b.NewRecord()
defer rec.Release()
path := "/tmp/0"
bufferSize := 10 * 1024 * 1024 // 10MB
pw, err := newPackedWriter(path, schema, bufferSize)
assert.NoError(t, err)
defer deletePackedWriter(pw)

path := "testdata/0"
// file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0666)
// assert.NoError(t, err)
// writer, err := pqarrow.NewFileWriter(schema, file, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
// assert.NoError(t, err)
// err = writer.Write(rec)
// assert.NoError(t, err)
// err = writer.Close()
// assert.NoError(t, err)
err = pw.writeRecordBatch(rec)
assert.NoError(t, err)
err = pw.close()
assert.NoError(t, err)

reader, err := Open(path, schema, 10*1024*1024 /* 10MB */)
assert.NoError(t, err)
rr, err := reader.Read()
assert.NoError(t, err)
defer rr.Release()
assert.Equal(t, int64(300), rr.NumRows())
assert.Equal(t, int64(3), rr.NumRows())
}

0 comments on commit e278017

Please sign in to comment.