summary refs log tree commit diff stats
path: root/python/qemu/aqmp/legacy.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu/aqmp/legacy.py')
-rw-r--r--python/qemu/aqmp/legacy.py177
1 files changed, 0 insertions, 177 deletions
diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py
deleted file mode 100644
index 46026e9fdc..0000000000
--- a/python/qemu/aqmp/legacy.py
+++ /dev/null
@@ -1,177 +0,0 @@
-"""
-Sync QMP Wrapper
-
-This class pretends to be qemu.qmp.QEMUMonitorProtocol.
-"""
-
-import asyncio
-from typing import (
-    Any,
-    Awaitable,
-    Dict,
-    List,
-    Optional,
-    TypeVar,
-    Union,
-)
-
-import qemu.qmp
-
-from .error import QMPError
-from .protocol import Runstate, SocketAddrT
-from .qmp_client import QMPClient
-
-
-# (Temporarily) Re-export QMPBadPortError
-QMPBadPortError = qemu.qmp.QMPBadPortError
-
-#: QMPMessage is an entire QMP message of any kind.
-QMPMessage = Dict[str, Any]
-
-#: QMPReturnValue is the 'return' value of a command.
-QMPReturnValue = object
-
-#: QMPObject is any object in a QMP message.
-QMPObject = Dict[str, object]
-
-# QMPMessage can be outgoing commands or incoming events/returns.
-# QMPReturnValue is usually a dict/json object, but due to QAPI's
-# 'returns-whitelist', it can actually be anything.
-#
-# {'return': {}} is a QMPMessage,
-# {} is the QMPReturnValue.
-
-
-# pylint: disable=missing-docstring
-
-
-class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
-    def __init__(self, address: SocketAddrT,
-                 server: bool = False,
-                 nickname: Optional[str] = None):
-
-        # pylint: disable=super-init-not-called
-        self._aqmp = QMPClient(nickname)
-        self._aloop = asyncio.get_event_loop()
-        self._address = address
-        self._timeout: Optional[float] = None
-
-        if server:
-            self._sync(self._aqmp.start_server(self._address))
-
-    _T = TypeVar('_T')
-
-    def _sync(
-            self, future: Awaitable[_T], timeout: Optional[float] = None
-    ) -> _T:
-        return self._aloop.run_until_complete(
-            asyncio.wait_for(future, timeout=timeout)
-        )
-
-    def _get_greeting(self) -> Optional[QMPMessage]:
-        if self._aqmp.greeting is not None:
-            # pylint: disable=protected-access
-            return self._aqmp.greeting._asdict()
-        return None
-
-    # __enter__ and __exit__ need no changes
-    # parse_address needs no changes
-
-    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
-        self._aqmp.await_greeting = negotiate
-        self._aqmp.negotiate = negotiate
-
-        self._sync(
-            self._aqmp.connect(self._address)
-        )
-        return self._get_greeting()
-
-    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
-        self._aqmp.await_greeting = True
-        self._aqmp.negotiate = True
-
-        self._sync(self._aqmp.accept(), timeout)
-
-        ret = self._get_greeting()
-        assert ret is not None
-        return ret
-
-    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
-        return dict(
-            self._sync(
-                # pylint: disable=protected-access
-
-                # _raw() isn't a public API, because turning off
-                # automatic ID assignment is discouraged. For
-                # compatibility with iotests *only*, do it anyway.
-                self._aqmp._raw(qmp_cmd, assign_id=False),
-                self._timeout
-            )
-        )
-
-    # Default impl of cmd() delegates to cmd_obj
-
-    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
-        return self._sync(
-            self._aqmp.execute(cmd, kwds),
-            self._timeout
-        )
-
-    def pull_event(self,
-                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
-        if not wait:
-            # wait is False/0: "do not wait, do not except."
-            if self._aqmp.events.empty():
-                return None
-
-        # If wait is 'True', wait forever. If wait is False/0, the events
-        # queue must not be empty; but it still needs some real amount
-        # of time to complete.
-        timeout = None
-        if wait and isinstance(wait, float):
-            timeout = wait
-
-        return dict(
-            self._sync(
-                self._aqmp.events.get(),
-                timeout
-            )
-        )
-
-    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
-        events = [dict(x) for x in self._aqmp.events.clear()]
-        if events:
-            return events
-
-        event = self.pull_event(wait)
-        return [event] if event is not None else []
-
-    def clear_events(self) -> None:
-        self._aqmp.events.clear()
-
-    def close(self) -> None:
-        self._sync(
-            self._aqmp.disconnect()
-        )
-
-    def settimeout(self, timeout: Optional[float]) -> None:
-        self._timeout = timeout
-
-    def send_fd_scm(self, fd: int) -> None:
-        self._aqmp.send_fd_scm(fd)
-
-    def __del__(self) -> None:
-        if self._aqmp.runstate == Runstate.IDLE:
-            return
-
-        if not self._aloop.is_running():
-            self.close()
-        else:
-            # Garbage collection ran while the event loop was running.
-            # Nothing we can do about it now, but if we don't raise our
-            # own error, the user will be treated to a lot of traceback
-            # they might not understand.
-            raise QMPError(
-                "QEMUMonitorProtocol.close()"
-                " was not called before object was garbage collected"
-            )