diff options
Diffstat (limited to 'tests/qemu-iotests/qcow2_format.py')
| -rw-r--r-- | tests/qemu-iotests/qcow2_format.py | 215 |
1 files changed, 198 insertions, 17 deletions
diff --git a/tests/qemu-iotests/qcow2_format.py b/tests/qemu-iotests/qcow2_format.py index cc432e7ae0..8adc9959e1 100644 --- a/tests/qemu-iotests/qcow2_format.py +++ b/tests/qemu-iotests/qcow2_format.py @@ -19,6 +19,15 @@ import struct import string +import json + + +class ComplexEncoder(json.JSONEncoder): + def default(self, obj): + if hasattr(obj, 'to_json'): + return obj.to_json() + else: + return json.JSONEncoder.default(self, obj) class Qcow2Field: @@ -40,6 +49,22 @@ class Flags64(Qcow2Field): return str(bits) +class BitmapFlags(Qcow2Field): + + flags = { + 0x1: 'in-use', + 0x2: 'auto' + } + + def __str__(self): + bits = [] + for bit in range(64): + flag = self.value & (1 << bit) + if flag: + bits.append(self.flags.get(flag, f'bit-{bit}')) + return f'{self.value:#x} ({bits})' + + class Enum(Qcow2Field): def __str__(self): @@ -93,7 +118,11 @@ class Qcow2Struct(metaclass=Qcow2StructMeta): self.__dict__ = dict((field[2], values[i]) for i, field in enumerate(self.fields)) - def dump(self): + def dump(self, is_json=False): + if is_json: + print(json.dumps(self.to_json(), indent=4, cls=ComplexEncoder)) + return + for f in self.fields: value = self.__dict__[f[2]] if isinstance(f[1], str): @@ -103,6 +132,9 @@ class Qcow2Struct(metaclass=Qcow2StructMeta): print('{:<25} {}'.format(f[2], value_str)) + def to_json(self): + return dict((f[2], self.__dict__[f[2]]) for f in self.fields) + class Qcow2BitmapExt(Qcow2Struct): @@ -113,6 +145,131 @@ class Qcow2BitmapExt(Qcow2Struct): ('u64', '{:#x}', 'bitmap_directory_offset') ) + def __init__(self, fd, cluster_size): + super().__init__(fd=fd) + tail = struct.calcsize(self.fmt) % 8 + if tail: + fd.seek(8 - tail, 1) + position = fd.tell() + self.cluster_size = cluster_size + self.read_bitmap_directory(fd) + fd.seek(position) + + def read_bitmap_directory(self, fd): + fd.seek(self.bitmap_directory_offset) + self.bitmap_directory = \ + [Qcow2BitmapDirEntry(fd, cluster_size=self.cluster_size) + for _ in range(self.nb_bitmaps)] + + def dump(self): + super().dump() + for entry in self.bitmap_directory: + print() + entry.dump() + + def to_json(self): + fields_dict = super().to_json() + fields_dict['bitmap_directory'] = self.bitmap_directory + return fields_dict + + +class Qcow2BitmapDirEntry(Qcow2Struct): + + fields = ( + ('u64', '{:#x}', 'bitmap_table_offset'), + ('u32', '{}', 'bitmap_table_size'), + ('u32', BitmapFlags, 'flags'), + ('u8', '{}', 'type'), + ('u8', '{}', 'granularity_bits'), + ('u16', '{}', 'name_size'), + ('u32', '{}', 'extra_data_size') + ) + + def __init__(self, fd, cluster_size): + super().__init__(fd=fd) + self.cluster_size = cluster_size + # Seek relative to the current position in the file + fd.seek(self.extra_data_size, 1) + bitmap_name = fd.read(self.name_size) + self.name = bitmap_name.decode('ascii') + # Move position to the end of the entry in the directory + entry_raw_size = self.bitmap_dir_entry_raw_size() + padding = ((entry_raw_size + 7) & ~7) - entry_raw_size + fd.seek(padding, 1) + self.bitmap_table = Qcow2BitmapTable(fd=fd, + offset=self.bitmap_table_offset, + nb_entries=self.bitmap_table_size, + cluster_size=self.cluster_size) + + def bitmap_dir_entry_raw_size(self): + return struct.calcsize(self.fmt) + self.name_size + \ + self.extra_data_size + + def dump(self): + print(f'{"Bitmap name":<25} {self.name}') + super(Qcow2BitmapDirEntry, self).dump() + self.bitmap_table.dump() + + def to_json(self): + # Put the name ahead of the dict + return { + 'name': self.name, + **super().to_json(), + 'bitmap_table': self.bitmap_table + } + + +class Qcow2BitmapTableEntry(Qcow2Struct): + + fields = ( + ('u64', '{}', 'entry'), + ) + + BME_TABLE_ENTRY_RESERVED_MASK = 0xff000000000001fe + BME_TABLE_ENTRY_OFFSET_MASK = 0x00fffffffffffe00 + BME_TABLE_ENTRY_FLAG_ALL_ONES = 1 + + def __init__(self, fd): + super().__init__(fd=fd) + self.reserved = self.entry & self.BME_TABLE_ENTRY_RESERVED_MASK + self.offset = self.entry & self.BME_TABLE_ENTRY_OFFSET_MASK + if self.offset: + if self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES: + self.type = 'invalid' + else: + self.type = 'serialized' + elif self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES: + self.type = 'all-ones' + else: + self.type = 'all-zeroes' + + def to_json(self): + return {'type': self.type, 'offset': self.offset, + 'reserved': self.reserved} + + +class Qcow2BitmapTable: + + def __init__(self, fd, offset, nb_entries, cluster_size): + self.cluster_size = cluster_size + position = fd.tell() + fd.seek(offset) + self.entries = [Qcow2BitmapTableEntry(fd) for _ in range(nb_entries)] + fd.seek(position) + + def dump(self): + bitmap_table = enumerate(self.entries) + print(f'{"Bitmap table":<14} {"type":<15} {"size":<12} {"offset"}') + for i, entry in bitmap_table: + if entry.type == 'serialized': + size = self.cluster_size + else: + size = 0 + print(f'{i:<14} {entry.type:<15} {size:<12} {entry.offset}') + + def to_json(self): + return self.entries + QCOW2_EXT_MAGIC_BITMAPS = 0x23852875 @@ -128,6 +285,9 @@ class QcowHeaderExtension(Qcow2Struct): 0x44415441: 'Data file' } + def to_json(self): + return self.mapping.get(self.value, "<unknown>") + fields = ( ('u32', Magic, 'magic'), ('u32', '{}', 'length') @@ -135,11 +295,13 @@ class QcowHeaderExtension(Qcow2Struct): # then padding to next multiply of 8 ) - def __init__(self, magic=None, length=None, data=None, fd=None): + def __init__(self, magic=None, length=None, data=None, fd=None, + cluster_size=None): """ Support both loading from fd and creation from user data. For fd-based creation current position in a file will be used to read the data. + The cluster_size value may be obtained by dependent structures. This should be somehow refactored and functionality should be moved to superclass (to allow creation of any qcow2 struct), but then, fields @@ -161,28 +323,43 @@ class QcowHeaderExtension(Qcow2Struct): else: assert all(v is None for v in (magic, length, data)) super().__init__(fd=fd) - padded = (self.length + 7) & ~7 - self.data = fd.read(padded) - assert self.data is not None + if self.magic == QCOW2_EXT_MAGIC_BITMAPS: + self.obj = Qcow2BitmapExt(fd=fd, cluster_size=cluster_size) + self.data = None + else: + padded = (self.length + 7) & ~7 + self.data = fd.read(padded) + assert self.data is not None + self.obj = None + + if self.data is not None: + data_str = self.data[:self.length] + if all(c in string.printable.encode( + 'ascii') for c in data_str): + data_str = f"'{ data_str.decode('ascii') }'" + else: + data_str = '<binary>' + self.data_str = data_str - if self.magic == QCOW2_EXT_MAGIC_BITMAPS: - self.obj = Qcow2BitmapExt(data=self.data) - else: - self.obj = None def dump(self): super().dump() if self.obj is None: - data = self.data[:self.length] - if all(c in string.printable.encode('ascii') for c in data): - data = f"'{ data.decode('ascii') }'" - else: - data = '<binary>' - print(f'{"data":<25} {data}') + print(f'{"data":<25} {self.data_str}') else: self.obj.dump() + def to_json(self): + # Put the name ahead of the dict + res = {'name': self.Magic(self.magic), **super().to_json()} + if self.obj is not None: + res['data'] = self.obj + else: + res['data_str'] = self.data_str + + return res + @classmethod def create(cls, magic, data): return QcowHeaderExtension(magic, len(data), data) @@ -246,7 +423,7 @@ class QcowHeader(Qcow2Struct): end = self.cluster_size while fd.tell() < end: - ext = QcowHeaderExtension(fd=fd) + ext = QcowHeaderExtension(fd=fd, cluster_size=self.cluster_size) if ext.magic == 0: break else: @@ -280,7 +457,11 @@ class QcowHeader(Qcow2Struct): buf = buf[0:header_bytes-1] fd.write(buf) - def dump_extensions(self): + def dump_extensions(self, is_json=False): + if is_json: + print(json.dumps(self.extensions, indent=4, cls=ComplexEncoder)) + return + for ex in self.extensions: print('Header extension:') ex.dump() |