about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorserpilliere <serpilliere@users.noreply.github.com>2023-04-23 21:24:51 +0200
committerGitHub <noreply@github.com>2023-04-23 21:24:51 +0200
commit49ec4bc9a90391bc796ac6940a6d9b77deb17ac9 (patch)
tree1a890a007c103e643ffe4e1b13caada32c901ef9
parent230d528c50d8a2870a89011fc1e660fcab4910ff (diff)
parentb66becdead10f0bb2aa009dda61c422e79c567fe (diff)
downloadfocaccia-miasm-49ec4bc9a90391bc796ac6940a6d9b77deb17ac9.tar.gz
focaccia-miasm-49ec4bc9a90391bc796ac6940a6d9b77deb17ac9.zip
Merge pull request #1448 from cea-sec/generic-unpack
Generic import recovery (cheap ImpRec style)
-rw-r--r--example/jitter/unpack_generic.py53
-rw-r--r--miasm/jitter/loader/pe.py121
-rwxr-xr-xtest/test_all.py6
3 files changed, 180 insertions, 0 deletions
diff --git a/example/jitter/unpack_generic.py b/example/jitter/unpack_generic.py
new file mode 100644
index 00000000..3329d2a9
--- /dev/null
+++ b/example/jitter/unpack_generic.py
@@ -0,0 +1,53 @@
+from __future__ import print_function
+import os
+import logging
+from miasm.analysis.sandbox import Sandbox_Win_x86_32
+from miasm.jitter.loader.pe import vm2pe, ImpRecStrategy
+from miasm.core.locationdb import LocationDB
+from miasm.jitter.jitload import JitterException
+
+parser = Sandbox_Win_x86_32.parser(description="Generic & dummy unpacker")
+parser.add_argument("filename", help="PE Filename")
+parser.add_argument("--oep", help="Stop and dump if this address is reached")
+parser.add_argument('-v', "--verbose",
+                    help="verbose mode", action="store_true")
+options = parser.parse_args()
+
+loc_db = LocationDB()
+sb = Sandbox_Win_x86_32(
+    loc_db, options.filename, options, globals(),
+    parse_reloc=False
+)
+
+if options.verbose is True:
+    logging.basicConfig(level=logging.INFO)
+else:
+    logging.basicConfig(level=logging.WARNING)
+
+if options.verbose is True:
+    print(sb.jitter.vm)
+
+def stop(jitter):
+    """Breakpoint callback registered on the --oep address: log the hit and stop"""
+    logging.info('User provided OEP reached')
+    return False  # Returning False stops the execution
+
+if options.oep:
+    # --oep is parsed with base 0, so decimal and 0x-prefixed values are both accepted
+    sb.jitter.add_breakpoint(int(options.oep, 0), stop)
+    
+# Run until the --oep breakpoint or an error; an error is unlikely to be the original entry point
+try:
+    sb.run()
+except (JitterException, ValueError) as e:
+    logging.exception(e)  # Log and carry on: the dump below is still attempted
+
+out_fname = "%s.dump" % (options.filename)  # Input filename with a ".dump" suffix
+
+# Try a generic approach to rebuild the Import Table
+imprec = ImpRecStrategy(sb.jitter, sb.libs, 32)
+imprec.recover_import()  # update_libs defaults to True: sb.libs receives the recovered entries
+
+# Rebuild the PE and dump it
+print("Dump to %s" % out_fname)
+vm2pe(sb.jitter, out_fname, libs=sb.libs, e_orig=sb.pe)
diff --git a/miasm/jitter/loader/pe.py b/miasm/jitter/loader/pe.py
index 28010b74..6d359a9a 100644
--- a/miasm/jitter/loader/pe.py
+++ b/miasm/jitter/loader/pe.py
@@ -695,3 +695,124 @@ def guess_arch(pe):
     """Return the architecture specified by the PE container @pe.
     If unknown, return None"""
     return PE_machine.get(pe.Coffhdr.machine, None)
+
+
+class ImpRecStateMachine(object):
+    """
+    Finite State Machine used for internal purpose only.
+    See `ImpRecStrategy` for more details.
+    """
+
+    STATE_SEARCH = 0  # Looking for a function pointer
+    STATE_FUNC_FOUND = 1  # Candidate function list
+    STATE_END_FUNC_LIST = 2  # Function list found, terminated by a NULL entry
+
+    def __init__(self, libs, ptrtype):
+        """@libs: libs with cname2addr, name2off, fad2info; @ptrtype: `struct` pointer format"""
+        self.ptrtype = ptrtype
+        self.libs = libs
+        self.func_addrs = set(struct.pack(self.ptrtype, address) for address in self.libs.cname2addr.values())
+        self.off2name = {v:k for k,v in self.libs.name2off.items()}
+        self.state = self.STATE_SEARCH
+        self.cur_address = None  # Address of the current chunk, set by `run` before each step
+        # STATE_FUNC_FOUND: entries of the candidate list and the library they belong to
+        self.cur_list = []
+        self.cur_list_lib = None
+        # STATE_END_FUNC_LIST: every NULL-terminated list recovered so far
+        self.seen = []
+
+    def format_func_info(self, func_info, func_addr):
+        """Return the dict describing export @func_info (lib address, name) seen at `cur_address`"""
+        return {
+            "lib_addr": func_info[0],
+            "lib_name": self.off2name[func_info[0]],
+            "entry_name": func_info[1],
+            "entry_module_addr": func_addr,
+            "entry_memory_addr": self.cur_address,
+        }
+
+    def transition(self, data):
+        """Update the state with @data, one pointer-sized chunk (bytes); ValueError on unknown state"""
+        if self.state == self.STATE_SEARCH:
+            if data in self.func_addrs:
+                self.state = self.STATE_FUNC_FOUND
+                func_addr = struct.unpack(self.ptrtype, data)[0]
+                func_info = self.libs.fad2info[func_addr]
+                self.cur_list = [self.format_func_info(func_info, func_addr)]
+                self.cur_list_lib = func_info[0]
+        elif self.state == self.STATE_FUNC_FOUND:
+            if data == (b"\x00" * len(data)):
+                self.state = self.STATE_END_FUNC_LIST
+            elif data in self.func_addrs:
+                func_addr = struct.unpack(self.ptrtype, data)[0]
+                func_info = self.libs.fad2info[func_addr]
+                if func_info[0] != self.cur_list_lib:
+                    # The list must belong to the same library
+                    self.state = self.STATE_SEARCH
+                    return
+                self.cur_list.append(self.format_func_info(func_info, func_addr))
+            else:  # Neither an export nor NULL: drop the candidate (was a no-op `==`)
+                self.state = self.STATE_SEARCH
+        elif self.state == self.STATE_END_FUNC_LIST:
+            self.seen.append(self.cur_list)
+            self.state = self.STATE_SEARCH
+            self.transition(data)
+        else:
+            raise ValueError()
+
+    def run(self):
+        """Generator driven through `send((data, address))`; prime it once with `send(None)`"""
+        while True:
+            data, address = yield
+            self.cur_address = address
+            self.transition(data)
+
+
+class ImpRecStrategy(object):
+    """
+    Naive import reconstruction, similar to ImpRec
+
+    It looks for a continuation of module export addresses, ended by a NULL entry, ie:
+    [...]
+    &Kernel32::LoadLibraryA
+    &Kernel32::HeapCreate
+    00 00 00 00
+    [...]
+
+    Usage:
+    >>> sb = Sandbox[...]
+    >>> sb.run()
+    >>> imprec = ImpRecStrategy(sb.jitter, sb.libs, size=32)
+    >>> imprec.recover_import()
+    List<List<Recovered functions>>
+
+    -> sb.libs has also been updated, ready to be passed to `vm2pe`
+    """
+    def __init__(self, jitter, libs, size):
+        """@jitter, @libs: jitter and libs to scan; @size: pointer bits, 32 or 64 (else ValueError)"""
+        ptrtypes = {32: "<I", 64: "<Q"}
+        if size not in ptrtypes:
+            # The exception used to be built without `raise`, silently leaving `_ptrtype` unset
+            raise ValueError("Unsupported size: %r" % size)
+        self._jitter = jitter
+        self._libs = libs
+        self._ptrtype = ptrtypes[size]
+
+    def recover_import(self, update_libs=True):
+        """Return the import lists found in memory; @update_libs: also add them to libs"""
+        # Hypothesis: entries are pointer-sized and pointer-aligned (a 4-byte chunk never matches "<Q")
+        ptrsize = struct.calcsize(self._ptrtype)
+        fsm_obj = ImpRecStateMachine(self._libs, self._ptrtype)
+        fsm = fsm_obj.run()
+        fsm.send(None)  # Prime the generator up to its first `yield`
+        for addr_start, page_info in self._jitter.vm.get_all_memory().items():
+            data = page_info["data"]
+            for i in range(0, page_info["size"] - ptrsize + 1, ptrsize):
+                fsm.send((data[i:i + ptrsize], addr_start + i))
+
+        # Apply to libs
+        if update_libs:
+            for entry_list in fsm_obj.seen:
+                for func_info in entry_list:
+                    self._libs.lib_imp2dstad[func_info["lib_addr"]][func_info["entry_name"]].add(func_info["entry_memory_addr"])
+        return fsm_obj.seen
diff --git a/test/test_all.py b/test/test_all.py
index 591f3d8e..85ebb31f 100755
--- a/test/test_all.py
+++ b/test/test_all.py
@@ -800,6 +800,12 @@ for jitter in ExampleJitter.jitter_engines:
                              products=[Example.get_sample("box_upx_exe_unupx.bin")],
                              tags=tags.get(jitter, []))
 
+    testset += ExampleJitter(["unpack_generic.py",
+                              Example.get_sample("box_upx.exe")] +
+                             ["--jitter", jitter, "-o"],
+                             products=[Example.get_sample("box_upx.exe.dump")],
+                             tags=tags.get(jitter, []))
+
     testset += ExampleJitter(["memory_breakpoint.py",
                               Example.get_sample("box_upx.exe")] +
                              ["--jitter", jitter] +