summary refs log tree commit diff stats
path: root/python/qemu
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu')
-rw-r--r--python/qemu/qmp/__init__.py59
-rw-r--r--python/qemu/qmp/qemu_ga_client.py323
-rw-r--r--python/qemu/qmp/qmp_shell.py535
-rw-r--r--python/qemu/qmp/qom.py272
-rw-r--r--python/qemu/qmp/qom_common.py178
-rw-r--r--python/qemu/qmp/qom_fuse.py206
6 files changed, 1562 insertions, 11 deletions
diff --git a/python/qemu/qmp/__init__.py b/python/qemu/qmp/__init__.py
index 9606248a3d..376954cb6d 100644
--- a/python/qemu/qmp/__init__.py
+++ b/python/qemu/qmp/__init__.py
@@ -30,21 +30,30 @@ from typing import (
     TextIO,
     Tuple,
     Type,
+    TypeVar,
     Union,
     cast,
 )
 
 
-# QMPMessage is a QMP Message of any kind.
-# e.g. {'yee': 'haw'}
+#: QMPMessage is an entire QMP message of any kind.
+QMPMessage = Dict[str, Any]
+
+#: QMPReturnValue is the 'return' value of a command.
+QMPReturnValue = object
+
+#: QMPObject is any object in a QMP message.
+QMPObject = Dict[str, object]
+
+# QMPMessage can be outgoing commands or incoming events/returns.
+# QMPReturnValue is usually a dict/json object, but due to QAPI's
+# 'returns-whitelist', it can actually be anything.
 #
-# QMPReturnValue is the inner value of return values only.
-# {'return': {}} is the QMPMessage,
+# {'return': {}} is a QMPMessage,
 # {} is the QMPReturnValue.
-QMPMessage = Dict[str, Any]
-QMPReturnValue = Dict[str, Any]
 
-InternetAddrT = Tuple[str, str]
+
+InternetAddrT = Tuple[str, int]
 UnixAddrT = str
 SocketAddrT = Union[InternetAddrT, UnixAddrT]
 
@@ -92,6 +101,12 @@ class QMPResponseError(QMPError):
         self.reply = reply
 
 
+class QMPBadPortError(QMPError):
+    """
+    Unable to parse socket address: Port was non-numerical.
+    """
+
+
 class QEMUMonitorProtocol:
     """
     Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then
@@ -206,7 +221,9 @@ class QEMUMonitorProtocol:
             if ret is None:
                 raise QMPConnectError("Error while reading from socket")
 
-    def __enter__(self) -> 'QEMUMonitorProtocol':
+    T = TypeVar('T')
+
+    def __enter__(self: T) -> T:
         # Implement context manager enter function.
         return self
 
@@ -219,6 +236,26 @@ class QEMUMonitorProtocol:
         # Implement context manager exit function.
         self.close()
 
+    @classmethod
+    def parse_address(cls, address: str) -> SocketAddrT:
+        """
+        Parse a string into a QMP address.
+
+        Figure out if the argument is in the port:host form.
+        If it's not, it's probably a file path.
+        """
+        components = address.split(':')
+        if len(components) == 2:
+            try:
+                port = int(components[1])
+            except ValueError:
+                msg = f"Bad port: '{components[1]}' in '{address}'."
+                raise QMPBadPortError(msg) from None
+            return (components[0], port)
+
+        # Treat as filepath.
+        return address
+
     def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
         """
         Connect to the QMP Monitor and perform capabilities negotiation.
@@ -271,8 +308,8 @@ class QEMUMonitorProtocol:
         return resp
 
     def cmd(self, name: str,
-            args: Optional[Dict[str, Any]] = None,
-            cmd_id: Optional[Any] = None) -> QMPMessage:
+            args: Optional[Dict[str, object]] = None,
+            cmd_id: Optional[object] = None) -> QMPMessage:
         """
         Build a QMP command and send it to the QMP Monitor.
 
@@ -287,7 +324,7 @@ class QEMUMonitorProtocol:
             qmp_cmd['id'] = cmd_id
         return self.cmd_obj(qmp_cmd)
 
-    def command(self, cmd: str, **kwds: Any) -> QMPReturnValue:
+    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
         """
         Build and send a QMP command to the monitor, report errors if any
         """
diff --git a/python/qemu/qmp/qemu_ga_client.py b/python/qemu/qmp/qemu_ga_client.py
new file mode 100644
index 0000000000..67ac0b4211
--- /dev/null
+++ b/python/qemu/qmp/qemu_ga_client.py
@@ -0,0 +1,323 @@
+"""
+QEMU Guest Agent Client
+
+Usage:
+
+Start QEMU with:
+
+# qemu [...] -chardev socket,path=/tmp/qga.sock,server,wait=off,id=qga0 \
+  -device virtio-serial \
+  -device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0
+
+Run the script:
+
+$ qemu-ga-client --address=/tmp/qga.sock <command> [args...]
+
+or
+
+$ export QGA_CLIENT_ADDRESS=/tmp/qga.sock
+$ qemu-ga-client <command> [args...]
+
+For example:
+
+$ qemu-ga-client cat /etc/resolv.conf
+# Generated by NetworkManager
+nameserver 10.0.2.3
+$ qemu-ga-client fsfreeze status
+thawed
+$ qemu-ga-client fsfreeze freeze
+2 filesystems frozen
+
+See also: https://wiki.qemu.org/Features/QAPI/GuestAgent
+"""
+
+# Copyright (C) 2012 Ryota Ozaki <ozaki.ryota@gmail.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2.  See
+# the COPYING file in the top-level directory.
+
+import argparse
+import base64
+import errno
+import os
+import random
+import sys
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Optional,
+    Sequence,
+)
+
+from qemu import qmp
+from qemu.qmp import SocketAddrT
+
+
+# This script has not seen many patches or careful attention in quite
+# some time. If you would like to improve it, please review the design
+# carefully and add docstrings at that point in time. Until then:
+
+# pylint: disable=missing-docstring
+
+
+class QemuGuestAgent(qmp.QEMUMonitorProtocol):
+    def __getattr__(self, name: str) -> Callable[..., Any]:
+        def wrapper(**kwds: object) -> object:
+            return self.command('guest-' + name.replace('_', '-'), **kwds)
+        return wrapper
+
+
+class QemuGuestAgentClient:
+    def __init__(self, address: SocketAddrT):
+        self.qga = QemuGuestAgent(address)
+        self.qga.connect(negotiate=False)
+
+    def sync(self, timeout: Optional[float] = 3) -> None:
+        # Avoid being blocked forever
+        if not self.ping(timeout):
+            raise EnvironmentError('Agent seems not alive')
+        uid = random.randint(0, (1 << 32) - 1)
+        while True:
+            ret = self.qga.sync(id=uid)
+            if isinstance(ret, int) and int(ret) == uid:
+                break
+
+    def __file_read_all(self, handle: int) -> bytes:
+        eof = False
+        data = b''
+        while not eof:
+            ret = self.qga.file_read(handle=handle, count=1024)
+            _data = base64.b64decode(ret['buf-b64'])
+            data += _data
+            eof = ret['eof']
+        return data
+
+    def read(self, path: str) -> bytes:
+        handle = self.qga.file_open(path=path)
+        try:
+            data = self.__file_read_all(handle)
+        finally:
+            self.qga.file_close(handle=handle)
+        return data
+
+    def info(self) -> str:
+        info = self.qga.info()
+
+        msgs = []
+        msgs.append('version: ' + info['version'])
+        msgs.append('supported_commands:')
+        enabled = [c['name'] for c in info['supported_commands']
+                   if c['enabled']]
+        msgs.append('\tenabled: ' + ', '.join(enabled))
+        disabled = [c['name'] for c in info['supported_commands']
+                    if not c['enabled']]
+        msgs.append('\tdisabled: ' + ', '.join(disabled))
+
+        return '\n'.join(msgs)
+
+    @classmethod
+    def __gen_ipv4_netmask(cls, prefixlen: int) -> str:
+        mask = int('1' * prefixlen + '0' * (32 - prefixlen), 2)
+        return '.'.join([str(mask >> 24),
+                         str((mask >> 16) & 0xff),
+                         str((mask >> 8) & 0xff),
+                         str(mask & 0xff)])
+
+    def ifconfig(self) -> str:
+        nifs = self.qga.network_get_interfaces()
+
+        msgs = []
+        for nif in nifs:
+            msgs.append(nif['name'] + ':')
+            if 'ip-addresses' in nif:
+                for ipaddr in nif['ip-addresses']:
+                    if ipaddr['ip-address-type'] == 'ipv4':
+                        addr = ipaddr['ip-address']
+                        mask = self.__gen_ipv4_netmask(int(ipaddr['prefix']))
+                        msgs.append(f"\tinet {addr}  netmask {mask}")
+                    elif ipaddr['ip-address-type'] == 'ipv6':
+                        addr = ipaddr['ip-address']
+                        prefix = ipaddr['prefix']
+                        msgs.append(f"\tinet6 {addr}  prefixlen {prefix}")
+            if nif['hardware-address'] != '00:00:00:00:00:00':
+                msgs.append("\tether " + nif['hardware-address'])
+
+        return '\n'.join(msgs)
+
+    def ping(self, timeout: Optional[float]) -> bool:
+        self.qga.settimeout(timeout)
+        try:
+            self.qga.ping()
+        except TimeoutError:
+            return False
+        return True
+
+    def fsfreeze(self, cmd: str) -> object:
+        if cmd not in ['status', 'freeze', 'thaw']:
+            raise Exception('Invalid command: ' + cmd)
+        # Can be int (freeze, thaw) or GuestFsfreezeStatus (status)
+        return getattr(self.qga, 'fsfreeze' + '_' + cmd)()
+
+    def fstrim(self, minimum: int) -> Dict[str, object]:
+        # returns GuestFilesystemTrimResponse
+        ret = getattr(self.qga, 'fstrim')(minimum=minimum)
+        assert isinstance(ret, dict)
+        return ret
+
+    def suspend(self, mode: str) -> None:
+        if mode not in ['disk', 'ram', 'hybrid']:
+            raise Exception('Invalid mode: ' + mode)
+
+        try:
+            getattr(self.qga, 'suspend' + '_' + mode)()
+            # On error exception will raise
+        except TimeoutError:
+            # On success command will timed out
+            return
+
+    def shutdown(self, mode: str = 'powerdown') -> None:
+        if mode not in ['powerdown', 'halt', 'reboot']:
+            raise Exception('Invalid mode: ' + mode)
+
+        try:
+            self.qga.shutdown(mode=mode)
+        except TimeoutError:
+            pass
+
+
+def _cmd_cat(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    if len(args) != 1:
+        print('Invalid argument')
+        print('Usage: cat <file>')
+        sys.exit(1)
+    print(client.read(args[0]))
+
+
+def _cmd_fsfreeze(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    usage = 'Usage: fsfreeze status|freeze|thaw'
+    if len(args) != 1:
+        print('Invalid argument')
+        print(usage)
+        sys.exit(1)
+    if args[0] not in ['status', 'freeze', 'thaw']:
+        print('Invalid command: ' + args[0])
+        print(usage)
+        sys.exit(1)
+    cmd = args[0]
+    ret = client.fsfreeze(cmd)
+    if cmd == 'status':
+        print(ret)
+        return
+
+    assert isinstance(ret, int)
+    verb = 'frozen' if cmd == 'freeze' else 'thawed'
+    print(f"{ret:d} filesystems {verb}")
+
+
+def _cmd_fstrim(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    if len(args) == 0:
+        minimum = 0
+    else:
+        minimum = int(args[0])
+    print(client.fstrim(minimum))
+
+
+def _cmd_ifconfig(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    assert not args
+    print(client.ifconfig())
+
+
+def _cmd_info(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    assert not args
+    print(client.info())
+
+
+def _cmd_ping(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    timeout = 3.0 if len(args) == 0 else float(args[0])
+    alive = client.ping(timeout)
+    if not alive:
+        print("Not responded in %s sec" % args[0])
+        sys.exit(1)
+
+
+def _cmd_suspend(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    usage = 'Usage: suspend disk|ram|hybrid'
+    if len(args) != 1:
+        print('Less argument')
+        print(usage)
+        sys.exit(1)
+    if args[0] not in ['disk', 'ram', 'hybrid']:
+        print('Invalid command: ' + args[0])
+        print(usage)
+        sys.exit(1)
+    client.suspend(args[0])
+
+
+def _cmd_shutdown(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    assert not args
+    client.shutdown()
+
+
+_cmd_powerdown = _cmd_shutdown
+
+
+def _cmd_halt(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    assert not args
+    client.shutdown('halt')
+
+
+def _cmd_reboot(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
+    assert not args
+    client.shutdown('reboot')
+
+
+commands = [m.replace('_cmd_', '') for m in dir() if '_cmd_' in m]
+
+
+def send_command(address: str, cmd: str, args: Sequence[str]) -> None:
+    if not os.path.exists(address):
+        print('%s not found' % address)
+        sys.exit(1)
+
+    if cmd not in commands:
+        print('Invalid command: ' + cmd)
+        print('Available commands: ' + ', '.join(commands))
+        sys.exit(1)
+
+    try:
+        client = QemuGuestAgentClient(address)
+    except OSError as err:
+        print(err)
+        if err.errno == errno.ECONNREFUSED:
+            print('Hint: qemu is not running?')
+        sys.exit(1)
+
+    if cmd == 'fsfreeze' and args[0] == 'freeze':
+        client.sync(60)
+    elif cmd != 'ping':
+        client.sync()
+
+    globals()['_cmd_' + cmd](client, args)
+
+
+def main() -> None:
+    address = os.environ.get('QGA_CLIENT_ADDRESS')
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--address', action='store',
+                        default=address,
+                        help='Specify a ip:port pair or a unix socket path')
+    parser.add_argument('command', choices=commands)
+    parser.add_argument('args', nargs='*')
+
+    args = parser.parse_args()
+    if args.address is None:
+        parser.error('address is not specified')
+        sys.exit(1)
+
+    send_command(args.address, args.command, args.args)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/python/qemu/qmp/qmp_shell.py b/python/qemu/qmp/qmp_shell.py
new file mode 100644
index 0000000000..337acfce2d
--- /dev/null
+++ b/python/qemu/qmp/qmp_shell.py
@@ -0,0 +1,535 @@
+#
+# Copyright (C) 2009, 2010 Red Hat Inc.
+#
+# Authors:
+#  Luiz Capitulino <lcapitulino@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2.  See
+# the COPYING file in the top-level directory.
+#
+
+"""
+Low-level QEMU shell on top of QMP.
+
+usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
+
+positional arguments:
+  qmp_server            < UNIX socket path | TCP address:port >
+
+optional arguments:
+  -h, --help            show this help message and exit
+  -H, --hmp             Use HMP interface
+  -N, --skip-negotiation
+                        Skip negotiate (for qemu-ga)
+  -v, --verbose         Verbose (echo commands sent and received)
+  -p, --pretty          Pretty-print JSON
+
+
+Start QEMU with:
+
+# qemu [...] -qmp unix:./qmp-sock,server
+
+Run the shell:
+
+$ qmp-shell ./qmp-sock
+
+Commands have the following format:
+
+   < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
+
+For example:
+
+(QEMU) device_add driver=e1000 id=net1
+{'return': {}}
+(QEMU)
+
+key=value pairs also support Python or JSON object literal subset notations,
+without spaces. Dictionaries/objects {} are supported as are arrays [].
+
+   example-command arg-name1={'key':'value','obj'={'prop':"value"}}
+
+Both JSON and Python formatting should work, including both styles of
+string literal quotes. Both paradigms of literal values should work,
+including null/true/false for JSON and None/True/False for Python.
+
+
+Transactions have the following multi-line format:
+
+   transaction(
+   action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
+   ...
+   action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
+   )
+
+One line transactions are also supported:
+
+   transaction( action-name1 ... )
+
+For example:
+
+    (QEMU) transaction(
+    TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
+    TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
+    TRANS> )
+    {"return": {}}
+    (QEMU)
+
+Use the -v and -p options to activate the verbose and pretty-print options,
+which will echo back the properly formatted JSON-compliant QMP that is being
+sent to QEMU, which is useful for debugging and documentation generation.
+"""
+
+import argparse
+import ast
+import json
+import logging
+import os
+import re
+import readline
+import sys
+from typing import (
+    Iterator,
+    List,
+    NoReturn,
+    Optional,
+    Sequence,
+)
+
+from qemu import qmp
+from qemu.qmp import QMPMessage
+
+
+LOG = logging.getLogger(__name__)
+
+
+class QMPCompleter:
+    """
+    QMPCompleter provides a readline library tab-complete behavior.
+    """
+    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
+    # but pylint as of today does not know that List[str] is simply 'list'.
+    def __init__(self) -> None:
+        self._matches: List[str] = []
+
+    def append(self, value: str) -> None:
+        """Append a new valid completion to the list of possibilities."""
+        return self._matches.append(value)
+
+    def complete(self, text: str, state: int) -> Optional[str]:
+        """readline.set_completer() callback implementation."""
+        for cmd in self._matches:
+            if cmd.startswith(text):
+                if state == 0:
+                    return cmd
+                state -= 1
+        return None
+
+
+class QMPShellError(qmp.QMPError):
+    """
+    QMP Shell Base error class.
+    """
+
+
+class FuzzyJSON(ast.NodeTransformer):
+    """
+    This extension of ast.NodeTransformer filters literal "true/false/null"
+    values in a Python AST and replaces them by proper "True/False/None" values
+    that Python can properly evaluate.
+    """
+
+    @classmethod
+    def visit_Name(cls,  # pylint: disable=invalid-name
+                   node: ast.Name) -> ast.AST:
+        """
+        Transform Name nodes with certain values into Constant (keyword) nodes.
+        """
+        if node.id == 'true':
+            return ast.Constant(value=True)
+        if node.id == 'false':
+            return ast.Constant(value=False)
+        if node.id == 'null':
+            return ast.Constant(value=None)
+        return node
+
+
+class QMPShell(qmp.QEMUMonitorProtocol):
+    """
+    QMPShell provides a basic readline-based QMP shell.
+
+    :param address: Address of the QMP server.
+    :param pretty: Pretty-print QMP messages.
+    :param verbose: Echo outgoing QMP messages to console.
+    """
+    def __init__(self, address: qmp.SocketAddrT,
+                 pretty: bool = False, verbose: bool = False):
+        super().__init__(address)
+        self._greeting: Optional[QMPMessage] = None
+        self._completer = QMPCompleter()
+        self._transmode = False
+        self._actions: List[QMPMessage] = []
+        self._histfile = os.path.join(os.path.expanduser('~'),
+                                      '.qmp-shell_history')
+        self.pretty = pretty
+        self.verbose = verbose
+
+    def close(self) -> None:
+        # Hook into context manager of parent to save shell history.
+        self._save_history()
+        super().close()
+
+    def _fill_completion(self) -> None:
+        cmds = self.cmd('query-commands')
+        if 'error' in cmds:
+            return
+        for cmd in cmds['return']:
+            self._completer.append(cmd['name'])
+
+    def _completer_setup(self) -> None:
+        self._completer = QMPCompleter()
+        self._fill_completion()
+        readline.set_history_length(1024)
+        readline.set_completer(self._completer.complete)
+        readline.parse_and_bind("tab: complete")
+        # NB: default delimiters conflict with some command names
+        # (eg. query-), clearing everything as it doesn't seem to matter
+        readline.set_completer_delims('')
+        try:
+            readline.read_history_file(self._histfile)
+        except FileNotFoundError:
+            pass
+        except IOError as err:
+            msg = f"Failed to read history '{self._histfile}': {err!s}"
+            LOG.warning(msg)
+
+    def _save_history(self) -> None:
+        try:
+            readline.write_history_file(self._histfile)
+        except IOError as err:
+            msg = f"Failed to save history file '{self._histfile}': {err!s}"
+            LOG.warning(msg)
+
+    @classmethod
+    def _parse_value(cls, val: str) -> object:
+        try:
+            return int(val)
+        except ValueError:
+            pass
+
+        if val.lower() == 'true':
+            return True
+        if val.lower() == 'false':
+            return False
+        if val.startswith(('{', '[')):
+            # Try first as pure JSON:
+            try:
+                return json.loads(val)
+            except ValueError:
+                pass
+            # Try once again as FuzzyJSON:
+            try:
+                tree = ast.parse(val, mode='eval')
+                transformed = FuzzyJSON().visit(tree)
+                return ast.literal_eval(transformed)
+            except (SyntaxError, ValueError):
+                pass
+        return val
+
+    def _cli_expr(self,
+                  tokens: Sequence[str],
+                  parent: qmp.QMPObject) -> None:
+        for arg in tokens:
+            (key, sep, val) = arg.partition('=')
+            if sep != '=':
+                raise QMPShellError(
+                    f"Expected a key=value pair, got '{arg!s}'"
+                )
+
+            value = self._parse_value(val)
+            optpath = key.split('.')
+            curpath = []
+            for path in optpath[:-1]:
+                curpath.append(path)
+                obj = parent.get(path, {})
+                if not isinstance(obj, dict):
+                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
+                    raise QMPShellError(msg.format('.'.join(curpath)))
+                parent[path] = obj
+                parent = obj
+            if optpath[-1] in parent:
+                if isinstance(parent[optpath[-1]], dict):
+                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
+                    raise QMPShellError(msg.format('.'.join(curpath)))
+                raise QMPShellError(f'Cannot set "{key}" multiple times')
+            parent[optpath[-1]] = value
+
+    def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
+        """
+        Build a QMP input object from a user provided command-line in the
+        following format:
+
+            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
+        """
+        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
+        cmdargs = re.findall(argument_regex, cmdline)
+        qmpcmd: QMPMessage
+
+        # Transactional CLI entry:
+        if cmdargs and cmdargs[0] == 'transaction(':
+            self._transmode = True
+            self._actions = []
+            cmdargs.pop(0)
+
+        # Transactional CLI exit:
+        if cmdargs and cmdargs[0] == ')' and self._transmode:
+            self._transmode = False
+            if len(cmdargs) > 1:
+                msg = 'Unexpected input after close of Transaction sub-shell'
+                raise QMPShellError(msg)
+            qmpcmd = {
+                'execute': 'transaction',
+                'arguments': {'actions': self._actions}
+            }
+            return qmpcmd
+
+        # No args, or no args remaining
+        if not cmdargs:
+            return None
+
+        if self._transmode:
+            # Parse and cache this Transactional Action
+            finalize = False
+            action = {'type': cmdargs[0], 'data': {}}
+            if cmdargs[-1] == ')':
+                cmdargs.pop(-1)
+                finalize = True
+            self._cli_expr(cmdargs[1:], action['data'])
+            self._actions.append(action)
+            return self._build_cmd(')') if finalize else None
+
+        # Standard command: parse and return it to be executed.
+        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
+        self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
+        return qmpcmd
+
+    def _print(self, qmp_message: object) -> None:
+        jsobj = json.dumps(qmp_message,
+                           indent=4 if self.pretty else None,
+                           sort_keys=self.pretty)
+        print(str(jsobj))
+
+    def _execute_cmd(self, cmdline: str) -> bool:
+        try:
+            qmpcmd = self._build_cmd(cmdline)
+        except QMPShellError as err:
+            print(
+                f"Error while parsing command line: {err!s}\n"
+                "command format: <command-name> "
+                "[arg-name1=arg1] ... [arg-nameN=argN",
+                file=sys.stderr
+            )
+            return True
+        # For transaction mode, we may have just cached the action:
+        if qmpcmd is None:
+            return True
+        if self.verbose:
+            self._print(qmpcmd)
+        resp = self.cmd_obj(qmpcmd)
+        if resp is None:
+            print('Disconnected')
+            return False
+        self._print(resp)
+        return True
+
+    def connect(self, negotiate: bool = True) -> None:
+        self._greeting = super().connect(negotiate)
+        self._completer_setup()
+
+    def show_banner(self,
+                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
+        """
+        Print to stdio a greeting, and the QEMU version if available.
+        """
+        print(msg)
+        if not self._greeting:
+            print('Connected')
+            return
+        version = self._greeting['QMP']['version']['qemu']
+        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
+
+    @property
+    def prompt(self) -> str:
+        """
+        Return the current shell prompt, including a trailing space.
+        """
+        if self._transmode:
+            return 'TRANS> '
+        return '(QEMU) '
+
+    def read_exec_command(self) -> bool:
+        """
+        Read and execute a command.
+
+        @return True if execution was ok, return False if disconnected.
+        """
+        try:
+            cmdline = input(self.prompt)
+        except EOFError:
+            print()
+            return False
+
+        if cmdline == '':
+            for event in self.get_events():
+                print(event)
+            self.clear_events()
+            return True
+
+        return self._execute_cmd(cmdline)
+
+    def repl(self) -> Iterator[None]:
+        """
+        Return an iterator that implements the REPL.
+        """
+        self.show_banner()
+        while self.read_exec_command():
+            yield
+        self.close()
+
+
+class HMPShell(QMPShell):
+    """
+    HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.
+
+    :param address: Address of the QMP server.
+    :param pretty: Pretty-print QMP messages.
+    :param verbose: Echo outgoing QMP messages to console.
+    """
+    def __init__(self, address: qmp.SocketAddrT,
+                 pretty: bool = False, verbose: bool = False):
+        super().__init__(address, pretty, verbose)
+        self._cpu_index = 0
+
+    def _cmd_completion(self) -> None:
+        for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
+            if cmd and cmd[0] != '[' and cmd[0] != '\t':
+                name = cmd.split()[0]  # drop help text
+                if name == 'info':
+                    continue
+                if name.find('|') != -1:
+                    # Command in the form 'foobar|f' or 'f|foobar', take the
+                    # full name
+                    opt = name.split('|')
+                    if len(opt[0]) == 1:
+                        name = opt[1]
+                    else:
+                        name = opt[0]
+                self._completer.append(name)
+                self._completer.append('help ' + name)  # help completion
+
+    def _info_completion(self) -> None:
+        for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
+            if cmd:
+                self._completer.append('info ' + cmd.split()[1])
+
+    def _other_completion(self) -> None:
+        # special cases
+        self._completer.append('help info')
+
+    def _fill_completion(self) -> None:
+        self._cmd_completion()
+        self._info_completion()
+        self._other_completion()
+
+    def _cmd_passthrough(self, cmdline: str,
+                         cpu_index: int = 0) -> QMPMessage:
+        return self.cmd_obj({
+            'execute': 'human-monitor-command',
+            'arguments': {
+                'command-line': cmdline,
+                'cpu-index': cpu_index
+            }
+        })
+
+    def _execute_cmd(self, cmdline: str) -> bool:
+        if cmdline.split()[0] == "cpu":
+            # trap the cpu command, it requires special setting
+            try:
+                idx = int(cmdline.split()[1])
+                if 'return' not in self._cmd_passthrough('info version', idx):
+                    print('bad CPU index')
+                    return True
+                self._cpu_index = idx
+            except ValueError:
+                print('cpu command takes an integer argument')
+                return True
+        resp = self._cmd_passthrough(cmdline, self._cpu_index)
+        if resp is None:
+            print('Disconnected')
+            return False
+        assert 'return' in resp or 'error' in resp
+        if 'return' in resp:
+            # Success
+            if len(resp['return']) > 0:
+                print(resp['return'], end=' ')
+        else:
+            # Error
+            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
+        return True
+
+    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
+        QMPShell.show_banner(self, msg)
+
+
+def die(msg: str) -> NoReturn:
+    """Write an error to stderr, then exit with a return code of 1."""
+    sys.stderr.write('ERROR: %s\n' % msg)
+    sys.exit(1)
+
+
+def main() -> None:
+    """
+    qmp-shell entry point: parse command line arguments and start the REPL.
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-H', '--hmp', action='store_true',
+                        help='Use HMP interface')
+    parser.add_argument('-N', '--skip-negotiation', action='store_true',
+                        help='Skip negotiate (for qemu-ga)')
+    parser.add_argument('-v', '--verbose', action='store_true',
+                        help='Verbose (echo commands sent and received)')
+    parser.add_argument('-p', '--pretty', action='store_true',
+                        help='Pretty-print JSON')
+
+    default_server = os.environ.get('QMP_SOCKET')
+    parser.add_argument('qmp_server', action='store',
+                        default=default_server,
+                        help='< UNIX socket path | TCP address:port >')
+
+    args = parser.parse_args()
+    if args.qmp_server is None:
+        parser.error("QMP socket or TCP address must be specified")
+
+    shell_class = HMPShell if args.hmp else QMPShell
+
+    try:
+        address = shell_class.parse_address(args.qmp_server)
+    except qmp.QMPBadPortError:
+        parser.error(f"Bad port number: {args.qmp_server}")
+        return  # pycharm doesn't know error() is noreturn
+
+    with shell_class(address, args.pretty, args.verbose) as qemu:
+        try:
+            qemu.connect(negotiate=not args.skip_negotiation)
+        except qmp.QMPConnectError:
+            die("Didn't get QMP greeting message")
+        except qmp.QMPCapabilitiesError:
+            die("Couldn't negotiate capabilities")
+        except OSError as err:
+            die(f"Couldn't connect to {args.qmp_server}: {err!s}")
+
+        for _ in qemu.repl():
+            pass
+
+
+if __name__ == '__main__':
+    main()
diff --git a/python/qemu/qmp/qom.py b/python/qemu/qmp/qom.py
new file mode 100644
index 0000000000..7ec7843d57
--- /dev/null
+++ b/python/qemu/qmp/qom.py
@@ -0,0 +1,272 @@
+"""
+QEMU Object Model testing tools.
+
+usage: qom [-h] {set,get,list,tree,fuse} ...
+
+Query and manipulate QOM data
+
+optional arguments:
+  -h, --help           show this help message and exit
+
+QOM commands:
+  {set,get,list,tree,fuse}
+    set                Set a QOM property value
+    get                Get a QOM property value
+    list               List QOM properties at a given path
+    tree               Show QOM tree from a given path
+    fuse               Mount a QOM tree as a FUSE filesystem
+"""
+##
+# Copyright John Snow 2020, for Red Hat, Inc.
+# Copyright IBM, Corp. 2011
+#
+# Authors:
+#  John Snow <jsnow@redhat.com>
+#  Anthony Liguori <aliguori@amazon.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or later.
+# See the COPYING file in the top-level directory.
+#
+# Based on ./scripts/qmp/qom-[set|get|tree|list]
+##
+
+import argparse
+
+from . import QMPResponseError
+from .qom_common import QOMCommand
+
+
+try:
+    from .qom_fuse import QOMFuse
+except ModuleNotFoundError as err:
+    if err.name != 'fuse':
+        raise
+else:
+    assert issubclass(QOMFuse, QOMCommand)
+
+
+class QOMSet(QOMCommand):
+    """
+    QOM Command - Set a property to a given value.
+
+    usage: qom-set [-h] [--socket SOCKET] <path>.<property> <value>
+
+    Set a QOM property value
+
+    positional arguments:
+      <path>.<property>     QOM path and property, separated by a period '.'
+      <value>               new QOM property value
+
+    optional arguments:
+      -h, --help            show this help message and exit
+      --socket SOCKET, -s SOCKET
+                            QMP socket path or address (addr:port). May also be
+                            set via QMP_SOCKET environment variable.
+    """
+    name = 'set'
+    help = 'Set a QOM property value'
+
+    @classmethod
+    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
+        super().configure_parser(parser)
+        cls.add_path_prop_arg(parser)
+        parser.add_argument(
+            'value',
+            metavar='<value>',
+            action='store',
+            help='new QOM property value'
+        )
+
+    def __init__(self, args: argparse.Namespace):
+        super().__init__(args)
+        self.path, self.prop = args.path_prop.rsplit('.', 1)
+        self.value = args.value
+
+    def run(self) -> int:
+        rsp = self.qmp.command(
+            'qom-set',
+            path=self.path,
+            property=self.prop,
+            value=self.value
+        )
+        print(rsp)
+        return 0
+
+
+class QOMGet(QOMCommand):
+    """
+    QOM Command - Get a property's current value.
+
+    usage: qom-get [-h] [--socket SOCKET] <path>.<property>
+
+    Get a QOM property value
+
+    positional arguments:
+      <path>.<property>     QOM path and property, separated by a period '.'
+
+    optional arguments:
+      -h, --help            show this help message and exit
+      --socket SOCKET, -s SOCKET
+                            QMP socket path or address (addr:port). May also be
+                            set via QMP_SOCKET environment variable.
+    """
+    name = 'get'
+    help = 'Get a QOM property value'
+
+    @classmethod
+    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
+        super().configure_parser(parser)
+        cls.add_path_prop_arg(parser)
+
+    def __init__(self, args: argparse.Namespace):
+        super().__init__(args)
+        try:
+            tmp = args.path_prop.rsplit('.', 1)
+        except ValueError as err:
+            raise ValueError('Invalid format for <path>.<property>') from err
+        self.path = tmp[0]
+        self.prop = tmp[1]
+
+    def run(self) -> int:
+        rsp = self.qmp.command(
+            'qom-get',
+            path=self.path,
+            property=self.prop
+        )
+        if isinstance(rsp, dict):
+            for key, value in rsp.items():
+                print(f"{key}: {value}")
+        else:
+            print(rsp)
+        return 0
+
+
+class QOMList(QOMCommand):
+    """
+    QOM Command - List the properties at a given path.
+
+    usage: qom-list [-h] [--socket SOCKET] <path>
+
+    List QOM properties at a given path
+
+    positional arguments:
+      <path>                QOM path
+
+    optional arguments:
+      -h, --help            show this help message and exit
+      --socket SOCKET, -s SOCKET
+                            QMP socket path or address (addr:port). May also be
+                            set via QMP_SOCKET environment variable.
+    """
+    name = 'list'
+    help = 'List QOM properties at a given path'
+
+    @classmethod
+    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
+        super().configure_parser(parser)
+        parser.add_argument(
+            'path',
+            metavar='<path>',
+            action='store',
+            help='QOM path',
+        )
+
+    def __init__(self, args: argparse.Namespace):
+        super().__init__(args)
+        self.path = args.path
+
+    def run(self) -> int:
+        rsp = self.qom_list(self.path)
+        for item in rsp:
+            if item.child:
+                print(f"{item.name}/")
+            elif item.link:
+                print(f"@{item.name}/")
+            else:
+                print(item.name)
+        return 0
+
+
+class QOMTree(QOMCommand):
+    """
+    QOM Command - Show the full tree below a given path.
+
+    usage: qom-tree [-h] [--socket SOCKET] [<path>]
+
+    Show QOM tree from a given path
+
+    positional arguments:
+      <path>                QOM path
+
+    optional arguments:
+      -h, --help            show this help message and exit
+      --socket SOCKET, -s SOCKET
+                            QMP socket path or address (addr:port). May also be
+                            set via QMP_SOCKET environment variable.
+    """
+    name = 'tree'
+    help = 'Show QOM tree from a given path'
+
+    @classmethod
+    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
+        super().configure_parser(parser)
+        parser.add_argument(
+            'path',
+            metavar='<path>',
+            action='store',
+            help='QOM path',
+            nargs='?',
+            default='/'
+        )
+
+    def __init__(self, args: argparse.Namespace):
+        super().__init__(args)
+        self.path = args.path
+
+    def _list_node(self, path: str) -> None:
+        print(path)
+        items = self.qom_list(path)
+        for item in items:
+            if item.child:
+                continue
+            try:
+                rsp = self.qmp.command('qom-get', path=path,
+                                       property=item.name)
+                print(f"  {item.name}: {rsp} ({item.type})")
+            except QMPResponseError as err:
+                print(f"  {item.name}: <EXCEPTION: {err!s}> ({item.type})")
+        print('')
+        for item in items:
+            if not item.child:
+                continue
+            if path == '/':
+                path = ''
+            self._list_node(f"{path}/{item.name}")
+
+    def run(self) -> int:
+        self._list_node(self.path)
+        return 0
+
+
+def main() -> int:
+    """QOM script main entry point."""
+    parser = argparse.ArgumentParser(
+        description='Query and manipulate QOM data'
+    )
+    subparsers = parser.add_subparsers(
+        title='QOM commands',
+        dest='command'
+    )
+
+    for command in QOMCommand.__subclasses__():
+        command.register(subparsers)
+
+    args = parser.parse_args()
+
+    if args.command is None:
+        parser.error('Command not specified.')
+        return 1
+
+    cmd_class = args.cmd_class
+    assert isinstance(cmd_class, type(QOMCommand))
+    return cmd_class.command_runner(args)
diff --git a/python/qemu/qmp/qom_common.py b/python/qemu/qmp/qom_common.py
new file mode 100644
index 0000000000..f82b16772d
--- /dev/null
+++ b/python/qemu/qmp/qom_common.py
@@ -0,0 +1,178 @@
+"""
+QOM Command abstractions.
+"""
+##
+# Copyright John Snow 2020, for Red Hat, Inc.
+# Copyright IBM, Corp. 2011
+#
+# Authors:
+#  John Snow <jsnow@redhat.com>
+#  Anthony Liguori <aliguori@amazon.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or later.
+# See the COPYING file in the top-level directory.
+#
+# Based on ./scripts/qmp/qom-[set|get|tree|list]
+##
+
+import argparse
+import os
+import sys
+from typing import (
+    Any,
+    Dict,
+    List,
+    Optional,
+    Type,
+    TypeVar,
+)
+
+from . import QEMUMonitorProtocol, QMPError
+
+
+# The following is needed only for a type alias.
+Subparsers = argparse._SubParsersAction  # pylint: disable=protected-access
+
+
+class ObjectPropertyInfo:
+    """
+    Represents the return type from e.g. qom-list.
+    """
+    def __init__(self, name: str, type_: str,
+                 description: Optional[str] = None,
+                 default_value: Optional[object] = None):
+        self.name = name
+        self.type = type_
+        self.description = description
+        self.default_value = default_value
+
+    @classmethod
+    def make(cls, value: Dict[str, Any]) -> 'ObjectPropertyInfo':
+        """
+        Build an ObjectPropertyInfo from a Dict with an unknown shape.
+        """
+        assert value.keys() >= {'name', 'type'}
+        assert value.keys() <= {'name', 'type', 'description', 'default-value'}
+        return cls(value['name'], value['type'],
+                   value.get('description'),
+                   value.get('default-value'))
+
+    @property
+    def child(self) -> bool:
+        """Is this property a child property?"""
+        return self.type.startswith('child<')
+
+    @property
+    def link(self) -> bool:
+        """Is this property a link property?"""
+        return self.type.startswith('link<')
+
+
+CommandT = TypeVar('CommandT', bound='QOMCommand')
+
+
+class QOMCommand:
+    """
+    Represents a QOM sub-command.
+
+    :param args: Parsed arguments, as returned from parser.parse_args.
+    """
+    name: str
+    help: str
+
+    def __init__(self, args: argparse.Namespace):
+        if args.socket is None:
+            raise QMPError("No QMP socket path or address given")
+        self.qmp = QEMUMonitorProtocol(
+            QEMUMonitorProtocol.parse_address(args.socket)
+        )
+        self.qmp.connect()
+
+    @classmethod
+    def register(cls, subparsers: Subparsers) -> None:
+        """
+        Register this command with the argument parser.
+
+        :param subparsers: argparse subparsers object, from "add_subparsers".
+        """
+        subparser = subparsers.add_parser(cls.name, help=cls.help,
+                                          description=cls.help)
+        cls.configure_parser(subparser)
+
+    @classmethod
+    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
+        """
+        Configure a parser with this command's arguments.
+
+        :param parser: argparse parser or subparser object.
+        """
+        default_path = os.environ.get('QMP_SOCKET')
+        parser.add_argument(
+            '--socket', '-s',
+            dest='socket',
+            action='store',
+            help='QMP socket path or address (addr:port).'
+            ' May also be set via QMP_SOCKET environment variable.',
+            default=default_path
+        )
+        parser.set_defaults(cmd_class=cls)
+
+    @classmethod
+    def add_path_prop_arg(cls, parser: argparse.ArgumentParser) -> None:
+        """
+        Add the <path>.<proptery> positional argument to this command.
+
+        :param parser: The parser to add the argument to.
+        """
+        parser.add_argument(
+            'path_prop',
+            metavar='<path>.<property>',
+            action='store',
+            help="QOM path and property, separated by a period '.'"
+        )
+
+    def run(self) -> int:
+        """
+        Run this command.
+
+        :return: 0 on success, 1 otherwise.
+        """
+        raise NotImplementedError
+
+    def qom_list(self, path: str) -> List[ObjectPropertyInfo]:
+        """
+        :return: a strongly typed list from the 'qom-list' command.
+        """
+        rsp = self.qmp.command('qom-list', path=path)
+        # qom-list returns List[ObjectPropertyInfo]
+        assert isinstance(rsp, list)
+        return [ObjectPropertyInfo.make(x) for x in rsp]
+
+    @classmethod
+    def command_runner(
+            cls: Type[CommandT],
+            args: argparse.Namespace
+    ) -> int:
+        """
+        Run a fully-parsed subcommand, with error-handling for the CLI.
+
+        :return: The return code from `.run()`.
+        """
+        try:
+            cmd = cls(args)
+            return cmd.run()
+        except QMPError as err:
+            print(f"{type(err).__name__}: {err!s}", file=sys.stderr)
+            return -1
+
+    @classmethod
+    def entry_point(cls) -> int:
+        """
+        Build this command's parser, parse arguments, and run the command.
+
+        :return: `run`'s return code.
+        """
+        parser = argparse.ArgumentParser(description=cls.help)
+        cls.configure_parser(parser)
+        args = parser.parse_args()
+        return cls.command_runner(args)
diff --git a/python/qemu/qmp/qom_fuse.py b/python/qemu/qmp/qom_fuse.py
new file mode 100644
index 0000000000..43f4671fdb
--- /dev/null
+++ b/python/qemu/qmp/qom_fuse.py
@@ -0,0 +1,206 @@
+"""
+QEMU Object Model FUSE filesystem tool
+
+This script offers a simple FUSE filesystem within which the QOM tree
+may be browsed, queried and edited using traditional shell tooling.
+
+This script requires the 'fusepy' python package.
+
+
+usage: qom-fuse [-h] [--socket SOCKET] <mount>
+
+Mount a QOM tree as a FUSE filesystem
+
+positional arguments:
+  <mount>               Mount point
+
+optional arguments:
+  -h, --help            show this help message and exit
+  --socket SOCKET, -s SOCKET
+                        QMP socket path or address (addr:port). May also be
+                        set via QMP_SOCKET environment variable.
+"""
+##
+# Copyright IBM, Corp. 2012
+# Copyright (C) 2020 Red Hat, Inc.
+#
+# Authors:
+#  Anthony Liguori   <aliguori@us.ibm.com>
+#  Markus Armbruster <armbru@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or later.
+# See the COPYING file in the top-level directory.
+##
+
+import argparse
+from errno import ENOENT, EPERM
+import stat
+import sys
+from typing import (
+    IO,
+    Dict,
+    Iterator,
+    Mapping,
+    Optional,
+    Union,
+)
+
+import fuse
+from fuse import FUSE, FuseOSError, Operations
+
+from . import QMPResponseError
+from .qom_common import QOMCommand
+
+
+fuse.fuse_python_api = (0, 2)
+
+
+class QOMFuse(QOMCommand, Operations):
+    """
+    QOMFuse implements both fuse.Operations and QOMCommand.
+
+    Operations implements the FS, and QOMCommand implements the CLI command.
+    """
+    name = 'fuse'
+    help = 'Mount a QOM tree as a FUSE filesystem'
+    fuse: FUSE
+
+    @classmethod
+    def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
+        super().configure_parser(parser)
+        parser.add_argument(
+            'mount',
+            metavar='<mount>',
+            action='store',
+            help="Mount point",
+        )
+
+    def __init__(self, args: argparse.Namespace):
+        super().__init__(args)
+        self.mount = args.mount
+        self.ino_map: Dict[str, int] = {}
+        self.ino_count = 1
+
+    def run(self) -> int:
+        print(f"Mounting QOMFS to '{self.mount}'", file=sys.stderr)
+        self.fuse = FUSE(self, self.mount, foreground=True)
+        return 0
+
+    def get_ino(self, path: str) -> int:
+        """Get an inode number for a given QOM path."""
+        if path in self.ino_map:
+            return self.ino_map[path]
+        self.ino_map[path] = self.ino_count
+        self.ino_count += 1
+        return self.ino_map[path]
+
+    def is_object(self, path: str) -> bool:
+        """Is the given QOM path an object?"""
+        try:
+            self.qom_list(path)
+            return True
+        except QMPResponseError:
+            return False
+
+    def is_property(self, path: str) -> bool:
+        """Is the given QOM path a property?"""
+        path, prop = path.rsplit('/', 1)
+        if path == '':
+            path = '/'
+        try:
+            for item in self.qom_list(path):
+                if item.name == prop:
+                    return True
+            return False
+        except QMPResponseError:
+            return False
+
+    def is_link(self, path: str) -> bool:
+        """Is the given QOM path a link?"""
+        path, prop = path.rsplit('/', 1)
+        if path == '':
+            path = '/'
+        try:
+            for item in self.qom_list(path):
+                if item.name == prop and item.link:
+                    return True
+            return False
+        except QMPResponseError:
+            return False
+
+    def read(self, path: str, size: int, offset: int, fh: IO[bytes]) -> bytes:
+        if not self.is_property(path):
+            raise FuseOSError(ENOENT)
+
+        path, prop = path.rsplit('/', 1)
+        if path == '':
+            path = '/'
+        try:
+            data = str(self.qmp.command('qom-get', path=path, property=prop))
+            data += '\n'  # make values shell friendly
+        except QMPResponseError as err:
+            raise FuseOSError(EPERM) from err
+
+        if offset > len(data):
+            return b''
+
+        return bytes(data[offset:][:size], encoding='utf-8')
+
+    def readlink(self, path: str) -> Union[bool, str]:
+        if not self.is_link(path):
+            return False
+        path, prop = path.rsplit('/', 1)
+        prefix = '/'.join(['..'] * (len(path.split('/')) - 1))
+        return prefix + str(self.qmp.command('qom-get', path=path,
+                                             property=prop))
+
+    def getattr(self, path: str,
+                fh: Optional[IO[bytes]] = None) -> Mapping[str, object]:
+        if self.is_link(path):
+            value = {
+                'st_mode': 0o755 | stat.S_IFLNK,
+                'st_ino': self.get_ino(path),
+                'st_dev': 0,
+                'st_nlink': 2,
+                'st_uid': 1000,
+                'st_gid': 1000,
+                'st_size': 4096,
+                'st_atime': 0,
+                'st_mtime': 0,
+                'st_ctime': 0
+            }
+        elif self.is_object(path):
+            value = {
+                'st_mode': 0o755 | stat.S_IFDIR,
+                'st_ino': self.get_ino(path),
+                'st_dev': 0,
+                'st_nlink': 2,
+                'st_uid': 1000,
+                'st_gid': 1000,
+                'st_size': 4096,
+                'st_atime': 0,
+                'st_mtime': 0,
+                'st_ctime': 0
+            }
+        elif self.is_property(path):
+            value = {
+                'st_mode': 0o644 | stat.S_IFREG,
+                'st_ino': self.get_ino(path),
+                'st_dev': 0,
+                'st_nlink': 1,
+                'st_uid': 1000,
+                'st_gid': 1000,
+                'st_size': 4096,
+                'st_atime': 0,
+                'st_mtime': 0,
+                'st_ctime': 0
+            }
+        else:
+            raise FuseOSError(ENOENT)
+        return value
+
+    def readdir(self, path: str, fh: IO[bytes]) -> Iterator[str]:
+        yield '.'
+        yield '..'
+        for item in self.qom_list(path):
+            yield item.name