Skip to content

Commit

Permalink
RemoteIO: use a pinned bounce buffer (#519)
Browse files Browse the repository at this point in the history
We use a bounce buffer to avoid many small memory copies to device. Libcurl has a maximum chunk size of 16kb (`CURL_MAX_WRITE_SIZE`) but chunks are often much smaller.

For now, the bounce buffer isn't double-buffered; that is deferred to later: #520

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #519
  • Loading branch information
madsbk authored Oct 30, 2024
1 parent 607c6a5 commit b65eda3
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cpp/include/kvikio/bounce_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class AllocRetain {
// The size of each allocation in `_free_allocs`
std::size_t _size{defaults::bounce_buffer_size()};

public:
/**
* @brief A host memory allocation
*/
Expand All @@ -56,6 +57,7 @@ class AllocRetain {
Alloc& operator=(Alloc&& o) = delete;
~Alloc() noexcept { _manager->put(_alloc, _size); }
void* get() noexcept { return _alloc; }
void* get(std::ptrdiff_t offset) noexcept { return static_cast<char*>(_alloc) + offset; }
std::size_t size() noexcept { return _size; }
};

Expand All @@ -67,6 +69,7 @@ class AllocRetain {
// <https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#initialization>
~AllocRetain() noexcept = default;

private:
/**
* @brief Free all retained allocations
*
Expand Down
110 changes: 104 additions & 6 deletions cpp/include/kvikio/remote_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
#pragma once

#include <cassert>
#include <cstddef>
#include <cstring>
#include <iostream>
#include <memory>
#include <optional>
#include <regex>
Expand All @@ -34,6 +36,98 @@
namespace kvikio {
namespace detail {

/**
 * @brief Bounce buffer in pinned host memory.
 *
 * Small host writes are accumulated in a host staging buffer and copied to the output
 * device buffer in larger pieces. Any data still staged is flushed on destruction.
 *
 * The object is neither copyable nor movable: its destructor flushes to the device, so
 * a second instance sharing the same offsets would write the staged data twice.
 *
 * @note Is not thread-safe.
 */
class BounceBufferH2D {
  CUstream _stream;                 // The CUDA stream to use.
  CUdeviceptr _dev;                 // The output device buffer.
  AllocRetain::Alloc _host_buffer;  // The host buffer to bounce data on.
  std::ptrdiff_t _dev_offset{0};    // Number of bytes written to `_dev`.
  std::ptrdiff_t _host_offset{0};   // Number of bytes written to `_host_buffer` (resets on flush).

 public:
  /**
   * @brief Create a bounce buffer for an output device buffer.
   *
   * @param stream The CUDA stream used throughout the lifetime of the bounce buffer.
   * @param device_buffer The output device buffer (final destination of the data).
   */
  BounceBufferH2D(CUstream stream, void* device_buffer)
    : _stream{stream},
      _dev{convert_void2deviceptr(device_buffer)},
      _host_buffer{AllocRetain::instance().get()}
  {
  }

  // The destructor has a side effect (final flush), thus copying and moving are disabled.
  BounceBufferH2D(BounceBufferH2D const&)            = delete;
  BounceBufferH2D& operator=(BounceBufferH2D const&) = delete;
  BounceBufferH2D(BounceBufferH2D&& o)               = delete;
  BounceBufferH2D& operator=(BounceBufferH2D&& o)    = delete;

  /**
   * @brief The bounce buffer is flushed to device on destruction.
   *
   * A `CUfileException` raised by the final flush is reported on `std::cerr` instead of
   * being propagated, since the destructor is `noexcept`.
   */
  ~BounceBufferH2D() noexcept
  {
    try {
      flush();
    } catch (CUfileException const& e) {
      std::cerr << "BounceBufferH2D error on final flush: " << e.what() << std::endl;
    }
  }

 private:
  /**
   * @brief Write host memory to the output device buffer.
   *
   * The copy is appended at the current device offset, which is then advanced by `size`.
   * The stream is synchronized before returning, so `src` may be reused or freed by the
   * caller as soon as this call returns.
   *
   * @param src The host memory source.
   * @param size Number of bytes to write (a size of zero is a no-op).
   */
  void write_to_device(void const* src, std::size_t size)
  {
    if (size == 0) { return; }
    CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(_dev + _dev_offset, src, size, _stream));
    CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream));
    _dev_offset += static_cast<std::ptrdiff_t>(size);
  }

  /**
   * @brief Flush the bounce buffer by writing everything to the output device buffer.
   *
   * Afterwards the bounce buffer is empty (`_host_offset` is zero).
   */
  void flush()
  {
    write_to_device(_host_buffer.get(), static_cast<std::size_t>(_host_offset));
    _host_offset = 0;
  }

 public:
  /**
   * @brief Write host memory to the bounce buffer (also host memory).
   *
   * Only when the bounce buffer has been filled up is data copied to the output device buffer.
   *
   * @param data The host memory source.
   * @param size Number of bytes to write.
   */
  void write(char const* data, std::size_t size)
  {
    std::size_t const capacity = _host_buffer.size();
    // `_host_offset` never exceeds `capacity`, so the subtraction cannot wrap around.
    std::size_t const available = capacity - static_cast<std::size_t>(_host_offset);

    if (available < size) {  // Not enough space left in the bounce buffer
      flush();
      assert(_host_offset == 0);
    }
    if (capacity < size) {
      // The data doesn't fit even in an empty bounce buffer, so it is copied straight to
      // the device. This only happens when `defaults::bounce_buffer_size()` is smaller
      // than 16kb, thus there is no need to optimize the performance of this case.
      // Everything staged earlier was flushed above, so the device byte order is preserved.
      write_to_device(data, size);
    } else if (size > 0) {
      std::memcpy(_host_buffer.get(_host_offset), data, size);
      _host_offset += static_cast<std::ptrdiff_t>(size);
    }
  }
};

/**
* @brief Context used by the "CURLOPT_WRITEFUNCTION" callbacks.
*/
Expand All @@ -46,6 +140,7 @@ struct CallbackContext {
: buf{static_cast<char*>(buf)}, size{size}, offset{0}, overflow_error{0}
{
}
BounceBufferH2D* bounce_buffer{nullptr}; // Only used by callback_device_memory
};

/**
Expand Down Expand Up @@ -98,12 +193,7 @@ inline std::size_t callback_device_memory(char* data,
}
KVIKIO_NVTX_FUNC_RANGE("RemoteHandle - callback_device_memory()", nbytes);

CUstream stream = detail::StreamsByThread::get();
CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(
convert_void2deviceptr(ctx->buf + ctx->offset), data, nbytes, stream));
// We have to sync since curl might overwrite or free `data`.
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));

ctx->bounce_buffer->write(data, nbytes);
ctx->offset += nbytes;
return nbytes;
}
Expand Down Expand Up @@ -414,6 +504,10 @@ class RemoteHandle {
/**
* @brief Read from remote source into buffer (host or device memory).
*
* When reading into device memory, a bounce buffer is used to avoid many small memory
* copies to device. Use `kvikio::defaults::bounce_buffer_size_reset()` to set the size
* of this bounce buffer (default 16 MiB).
*
* @param buf Pointer to host or device memory.
* @param size Number of bytes to read.
* @param file_offset File offset in bytes.
Expand Down Expand Up @@ -450,6 +544,10 @@ class RemoteHandle {
curl.perform();
} else {
PushAndPopContext c(get_context_from_pointer(buf));
// We use a bounce buffer to avoid many small memory copies to device. Libcurl has a
// maximum chunk size of 16kb (`CURL_MAX_WRITE_SIZE`) but chunks are often much smaller.
detail::BounceBufferH2D bounce_buffer(detail::StreamsByThread::get(), buf);
ctx.bounce_buffer = &bounce_buffer;
curl.perform();
}
} catch (std::runtime_error const& e) {
Expand Down

0 comments on commit b65eda3

Please sign in to comment.