author     Theofilos Augoustis <theofilos.augoustis@gmail.com>   2025-11-10 14:38:13 +0000
committer  Theofilos Augoustis <theofilos.augoustis@gmail.com>   2025-11-10 14:42:38 +0000
commit     0b9f774d356c9cfef186d896e3df0b53bf3b87ec
tree       921ba8ef16985721259a0c358273bb3a73e5c705
parent     5c8ec10fbb4653c8c1f297db46b348cd54678389
Implement streaming reader for data chunks
Diffstat (limited to 'src')
 src/focaccia/deterministic.py | 125 ++++++++++++++++++++++-----------
 1 file changed, 85 insertions(+), 40 deletions(-)
diff --git a/src/focaccia/deterministic.py b/src/focaccia/deterministic.py
index a7a7eef..77bdcb0 100644
--- a/src/focaccia/deterministic.py
+++ b/src/focaccia/deterministic.py
@@ -1,7 +1,9 @@
 """Parsing of JSON files containing snapshot data."""
 
 import os
-from typing import Union
+import io
+import struct
+from typing import Union, Optional
 
 import brotli
 
@@ -21,6 +23,80 @@ TaskEvent = rr_trace.TaskEvent
 MMap = rr_trace.MMap
 SerializedObject = Union[Frame, TaskEvent, MMap]
 
+class DeterministicLogReader(io.RawIOBase):
+    """
+    File-like reader for rr trace files.
+
+    Each block in the file:
+      uint32_t compressed_size
+      uint32_t uncompressed_size
+      [compressed_data...]
+    Presents the concatenated uncompressed data as a sequential byte stream.
+    """
+
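+    # Example: a block whose 5-byte payload Brotli-compresses to 9 bytes
+    # (hypothetical sizes) would begin with struct.pack("<II", 9, 5),
+    # followed by the 9 compressed bytes.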
+    _HDR = struct.Struct("<II")
+
+    def __init__(self, filename: str):
+        super().__init__()
+        self._f = open(filename, "rb", buffering=0)
+        self._data_buffer = memoryview(b"")
+        self._pos = 0
+        self._stream_pos = 0  # absolute position in the uncompressed stream
+        self._eof = False
+
+    def _load_chunk(self) -> None:
+        """Load and decompress the next Brotli block."""
+        header = self._f.read(self._HDR.size)
+        if not header:
+            self._eof = True
+            self._data_buffer = memoryview(b"")
+            return
+        if len(header) != self._HDR.size:
+            raise EOFError("Incomplete RR data block header")
+
+        compressed_length, uncompressed_length = self._HDR.unpack(header)
+        chunk = self._f.read(compressed_length)
+        if len(chunk) != compressed_length:
+            raise EOFError("Incomplete RR data block")
+
+        chunk = brotli.decompress(chunk)
+        if len(chunk) != uncompressed_length:
+            raise Exception(f'Malformed deterministic log: uncompressed chunk is not '
+                            f'equal to reported length {hex(uncompressed_length)}')
+
+        self._data_buffer = memoryview(chunk)
+        self._pos = 0
+
+    def read(self, n: Optional[int] = -1) -> bytes:
+        """Read up to n bytes from the uncompressed stream."""
+        if n == 0:
+            return b""
+
+        chunks = bytearray()
+        remaining = n if n is not None and n >= 0 else None
+
+        while not self._eof and (remaining is None or remaining > 0):
+            if self._pos >= len(self._data_buffer):
+                self._load_chunk()
+                if self._eof:
+                    break
+
+            available = len(self._data_buffer) - self._pos
+            take = available if remaining is None else min(available, remaining)
+            chunks += self._data_buffer[self._pos:self._pos + take]
+            self._pos += take
+            self._stream_pos += take
+            if remaining is not None:
+                remaining -= take
+
+        return bytes(chunks)
+
+    def readable(self) -> bool:
+        return True
+
+    def tell(self) -> int:
+        """Absolute position in the uncompressed stream."""
+        return self._stream_pos
+
+    def close(self) -> None:
+        if not self.closed:
+            self._f.close()
+        super().close()
+
 def parse_x64_registers(enc_regs: bytes, signed: bool=False) -> dict[str, int]:
     idx = 0
     def parse_reg():
@@ -387,29 +463,8 @@ class DeterministicLog:
     def data_file(self) -> str:
         return os.path.join(self.base_directory, 'data')
 
-    def _read(self, file) -> bytes:
-        data = bytearray()
-        with open(file, 'rb') as f:
-            while True:
-                try:
-                    compressed_len = int.from_bytes(f.read(4), byteorder='little')
-                    uncompressed_len = int.from_bytes(f.read(4), byteorder='little')
-                except Exception as e:
-                    raise Exception(f'Malformed deterministic log: {e}') from None
-
-                chunk = f.read(compressed_len)
-                if not chunk:
-                    break
-
-                chunk = brotli.decompress(chunk)
-                if len(chunk) != uncompressed_len:
-                    raise Exception(f'Malformed deterministic log: uncompressed chunk is not equal'
-                                    f'to reported length {hex(uncompressed_len)}')
-                data.extend(chunk)
-        return bytes(data)
-
     def _read_structure(self, file, obj: SerializedObject) -> list[SerializedObject]:
-        data = self._read(file)
+        with DeterministicLogReader(file) as reader:
+            data = reader.read()
 
         objects = []
         for deser in obj.read_multiple_bytes_packed(data):
@@ -436,14 +491,7 @@ class DeterministicLog:
                 return regs['pc'], regs
             raise NotImplementedError(f'Unable to parse registers for architecture {arch}')
 
-        def fill_memory_writes(self, mem_writes: list[MemoryWrite]) -> list[MemoryWrite]:
-            with open(self.data_file, 'rb') as f:
-                for mem_write in mem_writes:
-                    mem_write.data = f.read(mem_write.size)
-            return mem_writes
-
-    
-        def parse_memory_writes(event: Frame, data_src: bytes, pos: int):
+        def parse_memory_writes(event: Frame, reader: io.RawIOBase) -> list[MemoryWrite]:
             writes = []
             for raw_write in event.memWrites:
                 # Skip memory writes with 0 bytes
@@ -456,15 +504,13 @@ class DeterministicLog:
 
                 data = bytearray()
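+                # Holes are runs of zero bytes that were elided from the
+                # data file: copy stored bytes up to each hole, then
+                # zero-fill the hole itself.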
                 for hole in holes:
-                    until_hole = hole.offset - pos
-                    data.extend(data_src[pos:pos+until_hole])
+                    until_hole = hole.offset - reader.tell()
+                    data.extend(reader.read(until_hole))
                     data.extend(b'\x00' * hole.size)
-                    pos += until_hole
 
                 # No holes
                 if len(data) == 0:
-                    data = data_src[pos:pos+raw_write.size]
-                    pos += raw_write.size
+                    data = reader.read(raw_write.size)
 
                 mem_write = MemoryWrite(raw_write.tid,
                                         raw_write.addr,
@@ -473,16 +519,15 @@ class DeterministicLog:
                                         raw_write.sizeIsConservative,
                                         bytes(data))
                 writes.append(mem_write)
-            return writes, pos
+            return writes
 
-        pos = 0
-        data = self._read(self.data_file())
+        data_reader = DeterministicLogReader(self.data_file())
 
         events = []
         raw_events = self.raw_events()
         for raw_event in raw_events:
             pc, registers = parse_registers(raw_event)
-            mem_writes, pos = parse_memory_writes(raw_event, data, pos)
+            mem_writes = parse_memory_writes(raw_event, data_reader)
 
             event = None
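
For context, a minimal usage sketch of the new reader (hypothetical driver code, not part of this commit; the trace path is made up):

    from focaccia.deterministic import DeterministicLogReader

    # Stream the uncompressed payload without loading the whole file.
    reader = DeterministicLogReader('rr-trace/data')  # hypothetical path
    head = reader.read(16)       # first 16 uncompressed bytes
    assert reader.tell() == len(head)
    rest = reader.read()         # drain the remainder of the stream
    reader.close()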