-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ZEO5] asyncio.client: Fix connect logic wrt IO loop startup #227
Conversation
Éloi Rivard reports that starting from ZEO 5.4.0 client no longer retries failed connection to server via unix sockets. That happens to be due to that `connect` coroutine starts to be executed before `loop.run_forever` is called and so `asyncio.sleep()` raises "no running event loop". Here are the details: with the following debug patch: ```diff --- a/src/ZEO/asyncio/client.py +++ b/src/ZEO/asyncio/client.py @@ -145,6 +145,9 @@ def pop_futures(self): return futures def connect(self): + trace('connect') + import traceback as tb; tb.print_stack() + trace() if isinstance(self.addr, tuple): host, port = self.addr cr = lambda: self.loop.create_connection( # noqa: E731 @@ -156,6 +159,9 @@ def connect(self): @coroutine def connect(): + trace('coro connect') + import traceback as tb; tb.print_stack() + trace('running loop:', asyncio.events.get_running_loop()) while not self.closed: try: yield cr() @@ -1035,9 +1041,14 @@ def run_io_thread(self): try: loop = self.loop if self.loop is not None else new_event_loop() asyncio.set_event_loop(loop) + trace('>>> setup_delegation') self.setup_delegation(loop) + trace('<<< setup_delegation') loop.call_soon(self.started.set) + import time; time.sleep(3) + trace('>>> loop.run_forever') loop_run_forever(loop) + trace('<<< loop.run_forever') except Exception as exc: logger.exception("Client thread") self.exception = exc @@ -1088,3 +1099,8 @@ def cancel_task(task): if sys.version_info < (3,9): task.add_done_callback( lambda future: future.cancelled() or future.exception()) + + +import sys +def trace(*argv): + print(*argv, file=sys.stderr, flush=True) ``` and the following test program ```py import logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') import ZEO print('AAA') zcli = ZEO.client('/path/to/zeo.sock') print('BBB') ``` the output is like this: ``` (py3.venv) kirr@deca:~/src/wendelin/z/ZEO$ ../ZEO5/x.py 2>&1 AAA 2023-03-14 07:21:01,569 - INFO - /path/to/zeo.sock ClientStorage (pid=49949) created RW/normal for storage: '1' 2023-03-14 07:21:01,569 - INFO - created temporary cache file 3 2023-03-14 07:21:01,570 - DEBUG - Using selector: EpollSelector >>> setup_delegation 2023-03-14 07:21:01,570 - DEBUG - disconnected <ZEO.asyncio.client.ClientIO object at 0x7f03c50077c0> None 2023-03-14 07:21:01,570 - DEBUG - try_connecting <ZEO.asyncio.client.ClientIO object at 0x7f03c50077c0> connect File "/usr/lib/python3.9/threading.py", line 912, in _bootstrap self._bootstrap_inner() File "/usr/lib/python3.9/threading.py", line 954, in _bootstrap_inner self.run() File "/usr/lib/python3.9/threading.py", line 892, in run self._target(*self._args, **self._kwargs) File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 1045, in run_io_thread self.setup_delegation(loop) File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 861, in setup_delegation self.client = client = ClientIO(loop, *self.__args, **self.__kwargs) File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 477, in __init__ self.disconnected(None) File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 537, in disconnected self.try_connecting() File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 549, in try_connecting self.protocols = [ File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 550, in <listcomp> Protocol(self.loop, addr, self, File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 93, in __init__ self.connect() File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 149, in connect import traceback as tb; tb.print_stack() coro connect File "/usr/lib/python3.9/threading.py", line 912, in _bootstrap self._bootstrap_inner() File "/usr/lib/python3.9/threading.py", line 954, in _bootstrap_inner self.run() File "/usr/lib/python3.9/threading.py", line 892, in run self._target(*self._args, **self._kwargs) File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 1045, in run_io_thread self.setup_delegation(loop) File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 861, in setup_delegation self.client = client = ClientIO(loop, *self.__args, **self.__kwargs) File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 477, in __init__ self.disconnected(None) File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 537, in disconnected self.try_connecting() File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 549, in try_connecting self.protocols = [ File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 550, in <listcomp> Protocol(self.loop, addr, self, File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 93, in __init__ self.connect() File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 178, in connect self._connecting = Task(connect(), self.loop) File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/futures.py", line 390, in __init__ self.executor.step() File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/futures.py", line 271, in step result = self.coro.send(await_result) File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 163, in connect import traceback as tb; tb.print_stack() <<< setup_delegation >>> loop.run_forever ... ``` which shows that `connect` coroutine indeed starts to be executing immediately by `self._connecting = Task(connect(), self.loop)` because that Task is our task with immediate callback invocations. In other words here we try to run asyncio code outside of a running loop and that's why it breaks. The fix is similar in style to d27a2cd and works by moving initial "connect" code to be executed from under running event loop. For the reference: we do not have this problem on master, because there we changed `._connecting` to be native asyncio task instead of our own in 6342939 + e3b8658, and that has the side effect of not running connect coroutine immediately and instead scheduling it to be run when the loop is run. Still I would consider to apply my fix to master as well because it is potentially safer to do async-related activity from under already setup running loop. /reported-by @azmeuk /helped-by @d-maurer /fixes #226
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Earlier, I argued that the connection coroutine should not use our optimized Task
but the usual asyncio.Task
(i.e. it should get started via asyncio.ensure_future
). My argument then: we do not know the implementation details of the connection process (a loop responsibility); it is well possible that this requires a full blown Task
implementation - and not our optimized but limited implementation. I had thought that I had convinced you then.
If you implement the solution above now, it will solve the current problem (because the first step of a standard task is "called soon" rather than immediately -- i.e. delayed until the loop is running) and you are safer regarding potential task related requirements by the connection process.
@d-maurer, thanks for feedback. You arguments are sound, and if it could indeed work I would go this way. I already considered to cherry-pick 6342939 to 5.x as well when we were discussing it in #218 (comment). Unfortunately it is not that simple: I do not recall all the details now, but the reason we cannot do it for 5.x is that mixing our own AsyncTask and native asyncio/trollius Task does not play well with our CoroutineExecutor in 5.x where it manually implements --- a/src/ZEO/asyncio/client.py
+++ b/src/ZEO/asyncio/client.py
@@ -169,7 +169,7 @@ def connect():
yield asyncio.sleep(self.connect_poll + local_random.random())
logger.info("retry connecting %r", self.addr)
- self._connecting = Task(connect(), self.loop)
+ self._connecting = self.loop.create_task(connect())
def connection_made(self, transport):
logger.debug('connection_made %s', self) on top of either 5.x or on top of hereby pull-request: `zope-testrunner -uvv --test-path=src`(py3.venv) kirr@deca:~/src/wendelin/z/ZEO$ zope-testrunner -uvv --test-path=src
Running tests at level 1
Running zope.testrunner.layer.UnitTests tests:
Set up zope.testrunner.layer.UnitTests in 0.000 seconds.
Running:
test_async_future (ZEO.asyncio.tests.AsyncTaskTests)
test_cancel_future (ZEO.asyncio.tests.AsyncTaskTests)
test_cancel_task_while_blocked_on_async_future (ZEO.asyncio.tests.AsyncTaskTests)
test_cancel_task_while_blocked_on_optimized_future (ZEO.asyncio.tests.AsyncTaskTests)
test_cancel_task_while_running (ZEO.asyncio.tests.AsyncTaskTests)
test_exception (ZEO.asyncio.tests.AsyncTaskTests)
test_handled_exception (ZEO.asyncio.tests.AsyncTaskTests)
test_nest_to_async_coro (ZEO.asyncio.tests.AsyncTaskTests)
test_nested_coro (ZEO.asyncio.tests.AsyncTaskTests)
test_noop (ZEO.asyncio.tests.AsyncTaskTests)
test_optimized_future (ZEO.asyncio.tests.AsyncTaskTests)
test_repr (ZEO.asyncio.tests.AsyncTaskTests)
test_return_ (ZEO.asyncio.tests.AsyncTaskTests)
testClientBasics (ZEO.asyncio.tests.ClientTests)/usr/lib/python3.9/asyncio/events.py:80: RuntimeWarning: coroutine 'sleep' was never awaited
self._context.run(self._callback, *self._args)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
test_ClientDisconnected_on_call_timeout (ZEO.asyncio.tests.ClientTests)
test_asyncall_doesnt_overtake_reply (ZEO.asyncio.tests.ClientTests)
test_bad_protocol (ZEO.asyncio.tests.ClientTests)
test_bad_server_tid (ZEO.asyncio.tests.ClientTests)
test_cache_behind (ZEO.asyncio.tests.ClientTests)
test_cache_way_behind (ZEO.asyncio.tests.ClientTests)
test_call_async (ZEO.asyncio.tests.ClientTests)
test_close_with_running_loop (ZEO.asyncio.tests.ClientTests)
test_close_with_stopped_loop (ZEO.asyncio.tests.ClientTests)
test_errors_in_data_received (ZEO.asyncio.tests.ClientTests)
test_flow_control (ZEO.asyncio.tests.ClientTests)
test_get_peername (ZEO.asyncio.tests.ClientTests)
test_heartbeat (ZEO.asyncio.tests.ClientTests)
test_invalidations_while_verifying (ZEO.asyncio.tests.ClientTests)
test_io_close (ZEO.asyncio.tests.ClientTests)
test_io_close_after_connection_loss (ZEO.asyncio.tests.ClientTests)
test_io_close_after_register_exception (ZEO.asyncio.tests.ClientTests)
test_io_close_after_register_exception_after_connection_lost (ZEO.asyncio.tests.ClientTests)
test_io_close_after_register_exception_after_reconnection (ZEO.asyncio.tests.ClientTests)
test_io_close_after_register_exception_before_reconnection (ZEO.asyncio.tests.ClientTests)
test_io_close_before_connection (ZEO.asyncio.tests.ClientTests)
test_multiple_addresses (ZEO.asyncio.tests.ClientTests)
Error in test test_multiple_addresses (ZEO.asyncio.tests.ClientTests)
Traceback (most recent call last):
File "/usr/lib/python3.9/unittest/case.py", line 59, in testPartExecutor
yield
File "/usr/lib/python3.9/unittest/case.py", line 593, in run
self._callTestMethod(testMethod)
File "/usr/lib/python3.9/unittest/case.py", line 550, in _callTestMethod
method()
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/tests.py", line 528, in test_multiple_addresses
delay, func, args, handle = loop.later.pop(0)
IndexError: pop from empty list
test_readonly_fallback (ZEO.asyncio.tests.ClientTests)
test_reply_doesnt_overtake_asyncall (ZEO.asyncio.tests.ClientTests)
test_serialized_registration (ZEO.asyncio.tests.ClientTests)
test_callback_can_add_callback (ZEO.asyncio.tests.ConcurrentFutureTests)
test_cancel (ZEO.asyncio.tests.ConcurrentFutureTests)
test_exception_in_callback (ZEO.asyncio.tests.ConcurrentFutureTests)
test_remove_add_callback (ZEO.asyncio.tests.ConcurrentFutureTests)
test_result_timeout (ZEO.asyncio.tests.ConcurrentFutureTests)
test_set_exception (ZEO.asyncio.tests.ConcurrentFutureTests)
test_set_result (ZEO.asyncio.tests.ConcurrentFutureTests)
test_async_future (ZEO.asyncio.tests.ConcurrentTaskTests)
test_cancel_future (ZEO.asyncio.tests.ConcurrentTaskTests)
test_cancel_task_while_blocked_on_async_future (ZEO.asyncio.tests.ConcurrentTaskTests)
test_cancel_task_while_blocked_on_optimized_future (ZEO.asyncio.tests.ConcurrentTaskTests)
test_cancel_task_while_running (ZEO.asyncio.tests.ConcurrentTaskTests)
test_exception (ZEO.asyncio.tests.ConcurrentTaskTests)
test_handled_exception (ZEO.asyncio.tests.ConcurrentTaskTests)
test_nest_to_async_coro (ZEO.asyncio.tests.ConcurrentTaskTests)
test_nested_coro (ZEO.asyncio.tests.ConcurrentTaskTests)
test_noop (ZEO.asyncio.tests.ConcurrentTaskTests)
test_optimized_future (ZEO.asyncio.tests.ConcurrentTaskTests)
test_repr (ZEO.asyncio.tests.ConcurrentTaskTests)
test_return_ (ZEO.asyncio.tests.ConcurrentTaskTests)
test_callback_can_add_callback (ZEO.asyncio.tests.FutureTests)
test_cancel (ZEO.asyncio.tests.FutureTests)
test_exception_in_callback (ZEO.asyncio.tests.FutureTests)
test_remove_add_callback (ZEO.asyncio.tests.FutureTests)
test_set_exception (ZEO.asyncio.tests.FutureTests)
test_set_result (ZEO.asyncio.tests.FutureTests)
testClientBasics (ZEO.asyncio.tests.MsgpackClientTests)
test_ClientDisconnected_on_call_timeout (ZEO.asyncio.tests.MsgpackClientTests)
test_asyncall_doesnt_overtake_reply (ZEO.asyncio.tests.MsgpackClientTests)
test_bad_protocol (ZEO.asyncio.tests.MsgpackClientTests)
test_bad_server_tid (ZEO.asyncio.tests.MsgpackClientTests)
test_cache_behind (ZEO.asyncio.tests.MsgpackClientTests)
test_cache_way_behind (ZEO.asyncio.tests.MsgpackClientTests)
test_call_async (ZEO.asyncio.tests.MsgpackClientTests)
test_close_with_running_loop (ZEO.asyncio.tests.MsgpackClientTests)
test_close_with_stopped_loop (ZEO.asyncio.tests.MsgpackClientTests)
test_errors_in_data_received (ZEO.asyncio.tests.MsgpackClientTests)
test_flow_control (ZEO.asyncio.tests.MsgpackClientTests)
test_get_peername (ZEO.asyncio.tests.MsgpackClientTests)
test_heartbeat (ZEO.asyncio.tests.MsgpackClientTests)
test_invalidations_while_verifying (ZEO.asyncio.tests.MsgpackClientTests)
test_io_close (ZEO.asyncio.tests.MsgpackClientTests)
test_io_close_after_connection_loss (ZEO.asyncio.tests.MsgpackClientTests)
test_io_close_after_register_exception (ZEO.asyncio.tests.MsgpackClientTests)
test_io_close_after_register_exception_after_connection_lost (ZEO.asyncio.tests.MsgpackClientTests)
test_io_close_after_register_exception_after_reconnection (ZEO.asyncio.tests.MsgpackClientTests)
test_io_close_after_register_exception_before_reconnection (ZEO.asyncio.tests.MsgpackClientTests)
test_io_close_before_connection (ZEO.asyncio.tests.MsgpackClientTests)
test_multiple_addresses (ZEO.asyncio.tests.MsgpackClientTests)
Error in test test_multiple_addresses (ZEO.asyncio.tests.MsgpackClientTests)
Traceback (most recent call last):
File "/usr/lib/python3.9/unittest/case.py", line 59, in testPartExecutor
yield
File "/usr/lib/python3.9/unittest/case.py", line 593, in run
self._callTestMethod(testMethod)
File "/usr/lib/python3.9/unittest/case.py", line 550, in _callTestMethod
method()
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/tests.py", line 528, in test_multiple_addresses
delay, func, args, handle = loop.later.pop(0)
IndexError: pop from empty list
test_readonly_fallback (ZEO.asyncio.tests.MsgpackClientTests)
test_reply_doesnt_overtake_asyncall (ZEO.asyncio.tests.MsgpackClientTests)
test_serialized_registration (ZEO.asyncio.tests.MsgpackClientTests)
testServerBasics (ZEO.asyncio.tests.MsgpackServerTests)
test_invalid_methods (ZEO.asyncio.tests.MsgpackServerTests)
testServerBasics (ZEO.asyncio.tests.ServerTests)
test_invalid_methods (ZEO.asyncio.tests.ServerTests)
test_sm_data_received (ZEO.asyncio.tests.SizedMessageProtocolTests)
test_sm_receive (ZEO.asyncio.tests.SizedMessageProtocolTests)
test_sm_write_iter (ZEO.asyncio.tests.SizedMessageProtocolTests)
test_sm_write_message (ZEO.asyncio.tests.SizedMessageProtocolTests)
test_repr (ZEO.asyncio.tests.ZEOBaseProtocolTests)
test_write_message_iter (ZEO.asyncio.tests.ZEOBaseProtocolTests)
/home/kirr/src/wendelin/z/ZEO/src/ZEO/scripts/zeopack.test
test_client_record_iternext (ZEO.tests.testConversionSupport)
test_server_record_iternext (ZEO.tests.testConversionSupport)
checkOrderPreserved (ZEO.tests.testTransactionBuffer.TransBufTests)
checkTypicalUsage (ZEO.tests.testTransactionBuffer.TransBufTests)
test_ZEO_DB_convenience_error (ZEO.tests.testZEO.Test_convenience_functions)
test_ZEO_DB_convenience_ok (ZEO.tests.testZEO.Test_convenience_functions)
test_ZEO_client_convenience (ZEO.tests.testZEO.Test_convenience_functions)
test_ZEO_connection_convenience_ok (ZEO.tests.testZEO.Test_convenience_functions)
test_ZEO_connection_convenience_value (ZEO.tests.testZEO.Test_convenience_functions)
errors_in_vote_should_clear_lock (ZEO.tests.testZEO2)
lock_sanity_check (ZEO.tests.testZEO2)
proper_handling_of_blob_conflicts (ZEO.tests.testZEO2)
proper_handling_of_errors_in_restart (ZEO.tests.testZEO2)
some_basic_locking_tests (ZEO.tests.testZEO2)
test_basic (ZEO.tests.testZEOOptions.TestZEOOptions)
test_commandline_overrides (ZEO.tests.testZEOOptions.TestZEOOptions)
test_configure (ZEO.tests.testZEOOptions.TestZEOOptions)
test_default_help (ZEO.tests.testZEOOptions.TestZEOOptions)
test_default_help_with_doc_kw (ZEO.tests.testZEOOptions.TestZEOOptions)
test_default_subclass_help (ZEO.tests.testZEOOptions.TestZEOOptions)
test_defaults_with_schema (ZEO.tests.testZEOOptions.TestZEOOptions)
test_defaults_without_schema (ZEO.tests.testZEOOptions.TestZEOOptions)
test_has_help_with_doc_kw (ZEO.tests.testZEOOptions.TestZEOOptions)
test_help (ZEO.tests.testZEOOptions.TestZEOOptions)
test_no_help (ZEO.tests.testZEOOptions.TestZEOOptions)
test_no_help_with_doc_kw (ZEO.tests.testZEOOptions.TestZEOOptions)
test_unrecognized (ZEO.tests.testZEOOptions.TestZEOOptions)
test_version (ZEO.tests.testZEOOptions.TestZEOOptions)
testFailCreateServer (ZEO.tests.testZEOServer.AttributeErrorTests)
testCallSequence (ZEO.tests.testZEOServer.CloseServerTests)
testFailCreateServer (ZEO.tests.testZEOServer.CloseServerTests)
testFailLoopForever (ZEO.tests.testZEOServer.CloseServerTests)
test_clear_with_bytes (ZEO.tests.testZEOServer.TestZEOServerSocket)
test_clear_with_native_str (ZEO.tests.testZEOServer.TestZEOServerSocket)
test_clear_with_tuple (ZEO.tests.testZEOServer.TestZEOServerSocket)
test_clear_with_unicode_str (ZEO.tests.testZEOServer.TestZEOServerSocket)
testChangingCacheSize (ZEO.tests.test_cache.CacheTests)
testConversionOfLargeFreeBlocks (ZEO.tests.test_cache.CacheTests)
testCurrentObjectLargerThanCache (ZEO.tests.test_cache.CacheTests)
testEviction (ZEO.tests.test_cache.CacheTests)
testException (ZEO.tests.test_cache.CacheTests)
testInvalidate (ZEO.tests.test_cache.CacheTests)
testLastTid (ZEO.tests.test_cache.CacheTests)
testLoad (ZEO.tests.test_cache.CacheTests)
testNonCurrent (ZEO.tests.test_cache.CacheTests)
testOldObjectLargerThanCache (ZEO.tests.test_cache.CacheTests)
testSerialization (ZEO.tests.test_cache.CacheTests)
testSetAnyLastTidOnEmptyCache (ZEO.tests.test_cache.CacheTests)
testVeryLargeCaches (ZEO.tests.test_cache.CacheTests)
test_clear_zeo_cache (ZEO.tests.test_cache.CacheTests)
test_loadBefore_doesnt_miss_current (ZEO.tests.test_cache.CacheTests)
broken_non_current (ZEO.tests.test_cache)
cache_simul_properly_handles_load_miss_after_eviction_and_inval (ZEO.tests.test_cache)
cache_trace_analysis (ZEO.tests.test_cache)
cannot_open_same_cache_file_twice (ZEO.tests.test_cache)
full_cache_is_valid (ZEO.tests.test_cache)
invalidations_with_current_tid_dont_wreck_cache (ZEO.tests.test_cache)
kill_does_not_cause_cache_corruption (ZEO.tests.test_cache)
rename_bad_cache_file (ZEO.tests.test_cache)
test_client_side (ZEO.tests.test_client_side_conflict_resolution.ClientSideConflictResolutionTests)/usr/lib/python3.9/asyncio/events.py:80: RuntimeWarning: coroutine 'BaseEventLoop.create_connection' was never awaited
self._context.run(self._callback, *self._args)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Error in test test_client_side (ZEO.tests.test_client_side_conflict_resolution.ClientSideConflictResolutionTests)
Traceback (most recent call last):
File "/usr/lib/python3.9/unittest/case.py", line 59, in testPartExecutor
yield
File "/usr/lib/python3.9/unittest/case.py", line 593, in run
self._callTestMethod(testMethod)
File "/usr/lib/python3.9/unittest/case.py", line 550, in _callTestMethod
method()
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/tests/test_client_side_conflict_resolution.py", line 119, in test_client_side
db = ZEO.DB(addr, wait_timeout=2)
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/__init__.py", line 37, in DB
s = client(*args, **kw)
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/__init__.py", line 30, in client
return ZEO.ClientStorage.ClientStorage(*args, **kw)
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/ClientStorage.py", line 353, in __init__
self._wait()
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 1000, in wait
return self.io_call(
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 867, in io_call
return future.result() if wait else future
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/futures.py", line 230, in result
return Future.result(self)
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/futures.py", line 81, in result
raise self._result
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/futures.py", line 273, in step
result = self.coro.throw(await_result)
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 705, in await_operational_co
raise ClientDisconnected("timed out waiting for connection")
ZEO.Exceptions.ClientDisconnected: timed out waiting for connection
test_server_side (ZEO.tests.test_client_side_conflict_resolution.ClientSideConflictResolutionTests)
testServerDecodeZopeUndoFilter (ZEO.tests.test_marshal.MarshalTests)
test_server_sync (ZEO.tests.test_sync.SyncTests)
Error in test test_server_sync (ZEO.tests.test_sync.SyncTests)
Traceback (most recent call last):
File "/usr/lib/python3.9/unittest/case.py", line 59, in testPartExecutor
yield
File "/usr/lib/python3.9/unittest/case.py", line 593, in run
self._callTestMethod(testMethod)
File "/usr/lib/python3.9/unittest/case.py", line 550, in _callTestMethod
method()
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/tests/test_sync.py", line 29, in test_server_sync
c = client(addr, wait_timeout=2)
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/__init__.py", line 30, in client
return ZEO.ClientStorage.ClientStorage(*args, **kw)
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/ClientStorage.py", line 353, in __init__
self._wait()
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 1000, in wait
return self.io_call(
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 867, in io_call
return future.result() if wait else future
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/futures.py", line 230, in result
return Future.result(self)
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/futures.py", line 81, in result
raise self._result
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/futures.py", line 273, in step
result = self.coro.throw(await_result)
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/client.py", line 705, in await_operational_co
raise ClientDisconnected("timed out waiting for connection")
ZEO.Exceptions.ClientDisconnected: timed out waiting for connection
The following test left new threads behind:
test_server_sync (ZEO.tests.test_sync.SyncTests)
New thread(s): [<Thread(s140475728655648-server, started daemon 140475711026752)>, <Thread(s140475728655648-server-runner, started daemon 140475728307776)>]
Ran 165 tests with 0 failures, 4 errors and 0 skipped in 15.060 seconds.
Tearing down left over layers:
Tear down zope.testrunner.layer.UnitTests in 0.000 seconds.
Tests with errors:
test_multiple_addresses (ZEO.asyncio.tests.ClientTests)
test_multiple_addresses (ZEO.asyncio.tests.MsgpackClientTests)
test_client_side (ZEO.tests.test_client_side_conflict_resolution.ClientSideConflictResolutionTests)
test_server_sync (ZEO.tests.test_sync.SyncTests) And without that switch to loop.create_task all tests pass as ok previously. Maybe I'm missing something but it looks like we can use 6342939 only after 7f574ec. Kirill |
Kirill Smelkov wrote at 2023-3-30 11:59 -0700:
...
I do not recall all the details now, but the reason we cannot do it for 5.x is that mixing our own AsyncTask and native asyncio/trollius Task does not play well with our CoroutineExecutor in 5.x where it manually implements `yield from` mechanics.
In fact, a `Task` is (more precisely: encapsulates) a
coroutine executor. Therefore, a native `Task` has nothing to do
with our coroutine executor -- it always uses its own one.
It is possible that the `connect` coroutine requires
a standard ***@***.***` decorator (instead of yours which
essentially only checks the type)
and that it must replace the `yield` in a way to be compatible
with a standard `Task` (when I remember right, this is `yield from`
in Python 3; no idea what it is for `trollius`).
...
testClientBasics (ZEO.asyncio.tests.ClientTests)/usr/lib/python3.9/asyncio/events.py:80: RuntimeWarning: coroutine 'sleep' was never awaited
The reason might be the use of `yield` rather than `yield from`.
...
test_multiple_addresses (ZEO.asyncio.tests.ClientTests)
Error in test test_multiple_addresses (ZEO.asyncio.tests.ClientTests)
Traceback (most recent call last):
File "/usr/lib/python3.9/unittest/case.py", line 59, in testPartExecutor
yield
File "/usr/lib/python3.9/unittest/case.py", line 593, in run
self._callTestMethod(testMethod)
File "/usr/lib/python3.9/unittest/case.py", line 550, in _callTestMethod
method()
File "/home/kirr/src/wendelin/z/ZEO/src/ZEO/asyncio/tests.py", line 528, in test_multiple_addresses
delay, func, args, handle = loop.later.pop(0)
IndexError: pop from empty list
This error (and the following similar ones) may have the following
cause:
Our test loop uses `loop.later` to remember the `call_later` calls.
The `pop from empty list` indicates that at that point there was not yet such
a call. Because a standard `Task` delays each step executions
by one loop turn, it is well possible that the test requires adaptation
to give the task sufficient time for the execution.
To check whether this cause is really responsible for the error,
I suggest to introduce a small `sleep` into the test.
If this is the case, we could try later to replace the `sleep` again
with something not dependent on the timing of the test infrastructure.
|
Let's go the way Dieter proposes in #228. |
Éloi Rivard reports that starting from ZEO 5.4.0 client no longer retries failed connection to server via unix sockets.
That happens to be due to that
connect
coroutine starts to be executed beforeloop.run_forever
is called and soasyncio.sleep()
raises "no running event loop". Here are the details:with the following debug patch:
and the following test program
the output is like this:
which shows that
connect
coroutine indeed starts to be executing immediately byself._connecting = Task(connect(), self.loop)
because that Task is our task with immediate callback invocations. In other words here we try to run asyncio code outside of a running loop and that's why it breaks.The fix is similar in style to d27a2cd and works by moving initial "connect" code to be executed from under running event loop.
For the reference: we do not have this problem on master, because there we changed
._connecting
to be native asyncio task instead of our own in 6342939/reported-by @azmeuk
/helped-by @d-maurer
/fixes #226