about summary refs log tree commit diff stats
path: root/miasm/analysis/binary.py
blob: c278594b128a189e6039d9d4a3044d59e8ddcebb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import logging
import warnings

from miasm.core.bin_stream import bin_stream_str, bin_stream_elf, bin_stream_pe
from miasm.jitter.csts import PAGE_READ


# Module logger: only errors are reported unless a caller lowers the level
log = logging.getLogger("binary")
log.setLevel(logging.ERROR)

# Dedicated stream handler, prefixing each message with its padded level name
console_handler = logging.StreamHandler()
console_handler.setFormatter(
    logging.Formatter("[%(levelname)-8s]: %(message)s")
)
log.addHandler(console_handler)


# Container
## Exceptions
class ContainerSignatureException(Exception):
    """The data does not carry the signature of the container format being
    tried; another format may still accept it"""


class ContainerParsingException(Exception):
    """The container content could not be parsed"""


## Parent class
class Container(object):
    """Container abstraction layer

    This class aims to offer a common interface for abstracting container
    such as PE or ELF.

    A concrete format subclasses it and implements 'parse', which is expected
    to fill the '_executable', '_bin_stream', '_entry_point' and '_arch'
    attributes exposed through the read-only properties below.
    """

    available_container = []  # Available container formats
    fallback_container = None # Fallback container format

    @classmethod
    def from_string(cls, data, loc_db, *args, **kwargs):
        """Instantiate a container and parse the binary
        @data: raw content of the binary
        @loc_db: LocationDB instance

        Extra positional and keyword arguments are forwarded to the container
        constructor, which hands them over to 'parse'.

        Registered formats are tried in registration order; the first one
        which parses @data without error is returned. If none does, the
        fallback format is used, so one must have been registered through
        'register_fallback'.
        """
        log.info('Load binary')
        # Try each available format
        for container_type in cls.available_container:
            try:
                return container_type(data, loc_db, *args, **kwargs)
            except ContainerSignatureException:
                # Not this format: silently move on to the next one
                continue
            except ContainerParsingException as error:
                # Parsing failed: report it, then keep trying the other formats
                log.error(error)

        # Fallback mode
        log.warning('Fallback to string input')
        return cls.fallback_container(data, loc_db, *args, **kwargs)

    @classmethod
    def register_container(cls, container):
        "Add a Container format (appended after the already registered ones)"
        cls.available_container.append(container)

    @classmethod
    def register_fallback(cls, container):
        "Set the Container fallback format"
        cls.fallback_container = container

    @classmethod
    def from_stream(cls, stream, loc_db, *args, **kwargs):
        """Instantiate a container and parse the binary
        @stream: stream to use as binary; it is consumed with a single 'read()'
        @loc_db: LocationDB instance

        Remaining arguments are forwarded as is to 'from_string'.
        """
        # Dispatch through @cls so that a subclass overriding 'from_string'
        # is honored
        return cls.from_string(stream.read(), loc_db, *args, **kwargs)

    def parse(self, data, *args, **kwargs):
        """Launch parsing of @data
        @data: raw content of the binary

        Abstract: must be implemented by each format.
        """
        raise NotImplementedError("Abstract method")

    def __init__(self, data, loc_db, *args, **kwargs):
        """Initialize the container, then parse @data
        @data: raw content of the binary
        @loc_db: LocationDB instance

        Extra positional and keyword arguments are forwarded to 'parse'
        """
        # Init attributes
        self._executable = None
        self._bin_stream = None
        self._entry_point = None
        self._arch = None
        self._loc_db = loc_db

        # Launch parsing
        self.parse(data, *args, **kwargs)

    @property
    def bin_stream(self):
        "Return the BinStream instance corresponding to container content"
        return self._bin_stream

    @property
    def executable(self):
        "Return the abstract instance standing for parsed executable"
        return self._executable

    @property
    def entry_point(self):
        "Return the detected entry_point"
        return self._entry_point

    @property
    def arch(self):
        "Return the guessed architecture"
        return self._arch

    @property
    def loc_db(self):
        "LocationDB instance preloaded with container symbols (if any)"
        return self._loc_db

    @property
    def symbol_pool(self):
        "[DEPRECATED API] Alias of 'loc_db'; emits a warning on each access"
        warnings.warn("Deprecated API: use 'loc_db'")
        return self.loc_db

## Format dependent classes
class ContainerPE(Container):
    "Container abstraction for PE"

    def parse(self, data, vm=None, **kwargs):
        """Load a PE from @data
        @data: bytes containing the PE
        @vm (optional): VmMngr instance. If set, the PE is loaded through
            'vm_load_pe'; otherwise it is only parsed

        Raise ContainerSignatureException if @data is not recognized as a PE,
        ContainerParsingException if it cannot be read
        """
        from miasm.jitter.loader.pe import vm_load_pe, guess_arch
        from miasm.loader import pe_init

        # Quick reject on the DOS header magic
        if not data.startswith(b'MZ'):
            raise ContainerSignatureException()

        # Obtain the executable instance, with or without a VM
        try:
            if vm is None:
                pe = pe_init.PE(data)
            else:
                pe = vm_load_pe(vm, data)
        except Exception as exc:
            raise ContainerParsingException('Cannot read PE: %s' % exc)
        self._executable = pe

        # Reject instances which are not valid PE
        if not pe.isPE():
            raise ContainerSignatureException()
        if pe.NTsig.signature_value != 0x4550:
            # 0x4550 is the expected NT signature value
            raise ContainerSignatureException()

        # Architecture guess
        self._arch = guess_arch(pe)

        # bin_stream wrapper and entry point (RVA turned into virtual address)
        try:
            self._bin_stream = bin_stream_pe(pe)
            entry_rva = pe.Opthdr.AddressOfEntryPoint
            self._entry_point = pe.rva2virt(entry_rva)
        except Exception as exc:
            raise ContainerParsingException('Cannot read PE: %s' % exc)


class ContainerELF(Container):
    "Container abstraction for ELF"

    def parse(self, data, vm=None, addr=0, apply_reloc=False, **kwargs):
        """Load an ELF from @data
        @data: bytes containing the ELF
        @vm (optional): VmMngr instance. If set, load the ELF in virtual memory
        @addr (optional): base address of the ELF in virtual memory
        @apply_reloc (optional): if set, apply relocation during ELF loading

        @apply_reloc is only handed to the loader when @vm is set.

        Raise ContainerSignatureException if @data does not start with the
        ELF magic, ContainerParsingException if it cannot be read
        """
        from miasm.jitter.loader.elf import (
            vm_load_elf, guess_arch, fill_loc_db_with_symbols
        )
        from miasm.loader import elf_init

        # Quick reject on the ELF magic
        if not data.startswith(b'\x7fELF'):
            raise ContainerSignatureException()

        in_memory = vm is not None

        # Obtain the executable instance, with or without a VM
        try:
            if in_memory:
                elf = vm_load_elf(
                    vm,
                    data,
                    loc_db=self.loc_db,
                    base_addr=addr,
                    apply_reloc=apply_reloc
                )
            else:
                elf = elf_init.ELF(data)
        except Exception as exc:
            raise ContainerParsingException('Cannot read ELF: %s' % exc)
        self._executable = elf

        # Architecture guess
        self._arch = guess_arch(elf)

        # bin_stream wrapper and entry point (header value shifted by @addr)
        try:
            self._bin_stream = bin_stream_elf(elf)
            self._entry_point = elf.Ehdr.entry + addr
        except Exception as exc:
            raise ContainerParsingException('Cannot read ELF: %s' % exc)

        if not in_memory:
            # Add known symbols (vm_load_elf already does it)
            fill_loc_db_with_symbols(elf, self.loc_db, addr)



class ContainerUnknown(Container):
    "Container abstraction for unknown format"

    def parse(self, data, vm=None, addr=0, **kwargs):
        """Wrap @data as a raw blob, without any format interpretation
        @data: raw content
        @vm (optional): VmMngr instance. If set, @data is also added to it as
            a PAGE_READ memory page at @addr
        @addr (optional): base address of the resulting bin_stream
        """
        self._bin_stream = bin_stream_str(data, base_address=addr)

        if vm is not None:
            vm.add_memory_page(addr, PAGE_READ, data)

        # No executable abstraction for a raw blob; entry point is set to 0
        self._executable, self._entry_point = None, 0


## Register containers
# Formats are appended to Container.available_container in this order
for _container_format in (ContainerPE, ContainerELF):
    Container.register_container(_container_format)
del _container_format

# Raw blob wrapper, registered as the fallback format
Container.register_fallback(ContainerUnknown)