about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--miasm/jitter/loader/pe.py121
1 files changed, 121 insertions, 0 deletions
diff --git a/miasm/jitter/loader/pe.py b/miasm/jitter/loader/pe.py
index 28010b74..6d359a9a 100644
--- a/miasm/jitter/loader/pe.py
+++ b/miasm/jitter/loader/pe.py
@@ -695,3 +695,124 @@ def guess_arch(pe):
     """Return the architecture specified by the PE container @pe.
     If unknown, return None"""
     return PE_machine.get(pe.Coffhdr.machine, None)
+
+
+class ImpRecStateMachine(object):
+    """
+    Finite State Machine used for internal purpose only.
+    See `ImpRecStrategy` for more details.
+    """
+
+    # Looking for a function pointer
+    STATE_SEARCH = 0
+    # Candidate function list
+    STATE_FUNC_FOUND = 1
+    # Function list found, terminated by a NULL entry
+    STATE_END_FUNC_LIST = 2
+
+    def __init__(self, libs, ptrtype):
+        self.ptrtype = ptrtype
+        self.libs = libs
+        self.func_addrs = set(struct.pack(self.ptrtype, address) for address in self.libs.cname2addr.values())
+        self.off2name = {v:k for k,v in self.libs.name2off.items()}
+        self.state = self.STATE_SEARCH
+
+        # STATE_FUNC_FOUND
+        self.cur_list = []
+        self.cur_list_lib = None
+
+        # STATE_END_FUNC_LIST
+        self.seen = []
+
+    def format_func_info(self, func_info, func_addr):
+        return {
+            "lib_addr": func_info[0],
+            "lib_name": self.off2name[func_info[0]],
+            "entry_name": func_info[1],
+            "entry_module_addr": func_addr,
+            "entry_memory_addr": self.cur_address,
+        }
+        
+    def transition(self, data):
+        if self.state == self.STATE_SEARCH:
+            if data in self.func_addrs:
+                self.state = self.STATE_FUNC_FOUND
+                func_addr = struct.unpack(self.ptrtype, data)[0]
+                func_info = self.libs.fad2info[func_addr]
+                self.cur_list = [self.format_func_info(func_info, func_addr)]
+                self.cur_list_lib = func_info[0]
+        elif self.state == self.STATE_FUNC_FOUND:
+            if data == (b"\x00" * len(data)):
+                self.state = self.STATE_END_FUNC_LIST
+            elif data in self.func_addrs:
+                func_addr = struct.unpack(self.ptrtype, data)[0]
+                func_info = self.libs.fad2info[func_addr]
+                if func_info[0] != self.cur_list_lib:
+                    # The list must belong to the same library
+                    self.state = self.STATE_SEARCH
+                    return
+                self.cur_list.append(self.format_func_info(func_info, func_addr))
+            else:
+                self.state == self.STATE_SEARCH
+        elif self.state == self.STATE_END_FUNC_LIST:
+            self.seen.append(self.cur_list)
+            self.state = self.STATE_SEARCH
+            self.transition(data)
+        else:
+            raise ValueError()
+        
+    def run(self):
+        while True:
+            data, address = yield
+            self.cur_address = address
+            self.transition(data)
+
+
+class ImpRecStrategy(object):
+    """
+    Naive import reconstruction, similar to ImpRec
+
+    It looks for a continuation of module export addresses, ended by a NULL entry, ie:
+    [...]
+    &Kernel32::LoadLibraryA
+    &Kernel32::HeapCreate
+    00 00 00 00
+    [...]
+
+    Usage:
+    >>> sb = Sandbox[...]
+    >>> sb.run()
+    >>> imprec = ImpRecStrategy(sb.jitter, sb.libs, size=32)
+    >>> imprec.recover_import()
+    List<List<Recovered functions>>
+
+    -> sb.libs has also been updated, ready to be passed to `vm2pe`
+    """
+    def __init__(self, jitter, libs, size):
+        self._jitter = jitter
+        self._libs = libs
+        if size == 32:
+            self._ptrtype = "<I"
+        elif size == 64:
+            self._ptrtype = "<Q"
+        else:
+            ValueError("Unsupported size: %d" % size)
+
+    def recover_import(self, update_libs=True):
+        # Hypothesis: align on 4
+        # Search for several addresses from `func_addrs` ending with a `\x00`
+        fsm_obj = ImpRecStateMachine(self._libs, self._ptrtype)
+        fsm = fsm_obj.run()
+        fsm.send(None)
+        for addr_start, page_info in self._jitter.vm.get_all_memory().items():
+            data = page_info["data"]
+            for i in range(0, page_info["size"], 4):
+                fsm.send((data[i:i+4], addr_start + i))
+
+        # Apply to libs
+        if update_libs:
+            for entry_list in fsm_obj.seen:
+                for func_info in entry_list:
+                    self._libs.lib_imp2dstad[func_info["lib_addr"]][func_info["entry_name"]].add(func_info["entry_memory_addr"])
+
+        return fsm_obj.seen