Commit 780077f6 authored by machenbach's avatar machenbach Committed by Commit bot

[test-runner] Move test case processing beyond the multi-process boundary.

This will allow moving the test outcome check beyond the
multi-process boundary in a follow up. It'll allow wrapping
more complex test jobs like predicable mode on the multi-
process side, which will make the code easier to maintain.

BUG=

Review URL: https://codereview.chromium.org/1469833002

Cr-Commit-Position: refs/heads/master@{#32373}
parent 6190c608
......@@ -44,6 +44,10 @@ class CcTestSuite(testsuite.TestSuite):
build_dir = "out"
self.serdes_dir = os.path.normpath(
os.path.join(root, "..", "..", build_dir, ".serdes"))
def SetupWorkingDirectory(self):
# This is only called once per machine, while init above is called once per
# process.
if os.path.exists(self.serdes_dir):
shutil.rmtree(self.serdes_dir, True)
os.makedirs(self.serdes_dir)
......
......@@ -316,6 +316,7 @@ def Main():
suite = testsuite.TestSuite.LoadTestSuite(
os.path.join(workspace, "test", root))
if suite:
suite.SetupWorkingDirectory()
suites.append(suite)
if options.download_data:
......
......@@ -603,6 +603,7 @@ def Main():
suite = testsuite.TestSuite.LoadTestSuite(
os.path.join(BASE_DIR, "test", root))
if suite:
suite.SetupWorkingDirectory()
suites.append(suite)
if options.download_data or options.download_data_only:
......
......@@ -26,6 +26,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import collections
import os
import shutil
import sys
......@@ -35,10 +36,17 @@ from pool import Pool
from . import commands
from . import perfdata
from . import statusfile
from . import testsuite
from . import utils
class Job(object):
# Base dir of the v8 checkout.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)))))
TEST_DIR = os.path.join(BASE_DIR, "test")
class Instructions(object):
def __init__(self, command, dep_command, test_id, timeout, verbose):
self.command = command
self.dep_command = dep_command
......@@ -47,18 +55,112 @@ class Job(object):
self.verbose = verbose
def RunTest(job):
start_time = time.time()
if job.dep_command is not None:
dep_output = commands.Execute(job.dep_command, job.verbose, job.timeout)
# TODO(jkummerow): We approximate the test suite specific function
# IsFailureOutput() by just checking the exit code here. Currently
# only cctests define dependencies, for which this simplification is
# correct.
if dep_output.exit_code != 0:
return (job.id, dep_output, time.time() - start_time)
output = commands.Execute(job.command, job.verbose, job.timeout)
return (job.id, output, time.time() - start_time)
# Structure that keeps global information per worker process.
ProcessContext = collections.namedtuple(
"process_context", ["suites", "context"])
def MakeProcessContext(context):
"""Generate a process-local context.
This reloads all suites per process and stores the global context.
Args:
context: The global context from the test runner.
"""
suite_paths = utils.GetSuitePaths(TEST_DIR)
suites = {}
for root in suite_paths:
# Don't reinitialize global state as this is concurrently called from
# different processes.
suite = testsuite.TestSuite.LoadTestSuite(
os.path.join(TEST_DIR, root), global_init=False)
if suite:
suites[suite.name] = suite
return ProcessContext(suites, context)
def GetCommand(test, context):
d8testflag = []
shell = test.suite.shell()
if shell == "d8":
d8testflag = ["--test"]
if utils.IsWindows():
shell += ".exe"
if context.random_seed:
d8testflag += ["--random-seed=%s" % context.random_seed]
cmd = (context.command_prefix +
[os.path.abspath(os.path.join(context.shell_dir, shell))] +
d8testflag +
test.suite.GetFlagsForTestCase(test, context) +
context.extra_flags)
return cmd
def _GetInstructions(test, context):
command = GetCommand(test, context)
timeout = context.timeout
if ("--stress-opt" in test.flags or
"--stress-opt" in context.mode_flags or
"--stress-opt" in context.extra_flags):
timeout *= 4
if "--noenable-vfp3" in context.extra_flags:
timeout *= 2
# FIXME(machenbach): Make this more OO. Don't expose default outcomes or
# the like.
if statusfile.IsSlow(test.outcomes or [statusfile.PASS]):
timeout *= 2
if test.dependency is not None:
dep_command = [ c.replace(test.path, test.dependency) for c in command ]
else:
dep_command = None
return Instructions(
command, dep_command, test.id, timeout, context.verbose)
class Job(object):
"""Stores data to be sent over the multi-process boundary.
All contained fields will be pickled/unpickled.
"""
def Run(self, process_context):
"""Executes the job.
Args:
process_context: Process-local information that is initialized by the
executing worker.
"""
raise NotImplementedError()
class TestJob(Job):
def __init__(self, test):
self.test = test
def Run(self, process_context):
# Retrieve a new suite object on the worker-process side. The original
# suite object isn't pickled.
self.test.SetSuiteObject(process_context.suites)
instr = _GetInstructions(self.test, process_context.context)
start_time = time.time()
if instr.dep_command is not None:
dep_output = commands.Execute(
instr.dep_command, instr.verbose, instr.timeout)
# TODO(jkummerow): We approximate the test suite specific function
# IsFailureOutput() by just checking the exit code here. Currently
# only cctests define dependencies, for which this simplification is
# correct.
if dep_output.exit_code != 0:
return (instr.id, dep_output, time.time() - start_time)
output = commands.Execute(instr.command, instr.verbose, instr.timeout)
return (instr.id, output, time.time() - start_time)
def RunTest(job, process_context):
return job.Run(process_context)
class Runner(object):
......@@ -100,25 +202,6 @@ class Runner(object):
print("PerfData exception: %s" % e)
self.perf_failures = True
def _GetJob(self, test):
command = self.GetCommand(test)
timeout = self.context.timeout
if ("--stress-opt" in test.flags or
"--stress-opt" in self.context.mode_flags or
"--stress-opt" in self.context.extra_flags):
timeout *= 4
if "--noenable-vfp3" in self.context.extra_flags:
timeout *= 2
# FIXME(machenbach): Make this more OO. Don't expose default outcomes or
# the like.
if statusfile.IsSlow(test.outcomes or [statusfile.PASS]):
timeout *= 2
if test.dependency is not None:
dep_command = [ c.replace(test.path, test.dependency) for c in command ]
else:
dep_command = None
return Job(command, dep_command, test.id, timeout, self.context.verbose)
def _MaybeRerun(self, pool, test):
if test.run <= self.context.rerun_failures_count:
# Possibly rerun this test if its run count is below the maximum per
......@@ -139,7 +222,7 @@ class Runner(object):
test.duration = None
test.output = None
test.run += 1
pool.add([self._GetJob(test)])
pool.add([TestJob(test)])
self.remaining += 1
self.total += 1
......@@ -209,7 +292,7 @@ class Runner(object):
# remember the output for comparison.
test.run += 1
test.output = result[1]
pool.add([self._GetJob(test)])
pool.add([TestJob(test)])
# Always update the perf database.
return True
......@@ -232,14 +315,19 @@ class Runner(object):
assert test.id >= 0
test_map[test.id] = test
try:
yield [self._GetJob(test)]
yield [TestJob(test)]
except Exception, e:
# If this failed, save the exception and re-raise it later (after
# all other tests have had a chance to run).
queued_exception[0] = e
continue
try:
it = pool.imap_unordered(RunTest, gen_tests())
it = pool.imap_unordered(
fn=RunTest,
gen=gen_tests(),
process_context_fn=MakeProcessContext,
process_context_args=[self.context],
)
for result in it:
if result.heartbeat:
self.indicator.Heartbeat()
......@@ -277,22 +365,6 @@ class Runner(object):
print text
sys.stdout.flush()
def GetCommand(self, test):
d8testflag = []
shell = test.suite.shell()
if shell == "d8":
d8testflag = ["--test"]
if utils.IsWindows():
shell += ".exe"
if self.context.random_seed:
d8testflag += ["--random-seed=%s" % self.context.random_seed]
cmd = (self.context.command_prefix +
[os.path.abspath(os.path.join(self.context.shell_dir, shell))] +
d8testflag +
test.suite.GetFlagsForTestCase(test, self.context) +
self.context.extra_flags)
return cmd
class BreakNowException(Exception):
def __init__(self, value):
......
......@@ -5,6 +5,8 @@
from Queue import Empty
from multiprocessing import Event, Process, Queue
import traceback
class NormalResult():
def __init__(self, result):
......@@ -39,17 +41,22 @@ class MaybeResult():
return MaybeResult(False, value)
def Worker(fn, work_queue, done_queue, done):
def Worker(fn, work_queue, done_queue, done,
process_context_fn=None, process_context_args=None):
"""Worker to be run in a child process.
The worker stops on two conditions. 1. When the poison pill "STOP" is
reached or 2. when the event "done" is set."""
try:
kwargs = {}
if process_context_fn and process_context_args is not None:
kwargs.update(process_context=process_context_fn(*process_context_args))
for args in iter(work_queue.get, "STOP"):
if done.is_set():
break
try:
done_queue.put(NormalResult(fn(*args)))
done_queue.put(NormalResult(fn(*args, **kwargs)))
except Exception, e:
traceback.print_exc()
print(">>> EXCEPTION: %s" % e)
done_queue.put(ExceptionResult())
except KeyboardInterrupt:
......@@ -84,13 +91,23 @@ class Pool():
self.done = Event()
self.heartbeat_timeout = heartbeat_timeout
def imap_unordered(self, fn, gen):
def imap_unordered(self, fn, gen,
process_context_fn=None, process_context_args=None):
"""Maps function "fn" to items in generator "gen" on the worker processes
in an arbitrary order. The items are expected to be lists of arguments to
the function. Returns a results iterator. A result value of type
MaybeResult either indicates a heartbeat of the runner, i.e. indicating
that the runner is still waiting for the result to be computed, or it wraps
the real result."""
the real result.
Args:
process_context_fn: Function executed once by each worker. Expected to
return a process-context object. If present, this object is passed
as additional argument to each call to fn.
process_context_args: List of arguments for the invocation of
process_context_fn. All arguments will be pickled and sent beyond the
process boundary.
"""
try:
gen = iter(gen)
self.advance = self._advance_more
......@@ -99,7 +116,9 @@ class Pool():
p = Process(target=Worker, args=(fn,
self.work_queue,
self.done_queue,
self.done))
self.done,
process_context_fn,
process_context_args))
self.processes.append(p)
p.start()
......
......@@ -32,24 +32,13 @@ import os
import sys
import time
from . import execution
from . import junit_output
ABS_PATH_PREFIX = os.getcwd() + os.sep
def EscapeCommand(command):
parts = []
for part in command:
if ' ' in part:
# Escape spaces. We may need to escape more characters for this
# to work properly.
parts.append('"%s"' % part)
else:
parts.append(part)
return " ".join(parts)
class ProgressIndicator(object):
def __init__(self):
......@@ -83,6 +72,18 @@ class ProgressIndicator(object):
'negative': negative_marker
}
def _EscapeCommand(self, test):
command = execution.GetCommand(test, self.runner.context)
parts = []
for part in command:
if ' ' in part:
# Escape spaces. We may need to escape more characters for this
# to work properly.
parts.append('"%s"' % part)
else:
parts.append(part)
return " ".join(parts)
class IndicatorNotifier(object):
"""Holds a list of progress indicators and notifies them all on events."""
......@@ -124,7 +125,7 @@ class SimpleProgressIndicator(ProgressIndicator):
if failed.output.stdout:
print "--- stdout ---"
print failed.output.stdout.strip()
print "Command: %s" % EscapeCommand(self.runner.GetCommand(failed))
print "Command: %s" % self._EscapeCommand(failed)
if failed.output.HasCrashed():
print "exit code: %d" % failed.output.exit_code
print "--- CRASHED ---"
......@@ -212,7 +213,7 @@ class CompactProgressIndicator(ProgressIndicator):
stderr = test.output.stderr.strip()
if len(stderr):
print self.templates['stderr'] % stderr
print "Command: %s" % EscapeCommand(self.runner.GetCommand(test))
print "Command: %s" % self._EscapeCommand(test)
if test.output.HasCrashed():
print "exit code: %d" % test.output.exit_code
print "--- CRASHED ---"
......@@ -300,7 +301,7 @@ class JUnitTestProgressIndicator(ProgressIndicator):
stderr = test.output.stderr.strip()
if len(stderr):
fail_text += "stderr:\n%s\n" % stderr
fail_text += "Command: %s" % EscapeCommand(self.runner.GetCommand(test))
fail_text += "Command: %s" % self._EscapeCommand(test)
if test.output.HasCrashed():
fail_text += "exit code: %d\n--- CRASHED ---" % test.output.exit_code
if test.output.HasTimedOut():
......@@ -335,8 +336,7 @@ class JsonTestProgressIndicator(ProgressIndicator):
{
"name": test.GetLabel(),
"flags": test.flags,
"command": EscapeCommand(self.runner.GetCommand(test)).replace(
ABS_PATH_PREFIX, ""),
"command": self._EscapeCommand(test).replace(ABS_PATH_PREFIX, ""),
"duration": test.duration,
} for test in timed_tests[:20]
]
......@@ -362,8 +362,7 @@ class JsonTestProgressIndicator(ProgressIndicator):
self.results.append({
"name": test.GetLabel(),
"flags": test.flags,
"command": EscapeCommand(self.runner.GetCommand(test)).replace(
ABS_PATH_PREFIX, ""),
"command": self._EscapeCommand(test).replace(ABS_PATH_PREFIX, ""),
"run": test.run,
"stdout": test.output.stdout,
"stderr": test.output.stderr,
......
......@@ -90,7 +90,7 @@ class VariantGenerator(object):
class TestSuite(object):
@staticmethod
def LoadTestSuite(root):
def LoadTestSuite(root, global_init=True):
name = root.split(os.path.sep)[-1]
f = None
try:
......@@ -105,6 +105,8 @@ class TestSuite(object):
f.close()
def __init__(self, name, root):
# Note: This might be called concurrently from different processes.
# Changing harddisk state should be done in 'SetupWorkingDirectory' below.
self.name = name # string
self.root = root # string containing path
self.tests = None # list of TestCase objects
......@@ -112,6 +114,11 @@ class TestSuite(object):
self.wildcards = None # dictionary mapping test paths to list of outcomes
self.total_duration = None # float, assigned on demand
def SetupWorkingDirectory(self):
# This is called once per test suite object in a multi-process setting.
# Multi-process-unsafe work-directory setup can go here.
pass
def shell(self):
return "d8"
......
......@@ -93,6 +93,7 @@ def Execute(workspace, ctx, tests, sock, server):
suite = testsuite.TestSuite.LoadTestSuite(
os.path.join(workspace, "test", root))
if suite:
suite.SetupWorkingDirectory()
suites.append(suite)
suites_dict = {}
......
......@@ -86,3 +86,11 @@ class TestCase(object):
def GetLabel(self):
return self.suitename() + "/" + self.suite.CommonTestName(self)
def __getstate__(self):
"""Representation to pickle test cases.
The original suite won't be sent beyond process boundaries. Instead
send the name only and retrieve a process-local suite later.
"""
return dict(self.__dict__, suite=self.suite.name)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment