Skip to content

Commit

Permalink
move exception catcher to MRC
Browse files Browse the repository at this point in the history
  • Loading branch information
cwharris committed Oct 31, 2023
1 parent d456588 commit 222edcd
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 37 deletions.
1 change: 1 addition & 0 deletions cpp/mrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ add_library(libmrc
src/public/cuda/sync.cpp
src/public/edge/edge_adapter_registry.cpp
src/public/edge/edge_builder.cpp
src/public/exceptions/exception_catcher.cpp
src/public/manifold/manifold.cpp
src/public/memory/buffer_view.cpp
src/public/memory/codable/buffer.cpp
Expand Down
34 changes: 34 additions & 0 deletions cpp/mrc/include/mrc/exceptions/exception_catcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include <exception>
#include <mutex>
#include <queue>

namespace mrc {

/**
* @brief A utility for catching out-of-stack exceptions in a thread-safe manner such that they
* can be checked and throw from a parent thread.
*/
class ExceptionCatcher
{
public:
/**
* @brief "catches" an exception to the catcher
*/
void push_exception(std::exception_ptr ex);

/**
* @brief checks to see if any exceptions have been "caught" by the catcher.
*/
bool has_exception();

/**
* @brief rethrows the next exception (in the order in which it was "caught").
*/
void rethrow_next_exception();

private:
std::mutex m_mutex{};
std::queue<std::exception_ptr> m_exceptions{};
};

} // namespace mrc
33 changes: 33 additions & 0 deletions cpp/mrc/src/public/exceptions/exception_catcher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include <mrc/exceptions/exception_catcher.hpp>

namespace mrc {

void ExceptionCatcher::push_exception(std::exception_ptr ex)

Check warning on line 5 in cpp/mrc/src/public/exceptions/exception_catcher.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/exceptions/exception_catcher.cpp#L5

Added line #L5 was not covered by tests
{
auto lock = std::lock_guard(m_mutex);
m_exceptions.push(ex);

Check warning on line 8 in cpp/mrc/src/public/exceptions/exception_catcher.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/exceptions/exception_catcher.cpp#L7-L8

Added lines #L7 - L8 were not covered by tests
}

bool ExceptionCatcher::has_exception()
{
auto lock = std::lock_guard(m_mutex);
return not m_exceptions.empty();
}

void ExceptionCatcher::rethrow_next_exception()
{
auto lock = std::lock_guard(m_mutex);

if (m_exceptions.empty())
{
return;
}

auto ex = m_exceptions.front();

Check warning on line 26 in cpp/mrc/src/public/exceptions/exception_catcher.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/exceptions/exception_catcher.cpp#L26

Added line #L26 was not covered by tests

m_exceptions.pop();

Check warning on line 28 in cpp/mrc/src/public/exceptions/exception_catcher.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/exceptions/exception_catcher.cpp#L28

Added line #L28 was not covered by tests

std::rethrow_exception(ex);

Check warning on line 30 in cpp/mrc/src/public/exceptions/exception_catcher.cpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/src/public/exceptions/exception_catcher.cpp#L30

Added line #L30 was not covered by tests
}

} // namespace mrc
39 changes: 2 additions & 37 deletions python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <mrc/coroutines/closable_ring_buffer.hpp>
#include <mrc/coroutines/task.hpp>
#include <mrc/coroutines/task_container.hpp>
#include <mrc/exceptions/exception_catcher.hpp>
#include <mrc/node/sink_properties.hpp>
#include <mrc/runnable/forward.hpp>

Expand All @@ -35,41 +36,6 @@

namespace mrc::pymrc {

class ExceptionCatcher
{
public:
void set_exception(std::exception_ptr ex)
{
auto lock = std::lock_guard(m_mutex);
m_exceptions.push(ex);
}

bool has_exception()
{
auto lock = std::lock_guard(m_mutex);
return not m_exceptions.empty();
}

void rethrow_next_exception()
{
auto lock = std::lock_guard(m_mutex);

if (m_exceptions.empty())
{
return;
}

auto ex = m_exceptions.front();
m_exceptions.pop();

std::rethrow_exception(ex);
}

private:
std::mutex m_mutex{};
std::queue<std::exception_ptr> m_exceptions{};
};

template <typename SignatureT>
class BoostFutureAwaiter
{
Expand Down Expand Up @@ -367,8 +333,7 @@ coroutines::Task<> AsyncioRunnable<InputT, OutputT>::process_one(InputT&& value,
}
} catch (...)
{
// TODO(cwharris): communicate error back to the runnable's main main task
catcher.set_exception(std::current_exception());
catcher.push_exception(std::current_exception());
}

// Return the slot to the task buffer
Expand Down

0 comments on commit 222edcd

Please sign in to comment.