summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--docs/devel/memory.rst7
-rw-r--r--python/qemu/machine/qtest.py2
-rw-r--r--python/qemu/qmp/__init__.py3
-rw-r--r--python/qemu/qmp/error.py7
-rw-r--r--python/qemu/qmp/events.py50
-rw-r--r--python/qemu/qmp/legacy.py46
-rw-r--r--python/qemu/qmp/message.py22
-rw-r--r--python/qemu/qmp/models.py8
-rw-r--r--python/qemu/qmp/protocol.py194
-rw-r--r--python/qemu/qmp/qmp_client.py155
-rw-r--r--python/qemu/qmp/qmp_shell.py159
-rw-r--r--python/qemu/qmp/qmp_tui.py30
-rw-r--r--python/qemu/qmp/util.py143
-rw-r--r--python/tests/protocol.py10
-rw-r--r--system/memory.c46
-rw-r--r--system/physmem.c4
-rwxr-xr-xtests/qemu-iotests/1471
-rwxr-xr-xtests/qemu-iotests/1515
-rwxr-xr-xtests/qemu-iotests/check4
-rw-r--r--tests/qemu-iotests/testenv.py7
-rw-r--r--tests/qemu-iotests/testrunner.py9
21 files changed, 573 insertions, 339 deletions
diff --git a/docs/devel/memory.rst b/docs/devel/memory.rst
index 57fb2aec76..42d3ca29c4 100644
--- a/docs/devel/memory.rst
+++ b/docs/devel/memory.rst
@@ -158,8 +158,11 @@ ioeventfd) can be changed during the region lifecycle.  They take effect
 as soon as the region is made visible.  This can be immediately, later,
 or never.
 
-Destruction of a memory region happens automatically when the owner
-object dies.
+Destruction of a memory region happens automatically when the owner object
+dies.  When there are multiple memory regions under the same owner object,
+the memory API will guarantee all memory regions will be properly detached
+and finalized one by one.  The order in which memory regions will be
+finalized is not guaranteed.
 
 If however the memory region is part of a dynamically allocated data
 structure, you should call object_unparent() to destroy the memory region
diff --git a/python/qemu/machine/qtest.py b/python/qemu/machine/qtest.py
index 4f5ede85b2..781f674ffa 100644
--- a/python/qemu/machine/qtest.py
+++ b/python/qemu/machine/qtest.py
@@ -177,6 +177,8 @@ class QEMUQtestMachine(QEMUMachine):
             self._qtest_sock_pair[0].close()
             self._qtest_sock_pair[1].close()
             self._qtest_sock_pair = None
+        if self._qtest is not None:
+            self._qtest.close()
         super()._post_shutdown()
 
     def qtest(self, cmd: str) -> str:
diff --git a/python/qemu/qmp/__init__.py b/python/qemu/qmp/__init__.py
index 69190d057a..058139dc3c 100644
--- a/python/qemu/qmp/__init__.py
+++ b/python/qemu/qmp/__init__.py
@@ -39,7 +39,8 @@ from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
 logging.getLogger('qemu.qmp').addHandler(logging.NullHandler())
 
 
-# The order of these fields impact the Sphinx documentation order.
+# IMPORTANT: When modifying this list, update the Sphinx overview docs.
+# Anything visible in the qemu.qmp namespace should be on the overview page.
 __all__ = (
     # Classes, most to least important
     'QMPClient',
diff --git a/python/qemu/qmp/error.py b/python/qemu/qmp/error.py
index 24ba4d5054..c87b078f62 100644
--- a/python/qemu/qmp/error.py
+++ b/python/qemu/qmp/error.py
@@ -44,7 +44,10 @@ class ProtocolError(QMPError):
 
     :param error_message: Human-readable string describing the error.
     """
-    def __init__(self, error_message: str):
-        super().__init__(error_message)
+    def __init__(self, error_message: str, *args: object):
+        super().__init__(error_message, *args)
         #: Human-readable error message, without any prefix.
         self.error_message: str = error_message
+
+    def __str__(self) -> str:
+        return self.error_message
diff --git a/python/qemu/qmp/events.py b/python/qemu/qmp/events.py
index 6199776cc6..cfb5f0ac62 100644
--- a/python/qemu/qmp/events.py
+++ b/python/qemu/qmp/events.py
@@ -12,7 +12,14 @@ EventListener Tutorial
 ----------------------
 
 In all of the following examples, we assume that we have a `QMPClient`
-instantiated named ``qmp`` that is already connected.
+instantiated named ``qmp`` that is already connected. For example:
+
+.. code:: python
+
+   from qemu.qmp import QMPClient
+
+   qmp = QMPClient('example-vm')
+   await qmp.connect('127.0.0.1', 1234)
 
 
 `listener()` context blocks with one name
@@ -87,7 +94,9 @@ This is analogous to the following code:
            event = listener.get()
            print(f"Event arrived: {event['event']}")
 
-This event stream will never end, so these blocks will never terminate.
+This event stream will never end, so these blocks will never
+terminate. Even if the QMP connection errors out prematurely, this
+listener will go silent without raising an error.
 
 
 Using asyncio.Task to concurrently retrieve events
@@ -227,16 +236,20 @@ Clearing listeners
 .. code:: python
 
    await qmp.execute('stop')
-   qmp.events.clear()
+   discarded = qmp.events.clear()
    await qmp.execute('cont')
    event = await qmp.events.get()
    assert event['event'] == 'RESUME'
+   assert discarded[0]['event'] == 'STOP'
 
 `EventListener` objects are FIFO queues. If events are not consumed,
 they will remain in the queue until they are witnessed or discarded via
 `clear()`. FIFO queues will be drained automatically upon leaving a
 context block, or when calling `remove_listener()`.
 
+Any events removed from the queue in this fashion will be returned by
+the clear call.
+
 
 Accessing listener history
 ~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -350,6 +363,12 @@ While `listener()` is only capable of creating a single listener,
                break
 
 
+Note that in the above example, we explicitly wait on jobA to conclude
+first, and then wait for jobB to do the same. All we have guaranteed is
+that the code that waits for jobA will not accidentally consume the
+event intended for the jobB waiter.
+
+
 Extending the `EventListener` class
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
@@ -407,13 +426,13 @@ Experimental Interfaces & Design Issues
 These interfaces are not ones I am sure I will keep or otherwise modify
 heavily.
 
-qmp.listener()’s type signature
+qmp.listen()’s type signature
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-`listener()` does not return anything, because it was assumed the caller
+`listen()` does not return anything, because it was assumed the caller
 already had a handle to the listener. However, for
-``qmp.listener(EventListener())`` forms, the caller will not have saved
-a handle to the listener.
+``qmp.listen(EventListener())`` forms, the caller will not have saved a
+handle to the listener.
 
 Because this function can accept *many* listeners, I found it hard to
 accurately type in a way where it could be used in both “one” or “many”
@@ -497,6 +516,21 @@ class EventListener:
         #: Optional, secondary event filter.
         self.event_filter: Optional[EventFilter] = event_filter
 
+    def __repr__(self) -> str:
+        args: List[str] = []
+        if self.names:
+            args.append(f"names={self.names!r}")
+        if self.event_filter:
+            args.append(f"event_filter={self.event_filter!r}")
+
+        if self._queue.qsize():
+            state = f"<pending={self._queue.qsize()}>"
+        else:
+            state = ''
+
+        argstr = ", ".join(args)
+        return f"{type(self).__name__}{state}({argstr})"
+
     @property
     def history(self) -> Tuple[Message, ...]:
         """
@@ -618,7 +652,7 @@ class Events:
     def __init__(self) -> None:
         self._listeners: List[EventListener] = []
 
-        #: Default, all-events `EventListener`.
+        #: Default, all-events `EventListener`. See `qmp.events` for more info.
         self.events: EventListener = EventListener()
         self.register_listener(self.events)
 
diff --git a/python/qemu/qmp/legacy.py b/python/qemu/qmp/legacy.py
index 22a2b5616e..060ed0eb9d 100644
--- a/python/qemu/qmp/legacy.py
+++ b/python/qemu/qmp/legacy.py
@@ -38,6 +38,7 @@ from typing import (
 from .error import QMPError
 from .protocol import Runstate, SocketAddrT
 from .qmp_client import QMPClient
+from .util import get_or_create_event_loop
 
 
 #: QMPMessage is an entire QMP message of any kind.
@@ -86,10 +87,13 @@ class QEMUMonitorProtocol:
                 "server argument should be False when passing a socket")
 
         self._qmp = QMPClient(nickname)
-        self._aloop = asyncio.get_event_loop()
         self._address = address
         self._timeout: Optional[float] = None
 
+        # This is a sync shim intended for use in fully synchronous
+        # programs. Create and set an event loop if necessary.
+        self._aloop = get_or_create_event_loop()
+
         if server:
             assert not isinstance(self._address, socket.socket)
             self._sync(self._qmp.start_server(self._address))
@@ -231,6 +235,9 @@ class QEMUMonitorProtocol:
 
         :return: The first available QMP event, or None.
         """
+        # Kick the event loop to allow events to accumulate
+        self._sync(asyncio.sleep(0))
+
         if not wait:
             # wait is False/0: "do not wait, do not except."
             if self._qmp.events.empty():
@@ -286,8 +293,8 @@ class QEMUMonitorProtocol:
         """
         Set the timeout for QMP RPC execution.
 
-        This timeout affects the `cmd`, `cmd_obj`, and `command` methods.
-        The `accept`, `pull_event` and `get_event` methods have their
+        This timeout affects the `cmd`, `cmd_obj`, and `cmd_raw` methods.
+        The `accept`, `pull_event` and `get_events` methods have their
         own configurable timeouts.
 
         :param timeout:
@@ -303,17 +310,30 @@ class QEMUMonitorProtocol:
         self._qmp.send_fd_scm(fd)
 
     def __del__(self) -> None:
-        if self._qmp.runstate == Runstate.IDLE:
-            return
+        if self._qmp.runstate != Runstate.IDLE:
+            self._qmp.logger.warning(
+                "QEMUMonitorProtocol object garbage collected without a prior "
+                "call to close()"
+            )
 
         if not self._aloop.is_running():
-            self.close()
-        else:
-            # Garbage collection ran while the event loop was running.
-            # Nothing we can do about it now, but if we don't raise our
-            # own error, the user will be treated to a lot of traceback
-            # they might not understand.
+            if self._qmp.runstate != Runstate.IDLE:
+                # If the user neglected to close the QMP session and we
+                # are not currently running in an asyncio context, we
+                # have the opportunity to close the QMP session. If we
+                # do not do this, the error messages presented over
+                # dangling async resources may not make any sense to the
+                # user.
+                self.close()
+
+        if self._qmp.runstate != Runstate.IDLE:
+            # If QMP is still not quiesced, it means that the garbage
+            # collector ran from a context within the event loop and we
+            # are simply too late to take any corrective action. Raise
+            # our own error to give meaningful feedback to the user in
+            # order to prevent pages of asyncio stacktrace jargon.
             raise QMPError(
-                "QEMUMonitorProtocol.close()"
-                " was not called before object was garbage collected"
+                "QEMUMonitorProtocol.close() was not called before object was "
+                "garbage collected, and could not be closed due to GC running "
+                "in the event loop"
             )
diff --git a/python/qemu/qmp/message.py b/python/qemu/qmp/message.py
index f76ccc9074..dabb8ec360 100644
--- a/python/qemu/qmp/message.py
+++ b/python/qemu/qmp/message.py
@@ -28,7 +28,8 @@ class Message(MutableMapping[str, object]):
     be instantiated from either another mapping (like a `dict`), or from
     raw `bytes` that still need to be deserialized.
 
-    Once instantiated, it may be treated like any other MutableMapping::
+    Once instantiated, it may be treated like any other
+    :py:obj:`~collections.abc.MutableMapping`::
 
         >>> msg = Message(b'{"hello": "world"}')
         >>> assert msg['hello'] == 'world'
@@ -50,12 +51,19 @@ class Message(MutableMapping[str, object]):
        >>> dict(msg)
        {'hello': 'world'}
 
+    Or pretty-printed::
+
+       >>> print(str(msg))
+       {
+         "hello": "world"
+       }
 
     :param value: Initial value, if any.
     :param eager:
         When `True`, attempt to serialize or deserialize the initial value
         immediately, so that conversion exceptions are raised during
         the call to ``__init__()``.
+
     """
     # pylint: disable=too-many-ancestors
 
@@ -178,15 +186,15 @@ class DeserializationError(ProtocolError):
     :param raw: The raw `bytes` that prompted the failure.
     """
     def __init__(self, error_message: str, raw: bytes):
-        super().__init__(error_message)
+        super().__init__(error_message, raw)
         #: The raw `bytes` that were not understood as JSON.
         self.raw: bytes = raw
 
     def __str__(self) -> str:
-        return "\n".join([
+        return "\n".join((
             super().__str__(),
             f"  raw bytes were: {str(self.raw)}",
-        ])
+        ))
 
 
 class UnexpectedTypeError(ProtocolError):
@@ -197,13 +205,13 @@ class UnexpectedTypeError(ProtocolError):
     :param value: The deserialized JSON value that wasn't an object.
     """
     def __init__(self, error_message: str, value: object):
-        super().__init__(error_message)
+        super().__init__(error_message, value)
         #: The JSON value that was expected to be an object.
         self.value: object = value
 
     def __str__(self) -> str:
         strval = json.dumps(self.value, indent=2)
-        return "\n".join([
+        return "\n".join((
             super().__str__(),
             f"  json value was: {strval}",
-        ])
+        ))
diff --git a/python/qemu/qmp/models.py b/python/qemu/qmp/models.py
index da52848d5a..7e0d0baf03 100644
--- a/python/qemu/qmp/models.py
+++ b/python/qemu/qmp/models.py
@@ -54,7 +54,7 @@ class Model:
 
 class Greeting(Model):
     """
-    Defined in qmp-spec.rst, section "Server Greeting".
+    Defined in `interop/qmp-spec`, "Server Greeting" section.
 
     :param raw: The raw Greeting object.
     :raise KeyError: If any required fields are absent.
@@ -82,7 +82,7 @@ class Greeting(Model):
 
 class QMPGreeting(Model):
     """
-    Defined in qmp-spec.rst, section "Server Greeting".
+    Defined in `interop/qmp-spec`, "Server Greeting" section.
 
     :param raw: The raw QMPGreeting object.
     :raise KeyError: If any required fields are absent.
@@ -104,7 +104,7 @@ class QMPGreeting(Model):
 
 class ErrorResponse(Model):
     """
-    Defined in qmp-spec.rst, section "Error".
+    Defined in `interop/qmp-spec`, "Error" section.
 
     :param raw: The raw ErrorResponse object.
     :raise KeyError: If any required fields are absent.
@@ -126,7 +126,7 @@ class ErrorResponse(Model):
 
 class ErrorInfo(Model):
     """
-    Defined in qmp-spec.rst, section "Error".
+    Defined in `interop/qmp-spec`, "Error" section.
 
     :param raw: The raw ErrorInfo object.
     :raise KeyError: If any required fields are absent.
diff --git a/python/qemu/qmp/protocol.py b/python/qemu/qmp/protocol.py
index a4ffdfad51..219d092a79 100644
--- a/python/qemu/qmp/protocol.py
+++ b/python/qemu/qmp/protocol.py
@@ -15,13 +15,16 @@ class.
 
 import asyncio
 from asyncio import StreamReader, StreamWriter
+from contextlib import asynccontextmanager
 from enum import Enum
 from functools import wraps
+from inspect import iscoroutinefunction
 import logging
 import socket
 from ssl import SSLContext
 from typing import (
     Any,
+    AsyncGenerator,
     Awaitable,
     Callable,
     Generic,
@@ -36,13 +39,10 @@ from typing import (
 from .error import QMPError
 from .util import (
     bottom_half,
-    create_task,
     exception_summary,
     flush,
-    is_closing,
     pretty_traceback,
     upper_half,
-    wait_closed,
 )
 
 
@@ -54,6 +54,9 @@ InternetAddrT = Tuple[str, int]
 UnixAddrT = str
 SocketAddrT = Union[UnixAddrT, InternetAddrT]
 
+# Maximum allowable size of read buffer, default
+_DEFAULT_READBUFLEN = 64 * 1024
+
 
 class Runstate(Enum):
     """Protocol session runstate."""
@@ -76,11 +79,17 @@ class ConnectError(QMPError):
     This Exception always wraps a "root cause" exception that can be
     interrogated for additional information.
 
+    For example, when connecting to a non-existent socket::
+
+        await qmp.connect('not_found.sock')
+        # ConnectError: Failed to establish connection:
+        #               [Errno 2] No such file or directory
+
     :param error_message: Human-readable string describing the error.
     :param exc: The root-cause exception.
     """
     def __init__(self, error_message: str, exc: Exception):
-        super().__init__(error_message)
+        super().__init__(error_message, exc)
         #: Human-readable error string
         self.error_message: str = error_message
         #: Wrapped root cause exception
@@ -99,8 +108,8 @@ class StateError(QMPError):
     An API command (connect, execute, etc) was issued at an inappropriate time.
 
     This error is raised when a command like
-    :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate
-    time.
+    :py:meth:`~AsyncProtocol.connect()` is called when the client is
+    already connected.
 
     :param error_message: Human-readable string describing the state violation.
     :param state: The actual `Runstate` seen at the time of the violation.
@@ -108,11 +117,14 @@ class StateError(QMPError):
     """
     def __init__(self, error_message: str,
                  state: Runstate, required: Runstate):
-        super().__init__(error_message)
+        super().__init__(error_message, state, required)
         self.error_message = error_message
         self.state = state
         self.required = required
 
+    def __str__(self) -> str:
+        return self.error_message
+
 
 F = TypeVar('F', bound=Callable[..., Any])  # pylint: disable=invalid-name
 
@@ -125,6 +137,25 @@ def require(required_state: Runstate) -> Callable[[F], F]:
     :param required_state: The `Runstate` required to invoke this method.
     :raise StateError: When the required `Runstate` is not met.
     """
+    def _check(proto: 'AsyncProtocol[Any]') -> None:
+        name = type(proto).__name__
+        if proto.runstate == required_state:
+            return
+
+        if proto.runstate == Runstate.CONNECTING:
+            emsg = f"{name} is currently connecting."
+        elif proto.runstate == Runstate.DISCONNECTING:
+            emsg = (f"{name} is disconnecting."
+                    " Call disconnect() to return to IDLE state.")
+        elif proto.runstate == Runstate.RUNNING:
+            emsg = f"{name} is already connected and running."
+        elif proto.runstate == Runstate.IDLE:
+            emsg = f"{name} is disconnected and idle."
+        else:
+            assert False
+
+        raise StateError(emsg, proto.runstate, required_state)
+
     def _decorator(func: F) -> F:
         # _decorator is the decorator that is built by calling the
         # require() decorator factory; e.g.:
@@ -135,29 +166,20 @@ def require(required_state: Runstate) -> Callable[[F], F]:
         @wraps(func)
         def _wrapper(proto: 'AsyncProtocol[Any]',
                      *args: Any, **kwargs: Any) -> Any:
-            # _wrapper is the function that gets executed prior to the
-            # decorated method.
-
-            name = type(proto).__name__
-
-            if proto.runstate != required_state:
-                if proto.runstate == Runstate.CONNECTING:
-                    emsg = f"{name} is currently connecting."
-                elif proto.runstate == Runstate.DISCONNECTING:
-                    emsg = (f"{name} is disconnecting."
-                            " Call disconnect() to return to IDLE state.")
-                elif proto.runstate == Runstate.RUNNING:
-                    emsg = f"{name} is already connected and running."
-                elif proto.runstate == Runstate.IDLE:
-                    emsg = f"{name} is disconnected and idle."
-                else:
-                    assert False
-                raise StateError(emsg, proto.runstate, required_state)
-            # No StateError, so call the wrapped method.
+            _check(proto)
             return func(proto, *args, **kwargs)
 
-        # Return the decorated method;
-        # Transforming Func to Decorated[Func].
+        @wraps(func)
+        async def _async_wrapper(proto: 'AsyncProtocol[Any]',
+                                 *args: Any, **kwargs: Any) -> Any:
+            _check(proto)
+            return await func(proto, *args, **kwargs)
+
+        # Return the decorated method; F => Decorated[F]
+        # Use an async version when applicable, which
+        # preserves async signature generation in sphinx.
+        if iscoroutinefunction(func):
+            return cast(F, _async_wrapper)
         return cast(F, _wrapper)
 
     # Return the decorator instance from the decorator factory. Phew!
@@ -200,24 +222,26 @@ class AsyncProtocol(Generic[T]):
         will log to 'qemu.qmp.protocol', but each individual connection
         can be given its own logger by giving it a name; messages will
         then log to 'qemu.qmp.protocol.${name}'.
+    :param readbuflen:
+        The maximum read buffer length of the underlying StreamReader
+        instance.
     """
     # pylint: disable=too-many-instance-attributes
 
     #: Logger object for debugging messages from this connection.
     logger = logging.getLogger(__name__)
 
-    # Maximum allowable size of read buffer
-    _limit = 64 * 1024
-
     # -------------------------
     # Section: Public interface
     # -------------------------
 
-    def __init__(self, name: Optional[str] = None) -> None:
-        #: The nickname for this connection, if any.
-        self.name: Optional[str] = name
-        if self.name is not None:
-            self.logger = self.logger.getChild(self.name)
+    def __init__(
+        self, name: Optional[str] = None,
+        readbuflen: int = _DEFAULT_READBUFLEN
+    ) -> None:
+        self._name: Optional[str]
+        self.name = name
+        self.readbuflen = readbuflen
 
         # stream I/O
         self._reader: Optional[StreamReader] = None
@@ -254,6 +278,24 @@ class AsyncProtocol(Generic[T]):
         tokens.append(f"runstate={self.runstate.name}")
         return f"<{cls_name} {' '.join(tokens)}>"
 
+    @property
+    def name(self) -> Optional[str]:
+        """
+        The nickname for this connection, if any.
+
+        This name is used for differentiating instances in debug output.
+        """
+        return self._name
+
+    @name.setter
+    def name(self, name: Optional[str]) -> None:
+        logger = logging.getLogger(__name__)
+        if name:
+            self.logger = logger.getChild(name)
+        else:
+            self.logger = logger
+        self._name = name
+
     @property  # @upper_half
     def runstate(self) -> Runstate:
         """The current `Runstate` of the connection."""
@@ -262,7 +304,7 @@ class AsyncProtocol(Generic[T]):
     @upper_half
     async def runstate_changed(self) -> Runstate:
         """
-        Wait for the `runstate` to change, then return that runstate.
+        Wait for the `runstate` to change, then return that `Runstate`.
         """
         await self._runstate_event.wait()
         return self.runstate
@@ -276,9 +318,9 @@ class AsyncProtocol(Generic[T]):
         """
         Accept a connection and begin processing message queues.
 
-        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
-        This method is precisely equivalent to calling `start_server()`
-        followed by `accept()`.
+        If this call fails, `runstate` is guaranteed to be set back to
+        `IDLE`.  This method is precisely equivalent to calling
+        `start_server()` followed by :py:meth:`~AsyncProtocol.accept()`.
 
         :param address:
             Address to listen on; UNIX socket path or TCP address/port.
@@ -291,7 +333,8 @@ class AsyncProtocol(Generic[T]):
             This exception will wrap a more concrete one. In most cases,
             the wrapped exception will be `OSError` or `EOFError`. If a
             protocol-level failure occurs while establishing a new
-            session, the wrapped error may also be an `QMPError`.
+            session, the wrapped error may also be a `QMPError`.
+
         """
         await self.start_server(address, ssl)
         await self.accept()
@@ -307,8 +350,8 @@ class AsyncProtocol(Generic[T]):
         This method starts listening for an incoming connection, but
         does not block waiting for a peer. This call will return
         immediately after binding and listening on a socket. A later
-        call to `accept()` must be made in order to finalize the
-        incoming connection.
+        call to :py:meth:`~AsyncProtocol.accept()` must be made in order
+        to finalize the incoming connection.
 
         :param address:
             Address to listen on; UNIX socket path or TCP address/port.
@@ -321,9 +364,8 @@ class AsyncProtocol(Generic[T]):
             This exception will wrap a more concrete one. In most cases,
             the wrapped exception will be `OSError`.
         """
-        await self._session_guard(
-            self._do_start_server(address, ssl),
-            'Failed to establish connection')
+        async with self._session_guard('Failed to establish connection'):
+            await self._do_start_server(address, ssl)
         assert self.runstate == Runstate.CONNECTING
 
     @upper_half
@@ -332,10 +374,12 @@ class AsyncProtocol(Generic[T]):
         """
         Accept an incoming connection and begin processing message queues.
 
-        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+        Used after a previous call to `start_server()` to accept an
+        incoming connection. If this call fails, `runstate` is
+        guaranteed to be set back to `IDLE`.
 
         :raise StateError: When the `Runstate` is not `CONNECTING`.
-        :raise QMPError: When `start_server()` was not called yet.
+        :raise QMPError: When `start_server()` was not called first.
         :raise ConnectError:
             When a connection or session cannot be established.
 
@@ -346,12 +390,10 @@ class AsyncProtocol(Generic[T]):
         """
         if self._accepted is None:
             raise QMPError("Cannot call accept() before start_server().")
-        await self._session_guard(
-            self._do_accept(),
-            'Failed to establish connection')
-        await self._session_guard(
-            self._establish_session(),
-            'Failed to establish session')
+        async with self._session_guard('Failed to establish connection'):
+            await self._do_accept()
+        async with self._session_guard('Failed to establish session'):
+            await self._establish_session()
         assert self.runstate == Runstate.RUNNING
 
     @upper_half
@@ -376,12 +418,10 @@ class AsyncProtocol(Generic[T]):
             protocol-level failure occurs while establishing a new
             session, the wrapped error may also be an `QMPError`.
         """
-        await self._session_guard(
-            self._do_connect(address, ssl),
-            'Failed to establish connection')
-        await self._session_guard(
-            self._establish_session(),
-            'Failed to establish session')
+        async with self._session_guard('Failed to establish connection'):
+            await self._do_connect(address, ssl)
+        async with self._session_guard('Failed to establish session'):
+            await self._establish_session()
         assert self.runstate == Runstate.RUNNING
 
     @upper_half
@@ -392,7 +432,11 @@ class AsyncProtocol(Generic[T]):
         If there was an exception that caused the reader/writers to
         terminate prematurely, it will be raised here.
 
-        :raise Exception: When the reader or writer terminate unexpectedly.
+        :raise Exception:
+            When the reader or writer terminate unexpectedly. You can
+            expect to see `EOFError` if the server hangs up, or
+            `OSError` for connection-related issues. If there was a QMP
+            protocol-level problem, `ProtocolError` will be seen.
         """
         self.logger.debug("disconnect() called.")
         self._schedule_disconnect()
@@ -402,7 +446,8 @@ class AsyncProtocol(Generic[T]):
     # Section: Session machinery
     # --------------------------
 
-    async def _session_guard(self, coro: Awaitable[None], emsg: str) -> None:
+    @asynccontextmanager
+    async def _session_guard(self, emsg: str) -> AsyncGenerator[None, None]:
         """
         Async guard function used to roll back to `IDLE` on any error.
 
@@ -419,10 +464,9 @@ class AsyncProtocol(Generic[T]):
         :raise ConnectError:
             When any other error is encountered in the guarded block.
         """
-        # Note: After Python 3.6 support is removed, this should be an
-        # @asynccontextmanager instead of accepting a callback.
         try:
-            await coro
+            # Caller's code runs here.
+            yield
         except BaseException as err:
             self.logger.error("%s: %s", emsg, exception_summary(err))
             self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
@@ -561,7 +605,7 @@ class AsyncProtocol(Generic[T]):
                 port=address[1],
                 ssl=ssl,
                 backlog=1,
-                limit=self._limit,
+                limit=self.readbuflen,
             )
         else:
             coro = asyncio.start_unix_server(
@@ -569,7 +613,7 @@ class AsyncProtocol(Generic[T]):
                 path=address,
                 ssl=ssl,
                 backlog=1,
-                limit=self._limit,
+                limit=self.readbuflen,
             )
 
         # Allow runstate watchers to witness 'CONNECTING' state; some
@@ -624,7 +668,7 @@ class AsyncProtocol(Generic[T]):
                               "fd=%d, family=%r, type=%r",
                               address.fileno(), address.family, address.type)
             connect = asyncio.open_connection(
-                limit=self._limit,
+                limit=self.readbuflen,
                 ssl=ssl,
                 sock=address,
             )
@@ -634,14 +678,14 @@ class AsyncProtocol(Generic[T]):
                 address[0],
                 address[1],
                 ssl=ssl,
-                limit=self._limit,
+                limit=self.readbuflen,
             )
         else:
             self.logger.debug("Connecting to file://%s ...", address)
             connect = asyncio.open_unix_connection(
                 path=address,
                 ssl=ssl,
-                limit=self._limit,
+                limit=self.readbuflen,
             )
 
         self._reader, self._writer = await connect
@@ -663,8 +707,8 @@ class AsyncProtocol(Generic[T]):
         reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
         writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
 
-        self._reader_task = create_task(reader_coro)
-        self._writer_task = create_task(writer_coro)
+        self._reader_task = asyncio.create_task(reader_coro)
+        self._writer_task = asyncio.create_task(writer_coro)
 
         self._bh_tasks = asyncio.gather(
             self._reader_task,
@@ -689,7 +733,7 @@ class AsyncProtocol(Generic[T]):
         if not self._dc_task:
             self._set_state(Runstate.DISCONNECTING)
             self.logger.debug("Scheduling disconnect.")
-            self._dc_task = create_task(self._bh_disconnect())
+            self._dc_task = asyncio.create_task(self._bh_disconnect())
 
     @upper_half
     async def _wait_disconnect(self) -> None:
@@ -825,13 +869,13 @@ class AsyncProtocol(Generic[T]):
         if not self._writer:
             return
 
-        if not is_closing(self._writer):
+        if not self._writer.is_closing():
             self.logger.debug("Closing StreamWriter.")
             self._writer.close()
 
         self.logger.debug("Waiting for StreamWriter to close ...")
         try:
-            await wait_closed(self._writer)
+            await self._writer.wait_closed()
         except Exception:  # pylint: disable=broad-except
             # It's hard to tell if the Stream is already closed or
             # not. Even if one of the tasks has failed, it may have
diff --git a/python/qemu/qmp/qmp_client.py b/python/qemu/qmp/qmp_client.py
index 2a817f9db3..8beccfe29d 100644
--- a/python/qemu/qmp/qmp_client.py
+++ b/python/qemu/qmp/qmp_client.py
@@ -41,7 +41,7 @@ class _WrappedProtocolError(ProtocolError):
     :param exc: The root-cause exception.
     """
     def __init__(self, error_message: str, exc: Exception):
-        super().__init__(error_message)
+        super().__init__(error_message, exc)
         self.exc = exc
 
     def __str__(self) -> str:
@@ -70,21 +70,38 @@ class ExecuteError(QMPError):
     """
     Exception raised by `QMPClient.execute()` on RPC failure.
 
+    This exception is raised when the server received, interpreted, and
+    replied to a command successfully; but the command itself returned a
+    failure status.
+
+    For example::
+
+        await qmp.execute('block-dirty-bitmap-add',
+                          {'node': 'foo', 'name': 'my_bitmap'})
+        # qemu.qmp.qmp_client.ExecuteError:
+        #     Cannot find device='foo' nor node-name='foo'
+
     :param error_response: The RPC error response object.
     :param sent: The sent RPC message that caused the failure.
     :param received: The raw RPC error reply received.
     """
     def __init__(self, error_response: ErrorResponse,
                  sent: Message, received: Message):
-        super().__init__(error_response.error.desc)
+        super().__init__(error_response, sent, received)
         #: The sent `Message` that caused the failure
         self.sent: Message = sent
         #: The received `Message` that indicated failure
         self.received: Message = received
         #: The parsed error response
         self.error: ErrorResponse = error_response
-        #: The QMP error class
-        self.error_class: str = error_response.error.class_
+
+    @property
+    def error_class(self) -> str:
+        """The QMP error class"""
+        return self.error.error.class_
+
+    def __str__(self) -> str:
+        return self.error.error.desc
 
 
 class ExecInterruptedError(QMPError):
@@ -93,9 +110,22 @@ class ExecInterruptedError(QMPError):
 
     This error is raised when an `execute()` statement could not be
     completed.  This can occur because the connection itself was
-    terminated before a reply was received.
+    terminated before a reply was received. The true cause of the
+    interruption will be available via `disconnect()`.
 
-    The true cause of the interruption will be available via `disconnect()`.
+    The QMP protocol does not make it possible to know if a command
+    succeeded or failed after such an event; the client will need to
+    query the server to determine the state of the server on a
+    case-by-case basis.
+
+    For example, ECONNRESET might look like this::
+
+        try:
+            await qmp.execute('query-block')
+            # ExecInterruptedError: Disconnected
+        except ExecInterruptedError:
+            await qmp.disconnect()
+            # ConnectionResetError: [Errno 104] Connection reset by peer
     """
 
 
@@ -110,8 +140,8 @@ class _MsgProtocolError(ProtocolError):
     :param error_message: Human-readable string describing the error.
     :param msg: The QMP `Message` that caused the error.
     """
-    def __init__(self, error_message: str, msg: Message):
-        super().__init__(error_message)
+    def __init__(self, error_message: str, msg: Message, *args: object):
+        super().__init__(error_message, msg, *args)
         #: The received `Message` that caused the error.
         self.msg: Message = msg
 
@@ -150,30 +180,44 @@ class BadReplyError(_MsgProtocolError):
     :param sent: The message that was sent that prompted the error.
     """
     def __init__(self, error_message: str, msg: Message, sent: Message):
-        super().__init__(error_message, msg)
+        super().__init__(error_message, msg, sent)
         #: The sent `Message` that caused the failure
         self.sent = sent
 
 
 class QMPClient(AsyncProtocol[Message], Events):
-    """
-    Implements a QMP client connection.
+    """Implements a QMP client connection.
+
+    `QMPClient` can be used to either connect or listen to a QMP server,
+    but always acts as the QMP client.
 
-    QMP can be used to establish a connection as either the transport
-    client or server, though this class always acts as the QMP client.
+    :param name:
+        Optional nickname for the connection, used to differentiate
+        instances when logging.
 
-    :param name: Optional nickname for the connection, used for logging.
+    :param readbuflen:
+        The maximum buffer length for reads and writes to and from the QMP
+        server, in bytes. Default is 10MB. If `QMPClient` is used to
+        connect to a guest agent to transfer files via ``guest-file-read``/
+        ``guest-file-write``, increasing this value may be required.
 
     Basic script-style usage looks like this::
 
-      qmp = QMPClient('my_virtual_machine_name')
-      await qmp.connect(('127.0.0.1', 1234))
-      ...
-      res = await qmp.execute('block-query')
-      ...
-      await qmp.disconnect()
+      import asyncio
+      from qemu.qmp import QMPClient
+
+      async def main():
+          qmp = QMPClient('my_virtual_machine_name')
+          await qmp.connect(('127.0.0.1', 1234))
+          ...
+          res = await qmp.execute('query-block')
+          ...
+          await qmp.disconnect()
 
-    Basic async client-style usage looks like this::
+      asyncio.run(main())
+
+    A more advanced example that starts to take advantage of asyncio
+    might look like this::
 
       class Client:
           def __init__(self, name: str):
@@ -193,25 +237,32 @@ class QMPClient(AsyncProtocol[Message], Events):
               await self.disconnect()
 
     See `qmp.events` for more detail on event handling patterns.
+
     """
     #: Logger object used for debugging messages.
     logger = logging.getLogger(__name__)
 
-    # Read buffer limit; 10MB like libvirt default
-    _limit = 10 * 1024 * 1024
+    # Read buffer default limit; 10MB like libvirt default
+    _readbuflen = 10 * 1024 * 1024
 
     # Type alias for pending execute() result items
     _PendingT = Union[Message, ExecInterruptedError]
 
-    def __init__(self, name: Optional[str] = None) -> None:
-        super().__init__(name)
+    def __init__(
+        self,
+        name: Optional[str] = None,
+        readbuflen: int = _readbuflen
+    ) -> None:
+        super().__init__(name, readbuflen)
         Events.__init__(self)
 
         #: Whether or not to await a greeting after establishing a connection.
+        #: Defaults to True; QGA servers expect this to be False.
         self.await_greeting: bool = True
 
-        #: Whether or not to perform capabilities negotiation upon connection.
-        #: Implies `await_greeting`.
+        #: Whether or not to perform capabilities negotiation upon
+        #: connection. Implies `await_greeting`. Defaults to True; QGA
+        #: servers expect this to be False.
         self.negotiate: bool = True
 
         # Cached Greeting, if one was awaited.
@@ -228,7 +279,13 @@ class QMPClient(AsyncProtocol[Message], Events):
 
     @property
     def greeting(self) -> Optional[Greeting]:
-        """The `Greeting` from the QMP server, if any."""
+        """
+        The `Greeting` from the QMP server, if any.
+
+        Defaults to ``None``, and will be set after a greeting is
+        received during the connection process. It is reset at the start
+        of each connection attempt.
+        """
         return self._greeting
 
     @upper_half
@@ -369,7 +426,7 @@ class QMPClient(AsyncProtocol[Message], Events):
             # This is very likely a server parsing error.
             # It doesn't inherently belong to any pending execution.
             # Instead of performing clever recovery, just terminate.
-            # See "NOTE" in qmp-spec.rst, section "Error".
+            # See "NOTE" in interop/qmp-spec, "Error" section.
             raise ServerParseError(
                 ("Server sent an error response without an ID, "
                  "but there are no ID-less executions pending. "
@@ -377,7 +434,7 @@ class QMPClient(AsyncProtocol[Message], Events):
                 msg
             )
 
-        # qmp-spec.rst, section "Commands Responses":
+        # qmp-spec.rst, "Commands Responses" section:
         # 'Clients should drop all the responses
         # that have an unknown "id" field.'
         self.logger.log(
@@ -550,7 +607,7 @@ class QMPClient(AsyncProtocol[Message], Events):
     @require(Runstate.RUNNING)
     async def execute_msg(self, msg: Message) -> object:
         """
-        Execute a QMP command and return its value.
+        Execute a QMP command on the server and return its value.
 
         :param msg: The QMP `Message` to execute.
 
@@ -562,7 +619,9 @@ class QMPClient(AsyncProtocol[Message], Events):
             If the QMP `Message` does not have either the 'execute' or
             'exec-oob' fields set.
         :raise ExecuteError: When the server returns an error response.
-        :raise ExecInterruptedError: if the connection was terminated early.
+        :raise ExecInterruptedError:
+            If the connection was disrupted before
+            receiving a reply from the server.
         """
         if not ('execute' in msg or 'exec-oob' in msg):
             raise ValueError("Requires 'execute' or 'exec-oob' message")
@@ -601,9 +660,11 @@ class QMPClient(AsyncProtocol[Message], Events):
 
         :param cmd: QMP command name.
         :param arguments: Arguments (if any). Must be JSON-serializable.
-        :param oob: If `True`, execute "out of band".
+        :param oob:
+            If `True`, execute "out of band". See `interop/qmp-spec`
+            section "Out-of-band execution".
 
-        :return: An executable QMP `Message`.
+        :return: A QMP `Message` that can be executed with `execute_msg()`.
         """
         msg = Message({'exec-oob' if oob else 'execute': cmd})
         if arguments is not None:
@@ -615,18 +676,22 @@ class QMPClient(AsyncProtocol[Message], Events):
                       arguments: Optional[Mapping[str, object]] = None,
                       oob: bool = False) -> object:
         """
-        Execute a QMP command and return its value.
+        Execute a QMP command on the server and return its value.
 
         :param cmd: QMP command name.
         :param arguments: Arguments (if any). Must be JSON-serializable.
-        :param oob: If `True`, execute "out of band".
+        :param oob:
+            If `True`, execute "out of band". See `interop/qmp-spec`
+            section "Out-of-band execution".
 
         :return:
             The command execution return value from the server. The type of
             object returned depends on the command that was issued,
             though most in QEMU return a `dict`.
         :raise ExecuteError: When the server returns an error response.
-        :raise ExecInterruptedError: if the connection was terminated early.
+        :raise ExecInterruptedError:
+            If the connection was disrupted before
+            receiving a reply from the server.
         """
         msg = self.make_execute_msg(cmd, arguments, oob=oob)
         return await self.execute_msg(msg)
@@ -634,8 +699,20 @@ class QMPClient(AsyncProtocol[Message], Events):
     @upper_half
     @require(Runstate.RUNNING)
     def send_fd_scm(self, fd: int) -> None:
-        """
-        Send a file descriptor to the remote via SCM_RIGHTS.
+        """Send a file descriptor to the remote via SCM_RIGHTS.
+
+        This method does not close the file descriptor.
+
+        :param fd: The file descriptor to send to QEMU.
+
+        This is an advanced feature of QEMU where file descriptors can
+        be passed from client to server. This is usually used as a
+        security measure to isolate the QEMU process from being able to
+        open its own files. See the QMP commands ``getfd`` and
+        ``add-fd`` for more information.
+
+        See `socket.socket.sendmsg` for more information on the Python
+        implementation for sending file descriptors over a UNIX socket.
         """
         assert self._writer is not None
         sock = self._writer.transport.get_extra_info('socket')
diff --git a/python/qemu/qmp/qmp_shell.py b/python/qemu/qmp/qmp_shell.py
index 98e684e9e8..f818800568 100644
--- a/python/qemu/qmp/qmp_shell.py
+++ b/python/qemu/qmp/qmp_shell.py
@@ -10,9 +10,15 @@
 #
 
 """
-Low-level QEMU shell on top of QMP.
+qmp-shell - An interactive QEMU shell powered by QMP
 
-usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
+qmp-shell offers a simple shell with a convenient shorthand syntax as an
+alternative to typing JSON by hand. This syntax is not standardized and
+is not meant to be used as a scriptable interface. This shorthand *may*
+change incompatibly in the future, and it is strongly encouraged to use
+the QMP library to provide API-stable scripting when needed.
+
+usage: qmp-shell [-h] [-H] [-v] [-p] [-l LOGFILE] [-N] qmp_server
 
 positional arguments:
   qmp_server            < UNIX socket path | TCP address:port >
@@ -20,41 +26,52 @@ positional arguments:
 optional arguments:
   -h, --help            show this help message and exit
   -H, --hmp             Use HMP interface
-  -N, --skip-negotiation
-                        Skip negotiate (for qemu-ga)
   -v, --verbose         Verbose (echo commands sent and received)
   -p, --pretty          Pretty-print JSON
+  -l LOGFILE, --logfile LOGFILE
+                        Save log of all QMP messages to PATH
+  -N, --skip-negotiation
+                        Skip negotiate (for qemu-ga)
+
+Usage
+-----
 
+First, start QEMU with::
 
-Start QEMU with:
+    > qemu [...] -qmp unix:./qmp-sock,server=on[,wait=off]
 
-# qemu [...] -qmp unix:./qmp-sock,server
+Then run the shell, passing the address of the socket::
 
-Run the shell:
+    > qmp-shell ./qmp-sock
 
-$ qmp-shell ./qmp-sock
+Syntax
+------
 
-Commands have the following format:
+Commands have the following format::
 
-   < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
+    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
 
-For example:
+For example, to add a network device::
 
-(QEMU) device_add driver=e1000 id=net1
-{'return': {}}
-(QEMU)
+    (QEMU) device_add driver=e1000 id=net1
+    {'return': {}}
+    (QEMU)
 
-key=value pairs also support Python or JSON object literal subset notations,
-without spaces. Dictionaries/objects {} are supported as are arrays [].
+key=value pairs support either Python or JSON object literal notations,
+**without spaces**. Dictionaries/objects ``{}`` are supported, as are
+arrays ``[]``::
 
-   example-command arg-name1={'key':'value','obj'={'prop':"value"}}
+    example-command arg-name1={'key':'value','obj'={'prop':"value"}}
 
-Both JSON and Python formatting should work, including both styles of
-string literal quotes. Both paradigms of literal values should work,
-including null/true/false for JSON and None/True/False for Python.
+Either JSON or Python formatting for compound values works, including
+both styles of string literal quotes (either single or double
+quotes). Both paradigms of literal values are accepted, including
+``null/true/false`` for JSON and ``None/True/False`` for Python.
 
+Transactions
+------------
 
-Transactions have the following multi-line format:
+Transactions have the following multi-line format::
 
    transaction(
    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
@@ -62,11 +79,11 @@ Transactions have the following multi-line format:
    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
    )
 
-One line transactions are also supported:
+One line transactions are also supported::
 
    transaction( action-name1 ... )
 
-For example:
+For example::
 
     (QEMU) transaction(
     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
@@ -75,9 +92,35 @@ For example:
     {"return": {}}
     (QEMU)
 
-Use the -v and -p options to activate the verbose and pretty-print options,
-which will echo back the properly formatted JSON-compliant QMP that is being
-sent to QEMU, which is useful for debugging and documentation generation.
+Commands
+--------
+
+Autocomplete of command names using <tab> is supported. Pressing <tab>
+at a blank CLI prompt will show you a list of all available commands
+that the connected QEMU instance supports.
+
+For documentation on QMP commands and their arguments, please see
+`qmp ref`.
+
+Events
+------
+
+qmp-shell will display events received from the server, but this version
+does not do so asynchronously. To check for new events from the server,
+press <enter> on a blank line::
+
+    (QEMU) ⏎
+    {'timestamp': {'seconds': 1660071944, 'microseconds': 184667},
+     'event': 'STOP'}
+
+Display options
+---------------
+
+Use the -v and -p options to activate the verbose and pretty-print
+options, which will echo back the properly formatted JSON-compliant QMP
+that is being sent to QEMU. This is useful for debugging to see the
+wire-level QMP data being exchanged, and generating output for use in
+writing documentation for QEMU.
 """
 
 import argparse
@@ -514,21 +557,29 @@ def die(msg: str) -> NoReturn:
     sys.exit(1)
 
 
-def main() -> None:
-    """
-    qmp-shell entry point: parse command line arguments and start the REPL.
-    """
+def common_parser() -> argparse.ArgumentParser:
+    """Build common parsing options used by qmp-shell and qmp-shell-wrap."""
     parser = argparse.ArgumentParser()
     parser.add_argument('-H', '--hmp', action='store_true',
                         help='Use HMP interface')
-    parser.add_argument('-N', '--skip-negotiation', action='store_true',
-                        help='Skip negotiate (for qemu-ga)')
     parser.add_argument('-v', '--verbose', action='store_true',
                         help='Verbose (echo commands sent and received)')
     parser.add_argument('-p', '--pretty', action='store_true',
                         help='Pretty-print JSON')
     parser.add_argument('-l', '--logfile',
                         help='Save log of all QMP messages to PATH')
+    # NOTE: When changing arguments, update both this module docstring
+    # and the manpage synopsis in docs/man/qmp_shell.rst.
+    return parser
+
+
+def main() -> None:
+    """
+    qmp-shell entry point: parse command line arguments and start the REPL.
+    """
+    parser = common_parser()
+    parser.add_argument('-N', '--skip-negotiation', action='store_true',
+                        help='Skip negotiate (for qemu-ga)')
 
     default_server = os.environ.get('QMP_SOCKET')
     parser.add_argument('qmp_server', action='store',
@@ -561,19 +612,37 @@ def main() -> None:
 
 def main_wrap() -> None:
     """
-    qmp-shell-wrap entry point: parse command line arguments and
-    start the REPL.
-    """
-    parser = argparse.ArgumentParser()
-    parser.add_argument('-H', '--hmp', action='store_true',
-                        help='Use HMP interface')
-    parser.add_argument('-v', '--verbose', action='store_true',
-                        help='Verbose (echo commands sent and received)')
-    parser.add_argument('-p', '--pretty', action='store_true',
-                        help='Pretty-print JSON')
-    parser.add_argument('-l', '--logfile',
-                        help='Save log of all QMP messages to PATH')
+    qmp-shell-wrap - QEMU + qmp-shell launcher utility
+
+    Launch QEMU and connect to it with `qmp-shell` in a single command.
+    CLI arguments will be forwarded to qemu, with additional arguments
+    added to allow `qmp-shell` to then connect to the recently launched
+    QEMU instance.
+
+    usage: qmp-shell-wrap [-h] [-H] [-v] [-p] [-l LOGFILE] ...
 
+    positional arguments:
+      command               QEMU command line to invoke
+
+    optional arguments:
+      -h, --help            show this help message and exit
+      -H, --hmp             Use HMP interface
+      -v, --verbose         Verbose (echo commands sent and received)
+      -p, --pretty          Pretty-print JSON
+      -l LOGFILE, --logfile LOGFILE
+                            Save log of all QMP messages to PATH
+
+    Usage
+    -----
+
+    Prepend "qmp-shell-wrap" to your usual QEMU command line::
+
+        > qmp-shell-wrap qemu-system-x86_64 -M q35 -m 4096 -display none
+        Welcome to the QMP low-level shell!
+        Connected
+        (QEMU)
+    """
+    parser = common_parser()
     parser.add_argument('command', nargs=argparse.REMAINDER,
                         help='QEMU command line to invoke')
 
@@ -610,6 +679,8 @@ def main_wrap() -> None:
 
                 for _ in qemu.repl():
                     pass
+    except FileNotFoundError:
+        sys.stderr.write(f"ERROR: QEMU executable '{cmd[0]}' not found.\n")
     finally:
         os.unlink(sockpath)
 
diff --git a/python/qemu/qmp/qmp_tui.py b/python/qemu/qmp/qmp_tui.py
index 2d9ebbd20b..d946c20513 100644
--- a/python/qemu/qmp/qmp_tui.py
+++ b/python/qemu/qmp/qmp_tui.py
@@ -21,6 +21,7 @@ import json
 import logging
 from logging import Handler, LogRecord
 import signal
+import sys
 from typing import (
     List,
     Optional,
@@ -30,17 +31,27 @@ from typing import (
     cast,
 )
 
-from pygments import lexers
-from pygments import token as Token
-import urwid
-import urwid_readline
+
+try:
+    from pygments import lexers
+    from pygments import token as Token
+    import urwid
+    import urwid_readline
+except ModuleNotFoundError as exc:
+    print(
+        f"Module '{exc.name}' not found.",
+        "You need the optional 'tui' group: pip install qemu.qmp[tui]",
+        sep='\n',
+        file=sys.stderr,
+    )
+    sys.exit(1)
 
 from .error import ProtocolError
 from .legacy import QEMUMonitorProtocol, QMPBadPortError
 from .message import DeserializationError, Message, UnexpectedTypeError
 from .protocol import ConnectError, Runstate
 from .qmp_client import ExecInterruptedError, QMPClient
-from .util import create_task, pretty_traceback
+from .util import get_or_create_event_loop, pretty_traceback
 
 
 # The name of the signal that is used to update the history list
@@ -225,7 +236,7 @@ class App(QMPClient):
         """
         try:
             msg = Message(bytes(raw_msg, encoding='utf-8'))
-            create_task(self._send_to_server(msg))
+            asyncio.create_task(self._send_to_server(msg))
         except (DeserializationError, UnexpectedTypeError) as err:
             raw_msg = format_json(raw_msg)
             logging.info('Invalid message: %s', err.error_message)
@@ -246,7 +257,7 @@ class App(QMPClient):
         Initiates killing of app. A bridge between asynchronous and synchronous
         code.
         """
-        create_task(self._kill_app())
+        asyncio.create_task(self._kill_app())
 
     async def _kill_app(self) -> None:
         """
@@ -376,8 +387,7 @@ class App(QMPClient):
         """
         screen = urwid.raw_display.Screen()
         screen.set_terminal_properties(256)
-
-        self.aloop = asyncio.get_event_loop()
+        self.aloop = get_or_create_event_loop()
         self.aloop.set_debug(debug)
 
         # Gracefully handle SIGTERM and SIGINT signals
@@ -393,7 +403,7 @@ class App(QMPClient):
                                    handle_mouse=True,
                                    event_loop=event_loop)
 
-        create_task(self.manage_connection(), self.aloop)
+        self.aloop.create_task(self.manage_connection())
         try:
             main_loop.run()
         except Exception as err:
diff --git a/python/qemu/qmp/util.py b/python/qemu/qmp/util.py
index ca6225e9cd..a8229e5524 100644
--- a/python/qemu/qmp/util.py
+++ b/python/qemu/qmp/util.py
@@ -1,25 +1,16 @@
 """
 Miscellaneous Utilities
 
-This module provides asyncio utilities and compatibility wrappers for
-Python 3.6 to provide some features that otherwise become available in
-Python 3.7+.
-
-Various logging and debugging utilities are also provided, such as
-`exception_summary()` and `pretty_traceback()`, used primarily for
-adding information into the logging stream.
+This module provides asyncio and various logging and debugging
+utilities, such as `exception_summary()` and `pretty_traceback()`, used
+primarily for adding information into the logging stream.
 """
 
 import asyncio
 import sys
 import traceback
-from typing import (
-    Any,
-    Coroutine,
-    Optional,
-    TypeVar,
-    cast,
-)
+from typing import TypeVar, cast
+import warnings
 
 
 T = TypeVar('T')
@@ -30,9 +21,35 @@ T = TypeVar('T')
 # --------------------------
 
 
+def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
+    """
+    Return this thread's current event loop, or create a new one.
+
+    This function behaves similarly to asyncio.get_event_loop() in
+    Python<=3.13, where if there is no event loop currently associated
+    with the current context, it will create and register one. It should
+    generally not be used in any asyncio-native applications.
+    """
+    try:
+        with warnings.catch_warnings():
+            # Python <= 3.13 will trigger deprecation warnings if no
+            # event loop is set, but will create and set a new loop.
+            warnings.simplefilter("ignore")
+            loop = asyncio.get_event_loop()
+    except RuntimeError:
+        # Python 3.14+: No event loop set for this thread,
+        # create and set one.
+        loop = asyncio.new_event_loop()
+        # Set this loop as the current thread's loop, to be returned
+        # by calls to get_event_loop() in the future.
+        asyncio.set_event_loop(loop)
+
+    return loop
+
+
 async def flush(writer: asyncio.StreamWriter) -> None:
     """
-    Utility function to ensure a StreamWriter is *fully* drained.
+    Utility function to ensure an `asyncio.StreamWriter` is *fully* drained.
 
     `asyncio.StreamWriter.drain` only promises we will return to below
     the "high-water mark". This function ensures we flush the entire
@@ -72,102 +89,13 @@ def bottom_half(func: T) -> T:
 
     These methods do not, in general, have the ability to directly
     report information to a caller’s context and will usually be
-    collected as a Task result instead.
+    collected as an `asyncio.Task` result instead.
 
     They must not call upper-half functions directly.
     """
     return func
 
 
-# -------------------------------
-# Section: Compatibility Wrappers
-# -------------------------------
-
-
-def create_task(coro: Coroutine[Any, Any, T],
-                loop: Optional[asyncio.AbstractEventLoop] = None
-                ) -> 'asyncio.Future[T]':
-    """
-    Python 3.6-compatible `asyncio.create_task` wrapper.
-
-    :param coro: The coroutine to execute in a task.
-    :param loop: Optionally, the loop to create the task in.
-
-    :return: An `asyncio.Future` object.
-    """
-    if sys.version_info >= (3, 7):
-        if loop is not None:
-            return loop.create_task(coro)
-        return asyncio.create_task(coro)  # pylint: disable=no-member
-
-    # Python 3.6:
-    return asyncio.ensure_future(coro, loop=loop)
-
-
-def is_closing(writer: asyncio.StreamWriter) -> bool:
-    """
-    Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
-
-    :param writer: The `asyncio.StreamWriter` object.
-    :return: `True` if the writer is closing, or closed.
-    """
-    if sys.version_info >= (3, 7):
-        return writer.is_closing()
-
-    # Python 3.6:
-    transport = writer.transport
-    assert isinstance(transport, asyncio.WriteTransport)
-    return transport.is_closing()
-
-
-async def wait_closed(writer: asyncio.StreamWriter) -> None:
-    """
-    Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
-
-    :param writer: The `asyncio.StreamWriter` to wait on.
-    """
-    if sys.version_info >= (3, 7):
-        await writer.wait_closed()
-        return
-
-    # Python 3.6
-    transport = writer.transport
-    assert isinstance(transport, asyncio.WriteTransport)
-
-    while not transport.is_closing():
-        await asyncio.sleep(0)
-
-    # This is an ugly workaround, but it's the best I can come up with.
-    sock = transport.get_extra_info('socket')
-
-    if sock is None:
-        # Our transport doesn't have a socket? ...
-        # Nothing we can reasonably do.
-        return
-
-    while sock.fileno() != -1:
-        await asyncio.sleep(0)
-
-
-def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
-    """
-    Python 3.6-compatible `asyncio.run` wrapper.
-
-    :param coro: A coroutine to execute now.
-    :return: The return value from the coroutine.
-    """
-    if sys.version_info >= (3, 7):
-        return asyncio.run(coro, debug=debug)
-
-    # Python 3.6
-    loop = asyncio.get_event_loop()
-    loop.set_debug(debug)
-    ret = loop.run_until_complete(coro)
-    loop.close()
-
-    return ret
-
-
 # ----------------------------
 # Section: Logging & Debugging
 # ----------------------------
@@ -177,8 +105,11 @@ def exception_summary(exc: BaseException) -> str:
     """
     Return a summary string of an arbitrary exception.
 
-    It will be of the form "ExceptionType: Error Message", if the error
+    It will be of the form "ExceptionType: Error Message" if the error
     string is non-empty, and just "ExceptionType" otherwise.
+
+    This code is based on CPython's implementation of
+    `traceback.TracebackException.format_exception_only`.
     """
     name = type(exc).__qualname__
     smod = type(exc).__module__
diff --git a/python/tests/protocol.py b/python/tests/protocol.py
index 56c4d441f9..e565802516 100644
--- a/python/tests/protocol.py
+++ b/python/tests/protocol.py
@@ -8,7 +8,6 @@ import avocado
 
 from qemu.qmp import ConnectError, Runstate
 from qemu.qmp.protocol import AsyncProtocol, StateError
-from qemu.qmp.util import asyncio_run, create_task
 
 
 class NullProtocol(AsyncProtocol[None]):
@@ -124,7 +123,7 @@ def run_as_task(coro, allow_cancellation=False):
             if allow_cancellation:
                 return
             raise
-    return create_task(_runner())
+    return asyncio.create_task(_runner())
 
 
 @contextmanager
@@ -228,7 +227,7 @@ class TestBase(avocado.Test):
         Decorator; adds SetUp and TearDown to async tests.
         """
         async def _wrapper(self, *args, **kwargs):
-            loop = asyncio.get_event_loop()
+            loop = asyncio.get_running_loop()
             loop.set_debug(True)
 
             await self._asyncSetUp()
@@ -271,7 +270,7 @@ class TestBase(avocado.Test):
                     msg=f"Expected state '{state.name}'",
                 )
 
-        self.runstate_watcher = create_task(_watcher())
+        self.runstate_watcher = asyncio.create_task(_watcher())
         # Kick the loop and force the task to block on the event.
         await asyncio.sleep(0)
 
@@ -589,7 +588,8 @@ class SimpleSession(TestBase):
     async def testSmoke(self):
         with TemporaryDirectory(suffix='.qmp') as tmpdir:
             sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
-            server_task = create_task(self.server.start_server_and_accept(sock))
+            server_task = asyncio.create_task(
+                self.server.start_server_and_accept(sock))
 
             # give the server a chance to start listening [...]
             await asyncio.sleep(0)
diff --git a/system/memory.c b/system/memory.c
index 44701c465c..cf8cad6961 100644
--- a/system/memory.c
+++ b/system/memory.c
@@ -1796,16 +1796,37 @@ static void memory_region_finalize(Object *obj)
 {
     MemoryRegion *mr = MEMORY_REGION(obj);
 
-    assert(!mr->container);
-
-    /* We know the region is not visible in any address space (it
-     * does not have a container and cannot be a root either because
-     * it has no references, so we can blindly clear mr->enabled.
-     * memory_region_set_enabled instead could trigger a transaction
-     * and cause an infinite loop.
+    /*
+     * Each memory region (that can be freed) must have an owner, and it
+     * always has the same lifecycle of its owner.  It means when reaching
+     * here, the memory region's owner's refcount is zero.
+     *
+     * Here it is possible that the MR has:
+     *
+     * (1) mr->container set, which means this MR is a subregion of a
+     *     container MR. In this case they must share the same owner as the
+     *     container (otherwise the container should have kept a refcount
+     *     of this MR's owner).
+     *
+     * (2) mr->subregions non-empty, which means this MR is a container of
+     *     one or more other MRs (which might have the the owner as this
+     *     MR, or a different owner).
+     *
+     * We know the MR, or any MR that is attached to this one as either
+     * container or children, is not visible in any address space, because
+     * otherwise the address space should have taken at least one refcount
+     * of this MR's owner.  So we can blindly clear mr->enabled.
+     *
+     * memory_region_set_enabled instead could trigger a transaction and
+     * cause an infinite loop.
      */
     mr->enabled = false;
     memory_region_transaction_begin();
+    if (mr->container) {
+        /* Must share the owner; see above comments */
+        assert(mr->container->owner == mr->owner);
+        memory_region_del_subregion(mr->container, mr);
+    }
     while (!QTAILQ_EMPTY(&mr->subregions)) {
         MemoryRegion *subregion = QTAILQ_FIRST(&mr->subregions);
         memory_region_del_subregion(mr, subregion);
@@ -2640,7 +2661,10 @@ static void memory_region_update_container_subregions(MemoryRegion *subregion)
 
     memory_region_transaction_begin();
 
-    memory_region_ref(subregion);
+    if (mr->owner != subregion->owner) {
+        memory_region_ref(subregion);
+    }
+
     QTAILQ_FOREACH(other, &mr->subregions, subregions_link) {
         if (subregion->priority >= other->priority) {
             QTAILQ_INSERT_BEFORE(other, subregion, subregions_link);
@@ -2698,7 +2722,11 @@ void memory_region_del_subregion(MemoryRegion *mr,
         assert(alias->mapped_via_alias >= 0);
     }
     QTAILQ_REMOVE(&mr->subregions, subregion, subregions_link);
-    memory_region_unref(subregion);
+
+    if (mr->owner != subregion->owner) {
+        memory_region_unref(subregion);
+    }
+
     memory_region_update_pending |= mr->enabled && subregion->enabled;
     memory_region_transaction_commit();
 }
diff --git a/system/physmem.c b/system/physmem.c
index 311011156c..ddd58e9eb8 100644
--- a/system/physmem.c
+++ b/system/physmem.c
@@ -3027,7 +3027,7 @@ static MemTxResult flatview_write(FlatView *fv, hwaddr addr, MemTxAttrs attrs,
 
     l = len;
     mr = flatview_translate(fv, addr, &mr_addr, &l, true, attrs);
-    if (!flatview_access_allowed(mr, attrs, addr, len)) {
+    if (!flatview_access_allowed(mr, attrs, mr_addr, l)) {
         return MEMTX_ACCESS_ERROR;
     }
     return flatview_write_continue(fv, addr, attrs, buf, len,
@@ -3118,7 +3118,7 @@ static MemTxResult flatview_read(FlatView *fv, hwaddr addr,
 
     l = len;
     mr = flatview_translate(fv, addr, &mr_addr, &l, false, attrs);
-    if (!flatview_access_allowed(mr, attrs, addr, len)) {
+    if (!flatview_access_allowed(mr, attrs, mr_addr, l)) {
         return MEMTX_ACCESS_ERROR;
     }
     return flatview_read_continue(fv, addr, attrs, buf, len,
diff --git a/tests/qemu-iotests/147 b/tests/qemu-iotests/147
index 6d6f077a14..3e14bd389a 100755
--- a/tests/qemu-iotests/147
+++ b/tests/qemu-iotests/147
@@ -277,6 +277,7 @@ class BuiltinNBD(NBDBlockdevAddBase):
                      } }
         self.client_test(filename, flatten_sock_addr(address), 'nbd-export')
 
+        sockfd.close()
         self._server_down()
 
 
diff --git a/tests/qemu-iotests/151 b/tests/qemu-iotests/151
index f2ff9c5dac..06ee3585db 100755
--- a/tests/qemu-iotests/151
+++ b/tests/qemu-iotests/151
@@ -263,6 +263,11 @@ class TestThrottledWithNbdExportBase(iotests.QMPTestCase):
                         break
                     except subprocess.TimeoutExpired:
                         self.vm.qtest(f'clock_step {1 * 1000 * 1000 * 1000}')
+                try:
+                    p.kill()
+                    p.stdout.close()
+                except:
+                    pass
         except IndexError:
             pass
 
diff --git a/tests/qemu-iotests/check b/tests/qemu-iotests/check
index 545f9ec7bd..d9b7c1d598 100755
--- a/tests/qemu-iotests/check
+++ b/tests/qemu-iotests/check
@@ -21,6 +21,7 @@ import sys
 import argparse
 import shutil
 from pathlib import Path
+import warnings
 
 from findtests import TestFinder
 from testenv import TestEnv
@@ -137,6 +138,9 @@ def make_argparser() -> argparse.ArgumentParser:
 
 
 if __name__ == '__main__':
+    warnings.simplefilter("default")
+    os.environ["PYTHONWARNINGS"] = "default"
+
     args = make_argparser().parse_args()
 
     env = TestEnv(source_dir=args.source_dir,
diff --git a/tests/qemu-iotests/testenv.py b/tests/qemu-iotests/testenv.py
index 6326e46b7b..29caaa8a34 100644
--- a/tests/qemu-iotests/testenv.py
+++ b/tests/qemu-iotests/testenv.py
@@ -22,15 +22,12 @@ import tempfile
 from pathlib import Path
 import shutil
 import collections
+import contextlib
 import random
 import subprocess
 import glob
 from typing import List, Dict, Any, Optional
 
-if sys.version_info >= (3, 9):
-    from contextlib import AbstractContextManager as ContextManager
-else:
-    from typing import ContextManager
 
 DEF_GDB_OPTIONS = 'localhost:12345'
 
@@ -58,7 +55,7 @@ def get_default_machine(qemu_prog: str) -> str:
     return default_machine
 
 
-class TestEnv(ContextManager['TestEnv']):
+class TestEnv(contextlib.AbstractContextManager['TestEnv']):
     """
     Manage system environment for running tests
 
diff --git a/tests/qemu-iotests/testrunner.py b/tests/qemu-iotests/testrunner.py
index 2e236c8fa3..14cc8492f9 100644
--- a/tests/qemu-iotests/testrunner.py
+++ b/tests/qemu-iotests/testrunner.py
@@ -30,11 +30,6 @@ from multiprocessing import Pool
 from typing import List, Optional, Any, Sequence, Dict
 from testenv import TestEnv
 
-if sys.version_info >= (3, 9):
-    from contextlib import AbstractContextManager as ContextManager
-else:
-    from typing import ContextManager
-
 
 def silent_unlink(path: Path) -> None:
     try:
@@ -57,7 +52,7 @@ def file_diff(file1: str, file2: str) -> List[str]:
         return res
 
 
-class LastElapsedTime(ContextManager['LastElapsedTime']):
+class LastElapsedTime(contextlib.AbstractContextManager['LastElapsedTime']):
     """ Cache for elapsed time for tests, to show it during new test run
 
     It is safe to use get() at any time.  To use update(), you must either
@@ -112,7 +107,7 @@ class TestResult:
         self.interrupted = interrupted
 
 
-class TestRunner(ContextManager['TestRunner']):
+class TestRunner(contextlib.AbstractContextManager['TestRunner']):
     shared_self = None
 
     @staticmethod