diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/focaccia/native/__init__.py | 0 | ||||
| -rw-r--r-- | src/focaccia/native/lldb_target.py (renamed from src/focaccia/lldb_target.py) | 4 | ||||
| -rw-r--r-- | src/focaccia/native/tracer.py | 342 | ||||
| -rw-r--r-- | src/focaccia/symbolic.py | 359 | ||||
| -rwxr-xr-x | src/focaccia/tools/capture_transforms.py | 2 |
5 files changed, 352 insertions, 355 deletions
diff --git a/src/focaccia/native/__init__.py b/src/focaccia/native/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/focaccia/native/__init__.py diff --git a/src/focaccia/lldb_target.py b/src/focaccia/native/lldb_target.py index 940b3d9..ff43643 100644 --- a/src/focaccia/lldb_target.py +++ b/src/focaccia/native/lldb_target.py @@ -3,8 +3,8 @@ import logging import lldb -from .arch import supported_architectures -from .snapshot import ProgramState +from focaccia.snapshot import ProgramState +from focaccia.arch import supported_architectures logger = logging.getLogger('focaccia-lldb-target') debug = logger.debug diff --git a/src/focaccia/native/tracer.py b/src/focaccia/native/tracer.py new file mode 100644 index 0000000..9dbc32a --- /dev/null +++ b/src/focaccia/native/tracer.py @@ -0,0 +1,342 @@ +"""Concolic Tracer for native programs.""" + +from __future__ import annotations + +import sys +import logging + +from pathlib import Path + +from focaccia.utils import timebound, TimeoutError +from focaccia.trace import Trace, TraceEnvironment +from focaccia.miasm_util import MiasmSymbolResolver +from focaccia.snapshot import ReadableProgramState, RegisterAccessError +from focaccia.symbolic import SymbolicTransform, DisassemblyContext, run_instruction + +from .lldb_target import LLDBConcreteTarget, LLDBLocalTarget, LLDBRemoteTarget + +logger = logging.getLogger('focaccia-symbolic') +debug = logger.debug +info = logger.info +warn = logger.warn + +# Disable Miasm's disassembly logger +logging.getLogger('asmblock').setLevel(logging.CRITICAL) + +class ValidationError(Exception): + pass + +class SpeculativeTracer(ReadableProgramState): + def __init__(self, target: LLDBConcreteTarget): + super().__init__(target.arch) + self.target = target + self.pc = target.read_register('pc') + self.speculative_pc: int | None = None + self.speculative_count: int = 0 + + self.read_cache = {} + + def speculate(self, new_pc): + self.read_cache.clear() + if new_pc is None: + self.progress_execution() + self.target.step() + self.pc = self.target.read_register('pc') + self.speculative_pc = None + self.speculative_count = 0 + return + + new_pc = int(new_pc) + self.speculative_pc = new_pc + self.speculative_count += 1 + + def progress_execution(self) -> None: + if self.speculative_pc is not None and self.speculative_count != 0: + debug(f'Updating PC to {hex(self.speculative_pc)}') + if self.speculative_count == 1: + self.target.step() + else: + self.target.run_until(self.speculative_pc) + + self.pc = self.speculative_pc + self.speculative_pc = None + self.speculative_count = 0 + + self.read_cache.clear() + + def run_until(self, addr: int): + if self.speculative_pc: + raise Exception('Attempting manual execution with speculative execution enabled') + self.target.run_until(addr) + self.pc = addr + + def step(self): + self.progress_execution() + if self.target.is_exited(): + return + self.target.step() + self.pc = self.target.read_register('pc') + + def _cache(self, name: str, value): + self.read_cache[name] = value + return value + + def read_pc(self) -> int: + if self.speculative_pc is not None: + return self.speculative_pc + return self.pc + + def read_flags(self) -> dict[str, int | bool]: + if 'flags' in self.read_cache: + return self.read_cache['flags'] + self.progress_execution() + return self._cache('flags', self.target.read_flags()) + + def read_register(self, reg: str) -> int: + regname = self.arch.to_regname(reg) + if regname is None: + raise RegisterAccessError(reg, f'Not a register name: {reg}') + + if reg in self.read_cache: + return self.read_cache[reg] + + self.progress_execution() + return self._cache(reg, self.target.read_register(regname)) + + def write_register(self, regname: str, value: int): + self.progress_execution() + self.read_cache.pop(regname, None) + self.target.write_register(regname, value) + + def read_instructions(self, addr: int, size: int) -> bytes: + return self.target.read_memory(addr, size) + + def read_memory(self, addr: int, size: int) -> bytes: + self.progress_execution() + cache_name = f'{addr}_{size}' + if cache_name in self.read_cache: + return self.read_cache[cache_name] + return self._cache(cache_name, self.target.read_memory(addr, size)) + + def write_memory(self, addr: int, value: bytes): + self.progress_execution() + self.read_cache.pop(addr, None) + self.target.write_memory(addr, value) + + def __getattr__(self, name: str): + return getattr(self.target, name) + +class SymbolicTracer: + """A symbolic tracer that uses `LLDBConcreteTarget` with Miasm to simultaneously execute a + program with concrete state and collect its symbolic transforms + """ + def __init__(self, + env: TraceEnvironment, + remote: str | None=None, + force: bool=False, + cross_validate: bool=False): + self.env = env + self.force = force + self.remote = remote + self.cross_validate = cross_validate + self.target = SpeculativeTracer(self.create_debug_target()) + + self.nondet_events = self.env.detlog.events() + self.next_event: int | None = None + + def create_debug_target(self) -> LLDBConcreteTarget: + binary = self.env.binary_name + if self.remote is False: + debug(f'Launching local debug target {binary} {self.env.argv}') + debug(f'Environment: {self.env}') + return LLDBLocalTarget(binary, self.env.argv, self.env.envp) + + debug(f'Connecting to remote debug target {self.remote}') + target = LLDBRemoteTarget(self.remote, binary) + + module_name = target.determine_name() + binary = str(Path(self.env.binary_name).resolve()) + if binary != module_name: + warn(f'Discovered binary name {module_name} differs from specified name {binary}') + + return target + + def predict_next_state(self, instruction: Instruction, transform: SymbolicTransform): + debug(f'Evaluating register and memory transforms for {instruction} to cross-validate') + predicted_regs = transform.eval_register_transforms(self.target) + predicted_mems = transform.eval_memory_transforms(self.target) + return predicted_regs, predicted_mems + + def validate(self, + instruction: Instruction, + transform: SymbolicTransform, + predicted_regs: dict[str, int], + predicted_mems: dict[int, bytes]): + # Verify last generated transform by comparing concrete state against + # predicted values. + if self.target.is_exited(): + return + + debug('Cross-validating symbolic transforms by comparing actual to predicted values') + for reg, val in predicted_regs.items(): + conc_val = self.target.read_register(reg) + if conc_val != val: + raise ValidationError(f'Symbolic execution backend generated false equation for' + f' [{hex(instruction.addr)}]: {instruction}:' + f' Predicted {reg} = {hex(val)}, but the' + f' concrete state has value {reg} = {hex(conc_val)}.' + f'\nFaulty transformation: {transform}') + for addr, data in predicted_mems.items(): + conc_data = self.target.read_memory(addr, len(data)) + if conc_data != data: + raise ValidationError(f'Symbolic execution backend generated false equation for' + f' [{hex(instruction.addr)}]: {instruction}: Predicted' + f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},' + f' but the concrete state has value' + f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' + f'\nFaulty transformation: {transform}') + + def progress_event(self) -> None: + if (self.next_event + 1) < len(self.nondet_events): + self.next_event += 1 + debug(f'Next event to handle at index {self.next_event}') + else: + self.next_event = None + + def post_event(self) -> None: + if self.next_event: + if self.nondet_events[self.next_event].pc == 0: + # Exit sequence + debug('Completed exit event') + self.target.run() + + debug(f'Completed handling event at index {self.next_event}') + self.progress_event() + + def is_stepping_instr(self, pc: int, instruction: Instruction) -> bool: + if self.nondet_events: + pc = pc + instruction.length # detlog reports next pc for each event + if self.next_event and self.nondet_events[self.next_event].match(pc, self.target): + debug('Current instruction matches next event; stepping through it') + self.progress_event() + return True + else: + if self.target.arch.is_instr_syscall(str(instruction)): + return True + return False + + def progress(self, new_pc, step: bool = False) -> int | None: + self.target.speculate(new_pc) + if step: + self.target.progress_execution() + if self.target.is_exited(): + return None + return self.target.read_pc() + + def trace(self, time_limit: int | None = None) -> Trace[SymbolicTransform]: + """Execute a program and compute state transformations between executed + instructions. + + :param start_addr: Address from which to start tracing. + :param stop_addr: Address until which to trace. + """ + # Set up concrete reference state + if self.env.start_address is not None: + self.target.run_until(self.env.start_address) + + for i in range(len(self.nondet_events)): + if self.nondet_events[i].pc == self.target.read_pc(): + self.next_event = i+1 + if self.next_event >= len(self.nondet_events): + break + + debug(f'Starting from event {self.nondet_events[i]} onwards') + break + + ctx = DisassemblyContext(self.target) + arch = ctx.arch + + if logger.isEnabledFor(logging.DEBUG): + debug('Tracing program with the following non-deterministic events') + for event in self.nondet_events: + debug(event) + + # Trace concolically + strace: list[SymbolicTransform] = [] + while not self.target.is_exited(): + pc = self.target.read_pc() + + if self.env.stop_address is not None and pc == self.env.stop_address: + break + + assert(pc != 0) + + # Disassemble instruction at the current PC + tid = self.target.get_current_tid() + try: + instruction = ctx.disassemble(pc) + info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') + except: + err = sys.exc_info()[1] + + # Try to recovery by using the LLDB disassembly instead + try: + alt_disas = self.target.get_disassembly(pc) + instruction = Instruction.from_string(alt_disas, ctx.arch, pc, + self.target.get_instruction_size(pc)) + info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') + except: + if self.force: + if alt_disas: + warn(f'[{tid}] Unable to handle instruction {alt_disas} at {hex(pc)} in Miasm.' + f' Skipping.') + else: + warn(f'[{tid}] Unable to disassemble instruction {hex(pc)}: {err}.' + f' Skipping.') + self.target.step() + continue + raise # forward exception + + is_event = self.is_stepping_instr(pc, instruction) + + # Run instruction + conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) + + try: + new_pc, modified = timebound(time_limit, run_instruction, + instruction.instr, conc_state, ctx.lifter) + except TimeoutError: + warn(f'Running instruction {instruction} took longer than {time_limit} second. Skipping') + new_pc, modified = None, {} + + if self.cross_validate and new_pc: + # Predict next concrete state. + # We verify the symbolic execution backend on the fly for some + # additional protection from bugs in the backend. + new_pc = int(new_pc) + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) + pred_regs, pred_mems = self.predict_next_state(instruction, transform) + self.progress(new_pc, step=is_event) + + try: + self.validate(instruction, transform, pred_regs, pred_mems) + except ValidationError as e: + if self.force: + warn(f'Cross-validation failed: {e}') + continue + raise + else: + new_pc = self.progress(new_pc, step=is_event) + if new_pc is None: + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, 0) + strace.append(transform) + continue # we're done + transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) + + strace.append(transform) + + if is_event: + self.post_event() + + return Trace(strace, self.env) + diff --git a/src/focaccia/symbolic.py b/src/focaccia/symbolic.py index 2a66a26..b83b289 100644 --- a/src/focaccia/symbolic.py +++ b/src/focaccia/symbolic.py @@ -1,42 +1,17 @@ -"""Tools and utilities for execution with Miasm.""" +"""Tools and utilities for execution with Miasm.""" from __future__ import annotations -import sys -import logging - -from pathlib import Path - +from miasm.ir.ir import Lifter from miasm.analysis.machine import Machine -from miasm.core.cpu import instruction as miasm_instr from miasm.core.locationdb import LocationDB -from miasm.expression.expression import Expr, ExprId, ExprMem, ExprInt -from miasm.ir.ir import Lifter +from miasm.core.cpu import instruction as miasm_instr from miasm.ir.symbexec import SymbolicExecutionEngine +from miasm.expression.expression import Expr, ExprId, ExprMem, ExprInt +from .snapshot import ReadableProgramState from .arch import Arch, supported_architectures -from .lldb_target import ( - LLDBConcreteTarget, - LLDBLocalTarget, - LLDBRemoteTarget, - ConcreteRegisterError, - ConcreteMemoryError, -) from .miasm_util import MiasmSymbolResolver, eval_expr, make_machine -from .snapshot import ReadableProgramState, RegisterAccessError, MemoryAccessError -from .trace import Trace, TraceEnvironment -from .utils import timebound, TimeoutError - -logger = logging.getLogger('focaccia-symbolic') -debug = logger.debug -info = logger.info -warn = logger.warn - -# Disable Miasm's disassembly logger -logging.getLogger('asmblock').setLevel(logging.CRITICAL) - -class ValidationError(Exception): - pass def eval_symbol(symbol: Expr, conc_state: ReadableProgramState) -> int: """Evaluate a symbol based on a concrete reference state. @@ -542,8 +517,7 @@ def run_instruction(instr: miasm_instr, res[dst] = expr_simp(ExprCond(cond, dst, v)) return res - def _execute_location(loc, base_state: dict | None) \ - -> tuple[Expr, dict]: + def _execute_location(loc, base_state: dict | None) -> tuple[Expr, dict]: """Execute a single IR block via symbolic engine. No fancy stuff.""" # Query the location's IR block irblock = ircfg.get_block(loc) @@ -601,12 +575,7 @@ def run_instruction(instr: miasm_instr, loc = lifter.add_instr_to_ircfg(instr, ircfg, None, False) assert(isinstance(loc, Expr) or isinstance(loc, LocKey)) except NotImplementedError as err: - msg = f'Unable to lift instruction {instr}: {err}' - if force: - warn(f'{msg}. Skipping') - return None, {} - else: - raise Exception(msg) + raise Exception(f'Unable to lift instruction {instr}: {err}') # Execute instruction symbolically new_pc, modified = execute_location(loc) @@ -614,317 +583,3 @@ def run_instruction(instr: miasm_instr, return new_pc, modified -class SpeculativeTracer(ReadableProgramState): - def __init__(self, target: LLDBConcreteTarget): - super().__init__(target.arch) - self.target = target - self.pc = target.read_register('pc') - self.speculative_pc: int | None = None - self.speculative_count: int = 0 - - self.read_cache = {} - - def speculate(self, new_pc): - self.read_cache.clear() - if new_pc is None: - self.progress_execution() - self.target.step() - self.pc = self.target.read_register('pc') - self.speculative_pc = None - self.speculative_count = 0 - return - - new_pc = int(new_pc) - self.speculative_pc = new_pc - self.speculative_count += 1 - - def progress_execution(self) -> None: - if self.speculative_pc is not None and self.speculative_count != 0: - debug(f'Updating PC to {hex(self.speculative_pc)}') - if self.speculative_count == 1: - self.target.step() - else: - self.target.run_until(self.speculative_pc) - - self.pc = self.speculative_pc - self.speculative_pc = None - self.speculative_count = 0 - - self.read_cache.clear() - - def run_until(self, addr: int): - if self.speculative_pc: - raise Exception('Attempting manual execution with speculative execution enabled') - self.target.run_until(addr) - self.pc = addr - - def step(self): - self.progress_execution() - if self.target.is_exited(): - return - self.target.step() - self.pc = self.target.read_register('pc') - - def _cache(self, name: str, value): - self.read_cache[name] = value - return value - - def read_pc(self) -> int: - if self.speculative_pc is not None: - return self.speculative_pc - return self.pc - - def read_flags(self) -> dict[str, int | bool]: - if 'flags' in self.read_cache: - return self.read_cache['flags'] - self.progress_execution() - return self._cache('flags', self.target.read_flags()) - - def read_register(self, reg: str) -> int: - regname = self.arch.to_regname(reg) - if regname is None: - raise RegisterAccessError(reg, f'Not a register name: {reg}') - - if reg in self.read_cache: - return self.read_cache[reg] - - self.progress_execution() - return self._cache(reg, self.target.read_register(regname)) - - def write_register(self, regname: str, value: int): - self.progress_execution() - self.read_cache.pop(regname, None) - self.target.write_register(regname, value) - - def read_instructions(self, addr: int, size: int) -> bytes: - return self.target.read_memory(addr, size) - - def read_memory(self, addr: int, size: int) -> bytes: - self.progress_execution() - cache_name = f'{addr}_{size}' - if cache_name in self.read_cache: - return self.read_cache[cache_name] - return self._cache(cache_name, self.target.read_memory(addr, size)) - - def write_memory(self, addr: int, value: bytes): - self.progress_execution() - self.read_cache.pop(addr, None) - self.target.write_memory(addr, value) - - def __getattr__(self, name: str): - return getattr(self.target, name) - -class SymbolicTracer: - """A symbolic tracer that uses `LLDBConcreteTarget` with Miasm to simultaneously execute a - program with concrete state and collect its symbolic transforms - """ - def __init__(self, - env: TraceEnvironment, - remote: str | None=None, - force: bool=False, - cross_validate: bool=False): - self.env = env - self.force = force - self.remote = remote - self.cross_validate = cross_validate - self.target = SpeculativeTracer(self.create_debug_target()) - - self.nondet_events = self.env.detlog.events() - self.next_event: int | None = None - - def create_debug_target(self) -> LLDBConcreteTarget: - binary = self.env.binary_name - if self.remote is False: - debug(f'Launching local debug target {binary} {self.env.argv}') - debug(f'Environment: {self.env}') - return LLDBLocalTarget(binary, self.env.argv, self.env.envp) - - debug(f'Connecting to remote debug target {self.remote}') - target = LLDBRemoteTarget(self.remote, binary) - - module_name = target.determine_name() - binary = str(Path(self.env.binary_name).resolve()) - if binary != module_name: - warn(f'Discovered binary name {module_name} differs from specified name {binary}') - - return target - - def predict_next_state(self, instruction: Instruction, transform: SymbolicTransform): - debug(f'Evaluating register and memory transforms for {instruction} to cross-validate') - predicted_regs = transform.eval_register_transforms(self.target) - predicted_mems = transform.eval_memory_transforms(self.target) - return predicted_regs, predicted_mems - - def validate(self, - instruction: Instruction, - transform: SymbolicTransform, - predicted_regs: dict[str, int], - predicted_mems: dict[int, bytes]): - # Verify last generated transform by comparing concrete state against - # predicted values. - if self.target.is_exited(): - return - - debug('Cross-validating symbolic transforms by comparing actual to predicted values') - for reg, val in predicted_regs.items(): - conc_val = self.target.read_register(reg) - if conc_val != val: - raise ValidationError(f'Symbolic execution backend generated false equation for' - f' [{hex(instruction.addr)}]: {instruction}:' - f' Predicted {reg} = {hex(val)}, but the' - f' concrete state has value {reg} = {hex(conc_val)}.' - f'\nFaulty transformation: {transform}') - for addr, data in predicted_mems.items(): - conc_data = self.target.read_memory(addr, len(data)) - if conc_data != data: - raise ValidationError(f'Symbolic execution backend generated false equation for' - f' [{hex(instruction.addr)}]: {instruction}: Predicted' - f' mem[{hex(addr)}:{hex(addr+len(data))}] = {data},' - f' but the concrete state has value' - f' mem[{hex(addr)}:{hex(addr+len(data))}] = {conc_data}.' - f'\nFaulty transformation: {transform}') - - def progress_event(self) -> None: - if (self.next_event + 1) < len(self.nondet_events): - self.next_event += 1 - debug(f'Next event to handle at index {self.next_event}') - else: - self.next_event = None - - def post_event(self) -> None: - if self.next_event: - if self.nondet_events[self.next_event].pc == 0: - # Exit sequence - debug('Completed exit event') - self.target.run() - - debug(f'Completed handling event at index {self.next_event}') - self.progress_event() - - def is_stepping_instr(self, pc: int, instruction: Instruction) -> bool: - if self.nondet_events: - pc = pc + instruction.length # detlog reports next pc for each event - if self.next_event and self.nondet_events[self.next_event].match(pc, self.target): - debug('Current instruction matches next event; stepping through it') - self.progress_event() - return True - else: - if self.target.arch.is_instr_syscall(str(instruction)): - return True - return False - - def progress(self, new_pc, step: bool = False) -> int | None: - self.target.speculate(new_pc) - if step: - self.target.progress_execution() - if self.target.is_exited(): - return None - return self.target.read_pc() - - def trace(self, time_limit: int | None = None) -> Trace[SymbolicTransform]: - """Execute a program and compute state transformations between executed - instructions. - - :param start_addr: Address from which to start tracing. - :param stop_addr: Address until which to trace. - """ - # Set up concrete reference state - if self.env.start_address is not None: - self.target.run_until(self.env.start_address) - - for i in range(len(self.nondet_events)): - if self.nondet_events[i].pc == self.target.read_pc(): - self.next_event = i+1 - if self.next_event >= len(self.nondet_events): - break - - debug(f'Starting from event {self.nondet_events[i]} onwards') - break - - ctx = DisassemblyContext(self.target) - arch = ctx.arch - - if logger.isEnabledFor(logging.DEBUG): - debug('Tracing program with the following non-deterministic events') - for event in self.nondet_events: - debug(event) - - # Trace concolically - strace: list[SymbolicTransform] = [] - while not self.target.is_exited(): - pc = self.target.read_pc() - - if self.env.stop_address is not None and pc == self.env.stop_address: - break - - assert(pc != 0) - - # Disassemble instruction at the current PC - tid = self.target.get_current_tid() - try: - instruction = ctx.disassemble(pc) - info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') - except: - err = sys.exc_info()[1] - - # Try to recovery by using the LLDB disassembly instead - try: - alt_disas = self.target.get_disassembly(pc) - instruction = Instruction.from_string(alt_disas, ctx.arch, pc, - self.target.get_instruction_size(pc)) - info(f'[{tid}] Disassembled instruction {instruction} at {hex(pc)}') - except: - if self.force: - if alt_disas: - warn(f'[{tid}] Unable to handle instruction {alt_disas} at {hex(pc)} in Miasm.' - f' Skipping.') - else: - warn(f'[{tid}] Unable to disassemble instruction {hex(pc)}: {err}.' - f' Skipping.') - self.target.step() - continue - raise # forward exception - - is_event = self.is_stepping_instr(pc, instruction) - - # Run instruction - conc_state = MiasmSymbolResolver(self.target, ctx.loc_db) - - try: - new_pc, modified = timebound(time_limit, run_instruction, - instruction.instr, conc_state, ctx.lifter) - except TimeoutError: - warn(f'Running instruction {instruction} took longer than {time_limit} second. Skipping') - new_pc, modified = None, {} - - if self.cross_validate and new_pc: - # Predict next concrete state. - # We verify the symbolic execution backend on the fly for some - # additional protection from bugs in the backend. - new_pc = int(new_pc) - transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) - pred_regs, pred_mems = self.predict_next_state(instruction, transform) - self.progress(new_pc, step=is_event) - - try: - self.validate(instruction, transform, pred_regs, pred_mems) - except ValidationError as e: - if self.force: - warn(f'Cross-validation failed: {e}') - continue - raise - else: - new_pc = self.progress(new_pc, step=is_event) - if new_pc is None: - transform = SymbolicTransform(tid, modified, [instruction], arch, pc, 0) - strace.append(transform) - continue # we're done - transform = SymbolicTransform(tid, modified, [instruction], arch, pc, new_pc) - - strace.append(transform) - - if is_event: - self.post_event() - - return Trace(strace, self.env) - diff --git a/src/focaccia/tools/capture_transforms.py b/src/focaccia/tools/capture_transforms.py index 1208156..a178ba0 100755 --- a/src/focaccia/tools/capture_transforms.py +++ b/src/focaccia/tools/capture_transforms.py @@ -5,8 +5,8 @@ import argparse import logging from focaccia import parser, utils -from focaccia.symbolic import SymbolicTracer from focaccia.trace import TraceEnvironment +from focaccia.native.tracer import SymbolicTracer def main(): prog = argparse.ArgumentParser() |