From 0ba4e76b23fed77d09be7f56da783ab3f0b2d497 Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:40 -0500 Subject: python/aqmp: rename 'accept()' to 'start_server_and_accept()' MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, I had a method named "accept()" that under-the-hood calls bind(2), listen(2) *and* accept(2). I meant this as a simplification and counterpart to the one-shot "connect()" method. This is confusing to readers who expect accept() to mean *just* accept(2). Since I need to split apart the "accept()" method into multiple methods anyway (one of which strongly resembling accept(2)), it feels pertinent to rename this method *now*. Rename this all-in-one method "start_server_and_accept()" instead. Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-3-jsnow@redhat.com Signed-off-by: John Snow --- python/tests/protocol.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) (limited to 'python/tests/protocol.py') diff --git a/python/tests/protocol.py b/python/tests/protocol.py index 5cd7938be3..354d6559b9 100644 --- a/python/tests/protocol.py +++ b/python/tests/protocol.py @@ -413,14 +413,14 @@ class Accept(Connect): assert family in ('INET', 'UNIX') if family == 'INET': - await self.proto.accept(('example.com', 1)) + await self.proto.start_server_and_accept(('example.com', 1)) elif family == 'UNIX': - await self.proto.accept('/dev/null') + await self.proto.start_server_and_accept('/dev/null') async def _hanging_connection(self): with TemporaryDirectory(suffix='.aqmp') as tmpdir: sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock") - await self.proto.accept(sock) + await self.proto.start_server_and_accept(sock) class FakeSession(TestBase): @@ -449,13 +449,13 @@ class FakeSession(TestBase): @TestBase.async_test async def testFakeAccept(self): """Test the full state lifecycle (via accept) with a no-op session.""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') self.assertEqual(self.proto.runstate, Runstate.RUNNING) @TestBase.async_test async def testFakeRecv(self): """Test receiving a fake/null message.""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') logname = self.proto.logger.name with self.assertLogs(logname, level='DEBUG') as context: @@ -471,7 +471,7 @@ class FakeSession(TestBase): @TestBase.async_test async def testFakeSend(self): """Test sending a fake/null message.""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') logname = self.proto.logger.name with self.assertLogs(logname, level='DEBUG') as context: @@ -493,7 +493,7 @@ class FakeSession(TestBase): ): with self.assertRaises(StateError) as context: if accept: - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') else: await self.proto.connect('/not/a/real/path') @@ -504,7 +504,7 @@ class FakeSession(TestBase): @TestBase.async_test async def testAcceptRequireRunning(self): """Test that accept() cannot be called when Runstate=RUNNING""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') await self._prod_session_api( Runstate.RUNNING, @@ -515,7 +515,7 @@ class FakeSession(TestBase): @TestBase.async_test async def testConnectRequireRunning(self): """Test that connect() cannot be called when Runstate=RUNNING""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') await self._prod_session_api( Runstate.RUNNING, @@ -526,7 +526,7 @@ class FakeSession(TestBase): @TestBase.async_test async def testAcceptRequireDisconnecting(self): """Test that accept() cannot be called when Runstate=DISCONNECTING""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') # Cheat: force a disconnect. await self.proto.simulate_disconnect() @@ -541,7 +541,7 @@ class FakeSession(TestBase): @TestBase.async_test async def testConnectRequireDisconnecting(self): """Test that connect() cannot be called when Runstate=DISCONNECTING""" - await self.proto.accept('/not/a/real/path') + await self.proto.start_server_and_accept('/not/a/real/path') # Cheat: force a disconnect. await self.proto.simulate_disconnect() @@ -576,7 +576,7 @@ class SimpleSession(TestBase): async def testSmoke(self): with TemporaryDirectory(suffix='.aqmp') as tmpdir: sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock") - server_task = create_task(self.server.accept(sock)) + server_task = create_task(self.server.start_server_and_accept(sock)) # give the server a chance to start listening [...] await asyncio.sleep(0) -- cgit 1.4.1 From 68a6cf3ffe3532c0655efbbf5910bd99a1b4a3fa Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:41 -0500 Subject: python/aqmp: remove _new_session and _establish_connection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These two methods attempted to entirely envelop the logic of establishing a connection to a peer start to finish. However, we need to break apart the incoming connection step into more granular steps. We will no longer be able to reasonably constrain the logic inside of these helper functions. So, remove them - with _session_guard(), they no longer serve a real purpose. Although the public API doesn't change, the internal API does. Now that there are no intermediary methods between e.g. connect() and _do_connect(), there's no hook where the runstate is set. As a result, the test suite changes a little to cope with the new semantics of _do_accept() and _do_connect(). Lastly, take some pieces of the now-deleted docstrings and move them up to the public interface level. They were a little more detailed, and it won't hurt to keep them. Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-4-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/protocol.py | 117 +++++++++++++++++-------------------------- python/tests/protocol.py | 10 +++- 2 files changed, 53 insertions(+), 74 deletions(-) (limited to 'python/tests/protocol.py') diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index 73719257e0..b7e5e635d8 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -275,13 +275,25 @@ class AsyncProtocol(Generic[T]): If this call fails, `runstate` is guaranteed to be set back to `IDLE`. :param address: - Address to listen to; UNIX socket path or TCP address/port. + Address to listen on; UNIX socket path or TCP address/port. :param ssl: SSL context to use, if any. :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: If a connection could not be accepted. + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `QMPError`. """ - await self._new_session(address, ssl, accept=True) + await self._session_guard( + self._do_accept(address, ssl), + 'Failed to establish connection') + await self._session_guard( + self._establish_session(), + 'Failed to establish session') + assert self.runstate == Runstate.RUNNING @upper_half @require(Runstate.IDLE) @@ -297,9 +309,21 @@ class AsyncProtocol(Generic[T]): :param ssl: SSL context to use, if any. :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: If a connection cannot be made to the server. + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `QMPError`. """ - await self._new_session(address, ssl) + await self._session_guard( + self._do_connect(address, ssl), + 'Failed to establish connection') + await self._session_guard( + self._establish_session(), + 'Failed to establish session') + assert self.runstate == Runstate.RUNNING @upper_half async def disconnect(self) -> None: @@ -401,73 +425,6 @@ class AsyncProtocol(Generic[T]): self._runstate_event.set() self._runstate_event.clear() - @upper_half - async def _new_session(self, - address: SocketAddrT, - ssl: Optional[SSLContext] = None, - accept: bool = False) -> None: - """ - Establish a new connection and initialize the session. - - Connect or accept a new connection, then begin the protocol - session machinery. If this call fails, `runstate` is guaranteed - to be set back to `IDLE`. - - :param address: - Address to connect to/listen on; - UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - :param accept: Accept a connection instead of connecting when `True`. - - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - assert self.runstate == Runstate.IDLE - - await self._session_guard( - self._establish_connection(address, ssl, accept), - 'Failed to establish connection') - - await self._session_guard( - self._establish_session(), - 'Failed to establish session') - - assert self.runstate == Runstate.RUNNING - - @upper_half - async def _establish_connection( - self, - address: SocketAddrT, - ssl: Optional[SSLContext] = None, - accept: bool = False - ) -> None: - """ - Establish a new connection. - - :param address: - Address to connect to/listen on; - UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - :param accept: Accept a connection instead of connecting when `True`. - """ - assert self.runstate == Runstate.IDLE - self._set_state(Runstate.CONNECTING) - - # Allow runstate watchers to witness 'CONNECTING' state; some - # failures in the streaming layer are synchronous and will not - # otherwise yield. - await asyncio.sleep(0) - - if accept: - await self._do_accept(address, ssl) - else: - await self._do_connect(address, ssl) - def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None: """ Used to create a socket in advance of accept(). @@ -508,6 +465,9 @@ class AsyncProtocol(Generic[T]): :raise OSError: For stream-related errors. """ + assert self.runstate == Runstate.IDLE + self._set_state(Runstate.CONNECTING) + self.logger.debug("Awaiting connection on %s ...", address) connected = asyncio.Event() server: Optional[asyncio.AbstractServer] = None @@ -550,6 +510,11 @@ class AsyncProtocol(Generic[T]): sock=self._sock, ) + # Allow runstate watchers to witness 'CONNECTING' state; some + # failures in the streaming layer are synchronous and will not + # otherwise yield. + await asyncio.sleep(0) + server = await coro # Starts listening await connected.wait() # Waits for the callback to fire (and finish) assert server is None @@ -569,6 +534,14 @@ class AsyncProtocol(Generic[T]): :raise OSError: For stream-related errors. """ + assert self.runstate == Runstate.IDLE + self._set_state(Runstate.CONNECTING) + + # Allow runstate watchers to witness 'CONNECTING' state; some + # failures in the streaming layer are synchronous and will not + # otherwise yield. + await asyncio.sleep(0) + self.logger.debug("Connecting to %s ...", address) if isinstance(address, tuple): diff --git a/python/tests/protocol.py b/python/tests/protocol.py index 354d6559b9..8dd26c4ed1 100644 --- a/python/tests/protocol.py +++ b/python/tests/protocol.py @@ -42,11 +42,17 @@ class NullProtocol(AsyncProtocol[None]): await super()._establish_session() async def _do_accept(self, address, ssl=None): - if not self.fake_session: + if self.fake_session: + self._set_state(Runstate.CONNECTING) + await asyncio.sleep(0) + else: await super()._do_accept(address, ssl) async def _do_connect(self, address, ssl=None): - if not self.fake_session: + if self.fake_session: + self._set_state(Runstate.CONNECTING) + await asyncio.sleep(0) + else: await super()._do_connect(address, ssl) async def _do_recv(self) -> None: -- cgit 1.4.1 From 5e9902a030ab832b0b6577764c65ce6a6f874af6 Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:44 -0500 Subject: python/aqmp: refactor _do_accept() into two distinct steps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor _do_accept() into _do_start_server() and _do_accept(). As of this commit, the former calls the latter, but in subsequent commits they'll be split apart. (So please forgive the misnomer for _do_start_server(); it will live up to its name shortly, and the docstring will be updated then too. I'm just cutting down on some churn.) Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-7-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/protocol.py | 29 ++++++++++++++++++++++++----- python/tests/protocol.py | 4 ++-- 2 files changed, 26 insertions(+), 7 deletions(-) (limited to 'python/tests/protocol.py') diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index 631bcdaa55..e2bdad542d 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -295,7 +295,7 @@ class AsyncProtocol(Generic[T]): session, the wrapped error may also be an `QMPError`. """ await self._session_guard( - self._do_accept(address, ssl), + self._do_start_server(address, ssl), 'Failed to establish connection') await self._session_guard( self._establish_session(), @@ -509,8 +509,8 @@ class AsyncProtocol(Generic[T]): self._sock = sock @upper_half - async def _do_accept(self, address: SocketAddrT, - ssl: Optional[SSLContext] = None) -> None: + async def _do_start_server(self, address: SocketAddrT, + ssl: Optional[SSLContext] = None) -> None: """ Acting as the transport server, accept a single connection. @@ -551,9 +551,28 @@ class AsyncProtocol(Generic[T]): # otherwise yield. await asyncio.sleep(0) - self._server = await coro # Starts listening - await self._accepted.wait() # Waits for the callback to finish + # This will start the server (bind(2), listen(2)). It will also + # call accept(2) if we yield, but we don't block on that here. + self._server = await coro + + # Just for this one commit, wait for a peer. + # This gets split out in the next patch. + await self._do_accept() + + @upper_half + async def _do_accept(self) -> None: + """ + Wait for and accept an incoming connection. + + Requires that we have not yet accepted an incoming connection + from the upper_half, but it's OK if the server is no longer + running because the bottom_half has already accepted the + connection. + """ + assert self._accepted is not None + await self._accepted.wait() assert self._server is None + self._accepted = None self._sock = None self.logger.debug("Connection accepted.") diff --git a/python/tests/protocol.py b/python/tests/protocol.py index 8dd26c4ed1..5e442e1efb 100644 --- a/python/tests/protocol.py +++ b/python/tests/protocol.py @@ -41,12 +41,12 @@ class NullProtocol(AsyncProtocol[None]): self.trigger_input = asyncio.Event() await super()._establish_session() - async def _do_accept(self, address, ssl=None): + async def _do_start_server(self, address, ssl=None): if self.fake_session: self._set_state(Runstate.CONNECTING) await asyncio.sleep(0) else: - await super()._do_accept(address, ssl) + await super()._do_start_server(address, ssl) async def _do_connect(self, address, ssl=None): if self.fake_session: -- cgit 1.4.1 From 481607c7d35de2bc4d9bec7f4734036fc467f330 Mon Sep 17 00:00:00 2001 From: John Snow Date: Fri, 25 Feb 2022 15:59:46 -0500 Subject: python/aqmp: add start_server() and accept() methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add start_server() and accept() methods that can be used instead of start_server_and_accept() to allow more fine-grained control over the incoming connection process. (Eagle-eyed reviewers will surely notice that it's a bit weird that "CONNECTING" is a state that's shared between both the start_server() and connect() states. That's absolutely true, and it's very true that checking on the presence of _accepted as an indicator of state is a hack. That's also very certainly true. But ... this keeps client code an awful lot simpler, as it doesn't have to care exactly *how* the connection is being made, just that it *is*. Is it worth disrupting that simplicity in order to provide a better state guard on `accept()`? Hm.) Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-9-jsnow@redhat.com Signed-off-by: John Snow --- python/qemu/aqmp/protocol.py | 67 ++++++++++++++++++++++++++++++++++++++++---- python/tests/protocol.py | 7 +++++ 2 files changed, 69 insertions(+), 5 deletions(-) (limited to 'python/tests/protocol.py') diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index cdbc9cba0d..2ecba14555 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -280,6 +280,8 @@ class AsyncProtocol(Generic[T]): Accept a connection and begin processing message queues. If this call fails, `runstate` is guaranteed to be set back to `IDLE`. + This method is precisely equivalent to calling `start_server()` + followed by `accept()`. :param address: Address to listen on; UNIX socket path or TCP address/port. @@ -294,9 +296,62 @@ class AsyncProtocol(Generic[T]): protocol-level failure occurs while establishing a new session, the wrapped error may also be an `QMPError`. """ + await self.start_server(address, ssl) + await self.accept() + assert self.runstate == Runstate.RUNNING + + @upper_half + @require(Runstate.IDLE) + async def start_server(self, address: SocketAddrT, + ssl: Optional[SSLContext] = None) -> None: + """ + Start listening for an incoming connection, but do not wait for a peer. + + This method starts listening for an incoming connection, but + does not block waiting for a peer. This call will return + immediately after binding and listening on a socket. A later + call to `accept()` must be made in order to finalize the + incoming connection. + + :param address: + Address to listen on; UNIX socket path or TCP address/port. + :param ssl: SSL context to use, if any. + + :raise StateError: When the `Runstate` is not `IDLE`. + :raise ConnectError: + When the server could not start listening on this address. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError`. + """ await self._session_guard( self._do_start_server(address, ssl), 'Failed to establish connection') + assert self.runstate == Runstate.CONNECTING + + @upper_half + @require(Runstate.CONNECTING) + async def accept(self) -> None: + """ + Accept an incoming connection and begin processing message queues. + + If this call fails, `runstate` is guaranteed to be set back to `IDLE`. + + :raise StateError: When the `Runstate` is not `CONNECTING`. + :raise QMPError: When `start_server()` was not called yet. + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `QMPError`. + """ + if self._accepted is None: + raise QMPError("Cannot call accept() before start_server().") + await self._session_guard( + self._do_accept(), + 'Failed to establish connection') await self._session_guard( self._establish_session(), 'Failed to establish session') @@ -512,7 +567,12 @@ class AsyncProtocol(Generic[T]): async def _do_start_server(self, address: SocketAddrT, ssl: Optional[SSLContext] = None) -> None: """ - Acting as the transport server, accept a single connection. + Start listening for an incoming connection, but do not wait for a peer. + + This method starts listening for an incoming connection, but does not + block waiting for a peer. This call will return immediately after + binding and listening to a socket. A later call to accept() must be + made in order to finalize the incoming connection. :param address: Address to listen on; UNIX socket path or TCP address/port. @@ -554,10 +614,7 @@ class AsyncProtocol(Generic[T]): # This will start the server (bind(2), listen(2)). It will also # call accept(2) if we yield, but we don't block on that here. self._server = await coro - - # Just for this one commit, wait for a peer. - # This gets split out in the next patch. - await self._do_accept() + self.logger.debug("Server listening on %s", address) @upper_half async def _do_accept(self) -> None: diff --git a/python/tests/protocol.py b/python/tests/protocol.py index 5e442e1efb..d6849ad306 100644 --- a/python/tests/protocol.py +++ b/python/tests/protocol.py @@ -43,11 +43,18 @@ class NullProtocol(AsyncProtocol[None]): async def _do_start_server(self, address, ssl=None): if self.fake_session: + self._accepted = asyncio.Event() self._set_state(Runstate.CONNECTING) await asyncio.sleep(0) else: await super()._do_start_server(address, ssl) + async def _do_accept(self): + if self.fake_session: + self._accepted = None + else: + await super()._do_accept() + async def _do_connect(self, address, ssl=None): if self.fake_session: self._set_state(Runstate.CONNECTING) -- cgit 1.4.1