summary refs log tree commit diff stats
path: root/python/qemu/aqmp/legacy.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu/aqmp/legacy.py')
-rw-r--r--python/qemu/aqmp/legacy.py138
1 files changed, 138 insertions, 0 deletions
diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py
new file mode 100644
index 0000000000..9e7b9fb80b
--- /dev/null
+++ b/python/qemu/aqmp/legacy.py
@@ -0,0 +1,138 @@
+"""
+Sync QMP Wrapper
+
+This class pretends to be qemu.qmp.QEMUMonitorProtocol.
+"""
+
+import asyncio
+from typing import (
+    Awaitable,
+    List,
+    Optional,
+    TypeVar,
+    Union,
+)
+
+import qemu.qmp
+from qemu.qmp import QMPMessage, QMPReturnValue, SocketAddrT
+
+from .qmp_client import QMPClient
+
+
+# pylint: disable=missing-docstring
+
+
+class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
+    def __init__(self, address: SocketAddrT,
+                 server: bool = False,
+                 nickname: Optional[str] = None):
+
+        # pylint: disable=super-init-not-called
+        self._aqmp = QMPClient(nickname)
+        self._aloop = asyncio.get_event_loop()
+        self._address = address
+        self._timeout: Optional[float] = None
+
+    _T = TypeVar('_T')
+
+    def _sync(
+            self, future: Awaitable[_T], timeout: Optional[float] = None
+    ) -> _T:
+        return self._aloop.run_until_complete(
+            asyncio.wait_for(future, timeout=timeout)
+        )
+
+    def _get_greeting(self) -> Optional[QMPMessage]:
+        if self._aqmp.greeting is not None:
+            # pylint: disable=protected-access
+            return self._aqmp.greeting._asdict()
+        return None
+
+    # __enter__ and __exit__ need no changes
+    # parse_address needs no changes
+
+    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
+        self._aqmp.await_greeting = negotiate
+        self._aqmp.negotiate = negotiate
+
+        self._sync(
+            self._aqmp.connect(self._address)
+        )
+        return self._get_greeting()
+
+    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
+        self._aqmp.await_greeting = True
+        self._aqmp.negotiate = True
+
+        self._sync(
+            self._aqmp.accept(self._address),
+            timeout
+        )
+
+        ret = self._get_greeting()
+        assert ret is not None
+        return ret
+
+    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
+        return dict(
+            self._sync(
+                # pylint: disable=protected-access
+
+                # _raw() isn't a public API, because turning off
+                # automatic ID assignment is discouraged. For
+                # compatibility with iotests *only*, do it anyway.
+                self._aqmp._raw(qmp_cmd, assign_id=False),
+                self._timeout
+            )
+        )
+
+    # Default impl of cmd() delegates to cmd_obj
+
+    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
+        return self._sync(
+            self._aqmp.execute(cmd, kwds),
+            self._timeout
+        )
+
+    def pull_event(self,
+                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
+        if not wait:
+            # wait is False/0: "do not wait, do not except."
+            if self._aqmp.events.empty():
+                return None
+
+        # If wait is 'True', wait forever. If wait is False/0, the events
+        # queue must not be empty; but it still needs some real amount
+        # of time to complete.
+        timeout = None
+        if wait and isinstance(wait, float):
+            timeout = wait
+
+        return dict(
+            self._sync(
+                self._aqmp.events.get(),
+                timeout
+            )
+        )
+
+    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
+        events = [dict(x) for x in self._aqmp.events.clear()]
+        if events:
+            return events
+
+        event = self.pull_event(wait)
+        return [event] if event is not None else []
+
+    def clear_events(self) -> None:
+        self._aqmp.events.clear()
+
+    def close(self) -> None:
+        self._sync(
+            self._aqmp.disconnect()
+        )
+
+    def settimeout(self, timeout: Optional[float]) -> None:
+        self._timeout = timeout
+
+    def send_fd_scm(self, fd: int) -> None:
+        self._aqmp.send_fd_scm(fd)