v8_commands.py 1.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
# Copyright 2016 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# Fork from commands.py and output.py in v8 test driver.

import signal
import subprocess
import sys
from threading import Event, Timer


class Output(object):
  def __init__(self, exit_code, timed_out, stdout, pid):
    self.exit_code = exit_code
    self.timed_out = timed_out
    self.stdout = stdout
    self.pid = pid

  def HasCrashed(self):
    # Timed out tests will have exit_code -signal.SIGTERM.
    if self.timed_out:
      return False
    return (self.exit_code < 0 and
            self.exit_code != -signal.SIGABRT)

  def HasTimedOut(self):
    return self.timed_out


def Execute(args, cwd, timeout=None):
  popen_args = [c for c in args if c != ""]
  try:
    process = subprocess.Popen(
      args=popen_args,
      stdout=subprocess.PIPE,
      stderr=subprocess.STDOUT,
      cwd=cwd
    )
  except Exception as e:
    sys.stderr.write("Error executing: %s\n" % popen_args)
    raise e

  timeout_event = Event()

  def kill_process():
    timeout_event.set()
    try:
      process.kill()
    except OSError:
      sys.stderr.write('Error: Process %s already ended.\n' % process.pid)


  timer = Timer(timeout, kill_process)
  timer.start()
  stdout, _ = process.communicate()
  timer.cancel()

  return Output(
      process.returncode,
      timeout_event.is_set(),
      stdout.decode('utf-8', 'replace').encode('utf-8'),
      process.pid,
  )