diff options
| -rw-r--r-- | src/focaccia/symbolic.py | 10 | ||||
| -rwxr-xr-x | src/focaccia/tools/capture_transforms.py | 8 | ||||
| -rw-r--r-- | src/focaccia/utils.py | 29 |
3 files changed, 42 insertions, 5 deletions
diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py index 9a32962..81c78e6 100644 --- a/src/focaccia/symbolic.py +++ b/src/focaccia/symbolic.py @@ -22,6 +22,7 @@ from .lldb_target import ( from .miasm_util import MiasmSymbolResolver, eval_expr, make_machine from .snapshot import ReadableProgramState, RegisterAccessError, MemoryAccessError from .trace import Trace, TraceEnvironment +from .utils import timebound, TimeoutError logger = logging.getLogger('focaccia-symbolic') debug = logger.debug @@ -788,7 +789,8 @@ class SymbolicTracer: def trace(self, start_addr: int | None = None, - stop_addr: int | None = None) -> Trace[SymbolicTransform]: + stop_addr: int | None = None, + time_limit: int | None = None) -> Trace[SymbolicTransform]: """Execute a program and compute state transformations between executed instructions. @@ -846,7 +848,11 @@ class SymbolicTracer: # Run instruction conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) - new_pc, modified = run_instruction(instruction.instr, conc_state, ctx.lifter) + try: + new_pc, modified = timebound(time_limit, run_instruction, instruction.instr, conc_state, ctx.lifter) + except TimeoutError: + warn(f'Running instruction {instruction} took longer than {time_limit} second. Skipping') + new_pc, modified = None, {} if self.cross_validate and new_pc: # Predict next concrete state. diff --git a/src/focaccia/tools/capture_transforms.py b/src/focaccia/tools/capture_transforms.py index 23d150d..10dfdb2 100755 --- a/src/focaccia/tools/capture_transforms.py +++ b/src/focaccia/tools/capture_transforms.py @@ -45,6 +45,11 @@ def main(): default=None, type=utils.to_int, help='Set a final address up until which to collect the symoblic trace') + prog.add_argument('--insn-time-limit', + default=None, + type=utils.to_num, + help='Set a time limit for executing an instruction symbolically, skip' + 'instruction when limit is exceeded') args = prog.parse_args() if args.debug: @@ -77,7 +82,8 @@ def main(): force=args.force) trace = tracer.trace(start_addr=args.start_address, - stop_addr=args.stop_address) + stop_addr=args.stop_address, + time_limit=args.insn_time_limit) with open(args.output, 'w') as file: parser.serialize_transformations(trace, file) diff --git a/src/focaccia/utils.py b/src/focaccia/utils.py index efbb6b2..c648d41 100644 --- a/src/focaccia/utils.py +++ b/src/focaccia/utils.py @@ -1,9 +1,10 @@ from __future__ import annotations -import ctypes import os -import shutil import sys +import shutil +import ctypes +import signal from functools import total_ordering from hashlib import sha256 @@ -118,3 +119,27 @@ def print_result(result, min_severity: ErrorSeverity): def to_int(value: str) -> int: return int(value, 0) +def to_num(value: str) -> int | float: + try: + return int(value, 0) + except: + return float(value) + +class TimeoutError(Exception): + pass + +def timebound(timeout: int | float | None, func, *args, **kwargs): + if timeout is None: + return func(*args, **kwargs) + + def _handle_timeout(signum, frame): + raise TimeoutError(f'Function exceeded {timeout} limit') + + old_handler = signal.signal(signal.SIGALRM, _handle_timeout) + signal.setitimer(signal.ITIMER_REAL, timeout) + try: + return func(*args, **kwargs) + finally: + signal.setitimer(signal.ITIMER_REAL, 0) + signal.signal(signal.SIGALRM, old_handler) + |