Skip to content

Commit

Permalink
refactor scheduler to be simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
cwharris committed Nov 1, 2023
1 parent 2b2f26e commit df8feb8
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 170 deletions.
1 change: 0 additions & 1 deletion cpp/mrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ add_library(libmrc
src/public/core/logging.cpp
src/public/core/thread.cpp
src/public/coroutines/event.cpp
src/public/coroutines/scheduler.cpp
src/public/coroutines/sync_wait.cpp
src/public/coroutines/task_container.cpp
src/public/coroutines/thread_local_context.cpp
Expand Down
79 changes: 5 additions & 74 deletions cpp/mrc/include/mrc/coroutines/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include "mrc/coroutines/task.hpp"

#include <coroutine>
#include <cstddef>
#include <memory>
Expand All @@ -27,87 +29,16 @@ namespace mrc::coroutines {

/**
* @brief Scheduler base class
*
* Allows all schedulers to be discovered via the mrc::this_thread::current_scheduler()
*/
class Scheduler : public std::enable_shared_from_this<Scheduler>
{
public:
struct Operation
{
Operation(Scheduler& scheduler);

constexpr static auto await_ready() noexcept -> bool
{
return false;
}

std::coroutine_handle<> await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept;

constexpr static auto await_resume() noexcept -> void {}

Scheduler& m_scheduler;
std::coroutine_handle<> m_awaiting_coroutine;
Operation* m_next{nullptr};
};

Scheduler();
virtual ~Scheduler() = default;

/**
* @brief Description of Scheduler
*/
virtual std::string description() const = 0;

/**
* Schedules the currently executing coroutine to be run on this thread pool. This must be
* called from within the coroutines function body to schedule the coroutine on the thread pool.
* @throw std::runtime_error If the thread pool is `shutdown()` scheduling new tasks is not permitted.
* @return The operation to switch from the calling scheduling thread to the executor thread
* pool thread.
*/
[[nodiscard]] virtual auto schedule() -> Operation;

/**
* Schedules any coroutine handle that is ready to be resumed.
* @param handle The coroutine handle to schedule.
*/
virtual auto resume(std::coroutine_handle<> coroutine) -> void = 0;

/**
* Yields the current task to the end of the queue of waiting tasks.
*/
[[nodiscard]] auto yield() -> Operation;

/**
* If the calling thread controlled by a Scheduler, return a pointer to the Scheduler
*/
static auto from_current_thread() noexcept -> Scheduler*;

/**
* If the calling thread is owned by a thread_pool, return the thread index (rank) of the current thread with
* respect the threads in the pool; otherwise, return the std::hash of std::this_thread::get_id
*/
static auto get_thread_id() noexcept -> std::size_t;

protected:
virtual auto on_thread_start(std::size_t) -> void;

private:
/**
* @brief When co_await schedule() is called, this function will be executed by the awaiter. Each scheduler
* implementation should determine how and when to execute the operation.
*
* @param operation The schedule() awaitable pointer
* @return std::coroutine_handle<> Return a coroutine handle to which will be
* used as the return value for await_suspend().
*/
virtual std::coroutine_handle<> schedule_operation(Operation* operation) = 0;

mutable std::mutex m_mutex;
virtual void resume(std::coroutine_handle<> handle) noexcept = 0;

thread_local static Scheduler* m_thread_local_scheduler;
thread_local static std::size_t m_thread_id;
[[nodiscard]] virtual Task<> schedule() = 0;
[[nodiscard]] virtual Task<> yield() = 0;
};

} // namespace mrc::coroutines
71 changes: 0 additions & 71 deletions cpp/mrc/src/public/coroutines/scheduler.cpp

This file was deleted.

64 changes: 40 additions & 24 deletions python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "pymrc/coro.hpp"
#include "pymrc/utilities/acquire_gil.hpp"
#include "pymrc/utilities/object_wrappers.hpp"

#include <boost/fiber/future/async.hpp>
#include <mrc/coroutines/scheduler.hpp>
Expand All @@ -28,52 +29,67 @@
#include <pybind11/pytypes.h>

#include <stdexcept>
#include <utility>

namespace py = pybind11;

namespace mrc::pymrc {

/**
* @brief A MRC Scheduler which allows resuming C++20 coroutines on an Asyncio event loop.
*/
class AsyncioScheduler : public mrc::coroutines::Scheduler
{
public:
AsyncioScheduler(PyObjectHolder loop) : m_loop(std::move(loop)) {}

std::string description() const override
private:
class ContinueOnLoopOperation
{
return "AsyncioScheduler";
}
public:
ContinueOnLoopOperation(PyObjectHolder loop) : m_loop(std::move(loop)) {}

void resume(std::coroutine_handle<> coroutine) override
{
if (coroutine.done())
static bool await_ready() noexcept
{
LOG(WARNING) << "AsyncioScheduler::resume() > Attempted to resume a completed coroutine";
return;
return false;
}

py::gil_scoped_acquire gil;
void await_suspend(std::coroutine_handle<> handle) noexcept
{
AsyncioScheduler::resume(m_loop, handle);
}

// TODO(MDD): Check whether or not we need thread safe version
m_loop.attr("call_soon_threadsafe")(py::cpp_function([this, handle = std::move(coroutine)]() {
if (handle.done())
{
LOG(WARNING) << "AsyncioScheduler::resume() > Attempted to resume a completed coroutine";
return;
}
static void await_resume() noexcept {}

py::gil_scoped_release nogil;
private:
PyObjectHolder m_loop;
};

static void resume(PyObjectHolder loop, std::coroutine_handle<> handle) noexcept
{
pybind11::gil_scoped_acquire acquire;
loop.attr("call_soon_threadsafe")(pybind11::cpp_function([handle]() { //
pybind11::gil_scoped_release release;
handle.resume();
}));
}

private:
std::coroutine_handle<> schedule_operation(Operation* operation) override
public:
AsyncioScheduler(PyObjectHolder loop) : m_loop(std::move(loop)) {}

void resume(std::coroutine_handle<> handle) noexcept override

Check warning on line 77 in python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp

View check run for this annotation

Codecov / codecov/patch

python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp#L77

Added line #L77 was not covered by tests
{
AsyncioScheduler::resume(m_loop, handle);

Check warning on line 79 in python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp

View check run for this annotation

Codecov / codecov/patch

python/mrc/_pymrc/include/pymrc/asyncio_scheduler.hpp#L79

Added line #L79 was not covered by tests
}

[[nodiscard]] coroutines::Task<> schedule() override
{
this->resume(std::move(operation->m_awaiting_coroutine));
co_await ContinueOnLoopOperation(m_loop);
}

return std::noop_coroutine();
[[nodiscard]] coroutines::Task<> yield() override
{
co_await ContinueOnLoopOperation(m_loop);
}

private:
mrc::pymrc::PyHolder m_loop;
};

Expand Down

0 comments on commit df8feb8

Please sign in to comment.