Skip to content

Commit

Permalink
feat: Add BufferedOutputStream
Browse files Browse the repository at this point in the history
Summary:
Context:
I plan to update the PrestoBatchVectorSerializer to write directly to the OutputStream rather than going through
VectorStreams as I've seen this can greatly improve the speed.  However, this ends up with many small writes to
the OutputStream, which is typically a ByteOutputStream, so this leads to a lot of small allocations which is
counter productive to the optimization I'm trying to make.

To address this I add a BufferedOutputStream which wraps around another OutputStream, coalesces writes in a 
buffer, and flushes those as large writes to the wrapped OutputStream as the buffer fills up, or as needed.

In my experiments I've seen the cost of the additional copy is far less then the cost of the tiny allocations.

Differential Revision: D67997655
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed Jan 10, 2025
1 parent c965e7c commit cc9cffa
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
48 changes: 48 additions & 0 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,54 @@ class OutputStream {
OutputStreamListener* listener_;
};

/// An OutputStream that wraps another and coalesces writes into a buffer before
/// flushing them as large writes to the wrapped OutputStream.
///
/// Note that you must call flush before calling tellp or seekp and at the end
/// of writing to ensure the changes propagate to the wrapped OutputStream.
class BufferedOutputStream : public OutputStream {
public:
BufferedOutputStream(
OutputStream* out,
StreamArena* arena,
int32_t bufferSize = 1 << 20)
: OutputStream(), out_(out) {
arena->newRange(bufferSize, nullptr, &buffer_);
}

void write(const char* s, std::streamsize count) override {
auto remaining = count;
do {
auto copyLength =
std::min(remaining, (int64_t)buffer_.size - buffer_.position);
simd::memcpy(
buffer_.buffer + buffer_.position, s + count - remaining, copyLength);
buffer_.position += copyLength;
remaining -= copyLength;
if (buffer_.position == buffer_.size) {
flush();
}
} while (remaining > 0);
}

std::streampos tellp() const {
return out_->tellp();
}

void seekp(std::streampos pos) {
out_->seekp(pos);
}

void flush() {
out_->write(reinterpret_cast<char*>(buffer_.buffer), buffer_.position);
buffer_.position = 0;
}

private:
OutputStream* out_;
ByteRange buffer_;
};

class OStreamOutputStream : public OutputStream {
public:
explicit OStreamOutputStream(
Expand Down
39 changes: 39 additions & 0 deletions velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,45 @@ TEST_F(ByteStreamTest, outputStream) {
EXPECT_EQ(0, mmapAllocator_->numAllocated());
}

TEST_F(ByteStreamTest, bufferedOutputStream) {
auto arena = newArena();
auto out = std::make_unique<IOBufOutputStream>(*pool_, nullptr, 10000);
auto buffered =
std::make_unique<BufferedOutputStream>(out.get(), arena.get());

std::stringstream referenceSStream;
auto reference = std::make_unique<OStreamOutputStream>(&referenceSStream);
for (auto i = 0; i < 1000; ++i) {
std::string data;
data.resize(10000);
std::fill(data.begin(), data.end(), i);
buffered->write(data.data(), data.size());
reference->write(data.data(), data.size());
}
buffered->flush();
EXPECT_EQ(reference->tellp(), buffered->tellp());
EXPECT_EQ(out->tellp(), buffered->tellp());

for (auto i = 0; i < 100; ++i) {
std::string data;
data.resize(6000);
std::fill(data.begin(), data.end(), i + 10);
buffered->seekp(i * 10000 + 5000);
reference->seekp(i * 10000 + 5000);
buffered->write(data.data(), data.size());
reference->write(data.data(), data.size());
buffered->flush();
}

auto str = referenceSStream.str();
auto iobuf = out->getIOBuf();
auto outData = iobuf->coalesce();
EXPECT_EQ(
str,
std::string(
reinterpret_cast<const char*>(outData.data()), outData.size()));
}

TEST_F(ByteStreamTest, newRangeAllocation) {
const int kPageSize = AllocationTraits::kPageSize;
struct {
Expand Down

0 comments on commit cc9cffa

Please sign in to comment.