#-*- coding:utf-8 -*- from __future__ import print_function from future.builtins import map, range from miasm.core.utils import decode_hex, encode_hex, int_to_byte import socket import struct import time import logging from io import BytesIO import miasm.analysis.debugging as debugging from miasm.jitter.jitload import ExceptionHandle class GdbServer(object): "Debugguer binding for GDBServer protocol" general_registers_order = [] general_registers_size = {} # RegName : Size in octet status = b"S05" def __init__(self, dbg, port=4455): server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(('localhost', port)) server.listen(1) self.server = server self.dbg = dbg # Communication methods def compute_checksum(self, data): return encode_hex(int_to_byte(sum(map(ord, data)) % 256)) def get_messages(self): all_data = b"" while True: data = self.sock.recv(4096) if not data: break all_data += data logging.debug("<- %r", all_data) self.recv_queue += self.parse_messages(all_data) def parse_messages(self, data): buf = BytesIO(data) msgs = [] while (buf.tell() < buf.len): token = buf.read(1) if token == b"+": continue if token == b"-": raise NotImplementedError("Resend packet") if token == b"$": packet_data = b"" c = buf.read(1) while c != b"#": packet_data += c c = buf.read(1) checksum = buf.read(2) if checksum != self.compute_checksum(packet_data): raise ValueError("Incorrect checksum") msgs.append(packet_data) return msgs def send_string(self, s): self.send_queue.append(b"O" + encode_hex(s)) def process_messages(self): while self.recv_queue: msg = self.recv_queue.pop(0) buf = BytesIO(msg) msg_type = buf.read(1) self.send_queue.append(b"+") if msg_type == b"q": if msg.startswith(b"qSupported"): self.send_queue.append(b"PacketSize=3fff") elif msg.startswith(b"qC"): # Current thread self.send_queue.append(b"") elif msg.startswith(b"qAttached"): # Not supported self.send_queue.append(b"") elif msg.startswith(b"qTStatus"): # Not supported self.send_queue.append(b"") elif msg.startswith(b"qfThreadInfo"): # Not supported self.send_queue.append(b"") else: raise NotImplementedError() elif msg_type == b"H": # Set current thread self.send_queue.append(b"OK") elif msg_type == b"?": # Report why the target halted self.send_queue.append(self.status) # TRAP signal elif msg_type == b"g": # Report all general register values self.send_queue.append(self.report_general_register_values()) elif msg_type == b"p": # Read a specific register reg_num = int(buf.read(), 16) self.send_queue.append(self.read_register(reg_num)) elif msg_type == b"P": # Set a specific register reg_num, value = buf.read().split(b"=") reg_num = int(reg_num, 16) value = int(encode_hex(decode_hex(value)[::-1]), 16) self.set_register(reg_num, value) self.send_queue.append(b"OK") elif msg_type == b"m": # Read memory addr, size = (int(x, 16) for x in buf.read().split(b",", 1)) self.send_queue.append(self.read_memory(addr, size)) elif msg_type == b"k": # Kill self.sock.close() self.send_queue = [] self.sock = None elif msg_type == b"!": # Extending debugging will be used self.send_queue.append(b"OK") elif msg_type == b"v": if msg == b"vCont?": # Is vCont supported ? self.send_queue.append(b"") elif msg_type == b"s": # Step self.dbg.step() self.send_queue.append(b"S05") # TRAP signal elif msg_type == b"Z": # Add breakpoint or watchpoint bp_type = buf.read(1) if bp_type == b"0": # Exec breakpoint assert(buf.read(1) == b",") addr, size = (int(x, 16) for x in buf.read().split(b",", 1)) if size != 1: raise NotImplementedError("Bigger size") self.dbg.add_breakpoint(addr) self.send_queue.append(b"OK") elif bp_type == b"1": # Hardware BP assert(buf.read(1) == b",") addr, size = (int(x, 16) for x in buf.read().split(b",", 1)) self.dbg.add_memory_breakpoint( addr, size, read=True, write=True ) self.send_queue.append(b"OK") elif bp_type in [b"2", b"3", b"4"]: # Memory breakpoint assert(buf.read(1) == b",") read = bp_type in [b"3", b"4"] write = bp_type in [b"2", b"4"] addr, size = (int(x, 16) for x in buf.read().split(b",", 1)) self.dbg.add_memory_breakpoint( addr, size, read=read, write=write ) self.send_queue.append(b"OK") else: raise ValueError("Impossible value") elif msg_type == b"z": # Remove breakpoint or watchpoint bp_type = buf.read(1) if bp_type == b"0": # Exec breakpoint assert(buf.read(1) == b",") addr, size = (int(x, 16) for x in buf.read().split(b",", 1)) if size != 1: raise NotImplementedError("Bigger size") dbgsoft = self.dbg.get_breakpoint_by_addr(addr) assert(len(dbgsoft) == 1) self.dbg.remove_breakpoint(dbgsoft[0]) self.send_queue.append(b"OK") elif bp_type == b"1": # Hardware BP assert(buf.read(1) == b",") addr, size = (int(x, 16) for x in buf.read().split(b",", 1)) self.dbg.remove_memory_breakpoint_by_addr_access( addr, read=True, write=True ) self.send_queue.append(b"OK") elif bp_type in [b"2", b"3", b"4"]: # Memory breakpoint assert(buf.read(1) == b",") read = bp_type in [b"3", b"4"] write = bp_type in [b"2", b"4"] addr, size = (int(x, 16) for x in buf.read().split(b",", 1)) self.dbg.remove_memory_breakpoint_by_addr_access( addr, read=read, write=write ) self.send_queue.append(b"OK") else: raise ValueError("Impossible value") elif msg_type == b"c": # Continue self.status = b"" self.send_messages() ret = self.dbg.run() if isinstance(ret, debugging.DebugBreakpointSoft): self.status = b"S05" self.send_queue.append(b"S05") # TRAP signal elif isinstance(ret, ExceptionHandle): if ret == ExceptionHandle.memoryBreakpoint(): self.status = b"S05" self.send_queue.append(b"S05") else: raise NotImplementedError("Unknown Except") elif isinstance(ret, debugging.DebugBreakpointTerminate): # Connection should close, but keep it running as a TRAP # The connection will be close on instance destruction print(ret) self.status = b"S05" self.send_queue.append(b"S05") else: raise NotImplementedError() else: raise NotImplementedError( "Not implemented: message type %r" % msg_type ) def send_messages(self): for msg in self.send_queue: if msg == b"+": data = b"+" else: data = b"$%s#%s" % (msg, self.compute_checksum(msg)) logging.debug("-> %r", data) self.sock.send(data) self.send_queue = [] def main_loop(self): self.recv_queue = [] self.send_queue = [] self.send_string(b"Test\n") while (self.sock): self.get_messages() self.process_messages() self.send_messages() def run(self): self.sock, self.address = self.server.accept() self.main_loop() # Debugguer processing methods def report_general_register_values(self): s = b"" for i in range(len(self.general_registers_order)): s += self.read_register(i) return s def read_register(self, reg_num): reg_name = self.general_registers_order[reg_num] reg_value = self.read_register_by_name(reg_name) size = self.general_registers_size[reg_name] pack_token = "" if size == 1: pack_token = "