diff options
| author | Florent <florent.monjalet@gmail.com> | 2015-10-30 08:56:49 +0100 |
|---|---|---|
| committer | Florent <florent.monjalet@gmail.com> | 2015-10-30 08:56:49 +0100 |
| commit | 9d23ef12b2c41c1c826e1108dd11e651d12a473e (patch) | |
| tree | ed0783670ca655d9b50ac2c739d9ef19e3599794 | |
| parent | fefb609f7ed8267815e3ab4b7467a8fada7040b8 (diff) | |
| parent | 7bd9f72a3c3c48c246a6d4917bcfaa0075fd0860 (diff) | |
| download | miasm-9d23ef12b2c41c1c826e1108dd11e651d12a473e.tar.gz miasm-9d23ef12b2c41c1c826e1108dd11e651d12a473e.zip | |
Merge pull request #245 from serpilliere/fix_parse_asm
Fix parse asm
| -rw-r--r-- | example/asm/shellcode.py | 6 | ||||
| -rw-r--r-- | example/asm/simple.py | 4 | ||||
| -rw-r--r-- | example/expression/asm_to_ir.py | 1 | ||||
| -rw-r--r-- | example/expression/solve_condition_stp.py | 2 | ||||
| -rw-r--r-- | miasm2/core/parse_asm.py | 319 | ||||
| -rw-r--r-- | test/arch/aarch64/unit/asm_test.py | 2 | ||||
| -rw-r--r-- | test/arch/mips32/unit/asm_test.py | 2 | ||||
| -rw-r--r-- | test/arch/x86/sem.py | 4 | ||||
| -rw-r--r-- | test/arch/x86/unit/asm_test.py | 2 | ||||
| -rw-r--r-- | test/core/parse_asm.py | 69 |
10 files changed, 260 insertions, 151 deletions
diff --git a/example/asm/shellcode.py b/example/asm/shellcode.py index 3f3aa877..945cac8c 100644 --- a/example/asm/shellcode.py +++ b/example/asm/shellcode.py @@ -76,14 +76,14 @@ if args.PE: pe.DirImport.get_funcvirt('MessageBoxA')) # Print and graph firsts blocs before patching it -for bloc in blocs[0]: +for bloc in blocs: print bloc -graph = asmbloc.bloc2graph(blocs[0]) +graph = asmbloc.bloc2graph(blocs) open("graph.txt", "w").write(graph) # Apply patches patches = asmbloc.asm_resolve_final(machine.mn, - blocs[0], + blocs, symbol_pool, dst_interval) if args.encrypt: diff --git a/example/asm/simple.py b/example/asm/simple.py index 1929961f..45954f91 100644 --- a/example/asm/simple.py +++ b/example/asm/simple.py @@ -27,10 +27,10 @@ loop: symbol_pool.set_offset(symbol_pool.getby_name("main"), 0x0) # Spread information and resolve instructions offset -patches = asmbloc.asm_resolve_final(mn_x86, blocs[0], symbol_pool) +patches = asmbloc.asm_resolve_final(mn_x86, blocs, symbol_pool) # Show resolved blocs -for bloc in blocs[0]: +for bloc in blocs: print bloc # Print offset -> bytes diff --git a/example/expression/asm_to_ir.py b/example/expression/asm_to_ir.py index 942e5e19..19ffd659 100644 --- a/example/expression/asm_to_ir.py +++ b/example/expression/asm_to_ir.py @@ -23,7 +23,6 @@ loop: RET ''') -blocs = blocs[0] symbol_pool.set_offset(symbol_pool.getby_name("main"), 0x0) for b in blocs: diff --git a/example/expression/solve_condition_stp.py b/example/expression/solve_condition_stp.py index a25a7072..385c5d78 100644 --- a/example/expression/solve_condition_stp.py +++ b/example/expression/solve_condition_stp.py @@ -170,7 +170,7 @@ if __name__ == '__main__': ''') - b = blocs[0][0] + b = blocs[0] print b # add fake address and len to parsed instructions for i, l in enumerate(b.lines): diff --git a/miasm2/core/parse_asm.py b/miasm2/core/parse_asm.py index 1df8e85b..646ad445 100644 --- a/miasm2/core/parse_asm.py +++ b/miasm2/core/parse_asm.py @@ -5,6 +5,7 @@ import re import miasm2.expression.expression as m2_expr import miasm2.core.asmbloc as asmbloc from miasm2.core.cpu import gen_base_expr, parse_ast +from miasm2.core.cpu import instruction declarator = {'byte': 8, 'word': 16, @@ -19,7 +20,23 @@ size2pck = {8: 'B', 64: 'Q', } -class DirectiveAlign(object): +EMPTY_RE = re.compile(r'\s*$') +COMMENT_RE = re.compile(r'\s*;\S*') +LOCAL_LABEL_RE = re.compile(r'\s*(\.L\S+)\s*:') +DIRECTIVE_START_RE = re.compile(r'\s*\.') +DIRECTIVE_RE = re.compile(r'\s*\.(\S+)') +LABEL_RE = re.compile(r'\s*(\S+)\s*:') +FORGET_LABEL_RE = re.compile(r'\s*\.LF[BE]\d\s*:') + + +class Directive(object): + + """Stand for Directive""" + + pass + +class DirectiveAlign(Directive): + """Stand for alignment representation""" def __init__(self, alignment=1): @@ -28,16 +45,34 @@ class DirectiveAlign(object): def __str__(self): return "Alignment %s" % self.alignment -def guess_next_new_label(symbol_pool, gen_label_index=0): + +class DirectiveSplit(Directive): + + """Stand for alignment representation""" + + pass + + +class DirectiveDontSplit(Directive): + + """Stand for alignment representation""" + + pass + + +def guess_next_new_label(symbol_pool): + """Generate a new label + @symbol_pool: the asm_symbol_pool instance""" i = 0 gen_name = "loc_%.8X" while True: name = gen_name % i - l = symbol_pool.getby_name(name) - if l is None: + label = symbol_pool.getby_name(name) + if label is None: return symbol_pool.add_label(name) i += 1 + def replace_expr_labels(expr, symbol_pool, replace_id): """Create asm_label of the expression @expr in the @symbol_pool Update @replace_id""" @@ -51,66 +86,69 @@ def replace_expr_labels(expr, symbol_pool, replace_id): replace_id[expr] = m2_expr.ExprId(new_lbl, expr.size) return replace_id[expr] + def replace_orphan_labels(instr, symbol_pool): """Link orphan labels used by @instr to the @symbol_pool""" for i, arg in enumerate(instr.args): replace_id = {} - arg.visit(lambda e:replace_expr_labels(e, - symbol_pool, - replace_id)) + arg.visit(lambda e: replace_expr_labels(e, + symbol_pool, + replace_id)) instr.args[i] = instr.args[i].replace_expr(replace_id) +STATE_NO_BLOC = 0 +STATE_IN_BLOC = 1 + + +def parse_txt(mnemo, attrib, txt, symbol_pool=None): + """Parse an assembly listing. Returns a couple (blocks, symbol_pool), where + blocks is a list of asm_bloc and symbol_pool the associated asm_symbol_pool + + @mnemo: architecture used + @attrib: architecture attribute + @txt: assembly listing + @symbol_pool: (optional) the asm_symbol_pool instance used to handle labels + of the listing + + """ -def parse_txt(mnemo, attrib, txt, symbol_pool=None, gen_label_index=0): if symbol_pool is None: symbol_pool = asmbloc.asm_symbol_pool() - lines_text = [] - lines_data = [] - lines_bss = [] - C_NEXT = asmbloc.asm_constraint.c_next C_TO = asmbloc.asm_constraint.c_to - lines = lines_text + lines = [] # parse each line for line in txt.split('\n'): # empty - if re.match(r'\s*$', line): + if EMPTY_RE.match(line): continue # comment - if re.match(r'\s*;\S*', line): + if COMMENT_RE.match(line): continue # labels to forget - r = re.match(r'\s*\.LF[BE]\d\s*:', line) - if r: + if FORGET_LABEL_RE.match(line): continue # label beginning with .L - r = re.match(r'\s*(\.L\S+)\s*:', line) - if r: - l = r.groups()[0] - l = symbol_pool.getby_name_create(l) - lines.append(l) + match_re = LABEL_RE.match(line) + if match_re: + label_name = match_re.group(1) + label = symbol_pool.getby_name_create(label_name) + lines.append(label) continue # directive - if re.match(r'\s*\.', line): - r = re.match(r'\s*\.(\S+)', line) - directive = r.groups()[0] - if directive == 'text': - lines = lines_text - continue - if directive == 'data': - lines = lines_data - continue - if directive == 'bss': - lines = lines_bss + if DIRECTIVE_START_RE.match(line): + match_re = DIRECTIVE_RE.match(line) + directive = match_re.group(1) + if directive in ['text', 'data', 'bss']: continue if directive in ['string', 'ascii']: # XXX HACK line = line.replace(r'\n', '\n').replace(r'\r', '\r') - raw = line[line.find(r'"') + 1:line.rfind(r"'")] + raw = line[line.find(r'"') + 1:line.rfind(r'"')] raw = raw.decode('string_escape') if directive == 'string': raw += "\x00" @@ -119,16 +157,16 @@ def parse_txt(mnemo, attrib, txt, symbol_pool=None, gen_label_index=0): if directive == 'ustring': # XXX HACK line = line.replace(r'\n', '\n').replace(r'\r', '\r') - raw = line[line.find(r'"') + 1:line.rfind(r"'")] + "\x00" + raw = line[line.find(r'"') + 1:line.rfind(r'"')] + "\x00" raw = raw.decode('string_escape') raw = "".join([string + '\x00' for string in raw]) lines.append(asmbloc.asm_raw(raw)) continue if directive in declarator: - data_raw = line[r.end():].split(' ', 1)[1] + data_raw = line[match_re.end():].split(' ', 1)[1] data_raw = data_raw.split(',') size = declarator[directive] - data_int = [] + expr_list = [] # parser base_expr = gen_base_expr()[2] @@ -137,29 +175,26 @@ def parse_txt(mnemo, attrib, txt, symbol_pool=None, gen_label_index=0): m2_expr.ExprInt(x, size)) base_expr.setParseAction(my_var_parser) - for b in data_raw: - b = b.strip() - x = base_expr.parseString(b)[0] - data_int.append(x.canonize()) + for element in data_raw: + element = element.strip() + element_expr = base_expr.parseString(element)[0] + expr_list.append(element_expr.canonize()) - raw = data_int - x = asmbloc.asm_raw(raw) - x.element_size = size - lines.append(x) + raw_data = asmbloc.asm_raw(expr_list) + raw_data.element_size = size + lines.append(raw_data) continue if directive == 'comm': # TODO continue if directive == 'split': # custom command - x = asmbloc.asm_raw() - x.split = True - lines.append(x) + lines.append(DirectiveSplit()) continue if directive == 'dontsplit': # custom command - lines.append(asmbloc.asm_raw()) + lines.append(DirectiveDontSplit()) continue if directive == "align": - align_value = int(line[r.end():]) + align_value = int(line[match_re.end():], 0) lines.append(DirectiveAlign(align_value)) continue if directive in ['file', 'intel_syntax', 'globl', 'local', @@ -171,11 +206,11 @@ def parse_txt(mnemo, attrib, txt, symbol_pool=None, gen_label_index=0): raise ValueError("unknown directive %s" % str(directive)) # label - r = re.match(r'\s*(\S+)\s*:', line) - if r: - l = r.groups()[0] - l = symbol_pool.getby_name_create(l) - lines.append(l) + match_re = LABEL_RE.match(line) + if match_re: + label_name = match_re.group(1) + label = symbol_pool.getby_name_create(label_name) + lines.append(label) continue # code @@ -192,91 +227,97 @@ def parse_txt(mnemo, attrib, txt, symbol_pool=None, gen_label_index=0): lines.append(instr) asmbloc.log_asmbloc.info("___pre asm oki___") - # make blocs - - blocs_sections = [] - bloc_num = 0 - b = None - for lines in [lines_text, lines_data, lines_bss]: - state = 0 - i = 0 - blocs = [] - blocs_sections.append(blocs) - bloc_to_nlink = None - block_may_link = False - while i < len(lines): - # no current bloc - if state == 0: - if not isinstance(lines[i], asmbloc.asm_label): - l = guess_next_new_label(symbol_pool) - lines[i:i] = [l] - else: - l = lines[i] - b = asmbloc.asm_bloc(l, alignment=mnemo.alignment) - b.bloc_num = bloc_num - bloc_num += 1 - blocs.append(b) - state = 1 - i += 1 - if bloc_to_nlink: - bloc_to_nlink.addto(asmbloc.asm_constraint(b.label, - C_NEXT)) - bloc_to_nlink = None - - # in bloc - elif state == 1: - if isinstance(lines[i], asmbloc.asm_raw): - if hasattr(lines[i], 'split'): - state = 0 - block_may_link = False - i += 1 - else: - state = 1 - block_may_link = True - b.addline(lines[i]) - i += 1 - elif isinstance(lines[i], DirectiveAlign): - b.alignment = lines[i].alignment - i += 1 - # asmbloc.asm_label - elif isinstance(lines[i], asmbloc.asm_label): - if block_may_link: - b.addto( - asmbloc.asm_constraint(lines[i], C_NEXT)) - block_may_link = False - state = 0 - # instruction - else: - b.addline(lines[i]) - if lines[i].dstflow(): - for x in lines[i].getdstflow(symbol_pool): - if not isinstance(x, m2_expr.ExprId): - continue - if x in mnemo.regs.all_regs_ids: - continue - b.addto(asmbloc.asm_constraint(x, C_TO)) - - # TODO XXX redo this really - - if not lines[i].breakflow() and i + 1 < len(lines): - if isinstance(lines[i + 1], asmbloc.asm_label): - l = lines[i + 1] - else: - l = guess_next_new_label(symbol_pool) - lines[i + 1:i + 1] = [l] - else: - state = 0 - - if lines[i].splitflow(): - bloc_to_nlink = b - if not lines[i].breakflow() or lines[i].splitflow(): - block_may_link = True - else: - block_may_link = False + # make blocks + cur_block = None + state = STATE_NO_BLOC + i = 0 + blocks = [] + block_to_nlink = None + block_may_link = False + delayslot = 0 + while i < len(lines): + if delayslot: + if delayslot == 0: + state = STATE_NO_BLOC + else: + delayslot -= 1 + line = lines[i] + # no current block + if state == STATE_NO_BLOC: + if isinstance(line, DirectiveDontSplit): + block_to_nlink = cur_block + i += 1 + continue + elif isinstance(line, DirectiveSplit): + block_to_nlink = None + i += 1 + continue + elif not isinstance(line, asmbloc.asm_label): + # First line must be a label. If it's not the case, generate + # it. + label = guess_next_new_label(symbol_pool) + cur_block = asmbloc.asm_bloc(label, alignment=mnemo.alignment) + else: + cur_block = asmbloc.asm_bloc(line, alignment=mnemo.alignment) + i += 1 + # Generate the current bloc + blocks.append(cur_block) + state = STATE_IN_BLOC + if block_to_nlink: + block_to_nlink.addto( + asmbloc.asm_constraint(cur_block.label, + C_NEXT)) + block_to_nlink = None + continue + + # in block + elif state == STATE_IN_BLOC: + if isinstance(line, DirectiveSplit): + state = STATE_NO_BLOC + block_to_nlink = None + elif isinstance(line, DirectiveDontSplit): + state = STATE_NO_BLOC + block_to_nlink = cur_block + elif isinstance(line, DirectiveAlign): + cur_block.alignment = line.alignment + elif isinstance(line, asmbloc.asm_raw): + cur_block.addline(line) + block_to_nlink = cur_block + elif isinstance(line, asmbloc.asm_label): + if block_to_nlink: + cur_block.addto( + asmbloc.asm_constraint(line, C_NEXT)) + block_to_nlink = None + state = STATE_NO_BLOC + continue + # instruction + elif isinstance(line, instruction): + cur_block.addline(line) + block_to_nlink = cur_block + if not line.breakflow(): i += 1 + continue + if delayslot: + raise RuntimeError("Cannot have breakflow in delayslot") + if line.dstflow(): + for dst in line.getdstflow(symbol_pool): + if not isinstance(dst, m2_expr.ExprId): + continue + if dst in mnemo.regs.all_regs_ids: + continue + cur_block.addto(asmbloc.asm_constraint(dst, C_TO)) + + if not line.splitflow(): + block_to_nlink = None + + delayslot = line.delayslot + if delayslot == 0: + state = STATE_NO_BLOC + else: + raise RuntimeError("unknown class %s" % line.__class__) + i += 1 - for block in blocs_sections[0]: + for block in blocks: asmbloc.log_asmbloc.info(block) - - return blocs_sections, symbol_pool + return blocks, symbol_pool diff --git a/test/arch/aarch64/unit/asm_test.py b/test/arch/aarch64/unit/asm_test.py index 60ed418e..9e0d5ea8 100644 --- a/test/arch/aarch64/unit/asm_test.py +++ b/test/arch/aarch64/unit/asm_test.py @@ -41,7 +41,7 @@ class Asm_Test(object): # fix shellcode addr symbol_pool.set_offset(symbol_pool.getby_name("main"), 0x0) s = StrPatchwork() - patches = asmbloc.asm_resolve_final(mn_aarch64, blocs[0], symbol_pool) + patches = asmbloc.asm_resolve_final(mn_aarch64, blocs, symbol_pool) for offset, raw in patches.items(): s[offset] = raw diff --git a/test/arch/mips32/unit/asm_test.py b/test/arch/mips32/unit/asm_test.py index b6cb7b2d..a00d0842 100644 --- a/test/arch/mips32/unit/asm_test.py +++ b/test/arch/mips32/unit/asm_test.py @@ -41,7 +41,7 @@ class Asm_Test(object): # fix shellcode addr symbol_pool.set_offset(symbol_pool.getby_name("main"), 0x0) s = StrPatchwork() - patches = asmbloc.asm_resolve_final(mn_mips32, blocs[0], symbol_pool) + patches = asmbloc.asm_resolve_final(mn_mips32, blocs, symbol_pool) for offset, raw in patches.items(): s[offset] = raw diff --git a/test/arch/x86/sem.py b/test/arch/x86/sem.py index b80ab33d..617b929b 100644 --- a/test/arch/x86/sem.py +++ b/test/arch/x86/sem.py @@ -47,9 +47,9 @@ def compute(ir, mode, asm, inputstate={}, debug=False): def compute_txt(ir, mode, txt, inputstate={}, debug=False): blocs, symbol_pool = parse_asm.parse_txt(mn, mode, txt) symbol_pool.set_offset(symbol_pool.getby_name("main"), 0x0) - patches = asmbloc.asm_resolve_final(mn, blocs[0], symbol_pool) + patches = asmbloc.asm_resolve_final(mn, blocs, symbol_pool) interm = ir(symbol_pool) - for bbl in blocs[0]: + for bbl in blocs: interm.add_bloc(bbl) return symb_exec(interm, inputstate, debug) diff --git a/test/arch/x86/unit/asm_test.py b/test/arch/x86/unit/asm_test.py index c6381d9e..bf609aa5 100644 --- a/test/arch/x86/unit/asm_test.py +++ b/test/arch/x86/unit/asm_test.py @@ -41,7 +41,7 @@ class Asm_Test(object): # fix shellcode addr symbol_pool.set_offset(symbol_pool.getby_name("main"), 0x0) s = StrPatchwork() - patches = asmbloc.asm_resolve_final(mn_x86, blocs[0], symbol_pool) + patches = asmbloc.asm_resolve_final(mn_x86, blocs, symbol_pool) for offset, raw in patches.items(): s[offset] = raw diff --git a/test/core/parse_asm.py b/test/core/parse_asm.py index c2a6dc72..a488d075 100644 --- a/test/core/parse_asm.py +++ b/test/core/parse_asm.py @@ -35,6 +35,75 @@ class TestParseAsm(unittest.TestCase): self.assertTrue(parse_txt(mn_x86, 32, ASM0)) self.assertRaises(ValueError, parse_txt, mn_x86, 32, ASM1) + def test_DirectiveDontSplit(self): + from miasm2.arch.x86.arch import mn_x86 + from miasm2.core.parse_asm import parse_txt + from miasm2.core.asmbloc import asm_resolve_final + + ASM0 = ''' + lbl0: + INC EAX + JNZ lbl0 + INC EAX + JZ lbl2 + lbl1: + NOP + JMP lbl0 + .dontsplit + lbl2: + MOV EAX, ECX + RET + .dontsplit + lbl3: + ADD EAX, EBX + .dontsplit + lbl4: + .align 0x10 + .string "test" + lbl5: + .string "toto" + ''' + + blocks, symbol_pool = parse_txt(mn_x86, 32, ASM0) + patches = asm_resolve_final(mn_x86, + blocks, + symbol_pool) + lbls = [] + for i in xrange(6): + lbls.append(symbol_pool.getby_name('lbl%d' % i)) + # align test + assert(lbls[5].offset % 0x10 == 0) + lbl2block = {} + for block in blocks: + lbl2block[block.label] = block + # dontsplit test + assert(lbls[2] == lbl2block[lbls[1]].get_next()) + assert(lbls[3] == lbl2block[lbls[2]].get_next()) + assert(lbls[4] == lbl2block[lbls[3]].get_next()) + assert(lbls[5] == lbl2block[lbls[4]].get_next()) + + def test_DirectiveSplit(self): + from miasm2.arch.x86.arch import mn_x86 + from miasm2.core.parse_asm import parse_txt + + ASM0 = ''' + lbl0: + JNZ lbl0 + .split + lbl1: + RET + ''' + + blocks, symbol_pool = parse_txt(mn_x86, 32, ASM0) + lbls = [] + for i in xrange(2): + lbls.append(symbol_pool.getby_name('lbl%d' % i)) + lbl2block = {} + for block in blocks: + lbl2block[block.label] = block + # split test + assert(lbl2block[lbls[1]].get_next() is None) + if __name__ == '__main__': testsuite = unittest.TestLoader().loadTestsFromTestCase(TestParseAsm) report = unittest.TextTestRunner(verbosity=2).run(testsuite) |