Integrate the KJ event loop into Python's asyncio event loop #310

Merged
merged 47 commits into from
Jun 6, 2023

LasseBlaauwbroek
Contributor

@LasseBlaauwbroek LasseBlaauwbroek commented Mar 27, 2023

Fix #256

This PR attempts to remove the slow and expensive polling behavior for asyncio in favor of proper linking of the KJ event loop to the asyncio event loop.

This is a highly experimental proof-of-concept. There are probably memory leaks all over the place; my knowledge of both the C++ side and the Cython side is limited. However, early feedback from @kentonv, @haata, and others would be highly appreciated. Also, don't hesitate to push any and all improvements to this branch (or another branch).

Current state: The example in examples/async_client.py is functional, but its counterpart examples/async_server.py is not because timers are not yet implemented. So to run this, the server needs to be started with a version of pycapnp without these changes.

Most of the code in capnp/helpers/asyncProvider.cpp was taken from @kentonv's nodejs implementation. So I'm hereby asking for permission to copy that. If that's okay, let me know what kind of attribution you might want.

@kentonv
Member

kentonv commented Mar 27, 2023

Very cool to see this, I haven't looked that closely but from the description this sounds like the right way to solve this problem. It's nice to see it is finally happening. I'm pretty overloaded so will have to mostly leave this review to @haata but let me know if there are any specific questions I can help answer.

> Most of the code in capnp/helpers/asyncProvider.cpp was taken from @kentonv's nodejs implementation. So I'm hereby asking for permission to copy that. If that's okay, let me know what kind of attribution you might want.

The license is BSD 2-clause, which does not require permission: https://github.com/capnproto/node-capnp/blob/node14/LICENSE

I am not a lawyer and this is not legal advice, but what I typically do is put a comment in the code saying something like "derived from X, which has the following license:" then reproduce the copyright and license file.

@LasseBlaauwbroek
Contributor Author

Thanks @kentonv. The licensing is a bit tricky, because the file this is taken from lists MIT at the top, while the repository license specifies BSD: https://github.com/capnproto/node-capnp/blob/node10/src/node-capnp/capnp.cc

@kentonv
Member

kentonv commented Mar 27, 2023

Oh interesting, apparently I changed the license in: capnproto/node-capnp@9b3e84b

But I forgot to update the LICENSE file.

The two licenses are functionally equivalent. I'm happy with you citing either one of them.

@haata
Collaborator

haata commented Mar 27, 2023

Nice! I'll try to take some time this week to play around with the code as well.
Some things to watch out for:

  • Make sure SSL works (that's actually what made me a maintainer of pycapnp in the first place, lol). My original experiments were a Rust capnproto server with a pycapnp client. Python is pretty picky about how SSL is implemented (which is what led me to asyncio in the first place).
  • While you don't need explicit test cases for it, make sure to test errors from all sorts of places (KJ, inside Python, before/after SSL connection, client dies, server dies, etc.). I've found that asyncio can be tricky and it's easy to get stuck in an infinite loop somewhere.
  • Nothing against Kenton's excellent libcapnp and KJ, but oftentimes the errors can be a bit opaque when looking at a Python backtrace (as it's easy just to push up a generic exception). Generating better backtraces for known types of errors will help everyone who uses pycapnp a lot. Don't worry about doing this in an exhaustive way, just the most common types of exceptions that you notice during testing.

@LasseBlaauwbroek
Contributor Author

LasseBlaauwbroek commented Mar 28, 2023

Okay, everything is basically functional now. The main problem I'm trying to deal with now is the clean shutdown of the event loop. This leads to some problems in clients running this interface: The client connects to a server and registers a StatusSubscriber interface with the server. The server then keeps calling this interface even when the client is already done with its "main" actions. When I just exit the asyncio loop, things error out because the KJ loop is still running...

@kentonv could you suggest how to shut that down gracefully? I was looking at TwoPartyVatNetwork::shutdown() but that is a private function...

@LasseBlaauwbroek LasseBlaauwbroek force-pushed the remove-polling branch 2 times, most recently from 3ae9f25 to f2c7473 (March 29, 2023 15:28)
@LasseBlaauwbroek
Contributor Author

LasseBlaauwbroek commented Mar 29, 2023

All tests now pass on Linux and macOS. Windows still fails due to the use of Unix-specific APIs. But otherwise, this is probably ready for a round of review.

@haata
Collaborator

haata commented Apr 3, 2023

Yeah, it looks like something like this for Windows https://stackoverflow.com/questions/20543940/where-do-i-get-arpa-inet-h

Collaborator

@haata haata left a comment

I gave it a good read over and I think it looks pretty good overall. I really like how it makes some of the examples a lot simpler.

I want to try using this with my existing pycapnp library to see how it performs. Fingers crossed I should be able to do this by the end of the week.


def __dealloc__(self):
    del self.thisptr

cpdef _connect(self, host_string):
    # TODO: Make async
Collaborator

Can KJ help with this?

Contributor Author

See #311 for my proposal w.r.t. file descriptors. This would solve the async connection issue.

capnp/lib/capnp.pyx (outdated, resolved)
@@ -37,7 +37,7 @@ def alive(self, **kwargs):

class Server:
Collaborator

I'm wondering if it would be worthwhile to have an alternative API that uses the internal KJ TLS as well?

It might be a somewhat simpler API (depending on how much work it is to use). I don't think it's necessary to do as part of this PR unless you think it's easy.

Contributor Author

See #311 for my proposal w.r.t. file descriptors. This would solve the TLS issue.

@LasseBlaauwbroek
Contributor Author

@kentonv, I'm still having trouble cleanly shutting down the event loop. I cannot control when Python's event loop shuts down, and it makes sense to me to shut down the KJ event loop as well at that point, even if some promises are still running. Otherwise, errors will occur.

The problem here is that after shutting down the loop, references to objects like TwoPartyClient, DynamicCapability::Client and others may still exist. I cannot control that. As a result, when the WaitScope object I have allocated is destroyed before such objects are destroyed, I tend to get segmentation faults. It seems that the destructors of these objects assume that a running KJ loop exists.

Is it just plain impossible to shut down the event loop while objects that are using the loop still exist?

@kentonv
Member

kentonv commented Apr 3, 2023

> Is it just plain impossible to shut down the event loop while objects that are using the loop still exist?

Yes, this leads to dangling pointers and use-after-free.

I haven't looked at your code so this may or may not make sense, but I imagine the solution would be something like:

  • The EventPort, EventLoop, WaitScope, etc. are all owned by some sort of refcounted object.
  • Every Python wrapper around KJ I/O objects holds a reference to that main refcounted object.
  • When the Python event loop is shut down, your EventPort implementation should respond by causing all current and future promises to reject with an exception. However, the EventPort itself would continue to exist until all references are dropped.
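
The ownership scheme in the three bullets above can be sketched in plain Python. This is only an illustration under assumptions: `LoopOwner`, `Wrapper`, and `LoopClosedError` are hypothetical names (not pycapnp or KJ APIs), and the destruction order shown relies on CPython's reference counting.

```python
class LoopClosedError(RuntimeError):
    """Hypothetical error raised when a wrapper is used after shutdown."""


class LoopOwner:
    """Stand-in for the object that owns the EventPort, EventLoop and WaitScope."""

    def __init__(self, log):
        self.closed = False
        self._log = log

    def shutdown(self):
        # Called when the Python event loop closes: reject further work,
        # but keep the underlying state alive for remaining wrappers.
        self.closed = True
        self._log.append("shutdown")

    def __del__(self):
        # Runs only once the last wrapper has dropped its reference.
        self._log.append("owner destroyed")


class Wrapper:
    """Stand-in for a Python wrapper around a KJ I/O object."""

    def __init__(self, owner, name, log):
        self._owner = owner  # strong reference keeps the owner alive
        self._name = name
        self._log = log

    def call(self):
        if self._owner.closed:
            raise LoopClosedError(f"{self._name}: event loop has shut down")
        return f"{self._name}: ok"

    def __del__(self):
        self._log.append(f"{self._name} destroyed")


log = []
owner = LoopOwner(log)
client = Wrapper(owner, "client", log)

owner.shutdown()
del owner  # the wrapper still holds a reference, so nothing is freed yet
try:
    client.call()
except LoopClosedError:
    log.append("call rejected")
del client  # last reference gone: wrapper first, then the owner
```

The point of the design is that shutdown and destruction are separate steps: shutdown flips a flag that makes calls fail cleanly, while the owner itself is freed only after every wrapper is gone.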

@LasseBlaauwbroek
Contributor Author

@kentonv that makes sense. However:

> [..] your EventPort implementation should respond by causing all current and future promises to reject with an exception

What is the best way to do this? As far as I can see, neither the EventPort nor the EventLoop can wholesale cancel all promises. Should I create a giant kj::Canceller owned by the EventPort, and wrap every promise ever created into that canceller?

@kentonv
Member

kentonv commented Apr 3, 2023

Sorry, to clarify, by "all current and future promises" I specifically meant promises created by the EventPort itself to represent fundamental events. Promises created by the app need not be canceled explicitly, as long as the EventLoop still exists.

@LasseBlaauwbroek
Contributor Author

LasseBlaauwbroek commented Apr 6, 2023

@kentonv I just tried my best interpretation of your approach:

> [..] your EventPort implementation should respond by causing all current and future promises to reject with an exception.
> [..] I specifically meant promises created by the EventPort itself to represent fundamental events.

I'm not exactly sure what you mean by 'fundamental events'. Are you talking about events in the Python loop? Those have already been cancelled during the closing of the event loop...

Basically, what I need to decide is how to respond to calls of EventPort::setRunnable() and friends when no Python loop exists anymore. I see three options:

  1. Throw a Python exception. This doesn't work, because setRunnable is invoked by C++ code, which doesn't understand Python exceptions (what happens in practice is that a warning about the exception gets printed to stdout).
  2. Throw a C++ exception. I just tried this, but this doesn't seem satisfactory either. Problem is that the eventport gets called during C++ destructors, after which the program gets terminated. I don't think Python code can recover from that.
  3. Just silently return from setRunnable() without scheduling any event. I'm not sure what this means for the objects that rely on the loop? Will this mean that the destructors of KJ I/O classes will potentially hang indefinitely?

@kentonv
Member

kentonv commented Apr 10, 2023

> I'm not exactly sure what you mean by 'fundamental events'.

I mean the promises that the EventPort itself returns. As opposed to other promises created by the app derived from those. My point here is you do not need to cancel and destroy every existing promise, you just need everything which is specifically waiting on an event from the EventPort to receive an exception.

Presumably, once the Python event loop is gone, then the KJ event loop can only continue until it is empty. Once it's empty, then there's no way for it to become non-empty again, because no further events can ever be delivered... I think? I guess if Python code is still running it can still call pycapnp in a way that schedules new events, but maybe the answer is to have all the Python functions throw Python exceptions if the event loop no longer exists?

> Will this mean that the destructors of KJ I/O classes will potentially hang indefinitely?

No, destructors never wait for the loop. They might queue an event, but they expect that event to run later, they don't wait for it.

@LasseBlaauwbroek
Contributor Author

LasseBlaauwbroek commented Apr 10, 2023

> I mean the promises that the EventPort itself returns.

Unfortunately, I still don't understand what that means. As far as I can tell, the EventPort API never returns a kj::Promise. Or any other promise for that matter. The only thing it does is schedule an event to be run in the Python loop...

> Presumably, once the Python event loop is gone, then the KJ event loop can only continue until it is empty.

I think that if (1) the Python event loop is gone and (2) the KJ event loop is non-empty, the KJ event loop will never become empty again in a non-exceptional way. Once Python's loop is gone, I have no way of scheduling events through EventPort::setRunnable anymore. So the only way for the loop to become empty is if all the promises are canceled. But I guess that's fine. When Python's asyncio loop stops, it cancels all coroutines that run in the background, so that should cancel most KJ promises. For anything else, we can expect the user to clean up after himself (or just exit the program).

> I guess if Python code is still running it can still call pycapnp in a way that schedules new events, but maybe the answer is to have all the Python functions throw Python exceptions if the event loop no longer exists?

Yes, that is my plan.
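
As a rough illustration of that plan, entry points could check for a running asyncio loop up front and raise an ordinary Python exception otherwise. The names `requires_running_loop`, `NoEventLoopError`, and `send_request` below are hypothetical and not part of pycapnp; only `asyncio.get_running_loop()` is a real API.

```python
import asyncio
import functools


class NoEventLoopError(RuntimeError):
    """Hypothetical error for calls made without a running event loop."""


def requires_running_loop(func):
    """Fail early with a Python exception instead of touching a dead loop."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            raise NoEventLoopError(
                f"{func.__name__}() needs a running asyncio event loop"
            ) from None
        return func(*args, **kwargs)

    return wrapper


@requires_running_loop
def send_request(payload):
    # Hypothetical stand-in for an entry point that would schedule a KJ event.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_soon(fut.set_result, payload.upper())
    return fut


async def main():
    return await send_request("ping")


inside = asyncio.run(main())  # a loop is running: the call goes through

try:
    send_request("ping")  # no loop is running any more
    outside = "accepted"
except NoEventLoopError:
    outside = "rejected"
```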

> No, destructors never wait for the loop.

Thanks, that's good to know.

@kentonv
Member

kentonv commented Apr 10, 2023

> As far as I can tell, the EventPort API never returns a kj::Promise. Or any other promise for that matter.

OK I see the confusion. kj::UnixEventPort does sport an API for listening on various low-level events, e.g. waiting for a file descriptor to become readable or writable. Everything else (e.g. the AsyncIoProvider implementation) is built on top of that. It has to be that way because UnixEventPort implements the listening, via epoll, so it has to know about all events it's listening for.

Your code here is a bit different, the AsyncIoProvider implementation uses the Python APIs directly to listen for events, so the EventPort is sort of a side object that's just used to control the event loop.

Anyway, what I meant was, for example, the promise OwnedFileDescriptor::onReadable() should be designed to reject with an exception if the Python event loop shuts down. Probably the easiest way to accomplish this is to have a kj::Canceler that you use to wrap the returned promise, and cancel it at shutdown.

OTOH, you do NOT need to wrap the promise returned by, say, PyIoStream::tryRead(). You can assume the exception from the lower-level onReadable() promise will propagate up to cause this to reject as well.
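
The propagation argument can be illustrated with a small asyncio analogue. This is not KJ code: the `Canceler` class below is a hypothetical, simplified Python stand-in (unlike `kj::Canceler::wrap`, it rejects the wrapped future itself rather than returning a new promise), and `on_readable` / `try_read` merely mirror the names used above.

```python
import asyncio


class LoopShutDown(Exception):
    """Hypothetical exception delivered to low-level waiters at shutdown."""


class Canceler:
    """Simplified Python analogue of kj::Canceler (an assumption, not KJ's API)."""

    def __init__(self):
        self._pending = set()

    def wrap(self, fut):
        # Track the future until it completes on its own.
        self._pending.add(fut)
        fut.add_done_callback(self._pending.discard)
        return fut

    def cancel(self, reason):
        # Reject everything that is still waiting on a low-level event.
        for fut in list(self._pending):
            if not fut.done():
                fut.set_exception(LoopShutDown(reason))


async def on_readable(canceler):
    # Low-level event: this is the only place that gets wrapped.
    fut = asyncio.get_running_loop().create_future()
    await canceler.wrap(fut)


async def try_read(canceler):
    # Higher-level operation: not wrapped, it just awaits the low-level event.
    await on_readable(canceler)
    return b"data"


async def main():
    canceler = Canceler()
    reader = asyncio.ensure_future(try_read(canceler))
    await asyncio.sleep(0)  # let the reader start waiting
    canceler.cancel("event loop shut down")
    try:
        await reader
    except LoopShutDown as exc:
        return str(exc)
    return "no exception"


outcome = asyncio.run(main())
```

Only the lowest-level wait is registered with the canceler; the exception still surfaces in `try_read` because it awaits that wait.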

> Once Python's loop is gone, I have no way of scheduling events through EventPort::setRunnable anymore.

I see, for some reason I was imagining that the python event loop would be shut down as a result of a call made by the app during a KJ event, i.e. the KJ event loop is currently running and will continue to empty its queue.

But I realize now I have no idea when the python event loop shuts down. In fact it sounds to me like something that shouldn't really happen except at program exit? Anyway, I guess I don't know the right answer in that case.

@LasseBlaauwbroek
Contributor Author

> Anyway, what I meant was, for example, the promise OwnedFileDescriptor::onReadable() should be designed to reject with an exception if the Python event loop shuts down.

Okay, yes, I get it now. That makes a lot of sense. Thanks.

> Your code here is a bit different, the AsyncIoProvider implementation uses the Python APIs directly to listen for events

For the record: I'm not very happy with Python's API, because its cross-platform support is somewhat lacking. I'd be happier to reuse some KJ API that lets me poll for events. But as far as I can see, the code for that is not publicly exposed. And I'm not super keen on copying all that code over and maintaining it separately. Do you have a better solution for that?

> But I realize now I have no idea when the python event loop shuts down

Python has an asyncio.run(my_coroutine) function that starts up an event loop and shuts it down when the coroutine is done. This function is usually run top-level and only once, although you could run it multiple times (which starts multiple successive event loops). However, it's possible to leak async objects from asyncio.run. This happens, for example, when you have some background task that pings a server every so often. If the user doesn't shut that task down properly, it might leak. But I think that with your comments I can solve that now.
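
A minimal sketch of that scenario, using only the standard library: a background task is started inside `asyncio.run` and never shut down by the user. The task and variable names are made up for illustration. `asyncio.run` cancels tasks that are still pending when the main coroutine finishes, but the task object can outlive the loop it belonged to.

```python
import asyncio

events = []
leaked = []  # objects that outlive the event loop they were created in


async def ping_forever():
    try:
        while True:
            events.append("ping")
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        events.append("ping task cancelled")
        raise


async def main():
    # Background task that the user never cancels or awaits.
    leaked.append(asyncio.ensure_future(ping_forever()))
    await asyncio.sleep(0.03)
    events.append("main done")


asyncio.run(main())

# The loop is closed by now, yet the task object is still reachable.
task_was_cancelled = leaked[0].cancelled()
```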

@kentonv
Member

kentonv commented Apr 10, 2023

> I'd be happier to reuse some KJ API that lets me poll for events. But as far as I can see, the code for that is not publicly exposed.

This is hard because at the bottom of the stack, kj::UnixEventPort is waiting for all I/O events using a single unified poll() loop (or using epoll on Linux, kqueue on BSD, etc.), and you can't really have two poll() loops running in the same thread. The Python asyncio system presumably has its own poll loop, so you can't use UnixEventPort at the same time.

I tried to design the KJ APIs to allow building on top of some other system's event loop. This is exactly what you did in this PR.

It's possible there's a different level of abstraction that would have worked better here, like maybe if UnixEventPort's API were abstract, you could implement that directly on top of the Python event loop, and then you'd be able to reuse the AsyncIoProvider implementation that sits on UnixEventPort. I'm not really sure, though, because UnixEventPort is very much tied to file descriptors and unix-isms -- it makes no sense on Windows. But I'd expect many of these other event loops are designed to be somewhat cross-platform. AsyncIoProvider is the only abstraction I've come up with which avoids being platform-specific.

@LasseBlaauwbroek
Contributor Author

Okay, fair enough. I guess we'll just have to live with Python's API here. And if we ever do #311, then the situation will improve because Python's higher-level APIs are more cross-platform.

Thanks!

It was originally copied from the nodejs implementation, which in turn copied
from async-io-unix.c++. But that copy is pretty old.
Collaborator

@haata haata left a comment

I like the change from a_wait() to await(). We'll probably need to keep a_wait() around for a bit, but I think it would be good to deprecate it over a release or two.
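
One common way to keep an old spelling around while steering users away from it is a thin wrapper that emits a `DeprecationWarning`. The sketch below assumes the new spelling is a plain `await promise`; the `Promise` class is a hypothetical stand-in, not pycapnp's actual promise type.

```python
import asyncio
import warnings


class Promise:
    """Hypothetical stand-in for a promise wrapper that supports `await`."""

    def __init__(self, value):
        self._value = value

    async def _resolve(self):
        return self._value

    def __await__(self):
        # New spelling: `await promise`.
        return self._resolve().__await__()

    def a_wait(self):
        # Old spelling, kept for a release or two: `await promise.a_wait()`.
        warnings.warn(
            "a_wait() is deprecated; use `await promise` instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return self


async def main():
    new = await Promise(42)
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        old = await Promise(42).a_wait()
    return new, old, [w.category for w in caught]


new_result, old_result, categories = asyncio.run(main())
```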

@@ -317,7 +306,7 @@ ctypedef fused PromiseTypes:
     _Promise
     _RemotePromise
     _VoidPromise
-    PromiseFulfillerPair
+    # PromiseFulfillerPair
Collaborator

Needed?

Contributor Author

Yeah, the PromiseFulfillerPair does not at all support the same operations as the other classes in this fused type. So I had to remove it. (But I don't believe anything is lost by that.)

Collaborator

Ok, please add a comment indicating this, or just remove this line.

raise KjException('Function passed to `then` call must take no arguments')
Will still work with non-asyncio socket communication, but requires async handling of the function call.
"""
# TODO: Is keeping a separate _VoidPromise class really worth it? Does it make things faster?
Collaborator

Unfortunately, I don't know the answer to this question.

Contributor Author

Yeah, I'm keeping it like it is for now. But in the future, someone might look into refactoring/simplifying the class hierarchy of Promises more.

@LasseBlaauwbroek
Contributor Author

@kentonv, sorry for all the questions, but my systems programming skills are really being pushed to the limit here, especially on Windows. I'm now trying to port to Windows, where my strategy is basically to copy large parts of async-io-win32.c++ (very early attempt here). This seems somewhat trickier than for Unix though, because this file is much more intertwined with the Win32EventPort. In particular, it uses Win32EventPort::observeIo(HANDLE) and related operations.

Question:
Is it safe for me to create a Win32EventPort and use it only to call observeIo? (Handling of other signals would still be done through Python.)
If not, could you suggest a simple way to adapt the code in async-io-win32.c++? My Windows programming knowledge is really close to zero.

@LasseBlaauwbroek
Contributor Author

Thinking about the cross-platform issue a bit more: I'm starting to think that we should just double down on my proposal in #311 and entirely get rid of the use of raw file descriptors when the asyncio loop is active. That way we can entirely circumvent all this rather tricky, low-level system programming. It will probably be a little slower, but I think that if we implement it cleverly using buffered streaming protocols the performance should be acceptable.

That will allow us to get rid of all the new code in asyncProvider.cpp, and solve all the cross-platform issues. Thoughts, @haata and @kentonv ?

@kentonv
Member

kentonv commented Apr 12, 2023

No, you can't just use a Win32EventPort in the same thread as the python event loop. The problem is that Win32EventPort, like UnixEventPort, assumes that whenever the event loop is empty, it'll call eventPort.wait() to wait for the next event. But in your setup on top of the python event loop, eventPort.wait() is never used, because Python's event loop is the one that does the waiting. So if you created a Win32EventPort and tried to use it, it would never produce any events.

So, you will need to figure out how to wait on Win32 handles using the Python event loop APIs. Yeah, it does sound like #311 might be part of the answer here.

@haata
Collaborator

haata commented Apr 12, 2023

This was something that I also had difficulty with, the file descriptors just didn't work well when interacting with Python. Working with something Python supports directly (cross-platform) would make a lot of this easier to think about/debug for a python developer.
Performance can always be tuned later.

@LasseBlaauwbroek
Contributor Author

I have now implemented AsyncIoStream based on Python's Transport/Protocol functionality. It is quite tricky to do this wrapping, because propagating error conditions and cancellation between AsyncIoStream and Protocol is nontrivial. As such, there are probably some subtle bugs. Additionally, the Python implementation of transports/protocols is itself quite buggy and I had to work around three distinct bugs...
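
To give a feel for why this wrapping is fiddly, here is a minimal sketch of bridging asyncio's callback-style `Protocol` to an awaitable read, including error propagation from `connection_lost`. It is not the code from this PR: `StreamProtocol` and its `read()` method are hypothetical, and the callbacks are driven by hand instead of by a real transport.

```python
import asyncio


class StreamProtocol(asyncio.Protocol):
    """Hypothetical bridge: buffers incoming bytes so a coroutine can await them."""

    def __init__(self):
        self._items = asyncio.Queue()
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self._items.put_nowait(data)

    def connection_lost(self, exc):
        # Forward either the error or an end-of-stream marker to the reader.
        self._items.put_nowait(exc if exc is not None else b"")

    async def read(self):
        item = await self._items.get()
        if isinstance(item, Exception):
            raise item
        return item  # b"" signals a clean end of stream


async def main():
    proto = StreamProtocol()
    # Normally the transport invokes these callbacks; here they are called
    # directly to keep the example self-contained.
    proto.data_received(b"hello")
    proto.connection_lost(ConnectionResetError("peer went away"))
    first = await proto.read()
    try:
        await proto.read()
        second = "no error"
    except ConnectionResetError as exc:
        second = f"error: {exc}"
    return first, second


first, second = asyncio.run(main())
```

A real bridge would also need write-side flow control (`pause_writing` / `resume_writing`) and cancellation handling, which this sketch leaves out.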

This PR is now fully cross-platform. As such, I believe that this is now a candidate for merging.

This PR introduces a number of backwards-incompatible changes:

  • The KJ loop is now always associated with a local thread. As such, capnp.remove_event_loop() has been removed. It is no longer needed.
  • The TwoPartyServer.write() and TwoPartyClient.write() methods are now async.
  • You can no longer pass an address:port string to TwoPartyClient() and TwoPartyServer() in asyncio mode. This would require the initializer to be marked async, which is not possible. (And I don't personally like passing strings for addresses in there anyway, so I'd rather get rid of all string passing at some point.)
  • Probably more subtle changes; it is difficult to keep track of everything.

As such, I would propose that after this is merged, a 2.0 beta 1 release is made. This way, we can have backwards incompatible changes, and everything can be tested by a wider audience.

@LasseBlaauwbroek LasseBlaauwbroek marked this pull request as ready for review April 19, 2023 11:43
@LasseBlaauwbroek
Contributor Author

@haata Gentle reminder that this is ready for final review and merging.

@haata
Collaborator

haata commented May 18, 2023

Thanks for the reminder (somehow I'll find some time to do this by the end of the weekend).

Collaborator

@haata haata left a comment

Sorry for taking so long, I think this looks great!

}

kj::Promise<void> PyAsyncIoStream::whenWriteDisconnected() {
// TODO: Possibly connect this to protocol.connection_lost?
Collaborator

Did you notice any issues doing this? Seems like it's supported in 3.7.
https://docs.python.org/3.7/library/asyncio-protocol.html#asyncio.BaseProtocol.connection_lost

Contributor Author

Well, I believe that the semantics of connection_lost are not exactly the same as the semantics of whenWriteDisconnected, but @kentonv might be able to comment more accurately.

In any case, as I understand it, this function is basically an optimization, and there is no requirement for the promise to ever return. So it wasn't a priority for me.

@haata
Collaborator

haata commented May 21, 2023

@LasseBlaauwbroek can you summarize a changelog entry for me? You don't need to add it to the file (I can do that later), just add it as a comment here. After that I'll merge it in.

@haata haata merged commit d32854e into capnproto:master Jun 6, 2023
@LasseBlaauwbroek
Contributor Author

Thanks for merging, and sorry for the lack of reply. My attention was shifted away from pycapnp (but it will probably return at some point).

Now that this is merged, I rebased #312 #312. They should be ready for reviewing/merging.

# started.
_C_DEFAULT_EVENT_LOOP_LOCAL.loop = _weakref.ref(kjloop)
loop.close = oldclose()
return oldclose()
@DaneSlattery DaneSlattery Jun 8, 2023

This seems to be causing an issue in my local tests. Should it not be return oldClose?

Here is an example of the error I'm seeing:

@pytest.mark.asyncio
async def test_connect_async():
    nr_client = tsnbox_ipc.NodeRouter._new_client(NodeRouter())
   

The library pytest_asyncio does a bit of cleanup of event loops after each test,

def _close_event_loop() -> None:
    policy = asyncio.get_event_loop_policy()
    try:
        loop = policy.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is not None:
        if not loop.is_closed():
            warnings.warn(
                _UNCLOSED_EVENT_LOOP_WARNING % loop,
                DeprecationWarning,
            )
        loop.close()  # <<< fails here

`loop.close()` fails with "'NoneType' object is not callable".

Contributor Author

Yes. Good eye.
I already have this fix as part of another PR. To be opened soon.
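
For anyone hitting the same error, the failure mode can be reproduced without pycapnp. The sketch below uses a hypothetical `FakeLoop` and two hypothetical patch helpers. The "fixed" variant is only one plausible correction (restoring the original method instead of its return value) and is not necessarily the change that landed.

```python
class FakeLoop:
    """Minimal stand-in for an asyncio loop; only close() matters here."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def patch_close_buggy(loop, on_close):
    oldclose = loop.close

    def newclose():
        on_close()
        loop.close = oldclose()  # bug: stores close()'s return value (None)
        return oldclose()

    loop.close = newclose


def patch_close_fixed(loop, on_close):
    oldclose = loop.close

    def newclose():
        on_close()
        loop.close = oldclose  # restore the original bound method
        return oldclose()

    loop.close = newclose


# Buggy variant: the first close() works, the second one calls None.
buggy = FakeLoop()
patch_close_buggy(buggy, lambda: None)
buggy.close()
try:
    buggy.close()
    second_close = "ok"
except TypeError:
    second_close = "TypeError"

# Fixed variant: close() can be called repeatedly, the hook runs once.
fixed = FakeLoop()
calls = []
patch_close_fixed(fixed, lambda: calls.append("kj loop torn down"))
fixed.close()
fixed.close()
```

This matches the pytest_asyncio report: its cleanup calls `loop.close()` on a loop that has already been closed once, so the second call is the one that fails.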

@DaneSlattery DaneSlattery Jun 8, 2023

The usage of this further down is also confusing, since we re-assign loop.close to the result of this function.
I understand a bit better now, seeing the changes in cull-sync

Successfully merging this pull request may close these issues.

RPC throughput slow due to poll sleep