gdb_rsp.py 6.45 KB
Newer Older
1
# Copyright 2020 the V8 project authors. All rights reserved.
2 3 4
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

5
import re
6
import socket
7
import struct
8 9
import subprocess
import time
10
import xml.etree.ElementTree
Ng Zhi An's avatar
Ng Zhi An committed
11 12 13 14 15 16 17 18 19 20 21 22 23 24
import sys

# Compatibility shims so the rest of this module can use `long` and `xrange`
# on either major Python version. Referencing the bare name raises NameError
# when the builtin does not exist, in which case the Python3 equivalent is
# bound to that name at module level.

# Python2 has both int and long, Python3 only has int, which is the same as
# Python2 long.
try:
  long
except NameError:
  long = int

# Python2 has xrange and range. Python3 range is Python2 xrange.
try:
  xrange
except NameError:
  xrange = range
25 26 27

# Default (host, port) pair of the debug stub. EnsurePortIsAvailable() probes
# this address and GdbRspConnection connects to it unless given another one.
SOCKET_ADDR = ('localhost', 8765)

28 29 30 31 32 33 34 35 36
# Signal numbers, for comparison with the 'signal' field of a thread stop
# reply (see AssertReplySignal).
SIGTRAP = 5
SIGSEGV = 11
# NOTE(review): not used in this file; presumably the returncode that
# subprocess reports for a process terminated by signal 9 -- confirm against
# callers.
RETURNCODE_KILL = -9

# Key selecting which REG_DEFS entry DecodeRegs() uses.
ARCH = 'wasm32'
# Maps an architecture name to a list of (register name, struct format
# character) pairs. DecodeRegs() concatenates the format characters in list
# order to unpack a register dump; 'Q' is struct's unsigned 64-bit integer.
REG_DEFS = {
    ARCH: [('pc', 'Q'), ],
}

37 38

def EnsurePortIsAvailable(addr=SOCKET_ADDR):
  """Check that the TCP port at `addr` is free by briefly binding to it.

  Args:
    addr: (host, port) tuple to probe. Defaults to SOCKET_ADDR.

  Raises:
    socket.error: If the address cannot be bound, e.g. because another
      process is already listening on it.
  """
  # As a validity check, check that the TCP port is available by binding to it
  # ourselves (and then unbinding).  Otherwise, we could end up talking to an
  # old instance of the GDB stub that is still hanging around, or to some
  # unrelated service that uses the same port number. Of course, there is still
  # a race condition because an unrelated process could bind the port after we
  # unbind.
  sock = socket.socket()
  try:
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    sock.bind(addr)
  finally:
    # Always release the probe socket, including when bind() fails, so that
    # a failed check does not leak a file descriptor.
    sock.close()

50 51 52 53 54
def RspChecksum(data):
  """Return the RSP checksum of a packet body.

  The checksum is the sum of the byte values of `data`, modulo 256.

  Args:
    data: The packet body, as a text string or a bytes-like object.

  Returns:
    The checksum as an int in the range 0..255.
  """
  if isinstance(data, (bytes, bytearray)):
    # Iterating a bytearray yields ints on both Python2 and Python3, whereas
    # calling ord() on the elements of Python3 bytes raises TypeError.
    values = bytearray(data)
  else:
    values = (ord(char) for char in data)
  return sum(values) & 0xff
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81

class GdbRspConnection(object):
  """Client side of a GDB remote serial protocol (RSP) connection.

  Packets have the form `$<body>#<checksum>`, where the checksum is two hex
  digits produced by RspChecksum(). Replies are returned as text strings on
  both Python2 and Python3.
  """

  # Matches the '#xx' checksum trailer that ends every packet.
  _MESSAGE_FINISHED = re.compile(r'#[0-9a-fA-F]{2}')
  # Matches one complete reply packet, optionally preceded by the '+'
  # acknowledgement of the request that was sent before it.
  _REPLY = re.compile(r'\+?\$([^#]*)#([0-9a-fA-F]{2})$')

  def __init__(self, addr=SOCKET_ADDR):
    """Connect to the debug stub listening at `addr`, a (host, port) tuple."""
    self._socket = self._Connect(addr)

  def _Connect(self, addr):
    """Poll until a TCP connection to `addr` succeeds and return the socket.

    Raises:
      Exception: If no connection could be made within the timeout.
    """
    # We have to poll because we do not know when the GDB stub has successfully
    # done bind() on the TCP port. This is inherently unreliable.
    timeout_in_seconds = 10
    poll_time_in_seconds = 0.1
    for _ in xrange(int(timeout_in_seconds / poll_time_in_seconds)):
      # On Mac OS X, we have to create a new socket FD for each retry.
      sock = socket.socket()
      # Do not delay sending small packets. This significantly speeds up debug
      # stub tests.
      sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
      try:
        sock.connect(addr)
      except socket.error:
        # Release the failed socket so retries do not accumulate open file
        # descriptors, then retry after a delay.
        sock.close()
        time.sleep(poll_time_in_seconds)
      else:
        return sock
    raise Exception('Could not connect to the debug stub in %i seconds'
                    % timeout_in_seconds)

  def _GetReply(self):
    """Read one reply packet, verify its checksum and acknowledge it.

    Returns:
      The body of the reply packet (the text between '$' and '#').

    Raises:
      AssertionError: On EOF before a complete packet was received, on a
        malformed packet, or on a checksum mismatch.
    """
    reply = ''
    while True:
      data = self._socket.recv(1024)
      if len(data) == 0:
        raise AssertionError('EOF on socket reached with '
                             'incomplete reply message: %r' % reply)
      if not isinstance(data, str):
        # Python3 recv() returns bytes. latin-1 maps every byte value to the
        # code point with the same number, so the checksum is unaffected.
        data = data.decode('latin-1')
      reply += data
      if self._MESSAGE_FINISHED.match(reply[-3:]):
        break
    match = self._REPLY.match(reply)
    if match is None:
      raise AssertionError('Unexpected reply message: %r' % reply)
    reply_body = match.group(1)
    checksum = match.group(2)
    expected_checksum = '%02x' % RspChecksum(reply_body)
    # The packet pattern accepts upper-case hex digits, so compare the
    # checksum case-insensitively.
    if checksum.lower() != expected_checksum:
      raise AssertionError('Bad RSP checksum: %r != %r' %
                           (checksum, expected_checksum))
    # Send acknowledgement.
    self._socket.sendall(b'+')
    return reply_body

  # Send an rsp message, but don't wait for or expect a reply.
  def RspSendOnly(self, data):
    """Send `data` as an RSP packet without waiting for a reply.

    Returns:
      The number of bytes written to the socket.
    """
    msg = '$%s#%02x' % (data, RspChecksum(data))
    if not isinstance(msg, bytes):
      # Python3 sockets only accept bytes.
      msg = msg.encode('latin-1')
    # sendall() keeps writing until the whole packet is out; send() may stop
    # after a partial write.
    self._socket.sendall(msg)
    return len(msg)

  def RspRequest(self, data):
    """Send `data` as an RSP packet and return the body of the reply."""
    self.RspSendOnly(data)
    reply = self._GetReply()
    return reply

  def Close(self):
    """Close the underlying socket."""
    self._socket.close()


def PopenDebugStub(command):
  """Launch the debug stub after checking that its TCP port is free.

  Args:
    command: The command passed unchanged to subprocess.Popen.

  Returns:
    The subprocess.Popen object of the launched process.
  """
  # Fail early if something is already bound to the stub's port.
  EnsurePortIsAvailable()
  stub_process = subprocess.Popen(command)
  return stub_process


def KillProcess(process):
  """Kill `process` and wait for it, unless it has already been reaped.

  Args:
    process: A subprocess.Popen object.

  Raises:
    OSError: If kill() fails on a platform other than Windows.
  """
  # kill() won't work if we've already wait()'ed on the process, so only act
  # while no return code has been collected yet.
  if process.returncode is None:
    try:
      process.kill()
    except OSError:
      # If process is already terminated, kill() throws
      # "WindowsError: [Error 5] Access is denied" on Windows. Anywhere else
      # the error is unexpected and is passed on.
      if sys.platform != 'win32':
        raise
    process.wait()
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180


class LaunchDebugStub(object):
  """Context manager that runs a debug stub and yields a connection to it.

  The stub process is started when the object is constructed. Entering the
  context connects to it and returns the GdbRspConnection; leaving the
  context closes that connection and kills the process.
  """

  def __init__(self, command):
    """Start the debug stub with `command` (passed to PopenDebugStub)."""
    self._proc = PopenDebugStub(command)
    self._connection = None

  def __enter__(self):
    try:
      self._connection = GdbRspConnection()
    except BaseException:
      # Do not leave the stub running if we could not connect to it; this
      # deliberately includes KeyboardInterrupt and friends.
      KillProcess(self._proc)
      raise
    return self._connection

  def __exit__(self, exc_type, exc_value, traceback):
    try:
      # Release the socket opened in __enter__ rather than leaving it to the
      # garbage collector. Closing an already closed socket is harmless.
      if self._connection is not None:
        self._connection.Close()
        self._connection = None
    finally:
      KillProcess(self._proc)


def AssertEquals(x, y):
  """Raise AssertionError showing both values when `x != y` holds."""
  differs = x != y
  if differs:
    message = '%r != %r' % (x, y)
    raise AssertionError(message)

def DecodeHex(data):
  """Convert a string of two-digit hex pairs into the bytes they spell.

  Args:
    data: Hex text with an even number of characters.

  Returns:
    A bytes object with one byte per pair of input characters.
  """
  assert len(data) % 2 == 0, data
  decoded = bytearray()
  for start in xrange(0, len(data), 2):
    decoded.append(int(data[start:start + 2], 16))
  return bytes(decoded)

def EncodeHex(data):
  """Return `data` as a string of two-digit lower-case hex pairs.

  Args:
    data: A bytes-like object or a text string; each byte (or character) is
      encoded by its numeric value.

  Returns:
    A text string of hex digits.
  """
  if isinstance(data, (bytes, bytearray)):
    # Iterating a bytearray yields ints on both Python2 and Python3, whereas
    # calling ord() on the elements of Python3 bytes raises TypeError.
    values = bytearray(data)
  else:
    values = (ord(char) for char in data)
  return ''.join('%02x' % value for value in values)

def DecodeUInt64Array(data):
  """Decode hex text into a list of 64-bit little-endian integers.

  Args:
    data: Hex text whose length is a multiple of 16 characters (8 bytes).

  Returns:
    A list with one integer per 16-character group, in input order.
  """
  assert len(data) % 16 == 0, data
  values = []
  for start in xrange(0, len(data), 16):
    word = data[start:start + 16]
    value = 0
    # The last byte pair is the most significant, so fold from the end of
    # the word towards its start.
    for offset in xrange(14, -2, -2):
      value = (value << 8) + int(word[offset:offset + 2], 16)
    values.append(value)
  return values

def AssertReplySignal(reply, signal):
  """Assert that the thread stop reply `reply` reports signal `signal`."""
  stop_info = ParseThreadStopReply(reply)
  AssertEquals(stop_info['signal'], signal)

def ParseThreadStopReply(reply):
  """Parse a 'T' thread stop reply into its numeric fields.

  The expected form is
  `T<2 hex digits>thread-pcs:<hex>;thread:<hex>;` optionally followed by
  `library:<hex>;`. Only lower-case hex digits are accepted.

  Args:
    reply: The reply body as a text string.

  Returns:
    A dict with the integer entries 'signal', 'thread_pc' and 'thread_id'.
    The optional library field is matched but not returned.

  Raises:
    AssertionError: If `reply` does not have the expected form.
  """
  match = re.match(
      'T([0-9a-f]{2})thread-pcs:([0-9a-f]+);thread:([0-9a-f]+);'
      '(library:([0-9a-f]*);)?$', reply)
  if not match:
    raise AssertionError('Bad thread stop reply: %r' % reply)
  return {'signal': int(match.group(1), 16),
          'thread_pc': int(match.group(2), 16),
          'thread_id': int(match.group(3), 16)}

def CheckInstructionPtr(connection, expected_ip):
  """Assert that the 'pc' register read over `connection` is `expected_ip`.

  Sends a 'g' request and decodes the reply with DecodeRegs().
  """
  registers = DecodeRegs(connection.RspRequest('g'))
  AssertEquals(registers['pc'], expected_ip)

def DecodeRegs(reply):
  """Decode a hex register dump into a dict of register name to value.

  The register layout is taken from REG_DEFS[ARCH]: the struct format
  characters are concatenated in list order and applied to the decoded bytes.
  """
  names = []
  formats = []
  for reg_name, reg_fmt in REG_DEFS[ARCH]:
    names.append(reg_name)
    formats.append(reg_fmt)

  raw = DecodeHex(reply)
  values = struct.unpack_from(''.join(formats), raw)
  return dict(zip(names, values))

def GetLoadedModules(connection):
  """Query the stub for its loaded libraries.

  Sends 'qXfer:libraries:read' and parses the XML that follows the leading
  'l' of the reply, which must be a <library-list> of <library> elements.

  Returns:
    A dict mapping each library's section address (as an integer) to the
    library's name attribute.
  """
  reply = connection.RspRequest('qXfer:libraries:read')
  AssertEquals(reply[0], 'l')
  root = xml.etree.ElementTree.fromstring(reply[1:])
  AssertEquals(root.tag, 'library-list')
  modules = {}
  for entry in root:
    AssertEquals(entry.tag, 'library')
    load_address = long(entry.find('section').get('address'))
    assert load_address > 0
    modules[load_address] = entry.get('name')
  return modules

def GetLoadedModuleAddress(connection):
  """Return the address of one module loaded in the stub.

  Returns:
    The first key of the dict returned by GetLoadedModules().

  Raises:
    AssertionError: If no module is loaded.
  """
  modules = GetLoadedModules(connection)
  assert len(modules) > 0
  # Python3 dict views cannot be indexed, so take the first key through an
  # iterator; on Python2 this is the same element keys()[0] returned.
  return next(iter(modules))

def ReadCodeMemory(connection, address, size):
  """Read `size` bytes at `address` from the stub with an 'm' request.

  Returns:
    The memory contents as decoded by DecodeHex().

  Raises:
    AssertionError: If the reply starts with 'E'.
  """
  request = 'm%x,%x' % (address, size)
  reply = connection.RspRequest(request)
  assert not reply.startswith('E'), reply
  return DecodeHex(reply)