summary refs log tree commit diff stats
path: root/python/qemu/aqmp/qmp_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu/aqmp/qmp_client.py')
-rw-r--r--python/qemu/aqmp/qmp_client.py655
1 files changed, 0 insertions, 655 deletions
diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
deleted file mode 100644
index 90a8737f03..0000000000
--- a/python/qemu/aqmp/qmp_client.py
+++ /dev/null
@@ -1,655 +0,0 @@
-"""
-QMP Protocol Implementation
-
-This module provides the `QMPClient` class, which can be used to connect
-and send commands to a QMP server such as QEMU. The QMP class can be
-used to either connect to a listening server, or used to listen and
-accept an incoming connection from that server.
-"""
-
-import asyncio
-import logging
-import socket
-import struct
-from typing import (
-    Dict,
-    List,
-    Mapping,
-    Optional,
-    Union,
-    cast,
-)
-
-from .error import ProtocolError, QMPError
-from .events import Events
-from .message import Message
-from .models import ErrorResponse, Greeting
-from .protocol import AsyncProtocol, Runstate, require
-from .util import (
-    bottom_half,
-    exception_summary,
-    pretty_traceback,
-    upper_half,
-)
-
-
-class _WrappedProtocolError(ProtocolError):
-    """
-    Abstract exception class for Protocol errors that wrap an Exception.
-
-    :param error_message: Human-readable string describing the error.
-    :param exc: The root-cause exception.
-    """
-    def __init__(self, error_message: str, exc: Exception):
-        super().__init__(error_message)
-        self.exc = exc
-
-    def __str__(self) -> str:
-        return f"{self.error_message}: {self.exc!s}"
-
-
-class GreetingError(_WrappedProtocolError):
-    """
-    An exception occurred during the Greeting phase.
-
-    :param error_message: Human-readable string describing the error.
-    :param exc: The root-cause exception.
-    """
-
-
-class NegotiationError(_WrappedProtocolError):
-    """
-    An exception occurred during the Negotiation phase.
-
-    :param error_message: Human-readable string describing the error.
-    :param exc: The root-cause exception.
-    """
-
-
-class ExecuteError(QMPError):
-    """
-    Exception raised by `QMPClient.execute()` on RPC failure.
-
-    :param error_response: The RPC error response object.
-    :param sent: The sent RPC message that caused the failure.
-    :param received: The raw RPC error reply received.
-    """
-    def __init__(self, error_response: ErrorResponse,
-                 sent: Message, received: Message):
-        super().__init__(error_response.error.desc)
-        #: The sent `Message` that caused the failure
-        self.sent: Message = sent
-        #: The received `Message` that indicated failure
-        self.received: Message = received
-        #: The parsed error response
-        self.error: ErrorResponse = error_response
-        #: The QMP error class
-        self.error_class: str = error_response.error.class_
-
-
-class ExecInterruptedError(QMPError):
-    """
-    Exception raised by `execute()` (et al) when an RPC is interrupted.
-
-    This error is raised when an `execute()` statement could not be
-    completed.  This can occur because the connection itself was
-    terminated before a reply was received.
-
-    The true cause of the interruption will be available via `disconnect()`.
-    """
-
-
-class _MsgProtocolError(ProtocolError):
-    """
-    Abstract error class for protocol errors that have a `Message` object.
-
-    This Exception class is used for protocol errors where the `Message`
-    was mechanically understood, but was found to be inappropriate or
-    malformed.
-
-    :param error_message: Human-readable string describing the error.
-    :param msg: The QMP `Message` that caused the error.
-    """
-    def __init__(self, error_message: str, msg: Message):
-        super().__init__(error_message)
-        #: The received `Message` that caused the error.
-        self.msg: Message = msg
-
-    def __str__(self) -> str:
-        return "\n".join([
-            super().__str__(),
-            f"  Message was: {str(self.msg)}\n",
-        ])
-
-
-class ServerParseError(_MsgProtocolError):
-    """
-    The Server sent a `Message` indicating parsing failure.
-
-    i.e. A reply has arrived from the server, but it is missing the "ID"
-    field, indicating a parsing error.
-
-    :param error_message: Human-readable string describing the error.
-    :param msg: The QMP `Message` that caused the error.
-    """
-
-
-class BadReplyError(_MsgProtocolError):
-    """
-    An execution reply was successfully routed, but not understood.
-
-    If a QMP message is received with an 'id' field to allow it to be
-    routed, but is otherwise malformed, this exception will be raised.
-
-    A reply message is malformed if it is missing either the 'return' or
-    'error' keys, or if the 'error' value has missing keys or members of
-    the wrong type.
-
-    :param error_message: Human-readable string describing the error.
-    :param msg: The malformed reply that was received.
-    :param sent: The message that was sent that prompted the error.
-    """
-    def __init__(self, error_message: str, msg: Message, sent: Message):
-        super().__init__(error_message, msg)
-        #: The sent `Message` that caused the failure
-        self.sent = sent
-
-
-class QMPClient(AsyncProtocol[Message], Events):
-    """
-    Implements a QMP client connection.
-
-    QMP can be used to establish a connection as either the transport
-    client or server, though this class always acts as the QMP client.
-
-    :param name: Optional nickname for the connection, used for logging.
-
-    Basic script-style usage looks like this::
-
-      qmp = QMPClient('my_virtual_machine_name')
-      await qmp.connect(('127.0.0.1', 1234))
-      ...
-      res = await qmp.execute('block-query')
-      ...
-      await qmp.disconnect()
-
-    Basic async client-style usage looks like this::
-
-      class Client:
-          def __init__(self, name: str):
-              self.qmp = QMPClient(name)
-
-          async def watch_events(self):
-              try:
-                  async for event in self.qmp.events:
-                      print(f"Event: {event['event']}")
-              except asyncio.CancelledError:
-                  return
-
-          async def run(self, address='/tmp/qemu.socket'):
-              await self.qmp.connect(address)
-              asyncio.create_task(self.watch_events())
-              await self.qmp.runstate_changed.wait()
-              await self.disconnect()
-
-    See `aqmp.events` for more detail on event handling patterns.
-    """
-    #: Logger object used for debugging messages.
-    logger = logging.getLogger(__name__)
-
-    # Read buffer limit; large enough to accept query-qmp-schema
-    _limit = (256 * 1024)
-
-    # Type alias for pending execute() result items
-    _PendingT = Union[Message, ExecInterruptedError]
-
-    def __init__(self, name: Optional[str] = None) -> None:
-        super().__init__(name)
-        Events.__init__(self)
-
-        #: Whether or not to await a greeting after establishing a connection.
-        self.await_greeting: bool = True
-
-        #: Whether or not to perform capabilities negotiation upon connection.
-        #: Implies `await_greeting`.
-        self.negotiate: bool = True
-
-        # Cached Greeting, if one was awaited.
-        self._greeting: Optional[Greeting] = None
-
-        # Command ID counter
-        self._execute_id = 0
-
-        # Incoming RPC reply messages.
-        self._pending: Dict[
-            Union[str, None],
-            'asyncio.Queue[QMPClient._PendingT]'
-        ] = {}
-
-    @property
-    def greeting(self) -> Optional[Greeting]:
-        """The `Greeting` from the QMP server, if any."""
-        return self._greeting
-
-    @upper_half
-    async def _establish_session(self) -> None:
-        """
-        Initiate the QMP session.
-
-        Wait for the QMP greeting and perform capabilities negotiation.
-
-        :raise GreetingError: When the greeting is not understood.
-        :raise NegotiationError: If the negotiation fails.
-        :raise EOFError: When the server unexpectedly hangs up.
-        :raise OSError: For underlying stream errors.
-        """
-        self._greeting = None
-        self._pending = {}
-
-        if self.await_greeting or self.negotiate:
-            self._greeting = await self._get_greeting()
-
-        if self.negotiate:
-            await self._negotiate()
-
-        # This will start the reader/writers:
-        await super()._establish_session()
-
-    @upper_half
-    async def _get_greeting(self) -> Greeting:
-        """
-        :raise GreetingError: When the greeting is not understood.
-        :raise EOFError: When the server unexpectedly hangs up.
-        :raise OSError: For underlying stream errors.
-
-        :return: the Greeting object given by the server.
-        """
-        self.logger.debug("Awaiting greeting ...")
-
-        try:
-            msg = await self._recv()
-            return Greeting(msg)
-        except (ProtocolError, KeyError, TypeError) as err:
-            emsg = "Did not understand Greeting"
-            self.logger.error("%s: %s", emsg, exception_summary(err))
-            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
-            raise GreetingError(emsg, err) from err
-        except BaseException as err:
-            # EOFError, OSError, or something unexpected.
-            emsg = "Failed to receive Greeting"
-            self.logger.error("%s: %s", emsg, exception_summary(err))
-            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
-            raise
-
-    @upper_half
-    async def _negotiate(self) -> None:
-        """
-        Perform QMP capabilities negotiation.
-
-        :raise NegotiationError: When negotiation fails.
-        :raise EOFError: When the server unexpectedly hangs up.
-        :raise OSError: For underlying stream errors.
-        """
-        self.logger.debug("Negotiating capabilities ...")
-
-        arguments: Dict[str, List[str]] = {}
-        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
-            arguments.setdefault('enable', []).append('oob')
-        msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
-
-        # It's not safe to use execute() here, because the reader/writers
-        # aren't running. AsyncProtocol *requires* that a new session
-        # does not fail after the reader/writers are running!
-        try:
-            await self._send(msg)
-            reply = await self._recv()
-            assert 'return' in reply
-            assert 'error' not in reply
-        except (ProtocolError, AssertionError) as err:
-            emsg = "Negotiation failed"
-            self.logger.error("%s: %s", emsg, exception_summary(err))
-            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
-            raise NegotiationError(emsg, err) from err
-        except BaseException as err:
-            # EOFError, OSError, or something unexpected.
-            emsg = "Negotiation failed"
-            self.logger.error("%s: %s", emsg, exception_summary(err))
-            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
-            raise
-
-    @bottom_half
-    async def _bh_disconnect(self) -> None:
-        try:
-            await super()._bh_disconnect()
-        finally:
-            if self._pending:
-                self.logger.debug("Cancelling pending executions")
-            keys = self._pending.keys()
-            for key in keys:
-                self.logger.debug("Cancelling execution '%s'", key)
-                self._pending[key].put_nowait(
-                    ExecInterruptedError("Disconnected")
-                )
-
-            self.logger.debug("QMP Disconnected.")
-
-    @upper_half
-    def _cleanup(self) -> None:
-        super()._cleanup()
-        assert not self._pending
-
-    @bottom_half
-    async def _on_message(self, msg: Message) -> None:
-        """
-        Add an incoming message to the appropriate queue/handler.
-
-        :raise ServerParseError: When Message indicates server parse failure.
-        """
-        # Incoming messages are not fully parsed/validated here;
-        # do only light peeking to know how to route the messages.
-
-        if 'event' in msg:
-            await self._event_dispatch(msg)
-            return
-
-        # Below, we assume everything left is an execute/exec-oob response.
-
-        exec_id = cast(Optional[str], msg.get('id'))
-
-        if exec_id in self._pending:
-            await self._pending[exec_id].put(msg)
-            return
-
-        # We have a message we can't route back to a caller.
-
-        is_error = 'error' in msg
-        has_id = 'id' in msg
-
-        if is_error and not has_id:
-            # This is very likely a server parsing error.
-            # It doesn't inherently belong to any pending execution.
-            # Instead of performing clever recovery, just terminate.
-            # See "NOTE" in qmp-spec.txt, section 2.4.2
-            raise ServerParseError(
-                ("Server sent an error response without an ID, "
-                 "but there are no ID-less executions pending. "
-                 "Assuming this is a server parser failure."),
-                msg
-            )
-
-        # qmp-spec.txt, section 2.4:
-        # 'Clients should drop all the responses
-        # that have an unknown "id" field.'
-        self.logger.log(
-            logging.ERROR if is_error else logging.WARNING,
-            "Unknown ID '%s', message dropped.",
-            exec_id,
-        )
-        self.logger.debug("Unroutable message: %s", str(msg))
-
-    @upper_half
-    @bottom_half
-    async def _do_recv(self) -> Message:
-        """
-        :raise OSError: When a stream error is encountered.
-        :raise EOFError: When the stream is at EOF.
-        :raise ProtocolError:
-            When the Message is not understood.
-            See also `Message._deserialize`.
-
-        :return: A single QMP `Message`.
-        """
-        msg_bytes = await self._readline()
-        msg = Message(msg_bytes, eager=True)
-        return msg
-
-    @upper_half
-    @bottom_half
-    def _do_send(self, msg: Message) -> None:
-        """
-        :raise ValueError: JSON serialization failure
-        :raise TypeError: JSON serialization failure
-        :raise OSError: When a stream error is encountered.
-        """
-        assert self._writer is not None
-        self._writer.write(bytes(msg))
-
-    @upper_half
-    def _get_exec_id(self) -> str:
-        exec_id = f"__aqmp#{self._execute_id:05d}"
-        self._execute_id += 1
-        return exec_id
-
-    @upper_half
-    async def _issue(self, msg: Message) -> Union[None, str]:
-        """
-        Issue a QMP `Message` and do not wait for a reply.
-
-        :param msg: The QMP `Message` to send to the server.
-
-        :return: The ID of the `Message` sent.
-        """
-        msg_id: Optional[str] = None
-        if 'id' in msg:
-            assert isinstance(msg['id'], str)
-            msg_id = msg['id']
-
-        self._pending[msg_id] = asyncio.Queue(maxsize=1)
-        try:
-            await self._outgoing.put(msg)
-        except:
-            del self._pending[msg_id]
-            raise
-
-        return msg_id
-
-    @upper_half
-    async def _reply(self, msg_id: Union[str, None]) -> Message:
-        """
-        Await a reply to a previously issued QMP message.
-
-        :param msg_id: The ID of the previously issued message.
-
-        :return: The reply from the server.
-        :raise ExecInterruptedError:
-            When the reply could not be retrieved because the connection
-            was lost, or some other problem.
-        """
-        queue = self._pending[msg_id]
-
-        try:
-            result = await queue.get()
-            if isinstance(result, ExecInterruptedError):
-                raise result
-            return result
-        finally:
-            del self._pending[msg_id]
-
-    @upper_half
-    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
-        """
-        Send a QMP `Message` to the server and await a reply.
-
-        This method *assumes* you are sending some kind of an execute
-        statement that *will* receive a reply.
-
-        An execution ID will be assigned if assign_id is `True`. It can be
-        disabled, but this requires that an ID is manually assigned
-        instead. For manually assigned IDs, you must not use the string
-        '__aqmp#' anywhere in the ID.
-
-        :param msg: The QMP `Message` to execute.
-        :param assign_id: If True, assign a new execution ID.
-
-        :return: Execution reply from the server.
-        :raise ExecInterruptedError:
-            When the reply could not be retrieved because the connection
-            was lost, or some other problem.
-        """
-        if assign_id:
-            msg['id'] = self._get_exec_id()
-        elif 'id' in msg:
-            assert isinstance(msg['id'], str)
-            assert '__aqmp#' not in msg['id']
-
-        exec_id = await self._issue(msg)
-        return await self._reply(exec_id)
-
-    @upper_half
-    @require(Runstate.RUNNING)
-    async def _raw(
-            self,
-            msg: Union[Message, Mapping[str, object], bytes],
-            assign_id: bool = True,
-    ) -> Message:
-        """
-        Issue a raw `Message` to the QMP server and await a reply.
-
-        :param msg:
-            A Message to send to the server. It may be a `Message`, any
-            Mapping (including Dict), or raw bytes.
-        :param assign_id:
-            Assign an arbitrary execution ID to this message. If
-            `False`, the existing id must either be absent (and no other
-            such pending execution may omit an ID) or a string. If it is
-            a string, it must not start with '__aqmp#' and no other such
-            pending execution may currently be using that ID.
-
-        :return: Execution reply from the server.
-
-        :raise ExecInterruptedError:
-            When the reply could not be retrieved because the connection
-            was lost, or some other problem.
-        :raise TypeError:
-            When assign_id is `False`, an ID is given, and it is not a string.
-        :raise ValueError:
-            When assign_id is `False`, but the ID is not usable;
-            Either because it starts with '__aqmp#' or it is already in-use.
-        """
-        # 1. convert generic Mapping or bytes to a QMP Message
-        # 2. copy Message objects so that we assign an ID only to the copy.
-        msg = Message(msg)
-
-        exec_id = msg.get('id')
-        if not assign_id and 'id' in msg:
-            if not isinstance(exec_id, str):
-                raise TypeError(f"ID ('{exec_id}') must be a string.")
-            if exec_id.startswith('__aqmp#'):
-                raise ValueError(
-                    f"ID ('{exec_id}') must not start with '__aqmp#'."
-                )
-
-        if not assign_id and exec_id in self._pending:
-            raise ValueError(
-                f"ID '{exec_id}' is in-use and cannot be used."
-            )
-
-        return await self._execute(msg, assign_id=assign_id)
-
-    @upper_half
-    @require(Runstate.RUNNING)
-    async def execute_msg(self, msg: Message) -> object:
-        """
-        Execute a QMP command and return its value.
-
-        :param msg: The QMP `Message` to execute.
-
-        :return:
-            The command execution return value from the server. The type of
-            object returned depends on the command that was issued,
-            though most in QEMU return a `dict`.
-        :raise ValueError:
-            If the QMP `Message` does not have either the 'execute' or
-            'exec-oob' fields set.
-        :raise ExecuteError: When the server returns an error response.
-        :raise ExecInterruptedError: if the connection was terminated early.
-        """
-        if not ('execute' in msg or 'exec-oob' in msg):
-            raise ValueError("Requires 'execute' or 'exec-oob' message")
-
-        # Copy the Message so that the ID assigned by _execute() is
-        # local to this method; allowing the ID to be seen in raised
-        # Exceptions but without modifying the caller's held copy.
-        msg = Message(msg)
-        reply = await self._execute(msg)
-
-        if 'error' in reply:
-            try:
-                error_response = ErrorResponse(reply)
-            except (KeyError, TypeError) as err:
-                # Error response was malformed.
-                raise BadReplyError(
-                    "QMP error reply is malformed", reply, msg,
-                ) from err
-
-            raise ExecuteError(error_response, msg, reply)
-
-        if 'return' not in reply:
-            raise BadReplyError(
-                "QMP reply is missing a 'error' or 'return' member",
-                reply, msg,
-            )
-
-        return reply['return']
-
-    @classmethod
-    def make_execute_msg(cls, cmd: str,
-                         arguments: Optional[Mapping[str, object]] = None,
-                         oob: bool = False) -> Message:
-        """
-        Create an executable message to be sent by `execute_msg` later.
-
-        :param cmd: QMP command name.
-        :param arguments: Arguments (if any). Must be JSON-serializable.
-        :param oob: If `True`, execute "out of band".
-
-        :return: An executable QMP `Message`.
-        """
-        msg = Message({'exec-oob' if oob else 'execute': cmd})
-        if arguments is not None:
-            msg['arguments'] = arguments
-        return msg
-
-    @upper_half
-    async def execute(self, cmd: str,
-                      arguments: Optional[Mapping[str, object]] = None,
-                      oob: bool = False) -> object:
-        """
-        Execute a QMP command and return its value.
-
-        :param cmd: QMP command name.
-        :param arguments: Arguments (if any). Must be JSON-serializable.
-        :param oob: If `True`, execute "out of band".
-
-        :return:
-            The command execution return value from the server. The type of
-            object returned depends on the command that was issued,
-            though most in QEMU return a `dict`.
-        :raise ExecuteError: When the server returns an error response.
-        :raise ExecInterruptedError: if the connection was terminated early.
-        """
-        msg = self.make_execute_msg(cmd, arguments, oob=oob)
-        return await self.execute_msg(msg)
-
-    @upper_half
-    @require(Runstate.RUNNING)
-    def send_fd_scm(self, fd: int) -> None:
-        """
-        Send a file descriptor to the remote via SCM_RIGHTS.
-        """
-        assert self._writer is not None
-        sock = self._writer.transport.get_extra_info('socket')
-
-        if sock.family != socket.AF_UNIX:
-            raise QMPError("Sending file descriptors requires a UNIX socket.")
-
-        if not hasattr(sock, 'sendmsg'):
-            # We need to void the warranty sticker.
-            # Access to sendmsg is scheduled for removal in Python 3.11.
-            # Find the real backing socket to use it anyway.
-            sock = sock._sock  # pylint: disable=protected-access
-
-        sock.sendmsg(
-            [b' '],
-            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
-        )