Commit da3416f5 authored by Michal Majewski's avatar Michal Majewski Committed by Commit Bot

[test] Introduce test processors

Test processors can be enabled with --infra-staging flag.

Rerunning tests, execution and verbose progress indicator already
work as test processors.

Bug: v8:6917
Change-Id: I40fc42db94dbc8629e8000a3d363030045532fe3
Reviewed-on: https://chromium-review.googlesource.com/850398
Commit-Queue: Michał Majewski <majeski@google.com>
Reviewed-by: 's avatarMichael Achenbach <machenbach@chromium.org>
Cr-Commit-Position: refs/heads/master@{#50399}
parent 9a3cd042
......@@ -37,7 +37,12 @@ from . import command
from . import perfdata
from . import statusfile
from . import utils
from pool import Pool
from . pool import Pool
from ..objects import predictable
from ..testproc.execution import ExecutionProc
from ..testproc.loader import LoadProc
from ..testproc.progress import VerboseProgressIndicator, ResultsTracker
from ..testproc.rerun import RerunProc
# Base dir of the v8 checkout.
......@@ -210,6 +215,9 @@ class Runner(object):
return not has_unexpected_output
def Run(self, jobs):
if self.context.infra_staging:
return self._RunTestProc(jobs)
self.indicator.Starting()
self._RunInternal(jobs)
self.indicator.Done()
......@@ -219,6 +227,47 @@ class Runner(object):
return 2
return 0
def _RunTestProc(self, jobs):
print '>>> Running with test processors'
procs = []
indicators = self.indicator.ToProgressIndicatorProcs()
# TODO(majeski): Implement all indicators and remove this filter.
indicators = filter(None, indicators)
loader = LoadProc()
procs.append(loader)
results = ResultsTracker(count_subtests=False)
procs.append(results)
procs += indicators
if self.context.rerun_failures_count:
procs.append(RerunProc(
self.context.rerun_failures_count,
self.context.rerun_failures_max
))
execproc = ExecutionProc(jobs, self.context)
procs.append(execproc)
for i in xrange(0, len(procs) - 1):
procs[i].connect_to(procs[i + 1])
loader.load_tests(self.tests)
for indicator in indicators:
indicator.starting()
execproc.start()
for indicator in indicators:
indicator.finished()
if results.failed:
return 1
if results.remaining:
return 2
return 0
def _RunInternal(self, jobs):
pool = Pool(jobs)
test_map = {}
......
......@@ -34,6 +34,7 @@ import time
from . import junit_output
from . import statusfile
from ..testproc import progress as progress_proc
class ProgressIndicator(object):
......@@ -66,6 +67,11 @@ class ProgressIndicator(object):
'negative': negative_marker,
}
def ToProgressIndicatorProc(self):
print ('Warning: %s is not available as a processor' %
self.__class__.__name__)
return None
class IndicatorNotifier(object):
"""Holds a list of progress indicators and notifies them all on events."""
......@@ -75,6 +81,9 @@ class IndicatorNotifier(object):
def Register(self, indicator):
self.indicators.append(indicator)
def ToProgressIndicatorProcs(self):
return [i.ToProgressIndicatorProc() for i in self.indicators]
# Forge all generic event-dispatching methods in IndicatorNotifier, which are
# part of the ProgressIndicator interface.
......@@ -144,6 +153,9 @@ class VerboseProgressIndicator(SimpleProgressIndicator):
print 'Still working...'
sys.stdout.flush()
def ToProgressIndicatorProc(self):
return progress_proc.VerboseProgressIndicator()
class DotsProgressIndicator(SimpleProgressIndicator):
......@@ -302,6 +314,10 @@ class JsonTestProgressIndicator(ProgressIndicator):
self.results = []
self.tests = []
def ToProgressIndicatorProc(self):
return progress_proc.JsonTestProgressIndicator(
self.json_test_results, self.arch, self.mode, self.random_seed)
def Done(self):
complete_results = []
if os.path.exists(self.json_test_results):
......
......@@ -30,7 +30,7 @@ class Context():
def __init__(self, arch, mode, shell_dir, mode_flags, verbose, timeout,
isolates, command_prefix, extra_flags, noi18n, random_seed,
no_sorting, rerun_failures_count, rerun_failures_max, no_harness,
use_perf_data, sancov_dir):
use_perf_data, sancov_dir, infra_staging=False):
self.arch = arch
self.mode = mode
self.shell_dir = shell_dir
......@@ -48,3 +48,4 @@ class Context():
self.no_harness = no_harness
self.use_perf_data = use_perf_data
self.sancov_dir = sancov_dir
self.infra_staging = infra_staging
......@@ -53,11 +53,23 @@ class TestCase(object):
self.run = 1 # The nth time this test is executed.
self.cmd = None
# Fields used by the test processors.
self.origin = None # Test that this test is subtest of.
self.processor = None # Processor that created this subtest.
self.procid = '%s/%s' % (self.suite.name, self.name) # unique id
self._statusfile_outcomes = None
self._expected_outcomes = None # optimization: None == [statusfile.PASS]
self._statusfile_flags = None
self._prepare_outcomes()
def create_subtest(self, processor, subtest_id):
subtest = copy.copy(self)
subtest.origin = self
subtest.processor = processor
subtest.procid += '.%s' % subtest_id
return subtest
def create_variant(self, variant, flags):
"""Makes a shallow copy of the object and updates variant, variant_flags and
all fields that depend on it, e.g. expected outcomes.
......@@ -68,6 +80,7 @@ class TestCase(object):
else:
other.variant_flags = self.variant_flags + flags
other.variant = variant
other.procid += '[%s]' % variant
other._prepare_outcomes(variant != self.variant)
......
......@@ -380,7 +380,8 @@ class StandardTestRunner(base_runner.BaseTestRunner):
options.rerun_failures_max,
options.no_harness,
use_perf_data=not options.swarming,
sancov_dir=self.sancov_dir)
sancov_dir=self.sancov_dir,
infra_staging=options.infra_staging)
# TODO(all): Combine "simulator" and "simulator_run".
# TODO(machenbach): In GN we can derive simulator run from
......
# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
class TestProc(object):
def __init__(self):
self._prev_proc = None
self._next_proc = None
def connect_to(self, next_proc):
next_proc._prev_proc = self
self._next_proc = next_proc
def next_test(self, test):
raise NotImplementedError()
def result_for(self, test, result, is_last):
raise NotImplementedError()
### Communication
def _send_test(self, test):
return self._next_proc.next_test(test)
def _send_result(self, test, result, is_last=True):
return self._prev_proc.result_for(test, result, is_last=is_last)
class TestProcObserver(TestProc):
def next_test(self, test):
self._on_next_test(test)
self._send_test(test)
def result_for(self, test, result, is_last):
self._on_result_for(test, result, is_last)
self._send_result(test, result, is_last)
def _on_next_test(self, test):
pass
def _on_result_for(self, test, result, is_last):
pass
class TestProcProducer(TestProc):
def __init__(self, name):
super(TestProcProducer, self).__init__()
self._name = name
def next_test(self, test):
return self._next_test(test)
def result_for(self, subtest, result, is_last):
test = self._get_subtest_origin(subtest)
self._result_for(test, subtest, result, is_last)
### Implementation
def _next_test(self, test):
raise NotImplementedError()
def _result_for(self, test, subtest, result, is_last):
raise NotImplementedError()
### Managing subtests
def _create_subtest(self, test, subtest_id):
return test.create_subtest(self, '%s-%s' % (self._name, subtest_id))
def _get_subtest_origin(self, subtest):
while subtest.processor and subtest.processor is not self:
subtest = subtest.origin
return subtest.origin
# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import traceback
from . import base
from ..local import pool
# Global function for multiprocessing, because pickling a static method doesn't
# work on Windows.
def run_job(job):
return job.run()
JobResult = collections.namedtuple('JobResult', ['id', 'result'])
class Job(object):
def __init__(self, test_id, cmd, outproc):
self.test_id = test_id
self.cmd = cmd
self.outproc = outproc
def run(self):
output = self.cmd.execute()
return JobResult(self.test_id, self.outproc.process(output))
class ExecutionProc(base.TestProc):
def __init__(self, jobs, context):
super(ExecutionProc, self).__init__()
self._pool = pool.Pool(jobs)
self._context = context
self._tests = {}
def connect_to(self, next_proc):
assert False, 'ExecutionProc cannot be connected to anything'
def start(self):
try:
it = self._pool.imap_unordered(
fn=run_job,
gen=[],
process_context_fn=None,
process_context_args=None,
)
for pool_result in it:
if pool_result.heartbeat:
continue
job_result = pool_result.value
test_id, result = job_result
test = self._tests[test_id]
del self._tests[test_id]
self._send_result(test, result)
except KeyboardInterrupt:
raise
except:
traceback.print_exc()
raise
finally:
self._pool.terminate()
def next_test(self, test):
test_id = test.procid
self._tests[test_id] = test
# TODO(majeski): Don't modify test. It's currently used in the progress
# indicator.
test.cmd = test.get_command(self._context)
# TODO(majeski): Needs factory for outproc as in local/execution.py
outproc = test.output_proc
self._pool.add([Job(test_id, test.cmd, outproc)])
def result_for(self, test, result, is_last):
assert False, 'ExecutionProc cannot receive results'
# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from . import base
class LoadProc(base.TestProc):
def load_tests(self, tests):
loaded = set()
for test in tests:
if test.procid in loaded:
print 'Warning: %s already obtained' % test.procid
continue
loaded.add(test.procid)
self._send_test(test)
def result_for(self, test, result, is_last):
pass
# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import os
import sys
from . import base
def print_failure_header(test):
if test.output_proc.negative:
negative_marker = '[negative] '
else:
negative_marker = ''
print "=== %(label)s %(negative)s===" % {
'label': test,
'negative': negative_marker,
}
class ResultsTracker(base.TestProcObserver):
def __init__(self, count_subtests):
super(ResultsTracker, self).__init__()
self.failed = 0
self.remaining = 0
self.total = 0
self.count_subtests = count_subtests
def _on_next_test(self, test):
self.total += 1
self.remaining += 1
# TODO(majeski): If count_subtests is set get number of subtests from the
# next proc.
def _on_result_for(self, test, result, is_last):
if not is_last and not self.count_subtests:
return
self.remaining -= 1
if result.has_unexpected_output:
self.failed += 1
class ProgressIndicator(base.TestProcObserver):
def starting(self):
pass
def finished(self):
pass
class SimpleProgressIndicator(ProgressIndicator):
def __init__(self):
super(SimpleProgressIndicator, self).__init__()
self._failed = []
self._total = 0
def _on_next_test(self, test):
# TODO(majeski): Collect information about subtests, e.g. for each test
# we create multiple variants.
self._total += 1
def _on_result_for(self, test, result, is_last):
if result.has_unexpected_output:
self._failed.append((test, result.output))
def starting(self):
print 'Running %i tests' % self._total
def finished(self):
crashed = 0
print
for test, output in self._failed:
print_failure_header(test)
if output.stderr:
print "--- stderr ---"
print output.stderr.strip()
if output.stdout:
print "--- stdout ---"
print output.stdout.strip()
print "Command: %s" % test.cmd.to_string()
if output.HasCrashed():
print "exit code: %d" % output.exit_code
print "--- CRASHED ---"
crashed += 1
if output.HasTimedOut():
print "--- TIMEOUT ---"
if len(self._failed) == 0:
print "==="
print "=== All tests succeeded"
print "==="
else:
print
print "==="
print "=== %i tests failed" % len(self._failed)
if crashed > 0:
print "=== %i tests CRASHED" % crashed
print "==="
class VerboseProgressIndicator(SimpleProgressIndicator):
def _on_result_for(self, test, result, is_last):
super(VerboseProgressIndicator, self)._on_result_for(test, result, is_last)
if result.has_unexpected_output:
if result.output.HasCrashed():
outcome = 'CRASH'
else:
outcome = 'FAIL'
else:
outcome = 'pass'
print 'Done running %s: %s' % (test, outcome)
sys.stdout.flush()
class JsonTestProgressIndicator(ProgressIndicator):
def __init__(self, json_test_results, arch, mode, random_seed):
super(JsonTestProgressIndicator, self).__init__()
self.json_test_results = json_test_results
self.arch = arch
self.mode = mode
self.random_seed = random_seed
self.results = []
self.tests = []
def _on_result_for(self, test, result, is_last):
output = result.output
# Buffer all tests for sorting the durations in the end.
self.tests.append((test, output.duration))
# TODO(majeski): Previously we included reruns here. If we still want this
# json progress indicator should be placed just before execution.
if not result.has_unexpected_output:
# Omit tests that run as expected.
return
self.results.append({
"name": str(test),
"flags": test.cmd.args,
"command": test.cmd.to_string(relative=True),
"run": -100, # TODO(majeski): do we need this?
"stdout": output.stdout,
"stderr": output.stderr,
"exit_code": output.exit_code,
"result": test.output_proc.get_outcome(output),
"expected": test.expected_outcomes,
"duration": output.duration,
# TODO(machenbach): This stores only the global random seed from the
# context and not possible overrides when using random-seed stress.
"random_seed": self.random_seed,
"target_name": test.get_shell(),
"variant": test.variant,
})
def finished(self):
complete_results = []
if os.path.exists(self.json_test_results):
with open(self.json_test_results, "r") as f:
# Buildbot might start out with an empty file.
complete_results = json.loads(f.read() or "[]")
duration_mean = None
if self.tests:
# Get duration mean.
duration_mean = (
sum(duration for (_, duration) in self.tests) /
float(len(self.tests)))
# Sort tests by duration.
self.tests.sort(key=lambda (_, duration): duration, reverse=True)
slowest_tests = [
{
"name": str(test),
"flags": test.cmd.args,
"command": test.cmd.to_string(relative=True),
"duration": duration,
"marked_slow": test.is_slow,
} for (test, duration) in self.tests[:20]
]
complete_results.append({
"arch": self.arch,
"mode": self.mode,
"results": self.results,
"slowest_tests": slowest_tests,
"duration_mean": duration_mean,
"test_total": len(self.tests),
})
with open(self.json_test_results, "w") as f:
f.write(json.dumps(complete_results))
# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from . import base
class RerunProc(base.TestProcProducer):
def __init__(self, rerun_max, rerun_max_total=None):
super(RerunProc, self).__init__('Rerun')
self._rerun = dict()
self._rerun_max = rerun_max
self._rerun_total_left = rerun_max_total
def _next_test(self, test):
self._init_test(test)
self._send_next_subtest(test)
def _result_for(self, test, subtest, result, is_last):
# Rerun processor cannot be placed before any processor that produces more
# than one subtest per test.
# TODO(majeski): Introduce constraints and check them during pipeline
# creation to avoid asserts like that.
assert is_last
if self._needs_rerun(test, result):
self._rerun[test.procid] += 1
if self._rerun_total_left is not None:
self._rerun_total_left -= 1
self._send_next_subtest(test)
else:
self._finalize_test(test)
self._send_result(test, result)
def _init_test(self, test):
self._rerun[test.procid] = 0
def _needs_rerun(self, test, result):
# TODO(majeski): Limit reruns count for slow tests.
return ((self._rerun_total_left is None or self._rerun_total_left > 0) and
self._rerun[test.procid] < self._rerun_max and
result.has_unexpected_output)
def _send_next_subtest(self, test):
run = self._rerun[test.procid]
subtest = self._create_subtest(test, str(run + 1))
self._send_test(subtest)
def _finalize_test(self, test):
del self._rerun[test.procid]
......@@ -95,10 +95,12 @@ def capture():
sys.stderr = olderr
def run_tests(basedir, *args):
def run_tests(basedir, *args, **kwargs):
"""Executes the test runner with captured output."""
with capture() as (stdout, stderr):
sys_args = ['--command-prefix', sys.executable] + list(args)
if kwargs.get('infra_staging', False):
sys_args.append('--infra-staging')
code = standard_runner.StandardTestRunner(
basedir=basedir).execute(sys_args)
return Result(stdout.getvalue(), stderr.getvalue(), code)
......@@ -194,7 +196,10 @@ class SystemTest(unittest.TestCase):
self.assertIn('Done running sweet/raspberries', result.stdout, result)
self.assertEqual(0, result.returncode, result)
def testFail(self):
def testFailProc(self):
self.testFail(infra_staging=True)
def testFail(self, infra_staging=False):
"""Test running only failing tests in two variants."""
with temp_base() as basedir:
result = run_tests(
......@@ -203,12 +208,16 @@ class SystemTest(unittest.TestCase):
'--progress=verbose',
'--variants=default,stress',
'sweet/strawberries',
infra_staging=infra_staging,
)
self.assertIn('Running 2 tests', result.stdout, result)
self.assertIn('Done running sweet/strawberries: FAIL', result.stdout, result)
self.assertEqual(1, result.returncode, result)
def testFailWithRerunAndJSON(self):
def testFailWithRerunAndJSONProc(self):
self.testFailWithRerunAndJSON(infra_staging=True)
def testFailWithRerunAndJSON(self, infra_staging=False):
"""Test re-running a failing test and output to json."""
with temp_base() as basedir:
json_path = os.path.join(basedir, 'out.json')
......@@ -221,11 +230,17 @@ class SystemTest(unittest.TestCase):
'--random-seed=123',
'--json-test-results', json_path,
'sweet/strawberries',
infra_staging=infra_staging,
)
self.assertIn('Running 1 tests', result.stdout, result)
self.assertIn('Done running sweet/strawberries: FAIL', result.stdout, result)
# We run one test, which fails and gets re-run twice.
self.assertIn('3 tests failed', result.stdout, result)
if not infra_staging:
# We run one test, which fails and gets re-run twice.
self.assertIn('3 tests failed', result.stdout, result)
else:
# With test processors we don't count reruns as separated failures.
# TODO(majeski): fix it.
self.assertIn('1 tests failed', result.stdout, result)
self.assertEqual(0, result.returncode, result)
# Check relevant properties of the json output.
......@@ -246,7 +261,11 @@ class SystemTest(unittest.TestCase):
replace_variable_data(data)
json_output['duration_mean'] = 1
with open(os.path.join(TEST_DATA_ROOT, 'expected_test_results1.json')) as f:
suffix = ''
if infra_staging:
suffix = '-proc'
expected_results_name = 'expected_test_results1%s.json' % suffix
with open(os.path.join(TEST_DATA_ROOT, expected_results_name)) as f:
expected_test_results = json.load(f)
# TODO(majeski): Previously we only reported the variant flags in the
......@@ -303,12 +322,19 @@ class SystemTest(unittest.TestCase):
self.assertIn('Running 0 tests', result.stdout, result)
self.assertEqual(0, result.returncode, result)
def testDefault(self):
def testDefaultProc(self):
self.testDefault(infra_staging=True)
def testDefault(self, infra_staging=False):
"""Test using default test suites, though no tests are run since they don't
exist in a test setting.
"""
with temp_base() as basedir:
result = run_tests(basedir, '--mode=Release')
result = run_tests(
basedir,
'--mode=Release',
infra_staging=infra_staging,
)
self.assertIn('Warning: no tests were run!', result.stdout, result)
self.assertEqual(0, result.returncode, result)
......@@ -403,7 +429,10 @@ class SystemTest(unittest.TestCase):
self.assertIn('(no source available)', result.stdout, result)
self.assertEqual(0, result.returncode, result)
def testPredictable(self):
def testPredictableProc(self):
self.testPredictable(infra_staging=True)
def testPredictable(self, infra_staging=False):
"""Test running a test in verify-predictable mode.
The test will fail because of missing allocation output. We verify that and
......@@ -417,6 +446,7 @@ class SystemTest(unittest.TestCase):
'--progress=verbose',
'--variants=default',
'sweet/bananas',
infra_staging=infra_staging,
)
self.assertIn('Running 1 tests', result.stdout, result)
self.assertIn('Done running sweet/bananas: FAIL', result.stdout, result)
......
{
"arch": "x64",
"duration_mean": 1,
"mode": "release",
"results": [
{
"command": "/usr/bin/python out/Release/d8_mocked.py --random-seed=123 strawberries --nohard-abort",
"duration": 1,
"exit_code": 1,
"expected": [
"PASS"
],
"flags": [
"--random-seed=123",
"strawberries",
"--nohard-abort"
],
"name": "sweet/strawberries",
"random_seed": 123,
"result": "FAIL",
"run": -100,
"stderr": "",
"stdout": "--random-seed=123 strawberries --nohard-abort\n",
"target_name": "d8_mocked.py",
"variant": "default"
}
],
"slowest_tests": [
{
"command": "/usr/bin/python out/Release/d8_mocked.py --random-seed=123 strawberries --nohard-abort",
"duration": 1,
"flags": [
"--random-seed=123",
"strawberries",
"--nohard-abort"
],
"marked_slow": true,
"name": "sweet/strawberries"
}
],
"test_total": 1
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment