Diffstat (limited to 'python/qemu/qmp')
-rw-r--r--  python/qemu/qmp/__init__.py    |   59
-rw-r--r--  python/qemu/qmp/aqmp_tui.py    |  652
-rw-r--r--  python/qemu/qmp/error.py       |   50
-rw-r--r--  python/qemu/qmp/events.py      |  717
-rw-r--r--  python/qemu/qmp/legacy.py      |  317
-rw-r--r--  python/qemu/qmp/message.py     |  209
-rw-r--r--  python/qemu/qmp/models.py      |  146
-rw-r--r--  python/qemu/qmp/protocol.py    | 1048
-rw-r--r--  python/qemu/qmp/py.typed       |    0
-rw-r--r--  python/qemu/qmp/qmp_client.py  |  655
-rw-r--r--  python/qemu/qmp/qmp_shell.py   |  610
-rw-r--r--  python/qemu/qmp/util.py        |  217
12 files changed, 4680 insertions, 0 deletions
diff --git a/python/qemu/qmp/__init__.py b/python/qemu/qmp/__init__.py
new file mode 100644
index 0000000000..69190d057a
--- /dev/null
+++ b/python/qemu/qmp/__init__.py
@@ -0,0 +1,59 @@
+"""
+QEMU Monitor Protocol (QMP) development library & tooling.
+
+This package provides a fairly low-level class for communicating
+asynchronously with QMP protocol servers, as implemented by QEMU, the
+QEMU Guest Agent, and the QEMU Storage Daemon.
+
+`QMPClient` provides the main functionality of this package. All errors
+raised by this library derive from `QMPError`; see `qmp.error` for
+additional detail. See `qmp.events` for an in-depth tutorial on
+managing QMP events.
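+
+A minimal usage sketch (the socket path and nickname below are
+illustrative) looks like this::
+
+    import asyncio
+    from qemu.qmp import QMPClient
+
+    async def main():
+        qmp = QMPClient('my-vm-nickname')
+        await qmp.connect('/tmp/qmp.sock')
+        res = await qmp.execute('query-status')
+        print(f"VM status: {res['status']}")
+        await qmp.disconnect()
+
+    asyncio.run(main())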
+"""
+
+# Copyright (C) 2020-2022 John Snow for Red Hat, Inc.
+#
+# Authors:
+#  John Snow <jsnow@redhat.com>
+#
+# Based on earlier work by Luiz Capitulino <lcapitulino@redhat.com>.
+#
+# This work is licensed under the terms of the GNU LGPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import logging
+
+from .error import QMPError
+from .events import EventListener
+from .message import Message
+from .protocol import (
+    ConnectError,
+    Runstate,
+    SocketAddrT,
+    StateError,
+)
+from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
+
+
+# Suppress logging unless an application engages it.
+logging.getLogger('qemu.qmp').addHandler(logging.NullHandler())
+
+
+# The order of these fields impacts the Sphinx documentation order.
+__all__ = (
+    # Classes, most to least important
+    'QMPClient',
+    'Message',
+    'EventListener',
+    'Runstate',
+
+    # Exceptions, most generic to most explicit
+    'QMPError',
+    'StateError',
+    'ConnectError',
+    'ExecuteError',
+    'ExecInterruptedError',
+
+    # Type aliases
+    'SocketAddrT',
+)
diff --git a/python/qemu/qmp/aqmp_tui.py b/python/qemu/qmp/aqmp_tui.py
new file mode 100644
index 0000000000..59d3036be3
--- /dev/null
+++ b/python/qemu/qmp/aqmp_tui.py
@@ -0,0 +1,652 @@
+# Copyright (c) 2021
+#
+# Authors:
+#  Niteesh Babu G S <niteesh.gs@gmail.com>
+#
+# This work is licensed under the terms of the GNU LGPL, version 2 or
+# later.  See the COPYING file in the top-level directory.
+"""
+AQMP TUI
+
+AQMP TUI is an asynchronous interface built on top of the AQMP library.
+It is the successor of QMP-shell and is brought in as a replacement for it.
+
+Example Usage: aqmp-tui <SOCKET | TCP IP:PORT>
+Full Usage: aqmp-tui --help
+"""
+
+import argparse
+import asyncio
+import json
+import logging
+from logging import Handler, LogRecord
+import signal
+from typing import (
+    List,
+    Optional,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+
+from pygments import lexers
+from pygments import token as Token
+import urwid
+import urwid_readline
+
+from .error import ProtocolError
+from .legacy import QEMUMonitorProtocol, QMPBadPortError
+from .message import DeserializationError, Message, UnexpectedTypeError
+from .protocol import ConnectError, Runstate
+from .qmp_client import ExecInterruptedError, QMPClient
+from .util import create_task, pretty_traceback
+
+
+# The name of the signal that is used to update the history list
+UPDATE_MSG: str = 'UPDATE_MSG'
+
+
+palette = [
+    (Token.Punctuation, '', '', '', 'h15,bold', 'g7'),
+    (Token.Text, '', '', '', '', 'g7'),
+    (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'),
+    (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'),
+    (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'),
+    (Token.Keyword.Constant, '', '', '', '#6af', 'g7'),
+    ('DEBUG', '', '', '', '#ddf', 'g7'),
+    ('INFO', '', '', '', 'g100', 'g7'),
+    ('WARNING', '', '', '', '#ff6', 'g7'),
+    ('ERROR', '', '', '', '#a00', 'g7'),
+    ('CRITICAL', '', '', '', '#a00', 'g7'),
+    ('background', '', 'black', '', '', 'g7'),
+]
+
+
+def format_json(msg: str) -> str:
+    """
+    Formats a valid or invalid multi-line JSON message into a single-line
+    message.
+
+    Formatting is first attempted using the standard json module. If that
+    fails due to a decoding error, simple string manipulation is performed
+    to achieve a single-line JSON string.
+
+    Converting to a single line is more aesthetically pleasing when
+    displayed alongside error messages.
+
+    Eg:
+    Input:
+          [ 1,
+            true,
+            3 ]
+    The above input is not a valid QMP message and produces the following error
+    "QMP message is not a JSON object."
+    When displaying this in TUI in multiline mode we get
+
+        [ 1,
+          true,
+          3 ]: QMP message is not a JSON object.
+
+    whereas in singleline mode we get the following
+
+        [1, true, 3]: QMP message is not a JSON object.
+
+    The single-line mode is more aesthetically pleasing.
+
+    :param msg:
+        The message to be formatted into a single line.
+
+    :return: The formatted single-line message.
+    """
+    try:
+        msg = json.loads(msg)
+        return str(json.dumps(msg))
+    except json.decoder.JSONDecodeError:
+        msg = msg.replace('\n', '')
+        words = msg.split(' ')
+        words = list(filter(None, words))
+        return ' '.join(words)
+
+
+def has_handler_type(logger: logging.Logger,
+                     handler_type: Type[Handler]) -> bool:
+    """
+    The Logger class has no interface to check whether a certain type of
+    handler is installed, so this function provides one.
+
+    :param logger:
+        Logger object
+    :param handler_type:
+        The type of the handler to be checked.
+
+    :return: True if a handler of type `handler_type` is installed.
+    """
+    for handler in logger.handlers:
+        if isinstance(handler, handler_type):
+            return True
+    return False
+
+
+class App(QMPClient):
+    """
+    Implements the AQMP TUI.
+
+    Initializes the widgets and starts the urwid event loop.
+
+    :param address:
+        Address of the server to connect to.
+    :param num_retries:
+        The number of times to retry before giving up on reconnecting.
+    :param retry_delay:
+        The delay (in seconds) before each retry.
+    """
+    def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int,
+                 retry_delay: Optional[int]) -> None:
+        urwid.register_signal(type(self), UPDATE_MSG)
+        self.window = Window(self)
+        self.address = address
+        self.aloop: Optional[asyncio.AbstractEventLoop] = None
+        self.num_retries = num_retries
+        self.retry_delay = retry_delay if retry_delay else 2
+        self.retry: bool = False
+        self.exiting: bool = False
+        super().__init__()
+
+    def add_to_history(self, msg: str, level: Optional[str] = None) -> None:
+        """
+        Appends the msg to the history list.
+
+        :param msg:
+            The raw message, as a string, to be appended.
+        :param level:
+            The log level of the message, if it is a log message.
+        """
+        urwid.emit_signal(self, UPDATE_MSG, msg, level)
+
+    def _cb_outbound(self, msg: Message) -> Message:
+        """
+        Callback: outbound message hook.
+
+        Appends the outgoing messages to the history box.
+
+        :param msg: raw outbound message.
+        :return: final outbound message.
+        """
+        str_msg = str(msg)
+
+        if not has_handler_type(logging.getLogger(), TUILogHandler):
+            logging.debug('Request: %s', str_msg)
+        self.add_to_history('<-- ' + str_msg)
+        return msg
+
+    def _cb_inbound(self, msg: Message) -> Message:
+        """
+        Callback: inbound message hook.
+
+        Appends the incoming messages to the history box.
+
+        :param msg: raw inbound message.
+        :return: final inbound message.
+        """
+        str_msg = str(msg)
+
+        if not has_handler_type(logging.getLogger(), TUILogHandler):
+            logging.debug('Request: %s', str_msg)
+        self.add_to_history('--> ' + str_msg)
+        return msg
+
+    async def _send_to_server(self, msg: Message) -> None:
+        """
+        This coroutine sends the message to the server.
+        The message has to be pre-validated.
+
+        :param msg:
+            Pre-validated message to be sent to the server.
+
+        :raise Exception: When an unhandled exception is caught.
+        """
+        try:
+            await self._raw(msg, assign_id='id' not in msg)
+        except ExecInterruptedError as err:
+            logging.info('Server disconnected before reply: %s', str(err))
+            self.add_to_history('Server disconnected before reply', 'ERROR')
+        except Exception as err:
+            logging.error('Exception from _send_to_server: %s', str(err))
+            raise err
+
+    def cb_send_to_server(self, raw_msg: str) -> None:
+        """
+        Validates and sends the message to the server.
+        The raw string message is first converted into a Message object
+        and is then sent to the server.
+
+        :param raw_msg:
+            The raw string message to be sent to the server.
+
+        :raise Exception: When an unhandled exception is caught.
+        """
+        try:
+            msg = Message(bytes(raw_msg, encoding='utf-8'))
+            create_task(self._send_to_server(msg))
+        except (DeserializationError, UnexpectedTypeError) as err:
+            raw_msg = format_json(raw_msg)
+            logging.info('Invalid message: %s', err.error_message)
+            self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR')
+
+    def unhandled_input(self, key: str) -> None:
+        """
+        Handles keys which haven't been handled by the child widgets.
+
+        :param key:
+            Unhandled key
+        """
+        if key == 'esc':
+            self.kill_app()
+
+    def kill_app(self) -> None:
+        """
+        Initiates killing of app. A bridge between asynchronous and synchronous
+        code.
+        """
+        create_task(self._kill_app())
+
+    async def _kill_app(self) -> None:
+        """
+        This coroutine initiates the actual disconnect process and raises
+        urwid.ExitMainLoop() to kill the TUI.
+
+        :raise Exception: When an unhandled exception is caught.
+        """
+        self.exiting = True
+        await self.disconnect()
+        logging.debug('Disconnect finished. Exiting app')
+        raise urwid.ExitMainLoop()
+
+    async def disconnect(self) -> None:
+        """
+        Overrides the disconnect method to handle the errors locally.
+        """
+        try:
+            await super().disconnect()
+        except (OSError, EOFError) as err:
+            logging.info('disconnect: %s', str(err))
+            self.retry = True
+        except ProtocolError as err:
+            logging.info('disconnect: %s', str(err))
+        except Exception as err:
+            logging.error('disconnect: Unhandled exception %s', str(err))
+            raise err
+
+    def _set_status(self, msg: str) -> None:
+        """
+        Sets the message as the status.
+
+        :param msg:
+            The message to be displayed in the status bar.
+        """
+        self.window.footer.set_text(msg)
+
+    def _get_formatted_address(self) -> str:
+        """
+        Returns a formatted version of the server's address.
+
+        :return: formatted address
+        """
+        if isinstance(self.address, tuple):
+            host, port = self.address
+            addr = f'{host}:{port}'
+        else:
+            addr = f'{self.address}'
+        return addr
+
+    async def _initiate_connection(self) -> Optional[ConnectError]:
+        """
+        Tries connecting to the server a number of times, with a delay
+        between each attempt. If all retries fail, the error encountered
+        during the last attempt is returned.
+
+        :return: The error encountered during the last attempt, if any.
+        """
+        current_retries = 0
+        err = None
+
+        # initial try
+        await self.connect_server()
+        while self.retry and current_retries < self.num_retries:
+            logging.info('Connection failed, retrying in %ds', self.retry_delay)
+            status = f'[Retry #{current_retries} ({self.retry_delay}s)]'
+            self._set_status(status)
+
+            await asyncio.sleep(self.retry_delay)
+
+            err = await self.connect_server()
+            current_retries += 1
+        # If all retries failed report the last error
+        if err:
+            logging.info('All retries failed: %s', err)
+            return err
+        return None
+
+    async def manage_connection(self) -> None:
+        """
+        Manage the connection based on the current run state.
+
+        A reconnect is issued when the current state is IDLE and the number
+        of retries is not exhausted.
+        A disconnect is issued when the current state is DISCONNECTING.
+        """
+        while not self.exiting:
+            if self.runstate == Runstate.IDLE:
+                err = await self._initiate_connection()
+                # If an error was returned, all of our retries were exhausted.
+                if err:
+                    self._set_status(f'[Error: {err.error_message}]')
+                else:
+                    addr = self._get_formatted_address()
+                    self._set_status(f'[Connected {addr}]')
+            elif self.runstate == Runstate.DISCONNECTING:
+                self._set_status('[Disconnected]')
+                await self.disconnect()
+                # check if a retry is needed
+                if self.runstate == Runstate.IDLE:
+                    continue
+            await self.runstate_changed()
+
+    async def connect_server(self) -> Optional[ConnectError]:
+        """
+        Initiates a connection to the server at address `self.address`
+        and, in case of a failure, returns the error.
+        """
+        try:
+            await self.connect(self.address)
+            self.retry = False
+        except ConnectError as err:
+            logging.info('connect_server: ConnectError %s', str(err))
+            self.retry = True
+            return err
+        return None
+
+    def run(self, debug: bool = False) -> None:
+        """
+        Starts the long-running coroutines and the urwid event loop.
+
+        :param debug:
+            Enables/Disables asyncio event loop debugging
+        """
+        screen = urwid.raw_display.Screen()
+        screen.set_terminal_properties(256)
+
+        self.aloop = asyncio.get_event_loop()
+        self.aloop.set_debug(debug)
+
+        # Gracefully handle SIGTERM and SIGINT signals
+        cancel_signals = [signal.SIGTERM, signal.SIGINT]
+        for sig in cancel_signals:
+            self.aloop.add_signal_handler(sig, self.kill_app)
+
+        event_loop = urwid.AsyncioEventLoop(loop=self.aloop)
+        main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'),
+                                   unhandled_input=self.unhandled_input,
+                                   screen=screen,
+                                   palette=palette,
+                                   handle_mouse=True,
+                                   event_loop=event_loop)
+
+        create_task(self.manage_connection(), self.aloop)
+        try:
+            main_loop.run()
+        except Exception as err:
+            logging.error('%s\n%s\n', str(err), pretty_traceback())
+            raise err
+
+
+class StatusBar(urwid.Text):
+    """
+    A simple status bar modelled using the Text widget. The status can be
+    set using the set_text function. All text is right-aligned.
+    :param text: Initial text to be displayed. Default is empty str.
+    """
+    def __init__(self, text: str = ''):
+        super().__init__(text, align='right')
+
+
+class Editor(urwid_readline.ReadlineEdit):
+    """
+    A simple editor modelled using the urwid_readline.ReadlineEdit widget.
+    Mimics GNU readline shortcuts and provides history support.
+
+    The readline shortcuts can be found below:
+    https://github.com/rr-/urwid_readline#features
+
+    Along with the readline features, this editor also has support for
+    history. Pressing 'up'/'down' switches between the previous/next
+    messages available in the history.
+
+    Currently there is no support for saving the history to a file. The
+    history of previous commands is lost on exit.
+
+    :param parent: Reference to the TUI object.
+    """
+    def __init__(self, parent: App) -> None:
+        super().__init__(caption='> ', multiline=True)
+        self.parent = parent
+        self.history: List[str] = []
+        self.last_index: int = -1
+        self.show_history: bool = False
+
+    def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]:
+        """
+        Handles the keypress on this widget.
+
+        :param size:
+            The current size of the widget.
+        :param key:
+            The key to be handled.
+
+        :return: Unhandled key if any.
+        """
+        msg = self.get_edit_text()
+        if key == 'up' and not msg:
+            # Show the history when 'up arrow' is pressed with no input text.
+            # NOTE: The show_history logic is necessary because in 'multiline'
+            # mode (which we use) 'up arrow' is used to move between lines.
+            if not self.history:
+                return None
+            self.show_history = True
+            last_msg = self.history[self.last_index]
+            self.set_edit_text(last_msg)
+            self.edit_pos = len(last_msg)
+        elif key == 'up' and self.show_history:
+            self.last_index = max(self.last_index - 1, -len(self.history))
+            self.set_edit_text(self.history[self.last_index])
+            self.edit_pos = len(self.history[self.last_index])
+        elif key == 'down' and self.show_history:
+            if self.last_index == -1:
+                self.set_edit_text('')
+                self.show_history = False
+            else:
+                self.last_index += 1
+                self.set_edit_text(self.history[self.last_index])
+                self.edit_pos = len(self.history[self.last_index])
+        elif key == 'meta enter':
+            # When using multiline, 'enter' inserts a new line into the
+            # editor; the input is sent to the server on 'alt + enter'.
+            self.parent.cb_send_to_server(msg)
+            self.history.append(msg)
+            self.set_edit_text('')
+            self.last_index = -1
+            self.show_history = False
+        else:
+            self.show_history = False
+            self.last_index = -1
+            return cast(Optional[str], super().keypress(size, key))
+        return None
+
+
+class EditorWidget(urwid.Filler):
+    """
+    Wrapper around the editor widget.
+
+    The Editor is a flow widget and has to be wrapped inside a box widget.
+    This class wraps the Editor inside a Filler widget.
+
+    :param parent: Reference to the TUI object.
+    """
+    def __init__(self, parent: App) -> None:
+        super().__init__(Editor(parent), valign='top')
+
+
+class HistoryBox(urwid.ListBox):
+    """
+    This widget, modelled using the ListBox widget, contains the list of
+    all messages (both QMP messages and log messages) to be shown in the TUI.
+
+    The messages are urwid.Text widgets. On every append of a message, the
+    focus is shifted to the last appended message.
+
+    :param parent: Reference to the TUI object.
+    """
+    def __init__(self, parent: App) -> None:
+        self.parent = parent
+        self.history = urwid.SimpleFocusListWalker([])
+        super().__init__(self.history)
+
+    def add_to_history(self,
+                       history: Union[str, List[Tuple[str, str]]]) -> None:
+        """
+        Appends a message to the list and sets the focus to the last
+        appended message.
+
+        :param history:
+            The history item(message/event) to be appended to the list.
+        """
+        self.history.append(urwid.Text(history))
+        self.history.set_focus(len(self.history) - 1)
+
+    def mouse_event(self, size: Tuple[int, int], _event: str, button: float,
+                    _x: int, _y: int, focus: bool) -> None:
+        # Unfortunately there are no urwid constants that represent the mouse
+        # events.
+        if button == 4:  # Scroll up event
+            super().keypress(size, 'up')
+        elif button == 5:  # Scroll down event
+            super().keypress(size, 'down')
+
+
+class HistoryWindow(urwid.Frame):
+    """
+    This window composes the HistoryBox and EditorWidget in a horizontal split.
+    By default the first focus is given to the history box.
+
+    :param parent: Reference to the TUI object.
+    """
+    def __init__(self, parent: App) -> None:
+        self.parent = parent
+        self.editor_widget = EditorWidget(parent)
+        self.editor = urwid.LineBox(self.editor_widget)
+        self.history = HistoryBox(parent)
+        self.body = urwid.Pile([('weight', 80, self.history),
+                                ('weight', 20, self.editor)])
+        super().__init__(self.body)
+        urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history)
+
+    def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None:
+        """
+        Appends a message to the history box.
+
+        :param msg:
+            The message to be appended to the history box.
+        :param level:
+            The log level of the message, if it is a log message.
+        """
+        formatted = []
+        if level:
+            msg = f'[{level}]: {msg}'
+            formatted.append((level, msg))
+        else:
+            lexer = lexers.JsonLexer()  # pylint: disable=no-member
+            for token in lexer.get_tokens(msg):
+                formatted.append(token)
+        self.history.add_to_history(formatted)
+
+
+class Window(urwid.Frame):
+    """
+    This window is the topmost widget of the TUI and will contain other
+    windows. Each child of this widget is responsible for displaying a
+    specific piece of functionality.
+
+    :param parent: Reference to the TUI object.
+    """
+    def __init__(self, parent: App) -> None:
+        self.parent = parent
+        footer = StatusBar()
+        body = HistoryWindow(parent)
+        super().__init__(body, footer=footer)
+
+
+class TUILogHandler(Handler):
+    """
+    This handler routes all log messages to the TUI screen.
+    It is installed on the root logger so that log messages from all
+    libraries being used are routed to the screen.
+
+    :param tui: Reference to the TUI object.
+    """
+    def __init__(self, tui: App) -> None:
+        super().__init__()
+        self.tui = tui
+
+    def emit(self, record: LogRecord) -> None:
+        """
+        Emits a record to the TUI screen.
+
+        Appends the log message to the TUI screen.
+        """
+        level = record.levelname
+        msg = record.getMessage()
+        self.tui.add_to_history(msg, level)
+
+
+def main() -> None:
+    """
+    Driver of the whole script: parses arguments, initializes the logger,
+    and runs the TUI.
+    """
+    parser = argparse.ArgumentParser(description='AQMP TUI')
+    parser.add_argument('qmp_server', help='Address of the QMP server. '
+                        'Format <UNIX socket path | TCP addr:port>')
+    parser.add_argument('--num-retries', type=int, default=10,
+                        help='Number of times to reconnect before giving up.')
+    parser.add_argument('--retry-delay', type=int,
+                        help='Time (in seconds) to wait before the next '
+                        'retry. The default is to wait 2s between retries.')
+    parser.add_argument('--log-file', help='The Log file name')
+    parser.add_argument('--log-level', default='WARNING',
+                        help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG>')
+    parser.add_argument('--asyncio-debug', action='store_true',
+                        help='Enable debug mode for the asyncio loop. '
+                        'Generates a lot of output and makes the TUI '
+                        'unusable when logs are displayed in the TUI. '
+                        'Use only when logging to a file.')
+    args = parser.parse_args()
+
+    try:
+        address = QEMUMonitorProtocol.parse_address(args.qmp_server)
+    except QMPBadPortError as err:
+        parser.error(str(err))
+
+    app = App(address, args.num_retries, args.retry_delay)
+
+    root_logger = logging.getLogger()
+    root_logger.setLevel(logging.getLevelName(args.log_level))
+
+    if args.log_file:
+        root_logger.addHandler(logging.FileHandler(args.log_file))
+    else:
+        root_logger.addHandler(TUILogHandler(app))
+
+    app.run(args.asyncio_debug)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/python/qemu/qmp/error.py b/python/qemu/qmp/error.py
new file mode 100644
index 0000000000..24ba4d5054
--- /dev/null
+++ b/python/qemu/qmp/error.py
@@ -0,0 +1,50 @@
+"""
+QMP Error Classes
+
+This package seeks to provide semantic error classes that are intended
+to be used directly by clients when they would like to handle particular
+semantic failures (e.g. "failed to connect") without needing to know the
+enumeration of possible reasons for that failure.
+
+QMPError serves as the ancestor for all exceptions raised by this
+package, and is suitable for use in handling semantic errors from this
+library. In most cases, individual public methods will attempt to catch
+and re-encapsulate various exceptions to provide a semantic
+error-handling interface.
+
+.. admonition:: QMP Exception Hierarchy Reference
+
+ |   `Exception`
+ |    +-- `QMPError`
+ |         +-- `ConnectError`
+ |         +-- `StateError`
+ |         +-- `ExecInterruptedError`
+ |         +-- `ExecuteError`
+ |         +-- `ListenerError`
+ |         +-- `ProtocolError`
+ |              +-- `DeserializationError`
+ |              +-- `UnexpectedTypeError`
+ |              +-- `ServerParseError`
+ |              +-- `BadReplyError`
+ |              +-- `GreetingError`
+ |              +-- `NegotiationError`
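+
+For example, a client that only cares that a connection attempt failed,
+and not about the precise reason, might write something like the sketch
+below (the socket path is illustrative)::
+
+    from qemu.qmp import ConnectError, QMPClient
+
+    async def try_connect(qmp: QMPClient) -> bool:
+        try:
+            await qmp.connect('/tmp/qmp.sock')
+        except ConnectError as err:
+            print(f"Connection failed: {err}")
+            return False
+        return True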
+"""
+
+
+class QMPError(Exception):
+    """Abstract error class for all errors originating from this package."""
+
+
+class ProtocolError(QMPError):
+    """
+    Abstract error class for protocol failures.
+
+    Semantically, these errors are generally the fault of either the
+    protocol server or a bug in this library.
+
+    :param error_message: Human-readable string describing the error.
+    """
+    def __init__(self, error_message: str):
+        super().__init__(error_message)
+        #: Human-readable error message, without any prefix.
+        self.error_message: str = error_message
diff --git a/python/qemu/qmp/events.py b/python/qemu/qmp/events.py
new file mode 100644
index 0000000000..6199776cc6
--- /dev/null
+++ b/python/qemu/qmp/events.py
@@ -0,0 +1,717 @@
+"""
+QMP Events and EventListeners
+
+Asynchronous QMP uses `EventListener` objects to listen for events. An
+`EventListener` is a FIFO event queue that can be pre-filtered to listen
+for only specific events. Each `EventListener` instance receives its own
+copy of events that it hears, so events may be consumed without fear of
+depriving other listeners of events they need to hear.
+
+
+EventListener Tutorial
+----------------------
+
+In all of the following examples, we assume that we have a `QMPClient`
+instance named ``qmp`` that is already connected.
+
+
+`listener()` context blocks with one name
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The most basic usage is by using the `listener()` context manager to
+construct them:
+
+.. code:: python
+
+   with qmp.listener('STOP') as listener:
+       await qmp.execute('stop')
+       await listener.get()
+
+The listener is active only for the duration of the ‘with’ block. This
+instance listens only for ‘STOP’ events.
+
+
+`listener()` context blocks with two or more names
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Multiple events can be selected for by providing any ``Iterable[str]``:
+
+.. code:: python
+
+   with qmp.listener(('STOP', 'RESUME')) as listener:
+       await qmp.execute('stop')
+       event = await listener.get()
+       assert event['event'] == 'STOP'
+
+       await qmp.execute('cont')
+       event = await listener.get()
+       assert event['event'] == 'RESUME'
+
+
+`listener()` context blocks with no names
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+By omitting names entirely, you can listen to ALL events.
+
+.. code:: python
+
+   with qmp.listener() as listener:
+       await qmp.execute('stop')
+       event = await listener.get()
+       assert event['event'] == 'STOP'
+
+This isn’t a very good use case for this feature: in a non-trivial
+running system, we may not know what event will arrive next. Grabbing
+the top of a FIFO queue that returns multiple kinds of events may be
+prone to error.
+
+
+Using async iterators to retrieve events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If you’d like to simply watch what events happen to arrive, you can use
+the listener as an async iterator:
+
+.. code:: python
+
+   with qmp.listener() as listener:
+       async for event in listener:
+           print(f"Event arrived: {event['event']}")
+
+This is analogous to the following code:
+
+.. code:: python
+
+   with qmp.listener() as listener:
+       while True:
+           event = await listener.get()
+           print(f"Event arrived: {event['event']}")
+
+This event stream will never end, so these blocks will never terminate.
+
+
+Using asyncio.Task to concurrently retrieve events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Since a listener’s event stream will never terminate, it is not likely
+useful to use that form in a script. For longer-running clients, we can
+create event handlers by using `asyncio.Task` to create concurrent
+coroutines:
+
+.. code:: python
+
+   async def print_events(listener):
+       try:
+           async for event in listener:
+               print(f"Event arrived: {event['event']}")
+       except asyncio.CancelledError:
+           return
+
+   with qmp.listener() as listener:
+       task = asyncio.Task(print_events(listener))
+       await qmp.execute('stop')
+       await qmp.execute('cont')
+       task.cancel()
+       await task
+
+However, there is no guarantee that these events will be received by the
+time we leave this context block. Once the context block is exited, the
+listener will cease to hear any new events, and becomes inert.
+
+Be mindful of the timing: the above example will *probably*, but is not
+*guaranteed* to, print both the STOP and RESUME events. The example
+below outlines how to use listeners outside of a context block.
+
+
+Using `register_listener()` and `remove_listener()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To create a listener with a longer lifetime, beyond the scope of a
+single block, create a listener and then call `register_listener()`:
+
+.. code:: python
+
+   class MyClient:
+       def __init__(self, qmp):
+           self.qmp = qmp
+           self.listener = EventListener()
+
+       async def print_events(self):
+           try:
+               async for event in self.listener:
+                   print(f"Event arrived: {event['event']}")
+           except asyncio.CancelledError:
+               return
+
+       async def run(self):
+           self.task = asyncio.Task(self.print_events())
+           self.qmp.register_listener(self.listener)
+           await self.qmp.execute('stop')
+           await self.qmp.execute('cont')
+
+       async def stop(self):
+           self.task.cancel()
+           await self.task
+           self.qmp.remove_listener(self.listener)
+
+The listener can be deactivated by using `remove_listener()`. When it is
+removed, any possible pending events are cleared and it can be
+re-registered at a later time.
+
+
+Using the built-in all events listener
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The `QMPClient` object creates its own default listener named
+:py:obj:`~Events.events` that can be used for the same purpose without
+having to create your own:
+
+.. code:: python
+
+   async def print_events(listener):
+       try:
+           async for event in listener:
+               print(f"Event arrived: {event['event']}")
+       except asyncio.CancelledError:
+           return
+
+   task = asyncio.Task(print_events(qmp.events))
+
+   await qmp.execute('stop')
+   await qmp.execute('cont')
+
+   task.cancel()
+   await task
+
+
+Using both .get() and async iterators
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The async iterator and `get()` methods pull events from the same FIFO
+queue. If you mix the usage of both, be aware: Events are emitted
+precisely once per listener.
+
+If multiple contexts try to pull events from the same listener instance,
+events are still emitted only precisely once.
+
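+A short sketch of this single-delivery behavior (same assumptions as the
+earlier examples; one ``stop`` command produces one STOP event):
+
+.. code:: python
+
+   with qmp.listener('STOP') as listener:
+       await qmp.execute('stop')
+       event = await listener.get()  # Consumes the STOP event.
+       # Neither get() nor the async iterator will yield it again:
+       assert listener.empty()
+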
+This restriction can be lifted by creating additional listeners.
+
+
+Creating multiple listeners
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Additional `EventListener` objects can be created at-will. Each one
+receives its own copy of events, with separate FIFO event queues.
+
+.. code:: python
+
+   my_listener = EventListener()
+   qmp.register_listener(my_listener)
+
+   await qmp.execute('stop')
+   copy1 = await my_listener.get()
+   copy2 = await qmp.events.get()
+
+   assert copy1 == copy2
+
+In this example, we await an event from both a user-created
+`EventListener` and the built-in events listener. Both receive the same
+event.
+
+
+Clearing listeners
+~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects can be cleared, clearing all events seen thus far:
+
+.. code:: python
+
+   await qmp.execute('stop')
+   qmp.events.clear()
+   await qmp.execute('cont')
+   event = await qmp.events.get()
+   assert event['event'] == 'RESUME'
+
+`EventListener` objects are FIFO queues. If events are not consumed,
+they will remain in the queue until they are witnessed or discarded via
+`clear()`. FIFO queues will be drained automatically upon leaving a
+context block, or when calling `remove_listener()`.
+
+
+Accessing listener history
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects record their history. Even after being cleared,
+you can obtain a record of all events seen so far:
+
+.. code:: python
+
+   await qmp.execute('stop')
+   await qmp.execute('cont')
+   qmp.events.clear()
+
+   assert len(qmp.events.history) == 2
+   assert qmp.events.history[0]['event'] == 'STOP'
+   assert qmp.events.history[1]['event'] == 'RESUME'
+
+The history is updated immediately and does not require the event to be
+witnessed first.
+
+
+Using event filters
+~~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects can be given complex filtering criteria if names
+are not sufficient:
+
+.. code:: python
+
+   def job1_filter(event) -> bool:
+       event_data = event.get('data', {})
+       event_job_id = event_data.get('id')
+       return event_job_id == "job1"
+
+   with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
+       await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
+       async for event in listener:
+           if event['data']['status'] == 'concluded':
+               break
+
+These filters might be most useful when parameterized. `EventListener`
+objects expect a function that takes only a single argument (the raw
+event, as a `Message`) and returns a bool; True if the event should be
+accepted into the stream. You can create a function that adapts this
+signature to accept configuration parameters:
+
+.. code:: python
+
+   def job_filter(job_id: str) -> EventFilter:
+       def filter(event: Message) -> bool:
+           return event['data']['id'] == job_id
+       return filter
+
+   with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
+       await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
+       async for event in listener:
+           if event['data']['status'] == 'concluded':
+               break
+
+
+Activating an existing listener with `listen()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Listeners with complex, long configurations can also be created manually
+and activated temporarily by using `listen()` instead of `listener()`:
+
+.. code:: python
+
+   listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
+                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
+                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
+
+   with qmp.listen(listener):
+       await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
+       async for event in listener:
+           print(event)
+           if event['event'] == 'BLOCK_JOB_COMPLETED':
+               break
+
+Any events that are not witnessed by the time the block is left will be
+cleared from the queue; entering the block is an implicit
+`register_listener()` and leaving the block is an implicit
+`remove_listener()`.
+
+
+Activating multiple existing listeners with `listen()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+While `listener()` is only capable of creating a single listener,
+`listen()` is capable of activating multiple listeners simultaneously:
+
+.. code:: python
+
+   def job_filter(job_id: str) -> EventFilter:
+       def filter(event: Message) -> bool:
+           return event['data']['id'] == job_id
+       return filter
+
+   jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
+   jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
+
+   with qmp.listen(jobA, jobB):
+       await qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
+       await qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
+
+       async for event in jobA:
+           if event['data']['status'] == 'concluded':
+               break
+       async for event in jobB:
+           if event['data']['status'] == 'concluded':
+               break
+
+
+Extending the `EventListener` class
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In the case that a more specialized `EventListener` is desired to
+provide either more functionality or more compact syntax for specialized
+cases, it can be extended.
+
+One of the key methods to extend or override is
+:py:meth:`~EventListener.accept()`. The default implementation checks an
+incoming message for:
+
+1. A qualifying name, if any :py:obj:`~EventListener.names` were
+   specified at initialization time
+2. That :py:obj:`~EventListener.event_filter()` returns True.
+
+This can be modified however you see fit to change the criteria for
+inclusion in the stream.
+
+For convenience, a ``JobListener`` class could be created that simply
+bakes in configuration so it does not need to be repeated:
+
+.. code:: python
+
+   class JobListener(EventListener):
+       def __init__(self, job_id: str):
+           super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
+                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
+                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
+           self.job_id = job_id
+
+       def accept(self, event) -> bool:
+           if not super().accept(event):
+               return False
+           if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
+               return event['data']['id'] == self.job_id
+           return event['data']['device'] == self.job_id
+
+From here on out, you can conjure up a custom-purpose listener that
+listens only for job-related events for a specific job-id easily:
+
+.. code:: python
+
+   listener = JobListener('job4')
+   with qmp.listen(listener):
+       await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
+       async for event in listener:
+           print(event)
+           if event['event'] == 'BLOCK_JOB_COMPLETED':
+               break
+
+
+Experimental Interfaces & Design Issues
+---------------------------------------
+
+These interfaces are ones I am not sure I will keep; they may yet be
+modified heavily.
+
+qmp.listen()’s type signature
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+`listen()` does not return anything, because it was assumed the caller
+already had a handle to the listener. However, for
+``qmp.listen(EventListener())`` forms, the caller will not have saved
+a handle to the listener.
+
+Because this function can accept *many* listeners, I found it hard to
+accurately type in a way where it could be used in both “one” or “many”
+forms conveniently and in a statically type-safe manner.
+
+Ultimately, I removed the return altogether, but perhaps with more time
+I can work out a way to re-add it.
+
+
+API Reference
+-------------
+
+"""
+
+import asyncio
+from contextlib import contextmanager
+import logging
+from typing import (
+    AsyncIterator,
+    Callable,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)
+
+from .error import QMPError
+from .message import Message
+
+
+EventNames = Union[str, Iterable[str], None]
+EventFilter = Callable[[Message], bool]
+
+
+class ListenerError(QMPError):
+    """
+    Generic error class for `EventListener`-related problems.
+    """
+
+
+class EventListener:
+    """
+    Selectively listens for events with runtime configurable filtering.
+
+    This class is designed to be directly usable for the most common cases,
+    but it can be extended to provide more rigorous control.
+
+    :param names:
+        One or more names of events to listen for.
+        When not provided, listen for ALL events.
+    :param event_filter:
+        An optional event filtering function.
+        When names are also provided, this acts as a secondary filter.
+
+    When ``names`` and ``event_filter`` are both provided, the names
+    will be filtered first, and then the filter function will be called
+    second. The event filter function can assume that the format of the
+    event is a known format.
+    """
+    def __init__(
+        self,
+        names: EventNames = None,
+        event_filter: Optional[EventFilter] = None,
+    ):
+        # Queue of 'heard' events yet to be witnessed by a caller.
+        self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
+
+        # Intended as a historical record, NOT a processing queue or backlog.
+        self._history: List[Message] = []
+
+        #: Primary event filter, based on one or more event names.
+        self.names: Set[str] = set()
+        if isinstance(names, str):
+            self.names.add(names)
+        elif names is not None:
+            self.names.update(names)
+
+        #: Optional, secondary event filter.
+        self.event_filter: Optional[EventFilter] = event_filter
+
+    @property
+    def history(self) -> Tuple[Message, ...]:
+        """
+        A read-only history of all events seen so far.
+
+        This represents *every* event, including those not yet witnessed
+        via `get()` or ``async for``. It persists between `clear()`
+        calls and is immutable.
+        """
+        return tuple(self._history)
+
+    def accept(self, event: Message) -> bool:
+        """
+        Determine if this listener accepts this event.
+
+        This method determines which events will appear in the stream.
+        The default implementation simply checks the event against the
+        list of names and the event_filter to decide if this
+        `EventListener` accepts a given event. It can be
+        overridden/extended to provide custom listener behavior.
+
+        User code is not expected to need to invoke this method.
+
+        :param event: The event under consideration.
+        :return: `True`, if this listener accepts this event.
+        """
+        name_ok = (not self.names) or (event['event'] in self.names)
+        return name_ok and (
+            (not self.event_filter) or self.event_filter(event)
+        )
+
+    async def put(self, event: Message) -> None:
+        """
+        Conditionally put a new event into the FIFO queue.
+
+        This method is not designed to be invoked from user code, and it
+        should not need to be overridden. It is a public interface so
+        that `QMPClient` has an interface by which it can inform
+        registered listeners of new events.
+
+        The event will be put into the queue if
+        :py:meth:`~EventListener.accept()` returns `True`.
+
+        :param event: The new event to put into the FIFO queue.
+        """
+        if not self.accept(event):
+            return
+
+        self._history.append(event)
+        await self._queue.put(event)
+
+    async def get(self) -> Message:
+        """
+        Wait for the very next event in this stream.
+
+        If one is already available, return that one.
+        """
+        return await self._queue.get()
+
+    def empty(self) -> bool:
+        """
+        Return `True` if there are no pending events.
+        """
+        return self._queue.empty()
+
+    def clear(self) -> List[Message]:
+        """
+        Clear this listener of all pending events.
+
+        Called when an `EventListener` is being unregistered, this clears the
+        pending FIFO queue synchronously. It can also be used to
+        manually clear any pending events, if desired.
+
+        :return: The cleared events, if any.
+
+        .. warning::
+            Take care when discarding events. Cleared events will be
+            silently tossed on the floor. All events that were ever
+            accepted by this listener are visible in `history`.
+        """
+        events = []
+        while True:
+            try:
+                events.append(self._queue.get_nowait())
+            except asyncio.QueueEmpty:
+                break
+
+        return events
+
+    def __aiter__(self) -> AsyncIterator[Message]:
+        return self
+
+    async def __anext__(self) -> Message:
+        """
+        Enables the `EventListener` to function as an async iterator.
+
+        It may be used like this:
+
+        .. code:: python
+
+            async for event in listener:
+                print(event)
+
+        These iterators will never terminate of their own accord; you
+        must provide break conditions or otherwise prepare to run them
+        in an `asyncio.Task` that can be cancelled.
+        """
+        return await self.get()
+
+
+class Events:
+    """
+    Events is a mix-in class that adds event functionality to the QMP class.
+
+    It's designed specifically as a mix-in for `QMPClient`, and it
+    relies upon the class it is being mixed into having a 'logger'
+    property.
+    """
+    def __init__(self) -> None:
+        self._listeners: List[EventListener] = []
+
+        #: Default, all-events `EventListener`.
+        self.events: EventListener = EventListener()
+        self.register_listener(self.events)
+
+        # Parent class needs to have a logger
+        self.logger: logging.Logger
+
+    async def _event_dispatch(self, msg: Message) -> None:
+        """
+        Given a new event, propagate it to all of the active listeners.
+
+        :param msg: The event to propagate.
+        """
+        for listener in self._listeners:
+            await listener.put(msg)
+
+    def register_listener(self, listener: EventListener) -> None:
+        """
+        Register and activate an `EventListener`.
+
+        :param listener: The listener to activate.
+        :raise ListenerError: If the given listener is already registered.
+        """
+        if listener in self._listeners:
+            raise ListenerError("Attempted to re-register existing listener")
+        self.logger.debug("Registering %s.", str(listener))
+        self._listeners.append(listener)
+
+    def remove_listener(self, listener: EventListener) -> None:
+        """
+        Unregister and deactivate an `EventListener`.
+
+        The removed listener will have its pending events cleared via
+        `clear()`. The listener can be re-registered later when
+        desired.
+
+        :param listener: The listener to deactivate.
+        :raise ListenerError: If the given listener is not registered.
+        """
+        if listener == self.events:
+            raise ListenerError("Cannot remove the default listener.")
+        self.logger.debug("Removing %s.", str(listener))
+        listener.clear()
+        self._listeners.remove(listener)
+
+    @contextmanager
+    def listen(self, *listeners: EventListener) -> Iterator[None]:
+        r"""
+        Context manager: Temporarily listen with an `EventListener`.
+
+        Accepts one or more `EventListener` objects and registers them,
+        activating them for the duration of the context block.
+
+        `EventListener` objects will have any pending events in their
+        FIFO queue cleared upon exiting the context block, when they are
+        deactivated.
+
+        :param \*listeners: One or more EventListeners to activate.
+        :raise ListenerError: If the given listener(s) are already active.
+        """
+        _added = []
+
+        try:
+            for listener in listeners:
+                self.register_listener(listener)
+                _added.append(listener)
+
+            yield
+
+        finally:
+            for listener in _added:
+                self.remove_listener(listener)
+
+    @contextmanager
+    def listener(
+        self,
+        names: EventNames = (),
+        event_filter: Optional[EventFilter] = None
+    ) -> Iterator[EventListener]:
+        """
+        Context manager: Temporarily listen with a new `EventListener`.
+
+        Creates an `EventListener` object and registers it, activating
+        it for the duration of the context block.
+
+        :param names:
+            One or more names of events to listen for.
+            When not provided, listen for ALL events.
+        :param event_filter:
+            An optional event filtering function.
+            When names are also provided, this acts as a secondary filter.
+
+        :return: The newly created and active `EventListener`.
+        """
+        listener = EventListener(names, event_filter)
+        with self.listen(listener):
+            yield listener
diff --git a/python/qemu/qmp/legacy.py b/python/qemu/qmp/legacy.py
new file mode 100644
index 0000000000..a8629b44df
--- /dev/null
+++ b/python/qemu/qmp/legacy.py
@@ -0,0 +1,317 @@
+"""
+(Legacy) Sync QMP Wrapper
+
+This module provides the `QEMUMonitorProtocol` class, which is a
+synchronous wrapper around `QMPClient`.
+
+Its design closely resembles that of the original QEMUMonitorProtocol
+class, originally written by Luiz Capitulino. It is provided here for
+compatibility with scripts inside the QEMU source tree that expect the
+old interface.
+"""
+
+#
+# Copyright (C) 2009-2022 Red Hat Inc.
+#
+# Authors:
+#  Luiz Capitulino <lcapitulino@redhat.com>
+#  John Snow <jsnow@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2.  See
+# the COPYING file in the top-level directory.
+#
+
+import asyncio
+from types import TracebackType
+from typing import (
+    Any,
+    Awaitable,
+    Dict,
+    List,
+    Optional,
+    Type,
+    TypeVar,
+    Union,
+)
+
+from .error import QMPError
+from .protocol import Runstate, SocketAddrT
+from .qmp_client import QMPClient
+
+
+#: QMPMessage is an entire QMP message of any kind.
+QMPMessage = Dict[str, Any]
+
+#: QMPReturnValue is the 'return' value of a command.
+QMPReturnValue = object
+
+#: QMPObject is any object in a QMP message.
+QMPObject = Dict[str, object]
+
+# QMPMessage can be outgoing commands or incoming events/returns.
+# QMPReturnValue is usually a dict/json object, but due to QAPI's
+# 'returns-whitelist', it can actually be anything.
+#
+# {'return': {}} is a QMPMessage,
+# {} is the QMPReturnValue.
+
+
+class QMPBadPortError(QMPError):
+    """
+    Unable to parse socket address: Port was non-numerical.
+    """
+
+
+class QEMUMonitorProtocol:
+    """
+    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)
+    and then allow handling of commands and events.
+
+    :param address:  QEMU address, can be either a unix socket path (string)
+                     or a tuple in the form (host, port) for a TCP
+                     connection
+    :param server:   Act as the socket server. (See 'accept')
+    :param nickname: Optional nickname used for logging.
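+
+    A minimal usage sketch (the socket path is illustrative)::
+
+        with QEMUMonitorProtocol('/tmp/qmp.sock') as qmp:
+            qmp.connect()
+            print(qmp.command('query-status'))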
+    """
+
+    def __init__(self, address: SocketAddrT,
+                 server: bool = False,
+                 nickname: Optional[str] = None):
+
+        self._qmp = QMPClient(nickname)
+        self._aloop = asyncio.get_event_loop()
+        self._address = address
+        self._timeout: Optional[float] = None
+
+        if server:
+            self._sync(self._qmp.start_server(self._address))
+
+    _T = TypeVar('_T')
+
+    def _sync(
+            self, future: Awaitable[_T], timeout: Optional[float] = None
+    ) -> _T:
+        return self._aloop.run_until_complete(
+            asyncio.wait_for(future, timeout=timeout)
+        )
+
+    def _get_greeting(self) -> Optional[QMPMessage]:
+        if self._qmp.greeting is not None:
+            # pylint: disable=protected-access
+            return self._qmp.greeting._asdict()
+        return None
+
+    def __enter__(self: _T) -> _T:
+        # Implement context manager enter function.
+        return self
+
+    def __exit__(self,
+                 # pylint: disable=duplicate-code
+                 # see https://github.com/PyCQA/pylint/issues/3619
+                 exc_type: Optional[Type[BaseException]],
+                 exc_val: Optional[BaseException],
+                 exc_tb: Optional[TracebackType]) -> None:
+        # Implement context manager exit function.
+        self.close()
+
+    @classmethod
+    def parse_address(cls, address: str) -> SocketAddrT:
+        """
+        Parse a string into a QMP address.
+
+        Figure out if the argument is in the host:port form.
+        If it's not, it's probably a file path.
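+
+        For example (values illustrative)::
+
+            parse_address('127.0.0.1:4444')  # -> ('127.0.0.1', 4444)
+            parse_address('/tmp/qmp.sock')   # -> '/tmp/qmp.sock'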
+        """
+        components = address.split(':')
+        if len(components) == 2:
+            try:
+                port = int(components[1])
+            except ValueError:
+                msg = f"Bad port: '{components[1]}' in '{address}'."
+                raise QMPBadPortError(msg) from None
+            return (components[0], port)
+
+        # Treat as filepath.
+        return address
+
+    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
+        """
+        Connect to the QMP Monitor and perform capabilities negotiation.
+
+        :return: QMP greeting dict, or None if negotiate is false
+        :raise ConnectError: on connection errors
+        """
+        self._qmp.await_greeting = negotiate
+        self._qmp.negotiate = negotiate
+
+        self._sync(
+            self._qmp.connect(self._address)
+        )
+        return self._get_greeting()
+
+    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
+        """
+        Await connection from QMP Monitor and perform capabilities negotiation.
+
+        :param timeout:
+            timeout in seconds (nonnegative float number, or None).
+            If None, there is no timeout, and this may block forever.
+
+        :return: QMP greeting dict
+        :raise ConnectError: on connection errors
+        """
+        self._qmp.await_greeting = True
+        self._qmp.negotiate = True
+
+        self._sync(self._qmp.accept(), timeout)
+
+        ret = self._get_greeting()
+        assert ret is not None
+        return ret
+
+    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
+        """
+        Send a QMP command to the QMP Monitor.
+
+        :param qmp_cmd: QMP command to be sent as a Python dict
+        :return: QMP response as a Python dict
+        """
+        return dict(
+            self._sync(
+                # pylint: disable=protected-access
+
+                # _raw() isn't a public API, because turning off
+                # automatic ID assignment is discouraged. For
+                # compatibility with iotests *only*, do it anyway.
+                self._qmp._raw(qmp_cmd, assign_id=False),
+                self._timeout
+            )
+        )
+
+    def cmd(self, name: str,
+            args: Optional[Dict[str, object]] = None,
+            cmd_id: Optional[object] = None) -> QMPMessage:
+        """
+        Build a QMP command and send it to the QMP Monitor.
+
+        :param name: command name (string)
+        :param args: command arguments (dict)
+        :param cmd_id: command id (dict, list, string or int)
+        :return: QMP response as a Python dict
+        """
+        qmp_cmd: QMPMessage = {'execute': name}
+        if args:
+            qmp_cmd['arguments'] = args
+        if cmd_id:
+            qmp_cmd['id'] = cmd_id
+        return self.cmd_obj(qmp_cmd)
+
+    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
+        """
+        Build and send a QMP command to the monitor; raise an exception on failure.
+        """
+        return self._sync(
+            self._qmp.execute(cmd, kwds),
+            self._timeout
+        )
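+
+    # A minimal sketch contrasting the call styles above; the socket path is
+    # hypothetical and 'query-status' is just an illustrative command. It
+    # assumes a QMP server is listening at that path.
+    #
+    #     qmp = QEMUMonitorProtocol('/tmp/qmp.sock')
+    #     qmp.connect()
+    #     qmp.settimeout(10.0)
+    #     full = qmp.cmd('query-status')       # full reply dict
+    #     value = qmp.command('query-status')  # 'return' only; raises on error
+    #     qmp.close()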
+
+    def pull_event(self,
+                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
+        """
+        Pulls a single event.
+
+        :param wait:
+            If False or 0, do not wait. Return None if no events ready.
+            If True, wait forever until the next event.
+            Otherwise, wait for the specified number of seconds.
+
+        :raise asyncio.TimeoutError:
+            When a timeout is requested and the timeout period elapses.
+
+        :return: The first available QMP event, or None.
+        """
+        if not wait:
+            # wait is False/0: "do not wait, do not except."
+            if self._qmp.events.empty():
+                return None
+
+        # If wait is 'True', wait forever. If wait is False/0, the events
+        # queue is known to be non-empty, but retrieving an item still
+        # needs a little real time in the event loop, so no timeout is
+        # applied in either case.
+        timeout = None
+        if wait and isinstance(wait, float):
+            timeout = wait
+
+        return dict(
+            self._sync(
+                self._qmp.events.get(),
+                timeout
+            )
+        )
+
+    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
+        """
+        Get a list of QMP events and clear all pending events.
+
+        :param wait:
+            If False or 0, do not wait; return an empty list if no events
+            are ready.
+            If True, wait until we have at least one event.
+            Otherwise, wait for up to the specified number of seconds for at
+            least one event.
+
+        :raise asyncio.TimeoutError:
+            When a timeout is requested and the timeout period elapses.
+
+        :return: A list of QMP events.
+        """
+        events = [dict(x) for x in self._qmp.events.clear()]
+        if events:
+            return events
+
+        event = self.pull_event(wait)
+        return [event] if event is not None else []
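+
+    # A sketch of the polling styles above (assumes an established
+    # connection; the timeout value is arbitrary):
+    #
+    #     event = qmp.pull_event(wait=1.0)     # one event or TimeoutError
+    #     events = qmp.get_events(wait=False)  # all pending, possibly []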
+
+    def clear_events(self) -> None:
+        """Clear current list of pending events."""
+        self._qmp.events.clear()
+
+    def close(self) -> None:
+        """Close the connection."""
+        self._sync(
+            self._qmp.disconnect()
+        )
+
+    def settimeout(self, timeout: Optional[float]) -> None:
+        """
+        Set the timeout for QMP RPC execution.
+
+        This timeout affects the `cmd`, `cmd_obj`, and `command` methods.
+        The `accept`, `pull_event` and `get_events` methods have their
+        own configurable timeouts.
+
+        :param timeout:
+            timeout in seconds, or None.
+            None will wait indefinitely.
+        """
+        self._timeout = timeout
+
+    def send_fd_scm(self, fd: int) -> None:
+        """
+        Send a file descriptor to the remote via SCM_RIGHTS.
+        """
+        self._qmp.send_fd_scm(fd)
+
+    def __del__(self) -> None:
+        if self._qmp.runstate == Runstate.IDLE:
+            return
+
+        if not self._aloop.is_running():
+            self.close()
+        else:
+            # Garbage collection ran while the event loop was running.
+            # Nothing we can do about it now, but if we don't raise our
+            # own error, the user will be treated to a lot of traceback
+            # they might not understand.
+            raise QMPError(
+                "QEMUMonitorProtocol.close()"
+                " was not called before object was garbage collected"
+            )
diff --git a/python/qemu/qmp/message.py b/python/qemu/qmp/message.py
new file mode 100644
index 0000000000..f76ccc9074
--- /dev/null
+++ b/python/qemu/qmp/message.py
@@ -0,0 +1,209 @@
+"""
+QMP Message Format
+
+This module provides the `Message` class, which represents a single QMP
+message sent to or from the server.
+"""
+
+import json
+from json import JSONDecodeError
+from typing import (
+    Dict,
+    Iterator,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Union,
+)
+
+from .error import ProtocolError
+
+
+class Message(MutableMapping[str, object]):
+    """
+    Represents a single QMP protocol message.
+
+    QMP uses JSON objects as its basic communicative unit; so this
+    Python object is a :py:obj:`~collections.abc.MutableMapping`. It may
+    be instantiated from either another mapping (like a `dict`), or from
+    raw `bytes` that still need to be deserialized.
+
+    Once instantiated, it may be treated like any other MutableMapping::
+
+        >>> msg = Message(b'{"hello": "world"}')
+        >>> assert msg['hello'] == 'world'
+        >>> msg['id'] = 'foobar'
+        >>> print(msg)
+        {
+          "hello": "world",
+          "id": "foobar"
+        }
+
+    It can be converted to `bytes`::
+
+        >>> msg = Message({"hello": "world"})
+        >>> print(bytes(msg))
+        b'{"hello":"world","id":"foobar"}'
+
+    Or back into a garden-variety `dict`::
+
+        >>> dict(msg)
+        {'hello': 'world'}
+
+
+    :param value: Initial value, if any.
+    :param eager:
+        When `True`, attempt to serialize or deserialize the initial value
+        immediately, so that conversion exceptions are raised during
+        the call to ``__init__()``.
+    """
+    # pylint: disable=too-many-ancestors
+
+    def __init__(self,
+                 value: Union[bytes, Mapping[str, object]] = b'{}', *,
+                 eager: bool = True):
+        self._data: Optional[bytes] = None
+        self._obj: Optional[Dict[str, object]] = None
+
+        if isinstance(value, bytes):
+            self._data = value
+            if eager:
+                self._obj = self._deserialize(self._data)
+        else:
+            self._obj = dict(value)
+            if eager:
+                self._data = self._serialize(self._obj)
+
+    # Methods necessary to implement the MutableMapping interface, see:
+    # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping
+
+    # We get pop, popitem, clear, update, setdefault, __contains__,
+    # keys, items, values, get, __eq__ and __ne__ for free.
+
+    def __getitem__(self, key: str) -> object:
+        return self._object[key]
+
+    def __setitem__(self, key: str, value: object) -> None:
+        self._object[key] = value
+        self._data = None
+
+    def __delitem__(self, key: str) -> None:
+        del self._object[key]
+        self._data = None
+
+    def __iter__(self) -> Iterator[str]:
+        return iter(self._object)
+
+    def __len__(self) -> int:
+        return len(self._object)
+
+    # Dunder methods not related to MutableMapping:
+
+    def __repr__(self) -> str:
+        if self._obj is not None:
+            return f"Message({self._object!r})"
+        return f"Message({bytes(self)!r})"
+
+    def __str__(self) -> str:
+        """Pretty-printed representation of this QMP message."""
+        return json.dumps(self._object, indent=2)
+
+    def __bytes__(self) -> bytes:
+        """bytes representing this QMP message."""
+        if self._data is None:
+            self._data = self._serialize(self._obj or {})
+        return self._data
+
+    # Conversion Methods
+
+    @property
+    def _object(self) -> Dict[str, object]:
+        """
+        A `dict` representing this QMP message.
+
+        Generated on-demand, if required. This property is private
+        because it returns an object that could be used to invalidate
+        the internal state of the `Message` object.
+        """
+        if self._obj is None:
+            self._obj = self._deserialize(self._data or b'{}')
+        return self._obj
+
+    @classmethod
+    def _serialize(cls, value: object) -> bytes:
+        """
+        Serialize a JSON object as `bytes`.
+
+        :raise ValueError: When the object cannot be serialized.
+        :raise TypeError: When the object cannot be serialized.
+
+        :return: `bytes` ready to be sent over the wire.
+        """
+        return json.dumps(value, separators=(',', ':')).encode('utf-8')
+
+    @classmethod
+    def _deserialize(cls, data: bytes) -> Dict[str, object]:
+        """
+        Deserialize JSON `bytes` into a native Python `dict`.
+
+        :raise DeserializationError:
+            If JSON deserialization fails for any reason.
+        :raise UnexpectedTypeError:
+            If the data does not represent a JSON object.
+
+        :return: A `dict` representing this QMP message.
+        """
+        try:
+            obj = json.loads(data)
+        except JSONDecodeError as err:
+            emsg = "Failed to deserialize QMP message."
+            raise DeserializationError(emsg, data) from err
+        if not isinstance(obj, dict):
+            raise UnexpectedTypeError(
+                "QMP message is not a JSON object.",
+                obj
+            )
+        return obj
+
+
+class DeserializationError(ProtocolError):
+    """
+    A QMP message was not understood as JSON.
+
+    When this Exception is raised, ``__cause__`` will be set to the
+    `json.JSONDecodeError` Exception, which can be interrogated for
+    further details.
+
+    :param error_message: Human-readable string describing the error.
+    :param raw: The raw `bytes` that prompted the failure.
+    """
+    def __init__(self, error_message: str, raw: bytes):
+        super().__init__(error_message)
+        #: The raw `bytes` that were not understood as JSON.
+        self.raw: bytes = raw
+
+    def __str__(self) -> str:
+        return "\n".join([
+            super().__str__(),
+            f"  raw bytes were: {str(self.raw)}",
+        ])
+
+
+class UnexpectedTypeError(ProtocolError):
+    """
+    A QMP message was JSON, but not a JSON object.
+
+    :param error_message: Human-readable string describing the error.
+    :param value: The deserialized JSON value that wasn't an object.
+    """
+    def __init__(self, error_message: str, value: object):
+        super().__init__(error_message)
+        #: The JSON value that was expected to be an object.
+        self.value: object = value
+
+    def __str__(self) -> str:
+        strval = json.dumps(self.value, indent=2)
+        return "\n".join([
+            super().__str__(),
+            f"  json value was: {strval}",
+        ])
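+
+
+# A hedged sketch of lazy construction and the error classes above (the
+# byte strings are illustrative):
+#
+#     lazy = Message(b'not json at all', eager=False)  # no error raised yet
+#     try:
+#         lazy['execute']  # deserialization happens on first access
+#     except DeserializationError as err:
+#         print(err.raw)   # b'not json at all'
+#
+#     try:
+#         Message(b'["a", "json", "array"]')
+#     except UnexpectedTypeError as err:
+#         print(err.value)  # ['a', 'json', 'array']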
diff --git a/python/qemu/qmp/models.py b/python/qemu/qmp/models.py
new file mode 100644
index 0000000000..de87f87804
--- /dev/null
+++ b/python/qemu/qmp/models.py
@@ -0,0 +1,146 @@
+"""
+QMP Data Models
+
+This module provides simplistic data classes that represent the few
+structures that the QMP spec mandates; they are used to verify incoming
+data to make sure it conforms to spec.
+"""
+# pylint: disable=too-few-public-methods
+
+from collections import abc
+import copy
+from typing import (
+    Any,
+    Dict,
+    Mapping,
+    Optional,
+    Sequence,
+)
+
+
+class Model:
+    """
+    Abstract data model, representing some QMP object of some kind.
+
+    :param raw: The raw object to be validated.
+    :raise KeyError: If any required fields are absent.
+    :raise TypeError: If any required fields have the wrong type.
+    """
+    def __init__(self, raw: Mapping[str, Any]):
+        self._raw = raw
+
+    def _check_key(self, key: str) -> None:
+        if key not in self._raw:
+            raise KeyError(f"'{self._name}' object requires '{key}' member")
+
+    def _check_value(self, key: str, type_: type, typestr: str) -> None:
+        assert key in self._raw
+        if not isinstance(self._raw[key], type_):
+            raise TypeError(
+                f"'{self._name}' member '{key}' must be a {typestr}"
+            )
+
+    def _check_member(self, key: str, type_: type, typestr: str) -> None:
+        self._check_key(key)
+        self._check_value(key, type_, typestr)
+
+    @property
+    def _name(self) -> str:
+        return type(self).__name__
+
+    def __repr__(self) -> str:
+        return f"{self._name}({self._raw!r})"
+
+
+class Greeting(Model):
+    """
+    Defined in qmp-spec.txt, section 2.2, "Server Greeting".
+
+    :param raw: The raw Greeting object.
+    :raise KeyError: If any required fields are absent.
+    :raise TypeError: If any required fields have the wrong type.
+    """
+    def __init__(self, raw: Mapping[str, Any]):
+        super().__init__(raw)
+        #: 'QMP' member
+        self.QMP: QMPGreeting  # pylint: disable=invalid-name
+
+        self._check_member('QMP', abc.Mapping, "JSON object")
+        self.QMP = QMPGreeting(self._raw['QMP'])
+
+    def _asdict(self) -> Dict[str, object]:
+        """
+        For compatibility with the iotests sync QMP wrapper.
+
+        The legacy QMP interface needs Greetings as a garden-variety Dict.
+
+        This interface is private in the hope that it can be dropped
+        again in the near future. Caller beware!
+        """
+        return dict(copy.deepcopy(self._raw))
+
+
+class QMPGreeting(Model):
+    """
+    Defined in qmp-spec.txt, section 2.2, "Server Greeting".
+
+    :param raw: The raw QMPGreeting object.
+    :raise KeyError: If any required fields are absent.
+    :raise TypeError: If any required fields have the wrong type.
+    """
+    def __init__(self, raw: Mapping[str, Any]):
+        super().__init__(raw)
+        #: 'version' member
+        self.version: Mapping[str, object]
+        #: 'capabilities' member
+        self.capabilities: Sequence[object]
+
+        self._check_member('version', abc.Mapping, "JSON object")
+        self.version = self._raw['version']
+
+        self._check_member('capabilities', abc.Sequence, "JSON array")
+        self.capabilities = self._raw['capabilities']
+
+
+class ErrorResponse(Model):
+    """
+    Defined in qmp-spec.txt, section 2.4.2, "error".
+
+    :param raw: The raw ErrorResponse object.
+    :raise KeyError: If any required fields are absent.
+    :raise TypeError: If any required fields have the wrong type.
+    """
+    def __init__(self, raw: Mapping[str, Any]):
+        super().__init__(raw)
+        #: 'error' member
+        self.error: ErrorInfo
+        #: 'id' member
+        self.id: Optional[object] = None  # pylint: disable=invalid-name
+
+        self._check_member('error', abc.Mapping, "JSON object")
+        self.error = ErrorInfo(self._raw['error'])
+
+        if 'id' in raw:
+            self.id = raw['id']
+
+
+class ErrorInfo(Model):
+    """
+    Defined in qmp-spec.txt, section 2.4.2, "error".
+
+    :param raw: The raw ErrorInfo object.
+    :raise KeyError: If any required fields are absent.
+    :raise TypeError: If any required fields have the wrong type.
+    """
+    def __init__(self, raw: Mapping[str, Any]):
+        super().__init__(raw)
+        #: 'class' member, with an underscore to avoid conflicts in Python.
+        self.class_: str
+        #: 'desc' member
+        self.desc: str
+
+        self._check_member('class', str, "string")
+        self.class_ = self._raw['class']
+
+        self._check_member('desc', str, "string")
+        self.desc = self._raw['desc']
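+
+
+# A minimal validation sketch; the greeting dict below is a trimmed,
+# illustrative stand-in for what a server might send:
+#
+#     raw = {'QMP': {'version': {}, 'capabilities': ['oob']}}
+#     greeting = Greeting(raw)
+#     greeting.QMP.capabilities   # ['oob']
+#
+#     Greeting({})                # raises KeyError ('QMP' member required)
+#     Greeting({'QMP': 1})        # raises TypeError (must be a JSON object)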
diff --git a/python/qemu/qmp/protocol.py b/python/qemu/qmp/protocol.py
new file mode 100644
index 0000000000..6ea86650ad
--- /dev/null
+++ b/python/qemu/qmp/protocol.py
@@ -0,0 +1,1048 @@
+"""
+Generic Asynchronous Message-based Protocol Support
+
+This module provides a generic framework for sending and receiving
+messages over an asyncio stream. `AsyncProtocol` is an abstract class
+that implements the core mechanisms of a simple send/receive protocol,
+and is designed to be extended.
+
+In this package, it is used as the implementation for the `QMPClient`
+class.
+"""
+
+# It's all the docstrings ... ! It's long for a good reason ^_^;
+# pylint: disable=too-many-lines
+
+import asyncio
+from asyncio import StreamReader, StreamWriter
+from enum import Enum
+from functools import wraps
+import logging
+from ssl import SSLContext
+from typing import (
+    Any,
+    Awaitable,
+    Callable,
+    Generic,
+    List,
+    Optional,
+    Tuple,
+    TypeVar,
+    Union,
+    cast,
+)
+
+from .error import QMPError
+from .util import (
+    bottom_half,
+    create_task,
+    exception_summary,
+    flush,
+    is_closing,
+    pretty_traceback,
+    upper_half,
+    wait_closed,
+)
+
+
+T = TypeVar('T')
+_U = TypeVar('_U')
+_TaskFN = Callable[[], Awaitable[None]]  # aka ``async def func() -> None``
+
+InternetAddrT = Tuple[str, int]
+UnixAddrT = str
+SocketAddrT = Union[UnixAddrT, InternetAddrT]
+
+
+class Runstate(Enum):
+    """Protocol session runstate."""
+
+    #: Fully quiesced and disconnected.
+    IDLE = 0
+    #: In the process of connecting or establishing a session.
+    CONNECTING = 1
+    #: Fully connected and active session.
+    RUNNING = 2
+    #: In the process of disconnecting.
+    #: Runstate may be returned to `IDLE` by calling `disconnect()`.
+    DISCONNECTING = 3
+
+
+class ConnectError(QMPError):
+    """
+    Raised when the initial connection process has failed.
+
+    This Exception always wraps a "root cause" exception that can be
+    interrogated for additional information.
+
+    :param error_message: Human-readable string describing the error.
+    :param exc: The root-cause exception.
+    """
+    def __init__(self, error_message: str, exc: Exception):
+        super().__init__(error_message)
+        #: Human-readable error string
+        self.error_message: str = error_message
+        #: Wrapped root cause exception
+        self.exc: Exception = exc
+
+    def __str__(self) -> str:
+        cause = str(self.exc)
+        if not cause:
+            # If there's no error string, use the exception name.
+            cause = exception_summary(self.exc)
+        return f"{self.error_message}: {cause}"
+
+
+class StateError(QMPError):
+    """
+    An API command (connect, execute, etc) was issued at an inappropriate time.
+
+    This error is raised when a method such as
+    :py:meth:`~AsyncProtocol.connect()` is called while the connection
+    is in a `Runstate` that does not permit it.
+
+    :param error_message: Human-readable string describing the state violation.
+    :param state: The actual `Runstate` seen at the time of the violation.
+    :param required: The `Runstate` required to process this command.
+    """
+    def __init__(self, error_message: str,
+                 state: Runstate, required: Runstate):
+        super().__init__(error_message)
+        self.error_message = error_message
+        self.state = state
+        self.required = required
+
+
+F = TypeVar('F', bound=Callable[..., Any])  # pylint: disable=invalid-name
+
+
+# Don't Panic.
+def require(required_state: Runstate) -> Callable[[F], F]:
+    """
+    Decorator: protect a method so it can only be run in a certain `Runstate`.
+
+    :param required_state: The `Runstate` required to invoke this method.
+    :raise StateError: When the required `Runstate` is not met.
+    """
+    def _decorator(func: F) -> F:
+        # _decorator is the decorator that is built by calling the
+        # require() decorator factory; e.g.:
+        #
+        #     @require(Runstate.IDLE)
+        #     def foo(): ...
+        #
+        # will replace 'foo' with the result of '_decorator(foo)'.
+
+        @wraps(func)
+        def _wrapper(proto: 'AsyncProtocol[Any]',
+                     *args: Any, **kwargs: Any) -> Any:
+            # _wrapper is the function that gets executed prior to the
+            # decorated method.
+
+            name = type(proto).__name__
+
+            if proto.runstate != required_state:
+                if proto.runstate == Runstate.CONNECTING:
+                    emsg = f"{name} is currently connecting."
+                elif proto.runstate == Runstate.DISCONNECTING:
+                    emsg = (f"{name} is disconnecting."
+                            " Call disconnect() to return to IDLE state.")
+                elif proto.runstate == Runstate.RUNNING:
+                    emsg = f"{name} is already connected and running."
+                elif proto.runstate == Runstate.IDLE:
+                    emsg = f"{name} is disconnected and idle."
+                else:
+                    assert False
+                raise StateError(emsg, proto.runstate, required_state)
+            # No StateError, so call the wrapped method.
+            return func(proto, *args, **kwargs)
+
+        # Return the decorated method;
+        # Transforming Func to Decorated[Func].
+        return cast(F, _wrapper)
+
+    # Return the decorator instance from the decorator factory. Phew!
+    return _decorator
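+
+# A sketch of how the decorator is meant to be applied inside a protocol
+# subclass; the class and method names are hypothetical:
+#
+#     class MyProto(AsyncProtocol[bytes]):
+#         @require(Runstate.RUNNING)
+#         async def ping(self) -> None:
+#             ...  # raises StateError unless the session is RUNNING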
+
+
+class AsyncProtocol(Generic[T]):
+    """
+    AsyncProtocol implements a generic async message-based protocol.
+
+    This protocol assumes the basic unit of information transfer between
+    client and server is a "message", the details of which are left up
+    to the implementation. It assumes the sending and receiving of these
+    messages is full-duplex and not necessarily correlated; i.e. it
+    supports asynchronous inbound messages.
+
+    It is designed to be extended by a specific protocol which provides
+    the implementations for how to read and send messages. These must be
+    defined in `_do_recv()` and `_do_send()`, respectively.
+
+    Other callbacks have a default implementation, but are intended to be
+    either extended or overridden:
+
+     - `_establish_session`:
+         The base implementation starts the reader/writer tasks.
+         A protocol implementation can override this call: actions that
+         must occur before the reader/writer tasks start belong before
+         the super() call, and actions that must occur afterwards belong
+         after it.
+     - `_on_message`:
+         Actions to be performed when a message is received.
+     - `_cb_outbound`:
+         Logging/Filtering hook for all outbound messages.
+     - `_cb_inbound`:
+         Logging/Filtering hook for all inbound messages.
+         This hook runs *before* `_on_message()`.
+
+    :param name:
+        Name used for logging messages, if any. By default, messages
+        will log to 'qemu.qmp.protocol', but each individual connection
+        can be given its own logger by giving it a name; messages will
+        then log to 'qemu.qmp.protocol.${name}'.
+    """
+    # pylint: disable=too-many-instance-attributes
+
+    #: Logger object for debugging messages from this connection.
+    logger = logging.getLogger(__name__)
+
+    # Maximum allowable size of read buffer
+    _limit = (64 * 1024)
+
+    # -------------------------
+    # Section: Public interface
+    # -------------------------
+
+    def __init__(self, name: Optional[str] = None) -> None:
+        #: The nickname for this connection, if any.
+        self.name: Optional[str] = name
+        if self.name is not None:
+            self.logger = self.logger.getChild(self.name)
+
+        # stream I/O
+        self._reader: Optional[StreamReader] = None
+        self._writer: Optional[StreamWriter] = None
+
+        # Outbound Message queue
+        self._outgoing: asyncio.Queue[T]
+
+        # Special, long-running tasks:
+        self._reader_task: Optional[asyncio.Future[None]] = None
+        self._writer_task: Optional[asyncio.Future[None]] = None
+
+        # Aggregate of the above two tasks, used for Exception management.
+        self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None
+
+        #: Disconnect task. The disconnect implementation runs in a task
+        #: so that asynchronous disconnects (initiated by the
+        #: reader/writer) are allowed to wait for the reader/writers to
+        #: exit.
+        self._dc_task: Optional[asyncio.Future[None]] = None
+
+        self._runstate = Runstate.IDLE
+        self._runstate_changed: Optional[asyncio.Event] = None
+
+        # Server state for start_server() and _incoming()
+        self._server: Optional[asyncio.AbstractServer] = None
+        self._accepted: Optional[asyncio.Event] = None
+
+    def __repr__(self) -> str:
+        cls_name = type(self).__name__
+        tokens = []
+        if self.name is not None:
+            tokens.append(f"name={self.name!r}")
+        tokens.append(f"runstate={self.runstate.name}")
+        return f"<{cls_name} {' '.join(tokens)}>"
+
+    @property  # @upper_half
+    def runstate(self) -> Runstate:
+        """The current `Runstate` of the connection."""
+        return self._runstate
+
+    @upper_half
+    async def runstate_changed(self) -> Runstate:
+        """
+        Wait for the `runstate` to change, then return that runstate.
+        """
+        await self._runstate_event.wait()
+        return self.runstate
+
+    @upper_half
+    @require(Runstate.IDLE)
+    async def start_server_and_accept(
+            self, address: SocketAddrT,
+            ssl: Optional[SSLContext] = None
+    ) -> None:
+        """
+        Accept a connection and begin processing message queues.
+
+        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+        This method is precisely equivalent to calling `start_server()`
+        followed by `accept()`.
+
+        :param address:
+            Address to listen on; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise StateError: When the `Runstate` is not `IDLE`.
+        :raise ConnectError:
+            When a connection or session cannot be established.
+
+            This exception will wrap a more concrete one. In most cases,
+            the wrapped exception will be `OSError` or `EOFError`. If a
+            protocol-level failure occurs while establishing a new
+            session, the wrapped error may also be a `QMPError`.
+        """
+        await self.start_server(address, ssl)
+        await self.accept()
+        assert self.runstate == Runstate.RUNNING
+
+    @upper_half
+    @require(Runstate.IDLE)
+    async def start_server(self, address: SocketAddrT,
+                           ssl: Optional[SSLContext] = None) -> None:
+        """
+        Start listening for an incoming connection, but do not wait for a peer.
+
+        This method starts listening for an incoming connection, but
+        does not block waiting for a peer. This call will return
+        immediately after binding and listening on a socket. A later
+        call to `accept()` must be made in order to finalize the
+        incoming connection.
+
+        :param address:
+            Address to listen on; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise StateError: When the `Runstate` is not `IDLE`.
+        :raise ConnectError:
+            When the server could not start listening on this address.
+
+            This exception will wrap a more concrete one. In most cases,
+            the wrapped exception will be `OSError`.
+        """
+        await self._session_guard(
+            self._do_start_server(address, ssl),
+            'Failed to establish connection')
+        assert self.runstate == Runstate.CONNECTING
+
+    @upper_half
+    @require(Runstate.CONNECTING)
+    async def accept(self) -> None:
+        """
+        Accept an incoming connection and begin processing message queues.
+
+        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+        :raise StateError: When the `Runstate` is not `CONNECTING`.
+        :raise QMPError: When `start_server()` has not yet been called.
+        :raise ConnectError:
+            When a connection or session cannot be established.
+
+            This exception will wrap a more concrete one. In most cases,
+            the wrapped exception will be `OSError` or `EOFError`. If a
+            protocol-level failure occurs while establishing a new
+            session, the wrapped error may also be a `QMPError`.
+        """
+        if self._accepted is None:
+            raise QMPError("Cannot call accept() before start_server().")
+        await self._session_guard(
+            self._do_accept(),
+            'Failed to establish connection')
+        await self._session_guard(
+            self._establish_session(),
+            'Failed to establish session')
+        assert self.runstate == Runstate.RUNNING
+
+    @upper_half
+    @require(Runstate.IDLE)
+    async def connect(self, address: SocketAddrT,
+                      ssl: Optional[SSLContext] = None) -> None:
+        """
+        Connect to the server and begin processing message queues.
+
+        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+        :param address:
+            Address to connect to; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise StateError: When the `Runstate` is not `IDLE`.
+        :raise ConnectError:
+            When a connection or session cannot be established.
+
+            This exception will wrap a more concrete one. In most cases,
+            the wrapped exception will be `OSError` or `EOFError`. If a
+            protocol-level failure occurs while establishing a new
+            session, the wrapped error may also be a `QMPError`.
+        """
+        await self._session_guard(
+            self._do_connect(address, ssl),
+            'Failed to establish connection')
+        await self._session_guard(
+            self._establish_session(),
+            'Failed to establish session')
+        assert self.runstate == Runstate.RUNNING
+
+    @upper_half
+    async def disconnect(self) -> None:
+        """
+        Disconnect and wait for all tasks to fully stop.
+
+        If there was an exception that caused the reader/writers to
+        terminate prematurely, it will be raised here.
+
+        :raise Exception: When the reader or writer terminate unexpectedly.
+        """
+        self.logger.debug("disconnect() called.")
+        self._schedule_disconnect()
+        await self._wait_disconnect()
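+
+    # A hedged sketch of the intended public lifecycle, assuming ``proto``
+    # is an instance of a concrete subclass (such as QMPClient) and using a
+    # hypothetical TCP address:
+    #
+    #     await proto.connect(('127.0.0.1', 4444))
+    #     assert proto.runstate == Runstate.RUNNING
+    #     ...
+    #     await proto.disconnect()  # re-raises any reader/writer failure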
+
+    # --------------------------
+    # Section: Session machinery
+    # --------------------------
+
+    async def _session_guard(self, coro: Awaitable[None], emsg: str) -> None:
+        """
+        Async guard function used to roll back to `IDLE` on any error.
+
+        On any Exception, the state machine will be reset back to
+        `IDLE`. Most Exceptions will be wrapped with `ConnectError`, but
+        `BaseException` events will be left alone (this includes
+        asyncio.CancelledError, even prior to Python 3.8).
+
+        :param coro: The guarded coroutine to await.
+        :param emsg:
+            Human-readable string describing what connection phase failed.
+
+        :raise BaseException:
+            When `BaseException` occurs in the guarded block.
+        :raise ConnectError:
+            When any other error is encountered in the guarded block.
+        """
+        # Note: After Python 3.6 support is removed, this should be an
+        # @asynccontextmanager instead of accepting an awaitable.
+        try:
+            await coro
+        except BaseException as err:
+            self.logger.error("%s: %s", emsg, exception_summary(err))
+            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+            try:
+                # Reset the runstate back to IDLE.
+                await self.disconnect()
+            except:
+                # We don't expect any Exceptions from disconnect() here,
+                # because we failed to connect in the first place;
+                # disconnect() should only be performing cannot-fail
+                # cleanup. But you never know.
+                emsg = (
+                    "Unexpected bottom half exception. "
+                    "This is a bug in the QMP library. "
+                    "Please report it to <qemu-devel@nongnu.org> and "
+                    "CC: John Snow <jsnow@redhat.com>."
+                )
+                self.logger.critical("%s:\n%s\n", emsg, pretty_traceback())
+                raise
+
+            # CancelledError is an Exception with special semantic meaning;
+            # We do NOT want to wrap it up under ConnectError.
+            # NB: CancelledError is not a BaseException before Python 3.8
+            if isinstance(err, asyncio.CancelledError):
+                raise
+
+            # Any other kind of error can be treated as some kind of connection
+            # failure broadly. Inspect the 'exc' field to explore the root
+            # cause in greater detail.
+            if isinstance(err, Exception):
+                raise ConnectError(emsg, err) from err
+
+            # Raise BaseExceptions un-wrapped, they're more important.
+            raise
+
+    @property
+    def _runstate_event(self) -> asyncio.Event:
+        # asyncio.Event() objects should not be created prior to entrance into
+        # an event loop, so we can ensure we create it in the correct context.
+        # Create it on-demand *only* at the behest of an 'async def' method.
+        if not self._runstate_changed:
+            self._runstate_changed = asyncio.Event()
+        return self._runstate_changed
+
+    @upper_half
+    @bottom_half
+    def _set_state(self, state: Runstate) -> None:
+        """
+        Change the `Runstate` of the protocol connection.
+
+        Signals the `runstate_changed` event.
+        """
+        if state == self._runstate:
+            return
+
+        self.logger.debug("Transitioning from '%s' to '%s'.",
+                          str(self._runstate), str(state))
+        self._runstate = state
+        self._runstate_event.set()
+        self._runstate_event.clear()
+
+    @bottom_half
+    async def _stop_server(self) -> None:
+        """
+        Stop listening for / accepting new incoming connections.
+        """
+        if self._server is None:
+            return
+
+        try:
+            self.logger.debug("Stopping server.")
+            self._server.close()
+            await self._server.wait_closed()
+            self.logger.debug("Server stopped.")
+        finally:
+            self._server = None
+
+    @bottom_half  # However, it does not run from the R/W tasks.
+    async def _incoming(self,
+                        reader: asyncio.StreamReader,
+                        writer: asyncio.StreamWriter) -> None:
+        """
+        Accept an incoming connection and signal the upper_half.
+
+        This method does the minimum necessary to accept a single
+        incoming connection. It signals back to the upper_half ASAP so
+        that any errors during session initialization can occur
+        naturally in the caller's stack.
+
+        :param reader: Incoming `asyncio.StreamReader`
+        :param writer: Incoming `asyncio.StreamWriter`
+        """
+        peer = writer.get_extra_info('peername', 'Unknown peer')
+        self.logger.debug("Incoming connection from %s", peer)
+
+        if self._reader or self._writer:
+            # Sadly, we can have more than one pending connection
+            # because of https://bugs.python.org/issue46715
+            # Close any extra connections we don't actually want.
+            self.logger.warning("Extraneous connection inadvertently accepted")
+            writer.close()
+            return
+
+        # A connection has been accepted; stop listening for new ones.
+        assert self._accepted is not None
+        await self._stop_server()
+        self._reader, self._writer = (reader, writer)
+        self._accepted.set()
+
+    @upper_half
+    async def _do_start_server(self, address: SocketAddrT,
+                               ssl: Optional[SSLContext] = None) -> None:
+        """
+        Start listening for an incoming connection, but do not wait for a peer.
+
+        This method starts listening for an incoming connection, but does not
+        block waiting for a peer. This call will return immediately after
+        binding and listening to a socket. A later call to accept() must be
+        made in order to finalize the incoming connection.
+
+        :param address:
+            Address to listen on; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise OSError: For stream-related errors.
+        """
+        assert self.runstate == Runstate.IDLE
+        self._set_state(Runstate.CONNECTING)
+
+        self.logger.debug("Awaiting connection on %s ...", address)
+        self._accepted = asyncio.Event()
+
+        if isinstance(address, tuple):
+            coro = asyncio.start_server(
+                self._incoming,
+                host=address[0],
+                port=address[1],
+                ssl=ssl,
+                backlog=1,
+                limit=self._limit,
+            )
+        else:
+            coro = asyncio.start_unix_server(
+                self._incoming,
+                path=address,
+                ssl=ssl,
+                backlog=1,
+                limit=self._limit,
+            )
+
+        # Allow runstate watchers to witness 'CONNECTING' state; some
+        # failures in the streaming layer are synchronous and will not
+        # otherwise yield.
+        await asyncio.sleep(0)
+
+        # This will start the server (bind(2), listen(2)). It will also
+        # call accept(2) if we yield, but we don't block on that here.
+        self._server = await coro
+        self.logger.debug("Server listening on %s", address)
+
+    @upper_half
+    async def _do_accept(self) -> None:
+        """
+        Wait for and accept an incoming connection.
+
+        Requires that we have not yet accepted an incoming connection
+        from the upper_half, but it's OK if the server is no longer
+        running because the bottom_half has already accepted the
+        connection.
+        """
+        assert self._accepted is not None
+        await self._accepted.wait()
+        assert self._server is None
+        self._accepted = None
+
+        self.logger.debug("Connection accepted.")
+
+    @upper_half
+    async def _do_connect(self, address: SocketAddrT,
+                          ssl: Optional[SSLContext] = None) -> None:
+        """
+        Acting as the transport client, initiate a connection to a server.
+
+        :param address:
+            Address to connect to; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise OSError: For stream-related errors.
+        """
+        assert self.runstate == Runstate.IDLE
+        self._set_state(Runstate.CONNECTING)
+
+        # Allow runstate watchers to witness 'CONNECTING' state; some
+        # failures in the streaming layer are synchronous and will not
+        # otherwise yield.
+        await asyncio.sleep(0)
+
+        self.logger.debug("Connecting to %s ...", address)
+
+        if isinstance(address, tuple):
+            connect = asyncio.open_connection(
+                address[0],
+                address[1],
+                ssl=ssl,
+                limit=self._limit,
+            )
+        else:
+            connect = asyncio.open_unix_connection(
+                path=address,
+                ssl=ssl,
+                limit=self._limit,
+            )
+        self._reader, self._writer = await connect
+
+        self.logger.debug("Connected.")
+
+    @upper_half
+    async def _establish_session(self) -> None:
+        """
+        Establish a new session.
+
+        Starts the reader/writer tasks; subclasses may perform their
+        own negotiations here. The Runstate will be RUNNING upon
+        successful conclusion.
+        """
+        assert self.runstate == Runstate.CONNECTING
+
+        self._outgoing = asyncio.Queue()
+
+        reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
+        writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
+
+        self._reader_task = create_task(reader_coro)
+        self._writer_task = create_task(writer_coro)
+
+        self._bh_tasks = asyncio.gather(
+            self._reader_task,
+            self._writer_task,
+        )
+
+        self._set_state(Runstate.RUNNING)
+        await asyncio.sleep(0)  # Allow runstate_event to process
+
+    @upper_half
+    @bottom_half
+    def _schedule_disconnect(self) -> None:
+        """
+        Initiate a disconnect; idempotent.
+
+        This method is used both in the upper-half as a direct
+        consequence of `disconnect()`, and in the bottom-half in the
+        case of unhandled exceptions in the reader/writer tasks.
+
+        It can be invoked no matter what the `runstate` is.
+        """
+        if not self._dc_task:
+            self._set_state(Runstate.DISCONNECTING)
+            self.logger.debug("Scheduling disconnect.")
+            self._dc_task = create_task(self._bh_disconnect())
+
+    @upper_half
+    async def _wait_disconnect(self) -> None:
+        """
+        Waits for a previously scheduled disconnect to finish.
+
+        This method will gather any bottom half exceptions and re-raise
+        the one that occurred first, presuming it to be the root cause
+        of any subsequent Exceptions. It is intended to be used in the
+        upper half of the call chain.
+
+        :raise Exception:
+            Arbitrary exception re-raised on behalf of the reader/writer.
+        """
+        assert self.runstate == Runstate.DISCONNECTING
+        assert self._dc_task
+
+        aws: List[Awaitable[object]] = [self._dc_task]
+        if self._bh_tasks:
+            aws.insert(0, self._bh_tasks)
+        all_defined_tasks = asyncio.gather(*aws)
+
+        # Ensure disconnect is done; Exception (if any) is not raised here:
+        await asyncio.wait((self._dc_task,))
+
+        try:
+            await all_defined_tasks  # Raise Exceptions from the bottom half.
+        finally:
+            self._cleanup()
+            self._set_state(Runstate.IDLE)
+
+    @upper_half
+    def _cleanup(self) -> None:
+        """
+        Fully reset this object to a clean state and return to `IDLE`.
+        """
+        def _paranoid_task_erase(task: Optional['asyncio.Future[_U]']
+                                 ) -> Optional['asyncio.Future[_U]']:
+            # Help to erase a task, ENSURING it is fully quiesced first.
+            assert (task is None) or task.done()
+            return None if (task and task.done()) else task
+
+        assert self.runstate == Runstate.DISCONNECTING
+        self._dc_task = _paranoid_task_erase(self._dc_task)
+        self._reader_task = _paranoid_task_erase(self._reader_task)
+        self._writer_task = _paranoid_task_erase(self._writer_task)
+        self._bh_tasks = _paranoid_task_erase(self._bh_tasks)
+
+        self._reader = None
+        self._writer = None
+        self._accepted = None
+
+        # NB: _runstate_changed cannot be cleared because we still need it to
+        # send the final runstate changed event ...!
+
+    # ----------------------------
+    # Section: Bottom Half methods
+    # ----------------------------
+
+    @bottom_half
+    async def _bh_disconnect(self) -> None:
+        """
+        Disconnect and cancel all outstanding tasks.
+
+        It is designed to be called from its task context,
+        :py:obj:`~AsyncProtocol._dc_task`. By running in its own task,
+        it is free to wait on any pending actions that may still need to
+        occur in either the reader or writer tasks.
+        """
+        assert self.runstate == Runstate.DISCONNECTING
+
+        def _done(task: Optional['asyncio.Future[Any]']) -> bool:
+            return task is not None and task.done()
+
+        # If the server is running, stop it.
+        await self._stop_server()
+
+        # Are we already in an error pathway? If either of the tasks is
+        # already done, or if we have no tasks but a reader/writer, we
+        # must be.
+        #
+        # NB: We can't use _bh_tasks to check for premature task
+        # completion, because it may not yet have had a chance to run
+        # and gather itself.
+        tasks = tuple(filter(None, (self._writer_task, self._reader_task)))
+        error_pathway = _done(self._reader_task) or _done(self._writer_task)
+        if not tasks:
+            error_pathway |= bool(self._reader) or bool(self._writer)
+
+        try:
+            # Try to flush the writer, if possible.
+            # This *may* cause an error and force us over into the error path.
+            if not error_pathway:
+                await self._bh_flush_writer()
+        except BaseException as err:
+            error_pathway = True
+            emsg = "Failed to flush the writer"
+            self.logger.error("%s: %s", emsg, exception_summary(err))
+            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+            raise
+        finally:
+            # Cancel any still-running tasks (Won't raise):
+            if self._writer_task is not None and not self._writer_task.done():
+                self.logger.debug("Cancelling writer task.")
+                self._writer_task.cancel()
+            if self._reader_task is not None and not self._reader_task.done():
+                self.logger.debug("Cancelling reader task.")
+                self._reader_task.cancel()
+
+            # Close out the tasks entirely (Won't raise):
+            if tasks:
+                self.logger.debug("Waiting for tasks to complete ...")
+                await asyncio.wait(tasks)
+
+            # Lastly, close the stream itself. (*May raise*!):
+            await self._bh_close_stream(error_pathway)
+            self.logger.debug("Disconnected.")
+
+    @bottom_half
+    async def _bh_flush_writer(self) -> None:
+        if not self._writer_task:
+            return
+
+        self.logger.debug("Draining the outbound queue ...")
+        await self._outgoing.join()
+        if self._writer is not None:
+            self.logger.debug("Flushing the StreamWriter ...")
+            await flush(self._writer)
+
+    @bottom_half
+    async def _bh_close_stream(self, error_pathway: bool = False) -> None:
+        # NB: Closing the writer also implicitly closes the reader.
+        if not self._writer:
+            return
+
+        if not is_closing(self._writer):
+            self.logger.debug("Closing StreamWriter.")
+            self._writer.close()
+
+        self.logger.debug("Waiting for StreamWriter to close ...")
+        try:
+            await wait_closed(self._writer)
+        except Exception:  # pylint: disable=broad-except
+            # It's hard to tell if the Stream is already closed or
+            # not. Even if one of the tasks has failed, it may have
+            # failed for a higher-layered protocol reason. The
+            # stream could still be open and perfectly fine.
+            # I don't know how to discern its health here.
+
+            if error_pathway:
+                # We already know that *something* went wrong. Let's
+                # just trust that the Exception we already have is the
+                # better one to present to the user, even if we don't
+                # genuinely *know* the relationship between the two.
+                self.logger.debug(
+                    "Discarding Exception from wait_closed:\n%s\n",
+                    pretty_traceback(),
+                )
+            else:
+                # Oops, this is a brand-new error!
+                raise
+        finally:
+            self.logger.debug("StreamWriter closed.")
+
+    @bottom_half
+    async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
+        """
+        Run one of the bottom-half methods in a loop forever.
+
+        If the bottom half ever raises any exception, schedule a
+        disconnect that will terminate the entire loop.
+
+        :param async_fn: The bottom-half method to run in a loop.
+        :param name: The name of this task, used for logging.
+        """
+        try:
+            while True:
+                await async_fn()
+        except asyncio.CancelledError:
+            # We have been cancelled by _bh_disconnect, exit gracefully.
+            self.logger.debug("Task.%s: cancelled.", name)
+            return
+        except BaseException as err:
+            self.logger.log(
+                logging.INFO if isinstance(err, EOFError) else logging.ERROR,
+                "Task.%s: %s",
+                name, exception_summary(err)
+            )
+            self.logger.debug("Task.%s: failure:\n%s\n",
+                              name, pretty_traceback())
+            self._schedule_disconnect()
+            raise
+        finally:
+            self.logger.debug("Task.%s: exiting.", name)
+
+    @bottom_half
+    async def _bh_send_message(self) -> None:
+        """
+        Wait for an outgoing message, then send it.
+
+        Designed to be run in `_bh_loop_forever()`.
+        """
+        msg = await self._outgoing.get()
+        try:
+            await self._send(msg)
+        finally:
+            self._outgoing.task_done()
+
+    @bottom_half
+    async def _bh_recv_message(self) -> None:
+        """
+        Wait for an incoming message and call `_on_message` to route it.
+
+        Designed to be run in `_bh_loop_forever()`.
+        """
+        msg = await self._recv()
+        await self._on_message(msg)
+
+    # --------------------
+    # Section: Message I/O
+    # --------------------
+
+    @upper_half
+    @bottom_half
+    def _cb_outbound(self, msg: T) -> T:
+        """
+        Callback: outbound message hook.
+
+        This is intended for subclasses to be able to add arbitrary
+        hooks to filter or manipulate outgoing messages. The base
+        implementation does nothing but log the message without any
+        manipulation of the message.
+
+        :param msg: raw outbound message
+        :return: final outbound message
+        """
+        self.logger.debug("--> %s", str(msg))
+        return msg
+
+    @upper_half
+    @bottom_half
+    def _cb_inbound(self, msg: T) -> T:
+        """
+        Callback: inbound message hook.
+
+        This is intended for subclasses to be able to add arbitrary
+        hooks to filter or manipulate incoming messages. The base
+        implementation does nothing but log the message without any
+        manipulation of the message.
+
+        This method does not "handle" incoming messages; it is a filter.
+        The actual "endpoint" for incoming messages is `_on_message()`.
+
+        :param msg: raw inbound message
+        :return: processed inbound message
+        """
+        self.logger.debug("<-- %s", str(msg))
+        return msg
+
+    @upper_half
+    @bottom_half
+    async def _readline(self) -> bytes:
+        """
+        Wait for a newline from the incoming reader.
+
+        This method is provided as a convenience for upper-layer
+        protocols, as many are line-based.
+
+        This method *may* return a sequence of bytes without a trailing
+        newline if EOF occurs, but *some* bytes were received. In this
+        case, the next call will raise `EOFError`. It is assumed that
+        the layer 5 protocol will decide if there is anything meaningful
+        to be done with a partial message.
+
+        :raise OSError: For stream-related errors.
+        :raise EOFError:
+            If the reader stream is at EOF and there are no bytes to return.
+        :return: bytes, including the newline.
+        """
+        assert self._reader is not None
+        msg_bytes = await self._reader.readline()
+
+        if not msg_bytes:
+            if self._reader.at_eof():
+                raise EOFError
+
+        return msg_bytes
+
+    @upper_half
+    @bottom_half
+    async def _do_recv(self) -> T:
+        """
+        Abstract: Read from the stream and return a message.
+
+        Very low-level; intended to only be called by `_recv()`.
+        """
+        raise NotImplementedError
+
+    @upper_half
+    @bottom_half
+    async def _recv(self) -> T:
+        """
+        Read an arbitrary protocol message.
+
+        .. warning::
+            This method is intended primarily for `_bh_recv_message()`
+            to use in an asynchronous task loop. Using it outside of
+            this loop will "steal" messages from the normal routing
+            mechanism. It is safe to use prior to `_establish_session()`,
+            but should not be used otherwise.
+
+        This method uses `_do_recv()` to retrieve the raw message, and
+        then transforms it using `_cb_inbound()`.
+
+        :return: A single (filtered, processed) protocol message.
+        """
+        message = await self._do_recv()
+        return self._cb_inbound(message)
+
+    @upper_half
+    @bottom_half
+    def _do_send(self, msg: T) -> None:
+        """
+        Abstract: Write a message to the stream.
+
+        Very low-level; intended to only be called by `_send()`.
+        """
+        raise NotImplementedError
+
+    @upper_half
+    @bottom_half
+    async def _send(self, msg: T) -> None:
+        """
+        Send an arbitrary protocol message.
+
+        This method will transform any outgoing messages according to
+        `_cb_outbound()`.
+
+        .. warning::
+            Like `_recv()`, this method is intended to be called by
+            the writer task loop that processes outgoing
+            messages. Calling it directly may circumvent logic
+            implemented by the caller meant to correlate outgoing and
+            incoming messages.
+
+        :raise OSError: For problems with the underlying stream.
+        """
+        msg = self._cb_outbound(msg)
+        self._do_send(msg)
+
+    @bottom_half
+    async def _on_message(self, msg: T) -> None:
+        """
+        Called to handle the receipt of a new message.
+
+        .. caution::
+            This is executed from within the reader loop, so be advised
+            that waiting on either the reader or writer task will lead
+            to deadlock. Additionally, any unhandled exceptions will
+            directly cause the loop to halt, so logic may be best-kept
+            to a minimum if at all possible.
+
+        :param msg: The incoming message, already logged/filtered.
+        """
+        # Nothing to do in the abstract case.
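+
+
+# A minimal, purely illustrative sketch of a line-based subclass; real
+# protocols (see QMPClient) are more involved:
+#
+#     class LineProtocol(AsyncProtocol[bytes]):
+#         async def _do_recv(self) -> bytes:
+#             return await self._readline()
+#
+#         def _do_send(self, msg: bytes) -> None:
+#             assert self._writer is not None
+#             self._writer.write(msg + b'\n')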
diff --git a/python/qemu/qmp/py.typed b/python/qemu/qmp/py.typed
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/python/qemu/qmp/py.typed
diff --git a/python/qemu/qmp/qmp_client.py b/python/qemu/qmp/qmp_client.py
new file mode 100644
index 0000000000..5dcda04a75
--- /dev/null
+++ b/python/qemu/qmp/qmp_client.py
@@ -0,0 +1,655 @@
+"""
+QMP Protocol Implementation
+
+This module provides the `QMPClient` class, which can be used to connect
+and send commands to a QMP server such as QEMU. The `QMPClient` class
+can either connect to a listening server, or listen for and accept an
+incoming connection from that server.
+"""
+
+import asyncio
+import logging
+import socket
+import struct
+from typing import (
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Union,
+    cast,
+)
+
+from .error import ProtocolError, QMPError
+from .events import Events
+from .message import Message
+from .models import ErrorResponse, Greeting
+from .protocol import AsyncProtocol, Runstate, require
+from .util import (
+    bottom_half,
+    exception_summary,
+    pretty_traceback,
+    upper_half,
+)
+
+
+class _WrappedProtocolError(ProtocolError):
+    """
+    Abstract exception class for Protocol errors that wrap an Exception.
+
+    :param error_message: Human-readable string describing the error.
+    :param exc: The root-cause exception.
+    """
+    def __init__(self, error_message: str, exc: Exception):
+        super().__init__(error_message)
+        self.exc = exc
+
+    def __str__(self) -> str:
+        return f"{self.error_message}: {self.exc!s}"
+
+
+class GreetingError(_WrappedProtocolError):
+    """
+    An exception occurred during the Greeting phase.
+
+    :param error_message: Human-readable string describing the error.
+    :param exc: The root-cause exception.
+    """
+
+
+class NegotiationError(_WrappedProtocolError):
+    """
+    An exception occurred during the Negotiation phase.
+
+    :param error_message: Human-readable string describing the error.
+    :param exc: The root-cause exception.
+    """
+
+
+class ExecuteError(QMPError):
+    """
+    Exception raised by `QMPClient.execute()` on RPC failure.
+
+    :param error_response: The RPC error response object.
+    :param sent: The sent RPC message that caused the failure.
+    :param received: The raw RPC error reply received.
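+
+    A typical handling pattern (an illustrative sketch; ``qmp`` is assumed
+    to be an already-connected `QMPClient`)::
+
+      try:
+          res = await qmp.execute('query-status')
+      except ExecuteError as err:
+          print(f"{err.error_class}: {err}")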
+    """
+    def __init__(self, error_response: ErrorResponse,
+                 sent: Message, received: Message):
+        super().__init__(error_response.error.desc)
+        #: The sent `Message` that caused the failure
+        self.sent: Message = sent
+        #: The received `Message` that indicated failure
+        self.received: Message = received
+        #: The parsed error response
+        self.error: ErrorResponse = error_response
+        #: The QMP error class
+        self.error_class: str = error_response.error.class_
+
+
+class ExecInterruptedError(QMPError):
+    """
+    Exception raised by `execute()` (et al) when an RPC is interrupted.
+
+    This error is raised when an `execute()` statement could not be
+    completed.  This can occur because the connection itself was
+    terminated before a reply was received.
+
+    The true cause of the interruption will be available via `disconnect()`.
+    """
+
+
+class _MsgProtocolError(ProtocolError):
+    """
+    Abstract error class for protocol errors that have a `Message` object.
+
+    This Exception class is used for protocol errors where the `Message`
+    was mechanically understood, but was found to be inappropriate or
+    malformed.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The QMP `Message` that caused the error.
+    """
+    def __init__(self, error_message: str, msg: Message):
+        super().__init__(error_message)
+        #: The received `Message` that caused the error.
+        self.msg: Message = msg
+
+    def __str__(self) -> str:
+        return "\n".join([
+            super().__str__(),
+            f"  Message was: {str(self.msg)}\n",
+        ])
+
+
+class ServerParseError(_MsgProtocolError):
+    """
+    The Server sent a `Message` indicating parsing failure.
+
+    i.e., a reply has arrived from the server, but it is missing the 'id'
+    field, indicating a parsing failure on the server's side.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The QMP `Message` that caused the error.
+    """
+
+
+class BadReplyError(_MsgProtocolError):
+    """
+    An execution reply was successfully routed, but not understood.
+
+    If a QMP message is received with an 'id' field to allow it to be
+    routed, but is otherwise malformed, this exception will be raised.
+
+    A reply message is malformed if it is missing either the 'return' or
+    'error' keys, or if the 'error' value has missing keys or members of
+    the wrong type.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The malformed reply that was received.
+    :param sent: The message that was sent that prompted the error.
+    """
+    def __init__(self, error_message: str, msg: Message, sent: Message):
+        super().__init__(error_message, msg)
+        #: The sent `Message` that caused the failure
+        self.sent = sent
+
+
+class QMPClient(AsyncProtocol[Message], Events):
+    """
+    Implements a QMP client connection.
+
+    QMP can be used to establish a connection as either the transport
+    client or server, though this class always acts as the QMP client.
+
+    :param name: Optional nickname for the connection, used for logging.
+
+    Basic script-style usage looks like this::
+
+      qmp = QMPClient('my_virtual_machine_name')
+      await qmp.connect(('127.0.0.1', 1234))
+      ...
+      res = await qmp.execute('query-block')
+      ...
+      await qmp.disconnect()
+
+    Basic async client-style usage looks like this::
+
+      class Client:
+          def __init__(self, name: str):
+              self.qmp = QMPClient(name)
+
+          async def watch_events(self):
+              try:
+                  async for event in self.qmp.events:
+                      print(f"Event: {event['event']}")
+              except asyncio.CancelledError:
+                  return
+
+          async def run(self, address='/tmp/qemu.socket'):
+              await self.qmp.connect(address)
+              asyncio.create_task(self.watch_events())
+              await self.qmp.runstate_changed()
+              await self.qmp.disconnect()
+
+    See `qmp.events` for more detail on event handling patterns.
+    """
+    #: Logger object used for debugging messages.
+    logger = logging.getLogger(__name__)
+
+    # Read buffer limit; large enough to accept query-qmp-schema
+    _limit = (256 * 1024)
+
+    # Type alias for pending execute() result items
+    _PendingT = Union[Message, ExecInterruptedError]
+
+    def __init__(self, name: Optional[str] = None) -> None:
+        super().__init__(name)
+        Events.__init__(self)
+
+        #: Whether or not to await a greeting after establishing a connection.
+        self.await_greeting: bool = True
+
+        #: Whether or not to perform capabilities negotiation upon connection.
+        #: Implies `await_greeting`.
+        self.negotiate: bool = True
+
+        # Cached Greeting, if one was awaited.
+        self._greeting: Optional[Greeting] = None
+
+        # Command ID counter
+        self._execute_id = 0
+
+        # Incoming RPC reply messages.
+        self._pending: Dict[
+            Union[str, None],
+            'asyncio.Queue[QMPClient._PendingT]'
+        ] = {}
+
+    @property
+    def greeting(self) -> Optional[Greeting]:
+        """The `Greeting` from the QMP server, if any."""
+        return self._greeting
+
+    @upper_half
+    async def _establish_session(self) -> None:
+        """
+        Initiate the QMP session.
+
+        Wait for the QMP greeting and perform capabilities negotiation.
+
+        :raise GreetingError: When the greeting is not understood.
+        :raise NegotiationError: If the negotiation fails.
+        :raise EOFError: When the server unexpectedly hangs up.
+        :raise OSError: For underlying stream errors.
+        """
+        self._greeting = None
+        self._pending = {}
+
+        if self.await_greeting or self.negotiate:
+            self._greeting = await self._get_greeting()
+
+        if self.negotiate:
+            await self._negotiate()
+
+        # This will start the reader/writers:
+        await super()._establish_session()
+
+    @upper_half
+    async def _get_greeting(self) -> Greeting:
+        """
+        :raise GreetingError: When the greeting is not understood.
+        :raise EOFError: When the server unexpectedly hangs up.
+        :raise OSError: For underlying stream errors.
+
+        :return: the Greeting object given by the server.
+        """
+        self.logger.debug("Awaiting greeting ...")
+
+        try:
+            msg = await self._recv()
+            return Greeting(msg)
+        except (ProtocolError, KeyError, TypeError) as err:
+            emsg = "Did not understand Greeting"
+            self.logger.error("%s: %s", emsg, exception_summary(err))
+            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+            raise GreetingError(emsg, err) from err
+        except BaseException as err:
+            # EOFError, OSError, or something unexpected.
+            emsg = "Failed to receive Greeting"
+            self.logger.error("%s: %s", emsg, exception_summary(err))
+            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+            raise
+
+    @upper_half
+    async def _negotiate(self) -> None:
+        """
+        Perform QMP capabilities negotiation.
+
+        :raise NegotiationError: When negotiation fails.
+        :raise EOFError: When the server unexpectedly hangs up.
+        :raise OSError: For underlying stream errors.
+        """
+        self.logger.debug("Negotiating capabilities ...")
+
+        arguments: Dict[str, List[str]] = {}
+        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
+            arguments.setdefault('enable', []).append('oob')
+        msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
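+        # For example, when the server advertises 'oob', the message built
+        # above is: {"execute": "qmp_capabilities",
+        #            "arguments": {"enable": ["oob"]}}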
+
+        # It's not safe to use execute() here, because the reader/writers
+        # aren't running. AsyncProtocol *requires* that a new session
+        # does not fail after the reader/writers are running!
+        try:
+            await self._send(msg)
+            reply = await self._recv()
+            assert 'return' in reply
+            assert 'error' not in reply
+        except (ProtocolError, AssertionError) as err:
+            emsg = "Negotiation failed"
+            self.logger.error("%s: %s", emsg, exception_summary(err))
+            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+            raise NegotiationError(emsg, err) from err
+        except BaseException as err:
+            # EOFError, OSError, or something unexpected.
+            emsg = "Negotiation failed"
+            self.logger.error("%s: %s", emsg, exception_summary(err))
+            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+            raise
+
+    @bottom_half
+    async def _bh_disconnect(self) -> None:
+        try:
+            await super()._bh_disconnect()
+        finally:
+            if self._pending:
+                self.logger.debug("Cancelling pending executions")
+            keys = self._pending.keys()
+            for key in keys:
+                self.logger.debug("Cancelling execution '%s'", key)
+                self._pending[key].put_nowait(
+                    ExecInterruptedError("Disconnected")
+                )
+
+            self.logger.debug("QMP Disconnected.")
+
+    @upper_half
+    def _cleanup(self) -> None:
+        super()._cleanup()
+        assert not self._pending
+
+    @bottom_half
+    async def _on_message(self, msg: Message) -> None:
+        """
+        Add an incoming message to the appropriate queue/handler.
+
+        :raise ServerParseError: When Message indicates server parse failure.
+        """
+        # Incoming messages are not fully parsed/validated here;
+        # do only light peeking to know how to route the messages.
+
+        if 'event' in msg:
+            await self._event_dispatch(msg)
+            return
+
+        # Below, we assume everything left is an execute/exec-oob response.
+
+        exec_id = cast(Optional[str], msg.get('id'))
+
+        if exec_id in self._pending:
+            await self._pending[exec_id].put(msg)
+            return
+
+        # We have a message we can't route back to a caller.
+
+        is_error = 'error' in msg
+        has_id = 'id' in msg
+
+        if is_error and not has_id:
+            # This is very likely a server parsing error.
+            # It doesn't inherently belong to any pending execution.
+            # Instead of performing clever recovery, just terminate.
+            # See "NOTE" in qmp-spec.txt, section 2.4.2
+            raise ServerParseError(
+                ("Server sent an error response without an ID, "
+                 "but there are no ID-less executions pending. "
+                 "Assuming this is a server parser failure."),
+                msg
+            )
+
+        # qmp-spec.txt, section 2.4:
+        # 'Clients should drop all the responses
+        # that have an unknown "id" field.'
+        self.logger.log(
+            logging.ERROR if is_error else logging.WARNING,
+            "Unknown ID '%s', message dropped.",
+            exec_id,
+        )
+        self.logger.debug("Unroutable message: %s", str(msg))
+
+    @upper_half
+    @bottom_half
+    async def _do_recv(self) -> Message:
+        """
+        :raise OSError: When a stream error is encountered.
+        :raise EOFError: When the stream is at EOF.
+        :raise ProtocolError:
+            When the Message is not understood.
+            See also `Message._deserialize`.
+
+        :return: A single QMP `Message`.
+        """
+        msg_bytes = await self._readline()
+        msg = Message(msg_bytes, eager=True)
+        return msg
+
+    @upper_half
+    @bottom_half
+    def _do_send(self, msg: Message) -> None:
+        """
+        :raise ValueError: JSON serialization failure
+        :raise TypeError: JSON serialization failure
+        :raise OSError: When a stream error is encountered.
+        """
+        assert self._writer is not None
+        self._writer.write(bytes(msg))
+
+    @upper_half
+    def _get_exec_id(self) -> str:
+        exec_id = f"__qmp#{self._execute_id:05d}"
+        self._execute_id += 1
+        return exec_id
+
+    @upper_half
+    async def _issue(self, msg: Message) -> Union[None, str]:
+        """
+        Issue a QMP `Message` and do not wait for a reply.
+
+        :param msg: The QMP `Message` to send to the server.
+
+        :return: The ID of the `Message` sent.
+        """
+        msg_id: Optional[str] = None
+        if 'id' in msg:
+            assert isinstance(msg['id'], str)
+            msg_id = msg['id']
+
+        self._pending[msg_id] = asyncio.Queue(maxsize=1)
+        try:
+            await self._outgoing.put(msg)
+        except BaseException:
+            del self._pending[msg_id]
+            raise
+
+        return msg_id
+
+    @upper_half
+    async def _reply(self, msg_id: Union[str, None]) -> Message:
+        """
+        Await a reply to a previously issued QMP message.
+
+        :param msg_id: The ID of the previously issued message.
+
+        :return: The reply from the server.
+        :raise ExecInterruptedError:
+            When the reply could not be retrieved because the connection
+            was lost, or some other problem.
+        """
+        queue = self._pending[msg_id]
+
+        try:
+            result = await queue.get()
+            if isinstance(result, ExecInterruptedError):
+                raise result
+            return result
+        finally:
+            del self._pending[msg_id]
+
+    @upper_half
+    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
+        """
+        Send a QMP `Message` to the server and await a reply.
+
+        This method *assumes* you are sending some kind of an execute
+        statement that *will* receive a reply.
+
+        An execution ID will be assigned if assign_id is `True`. It can be
+        disabled, but this requires that an ID is manually assigned
+        instead. For manually assigned IDs, you must not use the string
+        '__qmp#' anywhere in the ID.
+
+        :param msg: The QMP `Message` to execute.
+        :param assign_id: If True, assign a new execution ID.
+
+        :return: Execution reply from the server.
+        :raise ExecInterruptedError:
+            When the reply could not be retrieved because the connection
+            was lost, or some other problem.
+        """
+        if assign_id:
+            msg['id'] = self._get_exec_id()
+        elif 'id' in msg:
+            assert isinstance(msg['id'], str)
+            assert '__qmp#' not in msg['id']
+
+        exec_id = await self._issue(msg)
+        return await self._reply(exec_id)
+
+    @upper_half
+    @require(Runstate.RUNNING)
+    async def _raw(
+            self,
+            msg: Union[Message, Mapping[str, object], bytes],
+            assign_id: bool = True,
+    ) -> Message:
+        """
+        Issue a raw `Message` to the QMP server and await a reply.
+
+        :param msg:
+            A Message to send to the server. It may be a `Message`, any
+            Mapping (including Dict), or raw bytes.
+        :param assign_id:
+            Assign an arbitrary execution ID to this message. If
+            `False`, the existing id must either be absent (and no other
+            such pending execution may omit an ID) or a string. If it is
+            a string, it must not start with '__qmp#' and no other such
+            pending execution may currently be using that ID.
+
+        :return: Execution reply from the server.
+
+        :raise ExecInterruptedError:
+            When the reply could not be retrieved because the connection
+            was lost, or some other problem.
+        :raise TypeError:
+            When assign_id is `False`, an ID is given, and it is not a string.
+        :raise ValueError:
+            When assign_id is `False`, but the ID is not usable;
+            Either because it starts with '__qmp#' or it is already in-use.
+        """
+        # 1. convert generic Mapping or bytes to a QMP Message
+        # 2. copy Message objects so that we assign an ID only to the copy.
+        msg = Message(msg)
+
+        exec_id = msg.get('id')
+        if not assign_id and 'id' in msg:
+            if not isinstance(exec_id, str):
+                raise TypeError(f"ID ('{exec_id}') must be a string.")
+            if exec_id.startswith('__qmp#'):
+                raise ValueError(
+                    f"ID ('{exec_id}') must not start with '__qmp#'."
+                )
+
+        if not assign_id and exec_id in self._pending:
+            raise ValueError(
+                f"ID '{exec_id}' is in-use and cannot be used."
+            )
+
+        return await self._execute(msg, assign_id=assign_id)
+
+    @upper_half
+    @require(Runstate.RUNNING)
+    async def execute_msg(self, msg: Message) -> object:
+        """
+        Execute a QMP command and return its value.
+
+        :param msg: The QMP `Message` to execute.
+
+        :return:
+            The command execution return value from the server. The type of
+            object returned depends on the command that was issued,
+            though most in QEMU return a `dict`.
+        :raise ValueError:
+            If the QMP `Message` does not have either the 'execute' or
+            'exec-oob' fields set.
+        :raise ExecuteError: When the server returns an error response.
+        :raise ExecInterruptedError: if the connection was terminated early.
+        """
+        if not ('execute' in msg or 'exec-oob' in msg):
+            raise ValueError("Requires 'execute' or 'exec-oob' message")
+
+        # Copy the Message so that the ID assigned by _execute() is
+        # local to this method; allowing the ID to be seen in raised
+        # Exceptions but without modifying the caller's held copy.
+        msg = Message(msg)
+        reply = await self._execute(msg)
+
+        if 'error' in reply:
+            try:
+                error_response = ErrorResponse(reply)
+            except (KeyError, TypeError) as err:
+                # Error response was malformed.
+                raise BadReplyError(
+                    "QMP error reply is malformed", reply, msg,
+                ) from err
+
+            raise ExecuteError(error_response, msg, reply)
+
+        if 'return' not in reply:
+            raise BadReplyError(
+                "QMP reply is missing a 'error' or 'return' member",
+                reply, msg,
+            )
+
+        return reply['return']
+
+    @classmethod
+    def make_execute_msg(cls, cmd: str,
+                         arguments: Optional[Mapping[str, object]] = None,
+                         oob: bool = False) -> Message:
+        """
+        Create an executable message to be sent by `execute_msg` later.
+
+        :param cmd: QMP command name.
+        :param arguments: Arguments (if any). Must be JSON-serializable.
+        :param oob: If `True`, execute "out of band".
+
+        :return: An executable QMP `Message`.
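+
+        For example (a sketch; the command name is merely illustrative)::
+
+          msg = QMPClient.make_execute_msg('query-status')
+          res = await qmp.execute_msg(msg)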
+        """
+        msg = Message({'exec-oob' if oob else 'execute': cmd})
+        if arguments is not None:
+            msg['arguments'] = arguments
+        return msg
+
+    @upper_half
+    async def execute(self, cmd: str,
+                      arguments: Optional[Mapping[str, object]] = None,
+                      oob: bool = False) -> object:
+        """
+        Execute a QMP command and return its value.
+
+        :param cmd: QMP command name.
+        :param arguments: Arguments (if any). Must be JSON-serializable.
+        :param oob: If `True`, execute "out of band".
+
+        :return:
+            The command execution return value from the server. The type of
+            object returned depends on the command that was issued,
+            though most in QEMU return a `dict`.
+        :raise ExecuteError: When the server returns an error response.
+        :raise ExecInterruptedError: if the connection was terminated early.
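+
+        For example (a sketch; ``qmp`` is assumed to be connected)::
+
+          status = await qmp.execute('query-status')
+          print(status['status'])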
+        """
+        msg = self.make_execute_msg(cmd, arguments, oob=oob)
+        return await self.execute_msg(msg)
+
+    @upper_half
+    @require(Runstate.RUNNING)
+    def send_fd_scm(self, fd: int) -> None:
+        """
+        Send a file descriptor to the remote via SCM_RIGHTS.
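+
+        A typical pairing (a sketch; the fd and its name are illustrative)::
+
+          qmp.send_fd_scm(fd)
+          await qmp.execute('getfd', {'fdname': 'shared-fd'})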
+        """
+        assert self._writer is not None
+        sock = self._writer.transport.get_extra_info('socket')
+
+        if sock.family != socket.AF_UNIX:
+            raise QMPError("Sending file descriptors requires a UNIX socket.")
+
+        if not hasattr(sock, 'sendmsg'):
+            # We need to void the warranty sticker.
+            # Access to sendmsg is scheduled for removal in Python 3.11.
+            # Find the real backing socket to use it anyway.
+            sock = sock._sock  # pylint: disable=protected-access
+
+        sock.sendmsg(
+            [b' '],
+            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
+        )
diff --git a/python/qemu/qmp/qmp_shell.py b/python/qemu/qmp/qmp_shell.py
new file mode 100644
index 0000000000..619ab42ced
--- /dev/null
+++ b/python/qemu/qmp/qmp_shell.py
@@ -0,0 +1,610 @@
+#
+# Copyright (C) 2009-2022 Red Hat Inc.
+#
+# Authors:
+#  Luiz Capitulino <lcapitulino@redhat.com>
+#  John Snow <jsnow@redhat.com>
+#
+# This work is licensed under the terms of the GNU LGPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+#
+
+"""
+Low-level QEMU shell on top of QMP.
+
+usage: qmp-shell [-h] [-H] [-N] [-v] [-p] [-l LOGFILE] qmp_server
+
+positional arguments:
+  qmp_server            < UNIX socket path | TCP address:port >
+
+optional arguments:
+  -h, --help            show this help message and exit
+  -H, --hmp             Use HMP interface
+  -N, --skip-negotiation
+                        Skip negotiate (for qemu-ga)
+  -v, --verbose         Verbose (echo commands sent and received)
+  -p, --pretty          Pretty-print JSON
+  -l LOGFILE, --logfile LOGFILE
+                        Save log of all QMP messages to PATH
+
+
+Start QEMU with:
+
+# qemu [...] -qmp unix:./qmp-sock,server
+
+Run the shell:
+
+$ qmp-shell ./qmp-sock
+
+Commands have the following format:
+
+   < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
+
+For example:
+
+(QEMU) device_add driver=e1000 id=net1
+{'return': {}}
+(QEMU)
+
+key=value pairs also support Python or JSON object literal subset notations,
+without spaces. Dictionaries/objects {} are supported as are arrays [].
+
+   example-command arg-name1={'key':'value','obj':{'prop':"value"}}
+
+Both JSON and Python formatting should work, including both styles of
+string literal quotes. Both paradigms of literal values should work,
+including null/true/false for JSON and None/True/False for Python.
+
+
+Transactions have the following multi-line format:
+
+   transaction(
+   action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
+   ...
+   action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
+   )
+
+One line transactions are also supported:
+
+   transaction( action-name1 ... )
+
+For example:
+
+    (QEMU) transaction(
+    TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
+    TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
+    TRANS> )
+    {"return": {}}
+    (QEMU)
+
+Use the -v and -p options to enable verbose and pretty-printed output: the
+shell will echo back the properly formatted, JSON-compliant QMP that is sent
+to QEMU, which is useful for debugging and documentation generation.
+"""
+
+import argparse
+import ast
+import json
+import logging
+import os
+import re
+import readline
+from subprocess import Popen
+import sys
+from typing import (
+    IO,
+    Iterator,
+    List,
+    NoReturn,
+    Optional,
+    Sequence,
+)
+
+from qemu.qmp import ConnectError, QMPError, SocketAddrT
+from qemu.qmp.legacy import (
+    QEMUMonitorProtocol,
+    QMPBadPortError,
+    QMPMessage,
+    QMPObject,
+)
+
+
+LOG = logging.getLogger(__name__)
+
+
+class QMPCompleter:
+    """
+    QMPCompleter provides a readline library tab-complete behavior.
+    """
+    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
+    # but pylint as of today does not know that List[str] is simply 'list'.
+    def __init__(self) -> None:
+        self._matches: List[str] = []
+
+    def append(self, value: str) -> None:
+        """Append a new valid completion to the list of possibilities."""
+        return self._matches.append(value)
+
+    def complete(self, text: str, state: int) -> Optional[str]:
+        """readline.set_completer() callback implementation."""
+        for cmd in self._matches:
+            if cmd.startswith(text):
+                if state == 0:
+                    return cmd
+                state -= 1
+        return None
+
+
+class QMPShellError(QMPError):
+    """
+    QMP Shell Base error class.
+    """
+
+
+class FuzzyJSON(ast.NodeTransformer):
+    """
+    This extension of ast.NodeTransformer rewrites the literal names
+    "true/false/null" in a Python AST into the proper "True/False/None"
+    constants that Python can evaluate.
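+
+    A minimal sketch of its use (mirroring `QMPShell._parse_value` below)::
+
+      tree = ast.parse("{'active': true, 'parent': null}", mode='eval')
+      value = ast.literal_eval(FuzzyJSON().visit(tree))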
+    """
+
+    @classmethod
+    def visit_Name(cls,  # pylint: disable=invalid-name
+                   node: ast.Name) -> ast.AST:
+        """
+        Transform Name nodes with certain values into Constant (keyword) nodes.
+        """
+        if node.id == 'true':
+            return ast.Constant(value=True)
+        if node.id == 'false':
+            return ast.Constant(value=False)
+        if node.id == 'null':
+            return ast.Constant(value=None)
+        return node
+
+
+class QMPShell(QEMUMonitorProtocol):
+    """
+    QMPShell provides a basic readline-based QMP shell.
+
+    :param address: Address of the QMP server.
+    :param pretty: Pretty-print QMP messages.
+    :param verbose: Echo outgoing QMP messages to console.
+    :param server: Bind to the given address and await an incoming
+                   connection (see `accept()`) instead of connecting out.
+    :param logfile: Optional path to a file for logging QMP traffic.
+    """
+    def __init__(self, address: SocketAddrT,
+                 pretty: bool = False,
+                 verbose: bool = False,
+                 server: bool = False,
+                 logfile: Optional[str] = None):
+        super().__init__(address, server=server)
+        self._greeting: Optional[QMPMessage] = None
+        self._completer = QMPCompleter()
+        self._transmode = False
+        self._actions: List[QMPMessage] = []
+        self._histfile = os.path.join(os.path.expanduser('~'),
+                                      '.qmp-shell_history')
+        self.pretty = pretty
+        self.verbose = verbose
+        self.logfile = None
+
+        if logfile is not None:
+            self.logfile = open(logfile, "w", encoding='utf-8')
+
+    def close(self) -> None:
+        # Hook into context manager of parent to save shell history.
+        self._save_history()
+        super().close()
+
+    def _fill_completion(self) -> None:
+        cmds = self.cmd('query-commands')
+        if 'error' in cmds:
+            return
+        for cmd in cmds['return']:
+            self._completer.append(cmd['name'])
+
+    def _completer_setup(self) -> None:
+        self._completer = QMPCompleter()
+        self._fill_completion()
+        readline.set_history_length(1024)
+        readline.set_completer(self._completer.complete)
+        readline.parse_and_bind("tab: complete")
+        # NB: default delimiters conflict with some command names
+        # (eg. query-), clearing everything as it doesn't seem to matter
+        readline.set_completer_delims('')
+        try:
+            readline.read_history_file(self._histfile)
+        except FileNotFoundError:
+            pass
+        except IOError as err:
+            msg = f"Failed to read history '{self._histfile}': {err!s}"
+            LOG.warning(msg)
+
+    def _save_history(self) -> None:
+        try:
+            readline.write_history_file(self._histfile)
+        except IOError as err:
+            msg = f"Failed to save history file '{self._histfile}': {err!s}"
+            LOG.warning(msg)
+
+    @classmethod
+    def _parse_value(cls, val: str) -> object:
+        try:
+            return int(val)
+        except ValueError:
+            pass
+
+        if val.lower() == 'true':
+            return True
+        if val.lower() == 'false':
+            return False
+        if val.startswith(('{', '[')):
+            # Try first as pure JSON:
+            try:
+                return json.loads(val)
+            except ValueError:
+                pass
+            # Try once again as FuzzyJSON:
+            try:
+                tree = ast.parse(val, mode='eval')
+                transformed = FuzzyJSON().visit(tree)
+                return ast.literal_eval(transformed)
+            except (SyntaxError, ValueError):
+                pass
+        return val
+
+    def _cli_expr(self,
+                  tokens: Sequence[str],
+                  parent: QMPObject) -> None:
+        for arg in tokens:
+            (key, sep, val) = arg.partition('=')
+            if sep != '=':
+                raise QMPShellError(
+                    f"Expected a key=value pair, got '{arg!s}'"
+                )
+
+            value = self._parse_value(val)
+            optpath = key.split('.')
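+            # Dotted keys nest dicts: e.g. "cache.direct=true" becomes
+            # {'cache': {'direct': True}} under the current parent.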
+            curpath = []
+            for path in optpath[:-1]:
+                curpath.append(path)
+                obj = parent.get(path, {})
+                if not isinstance(obj, dict):
+                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
+                    raise QMPShellError(msg.format('.'.join(curpath)))
+                parent[path] = obj
+                parent = obj
+            if optpath[-1] in parent:
+                if isinstance(parent[optpath[-1]], dict):
+                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
+                    raise QMPShellError(msg.format('.'.join(curpath)))
+                raise QMPShellError(f'Cannot set "{key}" multiple times')
+            parent[optpath[-1]] = value
+
+    def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
+        """
+        Build a QMP input object from a user provided command-line in the
+        following format:
+
+            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
+        """
+        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
+        cmdargs = re.findall(argument_regex, cmdline)
+        qmpcmd: QMPMessage
+
+        # Transactional CLI entry:
+        if cmdargs and cmdargs[0] == 'transaction(':
+            self._transmode = True
+            self._actions = []
+            cmdargs.pop(0)
+
+        # Transactional CLI exit:
+        if cmdargs and cmdargs[0] == ')' and self._transmode:
+            self._transmode = False
+            if len(cmdargs) > 1:
+                msg = 'Unexpected input after close of Transaction sub-shell'
+                raise QMPShellError(msg)
+            qmpcmd = {
+                'execute': 'transaction',
+                'arguments': {'actions': self._actions}
+            }
+            return qmpcmd
+
+        # No args, or no args remaining
+        if not cmdargs:
+            return None
+
+        if self._transmode:
+            # Parse and cache this Transactional Action
+            finalize = False
+            action = {'type': cmdargs[0], 'data': {}}
+            if cmdargs[-1] == ')':
+                cmdargs.pop(-1)
+                finalize = True
+            self._cli_expr(cmdargs[1:], action['data'])
+            self._actions.append(action)
+            return self._build_cmd(')') if finalize else None
+
+        # Standard command: parse and return it to be executed.
+        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
+        self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
+        return qmpcmd
+
+    def _print(self, qmp_message: object, fh: IO[str] = sys.stdout) -> None:
+        jsobj = json.dumps(qmp_message,
+                           indent=4 if self.pretty else None,
+                           sort_keys=self.pretty)
+        print(str(jsobj), file=fh)
+
+    def _execute_cmd(self, cmdline: str) -> bool:
+        try:
+            qmpcmd = self._build_cmd(cmdline)
+        except QMPShellError as err:
+            print(
+                f"Error while parsing command line: {err!s}\n"
+                "command format: <command-name> "
+                "[arg-name1=arg1] ... [arg-nameN=argN",
+                file=sys.stderr
+            )
+            return True
+        # For transaction mode, we may have just cached the action:
+        if qmpcmd is None:
+            return True
+        if self.verbose:
+            self._print(qmpcmd)
+        resp = self.cmd_obj(qmpcmd)
+        if resp is None:
+            print('Disconnected')
+            return False
+        self._print(resp)
+        if self.logfile is not None:
+            cmd = {**qmpcmd, **resp}
+            self._print(cmd, fh=self.logfile)
+        return True
+
+    def connect(self, negotiate: bool = True) -> None:
+        self._greeting = super().connect(negotiate)
+        self._completer_setup()
+
+    def show_banner(self,
+                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
+        """
+        Print to stdio a greeting, and the QEMU version if available.
+        """
+        print(msg)
+        if not self._greeting:
+            print('Connected')
+            return
+        version = self._greeting['QMP']['version']['qemu']
+        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
+
+    @property
+    def prompt(self) -> str:
+        """
+        Return the current shell prompt, including a trailing space.
+        """
+        if self._transmode:
+            return 'TRANS> '
+        return '(QEMU) '
+
+    def read_exec_command(self) -> bool:
+        """
+        Read and execute a command.
+
+        :return: True if execution was OK; False if disconnected.
+        """
+        try:
+            cmdline = input(self.prompt)
+        except EOFError:
+            print()
+            return False
+
+        if cmdline == '':
+            for event in self.get_events():
+                print(event)
+            return True
+
+        return self._execute_cmd(cmdline)
+
+    def repl(self) -> Iterator[None]:
+        """
+        Return an iterator that implements the REPL.
+        """
+        self.show_banner()
+        while self.read_exec_command():
+            yield
+        self.close()
+
+
+class HMPShell(QMPShell):
+    """
+    HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.
+
+    :param address: Address of the QMP server.
+    :param pretty: Pretty-print QMP messages.
+    :param verbose: Echo outgoing QMP messages to console.
+    :param server: Bind to the given address and await an incoming
+                   connection (see `accept()`) instead of connecting out.
+    :param logfile: Optional path to a file for logging QMP traffic.
+    """
+    def __init__(self, address: SocketAddrT,
+                 pretty: bool = False,
+                 verbose: bool = False,
+                 server: bool = False,
+                 logfile: Optional[str] = None):
+        super().__init__(address, pretty, verbose, server, logfile)
+        self._cpu_index = 0
+
+    def _cmd_completion(self) -> None:
+        for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
+            if cmd and cmd[0] != '[' and cmd[0] != '\t':
+                name = cmd.split()[0]  # drop help text
+                if name == 'info':
+                    continue
+                if name.find('|') != -1:
+                    # Command in the form 'foobar|f' or 'f|foobar', take the
+                    # full name
+                    opt = name.split('|')
+                    if len(opt[0]) == 1:
+                        name = opt[1]
+                    else:
+                        name = opt[0]
+                self._completer.append(name)
+                self._completer.append('help ' + name)  # help completion
+
+    def _info_completion(self) -> None:
+        for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
+            if cmd:
+                self._completer.append('info ' + cmd.split()[1])
+
+    def _other_completion(self) -> None:
+        # special cases
+        self._completer.append('help info')
+
+    def _fill_completion(self) -> None:
+        self._cmd_completion()
+        self._info_completion()
+        self._other_completion()
+
+    def _cmd_passthrough(self, cmdline: str,
+                         cpu_index: int = 0) -> QMPMessage:
+        return self.cmd_obj({
+            'execute': 'human-monitor-command',
+            'arguments': {
+                'command-line': cmdline,
+                'cpu-index': cpu_index
+            }
+        })
+
+    def _execute_cmd(self, cmdline: str) -> bool:
+        if cmdline.split()[0] == "cpu":
+            # trap the cpu command, it requires special setting
+            try:
+                idx = int(cmdline.split()[1])
+                if 'return' not in self._cmd_passthrough('info version', idx):
+                    print('bad CPU index')
+                    return True
+                self._cpu_index = idx
+            except ValueError:
+                print('cpu command takes an integer argument')
+                return True
+        resp = self._cmd_passthrough(cmdline, self._cpu_index)
+        if resp is None:
+            print('Disconnected')
+            return False
+        assert 'return' in resp or 'error' in resp
+        if 'return' in resp:
+            # Success
+            if len(resp['return']) > 0:
+                print(resp['return'], end=' ')
+        else:
+            # Error
+            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
+        return True
+
+    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
+        QMPShell.show_banner(self, msg)
+
+
+def die(msg: str) -> NoReturn:
+    """Write an error to stderr, then exit with a return code of 1."""
+    sys.stderr.write('ERROR: %s\n' % msg)
+    sys.exit(1)
+
+
+def main() -> None:
+    """
+    qmp-shell entry point: parse command line arguments and start the REPL.
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-H', '--hmp', action='store_true',
+                        help='Use HMP interface')
+    parser.add_argument('-N', '--skip-negotiation', action='store_true',
+                        help='Skip negotiate (for qemu-ga)')
+    parser.add_argument('-v', '--verbose', action='store_true',
+                        help='Verbose (echo commands sent and received)')
+    parser.add_argument('-p', '--pretty', action='store_true',
+                        help='Pretty-print JSON')
+    parser.add_argument('-l', '--logfile',
+                        help='Save log of all QMP messages to PATH')
+
+    default_server = os.environ.get('QMP_SOCKET')
+    parser.add_argument('qmp_server', action='store', nargs='?',
+                        default=default_server,
+                        help='< UNIX socket path | TCP address:port >')
+
+    args = parser.parse_args()
+    if args.qmp_server is None:
+        parser.error("QMP socket or TCP address must be specified")
+
+    shell_class = HMPShell if args.hmp else QMPShell
+
+    try:
+        address = shell_class.parse_address(args.qmp_server)
+    except QMPBadPortError:
+        parser.error(f"Bad port number: {args.qmp_server}")
+        return  # pycharm doesn't know error() is noreturn
+
+    with shell_class(address, args.pretty, args.verbose,
+                     logfile=args.logfile) as qemu:
+        try:
+            qemu.connect(negotiate=not args.skip_negotiation)
+        except ConnectError as err:
+            if isinstance(err.exc, OSError):
+                die(f"Couldn't connect to {args.qmp_server}: {err!s}")
+            die(str(err))
+
+        for _ in qemu.repl():
+            pass
+
+
+def main_wrap() -> None:
+    """
+    qmp-shell-wrap entry point: parse command line arguments and
+    start the REPL.
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-H', '--hmp', action='store_true',
+                        help='Use HMP interface')
+    parser.add_argument('-v', '--verbose', action='store_true',
+                        help='Verbose (echo commands sent and received)')
+    parser.add_argument('-p', '--pretty', action='store_true',
+                        help='Pretty-print JSON')
+    parser.add_argument('-l', '--logfile',
+                        help='Save log of all QMP messages to PATH')
+
+    parser.add_argument('command', nargs=argparse.REMAINDER,
+                        help='QEMU command line to invoke')
+
+    args = parser.parse_args()
+
+    cmd = args.command
+    if len(cmd) != 0 and cmd[0] == '--':
+        cmd = cmd[1:]
+    if len(cmd) == 0:
+        cmd = ["qemu-system-x86_64"]
+
+    sockpath = "qmp-shell-wrap-%d" % os.getpid()
+    cmd += ["-qmp", "unix:%s" % sockpath]
+
+    shell_class = HMPShell if args.hmp else QMPShell
+
+    try:
+        address = shell_class.parse_address(sockpath)
+    except QMPBadPortError:
+        parser.error(f"Bad port number: {sockpath}")
+        return  # pycharm doesn't know error() is noreturn
+
+    try:
+        with shell_class(address, args.pretty, args.verbose,
+                         True, args.logfile) as qemu:
+            with Popen(cmd):
+
+                try:
+                    qemu.accept()
+                except ConnectError as err:
+                    if isinstance(err.exc, OSError):
+                        die(f"Couldn't connect to {args.qmp_server}: {err!s}")
+                    die(str(err))
+
+                for _ in qemu.repl():
+                    pass
+    finally:
+        os.unlink(sockpath)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/python/qemu/qmp/util.py b/python/qemu/qmp/util.py
new file mode 100644
index 0000000000..eaa5fc7d5f
--- /dev/null
+++ b/python/qemu/qmp/util.py
@@ -0,0 +1,217 @@
+"""
+Miscellaneous Utilities
+
+This module provides asyncio utilities and compatibility wrappers for
+Python 3.6 to provide some features that otherwise become available in
+Python 3.7+.
+
+Various logging and debugging utilities are also provided, such as
+`exception_summary()` and `pretty_traceback()`, used primarily for
+adding information into the logging stream.
+"""
+
+import asyncio
+import sys
+import traceback
+from typing import (
+    Any,
+    Coroutine,
+    Optional,
+    TypeVar,
+    cast,
+)
+
+
+T = TypeVar('T')
+
+
+# --------------------------
+# Section: Utility Functions
+# --------------------------
+
+
+async def flush(writer: asyncio.StreamWriter) -> None:
+    """
+    Utility function to ensure a StreamWriter is *fully* drained.
+
+    `asyncio.StreamWriter.drain` only promises we will return to below
+    the "high-water mark". This function ensures we flush the entire
+    buffer -- by setting the high water mark to 0 and then calling
+    drain. The flow control limits are restored after the call is
+    completed.
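+
+    A usage sketch (the writer and payload are illustrative)::
+
+      writer.write(b'{"execute": "qmp_capabilities"}\n')
+      await flush(writer)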
+    """
+    transport = cast(asyncio.WriteTransport, writer.transport)
+
+    # https://github.com/python/typeshed/issues/5779
+    low, high = transport.get_write_buffer_limits()  # type: ignore
+    transport.set_write_buffer_limits(0, 0)
+    try:
+        await writer.drain()
+    finally:
+        transport.set_write_buffer_limits(high, low)
+
+
+def upper_half(func: T) -> T:
+    """
+    Do-nothing decorator that annotates a method as an "upper-half" method.
+
+    These methods must not call bottom-half functions directly, but can
+    schedule them to run.
+    """
+    return func
+
+
+def bottom_half(func: T) -> T:
+    """
+    Do-nothing decorator that annotates a method as a "bottom-half" method.
+
+    These methods must take great care to handle their own exceptions whenever
+    possible. If they go unhandled, they will cause termination of the loop.
+
+    These methods do not, in general, have the ability to directly
+    report information to a caller’s context and will usually be
+    collected as a Task result instead.
+
+    They must not call upper-half functions directly.
+    """
+    return func
+
+
+# -------------------------------
+# Section: Compatibility Wrappers
+# -------------------------------
+
+
+def create_task(coro: Coroutine[Any, Any, T],
+                loop: Optional[asyncio.AbstractEventLoop] = None
+                ) -> 'asyncio.Future[T]':
+    """
+    Python 3.6-compatible `asyncio.create_task` wrapper.
+
+    :param coro: The coroutine to execute in a task.
+    :param loop: Optionally, the loop to create the task in.
+
+    :return: An `asyncio.Future` object.
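+
+    A usage sketch (``some_coroutine`` is illustrative)::
+
+      task = create_task(some_coroutine())
+      result = await task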
+    """
+    if sys.version_info >= (3, 7):
+        if loop is not None:
+            return loop.create_task(coro)
+        return asyncio.create_task(coro)  # pylint: disable=no-member
+
+    # Python 3.6:
+    return asyncio.ensure_future(coro, loop=loop)
+
+
+def is_closing(writer: asyncio.StreamWriter) -> bool:
+    """
+    Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
+
+    :param writer: The `asyncio.StreamWriter` object.
+    :return: `True` if the writer is closing, or closed.
+    """
+    if sys.version_info >= (3, 7):
+        return writer.is_closing()
+
+    # Python 3.6:
+    transport = writer.transport
+    assert isinstance(transport, asyncio.WriteTransport)
+    return transport.is_closing()
+
+
+async def wait_closed(writer: asyncio.StreamWriter) -> None:
+    """
+    Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
+
+    :param writer: The `asyncio.StreamWriter` to wait on.
+    """
+    if sys.version_info >= (3, 7):
+        await writer.wait_closed()
+        return
+
+    # Python 3.6
+    transport = writer.transport
+    assert isinstance(transport, asyncio.WriteTransport)
+
+    while not transport.is_closing():
+        await asyncio.sleep(0)
+
+    # This is an ugly workaround, but it's the best I can come up with.
+    sock = transport.get_extra_info('socket')
+
+    if sock is None:
+        # Our transport doesn't have a socket? ...
+        # Nothing we can reasonably do.
+        return
+
+    while sock.fileno() != -1:
+        await asyncio.sleep(0)
+
+
+def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
+    """
+    Python 3.6-compatible `asyncio.run` wrapper.
+
+    :param coro: A coroutine to execute now.
+    :return: The return value from the coroutine.
+    """
+    if sys.version_info >= (3, 7):
+        return asyncio.run(coro, debug=debug)
+
+    # Python 3.6
+    loop = asyncio.get_event_loop()
+    loop.set_debug(debug)
+    ret = loop.run_until_complete(coro)
+    loop.close()
+
+    return ret
+
+
+# ----------------------------
+# Section: Logging & Debugging
+# ----------------------------
+
+
+def exception_summary(exc: BaseException) -> str:
+    """
+    Return a summary string of an arbitrary exception.
+
+    It will be of the form "ExceptionType: Error Message", if the error
+    string is non-empty, and just "ExceptionType" otherwise.
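+
+    For example (illustrative)::
+
+      exception_summary(ValueError("bad value"))  # -> 'ValueError: bad value'
+      exception_summary(KeyboardInterrupt())      # -> 'KeyboardInterrupt'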
+    """
+    name = type(exc).__qualname__
+    smod = type(exc).__module__
+    if smod not in ("__main__", "builtins"):
+        name = smod + '.' + name
+
+    error = str(exc)
+    if error:
+        return f"{name}: {error}"
+    return name
+
+
+def pretty_traceback(prefix: str = "  | ") -> str:
+    """
+    Formats the current traceback, indented to provide visual distinction.
+
+    This is useful for printing a traceback within a traceback for
+    debugging purposes when encapsulating errors to deliver them up the
+    stack; when those errors are printed, this helps provide a nice
+    visual grouping to quickly identify the parts of the error that
+    belong to the inner exception.
+
+    :param prefix: The prefix to append to each line of the traceback.
+    :return: A string, formatted something like the following::
+
+      | Traceback (most recent call last):
+      |   File "foobar.py", line 42, in arbitrary_example
+      |     foo.baz()
+      | ArbitraryError: [Errno 42] Something bad happened!
+    """
+    output = "".join(traceback.format_exception(*sys.exc_info()))
+
+    exc_lines = []
+    for line in output.split('\n'):
+        exc_lines.append(prefix + line)
+
+    # The last line is always empty, omit it
+    return "\n".join(exc_lines[:-1])