1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
|
import logging
from miasm2.core.bin_stream import bin_stream_str, bin_stream_elf, bin_stream_pe
from miasm2.jitter.csts import PAGE_READ
from miasm2.core.asmblock import AsmSymbolPool
# Module-level logger named "binary". Records are rendered as
# "LEVEL: message" by a dedicated StreamHandler, and only ERROR and above
# are emitted unless the level is lowered afterwards.
log = logging.getLogger("binary")
log.setLevel(logging.ERROR)
console_handler = logging.StreamHandler()
console_handler.setFormatter(
    logging.Formatter("%(levelname)-5s: %(message)s"))
log.addHandler(console_handler)
# Container
## Exceptions
class ContainerSignatureException(Exception):
    """The data does not carry the signature of the container format being
    tried"""
class ContainerParsingException(Exception):
    """The signature matched but the container content could not be
    parsed"""
## Parent class
class Container(object):
    """Container abstraction layer

    This class aims to offer a common interface for abstracting container
    such as PE or ELF.

    Concrete formats subclass it and implement `parse`. They are made
    available to `from_string` / `from_stream` through `register_container`
    and `register_fallback`.
    """

    available_container = []  # Available container formats, in trial order
    fallback_container = None  # Fallback container format

    @classmethod
    def from_string(cls, data, vm=None, addr=None):
        """Instantiate a container and parse the binary
        @data: str containing the binary
        @vm: (optional) VmMngr instance to link with the executable
        @addr: (optional) Base address for the binary. If set,
               force the unknown format

        A falsy @addr (None or 0) is treated as "not set". In that case each
        registered format is tried in turn and the fallback format is used,
        with a base address of 0, only if none of them accepts @data.
        Return the instance of the first format that parsed @data.
        """
        log.info('Load binary')

        if addr:
            # An explicit base address forces the fallback mode
            log.warning('Fallback to string input (offset=%s)', hex(addr))
            return cls.fallback_container(data, vm, addr)
        addr = 0

        # Try each available format
        for container_type in cls.available_container:
            try:
                return container_type(data, vm)
            except ContainerSignatureException:
                # Not this format: silently try the next one
                continue
            except ContainerParsingException as error:
                # Right signature but broken content: report it, then keep
                # trying the remaining formats
                log.error(error)

        # Fallback mode
        log.warning('Fallback to string input (offset=%s)', hex(addr))
        return cls.fallback_container(data, vm, addr)

    @classmethod
    def register_container(cls, container):
        "Add a Container format"
        cls.available_container.append(container)

    @classmethod
    def register_fallback(cls, container):
        "Set the Container fallback format"
        cls.fallback_container = container

    @classmethod
    def from_stream(cls, stream, *args, **kwargs):
        """Instantiate a container and parse the binary
        @stream: stream to use as binary
        @vm: (optional) VmMngr instance to link with the executable
        @addr: (optional) Shift to apply before parsing the binary. If set,
               force the unknown format

        The whole content of @stream is read, then handed to `from_string`
        together with the remaining arguments.
        """
        # Dispatch through `cls` so that a subclass overriding `from_string`
        # is honoured
        return cls.from_string(stream.read(), *args, **kwargs)

    def parse(self, data, *args, **kwargs):
        "Launch parsing of @data"
        raise NotImplementedError("Abstract method")

    def __init__(self, *args, **kwargs):
        """Initialise the container attributes, then call `parse` with the
        arguments received by the constructor"""
        # Init attributes
        self._executable = None
        self._bin_stream = None
        self._entry_point = None
        self._arch = None
        self._symbol_pool = AsmSymbolPool()

        # Launch parsing
        self.parse(*args, **kwargs)

    @property
    def bin_stream(self):
        "Return the BinStream instance corresponding to container content"
        return self._bin_stream

    @property
    def executable(self):
        "Return the abstract instance standing for parsed executable"
        return self._executable

    @property
    def entry_point(self):
        "Return the detected entry_point"
        return self._entry_point

    @property
    def arch(self):
        "Return the guessed architecture"
        return self._arch

    @property
    def symbol_pool(self):
        "AsmSymbolPool instance preloaded with container symbols (if any)"
        return self._symbol_pool
## Format dependent classes
class ContainerPE(Container):
    "Container abstraction for PE"

    def parse(self, data, vm=None):
        """Parse @data as a PE binary
        @data: str containing the binary
        @vm: (optional) VmMngr instance; when given, the binary is loaded
             through vm_load_pe instead of being only parsed

        Raise ContainerSignatureException if @data is not a PE, and
        ContainerParsingException if the PE cannot be read.
        """
        # Imported at call time rather than at module import
        from miasm2.jitter.loader.pe import vm_load_pe, guess_arch
        from elfesteem import pe_init

        # Parse signature
        if not data.startswith('MZ'):
            raise ContainerSignatureException()

        # Build executable instance
        try:
            if vm is not None:
                self._executable = vm_load_pe(vm, data)
            else:
                self._executable = pe_init.PE(data)
        except Exception as error:
            # Any loader failure is reported as a parsing error
            raise ContainerParsingException('Cannot read PE: %s' % error)

        # Check instance validity: 0x4550 is "PE" read as a little-endian word
        if not self._executable.isPE() or \
                self._executable.NTsig.signature_value != 0x4550:
            raise ContainerSignatureException()

        # Guess the architecture
        self._arch = guess_arch(self._executable)

        # Build the bin_stream instance and set the entry point
        try:
            self._bin_stream = bin_stream_pe(self._executable.virt)
            ep_detected = self._executable.Opthdr.AddressOfEntryPoint
            self._entry_point = self._executable.rva2virt(ep_detected)
        except Exception as error:
            raise ContainerParsingException('Cannot read PE: %s' % error)
class ContainerELF(Container):
    "Container abstraction for ELF"

    def parse(self, data, vm=None):
        """Parse @data as an ELF binary
        @data: str containing the binary
        @vm: (optional) VmMngr instance; when given, the binary is loaded
             through vm_load_elf instead of being only parsed

        Symbols found in the ".symtab" and ".dynsym" sections whose value is
        not 0 are added to the symbol pool.

        Raise ContainerSignatureException if @data is not an ELF, and
        ContainerParsingException if the ELF cannot be read.
        """
        # Imported at call time rather than at module import
        from miasm2.jitter.loader.elf import vm_load_elf, guess_arch
        from elfesteem import elf_init

        # Parse signature
        if not data.startswith('\x7fELF'):
            raise ContainerSignatureException()

        # Build executable instance
        try:
            if vm is not None:
                self._executable = vm_load_elf(vm, data)
            else:
                self._executable = elf_init.ELF(data)
        except Exception as error:
            # Any loader failure is reported as a parsing error
            raise ContainerParsingException('Cannot read ELF: %s' % error)

        # Guess the architecture
        self._arch = guess_arch(self._executable)

        # Build the bin_stream instance and set the entry point
        try:
            self._bin_stream = bin_stream_elf(self._executable.virt)
            self._entry_point = self._executable.Ehdr.entry
        except Exception as error:
            raise ContainerParsingException('Cannot read ELF: %s' % error)

        # Add known symbols
        for symb_source_name in [".symtab", ".dynsym"]:
            symb_source = self._executable.getsectionbyname(symb_source_name)
            if symb_source is None:
                continue
            for name, symb in symb_source.symbols.iteritems():
                offset = symb.value
                if offset == 0:
                    continue
                try:
                    self._symbol_pool.add_location(name, offset)
                except ValueError:
                    # Two symbols point to the same offset: keep the one
                    # already registered and report the clash. The values
                    # are passed as separate logging arguments, one per
                    # placeholder.
                    log.warning("Same offset (%s) for %s and %s",
                                hex(offset),
                                name,
                                self._symbol_pool.getby_offset(offset))
                    continue
class ContainerUnknown(Container):
    "Container abstraction for unknown format"

    def parse(self, data, vm, addr):
        """Expose raw @data as a binary stream shifted by @addr
        @data: str containing the binary
        @vm: VmMngr instance or None; when not None, @data is also added to
             it as a PAGE_READ memory page at @addr
        @addr: base address of @data
        """
        self._bin_stream = bin_stream_str(data, shift=addr)
        if vm is not None:
            vm.add_memory_page(addr, PAGE_READ, data)
        # Raw data: no parsed executable object, entry point set to 0
        self._executable, self._entry_point = None, 0
## Register containers
# Container.from_string tries the formats in registration order (PE first,
# then ELF); ContainerUnknown is used when none of them accepts the data or
# when a base address is given.
Container.register_container(ContainerPE)
Container.register_container(ContainerELF)
Container.register_fallback(ContainerUnknown)
|