diff options
| author | serpilliere <serpilliere@users.noreply.github.com> | 2023-04-23 21:24:51 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-04-23 21:24:51 +0200 |
| commit | 49ec4bc9a90391bc796ac6940a6d9b77deb17ac9 (patch) | |
| tree | 1a890a007c103e643ffe4e1b13caada32c901ef9 /miasm/jitter/loader/pe.py | |
| parent | 230d528c50d8a2870a89011fc1e660fcab4910ff (diff) | |
| parent | b66becdead10f0bb2aa009dda61c422e79c567fe (diff) | |
| download | focaccia-miasm-49ec4bc9a90391bc796ac6940a6d9b77deb17ac9.tar.gz focaccia-miasm-49ec4bc9a90391bc796ac6940a6d9b77deb17ac9.zip | |
Merge pull request #1448 from cea-sec/generic-unpack
Generic import recovery (cheap ImpRec style)
Diffstat (limited to 'miasm/jitter/loader/pe.py')
| -rw-r--r-- | miasm/jitter/loader/pe.py | 121 |
1 files changed, 121 insertions, 0 deletions
diff --git a/miasm/jitter/loader/pe.py b/miasm/jitter/loader/pe.py index 28010b74..6d359a9a 100644 --- a/miasm/jitter/loader/pe.py +++ b/miasm/jitter/loader/pe.py @@ -695,3 +695,124 @@ def guess_arch(pe): """Return the architecture specified by the PE container @pe. If unknown, return None""" return PE_machine.get(pe.Coffhdr.machine, None) + + +class ImpRecStateMachine(object): + """ + Finite State Machine used for internal purpose only. + See `ImpRecStrategy` for more details. + """ + + # Looking for a function pointer + STATE_SEARCH = 0 + # Candidate function list + STATE_FUNC_FOUND = 1 + # Function list found, terminated by a NULL entry + STATE_END_FUNC_LIST = 2 + + def __init__(self, libs, ptrtype): + self.ptrtype = ptrtype + self.libs = libs + self.func_addrs = set(struct.pack(self.ptrtype, address) for address in self.libs.cname2addr.values()) + self.off2name = {v:k for k,v in self.libs.name2off.items()} + self.state = self.STATE_SEARCH + + # STATE_FUNC_FOUND + self.cur_list = [] + self.cur_list_lib = None + + # STATE_END_FUNC_LIST + self.seen = [] + + def format_func_info(self, func_info, func_addr): + return { + "lib_addr": func_info[0], + "lib_name": self.off2name[func_info[0]], + "entry_name": func_info[1], + "entry_module_addr": func_addr, + "entry_memory_addr": self.cur_address, + } + + def transition(self, data): + if self.state == self.STATE_SEARCH: + if data in self.func_addrs: + self.state = self.STATE_FUNC_FOUND + func_addr = struct.unpack(self.ptrtype, data)[0] + func_info = self.libs.fad2info[func_addr] + self.cur_list = [self.format_func_info(func_info, func_addr)] + self.cur_list_lib = func_info[0] + elif self.state == self.STATE_FUNC_FOUND: + if data == (b"\x00" * len(data)): + self.state = self.STATE_END_FUNC_LIST + elif data in self.func_addrs: + func_addr = struct.unpack(self.ptrtype, data)[0] + func_info = self.libs.fad2info[func_addr] + if func_info[0] != self.cur_list_lib: + # The list must belong to the same library + self.state = self.STATE_SEARCH + return + self.cur_list.append(self.format_func_info(func_info, func_addr)) + else: + self.state == self.STATE_SEARCH + elif self.state == self.STATE_END_FUNC_LIST: + self.seen.append(self.cur_list) + self.state = self.STATE_SEARCH + self.transition(data) + else: + raise ValueError() + + def run(self): + while True: + data, address = yield + self.cur_address = address + self.transition(data) + + +class ImpRecStrategy(object): + """ + Naive import reconstruction, similar to ImpRec + + It looks for a continuation of module export addresses, ended by a NULL entry, ie: + [...] + &Kernel32::LoadLibraryA + &Kernel32::HeapCreate + 00 00 00 00 + [...] + + Usage: + >>> sb = Sandbox[...] + >>> sb.run() + >>> imprec = ImpRecStrategy(sb.jitter, sb.libs, size=32) + >>> imprec.recover_import() + List<List<Recovered functions>> + + -> sb.libs has also been updated, ready to be passed to `vm2pe` + """ + def __init__(self, jitter, libs, size): + self._jitter = jitter + self._libs = libs + if size == 32: + self._ptrtype = "<I" + elif size == 64: + self._ptrtype = "<Q" + else: + ValueError("Unsupported size: %d" % size) + + def recover_import(self, update_libs=True): + # Hypothesis: align on 4 + # Search for several addresses from `func_addrs` ending with a `\x00` + fsm_obj = ImpRecStateMachine(self._libs, self._ptrtype) + fsm = fsm_obj.run() + fsm.send(None) + for addr_start, page_info in self._jitter.vm.get_all_memory().items(): + data = page_info["data"] + for i in range(0, page_info["size"], 4): + fsm.send((data[i:i+4], addr_start + i)) + + # Apply to libs + if update_libs: + for entry_list in fsm_obj.seen: + for func_info in entry_list: + self._libs.lib_imp2dstad[func_info["lib_addr"]][func_info["entry_name"]].add(func_info["entry_memory_addr"]) + + return fsm_obj.seen |