author     Theofilos Augoustis <theofilos.augoustis@gmail.com>   2025-10-14 09:09:29 +0000
committer  Theofilos Augoustis <theofilos.augoustis@gmail.com>   2025-10-14 09:09:29 +0000
commit     579cf1d03fb932083e6317967d1613d5c2587fb6
tree       629f039935382a2a7391bce9253f6c9968159049   /src/miasm/jitter/jitcore_python.py
parent     51c15d3ea2e16d4fc5f0f01a3b9befc66b1f982e
Convert to src-layout (ta/nix)
Diffstat (limited to 'src/miasm/jitter/jitcore_python.py')
 -rw-r--r--  src/miasm/jitter/jitcore_python.py  219
 1 file changed, 219 insertions(+), 0 deletions(-)
diff --git a/src/miasm/jitter/jitcore_python.py b/src/miasm/jitter/jitcore_python.py
new file mode 100644
index 00000000..269df973
--- /dev/null
+++ b/src/miasm/jitter/jitcore_python.py
@@ -0,0 +1,219 @@
+from __future__ import print_function
+from builtins import zip
+import miasm.jitter.jitcore as jitcore
+from miasm.expression.expression import ExprInt, ExprLoc
+import miasm.jitter.csts as csts
+from miasm.expression.simplifications import expr_simp_explicit
+from miasm.jitter.emulatedsymbexec import EmulatedSymbExec
+
+################################################################################
+#                              Python jitter Core                              #
+################################################################################
+
+
+class JitCore_Python(jitcore.JitCore):
+    "JiT management, using Miasm2 Symbol Execution engine as backend"
+
+    SymbExecClass = EmulatedSymbExec
+
+    def __init__(self, lifter, bin_stream):
+        super(JitCore_Python, self).__init__(lifter, bin_stream)
+        self.lifter = lifter
+        self.ircfg = self.lifter.new_ircfg()
+
+        # CPU & VM (None for now) will be set later
+
+        self.symbexec = self.SymbExecClass(
+            None, None,
+            self.lifter, {},
+            sb_expr_simp=expr_simp_explicit
+        )
+        self.symbexec.enable_emulated_simplifications()
+
+    def set_cpu_vm(self, cpu, vm):
+        self.symbexec.cpu = cpu
+        self.symbexec.vm = vm
+
+    def load(self):
+        "Preload symbols according to current architecture"
+        self.symbexec.reset_regs()
+
+    def arch_specific(self):
+        """Return arch specific information for the current architecture"""
+        arch = self.lifter.arch
+        has_delayslot = False
+        if arch.name == "mips32":
+            from miasm.arch.mips32.jit import mipsCGen
+            cgen_class = mipsCGen
+            has_delayslot = True
+        elif arch.name == "arm":
+            from miasm.arch.arm.jit import arm_CGen
+            cgen_class = arm_CGen
+        else:
+            from miasm.jitter.codegen import CGen
+            cgen_class = CGen
+        return cgen_class(self.lifter), has_delayslot
+
+    def add_block(self, asmblock):
+        """Create a python function corresponding to an AsmBlock
+        @asmblock: AsmBlock
+        """
+
+        # TODO: merge duplicate code with CGen, llvmconvert
+        codegen, has_delayslot = self.arch_specific()
+        irblocks_list = codegen.block2assignblks(asmblock)
+        instr_offsets = [line.offset for line in asmblock.lines]
+
+        loc_db = self.lifter.loc_db
+        local_loc_keys = []
+        for irblocks in irblocks_list:
+            for irblock in irblocks:
+                local_loc_keys.append(irblock.loc_key)
+
+        def myfunc(cpu):
+            """Execute the function according to cpu and vmmngr states
+            @cpu: JitCpu instance
+            """
+            # Get virtual memory handler
+            vmmngr = cpu.vmmngr
+
+            # Get execution engine (EmulatedSymbExec instance)
+            exec_engine = self.symbexec
+
+            # Refresh CPU values according to @cpu instance
+            exec_engine.update_engine_from_cpu()
+
+            # Get initial loc_key
+            cur_loc_key = asmblock.loc_key
+
+            # Update PC helper
+            update_pc = lambda value: setattr(cpu, self.lifter.pc.name, value)
+
+            while True:
+                # Retrieve the expected irblock
+                for instr, irblocks in zip(asmblock.lines, irblocks_list):
+                    for index, irblock in enumerate(irblocks):
+                        if irblock.loc_key == cur_loc_key:
+                            break
+                    else:
+                        continue
+                    break
+                else:
+                    raise RuntimeError("Unable to find the block for %r" % cur_loc_key)
+
+                instr_attrib, irblocks_attributes = codegen.get_attributes(
+                    instr, irblocks, self.log_mn, self.log_regs
+                )
+                irblock_attributes = irblocks_attributes[index]
+
+                # Do IRBlock
+                new_irblock = self.lifter.irbloc_fix_regs_for_mode(
+                    irblock, self.lifter.attrib
+                )
+                if index == 0:
+                    # Pre code
+                    if instr_attrib.log_mn:
+                        print("%.8X %s" % (
+                            instr_attrib.instr.offset,
+                            instr_attrib.instr.to_string(loc_db)
+                        ))
+
+                # Exec IRBlock
+                instr = instr_attrib.instr
+
+                for index, assignblk in enumerate(irblock):
+                    attributes = irblock_attributes[index]
+
+                    # Eval current instruction (in IR)
+                    exec_engine.eval_updt_assignblk(assignblk)
+
+                    # Check memory access / write exception
+                    # TODO: insert a check between memory reads and writes
+                    if attributes.mem_read or attributes.mem_write:
+                        # Restricted exception
+                        flag = ~csts.EXCEPT_CODE_AUTOMOD & csts.EXCEPT_DO_NOT_UPDATE_PC
+                        if (vmmngr.get_exception() & flag != 0):
+                            # Do not update registers
+                            update_pc(instr.offset)
+                            return instr.offset
+
+                    # Update registers values
+                    exec_engine.update_cpu_from_engine()
+
+                    # Check post assignblk exception flags
+                    if attributes.set_exception:
+                        # Restricted exception
+                        if cpu.get_exception() > csts.EXCEPT_NUM_UPDT_EIP:
+                            # Update PC
+                            update_pc(instr.offset)
+                            return instr.offset
+
+                dst = exec_engine.eval_expr(self.lifter.IRDst)
+                if dst.is_int():
+                    loc_key = loc_db.get_or_create_offset_location(int(dst))
+                    dst = ExprLoc(loc_key, dst.size)
+
+                assert dst.is_loc()
+                loc_key = dst.loc_key
+                offset = loc_db.get_location_offset(loc_key)
+                if offset is None:
+                    # Avoid checks on generated label
+                    cur_loc_key = loc_key
+                    continue
+
+                if instr_attrib.log_regs:
+                    update_pc(offset)
+                    cpu.dump_gpregs_with_attrib(self.lifter.attrib)
+
+                # Post-instr checks
+                if instr_attrib.mem_read | instr_attrib.mem_write:
+                    vmmngr.check_memory_breakpoint()
+                    vmmngr.check_invalid_code_blocs()
+                    if vmmngr.get_exception():
+                        update_pc(offset)
+                        return offset
+
+                if instr_attrib.set_exception:
+                    if cpu.get_exception():
+                        update_pc(offset)
+                        return offset
+
+                if instr_attrib.mem_read | instr_attrib.mem_write:
+                    vmmngr.reset_memory_access()
+
+                # Manage resulting address
+                if (loc_key in local_loc_keys and
+                    offset > instr.offset):
+                    # Forward local jump
+                    # Note: a backward local jump has to be promoted to extern,
+                    # for max_exec_per_call support
+                    cur_loc_key = loc_key
+                    continue
+
+                # Delay slot
+                if has_delayslot:
+                    delay_slot_set = exec_engine.eval_expr(codegen.delay_slot_set)
+                    if delay_slot_set.is_int() and int(delay_slot_set) != 0:
+                        return int(exec_engine.eval_expr(codegen.delay_slot_dst))
+
+                # Extern of asmblock, must have an offset
+                assert offset is not None
+                return offset
+
+        # Associate myfunc with current loc_key
+        offset = loc_db.get_location_offset(asmblock.loc_key)
+        assert offset is not None
+        self.offset_to_jitted_func[offset] = myfunc
+
+    def exec_wrapper(self, loc_key, cpu, _offset_to_jitted_func, _stop_offsets,
+                     _max_exec_per_call):
+        """Call the function @loc_key with @cpu
+        @loc_key: function's loc_key
+        @cpu: JitCpu instance
+        """
+
+        # Get Python function corresponding to @loc_key
+        fc_ptr = self.offset_to_jitted_func[loc_key]
+
+        # Execute the function
+        return fc_ptr(cpu)
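
For context: JitCore_Python is the pure-Python jitter backend introduced by this file; each AsmBlock is replayed IR-assignment by IR-assignment through EmulatedSymbExec instead of being compiled to native code, which makes it slow but convenient for tracing and debugging IR semantics. Below is a minimal usage sketch, not part of this commit; the exact Machine.jitter() signature, the breakpoint return convention, and the illustrative addresses/opcodes are assumptions that may vary across Miasm versions.

# Illustrative only -- selects the pure-Python backend provided by this file.
from miasm.analysis.machine import Machine
from miasm.core.locationdb import LocationDB
from miasm.jitter.csts import PAGE_READ, PAGE_WRITE

loc_db = LocationDB()
machine = Machine("x86_32")

# "python" picks JitCore_Python; "gcc" / "llvm" would pick the native backends.
jitter = machine.jitter(loc_db, "python")

run_addr = 0x40000000                 # arbitrary address for this sketch
code = b"\x89\xd8\xc3"                # mov eax, ebx ; ret
jitter.vm.add_memory_page(run_addr, PAGE_READ | PAGE_WRITE, code)

jitter.cpu.EBX = 0x1234
jitter.init_stack()
jitter.push_uint32_t(0x1337BEEF)      # fake return address
# A breakpoint callback returning something other than True stops the run loop.
jitter.add_breakpoint(0x1337BEEF, lambda j: False)
jitter.run(run_addr)

print(hex(jitter.cpu.EAX))            # expected: 0x1234

The same script should run unchanged with the native backends; swapping "python" for "gcc" or "llvm" only changes which JitCore subclass handles offset_to_jitted_func.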