Skip to content

Commit

Permalink
add boost future awaitable operation test
Browse files Browse the repository at this point in the history
  • Loading branch information
cwharris committed Nov 1, 2023
1 parent ce77e06 commit e733106
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
2 changes: 1 addition & 1 deletion python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class BoostFutureAwaitableOperation
std::move(continuation));
}

auto await_resume() noexcept
auto await_resume()
{
return m_future.get();
}
Expand Down
59 changes: 58 additions & 1 deletion python/mrc/_pymrc/tests/test_asyncio_runnable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "pymrc/utilities/object_wrappers.hpp"

#include "mrc/coroutines/async_generator.hpp"
#include "mrc/coroutines/sync_wait.hpp"
#include "mrc/coroutines/task.hpp"
#include "mrc/node/rx_sink.hpp"
#include "mrc/node/rx_source.hpp"
#include "mrc/options/engine_groups.hpp"
Expand All @@ -31,6 +33,7 @@
#include "mrc/segment/builder.hpp"
#include "mrc/segment/object.hpp"

#include <boost/fiber/operations.hpp>
#include <boost/fiber/policy.hpp>
#include <gtest/gtest.h>
#include <pybind11/cast.h>
Expand All @@ -44,6 +47,8 @@
#include <atomic>
#include <coroutine>
#include <memory>
#include <stdexcept>
#include <stop_token>
#include <string>
#include <utility>

Expand Down Expand Up @@ -163,7 +168,7 @@ TEST_F(TestAsyncioRunnable, UseAsyncioTasks)
auto options = std::make_shared<mrc::Options>();
options->topology().user_cpuset("0");
// AsyncioRunnable only works with the Thread engine due to asyncio loops being thread-specific.
options->engine_factories().set_default_engine_type(runnable::EngineType::Thread);
options->engine_factories().set_default_engine_type(mrc::runnable::EngineType::Thread);

pymrc::Executor exec{options};
exec.register_pipeline(p);
Expand All @@ -173,3 +178,55 @@ TEST_F(TestAsyncioRunnable, UseAsyncioTasks)

EXPECT_EQ(counter, 60);
}

template <typename OperationT>
auto run_operation(OperationT& operation) -> mrc::coroutines::Task<int>
{
auto stop_source = std::stop_source();

auto coro = [](auto& operation, auto stop_source) -> mrc::coroutines::Task<int> {
try
{
auto value = co_await operation();
stop_source.request_stop();
co_return value;
} catch (...)
{
stop_source.request_stop();
throw;
}
}(operation, stop_source);

coro.resume();

while (not stop_source.stop_requested())
{
if (boost::fibers::has_ready_fibers())
{
boost::this_fiber::yield();
}
}

co_return co_await coro;
}

TEST_F(TestAsyncioRunnable, BoostFutureAwaitableOperationCanReturn)
{
auto operation = mrc::pymrc::BoostFutureAwaitableOperation<int()>([]() {
using namespace std::chrono_literals;
boost::this_fiber::sleep_for(10ms);
return 5;
});

ASSERT_EQ(mrc::coroutines::sync_wait(run_operation(operation)), 5);
}

TEST_F(TestAsyncioRunnable, BoostFutureAwaitableOperationCanThrow)
{
auto operation = mrc::pymrc::BoostFutureAwaitableOperation<int()>([]() {
throw std::runtime_error("oops");
return 5;
});

ASSERT_THROW(mrc::coroutines::sync_wait(run_operation(operation)), std::runtime_error);
}

0 comments on commit e733106

Please sign in to comment.