Skip to content

Commit

Permalink
Add test to reproduce Morpheus issue #2086
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Dec 18, 2024
1 parent 7961e5a commit 680ea4c
Showing 1 changed file with 59 additions and 1 deletion.
60 changes: 59 additions & 1 deletion python/mrc/_pymrc/tests/test_asyncio_runnable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class Scheduler;

namespace py = pybind11;
namespace pymrc = mrc::pymrc;
using namespace std::chrono_literals;
using namespace std::string_literals;
using namespace py::literals;

Expand Down Expand Up @@ -316,7 +317,6 @@ auto run_operation(OperationT& operation) -> mrc::coroutines::Task<int>
TEST_F(TestAsyncioRunnable, BoostFutureAwaitableOperationCanReturn)
{
auto operation = mrc::pymrc::BoostFutureAwaitableOperation<int()>([]() {
using namespace std::chrono_literals;
boost::this_fiber::sleep_for(10ms);
return 5;
});
Expand All @@ -333,3 +333,61 @@ TEST_F(TestAsyncioRunnable, BoostFutureAwaitableOperationCanThrow)

ASSERT_THROW(mrc::coroutines::sync_wait(run_operation(operation)), std::runtime_error);
}

TEST_F(TestAsyncioRunnable, UseAsyncioTasksThrows2086)
{
// Reproduces Morpheus issue #2086 where an exception is thrown in Async Python code, and the source does not emit
// any additional values. When the source emits an additional value or calls on_completed, the pipeline completes
// and the exception is thrown to the caller.
pymrc::Pipeline p;

py::object globals = py::globals();
py::exec(
R"(
async def fn(value):
if value == 1:
raise RuntimeError("oops")
print(value)
)",
globals);

pymrc::PyObjectHolder fn = static_cast<py::object>(globals["fn"]);

auto init = [&fn](mrc::segment::IBuilder& seg) {
auto src = seg.make_source<int>("src", [](rxcpp::subscriber<int>& s) {
int i = 0;
while (s.is_subscribed())
{
if (i < 2)
{
s.on_next(i);
++i;
}
else
{
boost::this_fiber::sleep_for(10ms);
}
}

s.on_completed();
});

auto sink = seg.construct_object<PythonCallbackAsyncioRunnable>("sink", fn);

seg.make_edge(src, sink);
};

p.make_segment("seg1"s, init);

auto options = std::make_shared<mrc::Options>();

// AsyncioRunnable only works with the Thread engine due to asyncio loops being thread-specific.
options->engine_factories().set_default_engine_type(mrc::runnable::EngineType::Thread);

pymrc::Executor exec{options};
exec.register_pipeline(p);

exec.start();

ASSERT_THROW(exec.join(), std::runtime_error);
}

0 comments on commit 680ea4c

Please sign in to comment.