Diffstat (limited to 'miasm/analysis/data_analysis.py')
| -rw-r--r-- | miasm/analysis/data_analysis.py | 204 |
1 files changed, 0 insertions, 204 deletions
diff --git a/miasm/analysis/data_analysis.py b/miasm/analysis/data_analysis.py
deleted file mode 100644
index c7924cf2..00000000
--- a/miasm/analysis/data_analysis.py
+++ /dev/null
@@ -1,204 +0,0 @@
-from __future__ import print_function
-
-from future.utils import viewitems
-
-from builtins import object
-from functools import cmp_to_key
-from miasm.expression.expression \
-    import get_expr_mem, get_list_rw, ExprId, ExprInt, \
-    compare_exprs
-from miasm.ir.symbexec import SymbolicExecutionEngine
-
-
-def get_node_name(label, i, n):
-    n_name = (label, i, n)
-    return n_name
-
-
-def intra_block_flow_raw(lifter, ircfg, flow_graph, irb, in_nodes, out_nodes):
-    """
-    Create data flow for an irbloc using raw IR expressions
-    """
-    current_nodes = {}
-    for i, assignblk in enumerate(irb):
-        dict_rw = assignblk.get_rw(cst_read=True)
-        current_nodes.update(out_nodes)
-
-        # gen mem arg to mem node links
-        all_mems = set()
-        for node_w, nodes_r in viewitems(dict_rw):
-            for n in nodes_r.union([node_w]):
-                all_mems.update(get_expr_mem(n))
-            if not all_mems:
-                continue
-
-            for n in all_mems:
-                node_n_w = get_node_name(irb.loc_key, i, n)
-                if not n in nodes_r:
-                    continue
-                o_r = n.ptr.get_r(mem_read=False, cst_read=True)
-                for n_r in o_r:
-                    if n_r in current_nodes:
-                        node_n_r = current_nodes[n_r]
-                    else:
-                        node_n_r = get_node_name(irb.loc_key, i, n_r)
-                        current_nodes[n_r] = node_n_r
-                        in_nodes[n_r] = node_n_r
-                    flow_graph.add_uniq_edge(node_n_r, node_n_w)
-
-        # gen data flow links
-        for node_w, nodes_r in viewitems(dict_rw):
-            for n_r in nodes_r:
-                if n_r in current_nodes:
-                    node_n_r = current_nodes[n_r]
-                else:
-                    node_n_r = get_node_name(irb.loc_key, i, n_r)
-                    current_nodes[n_r] = node_n_r
-                    in_nodes[n_r] = node_n_r
-
-                flow_graph.add_node(node_n_r)
-
-                node_n_w = get_node_name(irb.loc_key, i + 1, node_w)
-                out_nodes[node_w] = node_n_w
-
-                flow_graph.add_node(node_n_w)
-                flow_graph.add_uniq_edge(node_n_r, node_n_w)
-
-
-
-def inter_block_flow_link(lifter, ircfg, flow_graph, irb_in_nodes, irb_out_nodes, todo, link_exec_to_data):
-    lbl, current_nodes, exec_nodes = todo
-    current_nodes = dict(current_nodes)
-
-    # link current nodes to block in_nodes
-    if not lbl in ircfg.blocks:
-        print("cannot find block!!", lbl)
-        return set()
-    irb = ircfg.blocks[lbl]
-    to_del = set()
-    for n_r, node_n_r in viewitems(irb_in_nodes[irb.loc_key]):
-        if not n_r in current_nodes:
-            continue
-        flow_graph.add_uniq_edge(current_nodes[n_r], node_n_r)
-        to_del.add(n_r)
-
-    # if link exec to data, all nodes depends on exec nodes
-    if link_exec_to_data:
-        for n_x_r in exec_nodes:
-            for n_r, node_n_r in viewitems(irb_in_nodes[irb.loc_key]):
-                if not n_x_r in current_nodes:
-                    continue
-                if isinstance(n_r, ExprInt):
-                    continue
-                flow_graph.add_uniq_edge(current_nodes[n_x_r], node_n_r)
-
-    # update current nodes using block out_nodes
-    for n_w, node_n_w in viewitems(irb_out_nodes[irb.loc_key]):
-        current_nodes[n_w] = node_n_w
-
-    # get nodes involved in exec flow
-    x_nodes = tuple(sorted(irb.dst.get_r(), key=cmp_to_key(compare_exprs)))
-
-    todo = set()
-    for lbl_dst in ircfg.successors(irb.loc_key):
-        todo.add((lbl_dst, tuple(viewitems(current_nodes)), x_nodes))
-
-    return todo
-
-
-def create_implicit_flow(lifter, flow_graph, irb_in_nodes, irb_out_nodes):
-
-    # first fix IN/OUT
-    # If a son read a node which in not in OUT, add it
-    todo = set(lifter.blocks.keys())
-    while todo:
-        lbl = todo.pop()
-        irb = lifter.blocks[lbl]
-        for lbl_son in lifter.graph.successors(irb.loc_key):
-            if not lbl_son in lifter.blocks:
-                print("cannot find block!!", lbl)
-                continue
-            irb_son = lifter.blocks[lbl_son]
-            for n_r in irb_in_nodes[irb_son.loc_key]:
-                if n_r in irb_out_nodes[irb.loc_key]:
-                    continue
-                if not isinstance(n_r, ExprId):
-                    continue
-
-                node_n_w = irb.loc_key, len(irb), n_r
-                irb_out_nodes[irb.loc_key][n_r] = node_n_w
-                if not n_r in irb_in_nodes[irb.loc_key]:
-                    irb_in_nodes[irb.loc_key][n_r] = irb.loc_key, 0, n_r
-                node_n_r = irb_in_nodes[irb.loc_key][n_r]
-                for lbl_p in lifter.graph.predecessors(irb.loc_key):
-                    todo.add(lbl_p)
-
-                flow_graph.add_uniq_edge(node_n_r, node_n_w)
-
-
-def inter_block_flow(lifter, ircfg, flow_graph, irb_0, irb_in_nodes, irb_out_nodes, link_exec_to_data=True):
-
-    todo = set()
-    done = set()
-    todo.add((irb_0, (), ()))
-
-    while todo:
-        state = todo.pop()
-        if state in done:
-            continue
-        done.add(state)
-        out = inter_block_flow_link(lifter, ircfg, flow_graph, irb_in_nodes, irb_out_nodes, state, link_exec_to_data)
-        todo.update(out)
-
-
-class symb_exec_func(object):
-
-    """
-    This algorithm will do symbolic execution on a function, trying to propagate
-    states between basic blocks in order to extract inter-blocks dataflow. The
-    algorithm tries to merge states from blocks with multiple parents.
-
-    There is no real magic here, loops and complex merging will certainly fail.
-    """
-
-    def __init__(self, lifter):
-        self.todo = set()
-        self.stateby_ad = {}
-        self.cpt = {}
-        self.states_var_done = set()
-        self.states_done = set()
-        self.total_done = 0
-        self.lifter = lifter
-
-    def add_state(self, parent, ad, state):
-        variables = dict(state.symbols)
-
-        # get block dead, and remove from state
-        b = self.lifter.get_block(ad)
-        if b is None:
-            raise ValueError("unknown block! %s" % ad)
-        s = parent, ad, tuple(sorted(viewitems(variables)))
-        self.todo.add(s)
-
-    def get_next_state(self):
-        state = self.todo.pop()
-        return state
-
-    def do_step(self):
-        if len(self.todo) == 0:
-            return None
-        if self.total_done > 600:
-            print("symbexec watchdog!")
-            return None
-        self.total_done += 1
-        print('CPT', self.total_done)
-        while self.todo:
-            state = self.get_next_state()
-            parent, ad, s = state
-            self.states_done.add(state)
-            self.states_var_done.add(state)
-
-            sb = SymbolicExecutionEngine(self.lifter, dict(s))
-
-            return parent, ad, sb
-        return None
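Note: the removed module exposed intra_block_flow_raw and inter_block_flow as the two passes of a def-use graph builder over a function's IR: the first pass records, per IR block, which read nodes feed which written nodes, and the second pass stitches each block's out_nodes to its successors' in_nodes starting from an entry block. The sketch below shows how the two calls were typically chained before the removal; it assumes a lifter and ircfg obtained from the usual miasm lifting pipeline, and the build_dataflow wrapper name is illustrative rather than part of the removed API.

    # Illustrative sketch only: `lifter`, `ircfg` and `entry_loc_key` are assumed
    # to come from miasm's usual lifting pipeline; only the data_analysis calls
    # mirror the signatures visible in the deleted module above.
    from miasm.core.graph import DiGraph
    from miasm.analysis.data_analysis import intra_block_flow_raw, inter_block_flow


    def build_dataflow(lifter, ircfg, entry_loc_key):
        flow_graph = DiGraph()
        irb_in_nodes = {}
        irb_out_nodes = {}

        # Per-block pass: collect the read/write nodes of every IR block.
        for loc_key, irb in ircfg.blocks.items():
            irb_in_nodes[loc_key] = {}
            irb_out_nodes[loc_key] = {}
            intra_block_flow_raw(lifter, ircfg, flow_graph, irb,
                                 irb_in_nodes[loc_key], irb_out_nodes[loc_key])

        # Inter-block pass: link out_nodes to successors' in_nodes, walking
        # the CFG from the entry block.
        inter_block_flow(lifter, ircfg, flow_graph, entry_loc_key,
                         irb_in_nodes, irb_out_nodes)
        return flow_graph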