Commit d161bc3c authored by Michal Majewski's avatar Michal Majewski Committed by Commit Bot

[test] Added signal handling to test processors

Bug: v8:6917
Change-Id: If91defd11c91fd26bdbacc146992745ea772a941
Reviewed-on: https://chromium-review.googlesource.com/890938Reviewed-by: 's avatarMichael Achenbach <machenbach@chromium.org>
Commit-Queue: Michał Majewski <majeski@google.com>
Cr-Commit-Position: refs/heads/master@{#50985}
parent 957ac364
......@@ -21,6 +21,7 @@ from testrunner.local import testsuite
from testrunner.local import utils
from testrunner.test_config import TestConfig
from testrunner.testproc.shard import ShardProc
from testrunner.testproc.sigproc import SignalProc
from testrunner.testproc.timeout import TimeoutProc
......@@ -567,3 +568,6 @@ class BaseTestRunner(object):
if not options.total_timeout_sec:
return None
return TimeoutProc(options.total_timeout_sec)
def _create_signal_proc(self):
return SignalProc()
......@@ -4,7 +4,9 @@
# found in the LICENSE file.
from Queue import Empty
from contextlib import contextmanager
from multiprocessing import Event, Process, Queue
import signal
import traceback
......@@ -27,19 +29,10 @@ class NormalResult():
def __init__(self, result):
self.result = result
self.exception = False
self.break_now = False
class ExceptionResult():
def __init__(self):
self.exception = True
self.break_now = False
class BreakResult():
def __init__(self):
self.exception = False
self.break_now = True
class MaybeResult():
......@@ -56,18 +49,22 @@ class MaybeResult():
return MaybeResult(False, value)
def Worker(fn, work_queue, done_queue, done,
def Worker(fn, work_queue, done_queue, pause_event, read_again_event,
process_context_fn=None, process_context_args=None):
"""Worker to be run in a child process.
The worker stops on two conditions. 1. When the poison pill "STOP" is
reached or 2. when the event "done" is set."""
The worker stops when the poison pill "STOP" is reached.
It pauses when pause event is set and waits until read again event is true.
"""
try:
kwargs = {}
if process_context_fn and process_context_args is not None:
kwargs.update(process_context=process_context_fn(*process_context_args))
for args in iter(work_queue.get, "STOP"):
if done.is_set():
break
if pause_event.is_set():
done_queue.put(NormalResult(None))
read_again_event.wait()
continue
try:
done_queue.put(NormalResult(fn(*args, **kwargs)))
except Exception, e:
......@@ -75,7 +72,14 @@ def Worker(fn, work_queue, done_queue, done,
print(">>> EXCEPTION: %s" % e)
done_queue.put(ExceptionResult())
except KeyboardInterrupt:
done_queue.put(BreakResult())
assert False, 'Unreachable'
@contextmanager
def without_sigint():
handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
yield
signal.signal(signal.SIGINT, handler)
class Pool():
......@@ -93,19 +97,24 @@ class Pool():
self.processes = []
self.terminated = False
# Invariant: count >= #work_queue + #done_queue. It is greater when a
# worker takes an item from the work_queue and before the result is
# Invariant: processing_count >= #work_queue + #done_queue. It is greater
# when a worker takes an item from the work_queue and before the result is
# submitted to the done_queue. It is equal when no worker is working,
# e.g. when all workers have finished, and when no results are processed.
# Count is only accessed by the parent process. Only the parent process is
# allowed to remove items from the done_queue and to add items to the
# work_queue.
self.count = 0
self.work_queue = Queue()
self.done_queue = Queue()
self.done = Event()
self.processing_count = 0
self.heartbeat_timeout = heartbeat_timeout
# Disable sigint to make multiprocessing data structure inherit it and
# ignore ctrl-c
with without_sigint():
self.work_queue = Queue()
self.done_queue = Queue()
self.pause_event = Event()
self.read_again_event = Event()
def imap_unordered(self, fn, gen,
process_context_fn=None, process_context_args=None):
"""Maps function "fn" to items in generator "gen" on the worker processes
......@@ -128,35 +137,35 @@ class Pool():
gen = iter(gen)
self.advance = self._advance_more
for w in xrange(self.num_workers):
p = Process(target=Worker, args=(fn,
self.work_queue,
self.done_queue,
self.done,
process_context_fn,
process_context_args))
p.start()
self.processes.append(p)
# Disable sigint to make workers inherit it and ignore ctrl-c
with without_sigint():
for w in xrange(self.num_workers):
p = Process(target=Worker, args=(fn,
self.work_queue,
self.done_queue,
self.pause_event,
self.read_again_event,
process_context_fn,
process_context_args))
p.start()
self.processes.append(p)
self.advance(gen)
while self.count > 0:
while self.processing_count > 0:
while True:
try:
result = self.done_queue.get(timeout=self.heartbeat_timeout)
self.processing_count -= 1
break
except Empty:
# Indicate a heartbeat. The iterator will continue fetching the
# next result.
yield MaybeResult.create_heartbeat()
self.count -= 1
if result.exception:
# TODO(machenbach): Handle a few known types of internal errors
# gracefully, e.g. missing test files.
internal_error = True
continue
elif result.break_now:
# A keyboard interrupt happened in one of the worker processes.
raise KeyboardInterrupt
else:
yield MaybeResult.create_result(result.result)
self.advance(gen)
......@@ -166,15 +175,17 @@ class Pool():
traceback.print_exc()
print(">>> EXCEPTION: %s" % e)
finally:
# Ignore results
self.terminate()
if internal_error:
raise Exception("Internal error in a worker process.")
def _advance_more(self, gen):
while self.count < self.num_workers * self.BUFFER_FACTOR:
while self.processing_count < self.num_workers * self.BUFFER_FACTOR:
try:
self.work_queue.put(gen.next())
self.count += 1
self.processing_count += 1
except StopIteration:
self.advance = self._advance_empty
break
......@@ -186,31 +197,44 @@ class Pool():
"""Adds an item to the work queue. Can be called dynamically while
processing the results from imap_unordered."""
self.work_queue.put(args)
self.count += 1
self.processing_count += 1
def terminate(self):
if self.terminated:
return
self.terminated = True
results = []
# For exceptional tear down set the "done" event to stop the workers before
# they empty the queue buffer.
self.done.set()
self.pause_event.set()
# Drain out work queue from tests
try:
while self.processing_count:
self.work_queue.get(True, 1)
self.processing_count -= 1
except Empty:
pass
# Make sure all processes stop
for p in self.processes:
# During normal tear down the workers block on get(). Feed a poison pill
# per worker to make them stop.
self.work_queue.put("STOP")
# Workers stopped reading work queue if stop event is true to not overtake
# draining queue, but they should read again to consume poison pill and
# possibly more tests that we couldn't get during draining.
self.read_again_event.set()
# Wait for results
while self.processing_count:
# TODO(majeski): terminate as generator to return results and heartbeats,
result = self.done_queue.get()
if result.result:
results.append(MaybeResult.create_result(result.result))
self.processing_count -= 1
for p in self.processes:
p.join()
# Drain the queues to prevent failures when queues are garbage collected.
try:
while True: self.work_queue.get(False)
except:
pass
try:
while True: self.done_queue.get(False)
except:
pass
return results
......@@ -40,3 +40,7 @@ class PoolTest(unittest.TestCase):
pool.add([result.value + 20])
self.assertEquals(set(range(0, 10) + range(20, 30) + range(40, 50)),
results)
if __name__ == '__main__':
unittest.main()
......@@ -175,7 +175,8 @@ class NumFuzzer(base_runner.BaseTestRunner):
# different random seeds for shards instead of splitting tests.
self._create_shard_proc(options),
combiner,
self._create_fuzzer(fuzzer_rng, options)
self._create_fuzzer(fuzzer_rng, options),
self._create_signal_proc(),
] + indicators + [
results,
self._create_timeout_proc(options),
......
......@@ -589,6 +589,7 @@ class StandardTestRunner(base_runner.BaseTestRunner):
VariantProc(self._variants),
StatusFileFilterProc(options.slow_tests, options.pass_fail_tests),
self._create_seed_proc(options),
self._create_signal_proc(),
] + indicators + [
results,
self._create_timeout_proc(options),
......@@ -615,7 +616,7 @@ class StandardTestRunner(base_runner.BaseTestRunner):
for indicator in indicators:
indicator.finished()
print '>>> %d tests ran' % results.total
print '>>> %d tests ran' % (results.total - results.remaining)
exit_code = 0
if results.failed:
......
......@@ -94,9 +94,12 @@ class TestProc(object):
self._prev_proc.heartbeat()
def stop(self):
self._stopped = True
if self._prev_proc:
self._prev_proc.stop()
if not self._stopped:
self._stopped = True
if self._prev_proc:
self._prev_proc.stop()
if self._next_proc:
self._next_proc.stop()
@property
def is_stopped(self):
......
......@@ -64,20 +64,7 @@ class ExecutionProc(base.TestProc):
process_context_args=[self._prev_requirement],
)
for pool_result in it:
if pool_result.heartbeat:
continue
job_result = pool_result.value
test_id, result = job_result
test, result.cmd = self._tests[test_id]
del self._tests[test_id]
self._send_result(test, result)
except KeyboardInterrupt:
raise
except:
traceback.print_exc()
raise
self._unpack_result(pool_result)
finally:
self._pool.terminate()
......@@ -91,3 +78,19 @@ class ExecutionProc(base.TestProc):
def result_for(self, test, result):
assert False, 'ExecutionProc cannot receive results'
def stop(self):
for pool_result in self._pool.terminate():
self._unpack_result(pool_result)
def _unpack_result(self, pool_result):
if pool_result.heartbeat:
self.heartbeat()
return
job_result = pool_result.value
test_id, result = job_result
test, result.cmd = self._tests[test_id]
del self._tests[test_id]
self._send_result(test, result)
......@@ -34,7 +34,7 @@ class RerunProc(base.TestProcProducer):
results = self._results[test.procid]
results.append(result)
if self._needs_rerun(test, result):
if not self.is_stopped and self._needs_rerun(test, result):
self._rerun[test.procid] += 1
if self._rerun_total_left is not None:
self._rerun_total_left -= 1
......
# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import signal
from . import base
class SignalProc(base.TestProcObserver):
def __init__(self):
super(SignalProc, self).__init__()
self._ctrlc = False
signal.signal(signal.SIGINT, self._on_ctrlc)
def _on_next_test(self, _test):
self._on_event()
def _on_result_for(self, _test, _result):
self._on_event()
def _on_ctrlc(self, _signum, _stack_frame):
print '>>> Ctrl-C detected, waiting for ongoing tests to finish...'
self._ctrlc = True
def _on_event(self):
if self._ctrlc:
self.stop()
......@@ -7,7 +7,6 @@ import time
from . import base
# TODO(majeski): Signal handler
class TimeoutProc(base.TestProcObserver):
def __init__(self, duration_sec):
super(TimeoutProc, self).__init__()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment