import logging import warnings from miasm.core.bin_stream import bin_stream_str, bin_stream_elf, bin_stream_pe from miasm.jitter.csts import PAGE_READ log = logging.getLogger("binary") console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter("[%(levelname)-8s]: %(message)s")) log.addHandler(console_handler) log.setLevel(logging.ERROR) # Container ## Exceptions class ContainerSignatureException(Exception): "The container does not match the current container signature" class ContainerParsingException(Exception): "Error during container parsing" ## Parent class class Container(object): """Container abstraction layer This class aims to offer a common interface for abstracting container such as PE or ELF. """ available_container = [] # Available container formats fallback_container = None # Fallback container format @classmethod def from_string(cls, data, loc_db, *args, **kwargs): """Instantiate a container and parse the binary @data: str containing the binary @loc_db: LocationDB instance """ log.info('Load binary') # Try each available format for container_type in cls.available_container: try: return container_type(data, loc_db, *args, **kwargs) except ContainerSignatureException: continue except ContainerParsingException as error: log.error(error) # Fallback mode log.warning('Fallback to string input') return cls.fallback_container(data, loc_db, *args, **kwargs) @classmethod def register_container(cls, container): "Add a Container format" cls.available_container.append(container) @classmethod def register_fallback(cls, container): "Set the Container fallback format" cls.fallback_container = container @classmethod def from_stream(cls, stream, loc_db, *args, **kwargs): """Instantiate a container and parse the binary @stream: stream to use as binary @vm: (optional) VmMngr instance to link with the executable @addr: (optional) Base address of the parsed binary. If set, force the unknown format """ return Container.from_string(stream.read(), loc_db, *args, **kwargs) def parse(self, data, *args, **kwargs): """Launch parsing of @data @data: str containing the binary """ raise NotImplementedError("Abstract method") def __init__(self, data, loc_db, **kwargs): "Alias for 'parse'" # Init attributes self._executable = None self._bin_stream = None self._entry_point = None self._arch = None self._loc_db = loc_db # Launch parsing self.parse(data, **kwargs) @property def bin_stream(self): "Return the BinStream instance corresponding to container content" return self._bin_stream @property def executable(self): "Return the abstract instance standing for parsed executable" return self._executable @property def entry_point(self): "Return the detected entry_point" return self._entry_point @property def arch(self): "Return the guessed architecture" return self._arch @property def loc_db(self): "LocationDB instance preloaded with container symbols (if any)" return self._loc_db @property def symbol_pool(self): "[DEPRECATED API]" warnings.warn("Deprecated API: use 'loc_db'") return self.loc_db ## Format dependent classes class ContainerPE(Container): "Container abstraction for PE" def parse(self, data, vm=None, **kwargs): from miasm.jitter.loader.pe import vm_load_pe, guess_arch from miasm.loader import pe_init # Parse signature if not data.startswith(b'MZ'): raise ContainerSignatureException() # Build executable instance try: if vm is not None: self._executable = vm_load_pe(vm, data) else: self._executable = pe_init.PE(data) except Exception as error: raise ContainerParsingException('Cannot read PE: %s' % error) # Check instance validity if not self._executable.isPE() or \ self._executable.NTsig.signature_value != 0x4550: raise ContainerSignatureException() # Guess the architecture self._arch = guess_arch(self._executable) # Build the bin_stream instance and set the entry point try: self._bin_stream = bin_stream_pe(self._executable) ep_detected = self._executable.Opthdr.AddressOfEntryPoint self._entry_point = self._executable.rva2virt(ep_detected) except Exception as error: raise ContainerParsingException('Cannot read PE: %s' % error) class ContainerELF(Container): "Container abstraction for ELF" def parse(self, data, vm=None, addr=0, apply_reloc=False, **kwargs): """Load an ELF from @data @data: bytes containing the ELF bytes @vm (optional): VmMngr instance. If set, load the ELF in virtual memory @addr (optional): base address the ELF in virtual memory @apply_reloc (optional): if set, apply relocation during ELF loading @addr and @apply_reloc are only meaningful in the context of a non-empty @vm """ from miasm.jitter.loader.elf import vm_load_elf, guess_arch, \ fill_loc_db_with_symbols from miasm.loader import elf_init # Parse signature if not data.startswith(b'\x7fELF'): raise ContainerSignatureException() # Build executable instance try: if vm is not None: self._executable = vm_load_elf( vm, data, loc_db=self.loc_db, base_addr=addr, apply_reloc=apply_reloc ) else: self._executable = elf_init.ELF(data) except Exception as error: raise ContainerParsingException('Cannot read ELF: %s' % error) # Guess the architecture self._arch = guess_arch(self._executable) # Build the bin_stream instance and set the entry point try: self._bin_stream = bin_stream_elf(self._executable) self._entry_point = self._executable.Ehdr.entry + addr except Exception as error: raise ContainerParsingException('Cannot read ELF: %s' % error) if vm is None: # Add known symbols (vm_load_elf already does it) fill_loc_db_with_symbols(self._executable, self.loc_db, addr) class ContainerUnknown(Container): "Container abstraction for unknown format" def parse(self, data, vm=None, addr=0, **kwargs): self._bin_stream = bin_stream_str(data, base_address=addr) if vm is not None: vm.add_memory_page( addr, PAGE_READ, data ) self._executable = None self._entry_point = 0 ## Register containers Container.register_container(ContainerPE) Container.register_container(ContainerELF) Container.register_fallback(ContainerUnknown)