From 5570b5e51cfb3410fb3995b6a0ebc767e9dbdca3 Mon Sep 17 00:00:00 2001 From: Kevin Wilfong Date: Fri, 10 Jan 2025 15:50:45 -0800 Subject: [PATCH] feat: Add BufferedOutputStream (#12052) Summary: Context: I plan to update the PrestoBatchVectorSerializer to write directly to the OutputStream rather than going through VectorStreams as I've seen this can greatly improve the speed. However, this ends up with many small writes to the OutputStream, which is typically a ByteOutputStream, so this leads to a lot of small allocations which is counter productive to the optimization I'm trying to make. To address this I add a BufferedOutputStream which wraps around another OutputStream, coalesces writes in a buffer, and flushes those as large writes to the wrapped OutputStream as the buffer fills up, or as needed. In my experiments I've seen the cost of the additional copy is far less then the cost of the tiny allocations. Differential Revision: D67997655 --- velox/common/memory/ByteStream.h | 48 ++++++++++++++++++++ velox/common/memory/tests/ByteStreamTest.cpp | 39 ++++++++++++++++ velox/dwio/dwrf/test/ReaderTest.cpp | 2 +- 3 files changed, 88 insertions(+), 1 deletion(-) diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index 20678d59fa72..423b386cec20 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -71,6 +71,54 @@ class OutputStream { OutputStreamListener* listener_; }; +/// An OutputStream that wraps another and coalesces writes into a buffer before +/// flushing them as large writes to the wrapped OutputStream. +/// +/// Note that you must call flush before calling tellp or seekp and at the end +/// of writing to ensure the changes propagate to the wrapped OutputStream. +class BufferedOutputStream : public OutputStream { + public: + BufferedOutputStream( + OutputStream* out, + StreamArena* arena, + int32_t bufferSize = 1 << 20) + : OutputStream(), out_(out) { + arena->newRange(bufferSize, nullptr, &buffer_); + } + + void write(const char* s, std::streamsize count) override { + auto remaining = count; + do { + auto copyLength = + std::min(remaining, (int64_t)buffer_.size - buffer_.position); + simd::memcpy( + buffer_.buffer + buffer_.position, s + count - remaining, copyLength); + buffer_.position += copyLength; + remaining -= copyLength; + if (buffer_.position == buffer_.size) { + flush(); + } + } while (remaining > 0); + } + + std::streampos tellp() const override { + return out_->tellp(); + } + + void seekp(std::streampos pos) override { + out_->seekp(pos); + } + + void flush() { + out_->write(reinterpret_cast(buffer_.buffer), buffer_.position); + buffer_.position = 0; + } + + private: + OutputStream* out_; + ByteRange buffer_; +}; + class OStreamOutputStream : public OutputStream { public: explicit OStreamOutputStream( diff --git a/velox/common/memory/tests/ByteStreamTest.cpp b/velox/common/memory/tests/ByteStreamTest.cpp index 0916b412934c..c56c468e46ec 100644 --- a/velox/common/memory/tests/ByteStreamTest.cpp +++ b/velox/common/memory/tests/ByteStreamTest.cpp @@ -138,6 +138,45 @@ TEST_F(ByteStreamTest, outputStream) { EXPECT_EQ(0, mmapAllocator_->numAllocated()); } +TEST_F(ByteStreamTest, bufferedOutputStream) { + auto arena = newArena(); + auto out = std::make_unique(*pool_, nullptr, 10000); + auto buffered = + std::make_unique(out.get(), arena.get()); + + std::stringstream referenceSStream; + auto reference = std::make_unique(&referenceSStream); + for (auto i = 0; i < 1000; ++i) { + std::string data; + data.resize(10000); + std::fill(data.begin(), data.end(), i); + buffered->write(data.data(), data.size()); + reference->write(data.data(), data.size()); + } + buffered->flush(); + EXPECT_EQ(reference->tellp(), buffered->tellp()); + EXPECT_EQ(out->tellp(), buffered->tellp()); + + for (auto i = 0; i < 100; ++i) { + std::string data; + data.resize(6000); + std::fill(data.begin(), data.end(), i + 10); + buffered->seekp(i * 10000 + 5000); + reference->seekp(i * 10000 + 5000); + buffered->write(data.data(), data.size()); + reference->write(data.data(), data.size()); + buffered->flush(); + } + + auto str = referenceSStream.str(); + auto iobuf = out->getIOBuf(); + auto outData = iobuf->coalesce(); + EXPECT_EQ( + str, + std::string( + reinterpret_cast(outData.data()), outData.size())); +} + TEST_F(ByteStreamTest, newRangeAllocation) { const int kPageSize = AllocationTraits::kPageSize; struct { diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index ad26b7954e6c..08d62900b11a 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -1713,7 +1713,7 @@ VELOX_INSTANTIATE_TEST_SUITE_P( TEST_F(TestReader, testEmptyFile) { MemorySink sink{1024, {.pool = pool()}}; DataBufferHolder holder{*pool(), 1024, 0, DEFAULT_PAGE_GROW_RATIO, &sink}; - BufferedOutputStream output{holder}; + facebook::velox::dwio::common::BufferedOutputStream output{holder}; proto::Footer footer; footer.set_numberofrows(0);