diff options
Diffstat (limited to 'focaccia/lldb_target.py')
| -rw-r--r-- | focaccia/lldb_target.py | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/focaccia/lldb_target.py b/focaccia/lldb_target.py new file mode 100644 index 0000000..146891f --- /dev/null +++ b/focaccia/lldb_target.py @@ -0,0 +1,210 @@ +import lldb + +from .arch import supported_architectures, x86 +from .snapshot import ProgramState + +class MemoryMap: + """Description of a range of mapped memory. + + Inspired by https://github.com/angr/angr-targets/blob/master/angr_targets/memory_map.py, + meaning we initially used angr and I wanted to keep the interface when we + switched to a different tool. + """ + def __init__(self, start_address, end_address, name, perms): + self.start_address = start_address + self.end_address = end_address + self.name = name + self.perms = perms + + def __str__(self): + return f'MemoryMap[0x{self.start_address:x}, 0x{self.end_address:x}]' \ + f': {self.name}' + +class ConcreteRegisterError(Exception): + pass + +class ConcreteMemoryError(Exception): + pass + +class LLDBConcreteTarget: + def __init__(self, executable: str, argv: list[str] = []): + """Construct an LLDB concrete target. Stop at entry. + + :raises RuntimeError: If the process is unable to launch. + """ + self.debugger = lldb.SBDebugger.Create() + self.debugger.SetAsync(False) + self.target = self.debugger.CreateTargetWithFileAndArch(executable, + lldb.LLDB_ARCH_DEFAULT) + self.module = self.target.FindModule(self.target.GetExecutable()) + self.interpreter = self.debugger.GetCommandInterpreter() + + # Set up objects for process execution + self.error = lldb.SBError() + self.listener = self.debugger.GetListener() + self.process = self.target.Launch(self.listener, + argv, None, # argv, envp + None, None, None, # stdin, stdout, stderr + None, # working directory + 0, + True, self.error) + if not self.process.IsValid(): + raise RuntimeError(f'[In LLDBConcreteTarget.__init__]: Failed to' + f' launch process.') + + self.archname = self.target.GetPlatform().GetTriple().split('-')[0] + + def is_exited(self): + """Signals whether the concrete process has exited. + + :return: True if the process has exited. False otherwise. + """ + return self.process.GetState() == lldb.eStateExited + + def run(self): + """Continue execution of the concrete process.""" + state = self.process.GetState() + if state == lldb.eStateExited: + raise RuntimeError(f'Tried to resume process execution, but the' + f' process has already exited.') + assert(state == lldb.eStateStopped) + self.process.Continue() + + def step(self): + """Step forward by a single instruction.""" + thread: lldb.SBThread = self.process.GetThreadAtIndex(0) + thread.StepInstruction(False) + + def record_snapshot(self) -> ProgramState: + """Record the concrete target's state in a ProgramState object.""" + # Determine current arch + if self.archname not in supported_architectures: + print(f'[ERROR] LLDBConcreteTarget: Recording snapshots is not' + f' supported for architecture {self.archname}!') + raise NotImplementedError() + arch = supported_architectures[self.archname] + + state = ProgramState(arch) + + # Query and store register state + for regname in arch.regnames: + try: + conc_val = self.read_register(regname) + state.set(regname, conc_val) + except KeyError: + pass + except ConcreteRegisterError: + # Special rule for flags on X86 + if arch.archname == x86.archname: + rflags = x86.decompose_rflags(self.read_register('rflags')) + if regname in rflags: + state.set(regname, rflags[regname]) + + # Query and store memory state + for mapping in self.get_mappings(): + assert(mapping.end_address > mapping.start_address) + size = mapping.end_address - mapping.start_address + try: + data = self.read_memory(mapping.start_address, size) + state.write_memory(mapping.start_address, data) + except ConcreteMemoryError: + pass + + return state + + def _get_register(self, regname: str) -> lldb.SBValue: + """Find a register by name. + + :raise ConcreteRegisterError: If no register with the specified name + can be found. + """ + frame = self.process.GetThreadAtIndex(0).GetFrameAtIndex(0) + reg = frame.FindRegister(regname) + if reg is None: + raise ConcreteRegisterError( + f'[In LLDBConcreteTarget._get_register]: Register {regname}' + f' not found.') + return reg + + def read_register(self, regname: str) -> int: + """Read the value of a register. + + :raise ConcreteRegisterError: If `regname` is not a valid register name + or the target is otherwise unable to read + the register's value. + """ + reg = self._get_register(regname) + val = reg.GetValue() + if val is None: + raise ConcreteRegisterError( + f'[In LLDBConcreteTarget.read_register]: Register has an' + f' invalid value of {val}.') + + return int(val, 16) + + def write_register(self, regname: str, value: int): + """Read the value of a register. + + :raise ConcreteRegisterError: If `regname` is not a valid register name + or the target is otherwise unable to set + the register's value. + """ + reg = self._get_register(regname) + error = lldb.SBError() + reg.SetValueFromCString(hex(value), error) + if not error.success: + raise ConcreteRegisterError( + f'[In LLDBConcreteTarget.write_register]: Unable to set' + f' {regname} to value {hex(value)}!') + + def read_memory(self, addr, size): + """Read bytes from memory. + + :raise ConcreteMemoryError: If unable to read `size` bytes from `addr`. + """ + err = lldb.SBError() + content = self.process.ReadMemory(addr, size, err) + if not err.success: + raise ConcreteMemoryError(f'Error when reading {size} bytes at' + f' address {hex(addr)}: {err}') + return content + + def write_memory(self, addr, value: bytes): + """Write bytes to memory. + + :raise ConcreteMemoryError: If unable to write at `addr`. + """ + err = lldb.SBError() + res = self.process.WriteMemory(addr, value, err) + if not err.success or res != len(value): + raise ConcreteMemoryError(f'Error when writing to address' + f' {hex(addr)}: {err}') + + def get_mappings(self) -> list[MemoryMap]: + mmap = [] + + region_list = self.process.GetMemoryRegions() + for i in range(region_list.GetSize()): + region = lldb.SBMemoryRegionInfo() + region_list.GetMemoryRegionAtIndex(i, region) + + perms = f'{"r" if region.IsReadable() else "-"}' \ + f'{"w" if region.IsWritable() else "-"}' \ + f'{"x" if region.IsExecutable() else "-"}' + name = region.GetName() + + mmap.append(MemoryMap(region.GetRegionBase(), + region.GetRegionEnd(), + name if name is not None else '<none>', + perms)) + return mmap + + def set_breakpoint(self, addr): + command = f'b -a {addr} -s {self.module.GetFileSpec().GetFilename()}' + result = lldb.SBCommandReturnObject() + self.interpreter.HandleCommand(command, result) + + def remove_breakpoint(self, addr): + command = f'breakpoint delete {addr}' + result = lldb.SBCommandReturnObject() + self.interpreter.HandleCommand(command, result) |