Commit 1478c307 authored by Michal Majewski's avatar Michal Majewski Committed by Commit Bot

[test] Return heartbeats and results during pool termination

Bug: v8:6917
Change-Id: I5cca65111141f32f8b9f241a9f482d09e1b54655
Reviewed-on: https://chromium-review.googlesource.com/893982
Commit-Queue: Michał Majewski <majeski@google.com>
Reviewed-by: 's avatarMichael Achenbach <machenbach@chromium.org>
Cr-Commit-Position: refs/heads/master@{#50987}
parent 160f6009
...@@ -28,11 +28,11 @@ def setup_testing(): ...@@ -28,11 +28,11 @@ def setup_testing():
class NormalResult(): class NormalResult():
def __init__(self, result): def __init__(self, result):
self.result = result self.result = result
self.exception = False self.exception = None
class ExceptionResult(): class ExceptionResult():
def __init__(self): def __init__(self, exception):
self.exception = True self.exception = exception
class MaybeResult(): class MaybeResult():
...@@ -70,7 +70,7 @@ def Worker(fn, work_queue, done_queue, pause_event, read_again_event, ...@@ -70,7 +70,7 @@ def Worker(fn, work_queue, done_queue, pause_event, read_again_event,
except Exception, e: except Exception, e:
traceback.print_exc() traceback.print_exc()
print(">>> EXCEPTION: %s" % e) print(">>> EXCEPTION: %s" % e)
done_queue.put(ExceptionResult()) done_queue.put(ExceptionResult(e))
except KeyboardInterrupt: except KeyboardInterrupt:
assert False, 'Unreachable' assert False, 'Unreachable'
...@@ -132,6 +132,8 @@ class Pool(): ...@@ -132,6 +132,8 @@ class Pool():
process_context_fn. All arguments will be pickled and sent beyond the process_context_fn. All arguments will be pickled and sent beyond the
process boundary. process boundary.
""" """
if self.terminated:
return
try: try:
internal_error = False internal_error = False
gen = iter(gen) gen = iter(gen)
...@@ -154,20 +156,15 @@ class Pool(): ...@@ -154,20 +156,15 @@ class Pool():
while self.processing_count > 0: while self.processing_count > 0:
while True: while True:
try: try:
result = self.done_queue.get(timeout=self.heartbeat_timeout) result = self._get_result_from_queue()
self.processing_count -= 1 except:
break
except Empty:
# Indicate a heartbeat. The iterator will continue fetching the
# next result.
yield MaybeResult.create_heartbeat()
if result.exception:
# TODO(machenbach): Handle a few known types of internal errors # TODO(machenbach): Handle a few known types of internal errors
# gracefully, e.g. missing test files. # gracefully, e.g. missing test files.
internal_error = True internal_error = True
continue continue
else: yield result
yield MaybeResult.create_result(result.result) break
self.advance(gen) self.advance(gen)
except KeyboardInterrupt: except KeyboardInterrupt:
raise raise
...@@ -175,7 +172,6 @@ class Pool(): ...@@ -175,7 +172,6 @@ class Pool():
traceback.print_exc() traceback.print_exc()
print(">>> EXCEPTION: %s" % e) print(">>> EXCEPTION: %s" % e)
finally: finally:
# Ignore results
self.terminate() self.terminate()
if internal_error: if internal_error:
...@@ -196,45 +192,63 @@ class Pool(): ...@@ -196,45 +192,63 @@ class Pool():
def add(self, args): def add(self, args):
"""Adds an item to the work queue. Can be called dynamically while """Adds an item to the work queue. Can be called dynamically while
processing the results from imap_unordered.""" processing the results from imap_unordered."""
assert not self.terminated
self.work_queue.put(args) self.work_queue.put(args)
self.processing_count += 1 self.processing_count += 1
def terminate(self): def terminate(self):
"""Terminates execution and waits for ongoing jobs."""
# Iteration but ignore the results
list(self.terminate_with_results())
def terminate_with_results(self):
"""Terminates execution and waits for ongoing jobs. It's a generator
returning heartbeats and results for all jobs that started before calling
terminate.
"""
if self.terminated: if self.terminated:
return return
self.terminated = True self.terminated = True
results = []
self.pause_event.set() self.pause_event.set()
# Drain out work queue from tests # Drain out work queue from tests
try: try:
while self.processing_count: while self.processing_count > 0:
self.work_queue.get(True, 1) self.work_queue.get(True, 1)
self.processing_count -= 1 self.processing_count -= 1
except Empty: except Empty:
pass pass
# Make sure all processes stop # Make sure all processes stop
for p in self.processes: for _ in self.processes:
# During normal tear down the workers block on get(). Feed a poison pill # During normal tear down the workers block on get(). Feed a poison pill
# per worker to make them stop. # per worker to make them stop.
self.work_queue.put("STOP") self.work_queue.put("STOP")
# Workers stopped reading work queue if stop event is true to not overtake # Workers stopped reading work queue if stop event is true to not overtake
# draining queue, but they should read again to consume poison pill and # main process that drains the queue. They should read again to consume
# possibly more tests that we couldn't get during draining. # poison pill and possibly more tests that we couldn't get during draining.
self.read_again_event.set() self.read_again_event.set()
# Wait for results # Wait for results
while self.processing_count: while self.processing_count:
# TODO(majeski): terminate as generator to return results and heartbeats, result = self._get_result_from_queue()
result = self.done_queue.get() if result.heartbeat or result.value:
if result.result: yield result
results.append(MaybeResult.create_result(result.result))
self.processing_count -= 1
for p in self.processes: for p in self.processes:
p.join() p.join()
return results def _get_result_from_queue(self):
try:
result = self.done_queue.get(timeout=self.heartbeat_timeout)
self.processing_count -= 1
except Empty:
return MaybeResult.create_heartbeat()
if result.exception:
raise result.exception
return MaybeResult.create_result(result.result)
...@@ -37,8 +37,6 @@ class Job(object): ...@@ -37,8 +37,6 @@ class Job(object):
return JobResult(self.test_id, result) return JobResult(self.test_id, result)
# TODO(majeski): Stop workers when processor is stopped. It will also require
# to call stop both directions from TimeoutProc.
class ExecutionProc(base.TestProc): class ExecutionProc(base.TestProc):
"""Last processor in the chain. Instead of passing tests further it creates """Last processor in the chain. Instead of passing tests further it creates
commands and output processors, executes them in multiple worker processes and commands and output processors, executes them in multiple worker processes and
...@@ -69,6 +67,9 @@ class ExecutionProc(base.TestProc): ...@@ -69,6 +67,9 @@ class ExecutionProc(base.TestProc):
self._pool.terminate() self._pool.terminate()
def next_test(self, test): def next_test(self, test):
if self.is_stopped:
return
test_id = test.procid test_id = test.procid
cmd = test.get_command(self._context) cmd = test.get_command(self._context)
self._tests[test_id] = test, cmd self._tests[test_id] = test, cmd
...@@ -80,7 +81,9 @@ class ExecutionProc(base.TestProc): ...@@ -80,7 +81,9 @@ class ExecutionProc(base.TestProc):
assert False, 'ExecutionProc cannot receive results' assert False, 'ExecutionProc cannot receive results'
def stop(self): def stop(self):
for pool_result in self._pool.terminate(): super(ExecutionProc, self).stop()
for pool_result in self._pool.terminate_with_results():
self._unpack_result(pool_result) self._unpack_result(pool_result)
def _unpack_result(self, pool_result): def _unpack_result(self, pool_result):
......
...@@ -12,6 +12,10 @@ class SignalProc(base.TestProcObserver): ...@@ -12,6 +12,10 @@ class SignalProc(base.TestProcObserver):
super(SignalProc, self).__init__() super(SignalProc, self).__init__()
self._ctrlc = False self._ctrlc = False
def setup(self, *args, **kwargs):
super(SignalProc, self).setup(*args, **kwargs)
# It should be called after processors are chained together to not loose
# catched signal.
signal.signal(signal.SIGINT, self._on_ctrlc) signal.signal(signal.SIGINT, self._on_ctrlc)
def _on_next_test(self, _test): def _on_next_test(self, _test):
...@@ -21,8 +25,11 @@ class SignalProc(base.TestProcObserver): ...@@ -21,8 +25,11 @@ class SignalProc(base.TestProcObserver):
self._on_event() self._on_event()
def _on_ctrlc(self, _signum, _stack_frame): def _on_ctrlc(self, _signum, _stack_frame):
if not self._ctrlc:
print '>>> Ctrl-C detected, waiting for ongoing tests to finish...' print '>>> Ctrl-C detected, waiting for ongoing tests to finish...'
self._ctrlc = True self._ctrlc = True
else:
print '>>> Pressing Ctrl-C again won\'t make this faster...'
def _on_event(self): def _on_event(self):
if self._ctrlc: if self._ctrlc:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment