#-*- coding:utf-8 -*- import socket import struct import time import logging from StringIO import StringIO import miasm2.analysis.debugging as debugging from miasm2.jitter.jitload import ExceptionHandle class GdbServer(object): "Debugguer binding for GDBServer protocol" general_registers_order = [] general_registers_size = {} # RegName : Size in octet status = "S05" def __init__(self, dbg, port=4455): server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(('localhost', port)) server.listen(1) self.server = server self.dbg = dbg # Communication methods def compute_checksum(self, data): return chr(sum(map(ord, data)) % 256).encode("hex") def get_messages(self): all_data = "" data = self.sock.recv(4096) all_data += data while (len(data) == 4096 or data == ""): if data == "": # Avoid consuming CPU time.sleep(0.001) continue data = self.sock.recv(4096) all_data += data logging.debug("<- %r", all_data) self.recv_queue += self.parse_messages(all_data) def parse_messages(self, data): buf = StringIO(data) msgs = [] while (buf.tell() < buf.len): token = buf.read(1) if token == "+": continue if token == "-": raise NotImplementedError("Resend packet") if token == "$": packet_data = "" c = buf.read(1) while c != "#": packet_data += c c = buf.read(1) checksum = buf.read(2) if checksum != self.compute_checksum(packet_data): raise ValueError("Incorrect checksum") msgs.append(packet_data) return msgs def send_string(self, s): self.send_queue.append("O" + s.encode("hex")) def process_messages(self): while self.recv_queue: msg = self.recv_queue.pop(0) buf = StringIO(msg) msg_type = buf.read(1) self.send_queue.append("+") if msg_type == "q": if msg.startswith("qSupported"): self.send_queue.append("PacketSize=3fff") elif msg.startswith("qC"): # Current thread self.send_queue.append("") elif msg.startswith("qAttached"): # Not supported self.send_queue.append("") elif msg.startswith("qTStatus"): # Not supported self.send_queue.append("") elif msg.startswith("qfThreadInfo"): # Not supported self.send_queue.append("") else: raise NotImplementedError() elif msg_type == "H": # Set current thread self.send_queue.append("OK") elif msg_type == "?": # Report why the target halted self.send_queue.append(self.status) # TRAP signal elif msg_type == "g": # Report all general register values self.send_queue.append(self.report_general_register_values()) elif msg_type == "p": # Read a specific register reg_num = int(buf.read(), 16) self.send_queue.append(self.read_register(reg_num)) elif msg_type == "P": # Set a specific register reg_num, value = buf.read().split("=") reg_num = int(reg_num, 16) value = int(value.decode("hex")[::-1].encode("hex"), 16) self.set_register(reg_num, value) self.send_queue.append("OK") elif msg_type == "m": # Read memory addr, size = map(lambda x: int(x, 16), buf.read().split(",")) self.send_queue.append(self.read_memory(addr, size)) elif msg_type == "k": # Kill self.sock.close() self.send_queue = [] self.sock = None elif msg_type == "!": # Extending debugging will be used self.send_queue.append("OK") elif msg_type == "v": if msg == "vCont?": # Is vCont supported ? self.send_queue.append("") elif msg_type == "s": # Step self.dbg.step() self.send_queue.append("S05") # TRAP signal elif msg_type == "Z": # Add breakpoint or watchpoint bp_type = buf.read(1) if bp_type == "0": # Exec breakpoint assert(buf.read(1) == ",") addr, size = map( lambda x: int(x, 16), buf.read().split(",")) if size != 1: raise NotImplementedError("Bigger size") self.dbg.add_breakpoint(addr) self.send_queue.append("OK") elif bp_type == "1": # Hardware BP assert(buf.read(1) == ",") addr, size = map( lambda x: int(x, 16), buf.read().split(",")) self.dbg.add_memory_breakpoint(addr, size, read=True, write=True) self.send_queue.append("OK") elif bp_type in ["2", "3", "4"]: # Memory breakpoint assert(buf.read(1) == ",") read = bp_type in ["3", "4"] write = bp_type in ["2", "4"] addr, size = map( lambda x: int(x, 16), buf.read().split(",")) self.dbg.add_memory_breakpoint(addr, size, read=read, write=write) self.send_queue.append("OK") else: raise ValueError("Impossible value") elif msg_type == "z": # Remove breakpoint or watchpoint bp_type = buf.read(1) if bp_type == "0": # Exec breakpoint assert(buf.read(1) == ",") addr, size = map( lambda x: int(x, 16), buf.read().split(",")) if size != 1: raise NotImplementedError("Bigger size") dbgsoft = self.dbg.get_breakpoint_by_addr(addr) assert(len(dbgsoft) == 1) self.dbg.remove_breakpoint(dbgsoft[0]) self.send_queue.append("OK") elif bp_type == "1": # Hardware BP assert(buf.read(1) == ",") addr, size = map( lambda x: int(x, 16), buf.read().split(",")) self.dbg.remove_memory_breakpoint_by_addr_access( addr, read=True, write=True) self.send_queue.append("OK") elif bp_type in ["2", "3", "4"]: # Memory breakpoint assert(buf.read(1) == ",") read = bp_type in ["3", "4"] write = bp_type in ["2", "4"] addr, size = map( lambda x: int(x, 16), buf.read().split(",")) self.dbg.remove_memory_breakpoint_by_addr_access( addr, read=read, write=write) self.send_queue.append("OK") else: raise ValueError("Impossible value") elif msg_type == "c": # Continue self.status = "" self.send_messages() ret = self.dbg.run() if isinstance(ret, debugging.DebugBreakpointSoft): self.status = "S05" self.send_queue.append("S05") # TRAP signal elif isinstance(ret, ExceptionHandle): if ret == ExceptionHandle.memoryBreakpoint(): self.status = "S05" self.send_queue.append("S05") else: raise NotImplementedError("Unknown Except") elif isinstance(ret, debugging.DebugBreakpointTerminate): # Connexion should close, but keep it running as a TRAP # The connexion will be close on instance destruction print ret self.status = "S05" self.send_queue.append("S05") else: raise NotImplementedError() else: raise NotImplementedError( "Not implemented: message type '%s'" % msg_type) def send_messages(self): for msg in self.send_queue: if msg == "+": data = "+" else: data = "$%s#%s" % (msg, self.compute_checksum(msg)) logging.debug("-> %r", data) self.sock.send(data) self.send_queue = [] def main_loop(self): self.recv_queue = [] self.send_queue = [] self.send_string("Test\n") while (self.sock): self.get_messages() self.process_messages() self.send_messages() def run(self): self.sock, self.address = self.server.accept() self.main_loop() # Debugguer processing methods def report_general_register_values(self): s = "" for i in xrange(len(self.general_registers_order)): s += self.read_register(i) return s def read_register(self, reg_num): reg_name = self.general_registers_order[reg_num] reg_value = self.read_register_by_name(reg_name) size = self.general_registers_size[reg_name] pack_token = "" if size == 1: pack_token = "