Commit 4a2d9b1b authored by Michael Achenbach's avatar Michael Achenbach Committed by Commit Bot

[test] Abort testing immediately on interrupt and sigterm

This is a partial revert of:
https://crrev.com/c/890938 and https://crrev.com/c/893982

Before this CL, the test runner blocked on ongoing tests in order to
process their results after an internal timeout. However, the logic
required for this feature was overly complicated and prevented an
acceptable implementation for fast aborts. Furthermore, also the fuzzers
suffered from timeouts on swarming due to hanging tests.

Instead, we now abort immediately on internal timeout (used on
fuzzers), SIGINT (Ctrl-C) and SIGTERM. Ongoing tests are immediately
terminated and their results are disregarded. On SIGTERM and SIGINT,
we return with non-zero exit codes, and zero on internal timeout.

This will also properly return json output, when the external hard
timeout is reached on swarming (causes SIGTERM).

TBR=sergiyb@chromium.org

Bug: v8:7423, chromium:813065
Change-Id: Ib20f835f58a0970693bdd3b21dc5d766d8e115d8
Reviewed-on: https://chromium-review.googlesource.com/924852Reviewed-by: 's avatarMichael Achenbach <machenbach@chromium.org>
Commit-Queue: Michael Achenbach <machenbach@chromium.org>
Cr-Commit-Position: refs/heads/master@{#51399}
parent d3f2a925
......@@ -253,9 +253,9 @@ class BaseTestRunner(object):
tests = [t for s in suites for t in s.tests]
return self._do_execute(tests, args, options)
except TestRunnerError:
return 1
return utils.EXIT_CODE_INTERNAL_ERROR
except KeyboardInterrupt:
return 2
return utils.EXIT_CODE_INTERRUPTED
def _create_parser(self):
parser = optparse.OptionParser()
......
......@@ -4,6 +4,7 @@
import os
import signal
import subprocess
import sys
import threading
......@@ -17,6 +18,19 @@ SEM_INVALID_VALUE = -1
SEM_NOGPFAULTERRORBOX = 0x0002 # Microsoft Platform SDK WinBase.h
def setup_testing():
"""For testing only: We use threading under the hood instead of
multiprocessing to make coverage work. Signal handling is only supported
in the main thread, so we disable it for testing.
"""
signal.signal = lambda *_: None
class AbortException(Exception):
"""Indicates early abort on SIGINT, SIGTERM or internal hard timeout."""
pass
class BaseCommand(object):
def __init__(self, shell, args=None, cmd_prefix=None, timeout=60, env=None,
verbose=False):
......@@ -35,10 +49,16 @@ class BaseCommand(object):
process = self._start_process(**additional_popen_kwargs)
# Variable to communicate with the signal handler.
abort_occured = [False]
def handler(signum, frame):
self._abort(process, abort_occured)
signal.signal(signal.SIGTERM, handler)
# Variable to communicate with the timer.
timeout_occured = [False]
timer = threading.Timer(
self.timeout, self._on_timeout, [process, timeout_occured])
self.timeout, self._abort, [process, timeout_occured])
timer.start()
start_time = time.time()
......@@ -47,6 +67,9 @@ class BaseCommand(object):
timer.cancel()
if abort_occured[0]:
raise AbortException()
return output.Output(
process.returncode,
timeout_occured[0],
......@@ -85,12 +108,12 @@ class BaseCommand(object):
def _kill_process(self, process):
raise NotImplementedError()
def _on_timeout(self, process, timeout_occured):
timeout_occured[0] = True
def _abort(self, process, abort_called):
abort_called[0] = True
try:
self._kill_process(process)
except OSError:
sys.stderr.write('Error: Process %s already ended.\n' % process.pid)
pass
def __str__(self):
return self.to_string()
......
......@@ -5,24 +5,27 @@
from Queue import Empty
from contextlib import contextmanager
from multiprocessing import Event, Process, Queue
from multiprocessing import Process, Queue
import os
import signal
import time
import traceback
from . import command
def setup_testing():
"""For testing only: Use threading under the hood instead of multiprocessing
to make coverage work.
"""
global Queue
global Event
global Process
del Queue
del Event
del Process
from Queue import Queue
from threading import Event
from threading import Thread as Process
# Monkeypatch threading Queue to look like multiprocessing Queue.
Queue.cancel_join_thread = lambda self: None
class NormalResult():
......@@ -49,37 +52,43 @@ class MaybeResult():
return MaybeResult(False, value)
def Worker(fn, work_queue, done_queue, pause_event, read_again_event,
def Worker(fn, work_queue, done_queue,
process_context_fn=None, process_context_args=None):
"""Worker to be run in a child process.
The worker stops when the poison pill "STOP" is reached.
It pauses when pause event is set and waits until read again event is true.
"""
try:
kwargs = {}
if process_context_fn and process_context_args is not None:
kwargs.update(process_context=process_context_fn(*process_context_args))
for args in iter(work_queue.get, "STOP"):
if pause_event.is_set():
done_queue.put(NormalResult(None))
read_again_event.wait()
continue
try:
done_queue.put(NormalResult(fn(*args, **kwargs)))
except command.AbortException:
# SIGINT, SIGTERM or internal hard timeout.
break
except Exception, e:
traceback.print_exc()
print(">>> EXCEPTION: %s" % e)
done_queue.put(ExceptionResult(e))
# When we reach here on normal tear down, all items have been pulled from
# the done_queue before and this should have no effect. On fast abort, it's
# possible that a fast worker left items on the done_queue in memory, which
# will never be pulled. This call purges those to avoid a deadlock.
done_queue.cancel_join_thread()
except KeyboardInterrupt:
assert False, 'Unreachable'
@contextmanager
def without_sigint():
handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
def without_sig():
int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
term_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
try:
yield
signal.signal(signal.SIGINT, handler)
finally:
signal.signal(signal.SIGINT, int_handler)
signal.signal(signal.SIGTERM, term_handler)
class Pool():
......@@ -92,10 +101,11 @@ class Pool():
# Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
BUFFER_FACTOR = 4
def __init__(self, num_workers, heartbeat_timeout=30):
def __init__(self, num_workers, heartbeat_timeout=1):
self.num_workers = num_workers
self.processes = []
self.terminated = False
self.abort_now = False
# Invariant: processing_count >= #work_queue + #done_queue. It is greater
# when a worker takes an item from the work_queue and before the result is
......@@ -107,13 +117,11 @@ class Pool():
self.processing_count = 0
self.heartbeat_timeout = heartbeat_timeout
# Disable sigint to make multiprocessing data structure inherit it and
# ignore ctrl-c
with without_sigint():
# Disable sigint and sigterm to prevent subprocesses from capturing the
# signals.
with without_sig():
self.work_queue = Queue()
self.done_queue = Queue()
self.pause_event = Event()
self.read_again_event = Event()
def imap_unordered(self, fn, gen,
process_context_fn=None, process_context_args=None):
......@@ -139,14 +147,13 @@ class Pool():
gen = iter(gen)
self.advance = self._advance_more
# Disable sigint to make workers inherit it and ignore ctrl-c
with without_sigint():
# Disable sigint and sigterm to prevent subprocesses from capturing the
# signals.
with without_sig():
for w in xrange(self.num_workers):
p = Process(target=Worker, args=(fn,
self.work_queue,
self.done_queue,
self.pause_event,
self.read_again_event,
process_context_fn,
process_context_args))
p.start()
......@@ -156,23 +163,31 @@ class Pool():
while self.processing_count > 0:
while True:
try:
# Read from result queue in a responsive fashion. If available,
# this will return a normal result immediately or a heartbeat on
# heartbeat timeout (default 1 second).
result = self._get_result_from_queue()
except:
# TODO(machenbach): Handle a few known types of internal errors
# gracefully, e.g. missing test files.
internal_error = True
continue
if self.abort_now:
# SIGINT, SIGTERM or internal hard timeout.
return
yield result
break
self.advance(gen)
except KeyboardInterrupt:
raise
assert False, 'Unreachable'
except Exception as e:
traceback.print_exc()
print(">>> EXCEPTION: %s" % e)
finally:
self.terminate()
self._terminate()
if internal_error:
raise Exception("Internal error in a worker process.")
......@@ -197,27 +212,28 @@ class Pool():
self.work_queue.put(args)
self.processing_count += 1
def terminate(self):
"""Terminates execution and waits for ongoing jobs."""
# Iteration but ignore the results
list(self.terminate_with_results())
def abort(self):
"""Schedules abort on next queue read.
def terminate_with_results(self):
"""Terminates execution and waits for ongoing jobs. It's a generator
returning heartbeats and results for all jobs that started before calling
terminate.
This is safe to call when handling SIGINT, SIGTERM or when an internal
hard timeout is reached.
"""
self.abort_now = True
def _terminate(self):
"""Terminates execution and cleans up the queues.
If abort() was called before termination, this also terminates the
subprocesses and doesn't wait for ongoing tests.
"""
if self.terminated:
return
self.terminated = True
self.pause_event.set()
# Drain out work queue from tests
try:
while self.processing_count > 0:
self.work_queue.get(True, 1)
self.processing_count -= 1
while True:
self.work_queue.get(True, 0.1)
except Empty:
pass
......@@ -227,28 +243,39 @@ class Pool():
# per worker to make them stop.
self.work_queue.put("STOP")
# Workers stopped reading work queue if stop event is true to not overtake
# main process that drains the queue. They should read again to consume
# poison pill and possibly more tests that we couldn't get during draining.
self.read_again_event.set()
# Wait for results
while self.processing_count:
result = self._get_result_from_queue()
if result.heartbeat or result.value:
yield result
if self.abort_now:
for p in self.processes:
os.kill(p.pid, signal.SIGTERM)
for p in self.processes:
p.join()
def _get_result_from_queue(self):
# Drain the queues to prevent stderr chatter when queues are garbage
# collected.
try:
result = self.done_queue.get(timeout=self.heartbeat_timeout)
self.processing_count -= 1
except Empty:
return MaybeResult.create_heartbeat()
while True: self.work_queue.get(False)
except:
pass
try:
while True: self.done_queue.get(False)
except:
pass
if result.exception:
raise result.exception
def _get_result_from_queue(self):
"""Attempts to get the next result from the queue.
return MaybeResult.create_result(result.result)
Returns: A wrapped result if one was available within heartbeat timeout,
a heartbeat result otherwise.
Raises:
Exception: If an exception occured when processing the task on the
worker side, it is reraised here.
"""
while True:
try:
result = self.done_queue.get(timeout=self.heartbeat_timeout)
self.processing_count -= 1
if result.exception:
raise result.exception
return MaybeResult.create_result(result.result)
except Empty:
return MaybeResult.create_heartbeat()
......@@ -3,9 +3,16 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import sys
import unittest
from pool import Pool
# Needed because the test runner contains relative imports.
TOOLS_PATH = os.path.dirname(os.path.dirname(os.path.dirname(
os.path.abspath(__file__))))
sys.path.append(TOOLS_PATH)
from testrunner.local.pool import Pool
def Run(x):
if x == 10:
......@@ -17,6 +24,9 @@ class PoolTest(unittest.TestCase):
results = set()
pool = Pool(3)
for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
if result.heartbeat:
# Any result can be a heartbeat due to timings.
continue
results.add(result.value)
self.assertEquals(set(range(0, 10)), results)
......@@ -25,6 +35,9 @@ class PoolTest(unittest.TestCase):
pool = Pool(3)
with self.assertRaises(Exception):
for result in pool.imap_unordered(Run, [[x] for x in range(0, 12)]):
if result.heartbeat:
# Any result can be a heartbeat due to timings.
continue
# Item 10 will not appear in results due to an internal exception.
results.add(result.value)
expect = set(range(0, 12))
......@@ -35,6 +48,9 @@ class PoolTest(unittest.TestCase):
results = set()
pool = Pool(3)
for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
if result.heartbeat:
# Any result can be a heartbeat due to timings.
continue
results.add(result.value)
if result.value < 30:
pool.add([result.value + 20])
......
......@@ -36,6 +36,21 @@ import subprocess
import urllib2
### Exit codes and their meaning.
# Normal execution.
EXIT_CODE_PASS = 0
# Execution with test failures.
EXIT_CODE_FAILURES = 1
# Execution with no tests executed.
EXIT_CODE_NO_TESTS = 2
# Execution aborted with SIGINT (Ctrl-C).
EXIT_CODE_INTERRUPTED = 3
# Execution aborted with SIGTERM.
EXIT_CODE_TERMINATED = 4
# Internal error.
EXIT_CODE_INTERNAL_ERROR = 5
def GetSuitePaths(test_root):
return [ f for f in os.listdir(test_root) if isdir(join(test_root, f)) ]
......
......@@ -132,6 +132,7 @@ class NumFuzzer(base_runner.BaseTestRunner):
combiner = self._create_combiner(fuzzer_rng, options)
results = ResultsTracker()
execproc = ExecutionProc(options.j)
sigproc = self._create_signal_proc()
indicators = self._create_progress_indicators(options)
procs = [
loader,
......@@ -143,7 +144,7 @@ class NumFuzzer(base_runner.BaseTestRunner):
ForgiveTimeoutProc(),
combiner,
self._create_fuzzer(fuzzer_rng, options),
self._create_signal_proc(),
sigproc,
] + indicators + [
results,
self._create_timeout_proc(options),
......@@ -156,18 +157,20 @@ class NumFuzzer(base_runner.BaseTestRunner):
# TODO(majeski): maybe some notification from loader would be better?
if combiner:
combiner.generate_initial_tests(options.j * 4)
execproc.start()
# This starts up worker processes and blocks until all tests are
# processed.
execproc.run()
for indicator in indicators:
indicator.finished()
print '>>> %d tests ran' % results.total
if results.failed:
print '>>> %d tests failed' % results.failed
return utils.EXIT_CODE_FAILURES
if results.failed:
return 1
return 0
# Indicate if a SIGINT or SIGTERM happened.
return sigproc.exit_code
def _load_suites(self, names, options):
suites = super(NumFuzzer, self)._load_suites(names, options)
......
......@@ -284,6 +284,7 @@ class StandardTestRunner(base_runner.BaseTestRunner):
if self.build_config.predictable:
outproc_factory = predictable.get_outproc
execproc = ExecutionProc(jobs, outproc_factory)
sigproc = self._create_signal_proc()
procs = [
loader,
......@@ -295,7 +296,7 @@ class StandardTestRunner(base_runner.BaseTestRunner):
StatusFileFilterProc(options.slow_tests, options.pass_fail_tests),
self._create_predictable_filter(),
self._create_seed_proc(options),
self._create_signal_proc(),
sigproc,
] + indicators + [
results,
self._create_timeout_proc(options),
......@@ -311,23 +312,28 @@ class StandardTestRunner(base_runner.BaseTestRunner):
print '>>> Running %d base tests' % tests_counter.total
tests_counter.remove_from_chain()
execproc.start()
# This starts up worker processes and blocks until all tests are
# processed.
execproc.run()
for indicator in indicators:
indicator.finished()
print '>>> %d tests ran' % (results.total - results.remaining)
exit_code = 0
exit_code = utils.EXIT_CODE_PASS
if results.failed:
exit_code = 1
exit_code = utils.EXIT_CODE_FAILURES
if not results.total:
exit_code = 3
exit_code = utils.EXIT_CODE_NO_TESTS
if exit_code == 1 and options.json_test_results:
# Indicate if a SIGINT or SIGTERM happened.
exit_code = max(exit_code, sigproc.exit_code)
if exit_code == utils.EXIT_CODE_FAILURES and options.json_test_results:
print("Force exit code 0 after failures. Json test results file "
"generated with failure information.")
exit_code = 0
exit_code = utils.EXIT_CODE_PASS
return exit_code
def _create_predictable_filter(self):
......@@ -335,7 +341,6 @@ class StandardTestRunner(base_runner.BaseTestRunner):
return None
return predictable.PredictableFilterProc()
def _create_seed_proc(self, options):
if options.random_seed_stress_count == 1:
return None
......
......@@ -52,18 +52,15 @@ class ExecutionProc(base.TestProc):
def connect_to(self, next_proc):
assert False, 'ExecutionProc cannot be connected to anything'
def start(self):
try:
it = self._pool.imap_unordered(
def run(self):
it = self._pool.imap_unordered(
fn=run_job,
gen=[],
process_context_fn=create_process_context,
process_context_args=[self._prev_requirement],
)
for pool_result in it:
self._unpack_result(pool_result)
finally:
self._pool.terminate()
)
for pool_result in it:
self._unpack_result(pool_result)
def next_test(self, test):
if self.is_stopped:
......@@ -81,9 +78,7 @@ class ExecutionProc(base.TestProc):
def stop(self):
super(ExecutionProc, self).stop()
for pool_result in self._pool.terminate_with_results():
self._unpack_result(pool_result)
self._pool.abort()
def _unpack_result(self, pool_result):
if pool_result.heartbeat:
......
......@@ -103,6 +103,15 @@ class SimpleProgressIndicator(ProgressIndicator):
class VerboseProgressIndicator(SimpleProgressIndicator):
def __init__(self):
super(VerboseProgressIndicator, self).__init__()
self._last_printed_time = time.time()
def _print(self, text):
print text
sys.stdout.flush()
self._last_printed_time = time.time()
def _on_result_for(self, test, result):
super(VerboseProgressIndicator, self)._on_result_for(test, result)
# TODO(majeski): Support for dummy/grouped results
......@@ -113,12 +122,13 @@ class VerboseProgressIndicator(SimpleProgressIndicator):
outcome = 'FAIL'
else:
outcome = 'pass'
print 'Done running %s: %s' % (test, outcome)
sys.stdout.flush()
self._print('Done running %s: %s' % (test, outcome))
def _on_heartbeat(self):
print 'Still working...'
sys.stdout.flush()
if time.time() - self._last_printed_time > 30:
# Print something every 30 seconds to not get killed by an output
# timeout.
self._print('Still working...')
class DotsProgressIndicator(SimpleProgressIndicator):
......
......@@ -5,29 +5,27 @@
import signal
from . import base
from testrunner.local import utils
class SignalProc(base.TestProcObserver):
def __init__(self):
super(SignalProc, self).__init__()
self._ctrlc = False
self.exit_code = utils.EXIT_CODE_PASS
def setup(self, *args, **kwargs):
super(SignalProc, self).setup(*args, **kwargs)
# It should be called after processors are chained together to not loose
# catched signal.
signal.signal(signal.SIGINT, self._on_ctrlc)
def _on_next_test(self, _test):
self._on_event()
def _on_result_for(self, _test, _result):
self._on_event()
signal.signal(signal.SIGTERM, self._on_sigterm)
def _on_ctrlc(self, _signum, _stack_frame):
print '>>> Ctrl-C detected, waiting for ongoing tests to finish...'
self._ctrlc = True
def _on_event(self):
if self._ctrlc:
self.stop()
print '>>> Ctrl-C detected, early abort...'
self.exit_code = utils.EXIT_CODE_INTERRUPTED
self.stop()
def _on_sigterm(self, _signum, _stack_frame):
print '>>> SIGTERM received, early abort...'
self.exit_code = utils.EXIT_CODE_TERMINATED
self.stop()
......@@ -19,6 +19,9 @@ class TimeoutProc(base.TestProcObserver):
def _on_result_for(self, test, result):
self._on_event()
def _on_heartbeat(self):
self._on_event()
def _on_event(self):
if not self.is_stopped:
if time.time() - self._start > self._duration_sec:
......
......@@ -147,7 +147,9 @@ class SystemTest(unittest.TestCase):
sys.path.append(TOOLS_ROOT)
global standard_runner
from testrunner import standard_runner
from testrunner.local import command
from testrunner.local import pool
command.setup_testing()
pool.setup_testing()
@classmethod
......@@ -396,7 +398,7 @@ class SystemTest(unittest.TestCase):
else:
self.assertIn('Running 1 base tests', result.stdout, result)
self.assertIn('0 tests ran', result.stdout, result)
self.assertEqual(3, result.returncode, result)
self.assertEqual(2, result.returncode, result)
def testDefaultProc(self):
self.testDefault(infra_staging=True)
......@@ -416,14 +418,14 @@ class SystemTest(unittest.TestCase):
else:
self.assertIn('Running 0 base tests', result.stdout, result)
self.assertIn('0 tests ran', result.stdout, result)
self.assertEqual(3, result.returncode, result)
self.assertEqual(2, result.returncode, result)
def testNoBuildConfig(self):
"""Test failing run when build config is not found."""
with temp_base() as basedir:
result = run_tests(basedir)
self.assertIn('Failed to load build config', result.stdout, result)
self.assertEqual(1, result.returncode, result)
self.assertEqual(5, result.returncode, result)
def testGNOption(self):
"""Test using gn option, but no gn build folder is found."""
......@@ -439,7 +441,7 @@ class SystemTest(unittest.TestCase):
result = run_tests(basedir, '--mode=Release')
self.assertIn('execution mode (release) for release is inconsistent '
'with build config (debug)', result.stdout, result)
self.assertEqual(1, result.returncode, result)
self.assertEqual(5, result.returncode, result)
def testInconsistentArch(self):
"""Test failing run when attempting to wrongly override the arch."""
......@@ -448,13 +450,13 @@ class SystemTest(unittest.TestCase):
self.assertIn(
'--arch value (ia32) inconsistent with build config (x64).',
result.stdout, result)
self.assertEqual(1, result.returncode, result)
self.assertEqual(5, result.returncode, result)
def testWrongVariant(self):
"""Test using a bogus variant."""
with temp_base() as basedir:
result = run_tests(basedir, '--mode=Release', '--variants=meh')
self.assertEqual(1, result.returncode, result)
self.assertEqual(5, result.returncode, result)
def testModeFromBuildConfig(self):
"""Test auto-detection of mode from build config."""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment