Skip to content

Commit

Permalink
feat: Add BufferedOutputStream (#12052)
Browse files Browse the repository at this point in the history
Summary:

Context:
I plan to update the PrestoBatchVectorSerializer to write directly to the OutputStream rather than going through
VectorStreams as I've seen this can greatly improve the speed.  However, this ends up with many small writes to
the OutputStream, which is typically a ByteOutputStream, so this leads to a lot of small allocations which is
counter productive to the optimization I'm trying to make.

To address this I add a BufferedOutputStream which wraps around another OutputStream, coalesces writes in a 
buffer, and flushes those as large writes to the wrapped OutputStream as the buffer fills up, or as needed.

In my experiments I've seen the cost of the additional copy is far less then the cost of the tiny allocations.

Differential Revision: D67997655
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed Jan 10, 2025
1 parent 9dcfd39 commit de59cc7
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 1 deletion.
48 changes: 48 additions & 0 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,54 @@ class OutputStream {
OutputStreamListener* listener_;
};

/// An OutputStream that wraps another and coalesces writes into a buffer before
/// flushing them as large writes to the wrapped OutputStream.
///
/// Note that you must call flush before calling tellp or seekp and at the end
/// of writing to ensure the changes propagate to the wrapped OutputStream.
class BufferedOutputStream : public OutputStream {
public:
BufferedOutputStream(
OutputStream* out,
StreamArena* arena,
int32_t bufferSize = 1 << 20)
: OutputStream(), out_(out) {
arena->newRange(bufferSize, nullptr, &buffer_);
}

void write(const char* s, std::streamsize count) override {
auto remaining = count;
do {
auto copyLength =
std::min(remaining, (int64_t)buffer_.size - buffer_.position);
simd::memcpy(
buffer_.buffer + buffer_.position, s + count - remaining, copyLength);
buffer_.position += copyLength;
remaining -= copyLength;
if (buffer_.position == buffer_.size) {
flush();
}
} while (remaining > 0);
}

std::streampos tellp() const override {
return out_->tellp();
}

void seekp(std::streampos pos) override {
out_->seekp(pos);
}

void flush() {
out_->write(reinterpret_cast<char*>(buffer_.buffer), buffer_.position);
buffer_.position = 0;
}

private:
OutputStream* out_;
ByteRange buffer_;
};

class OStreamOutputStream : public OutputStream {
public:
explicit OStreamOutputStream(
Expand Down
39 changes: 39 additions & 0 deletions velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,45 @@ TEST_F(ByteStreamTest, outputStream) {
EXPECT_EQ(0, mmapAllocator_->numAllocated());
}

TEST_F(ByteStreamTest, bufferedOutputStream) {
auto arena = newArena();
auto out = std::make_unique<IOBufOutputStream>(*pool_, nullptr, 10000);
auto buffered =
std::make_unique<BufferedOutputStream>(out.get(), arena.get());

std::stringstream referenceSStream;
auto reference = std::make_unique<OStreamOutputStream>(&referenceSStream);
for (auto i = 0; i < 1000; ++i) {
std::string data;
data.resize(10000);
std::fill(data.begin(), data.end(), i);
buffered->write(data.data(), data.size());
reference->write(data.data(), data.size());
}
buffered->flush();
EXPECT_EQ(reference->tellp(), buffered->tellp());
EXPECT_EQ(out->tellp(), buffered->tellp());

for (auto i = 0; i < 100; ++i) {
std::string data;
data.resize(6000);
std::fill(data.begin(), data.end(), i + 10);
buffered->seekp(i * 10000 + 5000);
reference->seekp(i * 10000 + 5000);
buffered->write(data.data(), data.size());
reference->write(data.data(), data.size());
buffered->flush();
}

auto str = referenceSStream.str();
auto iobuf = out->getIOBuf();
auto outData = iobuf->coalesce();
EXPECT_EQ(
str,
std::string(
reinterpret_cast<const char*>(outData.data()), outData.size()));
}

TEST_F(ByteStreamTest, newRangeAllocation) {
const int kPageSize = AllocationTraits::kPageSize;
struct {
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/test/ReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1713,7 +1713,7 @@ VELOX_INSTANTIATE_TEST_SUITE_P(
TEST_F(TestReader, testEmptyFile) {
MemorySink sink{1024, {.pool = pool()}};
DataBufferHolder holder{*pool(), 1024, 0, DEFAULT_PAGE_GROW_RATIO, &sink};
BufferedOutputStream output{holder};
facebook::velox::dwio::common::BufferedOutputStream output{holder};

proto::Footer footer;
footer.set_numberofrows(0);
Expand Down

0 comments on commit de59cc7

Please sign in to comment.